Something went wrong on our end
Select Git revision
-
Andreas Valder authoredAndreas Valder authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
db.py 4.88 KiB
from server import *
import re
if config['DB_ENGINE'] == 'sqlite':
    import sqlite3

    def convert_timestamp(val):
        """Convert a raw SQLite date/time column value into a ``datetime``.

        Adapted from the converter in the sqlite3 module, except that a
        value which cannot be parsed yields ``None`` instead of raising.

        ``val`` is the column content as bytes; it is split on a single
        space into a ``-``-separated date and a ``:``-separated time.
        Anything after a ``.`` in the time part (fractional seconds) is
        ignored and the microsecond field is always set to 0.
        """
        try:
            date_bytes, time_bytes = val.split(b" ")
            year, month, day = (int(piece) for piece in date_bytes.split(b"-"))
            clock_bytes = time_bytes.split(b".")[0]
            hours, minutes, seconds = (int(piece) for piece in clock_bytes.split(b":"))
            return datetime(year, month, day, hours, minutes, seconds, 0)
        except ValueError:
            # Wrong number of fields, non-numeric fields or out-of-range values.
            return None

    # Use the tolerant converter for both declared column types.
    sqlite3.register_converter('datetime', convert_timestamp)
    sqlite3.register_converter('timestamp', convert_timestamp)
if config['DB_ENGINE'] == 'sqlite':
    # One-time setup at import: optionally run the schema script and, if the
    # database file did not exist before, the initial data script.
    created = not os.path.exists(config['SQLITE_DB'])
    db = sqlite3.connect(config['SQLITE_DB'])
    try:
        cur = db.cursor()
        if config['SQLITE_INIT_SCHEMA']:
            # Context managers make sure the script files are closed again.
            with open(config['DB_SCHEMA']) as _schema_file:
                cur.executescript(_schema_file.read())
        if config['SQLITE_INIT_DATA'] and created:
            with open(config['DB_DATA']) as _data_file:
                cur.executescript(_data_file.read())
        db.commit()
    finally:
        # Release the setup connection even if one of the scripts fails.
        db.close()

    def get_dbcursor():
        """Return the database cursor for the current request.

        The connection is created on first use and cached as ``g.db``; the
        cursor is cached as ``request.db`` (``g`` and ``request`` come from
        the ``server`` star import, presumably Flask's objects).
        """
        if 'db' not in g:
            # PARSE_DECLTYPES enables the registered column-type converters.
            g.db = sqlite3.connect(config['SQLITE_DB'], detect_types=sqlite3.PARSE_DECLTYPES)
            # None switches off the sqlite3 module's implicit transactions.
            g.db.isolation_level = None
        if not hasattr(request, 'db'):
            request.db = g.db.cursor()
        return request.db

    def fix_query(operation, params):
        """Adapt a statement and its parameters for SQLite.

        The statement is returned unchanged; ``datetime`` parameters have
        their microseconds set to 0. Returns ``(operation, params)`` with
        ``params`` as a list.
        """
        params = [(p.replace(microsecond=0) if isinstance(p, datetime) else p) for p in params]
        return operation, params

elif config['DB_ENGINE'] == 'mysql':
    import mysql.connector

    def get_dbcursor():
        """Return the database cursor for the current request.

        A new connection is opened when none is cached on ``g`` or the
        cached one is no longer connected; the cursor is cached as
        ``request.db``.
        """
        if 'db' not in g or not g.db.is_connected():
            g.db = mysql.connector.connect(
                user=config['MYSQL_USER'],
                password=config['MYSQL_PASSWD'],
                host=config['MYSQL_HOST'],
                database=config['MYSQL_DB'],
            )
        if not hasattr(request, 'db'):
            request.db = g.db.cursor()
        return request.db

    def fix_query(operation, params):
        """Adapt a statement and its parameters for MySQL.

        Every ``?`` in the statement text is replaced by ``%s`` (this is a
        plain text replacement, so it also affects ``?`` inside literals);
        ``datetime`` parameters have their microseconds set to 0. Returns
        ``(operation, params)`` with ``params`` as a list.
        """
        operation = operation.replace('?', '%s')
        params = [(p.replace(microsecond=0) if isinstance(p, datetime) else p) for p in params]
        return operation, params
