Skip to content
Snippets Groups Projects
Select Git revision
  • a50a8a01922543443eaeaa937545ecd8e16be25c
  • master default protected
  • md-export
  • th/mail
  • 179-einladungen-zum-aushaengen-drucken
5 results

tasks.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    server.py 16.23 KiB
    from flask import Flask, g, request, url_for, redirect, session, render_template, flash, Response
    from werkzeug.routing import Rule
    from functools import wraps
    from datetime import date, timedelta, datetime, time
    import threading
    import os
    
    app = Flask(__name__)
    
    app.jinja_env.trim_blocks = True
    app.jinja_env.lstrip_blocks = True
    
    def timer_func():
    	with app.test_request_context():
    		pass # do something
    	timer = threading.Timer(60*60, timer_func)
    	timer.start()
    
    timer = threading.Timer(0, timer_func)
    timer.daemon = True
    timer.start()
    
    config = app.config
    config['DB_SCHEMA'] = 'db_schema.sql'
    config['DB_DATA'] = 'db_example.sql'
    config['DB_ENGINE'] = 'sqlite'
    config['SQLITE_DB'] = 'db.sqlite'
    config['SQLITE_INIT_SCHEMA'] = True
    config['SQLITE_INIT_DATA'] = False
    config['DEBUG'] = False
    config['VIDEOPREFIX'] = 'https://videoag.fsmpi.rwth-aachen.de'
    if __name__ == '__main__':
    	config['SQLITE_INIT_DATA'] = True
    	config['DEBUG'] = True
    config.from_pyfile('config.py', silent=True)
    if config['DEBUG']:
    	app.jinja_env.auto_reload = True
    
    from db import query, searchquery, ldapauth, ldapget
    
    mod_endpoints = []
    
    @app.template_global()
    def ismod(*args):
    	return ('user' in session)
    
    def mod_required(func):
    	mod_endpoints.append(func.__name__)
    	@wraps(func)
    	def decorator(*args, **kwargs):
    		if not ismod():
    			flash('Diese Funktion ist nur für Moderatoren verfügbar!')
    			return redirect(url_for('login', ref=request.url))
    		else:
    			return func(*args, **kwargs)
    	return decorator
    
    app.jinja_env.globals['navbar'] = []
    def register_navbar(name, icon=None):
    	def wrapper(func):
    		endpoint = func.__name__
    		app.jinja_env.globals['navbar'].append((endpoint, name, icon,
    					not endpoint in mod_endpoints))
    		return func
    	return wrapper
    
    def render_endpoint(endpoint, flashtext=None, **kargs):
    	if flashtext:
    		flash(flashtext)
    	# request.endpoint is used for navbar highlighting
    	request.url_rule = Rule(request.path, endpoint=endpoint)
    	return app.view_functions[endpoint](**kargs)
    
    def handle_errors(endpoint, text, code, *errors, **epargs):
    	def wrapper(func):
    		@wraps(func)
    		def decorator(*args, **kwargs):
    			try:
    				return func(*args, **kwargs)
    			except errors:
    				return render_endpoint(endpoint, text, **epargs), code
    		return decorator
    	return wrapper
    
    @app.errorhandler(404)
    def handle_not_found(e):
    	return render_endpoint('index', 'Diese Seite existiert nicht!'), 404
    
    @app.route('/')
    @register_navbar('Home', icon='home')
    def index():
    	return render_template('index.html', latestvideos=query('''
    				SELECT lectures.*, max(videos.time_updated) AS lastvidtime, courses.short, courses.downloadable, courses.title AS coursetitle
    				FROM lectures
    				LEFT JOIN videos ON (videos.lecture_id = lectures.id)
    				LEFT JOIN courses on (courses.id = lectures.course_id)
    				WHERE (? OR (courses.visible AND courses.listed AND lectures.visible AND videos.visible))
    				GROUP BY videos.lecture_id
    				ORDER BY lastvidtime DESC
    				LIMIT 6
    			''', ismod()))
    
    @app.route('/course')
    @register_navbar('Videos', icon='film')
    def course():
    	courses = query('SELECT * FROM courses WHERE (? OR (visible AND listed))', ismod())
    	for course in courses:
    		if course['semester'] == '':
    			course['semester'] = 'zeitlos'
    	groupedby = request.args.get('groupedby')
    	if groupedby not in ['title', 'semester', 'organizer']:
    		groupedby = 'semester'
    	return render_template('course.html', courses=courses, groupedby=groupedby)
    
    @app.route('/course/<id>')
    @app.route('/course/<int:numid>')
    @handle_errors('course', 'Diese Veranstaltung existiert nicht!', 404, IndexError)
    def course_id(numid=None, id=None):
    	if numid:
    		courses = query('SELECT * FROM courses WHERE id = ? AND (? OR visible)', numid, ismod())[0]
    	else:
    		courses = query('SELECT * FROM courses WHERE handle = ? AND (? OR visible)', id, ismod())[0]
    	lectures = query('SELECT * FROM lectures WHERE course_id = ? AND (? OR visible)', courses['id'], ismod())
    	videos = query('''
    			SELECT videos.*, (videos.downloadable AND courses.downloadable) as downloadable, formats.description AS format_description
    			FROM videos
    			JOIN lectures ON (videos.lecture_id = lectures.id)
    			JOIN formats ON (videos.video_format = formats.id)
    			JOIN courses ON (lectures.course_id = courses.id)
    			WHERE lectures.course_id= ? AND (? OR videos.visible)
    			ORDER BY lectures.time, formats.prio DESC
    			''', courses['id'], ismod())
    	return render_template('course_id.html', course=courses, lectures=lectures, videos=videos)
    
    @app.route('/faq')
    @register_navbar('FAQ', icon='question-sign')
    def faq():
    	return render_template('faq.html')
    
    @app.route('/play/<int:id>')
    @handle_errors('course', 'Diese Vorlesung existiert nicht!', 404, IndexError)
    def play(id):
    	lectures = query('SELECT * FROM lectures WHERE id = ? AND (? OR visible)', id, ismod())
    	videos = query('SELECT * FROM videos WHERE lecture_id = ? AND (? OR visible)', id, ismod())
    	if not videos:
    		flash('Zu dieser Vorlesung wurden noch keine Videos veröffentlicht!')
    	courses = query('SELECT * FROM courses WHERE id = ? AND (? OR (visible AND listed))', lectures[0]['course_id'], ismod())
    	if not courses:
    		return render_endpoint('course', 'Diese Veranstaltung existiert nicht!'), 404
    	return render_template('play.html', course=courses[0], lecture=lectures[0], videos=videos)
    
    @app.route('/search')
    def search():
    	if 'q' not in request.args:
    		return redirect(url_for('index'))
    	q = request.args['q']
    	courses = searchquery(q, '*', ['title', 'short', 'organizer', 'subject', 'description'],
    			'courses', 'WHERE (? OR (visible AND listed)) GROUP BY id ORDER BY _score DESC, semester DESC LIMIT 20', ismod())
    	lectures = searchquery(q, 'lectures.*, courses.visible AS coursevisible, courses.listed, courses.short, courses.downloadable, courses.title AS coursetitle',
    			['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short'],
    			'lectures LEFT JOIN courses on (courses.id = lectures.course_id)',
    			'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30', ismod())
    	return render_template('search.html', searchtext=request.args['q'], courses=courses, lectures=lectures)
    
    def check_mod(user, groups):
    	return user and 'users' in groups
    
    @app.route('/login', methods=['GET', 'POST'])
    def login():
    	if request.method == 'GET':
    		return render_template('login.html')
    	user, groups = ldapauth(request.form.get('user'), request.form.get('password'))
    	if not check_mod(user, groups):
    		flash('Login fehlgeschlagen!')
    		return render_template('login.html')
    	session['user'] = ldapget(user)
    	dbuser = query('SELECT * FROM users WHERE name = ?', user)
    	if not dbuser:
    		query('INSERT INTO users (name, realname, fsacc, level, calendar_key, rfc6238) VALUES (?, ?, ?, 1, "", "")', user, session['user']['givenName'], user)
    		dbuser = query('SELECT * FROM users WHERE name = ?', user)
    	session['user']['dbid'] = dbuser[0]['id']
    	return redirect(request.values.get('ref', url_for('index')))
    
    @app.route('/logout', methods=['GET', 'POST'])
    def logout():
    	session.pop('user')
    	return redirect(request.values.get('ref', url_for('index')))
    
    @app.route('/edit', methods=['GET', 'POST'])
    @mod_required
    def edit():
    	tabs = {
    		'courses': ('courses_data', 'id', ['visible', 'listed', 'title', 'short',
    				'handle', 'organizer', 'subject', 'semester', 'downloadable',
    				'internal', 'responsible','deleted']),
    		'lectures': ('lectures_data', 'id', ['visible', 'title', 'comment',
    				'internal', 'speaker', 'place', 'time', 'duration', 'jumplist','deleted']),
    		'site_texts': ('site_texts', 'key', ['value']),
    		'videos': ('videos_data', 'id', ['visible','deleted'])
    	}
    	query('BEGIN')
    	if request.is_json:
    		changes = request.get_json().items()
    	else:
    		changes = request.args.items()
    	for key, val in changes:
    		table, id, column = key.split('.', 2)
    		assert table in tabs
    		assert column in tabs[table][2]
    		query('INSERT INTO changelog ("table",id_value,id_key,field,value_new,value_old,"when",who,executed) VALUES (?,?,?,?,?,(SELECT %s FROM %s WHERE %s = ?),?,?,1)'%(column,tabs[table][0],tabs[table][1]),table,id,tabs[table][1],column,val,id,datetime.now(),session['user']['givenName'])
    		query('UPDATE %s SET %s = ? WHERE %s = ?'%(tabs[table][0], column,tabs[table][1]), val, id)
    	query('COMMIT')
    	return "OK", 200
    
    
    @app.route('/auth')
    def auth(): # For use with nginx auth_request
    	if 'X-Original-Uri' not in request.headers:
    		return 'Internal Server Error', 500
    	url = request.headers['X-Original-Uri'].lstrip(config['VIDEOPREFIX'])
    	ip = request.headers.get('X-Real-IP', '')
    	if url.endswith('jpg'):
    		return "OK", 200
    	videos = query('''SELECT videos.path, videos.id, lectures.id AS lecture_id, courses.id AS course_id, auth.*
          FROM videos
          JOIN lectures ON (videos.lecture_id = lectures.id)
          JOIN courses ON (lectures.course_id = courses.id)
    			LEFT JOIN auth ON (videos.id = auth.video_id OR lectures.id = auth.lecture_id OR courses.id = auth.course_id)
          WHERE videos.path = ?
          AND (? OR (courses.visible AND lectures.visible AND videos.visible))
    			ORDER BY auth.video_id DESC, auth.lecture_id DESC, auth.course_id DESC''',
    			url, ismod())
    	if not videos:
    		return "Not allowed", 403
    	allowed = False
    	types = []
    	auth = request.authorization
    	for video in videos:
    		if videos[0] and ((videos[0]['video_id'] and not video['video_id']) \
    				or (videos[0]['lecture_id'] and not video['lecture_id'])):
    			break
    		types.append(video['auth_type'])
    		if video['auth_type'] == 'public':
    			allowed = True
    			break
    		elif video['auth_type'] == 'password':
    			if auth and video['auth_user'] == auth.username and video['auth_passwd'] == auth.password:
    				allowed = True
    				break
    	if not types[0] or allowed or ismod() or \
    			(auth and check_mod(*ldapauth(auth.username, auth.password))):
    		return 'OK', 200
    		query('INSERT INTO log VALUES (?, "", ?, "video", ?, ?)', ip, datetime.now(), videos[0]['id'], url)
    	elif 'password' in types:
    		return Response("Login required", 401, {'WWW-Authenticate': 'Basic realm="Login Required"'})
    	return "Not allowed", 403
    
    @app.route('/schedule')
    @register_navbar('Drehplan', 'calendar')
    @mod_required
    def schedule():
    	if 'kw' not in request.args:
    		kw=0
    	else:
    		kw=int(request.args['kw'])
    	start = date.today() - timedelta(days=date.today().weekday() -7*kw)
    	days = [{'date': start, 'lectures': [], 'atonce':0, 'index': 0 }]
    	earlieststart=time(23,59)
    	latestend=time(0,0)
    	for i in range(1,7):
    		days.append({'date': days[i-1]['date'] + timedelta(days=1), 'atonce':0, 'index': i, 'lectures':[] })
    	for i in days:
    		# date and times are burning in sqlite
    		s = datetime.combine(i['date'],time())
    		e = datetime.combine(i['date'],time(23,59))
    		i['lectures'] = query ('''
    					SELECT lectures.*,courses.short
    					FROM lectures 
    					JOIN courses ON (lectures.course_id = courses.id) 
    					WHERE (time < ?) AND (time > ?) 
    					ORDER BY time ASC'''
    				,e,s);
    		# sweepline to find out how many lectures overlap
    		maxcol=0;
    		curcol=0;
    		freecol=[];
    		for l in i['lectures']:
    			# who the hell inserts lectures with zero length?!?!?
    			l['time_end'] = l['time']+timedelta(minutes=max(l['duration'],1))
    		for l in sorted([(l['time'],True,l) for l in i['lectures']] + [(l['time_end'],False,l) for l in i['lectures']],key=lambda t:(t[0],t[1])):
    			if l[1]:
    				curcol += 1
    				if curcol > maxcol:
    					maxcol = curcol
    				if len(freecol) == 0:
    					freecol.append(maxcol)
    				l[2]['schedule_col'] = freecol.pop()
    				if earlieststart > l[0].time():
    					earlieststart = l[0].time()
    			else:
    				curcol -= 1
    				freecol.append(l[2]['schedule_col'])
    				if latestend < l[0].time():
    					latestend = l[0].time()
    		i['maxcol'] = max(maxcol,1)
    	times=[]
    	s = min(earlieststart,time(8,0))
    	e = max(latestend,time(19,0))
    	for i in range(s.hour*4,min(int((60*e.hour/15)/4)*4+5,24*4)):
    		t = i*15
    		times.append(time(int(t/60),t%60))
    	return render_template('schedule.html',days=days,times=times,kw=kw)
    
    @app.route('/stats')
    @register_navbar('Statistiken', 'stats')
    @mod_required
    def stats():
    	return render_template('stats.html')
    
    @app.route('/log')
    @register_navbar('Changelog', 'book')
    @mod_required
    def log():
    	changelog = query('SELECT *, ( "table" || "." || id_value || "." ||field) as path FROM changelog LEFT JOIN users ON (changelog.who = users.id) ORDER BY "when" DESC LIMIT 50')
    	return render_template('log.html', changelog=changelog)
    
    @app.route('/import/<source>/<int:id>', methods=['GET', 'POST'])
    @mod_required
    def import_from(source=None, id=None):
    
    	if source != "campus":
    		return "Unknown source", 404
    
    	courses = query('SELECT * FROM courses WHERE id = ?', id)[0]
    	lectures = query('SELECT * FROM lectures WHERE course_id = ?', courses['id'])
    	
    	campus={}
    	for i in request.values:
    		group, importid, field = i.split('.', 2)
    		if group == 'campus':
    			if not importid in  campus:
    				campus[importid] = {}
    			campus[importid][field] = request.values[i]
    	for i in campus:
    		if i.startswith('new'):
    			if campus[i]['url'] != '':
    				query('INSERT INTO import_campus (url, type, course_id, last_checked, changed) VALUES (?, ?, ?, ?, 1)',campus[i]['url'],campus[i]['type'],id,datetime.now())
    		else:
    			if campus[i]['url'] != '':
    				query('UPDATE import_campus SET url = ?, `type` = ? WHERE (course_id = ?) AND (id = ?)', campus[i]['url'],campus[i]['type'],id,int(i))	
    			else:
    				query('DELETE FROM import_campus WHERE (id = ?) AND (course_id = ?)',int(i),id)
    	
    	import_campus = query('SELECT * FROM import_campus WHERE course_id = ?',id)
    	events = []
    
    	try:
    		from lxml import html
    		from lxml import etree
    		import urllib.request
    		# if u have to port this to anything new, god be with you.
    		for i in import_campus:
    			remote_html = urllib.request.urlopen(i['url']).read()
    			tablexpath = "//td[text()='Termine und Ort']/following::table[1]"
    			basetable = html.fromstring(remote_html).xpath(tablexpath)[0]
    			parsebase = html.tostring(basetable);
    
    			#parse recurring events
    			toparse = [i['url']]
    			for j in basetable.xpath("//table[@cellpadding='5']//tr[@class='hierarchy4' and td[@name='togglePeriodApp']]"):
    				url = str(j.xpath("td[@name='togglePeriodApp']/a/@href")[0])
    				toparse.append(url)
    			events_raw = []
    			for j in toparse:
    				if j.startswith('event'):
    					url = 'https://www.campus.rwth-aachen.de/rwth/all/'+j
    				else:
    					url = j
    				text = urllib.request.urlopen(url).read()
    				dom = html.fromstring(text).xpath(tablexpath)[0]
    				#we get the "heading" row, from it extract the room and time. best way to get it is to match on the picture -.-
    				baserow = dom.xpath("//table[@cellpadding='5']//tr[@class='hierarchy4' and td[@name='togglePeriodApp']/*/img[@src='../../server/img/minus.gif']]")
    				if not baserow:
    					continue
    				baserow = baserow[0]
    				rowdata = {'dates': []}
    				rowdata['place'] = baserow.xpath("td[6]/text()")[0][2:-1]
    				rowdata['start'] = baserow.xpath("td[3]/text()")[0]
    				rowdata['end'] = baserow.xpath("td[5]/text()")[0]
    				rowdata['dates'] = baserow.getparent().xpath("tr[@class='hierarchy5']//td[@colspan='3']/text()")
    				events_raw.append(rowdata)
    
    			# parse single appointments
    			singletable = basetable.xpath("//table[@cellpadding='3']/tr/td[text()='Einmalige Termine:']")[0].getparent().getparent()
    			for row in singletable.xpath("tr/td[2]"):
    				rowdata = {}
    				rowdata['place'] = row.xpath("text()[2]")[0][2:-1]
    				rowdata['dates'] = [row.xpath("text()[1]")[0][4:14]]
    				rowdata['start'] = row.xpath("text()[1]")[0][17:22]
    				rowdata['end'] = row.xpath("text()[1]")[0][27:32]
    				events_raw.append(rowdata)
    
    			#now we have to filter our data and do some lookups
    			for j in events_raw:
    				for k in j['dates']:
    					e = {}
    					fmt= "%d.%m.%Y %H:%M"
    					e['time'] = datetime.strptime("%s %s"%(k,j['start']) ,fmt)
    					e['duration'] = int((datetime.strptime("%s %s"%(k,j['end']) ,fmt) - e['time']).seconds/60)
    					e['place'] = query("SELECT name FROM places WHERE (campus_name = ?) OR ((NOT campus_name) AND name = ?)",j['place'],j['place'])[0]['name'];
    					e['exists'] = len(query("SELECT id from lectures WHERE (time = ?) and (duration = ?) and (place = ?) and (course_id = ?)",e['time'],e['duration'],e['place'],id)) > 0
    					events.append(e)
    			# it is pared.
    
    
    
    	except ImportError:
    		flash('python-lxml not found, campus import will not work.')
    
    	uniqevents = []
    	for i in events:
    		seen = False
    		for j in events:
    			seen = (i['place'] == j['place']) and (i['time'] == j['time']) and (i['duration'] == j['duration'])
    		if not seen:
    			uniqevents.append(i)
    
    	return render_template('import_campus.html', course=courses, lectures=lectures, import_campus=import_campus, events=uniqevents)