diff --git a/server.py b/server.py index 8d2c466741a1ef8d47cfb9cfb5ccf9ff7a15c83c..63cdb60d90cea0838715eddf859ba84e61b43822 100644 --- a/server.py +++ b/server.py @@ -77,10 +77,6 @@ from ldap import ldapauth mod_endpoints = [] -@app.template_global() -def ismod(*args): - return ('user' in session) - def mod_required(func): mod_endpoints.append(func.__name__) @wraps(func) @@ -92,30 +88,6 @@ def mod_required(func): return func(*args, **kwargs) return decorator -csrf_endpoints = [] - -def csrf_protect(func): - csrf_endpoints.append(func.__name__) - @wraps(func) - def decorator(*args, **kwargs): - if '_csrf_token' in request.values: - token = request.values['_csrf_token'] - elif request.get_json() and ('_csrf_token' in request.get_json()): - token = request.get_json()['_csrf_token'] - else: - token = None - if not ('_csrf_token' in session) or (session['_csrf_token'] != token ) or not token: - return 'csrf test failed', 403 - else: - return func(*args, **kwargs) - return decorator - -@app.url_defaults -def csrf_inject(endpoint, values): - if endpoint not in csrf_endpoints or not session.get('_csrf_token'): - return - values['_csrf_token'] = session['_csrf_token'] - def evalperm(perms): cperms = [] lperms = [] @@ -135,72 +107,7 @@ def evalperm(perms): return cperms return [{'type': 'public'}] -@app.template_filter() -def base64encode(str): - try: - return base64.b64encode(str.encode('UTF-8')).decode('UTF-8') - except: - return '' - -@app.template_filter() -def checkperm(perms, username=None, password=None): - if ismod(): - return True - perms = evalperm(perms) - for perm in perms: - if perm['type'] == 'public': - return True - elif perm['type'] == 'password': - if perm['param1'] == username and perm['param2'] == password: - return True - elif perm['type'] == 'l2p': - if perm['param1'] in session.get('l2p_courses', []): - return True - elif perm['type'] == 'rwth': - if session.get('rwthintern', False): - return True - if 'X-Real-IP' not in request.headers: - continue - ip = ip_address(request.headers['X-Real-IP']) - for net in config['RWTH_IP_RANGES']: - if ip in ip_network(net): - return True - return False - -@app.template_filter() -def permdescr(perms): - perms = evalperm(perms) - public = False - password = False - l2p_courses = [] - rwth_intern = False - fsmpi_intern = False - for perm in perms: - if perm['type'] == 'public': - public = True - elif perm['type'] == 'password': - password = True - elif perm['type'] == 'l2p': - l2p_courses.append(perm['param1']) - elif perm['type'] == 'rwth': - rwth_intern = True - elif perm['type'] == 'fsmpi': - fsmpi_intern = True - if public or not perms: - return 'public', 'Öffentlich verfügbar' - if rwth_intern: - if password: - return 'rwth', 'Nur für RWTH-Angehörige und Nutzer mit Passwort verfügbar' - return 'rwth', 'Nur für RWTH-Angehörige verfügbar' - if fsmpi_intern: - return 'fsmpi', 'Nur für Fachschaftler verfügbar' - if l2p_courses: - if password: - return 'l2p', 'Nur für Teilnehmer der Veranstaltung und Nutzer mit Passwort verfügbar' - return 'l2p', 'Nur für Teilnehmer der Veranstaltung verfügbar' - if password: - return 'password', 'Nur für Nutzer mit Passwort verfügbar' - return 'none', 'Nicht verfügbar' +from template_helper import * app.jinja_env.globals['navbar'] = [] # iconlib can be 'bootstrap' @@ -256,77 +163,6 @@ def dump_error_page(): f.write(text) f.close() -# debian ships jinja2 without this test... -@app.template_test(name='equalto') -def equalto(a,b): - return a == b - -@app.template_filter(name='filterdict') -def jinja2_filterdict(value, attrdel): - v = dict(value) - for a in attrdel: - if a in v: - del v[a] - return dict(v) - -@app.template_filter(name='semester') -def human_semester(s, long=False): - if not s or s == 'zeitlos' or len(s) != 6: - return 'Zeitlos' - year = s[0:4] - semester = s[4:6].upper() - if not year.isdigit() or semester not in ['SS', 'WS']: - print('Invalid semester string "%s"'%s) - return '??' - if not long: - return semester+year[2:] - elif semester == 'SS': - return 'Sommersemester %s'%year - else: - return 'Wintersemester %s/%s'%(year, str(int(year)+1)[2:]) - -@app.template_filter(name='date') -def human_date(d): - return d.strftime('%d.%m.%Y') - -@app.template_filter(name='fulldate') -def human_fulldate(d): - return d.strftime('%a, %d.%m.%Y, %H:%M Uhr') - -@app.template_filter(name='time') -def human_time(d): - return d.strftime('%H:%M') - -@app.template_filter() -def rfc3339(d): - return d.strftime('%Y-%m-%dT%H:%M:%S+02:00') - -@app.template_global() -def get_announcements(minlevel=0): - offset = timedelta() - if ismod(): - offset = timedelta(hours=24) - try: - return query('SELECT * FROM announcements WHERE NOT deleted AND ((time_expire = NULL) OR time_expire > ?) AND (? OR (visible AND time_publish < ?)) AND level >= ? ORDER BY level DESC', datetime.now()-offset, ismod(), datetime.now(), minlevel) - except: - return [] - -@app.template_filter() -def fixnl(s): - # To be remove, as soon as db schema is cleaned-up - return str(s).replace('\n', '<br>') - -@app.template_filter() -def tagid(s): - if not s: - return 'EMPTY' - s = s.replace(' ', '_').lower() - r = '' - for c in s: - if c in string.ascii_lowercase+string.digits+'_': - r = r + c - return r - def genlive(streams): for stream in streams: stream['visible'] = True @@ -694,13 +530,6 @@ def dbstatus(): clusters[cluster].append(host) return render_template('dbstatus.html', clusters=clusters, statuses=status, vars=variables), 200 -@app.template_global() -def is_readonly(): - try: - return show('SHOW GLOBAL STATUS LIKE "wsrep_ready"')['wsrep_ready'] != 'ON' - except: - return True - import edit import feeds import importer diff --git a/template_helper.py b/template_helper.py new file mode 100644 index 0000000000000000000000000000000000000000..9eeab6c463d8b68e7c28a05a0c397b15007d27cf --- /dev/null +++ b/template_helper.py @@ -0,0 +1,174 @@ +from server import * + +@app.template_global() +def ismod(*args): + return ('user' in session) + +csrf_endpoints = [] + +def csrf_protect(func): + csrf_endpoints.append(func.__name__) + @wraps(func) + def decorator(*args, **kwargs): + if '_csrf_token' in request.values: + token = request.values['_csrf_token'] + elif request.get_json() and ('_csrf_token' in request.get_json()): + token = request.get_json()['_csrf_token'] + else: + token = None + if not ('_csrf_token' in session) or (session['_csrf_token'] != token ) or not token: + return 'csrf test failed', 403 + else: + return func(*args, **kwargs) + return decorator + +@app.url_defaults +def csrf_inject(endpoint, values): + if endpoint not in csrf_endpoints or not session.get('_csrf_token'): + return + values['_csrf_token'] = session['_csrf_token'] + +@app.template_filter() +def base64encode(str): + try: + return base64.b64encode(str.encode('UTF-8')).decode('UTF-8') + except: + return '' + +@app.template_filter() +def checkperm(perms, username=None, password=None): + if ismod(): + return True + perms = evalperm(perms) + for perm in perms: + if perm['type'] == 'public': + return True + elif perm['type'] == 'password': + if perm['param1'] == username and perm['param2'] == password: + return True + elif perm['type'] == 'l2p': + if perm['param1'] in session.get('l2p_courses', []): + return True + elif perm['type'] == 'rwth': + if session.get('rwthintern', False): + return True + if 'X-Real-IP' not in request.headers: + continue + ip = ip_address(request.headers['X-Real-IP']) + for net in config['RWTH_IP_RANGES']: + if ip in ip_network(net): + return True + return False + +@app.template_filter() +def permdescr(perms): + perms = evalperm(perms) + public = False + password = False + l2p_courses = [] + rwth_intern = False + fsmpi_intern = False + for perm in perms: + if perm['type'] == 'public': + public = True + elif perm['type'] == 'password': + password = True + elif perm['type'] == 'l2p': + l2p_courses.append(perm['param1']) + elif perm['type'] == 'rwth': + rwth_intern = True + elif perm['type'] == 'fsmpi': + fsmpi_intern = True + if public or not perms: + return 'public', 'Öffentlich verfügbar' + if rwth_intern: + if password: + return 'rwth', 'Nur für RWTH-Angehörige und Nutzer mit Passwort verfügbar' + return 'rwth', 'Nur für RWTH-Angehörige verfügbar' + if fsmpi_intern: + return 'fsmpi', 'Nur für Fachschaftler verfügbar' + if l2p_courses: + if password: + return 'l2p', 'Nur für Teilnehmer der Veranstaltung und Nutzer mit Passwort verfügbar' + return 'l2p', 'Nur für Teilnehmer der Veranstaltung verfügbar' + if password: + return 'password', 'Nur für Nutzer mit Passwort verfügbar' + return 'none', 'Nicht verfügbar' + +# debian ships jinja2 without this test... +@app.template_test(name='equalto') +def equalto(a,b): + return a == b + +@app.template_filter(name='filterdict') +def jinja2_filterdict(value, attrdel): + v = dict(value) + for a in attrdel: + if a in v: + del v[a] + return dict(v) + +@app.template_filter(name='semester') +def human_semester(s, long=False): + if not s or s == 'zeitlos' or len(s) != 6: + return 'Zeitlos' + year = s[0:4] + semester = s[4:6].upper() + if not year.isdigit() or semester not in ['SS', 'WS']: + print('Invalid semester string "%s"'%s) + return '??' + if not long: + return semester+year[2:] + elif semester == 'SS': + return 'Sommersemester %s'%year + else: + return 'Wintersemester %s/%s'%(year, str(int(year)+1)[2:]) + +@app.template_filter(name='date') +def human_date(d): + return d.strftime('%d.%m.%Y') + +@app.template_filter(name='fulldate') +def human_fulldate(d): + return d.strftime('%a, %d.%m.%Y, %H:%M Uhr') + +@app.template_filter(name='time') +def human_time(d): + return d.strftime('%H:%M') + +@app.template_filter() +def rfc3339(d): + return d.strftime('%Y-%m-%dT%H:%M:%S+02:00') + +@app.template_global() +def get_announcements(minlevel=0): + offset = timedelta() + if ismod(): + offset = timedelta(hours=24) + try: + return query('SELECT * FROM announcements WHERE NOT deleted AND ((time_expire = NULL) OR time_expire > ?) AND (? OR (visible AND time_publish < ?)) AND level >= ? ORDER BY level DESC', datetime.now()-offset, ismod(), datetime.now(), minlevel) + except: + return [] + +@app.template_filter() +def fixnl(s): + # To be remove, as soon as db schema is cleaned-up + return str(s).replace('\n', '<br>') + +@app.template_filter() +def tagid(s): + if not s: + return 'EMPTY' + s = s.replace(' ', '_').lower() + r = '' + for c in s: + if c in string.ascii_lowercase+string.digits+'_': + r = r + c + return r + +@app.template_global() +def is_readonly(): + try: + return show('SHOW GLOBAL STATUS LIKE "wsrep_ready"')['wsrep_ready'] != 'ON' + except: + return True