Select Git revision
Forked from
Video AG Infrastruktur / website
Source project has a limited visibility.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
server.py 26.09 KiB
from flask import Flask, g, request, url_for, redirect, session, render_template, flash, Response, make_response
from werkzeug.routing import Rule
from functools import wraps
from datetime import date, timedelta, datetime, time, MINYEAR
import threading
import os
import sys
import hashlib
import random
import sched
import traceback
import string
from socket import gethostname
from ipaddress import ip_address, ip_network
import math
import locale
import base64
locale.setlocale(locale.LC_ALL, 'de_DE.utf8')
app = Flask(__name__)
app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True
app.add_template_global(random.randint, name='randint')
app.add_template_global(datetime, name='datetime')
app.add_template_global(timedelta, name='timedelta')
app.add_template_global(gethostname, name='gethostname')
app.add_template_global(min, name='min')
app.add_template_global(max, name='max')
scheduler = sched.scheduler()
def run_scheduler():
import time
time.sleep(1) # UWSGI does weird things on startup
while True:
scheduler.run()
time.sleep(10)
def sched_func(delay, priority=0, firstdelay=None, args=[], kargs={}):
if firstdelay == None:
firstdelay = random.randint(1, 120)
def wrapper(func):
def sched_wrapper():
with app.test_request_context():
try:
func(*args, **kargs)
except Exception:
traceback.print_exc()
scheduler.enter(delay, priority, sched_wrapper)
scheduler.enter(firstdelay, priority, sched_wrapper)
return func
return wrapper
threading.Thread(target=run_scheduler, daemon=True).start()
config = app.config
config.from_pyfile('config.py.example', silent=True)
if sys.argv[0].endswith('run.py'):
config['SQLITE_INIT_DATA'] = True
config['DEBUG'] = True
config.from_pyfile('config.py', silent=True)
if config['DEBUG']:
app.jinja_env.auto_reload = True
# get git commit
import subprocess
output = subprocess.check_output(['git', "log", "-g", "-1", "--pretty=%H # %h # %d # %s"]).decode('UTF-8').split('#', 3)
app.jinja_env.globals['gitversion'] = { 'hash': output[1], 'longhash': output[0], 'branch': output[2], 'msg': output[3] }
if not config.get('SECRET_KEY', None):
config['SECRET_KEY'] = os.urandom(24)
from db import query, modify, show, searchquery, ldapauth, ldapget
mod_endpoints = []
@app.template_global()
def ismod(*args):
return ('user' in session)
def mod_required(func):
mod_endpoints.append(func.__name__)
@wraps(func)
def decorator(*args, **kwargs):
if not ismod():
flash('Diese Funktion ist nur für Moderatoren verfügbar!')
return redirect(url_for('login', ref=request.url))
else:
return func(*args, **kwargs)
return decorator
csrf_endpoints = []
def csrf_protect(func):
csrf_endpoints.append(func.__name__)
@wraps(func)
def decorator(*args, **kwargs):
if '_csrf_token' in request.values:
token = request.values['_csrf_token']
elif request.get_json() and ('_csrf_token' in request.get_json()):
token = request.get_json()['_csrf_token']
else:
token = None
if not ('_csrf_token' in session) or (session['_csrf_token'] != token ) or not token:
return 'csrf test failed', 403
else:
return func(*args, **kwargs)
return decorator
@app.url_defaults
def csrf_inject(endpoint, values):
if endpoint not in csrf_endpoints or not session.get('_csrf_token'):
return
values['_csrf_token'] = session['_csrf_token']
def evalperm(perms):
cperms = []
lperms = []
vperms = []
for perm in perms:
if perm['course_id']:
cperms.append(perm)
elif perm['lecture_id']:
lperms.append(perm)
elif perm['video_id']:
vperms.append(perm)
if vperms:
return vperms
elif lperms:
return lperms
elif cperms:
return cperms
return [{'type': 'public'}]
@app.template_filter()
def base64encode(str):
return base64.b64encode(str.encode('UTF-8')).decode('UTF-8')
@app.template_filter()
def checkperm(perms, username=None, password=None):
if ismod():
return True
perms = evalperm(perms)
for perm in perms:
if perm['type'] == 'public':
return True
elif perm['type'] == 'password':
if perm['param1'] == username and perm['param2'] == password:
return True
elif perm['type'] == 'l2p':
if perm['param1'] in session.get('l2p_courses', []):
return True
elif perm['type'] == 'rwth':
if session.get('rwthintern', False):
return True
if 'X-Real-IP' not in request.headers:
continue
ip = ip_address(request.headers['X-Real-IP'])
for net in config['RWTH_IP_RANGES']:
if ip in ip_network(net):
return True
return False
@app.template_filter()
def permdescr(perms):
perms = evalperm(perms)
public = False
password = False
l2p_courses = []
rwth_intern = False
fsmpi_intern = False
for perm in perms:
if perm['type'] == 'public':
public = True
elif perm['type'] == 'password':
password = True
elif perm['type'] == 'l2p':
l2p_courses.append(perm['param1'])
elif perm['type'] == 'rwth':
rwth_intern = True
elif perm['type'] == 'fsmpi':
fsmpi_intern = True
if public or not perms:
return 'public', 'Öffentlich verfügbar'
if rwth_intern:
if password:
return 'rwth', 'Nur für RWTH-Angehörige und Nutzer mit Passwort verfügbar'
return 'rwth', 'Nur für RWTH-Angehörige verfügbar'
if fsmpi_intern:
return 'fsmpi', 'Nur für Fachschaftler verfügbar'
if l2p_courses:
if password:
return 'l2p', 'Nur für Teilnehmer der Veranstaltung und Nutzer mit Passwort verfügbar'
return 'l2p', 'Nur für Teilnehmer der Veranstaltung verfügbar'
if password:
return 'password', 'Nur für Nutzer mit Passwort verfügbar'
return 'none', 'Nicht verfügbar'
app.jinja_env.globals['navbar'] = []
# iconlib can be 'bootstrap'
# ( see: http://getbootstrap.com/components/#glyphicons )
# or 'fa'
# ( see: http://fontawesome.io/icons/ )
def register_navbar(name, iconlib='bootstrap', icon=None):
def wrapper(func):
endpoint = func.__name__
app.jinja_env.globals['navbar'].append((endpoint, name, iconlib, icon, not endpoint in mod_endpoints))
return func
return wrapper
def render_endpoint(endpoint, flashtext=None, **kargs):
if flashtext:
flash(flashtext)
# request.endpoint is used for navbar highlighting
request.url_rule = Rule(request.path, endpoint=endpoint)
return app.view_functions[endpoint](**kargs)
def handle_errors(endpoint, text, code, *errors, **epargs):
def wrapper(func):
@wraps(func)
def decorator(*args, **kwargs):
try:
return func(*args, **kwargs)
except errors:
if endpoint:
return make_response(render_endpoint(endpoint, text, **epargs), code)
else:
return text, code
return decorator
return wrapper
@app.errorhandler(404)
def handle_not_found(e):
return render_endpoint('index', 'Diese Seite existiert nicht!'), 404
@app.errorhandler(500)
@app.errorhandler(Exception)
def handle_internal_error(e):
traceback.print_exc()
return render_template('500.html'), 500
@sched_func(5*60, firstdelay=0)
def dump_error_page():
if 'ERROR_PAGE' not in config:
return
request.url_rule = Rule(request.path, endpoint='handle_internal_error')
text = render_template('500.html')
f = open(config['ERROR_PAGE'], 'w')
f.write(text)
f.close()
# debian ships jinja2 without this test...
@app.template_test(name='equalto')
def equalto(a,b):
return a == b
@app.template_filter(name='filterdict')
def jinja2_filterdict(value, attrdel):
v = dict(value)
for a in attrdel:
if a in v:
del v[a]
return dict(v)
@app.template_filter(name='semester')
def human_semester(s, long=False):
if not s or s == 'zeitlos' or len(s) != 6:
return 'Zeitlos'
year = s[0:4]
semester = s[4:6].upper()
if not year.isdigit() or semester not in ['SS', 'WS']:
print('Invalid semester string "%s"'%s)
return '??'
if not long:
return semester+year[2:]
elif semester == 'SS':
return 'Sommersemester %s'%year
else:
return 'Wintersemester %s/%s'%(year, str(int(year)+1)[2:])
@app.template_filter(name='date')
def human_date(d):
return d.strftime('%d.%m.%Y')
@app.template_filter(name='time')
def human_time(d):
return d.strftime('%H:%M')
@app.template_filter()
def rfc3339(d):
return d.strftime('%Y-%m-%dT%H:%M:%S+02:00')
@app.template_global()
def get_announcements(minlevel=0):
offset = timedelta()
if ismod():
offset = timedelta(hours=24)
try:
return query('SELECT * FROM announcements WHERE NOT deleted AND ((time_expire = NULL) OR time_expire > ?) AND (? OR (visible AND time_publish < ?)) AND level >= ? ORDER BY level DESC', datetime.now()-offset, ismod(), datetime.now(), minlevel)
except:
return []
@app.template_filter()
def fixnl(s):
# To be remove, as soon as db schema is cleaned-up
return str(s).replace('\n', '<br>')
@app.template_filter()
def tagid(s):
if not s:
return 'EMPTY'
s = s.replace(' ', '_').lower()
r = ''
for c in s:
if c in string.ascii_lowercase+string.digits+'_':
r = r + c
return r
@app.route('/')
@register_navbar('Home', icon='home')
def index():
# handle legacy urls...
if 'course' in request.args:
return redirect(url_for('course', handle=request.args['course']),code=302)
if 'view' in request.args:
if (request.args['view'] == 'player') and ('lectureid' in request.args) :
courses = query('SELECT courses.handle FROM courses JOIN lectures ON courses.id = lectures.course_id WHERE lectures.id = ?', request.args['lectureid'])
if not courses:
return "Not found", 404
return redirect(url_for('lecture', course=courses[0]['handle'], id=request.args['lectureid']),code=302)
start = date.today()
end = start + timedelta(days=7)
upcomming = query('''
SELECT lectures.*, "course" AS sep, courses.*
FROM lectures
JOIN courses ON (lectures.course_id = courses.id)
WHERE (time > ?) AND (time < ?) AND (? OR (lectures.visible AND courses.visible AND courses.listed)) AND NOT lectures.norecording
ORDER BY time ASC LIMIT 30''', start, end, ismod())
for i in upcomming:
i['date'] = i['time'].date()
latestvideos=query('''
SELECT lectures.*, "course" AS sep, courses.*
FROM lectures
LEFT JOIN videos ON (videos.lecture_id = lectures.id)
LEFT JOIN courses on (courses.id = lectures.course_id)
WHERE (? OR (courses.visible AND courses.listed AND lectures.visible AND videos.visible))
GROUP BY videos.lecture_id
ORDER BY MAX(videos.time_created) DESC
LIMIT 6 ''',ismod())
livestreams = query('''SELECT streams.handle AS livehandle, lectures.*, "course" AS sep, courses.*
FROM streams
JOIN lectures ON lectures.id = streams.lecture_id
JOIN courses ON courses.id = lectures.course_id
WHERE streams.active AND (? OR (streams.visible AND courses.visible AND courses.listed AND lectures.visible))
''', ismod())
featured = query('SELECT * FROM featured WHERE (? OR visible) ORDER BY `order`', ismod())
featured = list(filter(lambda x: not x['deleted'], featured))
for item in featured:
if item['type'] == 'courses':
if item['param'] not in ['title', 'semester', 'organizer', 'subject']:
continue
item['courses'] = query('SELECT * FROM courses WHERE (visible AND listed) AND `%s` = ? ORDER BY `%s`'%(item['param'], item['param']), item['param2'])
return render_template('index.html', latestvideos=livestreams+latestvideos, upcomming=upcomming, featured=featured)
@app.route('/courses')
@register_navbar('Videos', icon='film')
def courses():
courses = query('SELECT * FROM courses WHERE (? OR (visible AND listed)) ORDER BY lower(semester), lower(title)', ismod())
for course in courses:
if course['semester'] == '':
course['semester'] = 'zeitlos'
groupedby = request.args.get('groupedby')
if groupedby not in ['title', 'semester', 'organizer', 'subject']:
groupedby = 'semester'
return render_template('courses.html', courses=courses, groupedby=groupedby)
def genlive(streams):
for stream in streams:
stream['visible'] = True
stream['downloadable'] = False
stream['path'] = 'pub/hls/%s.m3u8'%stream['livehandle']
stream['file_size'] = 0
return streams
@app.route('/<handle>')
@app.route('/<int:id>')
@handle_errors('courses', 'Diese Veranstaltung existiert nicht!', 404, IndexError)
def course(id=None, handle=None):
if id:
course = query('SELECT * FROM courses WHERE id = ? AND (? OR visible)', id, ismod())[0]
else:
course = query('SELECT * FROM courses WHERE handle = ? AND (? OR visible)', handle, ismod())[0]
course['perm'] = query('SELECT * FROM perm WHERE (NOT perm.deleted) AND course_id = ? ORDER BY type', course['id'])
perms = query('SELECT perm.* FROM perm JOIN lectures ON (perm.lecture_id = lectures.id) WHERE (NOT perm.deleted) AND lectures.course_id = ? ORDER BY perm.type', course['id'])
lectures = query('SELECT * FROM lectures WHERE course_id = ? AND (? OR visible) ORDER BY time, duration DESC', course['id'], ismod())
for lecture in lectures:
lecture['perm'] = []
lecture['perm'] += course['perm']
lecture['course'] = course
for perm in perms:
if perm['lecture_id'] == lecture['id']:
lecture['perm'].append(perm)
videos = query('''
SELECT videos.*, (videos.downloadable AND courses.downloadable) as downloadable, formats.description AS format_description, formats.player_prio, formats.prio
FROM videos
JOIN lectures ON (videos.lecture_id = lectures.id)
JOIN formats ON (videos.video_format = formats.id)
JOIN courses ON (lectures.course_id = courses.id)
WHERE lectures.course_id= ? AND (? OR videos.visible)
ORDER BY lectures.time, formats.prio DESC
''', course['id'], ismod())
livestreams = query('''SELECT streams.handle AS livehandle, streams.lecture_id, formats.description AS format_description, formats.player_prio, formats.prio
FROM streams
JOIN lectures ON lectures.id = streams.lecture_id
JOIN formats ON formats.keywords = "hls"
WHERE streams.active AND (? OR streams.visible) AND lectures.course_id = ?
''', ismod(), course['id'])
videos += genlive(livestreams)
return render_template('course.html', course=course, lectures=lectures, videos=videos)
@app.route('/faq')
@register_navbar('FAQ', icon='question-sign')
def faq():
return render_template('faq.html')
@app.route('/<course>/<int:id>')
@app.route('/<int:courseid>/<int:id>')
@app.route('/<course>/<int:id>/embed', endpoint='embed')
@app.route('/<int:courseid>/<int:id>/embed', endpoint='embed')
@handle_errors('course', 'Diese Vorlesung existiert nicht!', 404, IndexError)
def lecture(id, course=None, courseid=None):
lecture = query('SELECT * FROM lectures WHERE id = ? AND (? OR visible)', id, ismod())[0]
videos = query('''
SELECT videos.*, (videos.downloadable AND courses.downloadable) as downloadable, formats.description AS format_description, formats.player_prio, formats.prio, formats.mimetype
FROM videos
JOIN formats ON (videos.video_format = formats.id)
JOIN courses ON (courses.id = ?)
WHERE videos.lecture_id = ? AND (? OR videos.visible)
ORDER BY formats.prio DESC
''', lecture['course_id'], lecture['id'], ismod())
livestreams = query('''SELECT streams.handle AS livehandle, streams.lecture_id, formats.description AS format_description, formats.player_prio, formats.prio, formats.mimetype
FROM streams
JOIN lectures ON lectures.id = streams.lecture_id
JOIN formats ON formats.keywords = "hls"
WHERE streams.active AND (? OR streams.visible) AND lectures.id = ?
''', ismod(), id)
videos += genlive(livestreams)
perms = query('SELECT perm.* FROM perm WHERE ((NOT perm.deleted) AND (perm.lecture_id = ? OR perm.course_id = ?))',
lecture['id'], lecture['course_id'])
if not videos:
if lecture['live'] and lecture['time'] > datetime.now()-timedelta(minutes=30) and lecture['time']-timedelta(hours=20) < datetime.now():
flash('Der Livestream beginnt um '+human_time(lecture['time'])+' Uhr.')
elif lecture['time'] > datetime.now():
flash('Diese Vorlesung hat noch nicht stattgefunden!')
else:
flash('Zu dieser Vorlesung wurden noch keine Videos veröffentlicht!')
courses = query('SELECT * FROM courses WHERE id = ? AND (? OR visible)', lecture['course_id'], ismod())
if not courses:
return render_endpoint('courses', 'Diese Veranstaltung existiert nicht!'), 404
chapters = query('SELECT * FROM chapters WHERE lecture_id = ? AND NOT deleted AND (? OR visible) ORDER BY time ASC', id, ismod())
username = password = None
if request.authorization:
username = request.authorization.username
password = request.authorization.password
if not checkperm(perms, username=username, password=password):
mode, text = permdescr(perms)
if mode == 'rwth':
flash(text+'. <a target="_blank" class="reloadonclose" href="'+url_for('start_rwthauth')+'">Hier authorisieren</a>.', category='player')
elif mode == 'l2p':
if 'l2p_courses' in session:
flash(text+'. Du bist kein Teilnehmer des L2P-Kurses! <a target="_blank" class="reloadonclose" href="'+url_for('start_l2pauth')+'">Kurse aktualisieren</a>.', category='player')
else:
flash(text+'. <a target="_blank" class="reloadonclose" href="'+url_for('start_l2pauth')+'">Hier authorisieren</a>.', category='player')
else:
flash(text+'.', category='player')
return render_template('embed.html' if request.endpoint == 'embed' else 'lecture.html', course=courses[0], lecture=lecture, videos=videos, chapters=chapters)
@app.route('/search')
def search():
if 'q' not in request.args:
return redirect(url_for('index'))
q = request.args['q']
courses = searchquery(q, '*', ['title', 'short', 'organizer', 'subject', 'description'],
'courses', 'WHERE (? OR (visible AND listed)) GROUP BY id ORDER BY _score DESC, semester DESC LIMIT 20', ismod())
#lectures = searchquery(q, 'lectures.*, courses.visible AS coursevisible, courses.listed, "course" AS sep, courses.*',
# ['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short'],
# 'lectures LEFT JOIN courses on (courses.id = lectures.course_id)',
# 'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30', ismod())
lectures = searchquery(q, 'lectures.*, courses.visible AS coursevisible, courses.listed, courses.id AS courses_id, courses.visible AS courses_visible, courses.listed AS courses_listed, courses.title AS courses_title, courses.short AS courses_short, courses.handle AS courses_handle, courses.organizer AS courses_organizer, courses.subject AS courses_subject, courses.credits AS courses_credits, courses.created_by AS courses_created_by, courses.time_created AS courses_time_created, courses.time_updated AS courses_time_updated, courses.semester AS courses_semester, courses.downloadable AS courses_downloadable, courses.embedinvisible AS courses_embedinvisible, courses.description AS courses_description, courses.internal AS courses_internal, courses.responsible AS courses_responsible',
['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short'],
'lectures LEFT JOIN courses on (courses.id = lectures.course_id)',
'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30', ismod())
for lecture in lectures:
lecture['course'] = {}
for key in lecture:
if key.startswith('courses_'):
lecture['course'][key[8:]] = lecture[key]
return render_template('search.html', searchtext=request.args['q'], courses=courses, lectures=lectures)
def check_mod(user, groups):
return user and 'users' in groups
@app.route('/internal/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
return render_template('login.html')
user, groups = ldapauth(request.form.get('user'), request.form.get('password'))
if not check_mod(user, groups):
flash('Login fehlgeschlagen!')
return render_template('login.html')
session['user'] = ldapget(user)
dbuser = query('SELECT * FROM users WHERE name = ?', user)
if not dbuser:
modify('INSERT INTO users (name, realname, fsacc, level, calendar_key, rfc6238) VALUES (?, ?, ?, 1, "", "")', user, session['user']['givenName'], user)
dbuser = query('SELECT * FROM users WHERE name = ?', user)
session['user']['dbid'] = dbuser[0]['id']
session['_csrf_token'] = ''.join(random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(128))
session.permanent = True
return redirect(request.values.get('ref', url_for('index')))
@app.route('/internal/logout', methods=['GET', 'POST'])
def logout():
session.pop('user', None)
return redirect(request.values.get('ref', url_for('index')))
@app.route('/internal/auth')
def auth(): # For use with nginx auth_request
if 'X-Original-Uri' not in request.headers:
return 'Internal Server Error', 500
url = request.headers['X-Original-Uri'].lstrip(config['VIDEOPREFIX'])
if request.cookies.get('tracking', '') and request.cookies['tracking'].isdigit():
cookie = int(request.cookies['tracking'])
else:
cookie = random.getrandbits(8*8-1)
if url.endswith('jpg') or ismod():
return "OK", 200
if url.startswith('pub/hls/'):
handle = url[len('pub/hls/'):].split('_')[0].split('.')[0]
perms = query('''SELECT lectures.id AS lecture, perm.*
FROM streams
JOIN lectures ON (streams.lecture_id = lectures.id)
JOIN courses ON (lectures.course_id = courses.id)
LEFT JOIN perm ON ((lectures.id = perm.lecture_id OR courses.id = perm.course_id) AND NOT perm.deleted)
WHERE streams.handle = ?
AND (courses.visible AND lectures.visible AND streams.visible)
ORDER BY perm.video_id DESC, perm.lecture_id DESC, perm.course_id DESC''', handle)
else:
perms = query('''SELECT videos.path, videos.id AS vid, perm.*
FROM videos
JOIN lectures ON (videos.lecture_id = lectures.id)
JOIN courses ON (lectures.course_id = courses.id)
LEFT JOIN perm ON ((videos.id = perm.video_id OR lectures.id = perm.lecture_id OR courses.id = perm.course_id) AND NOT perm.deleted)
WHERE videos.path = ?
AND (courses.visible AND lectures.visible AND videos.visible)
ORDER BY perm.video_id DESC, perm.lecture_id DESC, perm.course_id DESC''',
url)
if not perms:
return "Not allowed", 403
auth = request.authorization
username = password = None
if auth:
username = auth.username
password = auth.password
if checkperm(perms, username=username, password=password):
try:
if not url.startswith('pub/hls/'):
modify('INSERT INTO log (id, `time`, `date`, video, source) VALUES (?, ?, ?, ?, 1)', cookie, datetime.now(), datetime.combine(date.today(), time()), perms[0]['vid'])
elif url.endswith('.ts'):
fmt = url.split('_')[-1].split('-')[0]
seg = url.split('.')[0].split('-')[-1]
modify('INSERT INTO hlslog (id, `time`, segment, lecture, handle, format) VALUES (?, ?, ?, ?, ?, ?)', cookie, datetime.now(), seg, perms[0]['lecture'], handle, fmt)
except:
pass
r = make_response('OK', 200)
r.set_cookie('tracking', str(cookie), max_age=2147483647) # Many many years
return r
password_auth = False
for perm in perms:
if perm['type'] == 'password':
password_auth = True
break
if password_auth:
return Response("Login required", 401, {'WWW-Authenticate': 'Basic realm="Login Required"'})
return "Not allowed", 403
@app.route('/files/<filename>')
def files(filename):
return redirect(config['VIDEOPREFIX']+'/'+filename)
@app.route('/sitemap.xml')
def sitemap():
pages=[]
# static pages
for rule in app.url_map.iter_rules():
if 'GET' in rule.methods and len(rule.arguments)==0:
if rule.endpoint not in mod_endpoints:
pages.append([rule.rule])
for i in query('select * from courses where visible and listed'):
pages.append([url_for('course',handle=i['handle'])])
for j in query('select * from lectures where (course_id = ? and visible)',i['id']):
pages.append([url_for('lecture',course=i['handle'],id=j['id'])])
return Response(render_template('sitemap.xml', pages=pages), 200, {'Content-Type': 'application/atom+xml'} )
@app.route('/site/')
@app.route('/site/<string:phpfile>')
def legacy(phpfile=None):
if phpfile=='embed.php' and ('lecture' in request.args):
courses = query('SELECT courses.handle FROM courses JOIN lectures ON courses.id = lectures.course_id WHERE lectures.id = ?', request.args['lecture'])
if not courses:
return render_endpoint('index', 'Diese Seite existiert nicht!'), 404
return redirect(url_for('embed', course=courses[0]['handle'], id=request.args['lecture']),code=302)
if phpfile=='embed.php' and ('vid' in request.args):
lectures = query('SELECT lecture_id FROM videos WHERE id = ?', request.args['vid'])
if not lectures:
return render_endpoint('index', 'Dieses Videos existiert nicht!'), 404
courses = query('SELECT courses.handle FROM courses JOIN lectures ON courses.id = lectures.course_id WHERE lectures.id = ?', lectures[0]['lecture_id'])
if not courses:
return render_endpoint('index', 'Diese Seite existiert nicht!'), 404
return redirect(url_for('embed', course=courses[0]['handle'], id=lectures[0]['lecture_id']),code=302)
if phpfile=='feed.php' and ('all' in request.args):
return redirect(url_for('feed'),code=302)
if phpfile=='feed.php' and ('newcourses' in request.args):
return redirect(url_for('courses_feed'),code=302)
if phpfile=='feed.php':
return redirect(url_for('feed', handle=request.args.copy().popitem()[0]),code=302)
print("Unknown legacy url:",request.url)
return redirect(url_for('index'),code=302)
import json
@app.route('/internal/dbstatus')
@register_navbar('DB-Status', icon='ok')
@mod_required
def dbstatus():
hosts = set()
clusters = {}
status = {}
variables = {}
for host in config.get('MYSQL_DBSTATUS_HOSTS', [])+[config.get('MYSQL_HOST', None)]:
try:
for _host in show('SHOW VARIABLES LIKE "wsrep_cluster_address"', host=host)['wsrep_cluster_address'][len('gcomm://'):].split(','):
hosts.add(_host)
except:
pass
for host in sorted(list(hosts)):
try:
status[host] = show('SHOW GLOBAL STATUS LIKE "wsrep%"', host=host)
variables[host] = show('SHOW GLOBAL VARIABLES LIKE "wsrep%"', host=host)
except:
status[host] = {'wsrep_cluster_state_uuid': '', 'wsrep_local_state_comment': 'Not reachable', 'wsrep_cluster_conf_id': '0', 'wsrep_cluster_status': 'Unknown'}
variables[host] = {'wsrep_node_name': host, 'wsrep_cluster_name': 'unknown'}
cluster = variables[host]['wsrep_cluster_name']+'-'+status[host]['wsrep_cluster_conf_id']
if cluster not in clusters:
clusters[cluster] = []
clusters[cluster].append(host)
return render_template('dbstatus.html', clusters=clusters, statuses=status, vars=variables), 200
@app.template_global()
def is_readonly():
try:
return show('SHOW GLOBAL STATUS LIKE "wsrep_ready"')['wsrep_ready'] != 'ON'
except:
return True
import edit
import feeds
import importer
import stats
import sorter
if 'ICAL_URL' in config:
import meetings
import l2pauth
if 'JOBS_API_KEY' in config:
import jobs
import timetable
import chapters
import icalexport