Select Git revision
moderator.js
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
db.py 4.35 KiB
import sqlite3
from flask import g
from server import *
if config['DB_ENGINE'] == 'sqlite':
    # From sqlite3 module, but with error catching
    def convert_timestamp(val):
        """Convert a raw SQLite ``datetime``/``timestamp`` column value.

        *val* is the bytes value handed to the converter, expected in the
        form ``b"YYYY-MM-DD HH:MM:SS[.fraction]"``. The fractional part is
        dropped (microseconds are always 0). Any value that fails to parse
        with a ValueError yields ``datetime.fromtimestamp(0)`` instead of
        propagating the error.
        """
        try:
            date_bytes, time_bytes = val.split(b" ")
            year, month, day = (int(field) for field in date_bytes.split(b"-"))
            # Only the part before the first "." carries hours/minutes/seconds.
            clock_bytes = time_bytes.split(b".")[0]
            hours, minutes, seconds = (int(field) for field in clock_bytes.split(b":"))
            return datetime(year, month, day, hours, minutes, seconds, 0)
        except ValueError:
            return datetime.fromtimestamp(0)

    # Use the tolerant converter for both declared column types.
    sqlite3.register_converter('datetime', convert_timestamp)
    sqlite3.register_converter('timestamp', convert_timestamp)
if config['DB_ENGINE'] == 'sqlite':
    def _read_sql_script(path):
        """Return the text of the SQL script at *path*, closing the file afterwards."""
        with open(path) as script:
            return script.read()

    # One-time initialisation at import: optionally (re)apply the schema and,
    # if the database file did not exist before, load the initial data.
    DBCREATED = not os.path.exists(config['SQLITE_DB'])
    db = sqlite3.connect(config['SQLITE_DB'])
    try:
        cur = db.cursor()
        if config['SQLITE_INIT_SCHEMA']:
            print('Init db schema')
            cur.executescript(_read_sql_script(config['DB_SCHEMA']))
        if config['SQLITE_INIT_DATA'] and DBCREATED:
            print('Init db data')
            cur.executescript(_read_sql_script(config['DB_DATA']))
        db.commit()
    finally:
        # Close the setup connection even if one of the scripts failed.
        db.close()

    def get_dbcursor():
        """Return the cursor stored on ``request``, creating it on first use.

        The connection is stored on ``g`` as ``g.db`` and opened with
        ``PARSE_DECLTYPES`` so the registered column converters apply.
        """
        if 'db' not in g:
            g.db = sqlite3.connect(config['SQLITE_DB'], detect_types=sqlite3.PARSE_DECLTYPES)
            # isolation_level None: the sqlite3 module opens no implicit transactions.
            g.db.isolation_level = None
        if not hasattr(request, 'db'):
            request.db = g.db.cursor()
        return request.db

    def fix_query(operation, params):
        """Return *operation* unchanged and *params* as a list with datetime microseconds zeroed."""
        params = [(p.replace(microsecond=0) if isinstance(p, datetime) else p) for p in params]
        return operation, params

    def show(operation, host=None): #pylint: disable=unused-argument
        """SQLite counterpart of the MySQL ``show`` helper; always returns an empty dict."""
        return {}

