Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
server.py 8.05 KiB
#!/bin/python
from flask import *
from functools import wraps
import sqlite3
import os
import re
# The Flask application object; the route decorators further down register on it.
app = Flask(__name__)

# Built-in defaults. config.py is loaded afterwards (if it exists) and may
# override any of these values.
config = app.config
config['DB_SCHEMA'] = 'db_schema.sql'
config['DB_DATA'] = 'db_example.sql'
config['DB_ENGINE'] = 'sqlite'
config['SQLITE_DB'] = 'db.sqlite'
config['SQLITE_INIT_SCHEMA'] = True
config['SQLITE_INIT_DATA'] = False
config['DEBUG'] = False
if __name__ == '__main__':
    # Different defaults when the file is executed directly instead of being
    # imported: load the example data into a fresh database and enable DEBUG.
    config['SQLITE_INIT_DATA'] = True
    config['DEBUG'] = True
config.from_pyfile('config.py', silent=True)

if config['DB_ENGINE'] == 'sqlite':
    # Remember whether the database file existed before connecting, because
    # sqlite3.connect() creates it; example data is only loaded into a new file.
    created = not os.path.exists(config['SQLITE_DB'])
    db = sqlite3.connect(config['SQLITE_DB'])
    try:
        cur = db.cursor()
        if config['SQLITE_INIT_SCHEMA']:
            # Context managers close the SQL script files deterministically.
            with open(config['DB_SCHEMA']) as schema_file:
                cur.executescript(schema_file.read())
        if config['SQLITE_INIT_DATA'] and created:
            with open(config['DB_DATA']) as data_file:
                cur.executescript(data_file.read())
        db.commit()
    finally:
        # Release the setup connection even if one of the scripts fails.
        db.close()
# Row wrapper for sqlite
def dict_factory(cursor, row):
    """Turn one result row into a dict keyed by column name.

    Installed as ``row_factory`` on the sqlite connection in ``query``.

    Args:
        cursor: Cursor that produced the row; only ``cursor.description`` is read.
        row: Sequence of column values, in the same order as the description.

    Returns:
        A dict mapping each column name to its value. Only the part of the
        column name after the last ``.`` is used as key. In string values the
        two-character sequences backslash-n and backslash-r are replaced by
        real newline and carriage-return characters.
    """
    result = {}
    for idx, col in enumerate(cursor.description):
        name = col[0].split('.')[-1]
        value = row[idx]
        if isinstance(value, str):
            value = value.replace('\\n', '\n').replace('\\r', '\r')
        result[name] = value
    return result
def query(operation, *params):
    """Run one SQL statement on the configured database and fetch all rows.

    Args:
        operation: SQL text using ``?`` as placeholder. For the MySQL engine
            every ``?`` is rewritten to ``%s`` before execution.
        *params: Values bound to the placeholders.

    Returns:
        The result of ``fetchall()`` on the cursor, or an empty list when
        ``DB_ENGINE`` is neither ``'mysql'`` nor ``'sqlite'``.

    The connection is stored on ``g.db`` and the cursor on ``request.db``;
    both are created on first use and reused by later calls.
    """
    engine = config['DB_ENGINE']
    if engine == 'mysql':
        # Imported lazily so that the connector is only needed for this engine.
        import mysql.connector
        must_connect = 'db' not in g or not g.db.is_connected()
        if must_connect:
            g.db = mysql.connector.connect(
                user=config['MYSQL_USER'],
                password=config['MYSQL_PASSWD'],
                host=config['MYSQL_HOST'],
                database=config['MYSQL_DB'])
        if not hasattr(request, 'db'):
            request.db = g.db.cursor(dictionary=True)
        statement = operation.replace('?', '%s')
    elif engine == 'sqlite':
        if 'db' not in g:
            g.db = sqlite3.connect(config['SQLITE_DB'])
            g.db.row_factory = dict_factory
        if not hasattr(request, 'db'):
            request.db = g.db.cursor()
        statement = operation
    else:
        return []
    request.db.execute(statement, params)
    return request.db.fetchall()
