# NOTE: the star import must stay first; the explicit imports below
# intentionally come after it, so they win over any same-named export.
from server import *
import subprocess
from time import mktime
from email.utils import formatdate

app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True

# Expose a few Python callables/classes to every template.
app.add_template_global(random.randint, name='randint')
app.add_template_global(datetime, name='datetime')
app.add_template_global(timedelta, name='timedelta')
app.add_template_global(gethostname, name='gethostname')
app.add_template_global(min, name='min')
app.add_template_global(max, name='max')

# get git commit
# The format string separates the four fields with '#'; the pieces are stored
# exactly as split (i.e. including the blanks around the separators and the
# trailing newline of the subject). This runs at import time and raises if
# the git call fails.
output = subprocess.check_output(
    ['git', "log", "-g", "-1", "--pretty=%H # %h # %d # %s"]
).decode('UTF-8').split('#', 3)
app.jinja_env.globals['gitversion'] = {
    'hash': output[1],
    'longhash': output[0],
    'branch': output[2],
    'msg': output[3],
}


@app.template_global()
def ismod(*args):
    """Return True when the current session has a 'user' entry.

    Positional arguments are accepted and ignored.
    """
    return ('user' in session)


# Names of all view functions wrapped by ``csrf_protect``; consulted by
# ``csrf_inject`` to decide which generated URLs receive the token.
csrf_endpoints = []


def csrf_protect(func):
    """Decorator that rejects requests lacking a valid CSRF token.

    The token is read from ``request.values`` or, failing that, from the
    '_csrf_token' key of a JSON object body. The wrapped view is only
    called if the token is non-empty and equals ``session['_csrf_token']``;
    otherwise the response is ``('csrf test failed', 403)``.
    """
    csrf_endpoints.append(func.__name__)

    @wraps(func)
    def wrapper(*args, **kwargs):
        if '_csrf_token' in request.values:
            token = request.values['_csrf_token']
        else:
            # Parse the body once. Only a JSON object can carry the token;
            # any other JSON value is treated as "no token" instead of
            # failing with a TypeError on the key lookup.
            payload = request.get_json()
            if isinstance(payload, dict) and '_csrf_token' in payload:
                token = payload['_csrf_token']
            else:
                token = None
        if (not token
                or '_csrf_token' not in session
                or session['_csrf_token'] != token):
            return 'csrf test failed', 403
        return func(*args, **kwargs)

    return wrapper


@app.url_defaults
def csrf_inject(endpoint, values):
    """Add the session's CSRF token to URLs built for protected endpoints.

    Does nothing if the endpoint was not registered via ``csrf_protect`` or
    the session holds no (truthy) token.
    """
    if endpoint not in csrf_endpoints or not session.get('_csrf_token'):
        return
    values['_csrf_token'] = session['_csrf_token']


@app.template_filter()
def base64encode(str):
    """Return the base64 encoding of the UTF-8 bytes of ``str`` as text.

    Any failure (e.g. a value without ``.encode``) yields ''.
    The parameter name shadows the builtin but is kept for compatibility.
    """
    try:
        return base64.b64encode(str.encode('UTF-8')).decode('UTF-8')
    except Exception:
        # Best effort: templates get an empty string rather than an error.
        return ''


@app.template_filter()
def checkperm(perms, username=None, password=None):
    """Return True if the current request satisfies any of ``perms``.

    ``perms`` is passed through ``evalperm`` and each resulting entry is
    checked by its 'type':

    * 'public'   - always grants access
    * 'password' - 'param1'/'param2' must equal ``username``/``password``
    * 'l2p'      - 'param1' must be listed in ``session['l2p_courses']``
    * 'rwth'     - ``session['rwthintern']`` is truthy, or the address in
                   the 'X-Real-IP' header lies in one of
                   ``config['RWTH_IP_RANGES']``

    Sessions for which ``ismod()`` is true are always granted access.
    """
    if ismod():
        return True
    perms = evalperm(perms)
    for perm in perms:
        if perm['type'] == 'public':
            return True
        elif perm['type'] == 'password':
            if perm['param1'] == username and perm['param2'] == password:
                return True
        elif perm['type'] == 'l2p':
            if perm['param1'] in session.get('l2p_courses', []):
                return True
        elif perm['type'] == 'rwth':
            if session.get('rwthintern', False):
                return True
            if 'X-Real-IP' not in request.headers:
                continue
            ip = ip_address(request.headers['X-Real-IP'])
            for net in config['RWTH_IP_RANGES']:
                if ip in ip_network(net):
                    return True
    return False


@app.template_filter()
def permdescr(perms):
    """Summarise ``perms`` as a ``(key, description)`` tuple.

    ``perms`` is passed through ``evalperm``. The least restrictive kind of
    permission present wins, in the order public, rwth, fsmpi, l2p,
    password. An empty permission list counts as public. The description
    is a German, user-facing text.
    """
    perms = evalperm(perms)
    types = {perm['type'] for perm in perms}
    password = 'password' in types

    if 'public' in types or not perms:
        return 'public', 'Öffentlich verfügbar'
    if 'rwth' in types:
        if password:
            return 'rwth', 'Nur für RWTH-Angehörige und Nutzer mit Passwort verfügbar'
        return 'rwth', 'Nur für RWTH-Angehörige verfügbar'
    if 'fsmpi' in types:
        return 'fsmpi', 'Nur für Fachschaftler verfügbar'
    if 'l2p' in types:
        if password:
            return 'l2p', 'Nur für Teilnehmer der Veranstaltung und Nutzer mit Passwort verfügbar'
        return 'l2p', 'Nur für Teilnehmer der Veranstaltung verfügbar'
    if password:
        return 'password', 'Nur für Nutzer mit Passwort verfügbar'
    return 'none', 'Nicht verfügbar'


