Select Git revision
Forked from
Video AG Infrastruktur / website
Source project has a limited visibility.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
db.py 5.54 KiB
from server import *
import re
if config['DB_ENGINE'] == 'sqlite':
import sqlite3
# From sqlite3 module, but with error catching
def convert_timestamp(val):
    """Parse a ``b"YYYY-MM-DD HH:MM:SS[.ffffff]"`` value into a datetime.

    Any fractional-second part is discarded (microsecond is always 0).
    Returns None instead of raising when the value triggers a ValueError
    while being split, converted to integers, or turned into a datetime.
    """
    try:
        date_bytes, time_bytes = val.split(b" ")
        year, month, day = (int(piece) for piece in date_bytes.split(b"-"))
        # Only the part before an optional "." carries hours/minutes/seconds.
        clock = time_bytes.split(b".")[0]
        hours, minutes, seconds = (int(piece) for piece in clock.split(b":"))
        return datetime(year, month, day, hours, minutes, seconds, 0)
    except ValueError:
        return None
# Use the error-tolerant converter for columns declared as 'datetime' or
# 'timestamp'; sqlite3 applies registered converters on connections opened
# with detect_types=sqlite3.PARSE_DECLTYPES.
sqlite3.register_converter('datetime', convert_timestamp)
sqlite3.register_converter('timestamp', convert_timestamp)
if config['DB_ENGINE'] == 'sqlite':
# One-time SQLite bootstrap, executed when this module is imported: optionally
# apply the schema script, and load the initial data only if the database file
# did not exist yet. `created` must be computed before connecting, because
# sqlite3.connect() creates the file.
created = not os.path.exists(config['SQLITE_DB'])
db = sqlite3.connect(config['SQLITE_DB'])
try:
    cur = db.cursor()
    if config['SQLITE_INIT_SCHEMA']:
        # The context manager closes the script file instead of leaking the handle.
        with open(config['DB_SCHEMA']) as _schema_file:
            cur.executescript(_schema_file.read())
    if config['SQLITE_INIT_DATA'] and created:
        with open(config['DB_DATA']) as _data_file:
            cur.executescript(_data_file.read())
    db.commit()
finally:
    # Release the bootstrap connection even when one of the scripts fails.
    db.close()
def get_dbcursor():
    """Return the SQLite cursor stored on the current request.

    The connection is kept on ``g`` and the cursor on ``request``; each is
    created the first time it is needed.
    """
    if 'db' not in g:
        # PARSE_DECLTYPES makes sqlite3 apply converters registered for the
        # declared column types.
        g.db = sqlite3.connect(
            config['SQLITE_DB'],
            detect_types=sqlite3.PARSE_DECLTYPES,
        )
        # isolation_level None: the sqlite3 module does not open transactions
        # implicitly (autocommit).
        g.db.isolation_level = None
    if hasattr(request, 'db'):
        return request.db
    request.db = g.db.cursor()
    return request.db
def fix_query(operation, params):
    """Prepare a statement and its parameters for the SQLite backend.

    The statement is returned unchanged. The parameters are returned as a
    new list in which every datetime has its microseconds set to 0.
    """
    normalized = []
    for value in params:
        if isinstance(value, datetime):
            value = value.replace(microsecond=0)
        normalized.append(value)
    return operation, normalized
def show(operation, host=None):
    """Return an empty dict; this SQLite variant ignores both arguments."""
    result = {}
    return result
elif config['DB_ENGINE'] == 'mysql':
import mysql.connector
def get_dbcursor():
    """Return the MySQL cursor stored on the current request.

    The connection is kept on ``g`` and is (re)created when it is missing or
    reports that it is no longer connected. The cursor is kept on
    ``request`` and created the first time it is needed.
    """
    connected = 'db' in g and g.db.is_connected()
    if not connected:
        g.db = mysql.connector.connect(
            user=config['MYSQL_USER'],
            password=config['MYSQL_PASSWD'],
            host=config.get('MYSQL_HOST', None),
            port=config.get('MYSQL_PORT', 3306),
            unix_socket=config.get('MYSQL_UNIX', None),
            database=config['MYSQL_DB'],
        )
    if hasattr(request, 'db'):
        return request.db
    request.db = g.db.cursor()
    return request.db