def searchquery(text, columns, match, tables, suffix, *suffixparams):
    """Build and run a word-based LIKE search with a simple relevance score.

    Args:
        text: Search input; it is split at single spaces into words.
        columns: Column list inserted into every sub-select.
        match: Column names each word is compared against with ``LIKE``.
        tables: ``FROM`` clause contents for the sub-selects.
        suffix: SQL appended after the derived table (WHERE/GROUP BY/...).
        *suffixparams: Values for the placeholders contained in ``suffix``.

    Returns:
        The rows returned by ``query``, or an empty list when ``text``
        contains no non-blank word.

    One sub-select is generated per word. Earlier words get a higher
    ``_prio`` constant; the outer select exposes ``SUM(_prio)`` as ``_score``.
    """
    words = text.split(' ')
    prio = len(words) + 1
    # The match condition is the same for every word, only the bound values differ.
    matchexpr = ' OR '.join(['%s LIKE ?' % column for column in match])
    subexprs = []
    params = []
    for word in words:
        if word and not word.isspace():
            subexprs.append('SELECT %s, %s AS _prio FROM %s WHERE %s' % (columns, str(prio), tables, matchexpr))
            params.extend(['%' + word + '%'] * len(match))
            prio -= 1
    if not subexprs:
        return []
    union = ' UNION '.join(subexprs)
    expr = 'SELECT *,SUM(_prio) AS _score FROM (%s) AS _tmp %s' % (union, suffix)
    return query(expr, *params, *suffixparams)
# Matches every character that is not a lowercase ASCII letter or a digit.
# ldapauth/ldapget remove these characters from user names before use.
LDAP_USERRE = re.compile(r'[^a-z0-9]')

# Stand-in accounts used when no LDAP_HOST is configured.
# Each entry is (password, groups, user info); ldapauth only accepts them
# when DEBUG is set.
notldap = {
    'videoag': (
        'videoag',
        ['users', 'videoag'],
        {'uid': 'videoag', 'givenName': 'Video', 'sn': 'Geier'},
    ),
    'gustav': (
        'passwort',
        ['users'],
        {'uid': 'gustav', 'givenName': 'Gustav', 'sn': 'Geier'},
    ),
}
def ldapauth(user, password):
    """Check a user name / password pair and look up the user's groups.

    Args:
        user: Login name as entered; it is lowercased and stripped of every
            character matched by ``LDAP_USERRE`` before use.
        password: Password as entered.

    Returns:
        ``(user, groups)`` with the sanitized user name and a list of group
        names on success, ``(None, [])`` otherwise.

    With ``LDAP_HOST`` configured the credentials are verified by binding to
    the LDAP server. Without it, the ``notldap`` accounts are accepted, but
    only while ``DEBUG`` is set.
    """
    # Reject missing credentials before contacting any backend.
    # NOTE(review): ldap3 is expected not to perform a real password check
    # when no password is supplied - confirm against the ldap3 documentation.
    if not user or not password:
        return None, []
    user = LDAP_USERRE.sub(r'', user.lower())
    if not user:
        return None, []
    if 'LDAP_HOST' in config:
        import ldap3
        try:
            conn = ldap3.Connection(
                config['LDAP_HOST'],
                'uid=%s,ou=users,dc=fsmpi,dc=rwth-aachen,dc=de' % user,
                password, auto_bind=True)
        except ldap3.core.exceptions.LDAPBindError:
            return None, []
        # Start with no groups so that a search without results cannot leave
        # the variable undefined.
        groups = []
        try:
            if conn.search("ou=groups,dc=fsmpi,dc=rwth-aachen,dc=de",
                           "(&(cn=*)(memberUid=%s))" % user, attributes=['cn']):
                groups = [e.cn.value for e in conn.entries]
        finally:
            conn.unbind()
        return user, groups
    if config.get('DEBUG') and user in notldap and password == notldap[user][0]:
        return user, notldap[user][1]
    return None, []
def ldapget(user):
    """Fetch the display data of a user.

    Args:
        user: Login name; it is lowercased and stripped of every character
            matched by ``LDAP_USERRE`` before use.

    Returns:
        A dict with the keys ``uid``, ``givenName`` and ``sn``.

    Raises:
        IndexError: The LDAP search returned no entry for the user.
        KeyError: No ``LDAP_HOST`` is configured and the user is not one of
            the ``notldap`` accounts.
    """
    user = LDAP_USERRE.sub(r'', user.lower())
    if 'LDAP_HOST' not in config:
        return notldap[user][2]
    import ldap3
    # Use the configured server, the same one ldapauth binds to, instead of a
    # host name hard-coded in this function.
    conn = ldap3.Connection(config['LDAP_HOST'], auto_bind=True)
    try:
        conn.search("ou=users,dc=fsmpi,dc=rwth-aachen,dc=de", "(uid=%s)" % user,
                    attributes=ldap3.ALL_ATTRIBUTES)
        e = conn.entries[0]
        return {'uid': user, 'givenName': e.givenName.value, 'sn': e.sn.value}
    finally:
        # Always release the connection, also when no entry was found.
        conn.unbind()