# debian ships jinja2 without this test...
@app.template_test(name='equalto')
def equalto(a, b):
    """Jinja test: True if ``a == b``."""
    return a == b


@app.template_filter(name='filterdict')
def jinja2_filterdict(value, attrdel):
    """Return a dict copy of ``value`` without the keys listed in ``attrdel``.

    Keys in ``attrdel`` that are not present are ignored; ``value`` itself
    is not modified.
    """
    remaining = dict(value)
    for key in attrdel:
        remaining.pop(key, None)
    return remaining


@app.template_filter(name='semester')
def human_semester(s, long=False):
    """Format a semester string such as '2016ws' for display.

    Falsy values, 'zeitlos' and anything not exactly six characters long
    give 'Zeitlos'. Otherwise the first four characters must be digits and
    the last two 'SS' or 'WS' (case-insensitive); if not, a message is
    printed and '??' is returned.

    Short form: 'WS16' / 'SS16'. Long form (``long=True``):
    'Wintersemester 2016/17' / 'Sommersemester 2016'.
    """
    if not s or s == 'zeitlos' or len(s) != 6:
        return 'Zeitlos'
    year = s[0:4]
    semester = s[4:6].upper()
    if not year.isdigit() or semester not in ['SS', 'WS']:
        print('Invalid semester string "%s"'%s)
        return '??'
    if not long:
        return semester+year[2:]
    elif semester == 'SS':
        return 'Sommersemester %s'%year
    else:
        # Long winter form also shows the two-digit following year.
        return 'Wintersemester %s/%s'%(year, str(int(year)+1)[2:])


@app.template_filter(name='date')
def human_date(d):
    """Format ``d`` as 'DD.MM.YYYY'."""
    return d.strftime('%d.%m.%Y')


@app.template_filter(name='fulldate')
def human_fulldate(d):
    """Format ``d`` as abbreviated weekday, 'DD.MM.YYYY, HH:MM Uhr'."""
    return d.strftime('%a, %d.%m.%Y, %H:%M Uhr')


@app.template_filter(name='time')
def human_time(d):
    """Format ``d`` as 'HH:MM'."""
    return d.strftime('%H:%M')


@app.template_filter()
def rfc3339(d):
    """Format ``d`` as an RFC 3339-style timestamp.

    NOTE(review): the UTC offset is hard-coded to '+02:00' regardless of
    ``d``; confirm whether callers rely on that.
    """
    return d.strftime('%Y-%m-%dT%H:%M:%S+02:00')


@app.template_filter()
def time_offset(s):
    """Format a number of seconds ``s`` as 'HH:MM:SS'."""
    return '%02d:%02d:%02d'%(s//3600, (s//60)%60, s%60)


@app.template_filter()
def rfc822(d):
    """Format ``d`` as an RFC 822 date via ``email.utils.formatdate``.

    ``d`` is converted with ``time.mktime``, i.e. its time tuple is
    interpreted as local time.
    """
    return formatdate(mktime(d.timetuple()))


@app.template_global()
def get_announcements(minlevel=0):
    """Return the announcements to show, highest level first.

    Selected are rows that are not deleted, have a level of at least
    ``minlevel`` and have not expired (no expiry time, or one in the
    future). Non-moderators only get visible rows that are already
    published; moderators get all rows and still see rows that expired
    within the last 24 hours.

    Returns an empty list if the query fails.
    """
    moderator = ismod()
    offset = timedelta(hours=24) if moderator else timedelta()
    now = datetime.now()
    try:
        # "time_expire IS NULL": comparing with "= NULL" never matches in
        # SQL, which hid every announcement without an expiry time.
        return query(
            'SELECT * FROM announcements WHERE NOT deleted '
            'AND ((time_expire IS NULL) OR time_expire > ?) '
            'AND (? OR (visible AND time_publish < ?)) '
            'AND level >= ? ORDER BY level DESC',
            now - offset, moderator, now, minlevel)
    except Exception:
        # Best effort: announcements must never break page rendering.
        return []


@app.template_filter()
def fixnl(s):
    """Return ``str(s)`` with every newline replaced by '<br>'."""
    # To be removed as soon as the db schema is cleaned up
    return str(s).replace('\n', '<br>')


# Characters that survive in the output of ``tagid``.
_TAGID_CHARS = frozenset(string.ascii_lowercase + string.digits + '_')


@app.template_filter()
def tagid(s):
    """Reduce ``s`` to an identifier of lowercase ASCII letters, digits, '_'.

    Blanks become '_', the text is lowercased and every other character is
    dropped. Falsy input gives 'EMPTY'.
    """
    if not s:
        return 'EMPTY'
    normalized = s.replace(' ', '_').lower()
    return ''.join(c for c in normalized if c in _TAGID_CHARS)


@app.template_global()
def is_readonly():
    """Return True unless the database reports ``wsrep_ready`` as 'ON'.

    Any error while querying the status is treated as read-only.
    """
    try:
        return show('SHOW GLOBAL STATUS LIKE "wsrep_ready"')['wsrep_ready'] != 'ON'
    except Exception:
        return True