def query(operation, *params, delim="sep"):
    """Execute a statement and return its rows as a list of dicts.

    ``operation`` uses ``?`` placeholders which are bound to ``params``;
    both are passed through ``fix_query`` first.

    Each row becomes a dict keyed by column name, where the name is the
    part of the cursor's column label after the last ``.`` and before the
    first ``:``. A column whose name equals ``delim`` is not stored as a
    value: its value becomes a key in the row dict that maps to a new
    nested dict, and all following columns are stored in that nested dict.
    In string values the two-character sequences ``\\n`` and ``\\r`` are
    replaced by real newline and carriage-return characters.

    Returns an empty list when the statement has no result set.
    """
    operation, params = fix_query(operation, params)
    cur = get_dbcursor()
    cur.execute(operation, params)
    try:
        rows = cur.fetchall()
    except Exception as exc:
        # mysql.connector is only imported when the MySQL engine is
        # configured, so its exception class must not be named in the except
        # clause itself: with another engine that lookup would raise a
        # NameError and hide the real error.
        no_result_set = (
            config['DB_ENGINE'] == 'mysql'
            and isinstance(exc, mysql.connector.errors.InterfaceError)
            and exc.msg == 'No result set to fetch from.'
        )
        if not no_result_set:
            raise
        # no problem, we were just at the end of the result set
        rows = []
    res = []
    for row in rows:
        record = {}
        res.append(record)
        ptr = record
        for col, desc in zip(row, cur.description):
            name = desc[0].split('.')[-1].split(':')[0]
            if name == delim:
                # Start a nested dict; later columns of this row go there.
                ptr = record[col] = {}
                continue
            if isinstance(col, str):
                col = col.replace('\\n', '\n').replace('\\r', '\r')
            ptr[name] = col
    return res
def modify(operation, *params):
    """Execute a statement on the request cursor and return ``lastrowid``.

    ``operation`` and ``params`` are passed through ``fix_query`` before
    execution; no rows are fetched.
    """
    statement, arguments = fix_query(operation, params)
    cursor = get_dbcursor()
    cursor.execute(statement, arguments)
    return cursor.lastrowid
@app.teardown_request
def commit_db(*args):
    """Close the request's cursor and commit the connection at teardown.

    Does nothing when no cursor was created during the request. The
    arguments passed by the teardown hook are ignored.
    """
    if not hasattr(request, 'db'):
        return
    request.db.close()
    g.db.commit()
def searchquery(text, columns, match, tables, suffix, *suffixparams):
    """Run a word-based ``LIKE`` search and return the rows from ``query``.

    ``text`` is split on single spaces; empty and whitespace-only words are
    skipped. For every remaining word one sub-select is built that selects
    ``columns`` from ``tables`` where any column in ``match`` contains the
    word, tagged with a ``_prio`` value that decreases from word to word.
    The sub-selects are combined with ``UNION`` and wrapped in an outer
    select that adds ``SUM(_prio) AS _score``; ``suffix`` is appended to
    that outer select and ``suffixparams`` are bound after the word
    parameters.

    Only the search words are bound as parameters. ``columns``, ``match``,
    ``tables`` and ``suffix`` are inserted into the SQL text as-is and must
    therefore never contain untrusted input.

    Returns ``[]`` without touching the database when ``text`` contains no
    usable word.
    """
    params = []
    subexprs = []
    words = text.split(' ')
    # Skipped words still count here, so priorities start at len(words) + 1.
    prio = len(words) + 1
    # The match condition is the same for every word; build it once.
    matchexpr = ' OR '.join('%s LIKE ?' % column for column in match)
    for word in words:
        if not word or word.isspace():
            continue
        subexprs.append('SELECT %s, %s AS _prio FROM %s WHERE %s' % (columns, prio, tables, matchexpr))
        # One pattern per column in the match condition.
        params += ['%' + word + '%'] * len(match)
        prio -= 1
    if not subexprs:
        return []
    expr = 'SELECT *,SUM(_prio) AS _score FROM (%s) AS _tmp %s' % (' UNION '.join(subexprs), suffix)
    return query(expr, *params, *suffixparams)
