Select Git revision
pdm_build.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
db.py 4.67 KiB
from server import *
if config['DB_ENGINE'] == 'sqlite':
import sqlite3
# From sqlite3 module, but with error catching
def convert_timestamp(val):
try:
datepart, timepart = val.split(b" ")
year, month, day = map(int, datepart.split(b"-"))
timepart_full = timepart.split(b".")
hours, minutes, seconds = map(int, timepart_full[0].split(b":"))
val = datetime(year, month, day, hours, minutes, seconds, 0)
except ValueError:
val = datetime.fromtimestamp(0)
return val
sqlite3.register_converter('datetime', convert_timestamp)
sqlite3.register_converter('timestamp', convert_timestamp)
if config['DB_ENGINE'] == 'sqlite':
created = not os.path.exists(config['SQLITE_DB'])
db = sqlite3.connect(config['SQLITE_DB'])
cur = db.cursor()
if config['SQLITE_INIT_SCHEMA']:
print('Init db schema')
cur.executescript(open(config['DB_SCHEMA']).read())
if config['SQLITE_INIT_DATA'] and created:
print('Init db data')
cur.executescript(open(config['DB_DATA']).read())
db.commit()
db.close()
def get_dbcursor():
if 'db' not in g:
g.db = sqlite3.connect(config['SQLITE_DB'], detect_types=sqlite3.PARSE_DECLTYPES)
g.db.isolation_level = None
if not hasattr(request, 'db'):
request.db = g.db.cursor()
return request.db
def fix_query(operation, params):
params = [(p.replace(microsecond=0) if isinstance(p, datetime) else p) for p in params]
return operation, params
def show(operation, host=None):
return {}
elif config['DB_ENGINE'] == 'mysql':
import mysql.connector
def get_dbcursor():
if 'db' not in g or not g.db.is_connected():
g.db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=config.get('MYSQL_HOST', None), port=config.get('MYSQL_PORT', 3306), unix_socket=config.get('MYSQL_UNIX', None), database=config['MYSQL_DB'])
if not hasattr(request, 'db'):
request.db = g.db.cursor()
return request.db
def fix_query(operation, params):
operation = operation.replace('?', '%s')
params = [(p.replace(microsecond=0) if isinstance(p, datetime) else p) for p in params]
return operation, params
def show(operation, host=config.get('MYSQL_HOST', None)):
if host:
db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=host, port=config.get('MYSQL_PORT', 3306))
else:
db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], unix_socket=config.get('MYSQL_UNIX', None))
cur = db.cursor()
cur.execute(operation)
rows = []
try:
rows = cur.fetchall()
except mysql.connector.errors.InterfaceError as ie:
if ie.msg == 'No result set to fetch from.':
# no problem, we were just at the end of the result set
pass
else:
raise
res = {}
for row in rows:
res[row[0]] = row[1]
cur.close()
db.close()
return res
def query(operation, *params, delim="sep", nlfix=True):
operation, params = fix_query(operation, params)
tries = 0
while (tries < 10):
try:
cur = get_dbcursor()
cur.execute(operation, params)
except mysql.connector.errors.InternalError as e:
if e.msg == 'Deadlock found when trying to get lock; try restarting transaction':
tries += 1
continue
else:
raise
break
rows = []
try:
rows = cur.fetchall()
except mysql.connector.errors.InterfaceError as ie:
if ie.msg == 'No result set to fetch from.':
# no problem, we were just at the end of the result set
pass
else:
raise
res = []
for row in rows:
res.append({})
ptr = res[-1]
for col, desc in zip(row, cur.description):
name = desc[0].split('.')[-1].split(':')[0]
if name == delim:
ptr = res[-1][col] = {}
continue
if type(col) == str and nlfix:
col = col.replace('\\n', '\n').replace('\\r', '\r')
ptr[name] = col
return res
def modify(operation, *params):
operation, params = fix_query(operation, params)
cur = get_dbcursor()
cur.execute(operation, params)
return cur.lastrowid
@app.teardown_request
def commit_db(*args):
if hasattr(request, 'db'):
request.db.close()
g.db.commit()
@app.teardown_appcontext
def close_db(*args):
if 'db' in g:
g.db.close()
del g.db
def searchquery(text, columns, match, tables, suffix, *suffixparams):
params = []
subexprs = []
words = text.split(' ')
prio = len(words)+1
for word in words:
if word == '' or word.isspace():
continue
matchexpr = ' OR '.join(['%s LIKE ?'%column for column in match])
subexprs.append('SELECT %s, %s AS _prio FROM %s WHERE %s'%(columns, str(prio), tables, matchexpr))
params += ['%'+word+'%']*len(match)
prio -= 1
if subexprs == []:
return []
expr = 'SELECT *,SUM(_prio) AS _score FROM (%s) AS _tmp %s'%(' UNION '.join(subexprs), suffix)
return query(expr, *(list(params)+list(suffixparams)))