def fix_query(operation, params):
    """Prepare a statement and its parameters for the MySQL backend.

    Every ``?`` in the statement is replaced by ``%s``. The parameters are
    returned as a new list in which every datetime has its microseconds set
    to 0.
    """
    statement = operation.replace('?', '%s')
    normalized = []
    for value in params:
        if isinstance(value, datetime):
            value = value.replace(microsecond=0)
        normalized.append(value)
    return statement, normalized
def show(operation, host=config.get('MYSQL_HOST', None)):
    """Run a statement on a dedicated MySQL connection and return a dict.

    Each result row contributes one entry: its first column is the key and
    its second column the value.

    Args:
        operation: Statement executed as-is, without parameters.
        host: Server to connect to over TCP. When falsy, the connection is
            made through the configured unix socket instead. The default is
            read from the configuration once, when the function is defined.

    Returns:
        dict mapping ``row[0]`` to ``row[1]``; empty when the statement
        produced no result set.

    Raises:
        mysql.connector.errors.InterfaceError: for fetch errors other than
            the "no result set" case.
    """
    if host:
        db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=host, port=config.get('MYSQL_PORT', 3306))
    else:
        db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], unix_socket=config.get('MYSQL_UNIX', None))
    try:
        cur = db.cursor()
        cur.execute(operation)
        rows = []
        try:
            rows = cur.fetchall()
        except mysql.connector.errors.InterfaceError as ie:
            # The statement simply had no result set; anything else is a real error.
            if ie.msg != 'No result set to fetch from.':
                raise
        cur.close()
    finally:
        # Always release the dedicated connection, including when execute()
        # or fetchall() raises.
        db.close()
    return {row[0]: row[1] for row in rows}
def query(operation, *params, delim="sep"):
    """Execute a statement and return its rows as a list of dicts.

    Column names are taken from the cursor description, reduced to the part
    after the last ``.`` and before the first ``:``. A column whose name
    equals ``delim`` is not stored as a value: its value becomes the key of
    a new nested dict in the row's top-level dict, and the columns that
    follow are written into that nested dict. In string values the
    two-character sequences ``\\n`` and ``\\r`` are replaced by real newline
    and carriage-return characters.

    Args:
        operation: Statement using ``?`` placeholders.
        *params: Values for the placeholders.
        delim: Column name that starts a nested dict.

    Returns:
        list with one dict per result row; empty when there are no rows.
    """
    operation, params = fix_query(operation, params)
    cur = get_dbcursor()
    cur.execute(operation, params)
    try:
        rows = cur.fetchall()
    except Exception as err:
        # Only the MySQL connector signals "no result set" with an exception.
        # The engine check comes first so the `mysql` name is never touched
        # when that module was not imported (sqlite engine); every other
        # error is re-raised unchanged.
        no_result_set = (
            config['DB_ENGINE'] == 'mysql'
            and isinstance(err, mysql.connector.errors.InterfaceError)
            and err.msg == 'No result set to fetch from.'
        )
        if not no_result_set:
            raise
        rows = []
    res = []
    for row in rows:
        record = {}
        res.append(record)
        # Dict that currently receives the columns; switches on a delim column.
        ptr = record
        for col, desc in zip(row, cur.description):
            name = desc[0].split('.')[-1].split(':')[0]
            if name == delim:
                ptr = {}
                record[col] = ptr
                continue
            if isinstance(col, str):
                col = col.replace('\\n', '\n').replace('\\r', '\r')
            ptr[name] = col
    return res
def modify(operation, *params):
    """Execute a statement and return the cursor's ``lastrowid``.

    The statement and parameters are first passed through ``fix_query`` and
    then executed on the cursor returned by ``get_dbcursor``.
    """
    statement, values = fix_query(operation, params)
    cursor = get_dbcursor()
    cursor.execute(statement, values)
    return cursor.lastrowid
@app.teardown_request
def commit_db(*args):
    """Close the request's cursor and commit the connection at request teardown.

    Does nothing when no cursor was stored on ``request``. The arguments
    passed by the teardown hook are ignored.
    """
    if not hasattr(request, 'db'):
        return
    request.db.close()
    g.db.commit()
@app.teardown_appcontext
def close_db(*args):
    """Close the connection stored on ``g`` and remove it at app-context teardown.

    Does nothing when no connection was stored. The arguments passed by the
    teardown hook are ignored.
    """
    if 'db' not in g:
        return
    g.db.close()
    del g.db
