5 changed files with 176 additions and 182 deletions
@ -1,12 +1,176 @@
|
||||
from flask import Flask |
||||
from flask_login import LoginManager |
||||
import os.path |
||||
import os |
||||
from TreeWalker import TreeWalker |
||||
from functools import wraps |
||||
from mimetypes import guess_type |
||||
from random import shuffle |
||||
|
||||
from flask import Flask, Response, abort, json, jsonify, request |
||||
from flask_login import current_user, login_user, logout_user |
||||
|
||||
from .process import send_process |
||||
from .jsonp import jsonp |
||||
from .login import admin_user, load_user |
||||
from .login import login_manager |
||||
|
||||
app = Flask(__name__) |
||||
app.config.from_pyfile( |
||||
os.path.join(os.path.dirname(os.path.abspath(__file__)), "app.cfg")) |
||||
|
||||
login_manager = LoginManager() |
||||
import login |
||||
login_manager.setup_app(app) |
||||
import endpoints |
||||
|
||||
cwd = os.path.dirname(os.path.abspath(__file__)) |
||||
permission_map = app.config.get('PERMISSION_MAP', []) |
||||
|
||||
|
||||
def accel_redirect(internal, real, relative_name): |
||||
real_path = os.path.join(real, relative_name) |
||||
internal_path = os.path.join(internal, relative_name) |
||||
if not os.path.isfile(real_path): |
||||
abort(404) |
||||
mimetype = None |
||||
types = guess_type(real_path) |
||||
if len(types) != 0: |
||||
mimetype = types[0] |
||||
response = Response(mimetype=mimetype) |
||||
response.headers.add("X-Accel-Redirect", internal_path) |
||||
response.cache_control.public = True |
||||
if mimetype == "application/json": |
||||
response.cache_control.max_age = 3600 |
||||
else: |
||||
response.cache_control.max_age = 29030400 |
||||
return response |
||||
|
||||
|
||||
def cache_base(path): |
||||
path = path.replace( |
||||
'/', '-').replace( |
||||
' ', '_').replace( |
||||
'(', '').replace( |
||||
'&', '').replace( |
||||
',', '').replace( |
||||
')', '').replace( |
||||
'#', '').replace( |
||||
'[', '').replace( |
||||
']', '').replace( |
||||
'"', '').replace( |
||||
"'", '').replace( |
||||
'_-_', '-').lower() |
||||
|
||||
while path.find("--") != -1: |
||||
path = path.replace("--", "-") |
||||
|
||||
while path.find("__") != -1: |
||||
path = path.replace("__", "_") |
||||
|
||||
if len(path) == 0: |
||||
path = "root" |
||||
|
||||
return path |
||||
|
||||
|
||||
def has_permission(path): |
||||
if not current_user.is_anonymous and current_user.is_admin: |
||||
return True |
||||
|
||||
for auth_path in permission_map.keys(): |
||||
# this is a protected object |
||||
if (path.startswith(auth_path) or |
||||
path.startswith(cache_base(auth_path))): |
||||
if current_user.is_anonymous: |
||||
return False |
||||
if current_user.id in permission_map.get(auth_path, []): |
||||
return True |
||||
else: |
||||
return False |
||||
return True |
||||
|
||||
|
||||
def admin_required(fn): |
||||
@wraps(fn) |
||||
def decorated_view(*args, **kwargs): |
||||
if (query_is_admin_user(request.args) or |
||||
(current_user.is_authenticated and current_user.admin)): |
||||
return fn(*args, **kwargs) |
||||
return app.login_manager.unauthorized() |
||||
return decorated_view |
||||
|
||||
|
||||
def query_is_admin_user(query): |
||||
username = query.get("username", None) |
||||
password = query.get("password", None) |
||||
return (username == app.config["ADMIN_USERNAME"] and |
||||
password == app.config["ADMIN_PASSWORD"]) |
||||
|
||||
|
||||
@app.route("/scan") |
||||
@admin_required |
||||
def scan_photos(): |
||||
global cwd |
||||
response = send_process([ |
||||
"stdbuf", |
||||
"-oL", |
||||
os.path.abspath(os.path.join(cwd, "../main.py")), |
||||
os.path.abspath(app.config["ALBUM_PATH"]), |
||||
os.path.abspath(app.config["CACHE_PATH"]) |
||||
], |
||||
os.path.join(cwd, "scanner.pid")) |
||||
response.headers.add("X-Accel-Buffering", "no") |
||||
response.cache_control.no_cache = True |
||||
return response |
||||
|
||||
|
||||
@app.route("/auth") |
||||
def login(): |
||||
if 'logout' in request.args: |
||||
logout_user() |
||||
|
||||
if current_user.is_authenticated: |
||||
logout_user() |
||||
|
||||
if (query_is_admin_user(request.form) or |
||||
query_is_admin_user(request.args)): |
||||
login_user(admin_user, remember=True) |
||||
else: |
||||
user_id = (request.form.get('username') or |
||||
request.args.get('username', None)) |
||||
if user_id: |
||||
login_user(load_user(user_id), remember=True) |
||||
return 'You are now logged in.' |
||||
|
||||
return "" |
||||
|
||||
|
||||
@app.route("/albums/<path:path>") |
||||
def albums(path): |
||||
if not has_permission(path): |
||||
abort(403) |
||||
|
||||
return accel_redirect( |
||||
app.config["ALBUM_ACCEL"], app.config["ALBUM_PATH"], path) |
||||
|
||||
|
||||
@app.route("/cache/<path:path>") |
||||
def cache(path): |
||||
if not has_permission(path): |
||||
abort(403) |
||||
|
||||
return accel_redirect( |
||||
app.config["CACHE_ACCEL"], app.config["CACHE_PATH"], path) |
||||
|
||||
|
||||
@app.route("/photos") |
||||
@jsonp |
||||
def photos(): |
||||
f = open(os.path.join(app.config["CACHE_PATH"], "all_photos.json"), "r") |
||||
photos = json.load(f) |
||||
f.close() |
||||
photos = [photo for photo in photos if has_permission(photo)] |
||||
count = int(request.args.get("count", len(photos))) |
||||
random = request.args.get("random") == "true" |
||||
if random: |
||||
shuffle(photos) |
||||
else: |
||||
photos.reverse() |
||||
response = jsonify(photos=photos[0:count]) |
||||
response.cache_control.no_cache = True |
||||
return response |
||||
|
@ -1,153 +0,0 @@
|
||||
import os |
||||
from mimetypes import guess_type |
||||
from random import shuffle |
||||
|
||||
from flask import Response, abort, json, jsonify, request |
||||
from flask_login import current_user, login_user, logout_user |
||||
|
||||
from floatapp import app |
||||
from floatapp.jsonp import jsonp |
||||
from floatapp.login import (admin_required, admin_user, |
||||
query_is_admin_user, load_user) |
||||
from process import send_process |
||||
from TreeWalker import TreeWalker |
||||
|
||||
cwd = os.path.dirname(os.path.abspath(__file__)) |
||||
permission_map = app.config.get('PERMISSION_MAP', []) |
||||
|
||||
|
||||
@app.route("/scan") |
||||
@admin_required |
||||
def scan_photos(): |
||||
global cwd |
||||
response = send_process([ |
||||
"stdbuf", |
||||
"-oL", |
||||
os.path.abspath(os.path.join(cwd, "../main.py")), |
||||
os.path.abspath(app.config["ALBUM_PATH"]), |
||||
os.path.abspath(app.config["CACHE_PATH"]) |
||||
], |
||||
os.path.join(cwd, "scanner.pid")) |
||||
response.headers.add("X-Accel-Buffering", "no") |
||||
response.cache_control.no_cache = True |
||||
return response |
||||
|
||||
|
||||
@app.route("/auth") |
||||
def login(): |
||||
if 'logout' in request.args: |
||||
logout_user() |
||||
|
||||
if current_user.is_authenticated: |
||||
logout_user() |
||||
|
||||
if (query_is_admin_user(request.form) or |
||||
query_is_admin_user(request.args)): |
||||
login_user(admin_user, remember=True) |
||||
else: |
||||
user_id = (request.form.get('username') or |
||||
request.args.get('username', None)) |
||||
if user_id: |
||||
login_user(load_user(user_id), remember=True) |
||||
return 'You are now logged in.' |
||||
|
||||
return "" |
||||
|
||||
|
||||
@app.route("/albums/<path:path>") |
||||
def albums(path): |
||||
if not has_permission(path): |
||||
abort(403) |
||||
|
||||
return accel_redirect( |
||||
app.config["ALBUM_ACCEL"], app.config["ALBUM_PATH"], path) |
||||
|
||||
|
||||
@app.route("/cache/<path:path>") |
||||
def cache(path): |
||||
if not has_permission(path): |
||||
abort(403) |
||||
|
||||
return accel_redirect( |
||||
app.config["CACHE_ACCEL"], app.config["CACHE_PATH"], path) |
||||
|
||||
|
||||
@app.route("/photos") |
||||
@jsonp |
||||
def photos(): |
||||
f = open(os.path.join(app.config["CACHE_PATH"], "all_photos.json"), "r") |
||||
photos = json.load(f) |
||||
f.close() |
||||
photos = [photo for photo in photos if has_permission(photo)] |
||||
count = int(request.args.get("count", len(photos))) |
||||
random = request.args.get("random") == "true" |
||||
if random: |
||||
shuffle(photos) |
||||
else: |
||||
photos.reverse() |
||||
response = jsonify(photos=photos[0:count]) |
||||
response.cache_control.no_cache = True |
||||
return response |
||||
|
||||
|
||||
def accel_redirect(internal, real, relative_name): |
||||
real_path = os.path.join(real, relative_name) |
||||
internal_path = os.path.join(internal, relative_name) |
||||
if not os.path.isfile(real_path): |
||||
abort(404) |
||||
mimetype = None |
||||
types = guess_type(real_path) |
||||
if len(types) != 0: |
||||
mimetype = types[0] |
||||
response = Response(mimetype=mimetype) |
||||
response.headers.add("X-Accel-Redirect", internal_path) |
||||
response.cache_control.public = True |
||||
if mimetype == "application/json": |
||||
response.cache_control.max_age = 3600 |
||||
else: |
||||
response.cache_control.max_age = 29030400 |
||||
return response |
||||
|
||||
|
||||
def cache_base(path): |
||||
path = path.replace( |
||||
'/', '-').replace( |
||||
' ', '_').replace( |
||||
'(', '').replace( |
||||
'&', '').replace( |
||||
',', '').replace( |
||||
')', '').replace( |
||||
'#', '').replace( |
||||
'[', '').replace( |
||||
']', '').replace( |
||||
'"', '').replace( |
||||
"'", '').replace( |
||||
'_-_', '-').lower() |
||||
|
||||
while path.find("--") != -1: |
||||
path = path.replace("--", "-") |
||||
|
||||
while path.find("__") != -1: |
||||
path = path.replace("__", "_") |
||||
|
||||
if len(path) == 0: |
||||
path = "root" |
||||
|
||||
return path |
||||
|
||||
|
||||
def has_permission(path): |
||||
if not current_user.is_anonymous and current_user.is_admin: |
||||
return True |
||||
|
||||
for auth_path in permission_map.keys(): |
||||
# this is a protected object |
||||
if (path.startswith(auth_path) or |
||||
path.startswith(cache_base(auth_path))): |
||||
if current_user.is_anonymous: |
||||
return False |
||||
if current_user.id in permission_map.get(auth_path, []): |
||||
return True |
||||
else: |
||||
return False |
||||
return True |
Loading…
Reference in new issue