Skip to content
Snippets Groups Projects
Select Git revision
  • ab68dcd283b06d615e2a11583feb57ed8a5d0dc4
  • master default protected
  • postgres_integration
  • s3compatible
  • intros
  • bootstrap4
  • modules
7 results

moderator.js

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    db.py 4.35 KiB
    import sqlite3
    from flask import g
    
    from server import *
    
    if config['DB_ENGINE'] == 'sqlite':
    	# From sqlite3 module, but with error catching
    	def convert_timestamp(val):
    		"""Convert a b"YYYY-MM-DD HH:MM:SS[.fraction]" value into a datetime.

    		Any fractional-seconds part is dropped (the microsecond field is always 0).
    		If parsing raises ValueError, datetime.fromtimestamp(0) is returned instead
    		of propagating the error.
    		"""
    		try:
    			date_bytes, time_bytes = val.split(b" ")
    			year, month, day = (int(part) for part in date_bytes.split(b"-"))
    			# Only the part before the first "." is used; the fraction is ignored
    			clock = time_bytes.split(b".")[0]
    			hours, minutes, seconds = (int(part) for part in clock.split(b":"))
    			return datetime(year, month, day, hours, minutes, seconds, 0)
    		except ValueError:
    			return datetime.fromtimestamp(0)
    
    	# Route both declared column types through the error-tolerant converter
    	sqlite3.register_converter('datetime', convert_timestamp)
    	sqlite3.register_converter('timestamp', convert_timestamp)
    
    	# Import-time bootstrap of the sqlite database file.
    	# DBCREATED is True when the database file did not exist before connecting.
    	# (This code already runs inside the sqlite branch, so no further engine
    	# check is needed here.)
    	DBCREATED = not os.path.exists(config['SQLITE_DB'])
    	db = sqlite3.connect(config['SQLITE_DB'])
    	try:
    		cur = db.cursor()
    		if config['SQLITE_INIT_SCHEMA']:
    			print('Init db schema')
    			# Context manager so the script file is closed after reading
    			with open(config['DB_SCHEMA']) as _schema_file:
    				cur.executescript(_schema_file.read())
    		# Initial data is only loaded into a freshly created database
    		if config['SQLITE_INIT_DATA'] and DBCREATED:
    			print('Init db data')
    			with open(config['DB_DATA']) as _data_file:
    				cur.executescript(_data_file.read())
    		db.commit()
    	finally:
    		# Always release the bootstrap connection, even if a script failed
    		db.close()
    
    	def get_dbcursor():
    		"""Return the cursor cached on the current request, creating it if needed.

    		A sqlite connection is opened and stored as g.db when g has none yet. It
    		uses PARSE_DECLTYPES so the registered converters apply, and its
    		isolation_level is set to None.
    		"""
    		if 'db' not in g:
    			connection = sqlite3.connect(config['SQLITE_DB'], detect_types=sqlite3.PARSE_DECLTYPES)
    			connection.isolation_level = None
    			g.db = connection
    		if hasattr(request, 'db'):
    			return request.db
    		# First database access in this request: create and cache a cursor
    		request.db = g.db.cursor()
    		return request.db
    
    	def fix_query(operation, params):
    		"""Return (operation, params) with datetime parameters cut to whole seconds.

    		The operation string is passed through unchanged; params is returned as a
    		new list in which every datetime has microsecond=0.
    		"""
    		cleaned = []
    		for value in params:
    			if isinstance(value, datetime):
    				value = value.replace(microsecond=0)
    			cleaned.append(value)
    		return operation, cleaned
    
    	def show(operation, host=None): #pylint: disable=unused-argument
    		"""sqlite variant of show(): ignores both arguments and returns an empty dict."""
    		return dict()
    
    elif config['DB_ENGINE'] == 'mysql':
    	import mysql.connector
    	def get_dbcursor():
    		"""Return the cursor cached on the current request, creating it if needed.

    		A MySQL connection is (re)established and stored as g.db when g has none
    		or the stored one reports it is no longer connected. Each new connection
    		gets its session sql_mode set to ANSI_QUOTES.
    		"""
    		# NOTE(review): if a reconnect happens while request.db already exists, the
    		# cached cursor still belongs to the previous connection -- confirm whether
    		# that situation can occur in practice.
    		needs_connection = 'db' not in g or not g.db.is_connected()
    		if needs_connection:
    			connect_args = {
    				'user': config['MYSQL_USER'],
    				'password': config['MYSQL_PASSWD'],
    				'host': config.get('MYSQL_HOST', None),
    				'port': config.get('MYSQL_PORT', 3306),
    				'unix_socket': config.get('MYSQL_UNIX', None),
    				'database': config['MYSQL_DB'],
    			}
    			g.db = mysql.connector.connect(**connect_args)
    			g.db.cmd_query("SET SESSION sql_mode = 'ANSI_QUOTES'")
    		if hasattr(request, 'db'):
    			return request.db
    		request.db = g.db.cursor()
    		return request.db
    
    	def fix_query(operation, params):
    		"""Adapt a '?'-placeholder query and its parameters for the MySQL driver.

    		Every '?' in operation becomes '%s'. params is returned as a new list in
    		which every datetime has microsecond=0.
    		"""
    		translated = '%s'.join(operation.split('?'))
    		cleaned = []
    		for value in params:
    			if isinstance(value, datetime):
    				value = value.replace(microsecond=0)
    			cleaned.append(value)
    		return translated, cleaned
    
    	def show(operation, host=config.get('MYSQL_HOST', None)):
    		"""Run a statement on a dedicated connection and return its rows as a dict.

    		A separate connection (no database selected) is opened for the call: over
    		TCP to host when host is truthy, otherwise through the configured unix
    		socket. The first column of every returned row becomes a key and the
    		second column its value; later rows overwrite earlier ones with the same
    		key. A statement that yields no result set produces an empty dict.

    		The cursor and the connection are closed even when executing or fetching
    		raises. InterfaceError is re-raised unless it is the driver's
    		"no result set" message.
    		"""
    		from contextlib import closing

    		if host:
    			db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], host=host, port=config.get('MYSQL_PORT', 3306))
    		else:
    			db = mysql.connector.connect(user=config['MYSQL_USER'], password=config['MYSQL_PASSWD'], unix_socket=config.get('MYSQL_UNIX', None))
    		# closing() releases the cursor first, then the connection, on every exit path
    		with closing(db), closing(db.cursor()) as cur:
    			cur.execute(operation)
    			try:
    				rows = cur.fetchall()
    			except mysql.connector.errors.InterfaceError as e:
    				if e.msg != 'No result set to fetch from.':
    					raise
    				# no problem, the statement simply produced no result set
    				rows = []
    		return {row[0]: row[1] for row in rows}
    
    def query(operation, *params, delim="sep", nlfix=True):
    	"""Execute a statement and return its result rows as a list of dicts.

    	operation and params are first passed through fix_query(). Execution is
    	attempted up to 10 times when the driver reports a deadlock; any other
    	error, or a deadlock on the final attempt, is re-raised.

    	Each row becomes a dict keyed by column name, where the name is the last
    	'.'-separated part of the column label, cut at the first ':'. A column
    	whose name equals delim is not stored as a value: its value becomes a key
    	in the row dict mapping to a new nested dict, and the columns after it are
    	stored in that nested dict. When nlfix is true, the two-character
    	sequences backslash-n and backslash-r in string values are replaced by
    	real newline and carriage-return characters.

    	A statement that produces no result set yields an empty list.
    	"""
    	operation, params = fix_query(operation, params)
    	max_tries = 10
    	for attempt in range(1, max_tries + 1):
    		try:
    			cur = get_dbcursor()
    			cur.execute(operation, params)
    			break
    		except Exception as e:
    			is_deadlock = str(e) == 'Deadlock found when trying to get lock; try restarting transaction'
    			# Give up on non-deadlock errors and once the retry budget is spent,
    			# instead of silently continuing with a failed cursor
    			if not is_deadlock or attempt == max_tries:
    				raise
    	try:
    		rows = cur.fetchall()
    	except Exception as e:
    		if str(e) not in ('No result set to fetch from.', "the last operation didn't produce a result"):
    			raise
    		# no problem, the statement simply produced no result set
    		rows = []
    	# Column names are the same for every row, so derive them once. The
    	# description is only consulted when there are rows to convert.
    	names = [desc[0].split('.')[-1].split(':')[0] for desc in cur.description] if rows else []
    	res = []
    	for row in rows:
    		record = {}
    		ptr = record
    		for col, name in zip(row, names):
    			if name == delim:
    				# Start a nested dict under the delimiter column's value
    				ptr = record[col] = {}
    				continue
    			if isinstance(col, str) and nlfix:
    				col = col.replace('\\n', '\n').replace('\\r', '\r')
    			ptr[name] = col
    		res.append(record)
    	return res
    
    def modify(operation, *params, get_id=False):
    	"""Execute a statement for its side effects.

    	operation and params are passed through fix_query() and executed on the
    	request's cursor. Returns the cursor's lastrowid when get_id is true,
    	otherwise None.
    	"""
    	statement, values = fix_query(operation, params)
    	cursor = get_dbcursor()
    	cursor.execute(statement, values)
    	return cursor.lastrowid if get_id else None
    
    @app.teardown_request
    def commit_db(*args): #pylint: disable=unused-argument
    	"""Close the request's cursor and commit g.db, if a cursor was created.

    	Registered through app.teardown_request; the arguments are ignored.
    	"""
    	if not hasattr(request, 'db'):
    		# No database access happened during this request
    		return
    	request.db.close()
    	g.db.commit()
    
    @app.teardown_appcontext
    def close_db(*args): #pylint: disable=unused-argument
    	"""Close the connection stored on g and remove it, if there is one.

    	Registered through app.teardown_appcontext; the arguments are ignored.
    	"""
    	if 'db' not in g:
    		return
    	g.db.close()
    	del g.db