Select Git revision
config.py.example
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
server.py 19.74 KiB
from flask import Flask, g, request, url_for, redirect, session, render_template, flash, Response
from werkzeug.routing import Rule
from functools import wraps
from datetime import date, timedelta, datetime, time, MINYEAR
import threading
import os
import sys
import hashlib
import random
import sched
# Flask application object; the template helpers and routes below attach to it.
app = Flask(__name__)
# Jinja whitespace control: drop the newline after a block tag and the
# whitespace in front of one, so rendered output has no stray blank lines.
app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True
# Make a few stdlib callables available to every template under these names.
app.add_template_global(random.randint, name='randint')
app.add_template_global(datetime, name='datetime')
app.add_template_global(timedelta, name='timedelta')
# Shared scheduler: jobs are queued by sched_func and executed by run_scheduler.
scheduler = sched.scheduler()
def run_scheduler():
    """Drive the shared ``scheduler`` forever.

    Never returns: after an initial pause it alternates between running all
    due jobs and sleeping for ten seconds.
    """
    # Imported locally because the module-level name ``time`` is datetime.time.
    from time import sleep
    sleep(1) # UWSGI does weird things on startup
    while True:
        scheduler.run()
        sleep(10)
def sched_func(delay, priority=0, firstdelay=None, args=(), kargs=None):
    """Decorator factory: run the decorated function periodically.

    The function is queued on the module-level ``scheduler`` and queued again
    ``delay`` after every run. Each run happens inside
    ``app.test_request_context()``.

    delay      -- delay between the end of one run and the next one
    priority   -- priority handed to ``scheduler.enter``
    firstdelay -- delay before the first run; a random value in 1..120 when None
    args       -- positional arguments passed to the function on every run
    kargs      -- keyword arguments passed to the function on every run

    Returns a decorator that registers the function and returns it unchanged.
    """
    if firstdelay is None:
        firstdelay = random.randint(1, 120)
    if kargs is None:
        kargs = {}

    def wrapper(func):
        def sched_wrapper():
            # Local import keeps this block self-contained.
            import traceback
            try:
                with app.test_request_context():
                    func(*args, **kargs)
            except Exception:
                # A failing job must not propagate into scheduler.run(): that
                # would end the scheduler thread and stop every periodic job.
                traceback.print_exc()
            # Re-queue regardless of the outcome so one failure does not
            # cancel the job permanently.
            scheduler.enter(delay, priority, sched_wrapper)
        scheduler.enter(firstdelay, priority, sched_wrapper)
        return func
    return wrapper
# Run the scheduler loop in a daemon thread so it does not keep the process alive.
threading.Thread(target=run_scheduler, daemon=True).start()
config = app.config
# Load the shipped example config first; silent=True ignores a missing file.
config.from_pyfile('config.py.example', silent=True)
# When started through run.py, switch on debug mode and SQLITE_INIT_DATA.
if sys.argv[0].endswith('run.py'):
    config['SQLITE_INIT_DATA'] = True
    config['DEBUG'] = True
# Local overrides are loaded last, so they win over everything set above.
config.from_pyfile('config.py', silent=True)
if config['DEBUG']:
    app.jinja_env.auto_reload = True
# Without a configured key, fall back to a random one generated at startup.
if not config.get('SECRET_KEY', None):
    config['SECRET_KEY'] = os.urandom(24)
from db import query, modify, searchquery, ldapauth, ldapget
# Names of the view functions wrapped by mod_required; filled at decoration time.
mod_endpoints = []
@app.template_global()
def ismod(*args):
    """Return True when the current session holds a logged-in user.

    Any positional arguments are accepted and ignored.
    """
    logged_in = 'user' in session
    return logged_in
def mod_required(func):
    """Restrict a view function to moderators.

    Records the function's name in ``mod_endpoints`` and returns a wrapper.
    The wrapper calls the view for moderators; everyone else gets a flashed
    notice and a redirect to the login page with the current URL as ``ref``.
    """
    mod_endpoints.append(func.__name__)

    @wraps(func)
    def decorator(*args, **kwargs):
        if ismod():
            return func(*args, **kwargs)
        flash('Diese Funktion ist nur für Moderatoren verfügbar!')
        return redirect(url_for('login', ref=request.url))
    return decorator