# Matches every character that is not allowed in a user name; such
# characters are stripped before the name is used anywhere.
LDAP_USERRE = re.compile(r'[^a-z0-9]')

if 'LDAP_HOST' in config:
    import ldap3

    def ldapauth(user, password):
        """Authenticate ``user`` against LDAP and look up the user's groups.

        The user name is lower-cased and stripped of everything except
        ``a-z0-9`` before it is inserted into the bind DN and the group
        filter.

        Returns ``(user, groups)`` with the sanitized user name and the list
        of group ``cn`` values when the bind succeeds and the group search
        finds entries, otherwise ``(None, [])`` (the same failure value the
        non-LDAP fallback uses).
        """
        user = LDAP_USERRE.sub(r'', user.lower())
        try:
            conn = ldap3.Connection(config['LDAP_HOST'], 'uid=%s,ou=users,dc=fsmpi,dc=rwth-aachen,dc=de' % user, password, auto_bind=True)
            try:
                if conn.search("ou=groups,dc=fsmpi,dc=rwth-aachen,dc=de", "(&(cn=*)(memberUid=%s))" % user, attributes=['cn']):
                    return user, [e['attributes']['cn'][0] for e in conn.response]
            finally:
                # Always release the connection, also when no group matched.
                conn.unbind()
        except ldap3.core.exceptions.LDAPBindError:
            # Bind rejected: treated as a failed login.
            pass
        return None, []

    def ldapget(user):
        """Fetch basic directory information about ``user``.

        Returns a dict with ``uid`` (the sanitized user name), ``givenName``
        and ``sn``, or ``{}`` when the search yields no entry.
        """
        user = LDAP_USERRE.sub(r'', user.lower())
        # NOTE(review): this host is hard-coded instead of using
        # config['LDAP_HOST'] like ldapauth does - confirm this is intended.
        conn = ldap3.Connection('ldaps://rumo.fsmpi.rwth-aachen.de', auto_bind=True)
        try:
            conn.search("ou=users,dc=fsmpi,dc=rwth-aachen,dc=de", "(uid=%s)" % user,
                        attributes=ldap3.ALL_ATTRIBUTES)
            if not conn.response:
                return {}
            e = conn.response[0]
            return {'uid': user, 'givenName': e['attributes']['givenName'][0], 'sn': e['attributes']['sn'][0]}
        finally:
            conn.unbind()
else:
    # Built-in accounts used when no LDAP host is configured:
    # user name -> (password, groups, user info).
    notldap = {
        'videoag': ('videoag', ['users', 'videoag'], {'uid': 'videoag', 'givenName': 'Video', 'sn': 'Geier'}),
        'gustav': ('passwort', ['users'], {'uid': 'gustav', 'givenName': 'Gustav', 'sn': 'Geier'}),
    }

    def ldapauth(user, password):
        """Authenticate ``user`` against the built-in account table.

        The accounts are only accepted when ``config`` has a truthy
        ``DEBUG`` entry. Returns ``(user, groups)`` on success and
        ``(None, [])`` otherwise.
        """
        user = LDAP_USERRE.sub(r'', user.lower())
        if config.get('DEBUG') and user in notldap and password == notldap[user][0]:
            return user, notldap[user][1]
        return None, []

    def ldapget(user):
        """Return the built-in user info dict for ``user``.

        Unknown users yield ``{}``, matching what the LDAP-backed variant
        returns when nothing is found.
        """
        user = LDAP_USERRE.sub(r'', user.lower())
        if user not in notldap:
            return {}
        return notldap[user][2]