def login_required(func):
    """Decorator that restricts a view to logged-in users.

    When the session contains no ``user`` entry, a message is flashed and the
    client is redirected to the login view, with the current URL passed as
    ``ref``. Otherwise the wrapped view is called with unchanged arguments.
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        if 'user' not in session:
            flash('Diese Funktion ist nur für Moderatoren verfügbar!')
            return redirect(url_for('login', ref=request.url))
        return func(*args, **kwargs)
    return decorator
@app.route('/')
def index():
    """Render the start page with up to six lectures, newest video update first."""
    # The bound False disables the "show everything" branch of the WHERE
    # clause, so only visible and listed content is selected.
    latest = query('''
SELECT lectures.*, max(videos.time_updated) AS lastvidtime, courses.short, courses.downloadable, courses.title AS coursetitle
FROM lectures
LEFT JOIN videos ON (videos.lecture_id = lectures.id)
LEFT JOIN courses on (courses.id = lectures.course_id)
WHERE (? OR (courses.visible AND courses.listed AND lectures.visible AND videos.visible))
GROUP BY videos.lecture_id
ORDER BY lastvidtime DESC
LIMIT 6
''', False)
    return render_template('index.html', latestvideos=latest)
@app.route('/videos')
def videos():
    """Render the course overview, grouped by the ``groupedby`` URL argument."""
    course_rows = query("SELECT * FROM courses")
    for row in course_rows:
        # Courses without a semester are shown under the label 'zeitlos'.
        if row['semester'] == '':
            row['semester'] = 'zeitlos'
    requested = request.args.get('groupedby')
    # Anything other than the three supported keys falls back to 'semester'.
    groupedby = requested if requested in ['title', 'semester', 'organizer'] else 'semester'
    return render_template('videos.html', courses=course_rows, groupedby=groupedby)
@app.route('/faq')
def faq():
    """Render the FAQ page; the template receives no variables."""
    template_name = 'faq.html'
    return render_template(template_name)
@app.route('/play')
def play():
    """Render the player page for the lecture given by ``lectureid``.

    Redirects to the index when the argument is missing and answers with
    404 when no lecture has that id.
    """
    if 'lectureid' not in request.args:
        return redirect(url_for('index'))
    lecture_id = request.args['lectureid']
    lectures = query('SELECT * FROM lectures WHERE id = ?', lecture_id)
    if not lectures:
        # An unknown id is a client error, not an internal server error.
        abort(404)
    return render_template('play.html',
                           lecture=lectures[0],
                           videos=query('SELECT * FROM videos WHERE lecture_id = ?', lecture_id))
@app.route('/search')
def search():
    """Search courses and lectures for the text in ``q`` and render the results.

    Redirects to the index when no ``q`` argument is given.
    """
    if 'q' not in request.args:
        return redirect(url_for('index'))
    q = request.args['q']

    # Course search: up to 20 hits, best score first.
    course_match = ['title', 'short', 'organizer', 'subject', 'description']
    course_suffix = 'WHERE (? OR (visible AND listed)) GROUP BY id ORDER BY _score DESC, semester DESC LIMIT 20'
    courses = searchquery(q, '*', course_match, 'courses', course_suffix, False)

    # Lecture search: joined with the course so that course fields can be
    # searched and shown as well; up to 30 hits.
    lecture_columns = 'lectures.*, courses.visible AS coursevisible, courses.listed, courses.short, courses.downloadable, courses.title AS coursetitle'
    lecture_match = ['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short']
    lecture_tables = 'lectures LEFT JOIN courses on (courses.id = lectures.course_id)'
    lecture_suffix = 'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30'
    lectures = searchquery(q, lecture_columns, lecture_match, lecture_tables, lecture_suffix, False)

    return render_template('search.html', searchtext=request.args['q'], courses=courses, lectures=lectures)
@app.route('/course')
def course():
    """Render the page of the course whose handle is given by ``courseid``.

    Redirects to the index when the argument is missing and answers with
    404 when no course has that handle.
    """
    if 'courseid' not in request.args:
        return redirect(url_for('index'))
    handle = request.args['courseid']
    matches = query('SELECT * FROM courses WHERE handle = ?', handle)
    if not matches:
        # An unknown handle is a client error, not an internal server error.
        abort(404)
    course_row = matches[0]
    return render_template('course.html',
                           course=course_row,
                           lectures=query('SELECT * FROM lectures WHERE course_id = ?', course_row['id']),
                           videos=query('SELECT *, formats.description AS format_description FROM videos JOIN lectures ON (videos.lecture_id = lectures.id) JOIN formats ON (videos.video_format = formats.id) WHERE lectures.course_id= ? ORDER BY formats.prio DESC', course_row['id']))
def _is_same_site(target):
    """Return True if ``target`` resolves to an http(s) URL on this host."""
    from urllib.parse import urljoin, urlparse
    own = urlparse(request.host_url)
    # urljoin resolves relative and scheme-relative targets against our own URL.
    dest = urlparse(urljoin(request.host_url, target))
    return dest.scheme in ('http', 'https') and dest.netloc == own.netloc


def _redirect_back():
    """Redirect to the ``ref`` request value if it is on this site, else to the index."""
    ref = request.values.get('ref')
    # ref comes from the client; never redirect to a foreign host.
    if ref and _is_same_site(ref):
        return redirect(ref)
    return redirect(url_for('index'))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or process a login attempt (POST).

    On success the data returned by ``ldapget`` is stored in
    ``session['user']``. Only members of the group ``users`` may log in.
    Afterwards the client is sent back to ``ref`` or to the index.
    """
    if request.method == 'GET':
        return render_template('login.html')
    username = request.form.get('user', '')
    password = request.form.get('password', '')
    user, groups = None, []
    # Missing or empty fields are a failed login; do not hand them to ldapauth.
    if username and password:
        user, groups = ldapauth(username, password)
    if user and 'users' in groups:
        session['user'] = ldapget(user)
    else:
        flash('Login fehlgeschlagen!')
    return _redirect_back()