def evalauth(auths):
    """Pick the most specific group of auth rules from ``auths``.

    Each rule is sorted by the first truthy key among ``course_id``,
    ``lecture_id`` and ``video_id`` (checked in that order). Video rules win
    over lecture rules, which win over course rules. When no rule matches any
    group, a single public rule is returned.
    """
    by_video, by_lecture, by_course = [], [], []
    for rule in auths:
        if rule['course_id']:
            by_course.append(rule)
        elif rule['lecture_id']:
            by_lecture.append(rule)
        elif rule['video_id']:
            by_video.append(rule)
    # Most specific non-empty group first.
    for group in (by_video, by_lecture, by_course):
        if group:
            return group
    return [{'auth_type': 'public'}]
@app.template_filter()
def checkauth(auths, username=None, password=None):
    """Return True when the current request satisfies one of the auth rules.

    Only the rules selected by ``evalauth`` are considered. A rule passes when
    it is public, when ``username``/``password`` equal the rule's
    ``auth_user``/``auth_password``, when its ``auth_param`` is listed in the
    session's ``l2p_courses``, or (type ``rwth``) when the session has a
    truthy ``rwthintern``.
    """
    for rule in evalauth(auths):
        kind = rule['auth_type']
        if kind == 'public':
            return True
        if kind == 'password':
            if rule['auth_user'] == username and rule['auth_password'] == password:
                return True
        elif kind == 'l2p':
            if rule['auth_param'] in session.get('l2p_courses', []):
                return True
        elif kind == 'rwth':
            if session.get('rwthintern', False):
                return True
    return False
@app.template_filter()
def authdescr(auths):
    """Describe the access restrictions of ``auths`` for display.

    Returns a ``(mode, text)`` tuple, where mode is one of ``'public'``,
    ``'rwth'``, ``'l2p'`` or ``'password'`` and text is the matching German
    description. Only the rules selected by ``evalauth`` are considered.
    """
    auths = evalauth(auths)
    kinds = [rule['auth_type'] for rule in auths]
    l2p_courses = [rule['auth_param'] for rule in auths if rule['auth_type'] == 'l2p']
    with_password = 'password' in kinds

    if 'public' in kinds or not auths:
        return 'public', 'Öffentlich verfügbar'
    if 'rwth' in kinds:
        if with_password:
            return 'rwth', 'Nur für RWTH-Angehörige und Nutzer mit Passwort verfügbar'
        return 'rwth', 'Nur für RWTH-Angehörige verfügbar'
    if l2p_courses:
        if with_password:
            return 'l2p', 'Nur für Teilnehmer der Veranstaltung und Nutzer mit Passwort verfügbar'
        return 'l2p', 'Nur für Teilnehmer der Veranstaltung verfügbar'
    if with_password:
        return 'password', 'Nur für Nutzer mit Passwort verfügbar'
    # Only unknown rule types are left; treat them as public.
    return 'public', 'Öffentlich verfügbar'
# Navbar entries available to all templates; filled by register_navbar.
app.jinja_env.globals['navbar'] = []

# iconlib can be 'bootstrap'
# ( see: http://getbootstrap.com/components/#glyphicons )
# or 'fa'
# ( see: http://fontawesome.io/icons/ )
def register_navbar(name, iconlib='bootstrap', icon=None):
    """Decorator factory that adds the decorated view to the navbar.

    Appends ``(endpoint, name, iconlib, icon, public)`` to the ``navbar``
    template global, where ``endpoint`` is the function's name and ``public``
    is True unless that name is already listed in ``mod_endpoints``.
    The function itself is returned unchanged.
    """
    def wrapper(func):
        endpoint = func.__name__
        public = endpoint not in mod_endpoints
        entry = (endpoint, name, iconlib, icon, public)
        app.jinja_env.globals['navbar'].append(entry)
        return func
    return wrapper
def render_endpoint(endpoint, flashtext=None, **kargs):
    """Render another endpoint's view in place of the current one.

    Optionally flashes ``flashtext`` first, then calls the view function
    registered for ``endpoint`` with ``kargs`` and returns its result.
    """
    if flashtext:
        flash(flashtext)
    # request.endpoint is used for navbar highlighting
    request.url_rule = Rule(request.path, endpoint=endpoint)
    view = app.view_functions[endpoint]
    return view(**kargs)
