Skip to content
Snippets Groups Projects
Select Git revision
  • 24966e3ef7485e05388d65bb4a391bb53ea4376f
  • master default protected
  • forbid-save-as
  • upload-via-token
  • moodle-integration
  • patch-double-tap-seek
  • patch_datum_anzeigen
  • patch_raum_anzeigen
  • intros
  • live_sources
  • bootstrap4
  • modules
12 results

ldap.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    server.py 10.17 KiB
    #!/bin/python
    
    from flask import *
    from functools import wraps
    import datetime
    import sqlite3
    import os
    import re
    
    # Application object and its configuration mapping, used throughout this module.
    app = Flask(__name__)
    config = app.config
    # Built-in defaults; config.py is loaded afterwards and may override any of them.
    config.update({
    	'DB_SCHEMA': 'db_schema.sql',
    	'DB_DATA': 'db_example.sql',
    	'DB_ENGINE': 'sqlite',
    	'SQLITE_DB': 'db.sqlite',
    	'SQLITE_INIT_SCHEMA': True,
    	'SQLITE_INIT_DATA': False,
    	'DEBUG': False,
    	'VIDEOPREFIX': 'https://videoag.fsmpi.rwth-aachen.de',
    })
    # Running this file directly turns on debug mode and example-data loading.
    if __name__ == '__main__':
    	config.update({'SQLITE_INIT_DATA': True, 'DEBUG': True})
    # silent=True: a missing config.py is not an error.
    config.from_pyfile('config.py', silent=True)
    # Templates build media links from this prefix.
    app.jinja_env.globals['videoprefix'] = config['VIDEOPREFIX']
    
    # One-time sqlite bootstrap at import: optionally (re)apply the schema script,
    # and load the example data only when the database file did not exist before.
    if config['DB_ENGINE'] == 'sqlite':
    	# Must be checked before connect(), which creates the file.
    	created = not os.path.exists(config['SQLITE_DB'])
    	db = sqlite3.connect(config['SQLITE_DB'])
    	try:
    		cur = db.cursor()
    		if config['SQLITE_INIT_SCHEMA']:
    			# Context managers close the script files deterministically.
    			with open(config['DB_SCHEMA']) as schema_file:
    				cur.executescript(schema_file.read())
    		if config['SQLITE_INIT_DATA'] and created:
    			with open(config['DB_DATA']) as data_file:
    				cur.executescript(data_file.read())
    		db.commit()
    	finally:
    		# Release the connection even if one of the scripts fails.
    		db.close()
    
    # Row wrapper for sqlite
    def dict_factory(cursor, row):
    	"""Convert one result row into a dict keyed by column name.
    	Only the part of the column name after the last '.' is used as key.
    	In str values the two-character sequences backslash-n and backslash-r
    	are replaced by real newline / carriage-return characters; values of
    	any other type are passed through untouched.
    	"""
    	result = {}
    	for position, description in enumerate(cursor.description):
    		key = description[0].split('.')[-1]
    		value = row[position]
    		if type(value) is str:
    			value = value.replace('\\n', '\n').replace('\\r', '\r')
    		result[key] = value
    	return result
    
    def query(operation, *params):
    	"""Execute one SQL statement and return every row of its result.
    	The connection is kept on flask's ``g`` and the cursor on the current
    	``request``; both are created on first use. Statements are written with
    	'?' placeholders, which are rewritten to '%s' for MySQL. With any
    	DB_ENGINE other than 'mysql' or 'sqlite' nothing is executed and an
    	empty list is returned.
    	"""
    	engine = config['DB_ENGINE']
    	if engine not in ('mysql', 'sqlite'):
    		return []
    	if engine == 'mysql':
    		import mysql.connector
    		# Connect when there is no connection yet or the old one has dropped.
    		needs_connect = 'db' not in g or not g.db.is_connected()
    		if needs_connect:
    			g.db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=config['MYSQL_HOST'], database=config['MYSQL_DB'])
    		if not hasattr(request, 'db'):
    			request.db = g.db.cursor(dictionary=True)
    		statement = operation.replace('?', '%s')
    	else:
    		if 'db' not in g:
    			connection = sqlite3.connect(config['SQLITE_DB'])
    			connection.row_factory = dict_factory
    			# isolation_level None: sqlite3 opens no implicit transactions.
    			connection.isolation_level = None
    			g.db = connection
    		if not hasattr(request, 'db'):
    			request.db = g.db.cursor()
    		statement = operation
    	request.db.execute(statement, params)
    	return request.db.fetchall()
    
    @app.teardown_request
    def commit_db(*args):
    	"""After each request: close the request's cursor and commit the connection.
    	Does nothing when the request never opened a cursor.
    	"""
    	if not hasattr(request, 'db'):
    		return
    	request.db.close()
    	g.db.commit()
    def searchquery(text, columns, match, tables, suffix, *suffixparams):
    	"""Run a ranked multi-word LIKE search and return the matching rows.
    	``text`` is split on single spaces. Each non-blank word contributes one
    	sub-select over ``tables`` that matches the word (as '%word%') against
    	every column in ``match`` and tags its rows with a priority; earlier
    	words get higher priorities. The sub-selects are UNIONed, the priorities
    	summed into ``_score``, and ``suffix`` (with ``suffixparams`` bound after
    	the word parameters) is appended to the outer SELECT. Returns [] without
    	touching the database when ``text`` has no non-blank word.
    	"""
    	words = text.split(' ')
    	# The WHERE clause is the same for every word; only the bound pattern differs.
    	matchexpr = ' OR '.join('%s LIKE ?'%column for column in match)
    	selects = []
    	patterns = []
    	prio = len(words)+1
    	for word in words:
    		if word != '' and not word.isspace():
    			selects.append('SELECT %s, %s AS _prio FROM %s WHERE %s'%(columns, str(prio), tables, matchexpr))
    			patterns.extend(['%'+word+'%']*len(match))
    			prio -= 1
    	if not selects:
    		return []
    	union = ' UNION '.join(selects)
    	expr = 'SELECT *,SUM(_prio) AS _score FROM (%s) AS _tmp %s'%(union, suffix)
    	return query(expr, *patterns, *suffixparams)
    
    # Matches every character that is not a lowercase ASCII letter or a digit;
    # used to strip such characters from user names.
    LDAP_USERRE = re.compile(r'[^a-z0-9]')
    # Built-in accounts used when no LDAP host is configured.
    # Each entry maps uid -> (password, groups, profile attributes).
    notldap = {
    	'videoag': (
    		'videoag',
    		['users', 'videoag'],
    		{'uid': 'videoag', 'givenName': 'Video', 'sn': 'Geier'},
    	),
    	'gustav': (
    		'passwort',
    		['users'],
    		{'uid': 'gustav', 'givenName': 'Gustav', 'sn': 'Geier'},
    	),
    }
    
    def ldapauth(user, password):
    	"""Verify a user's credentials.
    	Returns ``(uid, groups)`` on success, where ``uid`` is the lower-cased
    	user name reduced to [a-z0-9] and ``groups`` the list of group names the
    	user belongs to (possibly empty). Returns ``(None, [])`` when the
    	credentials are missing or rejected. With LDAP_HOST configured the check
    	is a bind against that server; otherwise the built-in ``notldap``
    	accounts are accepted, but only while DEBUG is set.
    	"""
    	# Missing form fields arrive as None, and a simple bind with an empty
    	# password may be treated by the server as an unauthenticated bind
    	# rather than a failed login - reject both before doing anything else.
    	if not user or not password:
    		return None, []
    	user = LDAP_USERRE.sub(r'', user.lower())
    	if 'LDAP_HOST' in config:
    		import ldap3
    		try:
    			conn = ldap3.Connection(config['LDAP_HOST'], 'uid=%s,ou=users,dc=fsmpi,dc=rwth-aachen,dc=de'%user, password, auto_bind=True)
    		except ldap3.core.exceptions.LDAPBindError:
    			return None, []
    		try:
    			# Start empty so a user without any group membership is still
    			# returned as authenticated instead of hitting an unbound name.
    			groups = []
    			if conn.search("ou=groups,dc=fsmpi,dc=rwth-aachen,dc=de", "(&(cn=*)(memberUid=%s))"%user, attributes=['cn']):
    				groups = [e.cn.value for e in conn.entries]
    		finally:
    			conn.unbind()
    		return user, groups
    	elif config.get('DEBUG') and user in notldap and password == notldap[user][0]:
    		return user, notldap[user][1]
    	return None, []
    
    def ldapget(user):
    	"""Return the profile of ``user`` as a dict with 'uid', 'givenName' and 'sn'.
    	The name is normalised exactly as in ldapauth() (lower-cased, reduced to
    	[a-z0-9]). With LDAP_HOST configured the entry is read from that server
    	without binding as a particular user; IndexError is raised when the
    	directory has no entry for the uid. Without LDAP_HOST the profile comes
    	from the built-in ``notldap`` table (KeyError for unknown users).
    	"""
    	user = LDAP_USERRE.sub(r'', user.lower())
    	if 'LDAP_HOST' in config:
    		import ldap3
    		# Query the configured server, the same one ldapauth() binds to,
    		# instead of a host name hard-coded here.
    		conn = ldap3.Connection(config['LDAP_HOST'], auto_bind=True)
    		try:
    			conn.search("ou=users,dc=fsmpi,dc=rwth-aachen,dc=de", "(uid=%s)"%user,
    					attributes=ldap3.ALL_ATTRIBUTES)
    			e = conn.entries[0]
    			return {'uid': user, 'givenName': e.givenName.value, 'sn':e.sn.value}
    		finally:
    			# Always give the connection back, also when the lookup fails.
    			conn.unbind()
    	else:
    		return notldap[user][2]
    
    def ismod(*args):
    	"""Tell whether the current session has a logged-in user (a moderator).
    	Positional arguments are accepted and ignored.
    	"""
    	logged_in = 'user' in session
    	return logged_in
    
    # Expose the check to all templates.
    app.jinja_env.globals['ismod'] = ismod
    
    def mod_required(func):
    	"""Decorator that lets only moderators reach the wrapped view.
    	Everyone else gets a flash message and is redirected to the login page,
    	with the URL of the current request passed along as ``ref``.
    	"""
    	@wraps(func)
    	def decorator(*args, **kwargs):
    		if ismod():
    			return func(*args, **kwargs)
    		flash('Diese Funktion ist nur für Moderatoren verfügbar!')
    		return redirect(url_for('login', ref=request.url))
    	return decorator
    
    @app.route('/')
    def index():
    	"""Start page: the six lectures whose videos were updated most recently.
    	Non-moderators only get rows where course, lecture and video are visible
    	and the course is listed.
    	"""
    	latest = query('''
    				SELECT lectures.*, max(videos.time_updated) AS lastvidtime, courses.short, courses.downloadable, courses.title AS coursetitle
    				FROM lectures
    				LEFT JOIN videos ON (videos.lecture_id = lectures.id)
    				LEFT JOIN courses on (courses.id = lectures.course_id)
    				WHERE (? OR (courses.visible AND courses.listed AND lectures.visible AND videos.visible))
    				GROUP BY videos.lecture_id
    				ORDER BY lastvidtime DESC
    				LIMIT 6
    			''', ismod())
    	return render_template('index.html', latestvideos=latest)
    
    @app.route('/videos')
    def videos():
    	"""Course overview, grouped by title, semester (default) or organizer.
    	Non-moderators only see courses that are both visible and listed.
    	"""
    	courses = query('SELECT * FROM courses WHERE (? OR (visible AND listed))', ismod())
    	# Courses with an empty semester are shown under the label 'zeitlos'.
    	for entry in (c for c in courses if c['semester'] == ''):
    		entry['semester'] = 'zeitlos'
    	requested = request.args.get('groupedby')
    	# Any unknown or missing grouping falls back to 'semester'.
    	groupedby = requested if requested in ('title', 'semester', 'organizer') else 'semester'
    	return render_template('videos.html', courses=courses, groupedby=groupedby)
    
    @app.route('/faq')
    def faq():
    	"""Render the FAQ page (template faq.html, no template variables)."""
    	return render_template('faq.html')
    
    @app.route('/play')
    def play():
    	"""Player page for the lecture given by the ``lectureid`` query argument.
    	Without that argument the client is redirected to the course overview.
    	An unknown (or, for non-moderators, hidden) lecture or course yields the
    	course overview with status 404 and a flash message. A lecture without
    	videos is still rendered, with a notice.
    	"""
    	def not_found(message):
    		# Show the overview page in place, but report 404.
    		flash(message)
    		return app.view_functions['videos'](), 404
    
    	if 'lectureid' not in request.args:
    		return redirect(url_for('videos'))
    	lecture_id = request.args.get('lectureid')
    	lectures = query('SELECT * FROM lectures WHERE id = ? AND (? OR visible)', lecture_id, ismod())
    	videos = query('SELECT * FROM videos WHERE lecture_id = ? AND (? OR visible)', lecture_id, ismod())
    	if not lectures:
    		return not_found('Diese Vorlesung existiert nicht!')
    	lecture = lectures[0]
    	if not videos:
    		flash('Zu dieser Vorlesung wurden noch keine Videos veröffentlicht!')
    	courses = query('SELECT * FROM courses WHERE id = ? AND (? OR (visible AND listed))', lecture['course_id'], ismod())
    	if not courses:
    		return not_found('Diese Veranstaltung existiert nicht!')
    	return render_template('play.html', course=courses[0], lecture=lecture, videos=videos)
    
    @app.route('/search')
    def search():
    	"""Search courses and lectures for the words in the ``q`` query argument.
    	Without ``q`` the client is redirected to the start page. At most 20
    	courses and 30 lectures are returned, best score first; non-moderators
    	only get visible, listed entries.
    	"""
    	if 'q' not in request.args:
    		return redirect(url_for('index'))
    	q = request.args['q']
    	course_fields = ['title', 'short', 'organizer', 'subject', 'description']
    	course_suffix = 'WHERE (? OR (visible AND listed)) GROUP BY id ORDER BY _score DESC, semester DESC LIMIT 20'
    	courses = searchquery(q, '*', course_fields, 'courses', course_suffix, ismod())
    	lecture_columns = 'lectures.*, courses.visible AS coursevisible, courses.listed, courses.short, courses.downloadable, courses.title AS coursetitle'
    	lecture_fields = ['lectures.title', 'lectures.comment', 'lectures.speaker', 'courses.short']
    	lecture_tables = 'lectures LEFT JOIN courses on (courses.id = lectures.course_id)'
    	lecture_suffix = 'WHERE (? OR (coursevisible AND listed AND visible)) GROUP BY id ORDER BY _score DESC, time DESC LIMIT 30'
    	lectures = searchquery(q, lecture_columns, lecture_fields, lecture_tables, lecture_suffix, ismod())
    	return render_template('search.html', searchtext=q, courses=courses, lectures=lectures)
    
    @app.route('/course')
    def course():
    	"""Course page for the course whose handle is given as ``courseid``.
    	Without that argument the client is redirected to the course overview;
    	an unknown (or, for non-moderators, hidden) course yields the overview
    	with status 404. Lectures and videos that are not visible are only
    	handed to the template for moderators.
    	"""
    	if not 'courseid' in request.args:
    		return redirect(url_for('videos'))
    	handle = request.args['courseid']
    	courses = query('SELECT * FROM courses WHERE handle = ? AND (? OR visible)', handle, ismod())
    	if not courses:
    		flash('Diese Veranstaltung existiert nicht!')
    		return app.view_functions['videos'](), 404
    	lectures = query('SELECT * FROM lectures WHERE course_id = ? AND (? OR visible)', courses[0]['id'], ismod())
    	# Apply the same visibility rule as for the lectures above: without it,
    	# hidden videos (and videos of hidden lectures) would reach the template
    	# for every visitor.
    	# NOTE(review): SELECT * over three joined tables lets same-named columns
    	# overwrite each other in the row dict - confirm the template relies on that.
    	videos = query('''
    			SELECT *, formats.description AS format_description
    			FROM videos
    			JOIN lectures ON (videos.lecture_id = lectures.id)
    			JOIN formats ON (videos.video_format = formats.id)
    			WHERE lectures.course_id= ?
    			AND (? OR (videos.visible AND lectures.visible))
    			ORDER BY formats.prio DESC
    			''', courses[0]['id'], ismod())
    	return render_template('course.html', course=courses[0], lectures=lectures, videos=videos)
    
    @app.route('/login', methods=['GET', 'POST'])
    def login():
    	"""Show the login form (GET) or process a login attempt (POST).
    	A login succeeds when the credentials are accepted and the user is in
    	the 'users' group; the user's profile is then stored in the session.
    	Otherwise a failure message is flashed. Afterwards the client is
    	redirected to ``ref`` if given, else to the start page.
    	"""
    	if request.method == 'GET':
    		return render_template('login.html')
    	username = request.form.get('user')
    	password = request.form.get('password')
    	# A POST lacking either field cannot succeed; treat it as a failed login
    	# so ldapauth() is only ever called with real, non-empty strings.
    	user, groups = None, []
    	if username and password:
    		user, groups = ldapauth(username, password)
    	if user and 'users' in groups:
    		session['user'] = ldapget(user)
    	else:
    		flash('Login fehlgeschlagen!')
    	# NOTE(review): 'ref' is taken from the request and redirected to without
    	# validation (open redirect) - consider restricting it to this site.
    	if 'ref' in request.values:
    		return redirect(request.values['ref'])
    	else:
    		return redirect(url_for('index'))
    
    @app.route('/logout')
    def logout():
    	"""Drop the user from the session and redirect to ``ref`` or the start page."""
    	# Default of None: logging out without being logged in (expired session,
    	# link opened twice) must not fail with a KeyError.
    	session.pop('user', None)
    	# NOTE(review): 'ref' is taken from the request and redirected to without
    	# validation (open redirect) - consider restricting it to this site.
    	if 'ref' in request.values:
    		return redirect(request.values['ref'])
    	else:
    		return redirect(url_for('index'))
    
    @app.route('/edit', methods=['GET', 'POST'])
    @mod_required
    def edit():
    	"""Apply a batch of single-field updates (moderators only).
    	Changes come from the JSON body (an object) or, failing that, from the
    	query string. Every key has the form ``<table>.<row id>.<column>`` and
    	its value is the new field content. Only the tables and columns listed
    	in ``tabs`` may be written. The whole batch is checked first; any
    	malformed or forbidden key is answered with 400 and nothing is written.
    	All updates run inside one transaction that is rolled back on failure.
    	"""
    	# table name in the request -> (table/view to update, id column, editable columns)
    	tabs = {
    		'courses': ('courses_data', 'id', ['visible', 'listed', 'title', 'short',
    				'handle', 'organizer', 'subject', 'credits', 'semester', 'downloadable',
    				'internal', 'responsible', 'description']),
    		'lectures': ('lectures_data', 'id', ['visible', 'title', 'comment',
    				'internal', 'speaker', 'place', 'time', 'duration', 'jumplist',
    				'titlefile']),
    		'site_texts': ('site_texts', 'key', ['value']),
    		'videos': ('videos_data', 'id', ['visible', 'downloadable', 'title',
    				'comment', 'internal'])
    	}
    	if request.is_json:
    		payload = request.get_json()
    		if not isinstance(payload, dict):
    			return 'Bad Request', 400
    		changes = payload.items()
    	else:
    		changes = request.args.items()
    	# Table and column names are interpolated into the SQL text, so they must
    	# be checked against the whitelist with real conditionals: assert
    	# statements disappear when Python runs with -O.
    	updates = []
    	for key, val in changes:
    		parts = key.split('.', 2)
    		if len(parts) != 3:
    			return 'Bad Request', 400
    		table, rowid, column = parts
    		if table not in tabs or column not in tabs[table][2]:
    			return 'Bad Request', 400
    		statement = 'UPDATE %s SET %s = ? WHERE %s = ?'%(tabs[table][0], column,
    					tabs[table][1])
    		updates.append((statement, val, rowid))
    	query('BEGIN TRANSACTION')
    	try:
    		for statement, val, rowid in updates:
    			query(statement, val, rowid)
    	except Exception:
    		# Undo the partial batch; otherwise the commit at request teardown
    		# would persist whatever had been written before the failure.
    		query('ROLLBACK TRANSACTION')
    		raise
    	query('COMMIT TRANSACTION')
    	return "OK", 200
    
    @app.route('/auth')
    def auth(): # For use with nginx auth_request
    	"""Decide whether the media file named in X-Original-Uri may be served.
    	Answers 200 when the file is a known video the requester may see and its
    	path starts with 'pub' or the requester is a moderator (the access is
    	then written to the log table), 200 for any path ending in 'jpg', and
    	403 otherwise. A request without the X-Original-Uri header gets 500.
    	"""
    	if 'X-Original-Uri' not in request.headers:
    		return 'Internal Server Error', 500
    	from urllib.parse import urlparse
    	url = request.headers['X-Original-Uri']
    	prefix = config['VIDEOPREFIX']
    	# Remove VIDEOPREFIX as a real prefix. str.lstrip(prefix) would strip any
    	# leading characters that occur anywhere in the prefix, e.g. the 'p' of
    	# 'pub/...'. The header may carry only a path while the prefix is a full
    	# URL, so the prefix's path component is tried as well.
    	for candidate in (prefix, urlparse(prefix).path):
    		base = candidate.rstrip('/')
    		# Only cut at a path-segment boundary.
    		if base and (url == base or url.startswith(base + '/')):
    			url = url[len(base):]
    			break
    	# The comparisons below work on paths without a leading slash.
    	url = url.lstrip('/')
    	ip = request.headers.get('X-Real-IP', '')
    	videos = query('''SELECT videos.path, videos.id
    			FROM videos
    			JOIN lectures ON (videos.lecture_id = lectures.id)
    			JOIN courses ON (lectures.course_id = courses.id)
    			WHERE videos.path = ?
    			AND (? OR (courses.visible AND lectures.visible AND videos.visible))''',
    			url, ismod())
    	if videos and (url.startswith('pub') or ismod()):
    		query('INSERT INTO log VALUES (?, "", ?, "video", ?, ?)', ip, datetime.datetime.now(), videos[0]['id'], url)
    		return "OK", 200
    	elif url.endswith('jpg'):
    		return "OK", 200
    	else:
    		return "Not allowed", 403
    
    # Started directly (not imported by a WSGI server): run flask's built-in
    # server, handling requests in separate threads.
    if __name__ == '__main__':
    	app.run(threaded=True)