Skip to content
Snippets Groups Projects
Select Git revision
  • 022ac8344d144545507a7ce493df7767f4248449
  • master default protected
  • forbid-save-as
  • upload-via-token
  • moodle-integration
  • patch-double-tap-seek
  • patch_datum_anzeigen
  • patch_raum_anzeigen
  • intros
  • live_sources
  • bootstrap4
  • modules
12 results

server.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    server.py 4.15 KiB
    #!/bin/python
    import os
    import sqlite3

    from flask import Flask, abort, g, redirect, render_template, request, url_for
    import mysql.connector
    
    app = Flask(__name__)
    config = app.config

    # The development-only switches (example data, debug mode) are enabled
    # only when this file is executed directly as a script.
    run_as_script = __name__ == '__main__'

    # Built-in defaults; every one of them can be overridden by config.py below.
    config.update(
    	DB_SCHEMA='db_schema.sql',
    	DB_DATA='db_example.sql',
    	DB_ENGINE='sqlite',
    	SQLITE_DB='db.sqlite',
    	SQLITE_INIT_SCHEMA=True,
    	SQLITE_INIT_DATA=run_as_script,
    	DEBUG=run_as_script,
    )

    # silent=True: a missing config.py is not an error, the defaults simply stay.
    config.from_pyfile('config.py', silent=True)
    
    if config['DB_ENGINE'] == 'sqlite':
    	# One-time sqlite bootstrap at import time: optionally (re)apply the schema
    	# script and, for a freshly created database file, load the example data.
    	# The existence check must happen before connect(), because connecting
    	# creates the database file.
    	created = not os.path.exists(config['SQLITE_DB'])
    	db = sqlite3.connect(config['SQLITE_DB'])
    	try:
    		cur = db.cursor()
    		if config['SQLITE_INIT_SCHEMA']:
    			# Context manager closes the script file even if executescript fails.
    			with open(config['DB_SCHEMA']) as schema_file:
    				cur.executescript(schema_file.read())
    		if config['SQLITE_INIT_DATA'] and created:
    			with open(config['DB_DATA']) as data_file:
    				cur.executescript(data_file.read())
    		db.commit()
    	finally:
    		# Always release the bootstrap connection, also on a failed script.
    		db.close()
    
    # sqlite row factory: turns each result row into a plain dict
    def dict_factory(cursor, row):
    	"""Convert one sqlite result row into a dict keyed by column name.

    	Column names are reduced to the part after the last '.', so a column
    	reported as 'table.column' is stored under 'column'. In str values the
    	two-character sequence backslash + 'n' is replaced by a real newline;
    	values of any other type are passed through unchanged.
    	"""
    	names = [column[0].split('.')[-1] for column in cursor.description]
    	record = {}
    	for position, name in enumerate(names):
    		value = row[position]
    		record[name] = value.replace('\\n', '\n') if type(value) is str else value
    	return record
    
    def query(operation, *params):
    	"""Run one SQL statement on the configured database and return all rows.

    	The connection is created lazily and stored on flask.g as g.db, so it is
    	reused by later calls that share the same g. Statements are written with
    	'?' placeholders; for MySQL every '?' in the statement text is replaced
    	by '%s' before execution (this also affects a '?' inside SQL literals).

    	operation -- SQL text using '?' placeholders
    	params    -- values bound to the placeholders, in order

    	Returns the list from fetchall(); rows are dicts for both engines
    	(dictionary cursor for MySQL, dict_factory for sqlite). Returns None
    	when DB_ENGINE is neither 'mysql' nor 'sqlite'.
    	"""
    	if config['DB_ENGINE'] == 'mysql':
    		# Reconnect when there is no connection yet or the old one dropped.
    		if 'db' not in g or not g.db.is_connected():
    			g.db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=config['MYSQL_HOST'], database=config['MYSQL_DB'])
    		cur = g.db.cursor(dictionary=True)
    		try:
    			cur.execute(operation.replace('?', '%s'), params)
    			return cur.fetchall()
    		finally:
    			# Release the cursor instead of leaking one per query.
    			cur.close()
    	elif config['DB_ENGINE'] == 'sqlite':
    		if 'db' not in g:
    			g.db = sqlite3.connect(config['SQLITE_DB'])
    			g.db.row_factory = dict_factory
    		cur = g.db.cursor()
    		try:
    			cur.execute(operation, params)
    			return cur.fetchall()
    		finally:
    			cur.close()
    
    def searchquery(text, columns, match, tables, suffix, *suffixparams):
    	"""Build and run a weighted multi-word LIKE search.

    	text is split on single spaces. Every non-blank word becomes one
    	sub-select that matches the word as a substring against each column in
    	match and tags its rows with a constant _prio; earlier words get a
    	higher _prio. The sub-selects are combined with UNION and wrapped in an
    	outer select exposing SUM(_prio) as _score, to which suffix is appended.

    	text         -- raw search text
    	columns      -- column list inserted into every sub-select
    	match        -- sequence of column names compared with LIKE
    	tables       -- FROM clause inserted into every sub-select
    	suffix       -- SQL appended to the outer select (WHERE/GROUP BY/...)
    	suffixparams -- values for the placeholders inside suffix

    	Returns [] without touching the database when text holds no usable
    	word, otherwise the result of query().
    	"""
    	tokens = text.split(' ')
    	# Start weight depends on the raw token count (blank tokens included)
    	# and only drops after a token that is actually used.
    	weight = len(tokens) + 1
    	like_clause = ' OR '.join('%s LIKE ?' % column for column in match)
    	selects = []
    	bound = []
    	for token in tokens:
    		if token and not token.isspace():
    			selects.append('SELECT %s, %s AS _prio FROM %s WHERE %s' % (columns, str(weight), tables, like_clause))
    			# One identical wildcard pattern per matched column.
    			bound.extend(['%' + token + '%'] * len(match))
    			weight -= 1
    	if not selects:
    		return []
    	statement = 'SELECT *,SUM(_prio) AS _score FROM (%s) AS _tmp %s' % (' UNION '.join(selects), suffix)
    	return query(statement, *bound, *suffixparams)
    
    @app.route('/')
    def index():
    	"""Render the start page with the six most recently updated lectures.

    	The single placeholder in the statement is bound to False, so only rows
    	passing the visibility checks on course, lecture and video are selected.
    	"""
    	latest = query('''
    				SELECT lectures.*, max(videos.time_updated) AS lastvidtime, courses.short, courses.downloadable, courses.title AS coursetitle
    				FROM lectures
    				LEFT JOIN videos ON (videos.lecture_id = lectures.id)
    				LEFT JOIN courses on (courses.id = lectures.course_id)
    				WHERE (? OR (courses.visible AND courses.listed AND lectures.visible AND videos.visible))
    				GROUP BY videos.lecture_id
    				ORDER BY lastvidtime DESC
    				LIMIT 6
    			''', False)
    	return render_template('index.html', latestvideos=latest)
    
    @app.route('/videos')
    def videos():
    	"""Render the videos page; no extra template context is passed."""
    	page = render_template('videos.html')
    	return page
    
    @app.route('/faq')
    def faq():
    	"""Render the FAQ page; no extra template context is passed."""
    	page = render_template('faq.html')
    	return page
    
    @app.route('/play')
    def play():
    	"""Render the player page for the lecture given by ?lectureid=...

    	Redirects to the index page when no lectureid argument is supplied.
    	Responds with 404 when no lecture with that id exists, instead of
    	failing with an IndexError on the empty result list.
    	"""
    	if 'lectureid' not in request.args:
    		return redirect(url_for('index'))
    	lecture_id = request.args['lectureid']
    	lectures = query('SELECT * FROM lectures WHERE id = ?', lecture_id)
    	if not lectures:
    		# Unknown lecture id: a client error, not a server error.
    		abort(404)
    	return render_template('play.html',
    			lecture=lectures[0],
    			videos=query('SELECT * FROM videos WHERE lecture_id = ?', lecture_id))
    
    @app.route('/search')
    def search():
    	"""Search courses and lectures for the text in ?q=... and render the hits.

    	Redirects to the index page when the q argument is missing. Both
    	searches bind their suffix placeholder to False, so only entries
    	passing the visibility/listing checks are returned; at most 20 courses
    	and 30 lectures are selected, best score first.
    	"""
    	if 'q' in request.args:
    		q = request.args['q']
    		course_match = ['title', 'short', 'organizer', 'subject', 'description']
    		course_suffix = 'WHERE (? OR (visible AND listed)) GROUP BY id ORDER BY _score DESC, semester DESC LIMIT 20'
    		course_hits = searchquery(q, '*', course_match, 'courses', course_suffix, False)

    		lecture_columns = 'lectures.*, courses.visible AS coursevisible, courses.listed, courses.short, courses.downloadable, courses.title AS coursetitle'
    		lecture_match = ['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short']
    		lecture_tables = 'lectures LEFT JOIN courses on (courses.id = lectures.course_id)'
    		lecture_suffix = 'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30'
    		lecture_hits = searchquery(q, lecture_columns, lecture_match, lecture_tables, lecture_suffix, False)

    		return render_template('search.html', searchtext=q, courses=course_hits, lectures=lecture_hits)
    	return redirect(url_for('index'))
    
    # Script entry point: start the Flask application's own server only when
    # this file is executed directly, not when it is imported as a module.
    if __name__ == '__main__':
    	app.run()