Skip to content
Snippets Groups Projects
Select Git revision
  • a13befd0b3fd3707ea444825f4abd935bb58d919
  • master default protected
  • postgres_integration
  • s3compatible
  • intros
  • bootstrap4
  • modules
7 results

config.py.example

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    server.py 19.74 KiB
    from flask import Flask, g, request, url_for, redirect, session, render_template, flash, Response
    from werkzeug.routing import Rule
    from functools import wraps
    from datetime import date, timedelta, datetime, time, MINYEAR
    import threading
    import os
    import sys
    import hashlib
    import random
    import sched
    
    # The Flask application object; routes, template filters and globals below are registered on it.
    app = Flask(__name__)
    
    # Whitespace control for Jinja block tags in the templates.
    app.jinja_env.trim_blocks = True
    app.jinja_env.lstrip_blocks = True
    # Make a few Python callables/types directly usable from templates.
    app.add_template_global(random.randint, name='randint')
    app.add_template_global(datetime, name='datetime')
    app.add_template_global(timedelta, name='timedelta')
    
    # Shared scheduler: jobs are queued by sched_func() and executed by run_scheduler().
    scheduler = sched.scheduler()
    def run_scheduler():
    	"""Drive the module-level scheduler forever; used as the target of a background thread.
    	Each pass runs the scheduler until its queue is empty and then pauses for
    	ten seconds before running it again.
    	"""
    	# Imported locally because the module-level name `time` is datetime.time.
    	from time import sleep
    	sleep(1) # UWSGI does weird things on startup
    	while True:
    		scheduler.run()
    		sleep(10)
    
    def sched_func(delay, priority=0, firstdelay=None, args=(), kargs=None):
    	"""Decorator factory that registers a function as a periodic scheduler job.
    	delay:      seconds between the end of one run and the start of the next.
    	priority:   priority handed to scheduler.enter().
    	firstdelay: seconds until the first run; a random value between 1 and 120
    	            is picked when it is omitted.
    	args/kargs: positional/keyword arguments passed to the function on every run.
    	The job runs inside app.test_request_context(). The decorated function
    	itself is returned unchanged.
    	"""
    	if firstdelay is None:
    		firstdelay = random.randint(1, 120)
    	if kargs is None:
    		kargs = {}
    	def wrapper(func):
    		def sched_wrapper():
    			try:
    				with app.test_request_context():
    					func(*args, **kargs)
    			except Exception:
    				# An exception escaping from here would propagate out of
    				# scheduler.run() and end the scheduler thread, which would
    				# silently stop every periodic job. Log it and carry on.
    				app.logger.exception('Scheduled job %r failed', func)
    			finally:
    				# Always queue the next run, even after a failed one.
    				scheduler.enter(delay, priority, sched_wrapper)
    		scheduler.enter(firstdelay, priority, sched_wrapper)
    		return func
    	return wrapper
    
    # Run the scheduler loop in a daemon thread, so it does not keep the process alive on exit.
    threading.Thread(target=run_scheduler, daemon=True).start()
    
    config = app.config
    # Load order matters: values from later sources override earlier ones.
    # silent=True makes a missing file a no-op instead of an error.
    config.from_pyfile('config.py.example', silent=True)
    if sys.argv[0].endswith('run.py'): 
    	# Started through run.py: switch on SQLITE_INIT_DATA and DEBUG before
    	# config.py is read, so config.py can still override both.
    	config['SQLITE_INIT_DATA'] = True
    	config['DEBUG'] = True
    config.from_pyfile('config.py', silent=True)
    if config['DEBUG']:
    	app.jinja_env.auto_reload = True
    if not config.get('SECRET_KEY', None):
    	# No key configured: generate a random one for this process.
    	config['SECRET_KEY'] = os.urandom(24)
    
    from db import query, modify, searchquery, ldapauth, ldapget
    
    # Names of the view functions wrapped by mod_required(); filled at decoration time.
    mod_endpoints = []
    
    @app.template_global()
    def ismod(*args):
    	"""Tell whether the current session carries a 'user' entry (set by login()).
    	Positional arguments are accepted and ignored.
    	"""
    	logged_in = 'user' in session
    	return logged_in
    
    def mod_required(func):
    	"""Decorator that restricts a view to logged-in moderators.
    	The view's name is recorded in mod_endpoints at decoration time. Requests
    	from non-moderators get a flash message and are redirected to the login
    	page, with the requested URL passed along as 'ref'.
    	"""
    	mod_endpoints.append(func.__name__)
    	@wraps(func)
    	def guarded(*args, **kwargs):
    		if ismod():
    			return func(*args, **kwargs)
    		flash('Diese Funktion ist nur für Moderatoren verfügbar!')
    		return redirect(url_for('login', ref=request.url))
    	return guarded
    
    def evalauth(auths):
    	"""Reduce a list of auth rows to the ones of the most specific scope.
    	Every row is sorted into exactly one scope by the first of course_id,
    	lecture_id, video_id that is set. Video rows win over lecture rows, which
    	win over course rows. Without any scoped row a single public pseudo-row
    	is returned.
    	"""
    	by_scope = {'course_id': [], 'lecture_id': [], 'video_id': []}
    	for entry in auths:
    		# The classification order (course, lecture, video) differs from the
    		# precedence order used below.
    		for scope in ('course_id', 'lecture_id', 'video_id'):
    			if entry[scope]:
    				by_scope[scope].append(entry)
    				break
    	for scope in ('video_id', 'lecture_id', 'course_id'):
    		if by_scope[scope]:
    			return by_scope[scope]
    	return [{'auth_type': 'public'}]
    
    @app.template_filter()
    def checkauth(auths, username=None, password=None):
    	"""Return True when any effective auth row (see evalauth) grants access.
    	'public' always grants. 'password' needs matching username and password.
    	'l2p' needs the row's auth_param in the session's 'l2p_courses'.
    	'rwth' needs a truthy 'rwthintern' session flag.
    	"""
    	# NOTE(review): this reads 'auth_password', while the edit whitelist in
    	# `tabs` names the column 'auth_passwd' - confirm against the auth view/schema.
    	for entry in evalauth(auths):
    		kind = entry['auth_type']
    		if kind == 'public':
    			return True
    		if kind == 'password' and entry['auth_user'] == username and entry['auth_password'] == password:
    			return True
    		if kind == 'l2p' and entry['auth_param'] in session.get('l2p_courses', []):
    			return True
    		if kind == 'rwth' and session.get('rwthintern', False):
    			return True
    	return False
    
    @app.template_filter()
    def authdescr(auths):
    	"""Summarise the effective auth rows as a (mode, human readable text) pair.
    	Precedence of the returned mode: 'public', then 'rwth', then 'l2p', then
    	'password'. The text additionally mentions password access when a
    	password row exists next to an rwth or l2p row.
    	"""
    	auths = evalauth(auths)
    	kinds = set()
    	l2p_courses = []
    	for entry in auths:
    		kind = entry['auth_type']
    		kinds.add(kind)
    		if kind == 'l2p':
    			l2p_courses.append(entry['auth_param'])
    	with_password = 'password' in kinds
    	if 'public' in kinds or not auths:
    		return 'public', 'Öffentlich verfügbar'
    	if 'rwth' in kinds:
    		if with_password:
    			return 'rwth', 'Nur für RWTH-Angehörige und Nutzer mit Passwort verfügbar'
    		return 'rwth', 'Nur für RWTH-Angehörige verfügbar'
    	if l2p_courses:
    		if with_password:
    			return 'l2p', 'Nur für Teilnehmer der Veranstaltung und Nutzer mit Passwort verfügbar'
    		return 'l2p', 'Nur für Teilnehmer der Veranstaltung verfügbar'
    	if with_password:
    		return 'password', 'Nur für Nutzer mit Passwort verfügbar'
    	return 'public', 'Öffentlich verfügbar'
    
    # Template global holding one tuple per navbar entry, filled by register_navbar().
    app.jinja_env.globals['navbar'] = []
    def register_navbar(name, iconlib='bootstrap', icon=None):
    	"""Decorator that adds the decorated view to the 'navbar' template global.
    	iconlib can be 'bootstrap' ( see: http://getbootstrap.com/components/#glyphicons )
    	or 'fa' ( see: http://fontawesome.io/icons/ ).
    	The stored entry is (endpoint, name, iconlib, icon, public). `public` is
    	False when the view's name is already in mod_endpoints at decoration
    	time, so mod_required has to be applied before this decorator.
    	"""
    	def wrapper(func):
    		endpoint = func.__name__
    		is_public = endpoint not in mod_endpoints
    		entry = (endpoint, name, iconlib, icon, is_public)
    		app.jinja_env.globals['navbar'].append(entry)
    		return func
    	return wrapper
    
    def render_endpoint(endpoint, flashtext=None, **kargs):
    	"""Call the view registered for `endpoint` inside the current request.
    	An optional flash message is queued first. Keyword arguments are passed
    	on to the view, and the view's return value is returned as is.
    	"""
    	if flashtext:
    		flash(flashtext)
    	# request.endpoint is used for navbar highlighting
    	request.url_rule = Rule(request.path, endpoint=endpoint)
    	view = app.view_functions[endpoint]
    	return view(**kargs)
    
    def handle_errors(endpoint, text, code, *errors, **epargs):
    	"""Decorator factory that turns selected exceptions into an error response.
    	When the wrapped function raises one of `errors`, the result is
    	(render_endpoint(endpoint, text, **epargs), code) if `endpoint` is truthy,
    	and (text, code) otherwise. Other exceptions propagate unchanged.
    	"""
    	def decorate(func):
    		@wraps(func)
    		def guarded(*args, **kwargs):
    			try:
    				return func(*args, **kwargs)
    			except errors:
    				if not endpoint:
    					return text, code
    				return render_endpoint(endpoint, text, **epargs), code
    		return guarded
    	return decorate
    
    @app.errorhandler(404)
    def handle_not_found(e):
    	"""Answer unknown URLs with the index page, a flash message and status 404."""
    	page = render_endpoint('index', 'Diese Seite existiert nicht!')
    	return page, 404
    
    @app.template_filter(name='semester')
    def human_semester(s, long=False):
    	"""Format a semester string such as '2015ws' for display.
    	Empty values, 'zeitlos' and strings that are not six characters long give
    	'Zeitlos'. A malformed six-character string is reported on stdout and
    	gives '??'. The short form looks like 'WS15'; the long form is
    	'Sommersemester 2015' or 'Wintersemester 2015/16'.
    	"""
    	if not s or s == 'zeitlos' or len(s) != 6:
    		return 'Zeitlos'
    	year, term = s[:4], s[4:].upper()
    	if not (year.isdigit() and term in ('SS', 'WS')):
    		print('Invalid semester string "%s"'%s)
    		return '??'
    	if not long:
    		return term+year[2:]
    	if term == 'SS':
    		return 'Sommersemester %s'%year
    	# A winter semester spans two years; show the second one with two digits.
    	following = str(int(year)+1)[2:]
    	return 'Wintersemester %s/%s'%(year, following)
    
    @app.template_filter(name='date')
    def human_date(d):
    	"""Format `d` as day.month.year, e.g. '24.12.2015'."""
    	pattern = '%d.%m.%Y'
    	return d.strftime(pattern)
    
    @app.template_filter(name='time')
    def human_time(d):
    	"""Format `d` as hours:minutes on a 24 hour clock, e.g. '16:15'."""
    	pattern = '%H:%M'
    	return d.strftime(pattern)
    
    @app.template_filter()
    def rfc3339(d):
    	"""Format `d` as an RFC 3339 style timestamp with a fixed '+02:00' offset."""
    	# NOTE(review): the UTC offset is hard-coded and not derived from `d`, so
    	# the result is only right for values that really are two hours ahead of UTC.
    	pattern = '%Y-%m-%dT%H:%M:%S+02:00'
    	return d.strftime(pattern)
    
    @app.template_global()
    def get_announcements(minlevel=0):
    	"""Fetch the announcements with level >= minlevel, highest level first.
    	Deleted announcements are never returned. Regular visitors only get
    	visible, already published and unexpired ones. Moderators also get hidden
    	or unpublished ones and those that expired within the last 24 hours.
    	"""
    	# Moderators look 24 hours further back when it comes to expiry.
    	offset = timedelta(hours=24) if ismod() else timedelta()
    	sql = 'SELECT * FROM announcements WHERE NOT deleted AND (time_expire ISNULL OR time_expire > ?) AND (? OR (visible AND time_publish < ?)) AND level >= ? ORDER BY level DESC'
    	return query(sql, datetime.now()-offset, ismod(), datetime.now(), minlevel)
    
    @app.template_filter()
    def fixnl(s):
    	"""Convert `s` to a string and replace every newline with '<br>'."""
    	# To be removed as soon as the db schema is cleaned up
    	lines = str(s).split('\n')
    	return '<br>'.join(lines)
    
    @app.route('/')
    @register_navbar('Home', icon='home')
    def index():
    	"""Render the start page: upcoming lectures, latest videos and featured items.
    	Upcoming lectures are those timed after yesterday's date and before the
    	date seven days after that, limited to 30 rows. Latest videos are the six
    	lectures with the most recently updated videos. Moderators also see
    	hidden latest videos and hidden featured items.
    	"""
    	window_start = date.today() - timedelta(days=1)
    	window_end = window_start + timedelta(days=7)
    	upcoming_sql = '''
    		SELECT lectures.*, "course" AS sep, courses.*
    		FROM lectures
    		JOIN courses ON (lectures.course_id = courses.id)
    		WHERE (time > ?) AND (time < ?) and lectures.visible and courses.visible and courses.listed
    		ORDER BY time ASC LIMIT 30'''
    	upcoming = query(upcoming_sql, window_start, window_end)
    	for row in upcoming:
    		# The date alone is stored next to the full timestamp for the template.
    		row['date'] = row['time'].date()
    	latest_sql = '''
    		SELECT lectures.*, "course" AS sep, courses.*
    		FROM lectures
    		LEFT JOIN videos ON (videos.lecture_id = lectures.id)
    		LEFT JOIN courses on (courses.id = lectures.course_id)
    		WHERE (? OR (courses.visible AND courses.listed AND lectures.visible AND videos.visible))
    		GROUP BY videos.lecture_id
    		ORDER BY MAX(videos.time_updated) DESC
    		LIMIT 6	'''
    	latest = query(latest_sql, ismod())
    	featured_items = query('SELECT * FROM featured WHERE NOT deleted AND (? OR visible)', ismod())
    	return render_template('index.html', latestvideos=latest, upcomming=upcoming, featured=featured_items)
    
    @app.route('/course')
    @register_navbar('Videos', icon='film')
    def courses():
    	"""Render the course list, grouped by title, semester or organizer.
    	The grouping comes from the 'groupedby' query argument and falls back to
    	'semester'. Courses with an empty semester are relabelled 'zeitlos'.
    	Moderators also see hidden and unlisted courses.
    	"""
    	rows = query('SELECT * FROM courses WHERE (? OR (visible AND listed)) ORDER BY title', ismod())
    	for row in rows:
    		if row['semester'] == '':
    			row['semester'] = 'zeitlos'
    	grouping = request.args.get('groupedby')
    	if grouping not in ('title', 'semester', 'organizer'):
    		grouping = 'semester'
    	return render_template('courses.html', courses=rows, groupedby=grouping)
    
    @app.route('/course/<handle>')
    @app.route('/course/<int:id>')
    @handle_errors('courses', 'Diese Veranstaltung existiert nicht!', 404, IndexError)
    def course(id=None, handle=None):
    	"""Render one course with its lectures and videos.
    	The course is looked up by numeric id when one is given, otherwise by
    	handle. A missing course raises IndexError, which the handle_errors
    	decorator turns into the course list with status 404. Every lecture gets
    	its own auth rows and a reference to the course attached.
    	"""
    	if id:
    		found = query('SELECT * FROM courses WHERE id = ? AND (? OR visible)', id, ismod())
    	else:
    		found = query('SELECT * FROM courses WHERE handle = ? AND (? OR visible)', handle, ismod())
    	course = found[0]
    	course['auth'] = query('SELECT * FROM auth WHERE course_id = ? ORDER BY auth_type', course['id'])
    	lecture_auths = query('SELECT auth.* FROM auth JOIN lectures ON (auth.lecture_id = lectures.id) WHERE lectures.course_id = ? ORDER BY auth.auth_type', course['id'])
    	lectures = query('SELECT * FROM lectures WHERE course_id = ? AND (? OR visible) ORDER BY time, duration DESC', course['id'], ismod())
    	for lecture in lectures:
    		lecture['auth'] = [entry for entry in lecture_auths if entry['lecture_id'] == lecture['id']]
    		lecture['course'] = course
    	videos_sql = '''
    			SELECT videos.*, (videos.downloadable AND courses.downloadable) as downloadable, formats.description AS format_description, formats.player_prio, formats.prio
    			FROM videos
    			JOIN lectures ON (videos.lecture_id = lectures.id)
    			JOIN formats ON (videos.video_format = formats.id)
    			JOIN courses ON (lectures.course_id = courses.id)
    			WHERE lectures.course_id= ? AND (? OR videos.visible)
    			ORDER BY lectures.time, formats.prio DESC
    			'''
    	videos = query(videos_sql, course['id'], ismod())
    	return render_template('course.html', course=course, lectures=lectures, videos=videos)
    
    @app.route('/faq')
    @register_navbar('FAQ', icon='question-sign')
    def faq():
    	"""Render the static FAQ page."""
    	page = render_template('faq.html')
    	return page
    
    @app.route('/play/<int:id>')
    @app.route('/embed/<int:id>', endpoint='embed')
    @handle_errors('courses', 'Diese Vorlesung existiert nicht!', 404, IndexError)
    def lecture(id):
    	"""Render the player page (or the embed variant) for one lecture.
    	A missing lecture raises IndexError, which handle_errors answers with the
    	course list and status 404. The fallback endpoint has to be 'courses':
    	the 'course' view called without arguments fails itself and returns a
    	(body, 404) tuple, which would end up nested in a second tuple here and
    	cannot be converted into a response.
    	A lecture whose course is hidden or unlisted is answered with 404 as
    	well. When the visitor is not authorised, a flash message explains which
    	kind of access is needed.
    	"""
    	lecture = query('SELECT * FROM lectures WHERE id = ? AND (? OR visible)', id, ismod())[0]
    	# Check the course first, so that the 404 page is not decorated with the
    	# unrelated "no videos yet" message.
    	courses = query('SELECT * FROM courses WHERE id = ? AND (? OR (visible AND listed))', lecture['course_id'], ismod())
    	if not courses:
    		return render_endpoint('courses', 'Diese Veranstaltung existiert nicht!'), 404
    	videos = query('''
    			SELECT videos.*, (videos.downloadable AND courses.downloadable) as downloadable, formats.description AS format_description, formats.player_prio, formats.prio
    			FROM videos
    			JOIN formats ON (videos.video_format = formats.id)
    			JOIN courses ON (courses.id = ?)
    			WHERE videos.lecture_id = ? AND (? OR videos.visible)
    			ORDER BY formats.prio DESC
    			''', lecture['course_id'], lecture['id'], ismod())
    	auths = query('SELECT auth.* FROM auth WHERE (auth.lecture_id = ? OR auth.course_id = ?)',
    			lecture['id'], lecture['course_id'])
    	if not videos:
    		flash('Zu dieser Vorlesung wurden noch keine Videos veröffentlicht!')
    	chapters = query('SELECT * FROM chapters WHERE lecture_id = ? AND NOT deleted AND (? OR visible) ORDER BY time ASC', id, ismod())
    	if not checkauth(auths):
    		mode, text = authdescr(auths)
    		if mode == 'rwth':
    			flash(text+'. <a target="_blank" href="'+url_for('start_rwthauth')+'">Hier authorisieren</a>.')
    		elif mode == 'l2p':
    			flash(text+'. <a target="_blank" href="'+url_for('start_l2pauth')+'">Hier authorisieren</a>.')
    		else:
    			flash(text+'.')
    	template = 'embed.html' if request.endpoint == 'embed' else 'lecture.html'
    	return render_template(template, course=courses[0], lecture=lecture, videos=videos, chapters=chapters)
    
    
    @app.route('/search')
    def search():
    	"""Search courses and lectures for the 'q' query argument.
    	Without 'q' the visitor is redirected to the start page. At most 20
    	courses and 30 lectures are shown; moderators also get hidden results.
    	"""
    	if 'q' not in request.args:
    		return redirect(url_for('index'))
    	searchtext = request.args['q']
    	course_hits = searchquery(searchtext, '*', ['title', 'short', 'organizer', 'subject', 'description'],
    			'courses', 'WHERE (? OR (visible AND listed)) GROUP BY id ORDER BY _score DESC, semester DESC LIMIT 20', ismod())
    	lecture_hits = searchquery(searchtext, 'lectures.*, courses.visible AS coursevisible, courses.listed, "course" AS sep, courses.*',
    			['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short'],
    			'lectures LEFT JOIN courses on (courses.id = lectures.course_id)',
    			'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30', ismod())
    	return render_template('search.html', searchtext=searchtext, courses=course_hits, lectures=lecture_hits)
    
    def check_mod(user, groups):
    	"""Decide whether a login result qualifies as moderator.
    	A falsy `user` is returned unchanged; otherwise the result tells whether
    	'users' is among `groups`.
    	"""
    	if not user:
    		return user
    	return 'users' in groups
    
    @app.route('/login', methods=['GET', 'POST'])
    def login():
    	"""Show the login form (GET) or log a moderator in (POST).
    	The credentials are passed to ldapauth(). On success the result of
    	ldapget() is stored in the session, a row in `users` is created on first
    	login, and its id is stored as session['user']['dbid'].
    	"""
    	if request.method == 'GET':
    		return render_template('login.html')
    	user, groups = ldapauth(request.form.get('user'), request.form.get('password'))
    	if not check_mod(user, groups):
    		flash('Login fehlgeschlagen!')
    		return render_template('login.html')
    	session['user'] = ldapget(user)
    	lookup_sql = 'SELECT * FROM users WHERE name = ?'
    	dbuser = query(lookup_sql, user)
    	if not dbuser:
    		# First login of this account: create the row, then read it back for its id.
    		modify('INSERT INTO users (name, realname, fsacc, level, calendar_key, rfc6238) VALUES (?, ?, ?, 1, "", "")', user, session['user']['givenName'], user)
    		dbuser = query(lookup_sql, user)
    	session['user']['dbid'] = dbuser[0]['id']
    	# NOTE(review): 'ref' comes from the request and is used as redirect target
    	# without validation - consider restricting it to same-site URLs.
    	target = request.values.get('ref', url_for('index'))
    	return redirect(target)
    
    @app.route('/logout', methods=['GET', 'POST'])
    def logout():
    	"""Drop the moderator login from the session and redirect to 'ref' or the start page."""
    	# The default makes this a no-op for visitors who are not logged in;
    	# a plain pop('user') would raise KeyError for them.
    	session.pop('user', None)
    	return redirect(request.values.get('ref', url_for('index')))
    
    # name: (tablename, idcolumn, [editable_fields], [fields_to_set_at_creation_time])
    # edit() and create() use this mapping as the whitelist for the table and
    # column names they interpolate into SQL statements.
    tabs = {
    	'courses': ('courses_data', 'id', ['visible', 'listed', 'title', 'short',
    			'handle', 'organizer', 'subject', 'semester', 'downloadable',
    			'internal', 'responsible','deleted','description'],
    			['created_by', 'time_created', 'time_updated']),
    	'lectures': ('lectures_data', 'id', ['visible', 'title', 'comment',
    			'internal', 'speaker', 'place', 'time', 'duration', 'jumplist','deleted'],
    			['course_id', 'time_created', 'time_updated']),
    	'videos': ('videos_data', 'id', ['visible','deleted'],
    			['created_by', 'time_created', 'time_updated']),
    	'chapters': ('chapters', 'id', ['time', 'text', 'visible', 'deleted'],
    			['created_by', 'time_created', 'time_updated']),
    	'announcements': ('announcements', 'id', ['text', 'level', 'visible',
    			'deleted', 'time_publish', 'time_expire'],
    			['created_by', 'time_created', 'time_updated']),
    	'featured': ('featured', 'id', ['title', 'text', 'internal', 'visible', 'deleted'],
    			['created_by', 'time_created', 'time_updated']),
    	# NOTE(review): checkauth() reads the keys 'auth_password' and 'auth_param',
    	# while this entry lists 'auth_passwd' and no 'auth_param' - confirm against the schema.
    	'auth': ('auth_data', 'auth_id', ['auth_type', 'auth_user', 'auth_passwd', 'deleted'],
    			['course_id', 'lecture_id', 'video_id', 'created_by', 'time_created', 'time_updated']),
    	'sorterrorlog': ('sorterrorlog_data', 'id', ['deleted'],
    			['time_created', 'time_updated'])
    }
    
    @app.route('/edit', methods=['GET', 'POST'])
    @mod_required
    def edit(prefix='', ignore=None):
    	"""Apply field changes sent as '<table>.<id>.<column>' = value pairs.
    	prefix: string put in front of every key; taken from the 'prefix' query
    	        argument when not passed in.
    	ignore: additional keys to skip; 'ref' and 'prefix' are always skipped.
    	The changes come from the JSON body for JSON requests and from the
    	form/query values otherwise. Every change is recorded in `changelog`
    	together with the previous value and then written to the table, which
    	also updates its time_updated. Redirects to 'ref' when given, otherwise
    	answers "OK".
    	Raises AssertionError for a table or column that is not whitelisted in
    	`tabs`, and ValueError for a key that does not have three parts.
    	"""
    	# All editable tables are expected to have a 'time_updated' field
    	# Build a fresh collection on every call instead of appending to a shared
    	# default list, which would grow with every request.
    	ignored = {'ref', 'prefix'}
    	if ignore:
    		ignored.update(ignore)
    	if not prefix and 'prefix' in request.args:
    		prefix = request.args['prefix']
    	changes = request.values.items()
    	if request.is_json:
    		changes = request.get_json().items()
    	# Validate everything before the transaction is opened, so that a bad key
    	# cannot leave a half-applied, uncommitted transaction behind.
    	updates = []
    	for key, val in changes:
    		if key in ignored:
    			continue
    		table, rowid, column = (prefix+key).split('.', 2)
    		# Table and column names are interpolated into SQL below. These checks
    		# are explicit raises because assert statements are removed under -O.
    		if table not in tabs:
    			raise AssertionError('table %r is not editable'%(table,))
    		if column not in tabs[table][2]:
    			raise AssertionError('column %r of table %r is not editable'%(column, table))
    		updates.append((table, rowid, column, val))
    	modify('BEGIN')
    	for table, rowid, column, val in updates:
    		modify('INSERT INTO changelog (`table`,id_value, id_key, field, value_new, value_old, `when`, who, executed) VALUES (?,?,?,?,?,(SELECT %s FROM %s WHERE %s = ?),?,?,1)'%(column, tabs[table][0], tabs[table][1]),
    				table, rowid, tabs[table][1], column, val, rowid, datetime.now(), session['user']['dbid'])
    		modify('UPDATE %s SET %s = ?, time_updated = ? WHERE %s = ?'%(tabs[table][0], column, tabs[table][1]), val, datetime.now(), rowid)
    	modify('COMMIT')
    	if 'ref' in request.values:
    		return redirect(request.values['ref'])
    	return "OK", 200
    
    @app.route('/new/<table>', methods=['GET', 'POST'])
    @mod_required
    def create(table):
    	"""Insert a new row into one of the tables listed in `tabs`.
    	created_by, time_created and time_updated are filled in automatically
    	where the table has them as creation-time fields. All other columns come
    	from the JSON body for JSON requests and from the form/query values
    	otherwise; 'ref' is skipped. Redirects to 'ref' when given, otherwise
    	answers with the value returned by modify() as a string.
    	Raises AssertionError for an unknown table, a column that is not
    	whitelisted, or an attempt to set one of the automatic columns.
    	"""
    	# Table and column names are interpolated into the statement below. The
    	# checks are explicit raises because assert statements are removed under -O.
    	if table not in tabs:
    		raise AssertionError('table %r cannot be created in'%(table,))
    	now = datetime.now()
    	defaults = {'created_by': session['user']['dbid'], 'time_created': now, 'time_updated': now}
    	columns = [column for column in defaults if column in tabs[table][3]]
    	values = [defaults[column] for column in columns]
    	allowed = tabs[table][2]+tabs[table][3]
    	args = request.values
    	if request.is_json:
    		args = request.get_json()
    	for column, val in args.items():
    		if column == 'ref':
    			continue
    		if column not in allowed:
    			raise AssertionError('column %r of table %r cannot be set'%(column, table))
    		if column in defaults:
    			raise AssertionError('column %r is filled in automatically'%(column,))
    		columns.append(column)
    		values.append(val)
    	rowid = modify('INSERT INTO %s (%s) VALUES (%s)'%(tabs[table][0],
    				','.join(columns), ','.join(['?']*len(values))), *values)
    	if 'ref' in request.values:
    		return redirect(request.values['ref'])
    	return str(rowid), 200
    
    @app.route('/auth')
    def auth(): # For use with nginx auth_request
    	"""Decide whether the file named by the X-Original-Uri header may be served.
    	Answers 200 when access is granted, 401 with a Basic auth challenge when
    	a password could grant access, 403 otherwise, and 500 when the header is
    	missing. Paths ending in 'jpg' are always allowed. Granted video accesses
    	are written to the `log` table.
    	"""
    	if 'X-Original-Uri' not in request.headers:
    		return 'Internal Server Error', 500
    	url = request.headers['X-Original-Uri']
    	prefix = config['VIDEOPREFIX']
    	# str.lstrip(prefix) would strip any leading characters that occur in the
    	# prefix, eating into the path itself; remove the prefix as a whole.
    	if prefix and url.startswith(prefix):
    		url = url[len(prefix):]
    	# assumes videos.path is stored without a leading slash - TODO confirm
    	url = url.lstrip('/')
    	ip = request.headers.get('X-Real-IP', '')
    	if url.endswith('jpg'):
    		return "OK", 200
    	videos = query('''SELECT videos.path, videos.id, auth.*
    			FROM videos
    			JOIN lectures ON (videos.lecture_id = lectures.id)
    			JOIN courses ON (lectures.course_id = courses.id)
    			LEFT JOIN auth ON (videos.id = auth.video_id OR lectures.id = auth.lecture_id OR courses.id = auth.course_id)
    			WHERE videos.path = ?
    			AND (? OR (courses.visible AND lectures.visible AND videos.visible))
    			ORDER BY auth.video_id DESC, auth.lecture_id DESC, auth.course_id DESC''',
    			url, ismod())
    	if not videos:
    		return "Not allowed", 403
    	credentials = request.authorization
    	username = password = None
    	if credentials:
    		username = credentials.username
    		password = credentials.password
    	if checkauth(videos, username=username, password=password):
    		# This statement used to sit behind the return and never ran. Logging
    		# is best effort: a failing insert must not block the video.
    		try:
    			modify('INSERT INTO log VALUES (?, "", ?, "video", ?, ?)', ip, datetime.now(), videos[0]['id'], url)
    		except Exception:
    			app.logger.exception('Could not log access to %s', url)
    		return 'OK', 200
    	# Only offer Basic auth when a password rule exists for this video.
    	if any(video['auth_type'] == 'password' for video in videos):
    		return Response("Login required", 401, {'WWW-Authenticate': 'Basic realm="Login Required"'})
    	return "Not allowed", 403
    
    @app.route('/stats')
    @register_navbar('Statistiken', icon='stats')
    @mod_required
    def stats():
    	"""Render the statistics page (moderators only)."""
    	page = render_template('stats.html')
    	return page
    
    @app.route('/changelog')
    @register_navbar('Changelog', icon='book')
    @mod_required
    def changelog():
    	"""Render the 50 most recent changelog entries (moderators only).
    	Every entry gets a 'path' of the form '<table>.<id_value>.<field>'.
    	"""
    	entries = query('SELECT * FROM changelog LEFT JOIN users ON (changelog.who = users.id) ORDER BY `when` DESC LIMIT 50')
    	for entry in entries:
    		parts = (entry['table'], entry['id_value'], entry['field'])
    		entry['path'] = '.'.join(parts)
    	return render_template('changelog.html', changelog=entries)
    
    @app.route('/files/<filename>')
    def files(filename):
    	"""Redirect to the file of that name below the configured VIDEOPREFIX."""
    	target = '/'.join((config['VIDEOPREFIX'], filename))
    	return redirect(target)
    
    @app.route('/newchapter/<int:lectureid>', methods=['POST', 'GET'])
    def suggest_chapter(lectureid):
    	"""Store a chapter mark suggested for a lecture.
    	Expects the request values 'time' (as HH:MM:SS) and 'text'. The time is
    	stored as a number of seconds. Moderators are recorded as creator; for
    	other visitors the remote address is recorded as submitter. Redirects to
    	'ref' when given, otherwise answers 'OK'.
    	A time that cannot be parsed produces a flash message and no chapter is
    	stored; the answer is then the redirect to 'ref' or status 400.
    	Raises AssertionError when 'time' or 'text' is empty.
    	"""
    	raw_time = request.values['time']
    	text = request.values['text']
    	# Explicit raise, because assert statements are removed under -O.
    	if not (raw_time and text):
    		raise AssertionError('time and text must not be empty')
    	try:
    		parsed = datetime.strptime(raw_time,'%H:%M:%S')
    	except ValueError:
    		flash('Falsches Zeitformat, "%H:%M:%S" wird erwartet. Z.B. "01:39:42" für eine Kapitel bei Stunde 1, Minute 39, Sekunde 42')
    		# Stop here: carrying on would store the unparsed string as time.
    		if 'ref' in request.values:
    			return redirect(request.values['ref'])
    		return 'Falsches Zeitformat', 400
    	seconds = int(timedelta(hours=parsed.hour, minutes=parsed.minute, seconds=parsed.second).total_seconds())
    	submitter = None
    	if not ismod():
    		submitter = request.environ['REMOTE_ADDR']
    	modify('INSERT INTO chapters (lecture_id, time, text, time_created, time_updated, created_by, submitted_by) VALUES (?, ?, ?, ?, ?, ?, ?)',
    				lectureid, seconds, text, datetime.now(), datetime.now(), session.get('user', {'dbid':None})['dbid'], submitter)
    	if 'ref' in request.values:
    		return redirect(request.values['ref'])
    	return 'OK',  200
    
    @app.route('/chapters/<int:lectureid>')
    def chapters(lectureid):
    	"""Serve the visible chapter marks of a lecture as a text/vtt response.
    	Every chapter gets a 'start' (its own time) and an 'end' (the start of
    	the following chapter, or 9999 for the last one). Deleted chapters are
    	left out, as on the lecture page.
    	"""
    	# Latest chapter first, so each chapter can take its end from the one
    	# handled just before it.
    	rows = query("SELECT * FROM chapters WHERE lecture_id = ? AND NOT deleted AND visible ORDER BY time DESC", lectureid)
    	following = None
    	for chapter in rows:
    		chapter['start'] = chapter['time']
    		chapter['end'] = following['start'] if following else 9999
    		following = chapter
    	return Response(render_template('chapters.srt',chapters=rows), 200, {'Content-Type':'text/vtt'})
    
    @app.route('/sitemap.xml')
    def sitemap():
    	"""Render the sitemap: static public pages, listed courses and their lectures.
    	Static pages are all GET routes without URL arguments whose endpoint is
    	not restricted to moderators.
    	"""
    	# static pages
    	pages = [
    		[rule.rule]
    		for rule in app.url_map.iter_rules()
    		if 'GET' in rule.methods and len(rule.arguments)==0 and rule.endpoint not in mod_endpoints
    	]
    	for course_row in query('select * from courses where visible and listed'):
    		pages.append([url_for('course',handle=course_row['handle'])])
    		for lecture_row in query('select * from lectures where (course_id = ? and visible)',course_row['id']):
    			pages.append([url_for('lecture',id=lecture_row['id'])])
    	# NOTE(review): the Content-Type says Atom although this is a sitemap - confirm it is intended.
    	body = render_template('sitemap.xml', pages=pages)
    	return Response(body, 200, {'Content-Type': 'application/atom+xml'} )
    
    import feeds
    import importer
    import sorter
    if 'ICAL_URL' in config:
    	import meetings
    if 'L2P_APIKEY' in config:
    	import l2pauth
    import worker
    import timetable