@app.route('/logout')
def logout():
    """Remove the user from the session and send the client back."""
    # The default avoids a KeyError when nobody is logged in.
    session.pop('user', None)
    return _redirect_back()
# POST is accepted in addition to the original GET, because the changes are
# read from a JSON request body.
@app.route('/edit', methods=['GET', 'POST'])
@login_required
def edit():
    """Apply field changes sent as a JSON object.

    The request body must be a JSON object that maps keys of the form
    ``table.column.id`` to the new value. ``table`` and ``column`` have to be
    listed in ``tabs``; ``id`` selects the row through the id column of that
    table. All changes are validated first and then written in a single
    transaction.

    Returns:
        ``('OK', 200)`` after a successful commit. Malformed or disallowed
        input is answered with 400.
    """
    # table name in the key -> (SQL table, id column, editable columns)
    # NOTE(review): `key` is a reserved word in MySQL and may need quoting
    # there - confirm before using site_texts with the mysql engine.
    tabs = {
        'courses': ('courses_data', 'id', ['visible', 'listed', 'title', 'short',
            'handle', 'organizer', 'subject', 'credits', 'semester', 'downloadable',
            'internal', 'responsible']),
        'lectures': ('lectures_data', 'id', ['visible', 'title', 'comment',
            'internal', 'speaker', 'place', 'time', 'duration', 'jumplist',
            'titlefile']),
        'site_texts': ('site_texts', 'key', ['value']),
        'videos': ('videos_data', 'id', ['visible', 'downloadable', 'title',
            'comment', 'internal'])
    }
    changes = request.get_json()
    if not isinstance(changes, dict):
        abort(400)

    # Validate everything before touching the database. Table and column names
    # are interpolated into the SQL text, so they must come from the whitelist;
    # an explicit check is used because assert statements vanish under -O.
    updates = []
    for key, val in changes.items():
        parts = key.split('.', 2)
        if len(parts) != 3:
            abort(400)
        table, column, row_id = parts
        if table not in tabs or column not in tabs[table][2]:
            abort(400)
        sql = 'UPDATE %s SET %s = ? WHERE %s = ?' % (tabs[table][0], column, tabs[table][1])
        updates.append((sql, val, row_id))

    query('BEGIN TRANSACTION')
    try:
        for sql, val, row_id in updates:
            query(sql, val, row_id)
    except Exception:
        # Do not leave a half-applied change set or an open transaction behind.
        query('ROLLBACK')
        raise
    query('COMMIT TRANSACTION')
    return 'OK', 200
# Start the Flask server only when this file is executed directly,
# not when it is imported as a module.
if __name__ == '__main__':
    app.run()