Select Git revision
Forked from
Video AG Infrastruktur / website
Source project has a limited visibility.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
server.py 10.17 KiB
#!/bin/python
from flask import *
from functools import wraps
import datetime
import sqlite3
import os
import re
app = Flask(__name__)
config = app.config
config['DB_SCHEMA'] = 'db_schema.sql'
config['DB_DATA'] = 'db_example.sql'
config['DB_ENGINE'] = 'sqlite'
config['SQLITE_DB'] = 'db.sqlite'
config['SQLITE_INIT_SCHEMA'] = True
config['SQLITE_INIT_DATA'] = False
config['DEBUG'] = False
config['VIDEOPREFIX'] = 'https://videoag.fsmpi.rwth-aachen.de'
if __name__ == '__main__':
config['SQLITE_INIT_DATA'] = True
config['DEBUG'] = True
config.from_pyfile('config.py', silent=True)
app.jinja_env.globals['videoprefix'] = config['VIDEOPREFIX']
if config['DB_ENGINE'] == 'sqlite':
created = not os.path.exists(config['SQLITE_DB'])
db = sqlite3.connect(config['SQLITE_DB'])
cur = db.cursor()
if config['SQLITE_INIT_SCHEMA']:
cur.executescript(open(config['DB_SCHEMA']).read())
if config['SQLITE_INIT_DATA'] and created:
cur.executescript(open(config['DB_DATA']).read())
db.commit()
db.close()
# Row wrapper for sqlite
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
if type(row[idx]) == str:
d[col[0].split('.')[-1]] = row[idx].replace('\\n','\n').replace('\\r','\r')
else:
d[col[0].split('.')[-1]] = row[idx]
return d
def query(operation, *params):
if config['DB_ENGINE'] == 'mysql':
import mysql.connector
if 'db' not in g or not g.db.is_connected():
g.db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=config['MYSQL_HOST'], database=config['MYSQL_DB'])
if not hasattr(request, 'db'):
request.db = g.db.cursor(dictionary=True)
request.db.execute(operation.replace('?', '%s'), params)
elif config['DB_ENGINE'] == 'sqlite':
if 'db' not in g:
g.db = sqlite3.connect(config['SQLITE_DB'])
g.db.row_factory = dict_factory
g.db.isolation_level = None
if not hasattr(request, 'db'):
request.db = g.db.cursor()
request.db.execute(operation, params)
else:
return []
return request.db.fetchall()
@app.teardown_request
def commit_db(*args):
if hasattr(request, 'db'):
request.db.close()
g.db.commit()
def searchquery(text, columns, match, tables, suffix, *suffixparams):
params = []
subexprs = []
words = text.split(' ')
prio = len(words)+1
for word in words:
if word == '' or word.isspace():
continue
matchexpr = ' OR '.join(['%s LIKE ?'%column for column in match])
subexprs.append('SELECT %s, %s AS _prio FROM %s WHERE %s'%(columns, str(prio), tables, matchexpr))
params += ['%'+word+'%']*len(match)
prio -= 1
if subexprs == []:
return []
expr = 'SELECT *,SUM(_prio) AS _score FROM (%s) AS _tmp %s'%(' UNION '.join(subexprs), suffix)
return query(expr, *params, *suffixparams)
LDAP_USERRE = re.compile(r'[^a-z0-9]')
notldap = {
'videoag':('videoag', ['users','videoag'], {'uid': 'videoag', 'givenName': 'Video', 'sn': 'Geier'}),
'gustav':('passwort', ['users'], {'uid': 'gustav', 'givenName': 'Gustav', 'sn': 'Geier'})
}
def ldapauth(user, password):
user = LDAP_USERRE.sub(r'', user.lower())
if 'LDAP_HOST' in config:
import ldap3
try:
conn = ldap3.Connection(config['LDAP_HOST'], 'uid=%s,ou=users,dc=fsmpi,dc=rwth-aachen,dc=de'%user, password, auto_bind=True)
if conn.search("ou=groups,dc=fsmpi,dc=rwth-aachen,dc=de", "(&(cn=*)(memberUid=%s))"%user, attributes=['cn']):
groups = [e.cn.value for e in conn.entries]
conn.unbind()
return user, groups
except ldap3.core.exceptions.LDAPBindError:
pass
elif config.get('DEBUG') and user in notldap and password == notldap[user][0]:
return user, notldap[user][1]
return None, []
def ldapget(user):
user = LDAP_USERRE.sub(r'', user.lower())
if 'LDAP_HOST' in config:
import ldap3
conn = ldap3.Connection('ldaps://rumo.fsmpi.rwth-aachen.de', auto_bind=True)
conn.search("ou=users,dc=fsmpi,dc=rwth-aachen,dc=de", "(uid=%s)"%user,
attributes=ldap3.ALL_ATTRIBUTES)
e = conn.entries[0]
return {'uid': user, 'givenName': e.givenName.value, 'sn':e.sn.value}
else:
return notldap[user][2]
def ismod(*args):
return ('user' in session)
app.jinja_env.globals['ismod'] = ismod
def mod_required(func):
@wraps(func)
def decorator(*args, **kwargs):
if not ismod():
flash('Diese Funktion ist nur für Moderatoren verfügbar!')
return redirect(url_for('login', ref=request.url))
else:
return func(*args, **kwargs)
return decorator
@app.route('/')
def index():
return render_template('index.html', latestvideos=query('''
SELECT lectures.*, max(videos.time_updated) AS lastvidtime, courses.short, courses.downloadable, courses.title AS coursetitle
FROM lectures
LEFT JOIN videos ON (videos.lecture_id = lectures.id)
LEFT JOIN courses on (courses.id = lectures.course_id)
WHERE (? OR (courses.visible AND courses.listed AND lectures.visible AND videos.visible))
GROUP BY videos.lecture_id
ORDER BY lastvidtime DESC
LIMIT 6
''', ismod()))
@app.route('/videos')
def videos():
courses = query('SELECT * FROM courses WHERE (? OR (visible AND listed))', ismod())
for course in courses:
if course['semester'] == '':
course['semester'] = 'zeitlos'
groupedby = request.args.get('groupedby')
if groupedby not in ['title','semester','organizer']:
groupedby = 'semester'
return render_template('videos.html', courses=courses, groupedby=groupedby)
@app.route('/faq')
def faq():
return render_template('faq.html')
@app.route('/play')
def play():
if not 'lectureid' in request.args:
return redirect(url_for('videos'))
id = request.args.get('lectureid')
lectures = query('SELECT * FROM lectures WHERE id = ? AND (? OR visible)', id, ismod())
videos = query('SELECT * FROM videos WHERE lecture_id = ? AND (? OR visible)', id, ismod())
if not lectures:
flash('Diese Vorlesung existiert nicht!')
return app.view_functions['videos'](), 404
if not videos:
flash('Zu dieser Vorlesung wurden noch keine Videos veröffentlicht!')
courses = query('SELECT * FROM courses WHERE id = ? AND (? OR (visible AND listed))', lectures[0]['course_id'], ismod())
if not courses:
flash('Diese Veranstaltung existiert nicht!')
return app.view_functions['videos'](), 404
return render_template('play.html', course=courses[0], lecture=lectures[0], videos=videos)
@app.route('/search')
def search():
if 'q' not in request.args:
return redirect(url_for('index'))
q = request.args['q']
courses = searchquery(q, '*', ['title', 'short', 'organizer', 'subject', 'description'],
'courses', 'WHERE (? OR (visible AND listed)) GROUP BY id ORDER BY _score DESC, semester DESC LIMIT 20', ismod())
lectures = searchquery(q, 'lectures.*, courses.visible AS coursevisible, courses.listed, courses.short, courses.downloadable, courses.title AS coursetitle',
['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short'],
'lectures LEFT JOIN courses on (courses.id = lectures.course_id)',
'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30', ismod())
return render_template('search.html', searchtext=request.args['q'], courses=courses, lectures=lectures)
@app.route('/course')
def course():
if not 'courseid' in request.args:
return redirect(url_for('videos'))
id = request.args['courseid']
courses = query('SELECT * FROM courses WHERE handle = ? AND (? OR visible)', id, ismod())
if not courses:
flash('Diese Veranstaltung existiert nicht!')
return app.view_functions['videos'](), 404
lectures = query('SELECT * FROM lectures WHERE course_id = ? AND (? OR visible)', courses[0]['id'], ismod())
videos = query('''
SELECT *, formats.description AS format_description
FROM videos
JOIN lectures ON (videos.lecture_id = lectures.id)
JOIN formats ON (videos.video_format = formats.id)
WHERE lectures.course_id= ?
ORDER BY formats.prio DESC
''', courses[0]['id'])
return render_template('course.html', course=courses[0], lectures=lectures, videos=videos)
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
return render_template('login.html')
user, groups = ldapauth(request.form.get('user'), request.form.get('password'))
if user and 'users' in groups:
session['user'] = ldapget(user)
else:
flash('Login fehlgeschlagen!')
if 'ref' in request.values:
return redirect(request.values['ref'])
else:
return redirect(url_for('index'))
@app.route('/logout')
def logout():
session.pop('user')
if 'ref' in request.values:
return redirect(request.values['ref'])
else:
return redirect(url_for('index'))
@app.route('/edit', methods=['GET', 'POST'])
@mod_required
def edit():
tabs = {
'courses': ('courses_data', 'id', ['visible', 'listed', 'title', 'short',
'handle', 'organizer', 'subject', 'credits', 'semester', 'downloadable',
'internal', 'responsible', 'description']),
'lectures': ('lectures_data', 'id', ['visible', 'title', 'comment',
'internal', 'speaker', 'place', 'time', 'duration', 'jumplist',
'titlefile']),
'site_texts': ('site_texts', 'key', ['value']),
'videos': ('videos_data', 'id', ['visible', 'downloadable', 'title',
'comment', 'internal'])
}
query('BEGIN TRANSACTION')
if request.is_json:
changes = request.get_json().items()
else:
changes = request.args.items()
for key, val in changes:
table, id, column = key.split('.', 2)
assert table in tabs
assert column in tabs[table][2]
query('UPDATE %s SET %s = ? WHERE %s = ?'%(tabs[table][0], column,
tabs[table][1]), val, id)
query('COMMIT TRANSACTION')
return "OK", 200
@app.route('/auth')
def auth(): # For use with nginx auth_request
if 'X-Original-Uri' not in request.headers:
return 'Internal Server Error', 500
url = request.headers['X-Original-Uri'].lstrip(config['VIDEOPREFIX'])
ip = request.headers.get('X-Real-IP', '')
videos = query('''SELECT videos.path, videos.id
FROM videos
JOIN lectures ON (videos.lecture_id = lectures.id)
JOIN courses ON (lectures.course_id = courses.id)
WHERE videos.path = ?
AND (? OR (courses.visible AND lectures.visible AND videos.visible))''',
url, ismod())
if videos and (url.startswith('pub') or ismod()):
query('INSERT INTO log VALUES (?, "", ?, "video", ?, ?)', ip, datetime.datetime.now(), videos[0]['id'], url)
return "OK", 200
elif url.endswith('jpg'):
return "OK", 200
else:
return "Not allowed", 403
if __name__ == '__main__':
app.run(threaded=True)