Skip to content
Snippets Groups Projects
Select Git revision
  • de9013eec326feddb4876f4ab5c0cfbd067880b8
  • master default protected
  • forbid-save-as
  • upload-via-token
  • moodle-integration
  • patch-double-tap-seek
  • patch_datum_anzeigen
  • patch_raum_anzeigen
  • intros
  • live_sources
  • bootstrap4
  • modules
12 results

db.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    db.py 4.18 KiB
    from server import *
    import sqlite3
    import re
    
    if config['DB_ENGINE'] == 'sqlite':
    	# One-time bootstrap of the sqlite database at import time.
    	# "created" must be determined before connecting, because sqlite3.connect()
    	# creates the database file if it does not exist yet.
    	created = not os.path.exists(config['SQLITE_DB'])
    	db = sqlite3.connect(config['SQLITE_DB'])
    	try:
    		cur = db.cursor()
    		# The schema script is run on every start when enabled ...
    		if config['SQLITE_INIT_SCHEMA']:
    			with open(config['DB_SCHEMA']) as _schema_file:
    				cur.executescript(_schema_file.read())
    		# ... the data script only when the database file did not exist before.
    		if config['SQLITE_INIT_DATA'] and created:
    			with open(config['DB_DATA']) as _data_file:
    				cur.executescript(_data_file.read())
    		db.commit()
    	finally:
    		# Release the bootstrap connection even if one of the scripts fails;
    		# request handling opens its own connection in query().
    		db.close()
    
    # Row wrapper for sqlite
    def dict_factory(cursor, row):
    	"""Row factory for sqlite3: turn one result row into a dict.

    	Keys are the column names from cursor.description with any "table."
    	prefix removed (only the part after the last dot is kept). In string
    	values the two-character sequences backslash-n and backslash-r are
    	replaced by real newline / carriage-return characters; all other
    	values are passed through unchanged.
    	"""
    	d = {}
    	for col, value in zip(cursor.description, row):
    		# isinstance (rather than an exact type comparison) also covers str subclasses
    		if isinstance(value, str):
    			value = value.replace('\\n','\n').replace('\\r','\r')
    		d[col[0].split('.')[-1]] = value
    	return d
    
    # From sqlite3 module, but with error catching
    def convert_timestamp(val):
    	"""Parse a b"YYYY-MM-DD HH:MM:SS[.ffffff]" byte string into a datetime.

    	Any fractional-seconds part is discarded (microsecond is always 0).
    	Returns None instead of raising when the value cannot be parsed
    	(wrong number of fields, non-numeric parts, out-of-range date).
    	"""
    	try:
    		date_bytes, time_bytes = val.split(b" ")
    		year, month, day = (int(part) for part in date_bytes.split(b"-"))
    		# Only the part before an optional "." is used
    		clock = time_bytes.split(b".")[0]
    		hours, minutes, seconds = (int(part) for part in clock.split(b":"))
    		return datetime(year, month, day, hours, minutes, seconds, 0)
    	except ValueError:
    		return None
    
    # Route columns declared as "datetime" or "timestamp" through the tolerant
    # converter above, so unparsable values become None instead of raising.
    # Converters only apply to connections opened with detect_types
    # (query() uses sqlite3.PARSE_DECLTYPES).
    sqlite3.register_converter('datetime', convert_timestamp)
    sqlite3.register_converter('timestamp', convert_timestamp)
    
    def query(operation, *params):
    	"""Execute one SQL statement on the configured database engine.

    	operation uses "?" placeholders; for mysql every "?" in the statement
    	text is rewritten to "%s" before execution. datetime parameters are
    	truncated to whole seconds.

    	The connection is cached on g and the cursor on request, so all
    	statements of a request share one cursor.

    	Returns the fetched rows as a list of dicts. When no rows were fetched
    	and the cursor has a lastrowid, that lastrowid is returned instead.
    	For an unknown DB_ENGINE an empty list is returned without executing
    	anything.
    	"""
    	# Drop microseconds so datetimes match the second-resolution values read back
    	params = [p.replace(microsecond=0) if isinstance(p, datetime) else p for p in params]
    	if config['DB_ENGINE'] == 'mysql':
    		import mysql.connector
    		if 'db' not in g or not g.db.is_connected():
    			g.db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=config['MYSQL_HOST'], database=config['MYSQL_DB'])
    		if not hasattr(request, 'db'):
    			request.db = g.db.cursor(dictionary=True)
    		request.db.execute(operation.replace('?', '%s'), params)
    	elif config['DB_ENGINE'] == 'sqlite':
    		if 'db' not in g:
    			g.db = sqlite3.connect(config['SQLITE_DB'], detect_types=sqlite3.PARSE_DECLTYPES)
    			g.db.row_factory = dict_factory
    			g.db.isolation_level = None
    		if not hasattr(request, 'db'):
    			request.db = g.db.cursor()
    		request.db.execute(operation, params)
    	else:
    		return []
    	try:
    		rows = request.db.fetchall()
    	except Exception:
    		# Best effort: a statement without a result set may make fetchall()
    		# raise (presumably the mysql driver does); treat that as "no rows".
    		# Only Exception is caught so KeyboardInterrupt/SystemExit still propagate.
    		rows = []
    	# NOTE(review): the cursor is shared per request, so lastrowid may stem from
    	# an earlier INSERT when the current statement returned no rows - confirm
    	# callers can cope with that.
    	if not rows and request.db.lastrowid is not None:
    		return request.db.lastrowid
    	return rows
    
    @app.teardown_request
    def commit_db(*args):
    	"""Request teardown hook: close the request cursor and commit the connection.

    	Does nothing when no query was executed during the request
    	(query() creates request.db on first use).
    	"""
    	if not hasattr(request, 'db'):
    		return
    	request.db.close()
    	g.db.commit()
    
    def searchquery(text, columns, match, tables, suffix, *suffixparams):
    	"""Run a weighted LIKE search for every word of text.

    	text is split on single spaces. For each non-blank word one sub-select
    	over tables is generated that matches the word (as %word%) against any
    	of the columns listed in match and tags the hit with a constant _prio.
    	Earlier words get a higher _prio. The sub-selects are combined with
    	UNION and wrapped in an outer select that adds SUM(_prio) AS _score;
    	suffix (with its suffixparams) is appended to that outer select.

    	Returns [] without touching the database when text has no non-blank
    	word, otherwise the result of query().
    	"""
    	tokens = text.split(' ')
    	terms = [token for token in tokens if token and not token.isspace()]
    	if not terms:
    		return []
    	# Same WHERE clause for every word; only the bound parameters differ
    	where = ' OR '.join('%s LIKE ?'%column for column in match)
    	# The starting priority counts blank tokens too; it only decreases per used word
    	top = len(tokens)+1
    	selects = []
    	likeparams = []
    	for offset, term in enumerate(terms):
    		selects.append('SELECT %s, %s AS _prio FROM %s WHERE %s'%(columns, str(top-offset), tables, where))
    		likeparams.extend(['%'+term+'%']*len(match))
    	expr = 'SELECT *,SUM(_prio) AS _score FROM (%s) AS _tmp %s'%(' UNION '.join(selects), suffix)
    	return query(expr, *likeparams, *suffixparams)
    
    # Matches every character that is not allowed in a user name;
    # used to strip such characters from (lower-cased) user input.
    LDAP_USERRE = re.compile(r'[^a-z0-9]')
    # Built-in accounts used instead of LDAP:
    # user name -> (password, list of groups, user info dict)
    notldap = {
    	'videoag': (
    		'videoag',
    		['users', 'videoag'],
    		{'uid': 'videoag', 'givenName': 'Video', 'sn': 'Geier'},
    	),
    	'gustav': (
    		'passwort',
    		['users'],
    		{'uid': 'gustav', 'givenName': 'Gustav', 'sn': 'Geier'},
    	),
    }
    
    def ldapauth(user, password):
    	"""Check user credentials and look up the user's groups.

    	The user name is lower-cased and stripped of every character outside
    	a-z0-9 before use. With LDAP_HOST configured, a bind as that user is
    	attempted and the cn of every group listing the user as memberUid is
    	collected. Without LDAP_HOST, the built-in notldap accounts are
    	accepted, but only when DEBUG is set.

    	Returns (user, groups) on success and (None, []) on failure.
    	"""
    	user = LDAP_USERRE.sub(r'', user.lower())
    	if 'LDAP_HOST' in config:
    		import ldap3
    		try:
    			conn = ldap3.Connection(config['LDAP_HOST'], 'uid=%s,ou=users,dc=fsmpi,dc=rwth-aachen,dc=de'%user, password, auto_bind=True)
    			try:
    				# A user without any group is still authenticated; start
    				# with an empty list so "groups" is always defined.
    				groups = []
    				if conn.search("ou=groups,dc=fsmpi,dc=rwth-aachen,dc=de", "(&(cn=*)(memberUid=%s))"%user, attributes=['cn']):
    					groups = [e.cn.value for e in conn.entries]
    			finally:
    				# Release the connection even if the search fails
    				conn.unbind()
    			return user, groups
    		except ldap3.core.exceptions.LDAPBindError:
    			# Wrong credentials: fall through to the failure result
    			pass
    	elif config.get('DEBUG') and user in notldap and password == notldap[user][0]:
    		return user, notldap[user][1]
    	return None, []
    
    def ldapget(user):
    	"""Return basic info about a user as a dict with uid, givenName and sn.

    	The user name is lower-cased and stripped of every character outside
    	a-z0-9 before use. With LDAP_HOST configured, the user is looked up
    	via an LDAP bind without credentials; otherwise the built-in notldap
    	accounts are used. Returns {} when the user is unknown.
    	"""
    	user = LDAP_USERRE.sub(r'', user.lower())
    	if 'LDAP_HOST' in config:
    		import ldap3
    		# NOTE(review): the host is hard-coded here while ldapauth() uses
    		# config['LDAP_HOST'] - confirm this is intentional.
    		conn = ldap3.Connection('ldaps://rumo.fsmpi.rwth-aachen.de', auto_bind=True)
    		try:
    			conn.search("ou=users,dc=fsmpi,dc=rwth-aachen,dc=de", "(uid=%s)"%user,
    					attributes=ldap3.ALL_ATTRIBUTES)
    			if not conn.entries:
    				return {}
    			e = conn.entries[0]
    			return {'uid': user, 'givenName': e.givenName.value, 'sn':e.sn.value}
    		finally:
    			# Always release the connection; it was previously left open
    			conn.unbind()
    	# Mirror the LDAP branch: unknown users yield {} instead of a KeyError
    	if user not in notldap:
    		return {}
    	return notldap[user][2]