def searchquery(text, columns, match, tables, suffix, *suffixparams):
    """Run a word-based LIKE search and return the rows from ``query``.

    ``text`` is split on single spaces. For every word that is neither empty
    nor whitespace, one sub-select is generated that matches ``%word%``
    against each column in ``match`` and tags the row with a ``_prio``
    value; earlier words get higher values. The sub-selects are combined
    with UNION, wrapped in an outer select that adds ``SUM(_prio) AS
    _score``, and followed by ``suffix``.

    ``columns``, ``match``, ``tables`` and ``suffix`` are inserted into the
    SQL text verbatim; only the words and ``suffixparams`` are passed as
    query parameters.

    Returns an empty list, without querying, when ``text`` has no usable word.
    """
    words = text.split(' ')
    terms = [word for word in words if word != '' and not word.isspace()]
    if not terms:
        return []
    # The highest priority is derived from the raw word count, including
    # the skipped empty words.
    top_prio = len(words) + 1
    matchexpr = ' OR '.join(['%s LIKE ?'%column for column in match])
    params = []
    subexprs = []
    for rank, term in enumerate(terms):
        prio = top_prio - rank
        subexprs.append('SELECT %s, %s AS _prio FROM %s WHERE %s'%(columns, str(prio), tables, matchexpr))
        # One identical pattern per matched column.
        params.extend(['%'+term+'%']*len(match))
    expr = 'SELECT *,SUM(_prio) AS _score FROM (%s) AS _tmp %s'%(' UNION '.join(subexprs), suffix)
    return query(expr, *(params+list(suffixparams)))
# Matches every character other than a-z and 0-9. ldapauth() removes all
# matches from the lower-cased user name before using it.
LDAP_USERRE = re.compile(r'[^a-z0-9]')
if 'LDAP_HOST' in config:
import ldap3
def ldapauth(user, password):
    """Authenticate a user against LDAP and return ``(info, groups)``.

    The user name is lower-cased and stripped of every character that
    LDAP_USERRE matches before it is used in the bind name and the search
    filter.

    Args:
        user: User name as entered.
        password: Password as entered.

    Returns:
        ``(info, groups)`` where ``info`` has the keys ``uid``,
        ``givenName`` and ``sn`` and ``groups`` lists the first RDN value of
        each ``memberOf`` entry. ``({}, [])`` is returned when the sanitized
        user name or the password is empty, when the bind is rejected, or
        when the directory search returns no entry.
    """
    user = LDAP_USERRE.sub(r'', user.lower())
    # Never attempt a bind with an empty name or password: a simple bind
    # without a password is not a real authentication.
    if not user or not password:
        return {}, []
    try:
        conn = ldap3.Connection(ldap3.Server(config['LDAP_HOST'], port=config['LDAP_PORT'], use_ssl=True), 'fsmpi\\%s'%user, password, auto_bind=True, check_names=False)
    except ldap3.core.exceptions.LDAPBindError:
        return {}, []
    try:
        conn.search("cn=users,dc=fsmpi,dc=rwth-aachen,dc=de", "(cn=%s)"%user, attributes=['memberOf', 'givenName', 'sn'])
        if not conn.response:
            # Bind succeeded but no matching entry was found; treat as failure.
            return {}, []
        attributes = conn.response[0]['attributes']
        info = {'uid': user, 'givenName': attributes['givenName'][0], 'sn': attributes['sn'][0]}
        # Keep only the value of the first RDN of each group DN.
        groups = [dn.split(',')[0].split('=')[-1] for dn in attributes.get('memberOf', [])]
    finally:
        # Release the connection even if the lookup fails.
        conn.unbind()
    return info, groups
else:
# Hard-coded accounts used when no LDAP host is configured.
# Layout: user name -> (password, list of groups, user info dict).
notldap = {
'videoag':('videoag', ['users','videoag'], {'uid': 'videoag', 'givenName': 'Video', 'sn': 'Geier'}),
'gustav':('passwort', ['users'], {'uid': 'gustav', 'givenName': 'Gustav', 'sn': 'Geier'})
}
def ldapauth(user, password):
    """Authenticate against the built-in ``notldap`` accounts.

    The user name is lower-cased and stripped of every character that
    LDAP_USERRE matches. Returns ``(info, groups)`` of the matching account
    when ``config['DEBUG']`` is truthy and the password is equal to the
    stored one; otherwise returns ``({}, [])``.
    """
    user = LDAP_USERRE.sub(r'', user.lower())
    if not config.get('DEBUG'):
        return {}, []
    if user not in notldap:
        return {}, []
    expected_password, groups, info = notldap[user]
    if password == expected_password:
        return info, groups
    return {}, []