def handle_errors(endpoint, text, code, *errors, **epargs):
    """Decorator factory that turns selected exceptions into an error response.

    endpoint -- endpoint rendered via ``render_endpoint`` on error; when falsy
                the plain ``text`` is returned instead
    text     -- message flashed (or returned) on error
    code     -- HTTP status code of the error response
    errors   -- exception classes to catch; all others propagate
    epargs   -- keyword arguments forwarded to the rendered endpoint

    The wrapped function's result is returned untouched when nothing is raised.
    """
    def wrapper(func):
        @wraps(func)
        def decorator(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except errors:
                if not endpoint:
                    return text, code
                return render_endpoint(endpoint, text, **epargs), code
            return result
        return decorator
    return wrapper
@app.errorhandler(404)
def handle_not_found(e):
    """Answer unknown URLs with the index page, a flashed notice and status 404."""
    body = render_endpoint('index', 'Diese Seite existiert nicht!')
    return body, 404
@app.template_filter(name='semester')
def human_semester(s, long=False):
    """Format a semester code such as ``'2016ws'`` for display.

    Empty values, ``'zeitlos'`` and anything that is not six characters long
    give ``'Zeitlos'``. A malformed six-character code is reported on stdout
    and gives ``'??'``. Otherwise the short form is e.g. ``'WS16'`` and the
    long form e.g. ``'Wintersemester 2016/17'``.
    """
    if not s or s == 'zeitlos' or len(s) != 6:
        return 'Zeitlos'
    year, term = s[0:4], s[4:6].upper()
    if not (year.isdigit() and term in ['SS', 'WS']):
        print('Invalid semester string "%s"'%s)
        return '??'
    if not long:
        return term+year[2:]
    if term == 'SS':
        return 'Sommersemester %s'%year
    # A winter semester spans two calendar years.
    following = str(int(year)+1)
    return 'Wintersemester %s/%s'%(year, following[2:])
@app.template_filter(name='date')
def human_date(d):
    """Format ``d`` as ``DD.MM.YYYY``."""
    formatted = d.strftime('%d.%m.%Y')
    return formatted
@app.template_filter(name='time')
def human_time(d):
    """Format ``d`` as ``HH:MM`` (24-hour clock)."""
    formatted = d.strftime('%H:%M')
    return formatted
@app.template_filter()
def rfc3339(d):
    """Format ``d`` as an RFC 3339 timestamp with a fixed ``+02:00`` offset."""
    # NOTE(review): the offset is hard-coded; it looks like this assumes ``d``
    # is local summer time — confirm how timestamps are stored.
    stamp = d.strftime('%Y-%m-%dT%H:%M:%S+02:00')
    return stamp
@app.template_global()
def get_announcements(minlevel=0):
    """Return the announcements to show, highest level first.

    Only non-deleted announcements with ``level >= minlevel`` are returned.
    Regular visitors see visible, already published and unexpired ones.
    Moderators also see invisible or unpublished ones and those that expired
    within the last 24 hours.
    """
    grace = timedelta(hours=24) if ismod() else timedelta()
    return query('SELECT * FROM announcements WHERE NOT deleted AND (time_expire ISNULL OR time_expire > ?) AND (? OR (visible AND time_publish < ?)) AND level >= ? ORDER BY level DESC', datetime.now()-grace, ismod(), datetime.now(), minlevel)
@app.template_filter()
def fixnl(s):
    """Return ``str(s)`` with every newline replaced by ``<br>``."""
    # To be removed as soon as the db schema is cleaned up
    text = str(s)
    return text.replace('\n', '<br>')
@app.route('/')
@register_navbar('Home', icon='home')
def index():
    """Render the start page.

    Shows up to 30 visible lectures scheduled between yesterday and six days
    from now, the six lectures whose videos were updated most recently, and
    the featured entries. Moderators also see hidden videos and featured
    entries.
    """
    window_start = date.today() - timedelta(days=1)
    window_end = window_start + timedelta(days=7)
    upcoming_rows = query('''
SELECT lectures.*, "course" AS sep, courses.*
FROM lectures
JOIN courses ON (lectures.course_id = courses.id)
WHERE (time > ?) AND (time < ?) and lectures.visible and courses.visible and courses.listed
ORDER BY time ASC LIMIT 30''',window_start,window_end)
    # The template expects the plain date in addition to the timestamp.
    for row in upcoming_rows:
        row['date'] = row['time'].date()
    latest_rows = query('''
SELECT lectures.*, "course" AS sep, courses.*
FROM lectures
LEFT JOIN videos ON (videos.lecture_id = lectures.id)
LEFT JOIN courses on (courses.id = lectures.course_id)
WHERE (? OR (courses.visible AND courses.listed AND lectures.visible AND videos.visible))
GROUP BY videos.lecture_id
ORDER BY MAX(videos.time_updated) DESC
LIMIT 6 ''',ismod())
    featured_rows = query('SELECT * FROM featured WHERE NOT deleted AND (? OR visible)', ismod())
    return render_template('index.html', latestvideos=latest_rows, upcomming=upcoming_rows, featured=featured_rows)
@app.route('/course')
@register_navbar('Videos', icon='film')
def courses():
    """Render the course list, grouped by title, semester or organizer.

    The grouping comes from the ``groupedby`` query argument and falls back
    to ``'semester'`` for missing or unknown values. Courses with an empty
    semester are labelled ``'zeitlos'``.
    """
    rows = query('SELECT * FROM courses WHERE (? OR (visible AND listed)) ORDER BY title', ismod())
    for row in rows:
        if row['semester'] == '':
            row['semester'] = 'zeitlos'
    requested = request.args.get('groupedby')
    groupedby = requested if requested in ['title', 'semester', 'organizer'] else 'semester'
    return render_template('courses.html', courses=rows, groupedby=groupedby)
@app.route('/course/<handle>')
@app.route('/course/<int:id>')
@handle_errors('courses', 'Diese Veranstaltung existiert nicht!', 404, IndexError)
def course(id=None, handle=None):
    """Render one course with its lectures and videos.

    The course is looked up by numeric ``id`` when given, otherwise by
    ``handle``. Hidden courses, lectures and videos are shown to moderators
    only. An unknown course ends in the 404 response set up by
    ``handle_errors``.
    """
    if id:
        matches = query('SELECT * FROM courses WHERE id = ? AND (? OR visible)', id, ismod())
    else:
        matches = query('SELECT * FROM courses WHERE handle = ? AND (? OR visible)', handle, ismod())
    # An empty result raises IndexError here, which handle_errors catches.
    course = matches[0]
    course['auth'] = query('SELECT * FROM auth WHERE course_id = ? ORDER BY auth_type', course['id'])
    lecture_auths = query('SELECT auth.* FROM auth JOIN lectures ON (auth.lecture_id = lectures.id) WHERE lectures.course_id = ? ORDER BY auth.auth_type', course['id'])
    lectures = query('SELECT * FROM lectures WHERE course_id = ? AND (? OR visible) ORDER BY time, duration DESC', course['id'], ismod())
    # Attach the course and the lecture-specific auth rules to every lecture.
    for lec in lectures:
        lec['course'] = course
        lec['auth'] = [rule for rule in lecture_auths if rule['lecture_id'] == lec['id']]
    videos = query('''
SELECT videos.*, (videos.downloadable AND courses.downloadable) as downloadable, formats.description AS format_description, formats.player_prio, formats.prio
FROM videos
JOIN lectures ON (videos.lecture_id = lectures.id)
JOIN formats ON (videos.video_format = formats.id)
JOIN courses ON (lectures.course_id = courses.id)
WHERE lectures.course_id= ? AND (? OR videos.visible)
ORDER BY lectures.time, formats.prio DESC
''', course['id'], ismod())
    return render_template('course.html', course=course, lectures=lectures, videos=videos)
@app.route('/faq')
@register_navbar('FAQ', icon='question-sign')
def faq():
    """Render the static FAQ page."""
    page = render_template('faq.html')
    return page
@app.route('/play/<int:id>')
@app.route('/embed/<int:id>', endpoint='embed')
@handle_errors('course', 'Diese Vorlesung existiert nicht!', 404, IndexError)
def lecture(id):
    """Render the player page (or the embed variant) for one lecture.

    Flashes a notice when the lecture has no videos yet, and another one that
    describes the access restriction when the visitor is not authorised.
    An unknown lecture ends in the 404 response set up by ``handle_errors``;
    a missing or unlisted course gives the course list with status 404.
    """
    # An empty result raises IndexError here, which handle_errors catches.
    row = query('SELECT * FROM lectures WHERE id = ? AND (? OR visible)', id, ismod())[0]
    videos = query('''
SELECT videos.*, (videos.downloadable AND courses.downloadable) as downloadable, formats.description AS format_description, formats.player_prio, formats.prio
FROM videos
JOIN formats ON (videos.video_format = formats.id)
JOIN courses ON (courses.id = ?)
WHERE videos.lecture_id = ? AND (? OR videos.visible)
ORDER BY formats.prio DESC
''', row['course_id'], row['id'], ismod())
    auths = query('SELECT auth.* FROM auth WHERE (auth.lecture_id = ? OR auth.course_id = ?)',
            row['id'], row['course_id'])
    if not videos:
        flash('Zu dieser Vorlesung wurden noch keine Videos veröffentlicht!')
    parents = query('SELECT * FROM courses WHERE id = ? AND (? OR (visible AND listed))', row['course_id'], ismod())
    if not parents:
        return render_endpoint('courses', 'Diese Veranstaltung existiert nicht!'), 404
    chapters = query('SELECT * FROM chapters WHERE lecture_id = ? AND NOT deleted AND (? OR visible) ORDER BY time ASC', id, ismod())
    if not checkauth(auths):
        mode, text = authdescr(auths)
        # Modes that offer a self-service authorisation link.
        auth_endpoints = {'rwth': 'start_rwthauth', 'l2p': 'start_l2pauth'}
        if mode in auth_endpoints:
            flash(text+'. <a target="_blank" href="'+url_for(auth_endpoints[mode])+'">Hier authorisieren</a>.')
        else:
            flash(text+'.')
    template = 'embed.html' if request.endpoint == 'embed' else 'lecture.html'
    return render_template(template, course=parents[0], lecture=row, videos=videos, chapters=chapters)
@app.route('/search')
def search():
    """Search courses and lectures for the ``q`` query argument.

    Redirects to the index page when ``q`` is missing. Returns at most 20
    courses and 30 lectures, best score first; hidden or unlisted entries
    are shown to moderators only.
    """
    if 'q' not in request.args:
        return redirect(url_for('index'))
    q = request.args['q']
    course_fields = ['title', 'short', 'organizer', 'subject', 'description']
    lecture_fields = ['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short']
    courses = searchquery(q, '*', course_fields,
            'courses', 'WHERE (? OR (visible AND listed)) GROUP BY id ORDER BY _score DESC, semester DESC LIMIT 20', ismod())
    lectures = searchquery(q, 'lectures.*, courses.visible AS coursevisible, courses.listed, "course" AS sep, courses.*',
            lecture_fields,
            'lectures LEFT JOIN courses on (courses.id = lectures.course_id)',
            'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30', ismod())
    return render_template('search.html', searchtext=request.args['q'], courses=courses, lectures=lectures)
def check_mod(user, groups):
    """Tell whether ``user`` may act as moderator.

    Returns ``user`` itself when it is falsy, otherwise whether ``'users'``
    is contained in ``groups``.
    """
    if not user:
        return user
    return 'users' in groups
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or log a moderator in (POST).

    Credentials are checked with ``ldapauth``; accounts rejected by
    ``check_mod`` get the form again with an error notice. On success the
    LDAP record is stored in the session, a row in ``users`` is created on
    first login, its id is stored as ``dbid`` and the client is redirected
    to ``ref`` (default: index page).
    """
    if request.method == 'GET':
        return render_template('login.html')
    form = request.form
    user, groups = ldapauth(form.get('user'), form.get('password'))
    if not check_mod(user, groups):
        flash('Login fehlgeschlagen!')
        return render_template('login.html')
    session['user'] = ldapget(user)
    select_user = 'SELECT * FROM users WHERE name = ?'
    dbuser = query(select_user, user)
    if not dbuser:
        # First login: create the database record, then read it back for its id.
        modify('INSERT INTO users (name, realname, fsacc, level, calendar_key, rfc6238) VALUES (?, ?, ?, 1, "", "")', user, session['user']['givenName'], user)
        dbuser = query(select_user, user)
    session['user']['dbid'] = dbuser[0]['id']
    target = request.values.get('ref', url_for('index'))
    return redirect(target)
@app.route('/logout', methods=['GET', 'POST'])
def logout():
    """Log the current user out and redirect to ``ref`` (default: index page)."""
    # The default avoids a KeyError (HTTP 500) when nobody is logged in,
    # e.g. on a repeated logout or after the session expired.
    session.pop('user', None)
    return redirect(request.values.get('ref', url_for('index')))
# Whitelist of tables and columns used by the /edit and /new/<table> endpoints
# to validate names before they are interpolated into SQL.
# name: (tablename, idcolumn, [editable_fields], [fields_to_set_at_creation_time])
# NOTE(review): the 'auth' entry lists 'auth_passwd' while checkauth() reads
# auth['auth_password'] — confirm which column name the schema really uses.
tabs = {
    'courses': ('courses_data', 'id', ['visible', 'listed', 'title', 'short',
            'handle', 'organizer', 'subject', 'semester', 'downloadable',
            'internal', 'responsible','deleted','description'],
            ['created_by', 'time_created', 'time_updated']),
    'lectures': ('lectures_data', 'id', ['visible', 'title', 'comment',
            'internal', 'speaker', 'place', 'time', 'duration', 'jumplist','deleted'],
            ['course_id', 'time_created', 'time_updated']),
    'videos': ('videos_data', 'id', ['visible','deleted'],
            ['created_by', 'time_created', 'time_updated']),
    'chapters': ('chapters', 'id', ['time', 'text', 'visible', 'deleted'],
            ['created_by', 'time_created', 'time_updated']),
    'announcements': ('announcements', 'id', ['text', 'level', 'visible',
            'deleted', 'time_publish', 'time_expire'],
            ['created_by', 'time_created', 'time_updated']),
    'featured': ('featured', 'id', ['title', 'text', 'internal', 'visible', 'deleted'],
            ['created_by', 'time_created', 'time_updated']),
    'auth': ('auth_data', 'auth_id', ['auth_type', 'auth_user', 'auth_passwd', 'deleted'],
            ['course_id', 'lecture_id', 'video_id', 'created_by', 'time_created', 'time_updated']),
    'sorterrorlog': ('sorterrorlog_data', 'id', ['deleted'],
            ['time_created', 'time_updated'])
}
@app.route('/edit', methods=['GET', 'POST'])
@mod_required
def edit(prefix='', ignore=None):
    """Apply field changes submitted as ``<table>.<id>.<column>=<value>`` pairs.

    Changes are read from the JSON body when the request is JSON, otherwise
    from the request values. Every change is recorded in ``changelog`` and
    then written to the table, all inside one BEGIN/COMMIT pair.

    prefix -- string prepended to every key; defaults to the ``prefix``
              query argument when empty
    ignore -- keys to skip; ``'ref'`` and ``'prefix'`` are always skipped

    Returns a redirect to ``ref`` when given, otherwise ``("OK", 200)``.
    Raises ValueError for a malformed key or a table/column that is not
    whitelisted in ``tabs``.
    """
    # All editable tables are expected to have a 'time_updated' field
    # Work on a fresh list: appending to a shared default (or to the
    # caller's list) would make it grow with every request.
    skipped = list(ignore or []) + ['ref', 'prefix']
    if not prefix and 'prefix' in request.args:
        prefix = request.args['prefix']
    changes = request.values.items()
    if request.is_json:
        changes = request.get_json().items()

    # Validate everything before opening the transaction. Table and column
    # names are interpolated into SQL below, so the check must be a real
    # exception rather than an assert, which is stripped under -O.
    updates = []
    for key, val in changes:
        if key in skipped:
            continue
        table, row_id, column = (prefix+key).split('.', 2)
        if table not in tabs:
            raise ValueError('Table %r is not editable' % table)
        if column not in tabs[table][2]:
            raise ValueError('Column %r of table %r is not editable' % (column, table))
        updates.append((table, row_id, column, val))

    modify('BEGIN')
    for table, row_id, column, val in updates:
        tablename, idcolumn = tabs[table][0], tabs[table][1]
        modify('INSERT INTO changelog (`table`,id_value, id_key, field, value_new, value_old, `when`, who, executed) VALUES (?,?,?,?,?,(SELECT %s FROM %s WHERE %s = ?),?,?,1)'%(column, tablename, idcolumn),
                table, row_id, idcolumn, column, val, row_id, datetime.now(), session['user']['dbid'])
        modify('UPDATE %s SET %s = ?, time_updated = ? WHERE %s = ?'%(tablename, column, idcolumn), val, datetime.now(), row_id)
    modify('COMMIT')
    if 'ref' in request.values:
        return redirect(request.values['ref'])
    return "OK", 200
@app.route('/new/<table>', methods=['GET', 'POST'])
@mod_required
def create(table):
    """Insert a new row into one of the tables whitelisted in ``tabs``.

    Column values are read from the JSON body when the request is JSON,
    otherwise from the request values (``ref`` is skipped). ``created_by``,
    ``time_created`` and ``time_updated`` are filled in automatically where
    the table has them and may not be supplied by the client.

    Returns a redirect to ``ref`` when given, otherwise the result of the
    insert as a string with status 200.
    Raises ValueError for an unknown table or a column that may not be set.
    """
    # Table and column names are interpolated into the SQL statement, so the
    # whitelist check must be a real exception rather than an assert, which
    # is stripped under -O.
    if table not in tabs:
        raise ValueError('Table %r does not accept new rows' % table)
    tablename, editable, creation_fields = tabs[table][0], tabs[table][2], tabs[table][3]
    now = datetime.now()
    defaults = {'created_by': session['user']['dbid'], 'time_created': now, 'time_updated': now}
    columns = []
    values = []
    for column, val in defaults.items():
        if column in creation_fields:
            columns.append(column)
            values.append(val)
    args = request.values
    if request.is_json:
        args = request.get_json()
    allowed = editable + creation_fields
    for column, val in args.items():
        if column == 'ref':
            continue
        if column not in allowed:
            raise ValueError('Column %r of table %r may not be set' % (column, table))
        if column in defaults:
            raise ValueError('Column %r is set automatically' % column)
        columns.append(column)
        values.append(val)
    new_id = modify('INSERT INTO %s (%s) VALUES (%s)'%(tablename,
            ','.join(columns), ','.join(['?']*len(values))), *values)
    if 'ref' in request.values:
        return redirect(request.values['ref'])
    return str(new_id), 200
@app.route('/auth')
def auth(): # For use with nginx auth_request
    """Decide whether the original request for a video file is allowed.

    Reads the requested URI from the ``X-Original-Uri`` header and answers
    with 200 (allowed), 401 (HTTP basic auth required) or 403 (denied).
    A missing header gives 500. URIs ending in ``jpg`` are always allowed.
    """
    if 'X-Original-Uri' not in request.headers:
        return 'Internal Server Error', 500
    url = request.headers['X-Original-Uri']
    # Remove the configured prefix as a whole. str.lstrip() would treat it as
    # a set of characters and could also eat the beginning of the real path
    # (e.g. '/files/ss15/x.mp4' with prefix '/files' became '15/x.mp4').
    prefix = config['VIDEOPREFIX']
    if prefix and url.startswith(prefix):
        # Leading slashes are dropped as before.
        # NOTE(review): presumably videos.path is stored without a leading
        # slash — verify against the database.
        url = url[len(prefix):].lstrip('/')
    ip = request.headers.get('X-Real-IP', '')
    if url.endswith('jpg'):
        return "OK", 200
    videos = query('''SELECT videos.path, videos.id, auth.*
FROM videos
JOIN lectures ON (videos.lecture_id = lectures.id)
JOIN courses ON (lectures.course_id = courses.id)
LEFT JOIN auth ON (videos.id = auth.video_id OR lectures.id = auth.lecture_id OR courses.id = auth.course_id)
WHERE videos.path = ?
AND (? OR (courses.visible AND lectures.visible AND videos.visible))
ORDER BY auth.video_id DESC, auth.lecture_id DESC, auth.course_id DESC''',
            url, ismod())
    if not videos:
        return "Not allowed", 403
    credentials = request.authorization
    username = password = None
    if credentials:
        username = credentials.username
        password = credentials.password
    if checkauth(videos, username=username, password=password):
        return 'OK', 200
    # NOTE(review): this statement directly follows the return above; it is
    # placed at function level here (runs for denied requests) — confirm
    # against the original indentation.
    modify('INSERT INTO log VALUES (?, "", ?, "video", ?, ?)', ip, datetime.now(), videos[0]['id'], url)
    # Ask for credentials only when a password rule exists for this video.
    if any(video['auth_type'] == 'password' for video in videos):
        return Response("Login required", 401, {'WWW-Authenticate': 'Basic realm="Login Required"'})
    return "Not allowed", 403
@app.route('/stats')
@register_navbar('Statistiken', icon='stats')
@mod_required
def stats():
    """Render the statistics page (moderators only)."""
    page = render_template('stats.html')
    return page
@app.route('/changelog')
@register_navbar('Changelog', icon='book')
@mod_required
def changelog():
    """Render the 50 most recent changelog entries (moderators only).

    Every entry gets a ``path`` of the form ``<table>.<id_value>.<field>``.
    """
    entries = query('SELECT * FROM changelog LEFT JOIN users ON (changelog.who = users.id) ORDER BY `when` DESC LIMIT 50')
    for entry in entries:
        parts = [entry['table'], entry['id_value'], entry['field']]
        entry['path'] = '.'.join(parts)
    return render_template('changelog.html', changelog=entries)
@app.route('/files/<filename>')
def files(filename):
    """Redirect to ``filename`` below the configured ``VIDEOPREFIX``."""
    target = config['VIDEOPREFIX']+'/'+filename
    return redirect(target)
@app.route('/newchapter/<int:lectureid>', methods=['POST', 'GET'])
def suggest_chapter(lectureid):
    """Store a chapter mark for a lecture.

    Expects the request values ``time`` (``HH:MM:SS``) and ``text``. For
    visitors who are not moderators the remote address is stored as
    submitter. Returns a redirect to ``ref`` when given, otherwise
    ``('OK', 200)``. Empty values give status 400. A malformed time flashes
    a notice and stores nothing; the response is then the redirect to
    ``ref`` when given, otherwise status 400.
    """
    raw_time = request.values['time']
    text = request.values['text']
    # Explicit check instead of an assert, which is stripped under -O.
    if not (raw_time and text):
        return 'Bad Request', 400
    try:
        parsed = datetime.strptime(raw_time,'%H:%M:%S')
    except ValueError:
        flash('Falsches Zeitformat, "%H:%M:%S" wird erwartet. Z.B. "01:39:42" für eine Kapitel bei Stunde 1, Minute 39, Sekunde 42')
        # Stop here: falling through would insert the unparsed string as
        # the chapter time.
        if 'ref' in request.values:
            return redirect(request.values['ref'])
        return 'Bad Request', 400
    # Chapter times are stored as whole seconds from the start.
    seconds = int(timedelta(hours=parsed.hour,minutes=parsed.minute,seconds=parsed.second).total_seconds())
    submitter = None
    if not ismod():
        submitter = request.environ['REMOTE_ADDR']
    modify('INSERT INTO chapters (lecture_id, time, text, time_created, time_updated, created_by, submitted_by) VALUES (?, ?, ?, ?, ?, ?, ?)',
            lectureid, seconds, text, datetime.now(), datetime.now(), session.get('user', {'dbid':None})['dbid'], submitter)
    if 'ref' in request.values:
        return redirect(request.values['ref'])
    return 'OK', 200
@app.route('/chapters/<int:lectureid>')
def chapters(lectureid):
    """Serve the visible chapters of a lecture as a ``text/vtt`` response.

    Chapters are read latest first. Each one gets ``start`` (its own time)
    and ``end`` (the start of the chapter after it, or 9999 for the last).
    """
    rows = query("SELECT * FROM chapters WHERE lecture_id = ? and visible ORDER BY time DESC", lectureid)
    # Walking backwards in time, the previous row's start is this row's end.
    following_start = 9999
    for row in rows:
        row['start'] = row['time']
        row['end'] = following_start
        following_start = row['start']
    body = render_template('chapters.srt',chapters=rows)
    return Response(body, 200, {'Content-Type':'text/vtt'})
@app.route('/sitemap.xml')
def sitemap():
    """Render the sitemap.

    Lists every GET route without URL arguments that is not a moderator
    endpoint, followed by each visible and listed course and its visible
    lectures.
    """
    pages = []
    # static pages
    for rule in app.url_map.iter_rules():
        if 'GET' not in rule.methods or len(rule.arguments) != 0:
            continue
        if rule.endpoint in mod_endpoints:
            continue
        pages.append([rule.rule])
    for course_row in query('select * from courses where visible and listed'):
        pages.append([url_for('course',handle=course_row['handle'])])
        for lecture_row in query('select * from lectures where (course_id = ? and visible)',course_row['id']):
            pages.append([url_for('lecture',id=lecture_row['id'])])
    body = render_template('sitemap.xml', pages=pages)
    return Response(body, 200, {'Content-Type': 'application/atom+xml'} )
import feeds
import importer
import sorter
if 'ICAL_URL' in config:
import meetings
if 'L2P_APIKEY' in config:
import l2pauth
import worker
import timetable