Select Git revision
Forked from
Video AG Infrastruktur / website
Source project has a limited visibility.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
server.py 4.15 KiB
#!/bin/python
from flask import Flask, render_template, g, request, url_for, redirect
import mysql.connector
import sqlite3
import os
app = Flask(__name__)
config = app.config
config['DB_SCHEMA'] = 'db_schema.sql'
config['DB_DATA'] = 'db_example.sql'
config['DB_ENGINE'] = 'sqlite'
config['SQLITE_DB'] = 'db.sqlite'
config['SQLITE_INIT_SCHEMA'] = True
config['SQLITE_INIT_DATA'] = False
config['DEBUG'] = False
if __name__ == '__main__':
config['SQLITE_INIT_DATA'] = True
config['DEBUG'] = True
config.from_pyfile('config.py', silent=True)
if config['DB_ENGINE'] == 'sqlite':
created = not os.path.exists(config['SQLITE_DB'])
db = sqlite3.connect(config['SQLITE_DB'])
cur = db.cursor()
if config['SQLITE_INIT_SCHEMA']:
cur.executescript(open(config['DB_SCHEMA']).read())
if config['SQLITE_INIT_DATA'] and created:
cur.executescript(open(config['DB_DATA']).read())
db.commit()
db.close()
# Row wrapper for sqlite
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
if type(row[idx]) == str:
d[col[0].split('.')[-1]] = row[idx].replace('\\n','\n')
else:
d[col[0].split('.')[-1]] = row[idx]
return d
def query(operation, *params):
if config['DB_ENGINE'] == 'mysql':
if 'db' not in g or not g.db.is_connected():
g.db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=config['MYSQL_HOST'], database=config['MYSQL_DB'])
cur = g.db.cursor(dictionary=True)
cur.execute(operation.replace('?', '%s'), params)
return cur.fetchall()
elif config['DB_ENGINE'] == 'sqlite':
if 'db' not in g:
g.db = sqlite3.connect(config['SQLITE_DB'])
g.db.row_factory = dict_factory
cur = g.db.cursor()
cur.execute(operation, params)
return cur.fetchall()
def searchquery(text, columns, match, tables, suffix, *suffixparams):
params = []
subexprs = []
words = text.split(' ')
prio = len(words)+1
for word in words:
if word == '' or word.isspace():
continue
matchexpr = ' OR '.join(['%s LIKE ?'%column for column in match])
subexprs.append('SELECT %s, %s AS _prio FROM %s WHERE %s'%(columns, str(prio), tables, matchexpr))
params += ['%'+word+'%']*len(match)
prio -= 1
if subexprs == []:
return []
expr = 'SELECT *,SUM(_prio) AS _score FROM (%s) AS _tmp %s'%(' UNION '.join(subexprs), suffix)
return query(expr, *params, *suffixparams)
@app.route('/')
def index():
return render_template('index.html', latestvideos=query('''
SELECT lectures.*, max(videos.time_updated) AS lastvidtime, courses.short, courses.downloadable, courses.title AS coursetitle
FROM lectures
LEFT JOIN videos ON (videos.lecture_id = lectures.id)
LEFT JOIN courses on (courses.id = lectures.course_id)
WHERE (? OR (courses.visible AND courses.listed AND lectures.visible AND videos.visible))
GROUP BY videos.lecture_id
ORDER BY lastvidtime DESC
LIMIT 6
''', False))
@app.route('/videos')
def videos():
return render_template('videos.html')
@app.route('/faq')
def faq():
return render_template('faq.html')
@app.route('/play')
def play():
if 'lectureid' in request.args:
id = request.args['lectureid']
return render_template('play.html',
lecture=query('SELECT * FROM lectures WHERE id = ?', id)[0],
videos=query('SELECT * FROM videos WHERE lecture_id = ?', id))
else:
return redirect(url_for('index'))
@app.route('/search')
def search():
if 'q' not in request.args:
return redirect(url_for('index'))
q = request.args['q']
courses = searchquery(q, '*', ['title', 'short', 'organizer', 'subject', 'description'],
'courses', 'WHERE (? OR (visible AND listed)) GROUP BY id ORDER BY _score DESC, semester DESC LIMIT 20', False)
lectures = searchquery(q, 'lectures.*, courses.visible AS coursevisible, courses.listed, courses.short, courses.downloadable, courses.title AS coursetitle',
['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short'],
'lectures LEFT JOIN courses on (courses.id = lectures.course_id)',
'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30', False)
return render_template('search.html', searchtext=request.args['q'], courses=courses, lectures=lectures)
if __name__ == '__main__':
app.run()