elif config['DB_ENGINE'] == 'mysql':
    import mysql.connector

    def get_dbcursor():
        """Return the cursor stored on ``request``, creating it on first use.

        A new connection is opened when ``g`` has none or the stored one is
        no longer connected; each new connection is switched to the
        ``ANSI_QUOTES`` SQL mode.
        """
        if 'db' not in g or not g.db.is_connected():
            g.db = mysql.connector.connect(
                user=config['MYSQL_USER'],
                password=config['MYSQL_PASSWD'],
                host=config.get('MYSQL_HOST', None),
                port=config.get('MYSQL_PORT', 3306),
                unix_socket=config.get('MYSQL_UNIX', None),
                database=config['MYSQL_DB'])
            g.db.cmd_query("SET SESSION sql_mode = 'ANSI_QUOTES'")
        if not hasattr(request, 'db'):
            request.db = g.db.cursor()
        return request.db

    def fix_query(operation, params):
        """Translate ``?`` placeholders to ``%s`` and zero datetime microseconds.

        NOTE(review): every ``?`` in *operation* is replaced, including any
        inside SQL string literals.
        """
        operation = operation.replace('?', '%s')
        params = [(p.replace(microsecond=0) if isinstance(p, datetime) else p) for p in params]
        return operation, params

    def show(operation, host=config.get('MYSQL_HOST', None)):
        """Run *operation* on a dedicated connection and return a dict of its rows.

        Each result row contributes ``row[0] -> row[1]``. The connection is
        made to *host* when given, otherwise through the configured unix
        socket; no database is selected. Cursor and connection are always
        closed, also when the statement fails.
        """
        if host:
            db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=host, port=config.get('MYSQL_PORT', 3306))
        else:
            db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], unix_socket=config.get('MYSQL_UNIX', None))
        try:
            cur = db.cursor()
            try:
                cur.execute(operation)
                rows = []
                try:
                    rows = cur.fetchall()
                except mysql.connector.errors.InterfaceError as e:
                    # A statement without a result set is not an error here.
                    if e.msg != 'No result set to fetch from.':
                        raise
                return {row[0]: row[1] for row in rows}
            finally:
                cur.close()
        finally:
            db.close()
def query(operation, *params, delim="sep", nlfix=True):
    """Execute *operation* with *params* and return the rows as a list of dicts.

    Column names are reduced to the part after the last ``.`` and before the
    first ``:``. A column whose name equals *delim* is not stored as a value:
    it opens a nested dict under the key given by that column's value, and
    all following columns of the row are written into that nested dict.
    With *nlfix* true, the two-character sequences ``\\n`` and ``\\r`` in
    string values are replaced by real newline / carriage-return characters.

    The statement is attempted up to 10 times when the driver reports a
    deadlock; any other error, or a deadlock on the final attempt, is raised.
    """
    operation, params = fix_query(operation, params)
    deadlock_msg = 'Deadlock found when trying to get lock; try restarting transaction'
    max_tries = 10
    for attempt in range(1, max_tries + 1):
        try:
            cur = get_dbcursor()
            cur.execute(operation, params)
            break
        except Exception as e:
            # Substring match: drivers may prefix the message with an error
            # code (e.g. "1213 (40001): ..."). Once the retries are used up
            # the deadlock is raised instead of being silently dropped.
            if deadlock_msg not in str(e) or attempt == max_tries:
                raise
    try:
        rows = cur.fetchall()
    except Exception as e:
        if str(e) not in ('No result set to fetch from.', "the last operation didn't produce a result"):
            raise
        # no problem, the statement just did not produce a result set
        rows = []
    # description may be None when there is no result set; rows is empty then.
    names = [desc[0].split('.')[-1].split(':')[0] for desc in (cur.description or ())]
    res = []
    for row in rows:
        record = {}
        ptr = record
        for col, name in zip(row, names):
            if name == delim:
                # Start a nested dict keyed by this column's value.
                ptr = record[col] = {}
                continue
            if isinstance(col, str) and nlfix:
                col = col.replace('\\n', '\n').replace('\\r', '\r')
            ptr[name] = col
        res.append(record)
    return res
def modify(operation, *params, get_id=False):
    """Execute a data-changing statement on the current request's cursor.

    *operation* and *params* are first passed through ``fix_query``.
    Returns the cursor's ``lastrowid`` when *get_id* is true, else ``None``.
    """
    statement, values = fix_query(operation, params)
    cursor = get_dbcursor()
    cursor.execute(statement, values)
    return cursor.lastrowid if get_id else None
@app.teardown_request
def commit_db(*args): #pylint: disable=unused-argument
    """Close the request's cursor and commit the connection, if a cursor was created."""
    if not hasattr(request, 'db'):
        # No cursor was requested during this request; nothing to finish.
        return
    request.db.close()
    g.db.commit()
@app.teardown_appcontext
def close_db(*args): #pylint: disable=unused-argument
    """Close the connection stored on ``g`` and remove it, if one exists."""
    if 'db' not in g:
        return
    g.db.close()
    del g.db