Commit 5ff71ae6 authored by Simon Künzel

Migrate to SQLAlchemy, Rewrite Object API, throw away whatever that old changelog versioning was

parent ed9691cd
Showing 5736 additions and 3678 deletions
[submodule "submodules/backend_common_py"]
path = submodules/backend_common_py
url = git@git.fsmpi.rwth-aachen.de:videoag/backend_common_py.git
@@ -42,31 +42,18 @@ DB_CONNECTIONS = {
    "max_write_attempts": 2
}
-# DB_ENGINE = "mysql"
-MYSQL = {
-    "host": "10.0.0.101",
-    "port": 43045,
-    "user": "video",
-    "password": "video",
-    "database": "video_ag_test",
-    # "unix_socket": "/var/run/mysqld/mysqld.sock"
-}
-# DB_ENGINE = "postgres"
-POSTGRES = {
-    "host": "10.0.0.101",
-    "port": 5432,
-    "user": "videoag",
-    "password": "",
-    "database": "videoag"
-}
-DB_ENGINE = "sqlite"
-SQLITE = {
-    "file": "db.sqlite",
-    "schema": "../config/db_schema_sqlite.sql",
-    "init_schema": True,
-    "init_data": True
-}
+DATABASE = {
+    "engine": "postgres",
+    "postgres": {
+        "host": "10.0.0.23",
+        "port": 5432,
+        "user": "videoag",
+        "password": "",
+        "database": "videoag_alchemy",
+        "auto_migration": True,
+        "ignore_no_connection": False,
+    },
+    "log_all_statements": True  # TODO
+}
# This is host based. It uses a simple counter. For example for a window size of one hour, every hour the specified
......
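For reference, the new nested DATABASE config is what the shared ApiDatabase wrapper later in this commit receives (database = ApiDatabase(api.config["DATABASE"])). A minimal sketch of how such a config could be turned into a SQLAlchemy engine, assuming the Database class in videoag_common does something along these lines (the URL construction here is illustrative, not the actual implementation):

from sqlalchemy import create_engine

def build_engine_from_config(cfg: dict):
    # cfg is the DATABASE dict above; cfg["engine"] selects the engine-specific sub-dict.
    sub = cfg[cfg["engine"]]
    url = (f"postgresql+psycopg://{sub['user']}:{sub['password']}"
           f"@{sub['host']}:{sub['port']}/{sub['database']}")
    # echo=True makes SQLAlchemy log every statement, which is what
    # "log_all_statements" toggles via the sqlalchemy.engine logger further down.
    return create_engine(url, echo=cfg.get("log_all_statements", False))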
DELETE FROM public.changelog_entry WHERE id >= 0;
DELETE FROM public.publish_medium WHERE id >= 0;
DELETE FROM public.chapter WHERE id >= 0;
DELETE FROM public.lecture WHERE id >= 0;
DELETE FROM public.course_responsible;
DELETE FROM public."user" WHERE id >= 0;
DELETE FROM public.course WHERE id >= 0;
DELETE FROM public.featured WHERE id >= 0;
DELETE FROM public.announcement WHERE id >= 0;
INSERT INTO public.announcement (id, deleted, visible, type, page_visibility, text, publish_time, expiration_time)
SELECT
id,
deleted,
visible,
(CASE
WHEN level = 0 THEN 'info'
WHEN level = 1 THEN 'info'
WHEN level = 2 THEN 'warning'
WHEN level = 3 THEN 'important'
END)::announcement_type AS type,
(CASE
WHEN level = 0 THEN 'only_main_page'
WHEN level = 1 THEN 'all_pages'
WHEN level = 2 THEN 'all_pages'
WHEN level = 3 THEN 'all_pages_and_embed'
END)::announcement_page_visibility AS page_visibility,
text,
time_publish,
time_expire
FROM old_data.announcements
;
INSERT INTO public.course (id, deleted, visible, handle, full_name, short_name, semester, organizer, topic, description,
show_chapters_on_course, authentication_information, listed, internal_comment, allow_download, allow_embed,
view_perm_type, view_perm_rwth_auth, view_perm_fsmpi_auth, view_perm_extra_json)
SELECT
id, deleted, visible, handle, title, short,
CASE -- TODO better check
WHEN semester = '' THEN 'none'
ELSE semester
END,
organizer,
subject,
description,
coursechapters,
login_info,
listed,
internal,
downloadable,
NOT embedinvisible,
'inherit', -- TODO
False,
False,
NULL
FROM old_data.courses_data
; -- TODO migrate legacy responsible to internal comment
UPDATE public.course
SET view_perm_extra_json = '{"moodle_course_ids": [], "passwords": {}}'
WHERE view_perm_type = 'authentication'
;
INSERT INTO public."user" (id, handle, name, last_login)
SELECT id, name, realname, last_login
FROM old_data.users
;
INSERT INTO public.course_responsible (course_id, user_id)
SELECT DISTINCT course_id, user_id FROM old_data.responsible
WHERE course_id IN (SELECT id FROM public.course) -- There are some for unknown courses
;
INSERT INTO public.featured (id, deleted, visible, title, text, display_priority, type, image_url, course_id, lecture_id)
SELECT id, deleted, visible, title, text, 0,
(CASE
WHEN type = 'plain' THEN 'plain'
WHEN type = 'image' THEN 'image'
ELSE 'invalid' -- Will cause exception
END)::featured_type,
CASE
WHEN type = 'image' THEN COALESCE(param, '')
ELSE NULL
END,
NULL,
NULL
FROM old_data.featured
WHERE NOT deleted
;
INSERT INTO public.lecture (id, deleted, visible, course_id, title, speaker, location, "time", duration, description,
no_recording, livestream_planned, internal_comment, view_perm_type, view_perm_rwth_auth, view_perm_fsmpi_auth,
view_perm_extra_json)
SELECT id, deleted, visible, course_id, title, speaker, place, "time", duration, comment,
norecording, live, internal,
'inherit', False, False, NULL -- TODO
FROM old_data.lectures_data
WHERE course_id <> 0 -- These are empty
;
UPDATE public.lecture
SET view_perm_extra_json = '{"moodle_course_ids": [], "passwords": {}}'
WHERE view_perm_type = 'authentication'
;
INSERT INTO public.chapter (id, deleted, visible, lecture_id, start_time, name)
SELECT id, deleted, visible, lecture_id, "time", text
FROM old_data.chapters
;
INSERT INTO public.publish_medium (id, deleted, visible, lecture_id, title)
SELECT id, deleted, visible, lecture_id,
CASE
WHEN comment <> '' THEN comment
ELSE title
END
FROM old_data.videos_data
;
SELECT setval(pg_get_serial_sequence('announcement', 'id'), COALESCE((SELECT MAX("id")+1 FROM "announcement"), 1), false);
SELECT setval(pg_get_serial_sequence('course', 'id'), COALESCE((SELECT MAX("id")+1 FROM "course"), 1), false);
SELECT setval(pg_get_serial_sequence('featured', 'id'), COALESCE((SELECT MAX("id")+1 FROM "featured"), 1), false);
SELECT setval(pg_get_serial_sequence('lecture', 'id'), COALESCE((SELECT MAX("id")+1 FROM "lecture"), 1), false);
SELECT setval(pg_get_serial_sequence('chapter', 'id'), COALESCE((SELECT MAX("id")+1 FROM "chapter"), 1), false);
SELECT setval(pg_get_serial_sequence('publish_medium', 'id'), COALESCE((SELECT MAX("id")+1 FROM "publish_medium"), 1), false);
SELECT setval(pg_get_serial_sequence('user', 'id'), COALESCE((SELECT MAX("id")+1 FROM "user"), 1), false);
SELECT setval(pg_get_serial_sequence('changelog_entry', 'id'), COALESCE((SELECT MAX("id")+1 FROM "changelog_entry"), 1), false);
\ No newline at end of file
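The setval calls above are needed because the migration inserts rows with explicit ids, which leaves each table's serial sequence untouched; with is_called set to false, the next nextval returns exactly MAX(id)+1. A quick sanity check for one sequence (illustrative, assuming a psycopg connection conn; note that nextval consumes a sequence value):

with conn.cursor() as cur:
    cur.execute("SELECT nextval(pg_get_serial_sequence('course', 'id'))")
    print(cur.fetchone()[0])  # prints MAX(id) + 1 over the migrated courses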
@@ -16,6 +16,7 @@ ldap3==2.9.1
mysql-connector-python==8.4.0
psycopg[c]==3.1.19
# sqlite is part of the standard library
sqlalchemy==2.0.31
# required for testing
coverage==7.5.1
......
from datetime import datetime
from api.database import db_pool, PreparedStatement
from api.authentication import is_moderator
_SQL_GET_ANNOUNCEMENTS = PreparedStatement("""
SELECT * FROM "announcements"
WHERE NOT "deleted"
AND (? OR (
"visible"
AND (("time_expire" IS NULL) OR "time_expire" > ?)
AND (("time_publish" IS NULL) OR "time_publish" <= ?)
))
""")
def query_announcements():
is_mod = is_moderator()
current_time: datetime = datetime.now()
announcements_db = db_pool.execute_read_statement_in_transaction(
_SQL_GET_ANNOUNCEMENTS, is_mod, current_time, current_time)
announcements_json = []
for announcement_db in announcements_db:
announcement_json = {
"id": announcement_db["id"],
"text": announcement_db["text"]
}
if is_mod:
time_publish = announcement_db["time_publish"]
time_expire = announcement_db["time_expire"]
has_not_expired = time_expire is None or time_expire > current_time
announcement_json["is_currently_visible"] = (announcement_db["visible"]
and (time_publish is None or time_publish <= current_time)
and has_not_expired)
announcement_json["has_expired"] = not has_not_expired
announcements_json.append(announcement_json)
level_db: int = announcement_db["level"]
if level_db == 0:
announcement_json["type"] = "info"
announcement_json["visibility"] = "only_main_page"
elif level_db == 1:
announcement_json["type"] = "info"
announcement_json["visibility"] = "all_pages"
elif level_db == 2:
announcement_json["type"] = "warning"
announcement_json["visibility"] = "all_pages"
elif level_db == 3:
announcement_json["type"] = "important"
announcement_json["visibility"] = "all_pages_and_embed"
else:
raise Exception("Unknown announcement level: " + str(level_db))
return announcements_json
import secrets
import re
import string
from enum import StrEnum
from ipaddress import ip_address, ip_network
import requests
from requests.exceptions import JSONDecodeError
from functools import wraps
from flask import session, request
from videoag_common.objects import *
from api.miscellaneous import *
from api.database import *
import api
class ViewPermissionsType(StrEnum):
PUBLIC = "public"
PRIVATE = "private"
AUTHENTICATION = "authentication"
INHERIT = "inherit"
class ViewPermissions:
def __init__(self,
type: ViewPermissionsType,
rwth_authentication: bool or None = None,
fsmpi_authentication: bool or None = None,
moodle_course_ids: list[int] or None = None,
passwords: dict[str, str] or None = None):
super().__init__()
self.type = type
self.rwth_authentication = rwth_authentication
self.fsmpi_authentication = fsmpi_authentication
self.moodle_course_ids = moodle_course_ids
self.passwords = passwords
is_auth = type == ViewPermissionsType.AUTHENTICATION
if ((rwth_authentication is not None) != is_auth) \
or ((fsmpi_authentication is not None) != is_auth) \
or ((moodle_course_ids is not None) != is_auth) \
or ((passwords is not None) != is_auth):
raise TypeError()
if is_auth \
and not rwth_authentication \
and not fsmpi_authentication \
and len(moodle_course_ids) == 0 \
and len(passwords) == 0:
raise RuntimeError("At least one authentication method must be present for type AUTHENTICATION")
@staticmethod
def from_json(json: CJsonValue or JsonTypes) -> "ViewPermissions":
is_client = isinstance(json, CJsonValue)
if not is_client:
json = CJsonValue(json)
json = json.as_object()
try:
type_str: str = json.get_string("type", 100)
try:
type = ViewPermissionsType(type_str)
except ValueError:
raise json.raise_error(f"Unknown type '{truncate_string(type_str)}'")
rwth_authentication = None
fsmpi_authentication = None
moodle_course_ids = None
passwords = None
if type == ViewPermissionsType.AUTHENTICATION:
rwth_authentication = json.get_bool("rwth_authentication")
fsmpi_authentication = json.get_bool("fsmpi_authentication")
moodle_course_ids = [v.as_sint32() for v in json.get_array("moodle_course_ids")]
passwords_json = json.get_object("passwords")
passwords = {k: passwords_json.get_string(k, 1024) for k in passwords_json.keys()}
if not rwth_authentication \
and not fsmpi_authentication \
and len(moodle_course_ids) == 0 \
and len(passwords) == 0:
raise ApiClientException(ERROR_BAD_REQUEST(f"For type {ViewPermissionsType.AUTHENTICATION.value} at"
f" least one authentication method must be present"))
else:
json.raise_if_present("rwth_authentication")
json.raise_if_present("fsmpi_authentication")
json.raise_if_present("moodle_course_ids")
json.raise_if_present("passwords")
return ViewPermissions(
type,
rwth_authentication,
fsmpi_authentication,
moodle_course_ids,
passwords
)
except ApiClientException as e:
if is_client:
raise e
raise RuntimeError(f"Invalid json: {e.error.message}")
def to_json(self) -> dict:
if self.type == ViewPermissionsType.AUTHENTICATION:
return {
"type": self.type.value,
"rwth_authentication": self.rwth_authentication,
"fsmpi_authentication": self.fsmpi_authentication,
"moodle_course_ids": self.moodle_course_ids,
"passwords": self.passwords
}
else:
return {
"type": self.type.value
}
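A small round-trip sketch for the two methods above (values are illustrative): the dict produced by to_json is accepted back by from_json. Note that for plain-dict input, validation errors surface as RuntimeError rather than ApiClientException.

perm = ViewPermissions.from_json({
    "type": "authentication",
    "rwth_authentication": True,
    "fsmpi_authentication": False,
    "moodle_course_ids": [],
    "passwords": {}
})
assert perm.to_json() == {
    "type": "authentication",
    "rwth_authentication": True,
    "fsmpi_authentication": False,
    "moodle_course_ids": [],
    "passwords": {}
}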
class EffectiveViewPermissions(ViewPermissions):
def __init__(self,
object_permissions: ViewPermissions,
inherited_permissions: ViewPermissions = ViewPermissions(ViewPermissionsType.PUBLIC)):
"""
:param object_permissions: The permissions set for the object
:param inherited_permissions: Only relevant if `permissions.type == INHERIT`. May not have type INHERIT
"""
effective = object_permissions
if object_permissions.type == ViewPermissionsType.INHERIT:
if inherited_permissions.type == ViewPermissionsType.INHERIT:
raise RuntimeError("No indirect inheritance allowed")
effective = inherited_permissions
super().__init__(
effective.type,
effective.rwth_authentication,
effective.fsmpi_authentication,
effective.moodle_course_ids,
effective.passwords
)
self.object_permissions = object_permissions
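Illustrative use of the class above: a lecture whose own permissions are INHERIT takes on the course-level permissions, while object_permissions keeps the raw, unresolved value.

course_perm = ViewPermissions(
    ViewPermissionsType.AUTHENTICATION,
    rwth_authentication=True,
    fsmpi_authentication=False,
    moodle_course_ids=[],
    passwords={}
)
lecture_perm = ViewPermissions(ViewPermissionsType.INHERIT)
effective = EffectiveViewPermissions(lecture_perm, course_perm)
assert effective.type == ViewPermissionsType.AUTHENTICATION
assert effective.rwth_authentication is True
assert effective.object_permissions is lecture_perm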
def api_moderator_route(require_csrf_token: bool = False):
def decorator(func):
if hasattr(func, "is_api_route") and func.is_api_route:
if hasattr(func, "api_function"):
raise Exception("@api_moderator_route() seems to be applied after @api_route(). @api_moderator_route() "
"should be the first (lowest) decorator!")
@@ -144,9 +25,11 @@ def api_moderator_route(require_csrf_token: bool = False):
if not is_moderator():
raise ApiClientException(ERROR_UNAUTHORIZED)
if require_csrf_token:
-            check_csrf_token()
+            _check_csrf_token()
return func(*args, **kwargs)
setattr(wrapper, "api_function_only_mod", True)
setattr(wrapper, "api_function_requires_csrf_token", require_csrf_token)
return wrapper
return decorator
@@ -156,7 +39,7 @@ def is_moderator():
return "user" in session
-def check_csrf_token():
+def _check_csrf_token():
if "X-Csrf-Token" not in request.headers \
or not is_valid_csrf_token(request.headers["X-Csrf-Token"]):
raise ApiClientException(ERROR_INVALID_CSRF_TOKEN)
@@ -184,9 +67,10 @@ def get_user_info():
if "user" not in session:
return None
user = session["user"]
# TODO this is the same as User object (and also reported as such in the docs). Just serialize user object into cookie and return that?
return {
"id": user["dbid"],
"id_string": user["uid"],
"handle": user["uid"],
"name": user["givenName"]
}
@@ -199,112 +83,21 @@ def is_rwth_ip(ip_string: str) -> bool:
return False
def _get_raw_view_permissions(auth_list_db: []) -> ViewPermissions:
if len(auth_list_db) == 0:
return ViewPermissions(ViewPermissionsType.INHERIT)
has_rwth: bool = False
has_fsmpi: bool = False
has_none: bool = False
moodle_course_ids: list[int] = []
passwords: dict[str, str] = {}
for auth_db in auth_list_db:
match auth_db["type"]:
case "public":
return ViewPermissions(ViewPermissionsType.PUBLIC)
case "rwth":
has_rwth = True
case "moodle":
moodle_course_ids.append(int(auth_db["param1"]))
case "fsmpi":
has_fsmpi = True
case "password":
username = auth_db["param1"]
password = auth_db["param2"]
if username in passwords:
print(f"Warning: Duplicate username in password permissions: {username} (Perm id: {auth_db['id']})")
passwords[username] = password
case "none":
has_none = True
case "l2p":
pass
case _:
raise ValueError("Unknown authentication method: " + str(type))
if has_rwth or has_fsmpi or len(moodle_course_ids) > 0 or len(passwords) > 0:
return ViewPermissions(
ViewPermissionsType.AUTHENTICATION,
has_rwth,
has_fsmpi,
moodle_course_ids,
passwords
)
if has_none:
return ViewPermissions(ViewPermissionsType.PRIVATE)
# We must only have an l2p permission, which is effectively private
return ViewPermissions(ViewPermissionsType.PRIVATE)
def get_effective_view_permissions(auth_list_db: [], for_lecture: bool) -> EffectiveViewPermissions:
course_auth = []
lecture_auth = []
for auth_db in auth_list_db:
if "course_id" in auth_db and auth_db["course_id"] is not None:
course_auth.append(auth_db)
elif "lecture_id" in auth_db and auth_db["lecture_id"] is not None:
lecture_auth.append(auth_db)
elif "video_id" in auth_db and auth_db["video_id"] is not None:
pass # Not supported anymore
else:
raise Exception("Permission has no course or lecture id set")
course_perm = EffectiveViewPermissions(_get_raw_view_permissions(course_auth))
if for_lecture:
return EffectiveViewPermissions(
_get_raw_view_permissions(lecture_auth),
course_perm
)
else:
return course_perm
def get_authentication_methods(auth_methods_db: [], for_lecture: bool) -> [str]:
perm = get_effective_view_permissions(auth_methods_db, for_lecture)
match perm.type:
case ViewPermissionsType.PUBLIC:
return ["public"]
case ViewPermissionsType.PRIVATE:
return []
case ViewPermissionsType.AUTHENTICATION:
auth_list_json: list[str] = []
if perm.rwth_authentication:
auth_list_json.append("rwth")
if perm.fsmpi_authentication:
auth_list_json.append("fsmpi")
if len(perm.moodle_course_ids) > 0:
auth_list_json.append("moodle")
if len(perm.passwords) > 0:
auth_list_json.append("password")
return auth_list_json
case ViewPermissionsType.INHERIT:
raise ValueError("Got INHERIT permission from get_effective_view_permissions")
case _:
raise ValueError(f"Unknown type {perm.type}")
def is_lecture_authenticated(lecture_id: int) -> bool:
if is_moderator():
return True
-    from api.course import lecture_query_auth
-    return is_authenticated(lecture_query_auth(lecture_id), True)
+    lecture = database.query_one_or_none_and_expunge(
+        Lecture.select(False, True)
+        .where(Lecture.id == lecture_id)
+    )
+    if lecture is None:
+        raise ApiClientException(ERROR_OBJECT_ERROR(f"Unknown lecture with id '{lecture_id}'"))
+    return is_authenticated(lecture.effective_view_permissions)
-def is_authenticated(auth_list: [], for_lecture: bool):
+def is_authenticated(perm: EffectiveViewPermissions):
    if is_moderator():
        return True
-    perm = get_effective_view_permissions(auth_list, for_lecture)
match perm.type:
case ViewPermissionsType.PUBLIC:
return True
@@ -332,7 +125,7 @@ def is_authenticated(auth_list: [], for_lecture: bool):
return True
case ViewPermissionsType.INHERIT:
raise ValueError("Got INHERIT permission from get_effective_view_permissions")
raise ValueError("Got INHERIT permission from EffectiveViewPermissions")
case _:
raise ValueError(f"Unknown type {perm.type}")
@@ -343,8 +136,12 @@ def authenticate_password(lecture_id: int, username: str, password: str):
Only returns if authentication was successful.
"""
-    from api.course import lecture_query_auth
-    perm = get_effective_view_permissions(lecture_query_auth(lecture_id), True)
+    lecture = database.query_one_or_none_and_expunge(
+        Lecture.select(False, True).where(Lecture.id == lecture_id))
+    if lecture is None:
+        raise ApiClientException(ERROR_OBJECT_ERROR(f"Unknown lecture with id '{lecture_id}'"))
+    perm = lecture.effective_view_permissions
if perm.type != ViewPermissionsType.AUTHENTICATION or len(perm.passwords) == 0:
raise ApiClientException(ERROR_LECTURE_HAS_NO_PASSWORD)
@@ -404,38 +201,6 @@ else:
raise ApiClientException(ERROR_AUTHENTICATION_NOT_AVAILABLE())
-_SQL_GET_USER_BY_NAME = PreparedStatement("""
-    SELECT * FROM "users" WHERE "name" = ?
-""")
-_SQL_INSERT_USER = PreparedStatement("""
-    INSERT INTO "users"
-    ("name", "realname", "fsacc", "level", "calendar_key", "rfc6238")
-    VALUES (?, ?, ?, 1, '', '')
-    RETURNING "id"
-""")
-def _db_get_user_id(trans: ReadTransaction, user_id: str) -> int:
-    user_list_db = trans.execute_statement_and_close(_SQL_GET_USER_BY_NAME, user_id)
-    if len(user_list_db) < 1:
-        raise ApiClientException(ERROR_AUTHENTICATION_NOT_AVAILABLE(
-            "Site is read-only and we can not create a new account for you in the database"))
-    user_db = user_list_db[0]
-    return user_db["id"]
-def _db_get_or_create_user_id(trans: WriteTransaction, user_id: str, given_name: str) -> int:
-    user_list_db = trans.execute_statement(_SQL_GET_USER_BY_NAME, user_id)
-    if len(user_list_db) < 1:
-        result = trans.execute_statement_and_commit(
-            _SQL_INSERT_USER, user_id, given_name, user_id)
-        return result[0]["id"]
-    else:
-        trans.commit()
-        user_db = user_list_db[0]
-        return user_db["id"]
def authenticate_fsmpi(username: str, password: str) -> {}:
"""
May throw ApiClientException.
@@ -445,17 +210,31 @@ def authenticate_fsmpi(username: str, password: str) -> {}:
user_data = __ldap_authenticate(username, password)
if user_data is None:
raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
-    user_id, given_name, surname, groups = user_data
+    user_handle, given_name, surname, groups = user_data
if not __ldap_is_moderator(groups):
raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
if api.live_config.is_readonly():
-        user_db_id = db_pool.execute_read_transaction(_db_get_user_id, user_id)
+        user_db = database.query_one_or_none_and_expunge(User.select(is_mod=True).where(User.handle == user_handle))
+        if user_db is None:
+            raise ApiClientException(ERROR_AUTHENTICATION_NOT_AVAILABLE(
+                "Site is read-only and we can not create a new account for you in the database"))
+        user_db_id = user_db.id
    else:
-        user_db_id = db_pool.execute_write_transaction(_db_get_or_create_user_id, user_id, given_name)
+        def _trans(session: SessionDb):
+            user_db = session.scalar(User.select(is_mod=True).where(User.handle == user_handle))
+            if user_db is None:
+                # TODO test new
+                user_db = User(handle=user_handle, name=given_name)
+                session.add(user_db)
+            user_db.last_login = datetime.now()
+            user_id = user_db.id
+            session.commit()
+            return user_id
+        user_db_id = database.execute_write_transaction(_trans)
session["user"] = {
"uid": user_id,
"uid": user_handle,
"dbid": user_db_id,
"givenName": given_name,
"sn": surname
......
from api.database import *
from api.miscellaneous import *
from api.authentication import get_authentication_methods
import api
COURSE_SECONDARY_COLUMN_VISIBLE = "course_visible"
COURSE_SECONDARY_COLUMN_DOWNLOADABLE = "course_downloadable"
COURSE_SECONDARY_COLUMN_EMBED_INVISIBLE = "course_embedinvisible"
COURSE_SECONDARY_DB_SELECTION = f"""
"courses"."handle" AS "course_handle",
"courses"."title" AS "course_title",
"courses"."short" AS "course_short",
"courses"."organizer" AS "course_organizer",
"courses"."subject" AS "course_subject",
"courses"."description" AS "course_description",
"courses"."coursechapters" AS "course_coursechapters",
"courses"."semester" AS "course_semester",
"courses"."login_info" AS "course_login_info",
"courses"."listed" AS "course_listed",
"courses"."visible" AS "{COURSE_SECONDARY_COLUMN_VISIBLE}",
"courses"."embedinvisible" AS "{COURSE_SECONDARY_COLUMN_EMBED_INVISIBLE}",
"courses"."downloadable" AS "{COURSE_SECONDARY_COLUMN_DOWNLOADABLE}"
"""
def _semester_db_to_json(semester_db: str):
if semester_db is None or len(semester_db) == 0 or semester_db.isspace():
return "none" # pragma: no cover
from api.objects.type import SEMESTER_STRING_PATTERN
if SEMESTER_STRING_PATTERN.fullmatch(semester_db) is None: # pragma: no cover
print(f"Warning: Invalid semester string in database: {truncate_string(semester_db)}")
return "none"
return semester_db
def course_list_db_to_json_no_lectures(courses_db: [], auth_db: [], is_mod: bool):
"""
:param courses_db: Must be sorted by id ascending
:param auth_db: authentication methods. Must be sorted by id ascending
:param is_mod:
:return:
"""
auth_i = 0
courses_json = []
for course_db in courses_db:
auth_i, course_auth_db = db_collect_id_sorted_data(auth_db, auth_i, "course_id", course_db["id"])
courses_json.append(course_db_to_json_no_lectures(course_db, course_auth_db, is_mod))
return courses_json
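db_collect_id_sorted_data itself is not part of this diff; the loop above assumes roughly the following contract (a sketch under that assumption): starting at index i in a list sorted by the key column, collect all rows matching the given id and return the advanced index together with the matched rows. Because both input lists are sorted by id ascending, the whole merge is a single linear pass.

def collect_id_sorted_data_sketch(rows: list, i: int, key: str, target_id: int) -> tuple[int, list]:
    # Skip rows belonging to earlier ids, then gather the rows for target_id.
    while i < len(rows) and rows[i][key] < target_id:
        i += 1
    matched = []
    while i < len(rows) and rows[i][key] == target_id:
        matched.append(rows[i])
        i += 1
    return i, matched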
def course_db_to_json_no_auth_lectures(course_db: {}, is_mod: bool):
course_json = {
"id": course_db["id"],
"id_string": course_db["handle"],
"full_name": course_db["title"],
"short_name": course_db["short"],
"semester": _semester_db_to_json(course_db["semester"]),
"organizer": course_db["organizer"],
"topic": course_db["subject"],
"description": course_db["description"],
"show_chapters_on_course": bool(course_db["coursechapters"])
}
if len(course_db["login_info"]) > 0:
course_json["authentication_information"] = course_db["login_info"]
if is_mod:
course_json["is_listed"] = bool(course_db["listed"])
course_json["is_visible"] = bool(course_db["visible"])
return course_json
def course_db_to_json_no_lectures(course_db: {}, auth_db: [], is_mod: bool):
course_json = course_db_to_json_no_auth_lectures(course_db, is_mod)
course_json["default_authentication_methods"] = get_authentication_methods(auth_db, False)
return course_json
def course_secondary_db_to_json_no_lectures(secondary_course_db: {}, auth_db: [], is_mod: bool):
course_json = {
"id": secondary_course_db["course_id"],
"id_string": secondary_course_db["course_handle"],
"full_name": secondary_course_db["course_title"],
"short_name": secondary_course_db["course_short"],
"organizer": secondary_course_db["course_organizer"],
"topic": secondary_course_db["course_subject"],
"description": secondary_course_db["course_description"],
"show_chapters_on_course": bool(secondary_course_db["course_coursechapters"]),
"default_authentication_methods": get_authentication_methods(auth_db, False)
}
if "course_semester" in secondary_course_db:
course_json["semester"] = _semester_db_to_json(secondary_course_db["course_semester"])
if "course_login_info" in secondary_course_db and len(secondary_course_db["course_login_info"]) > 0:
course_json["authentication_information"] = secondary_course_db["course_login_info"]
if is_mod:
course_json["is_listed"] = bool(secondary_course_db["course_listed"])
course_json["is_visible"] = bool(secondary_course_db["course_visible"])
return course_json
_SQL_GET_COURSE_AUTH = PreparedStatement("""
SELECT * FROM "perm"
WHERE "perm"."course_id" = ?
AND (NOT "deleted")
""")
def course_query_auth(course_id: int, transaction: AbstractTransaction or None = None) -> DbResultSet:
"""
Returns a result even if course is not visible
"""
if transaction is None:
return db_pool.execute_read_statement_in_transaction(_SQL_GET_COURSE_AUTH, course_id)
else:
return transaction.execute_statement(_SQL_GET_COURSE_AUTH, course_id)
def course_queue_query_auth(transaction: AbstractTransaction, course_id: int) -> FilledStatement:
"""
Returns a result even if course is not visible
"""
return transaction.queue_statement(_SQL_GET_COURSE_AUTH, course_id)
def lecture_db_to_json_no_chapters_media(lecture_db: {}, auth_db: [], course_allow_embed: bool, is_mod: bool):
lecture_id: int = lecture_db["id"]
lecture_json = {
"id": lecture_id,
"course_id": lecture_db["course_id"],
"title": lecture_db["title"],
"speaker": lecture_db["speaker"],
"location": lecture_db["place"],
"time": lecture_db["time"].replace(tzinfo=None).isoformat(sep="T", timespec="seconds"),
"duration": lecture_db["duration"],
"description": lecture_db["comment"],
"thumbnail_url": api.config["FILE_PATH_PREFIX"] + "thumbnail/l_" + str(lecture_id) + ".jpg",
"no_recording": bool(lecture_db["norecording"]),
"livestream_planned": bool(lecture_db["live"] or lecture_db["stream_job"]),
"authentication_methods": get_authentication_methods(auth_db, True),
"allow_embed": course_allow_embed
}
if lecture_db["stream_job"]:
lecture_json["livestream_url"] = "pub/hls/" + str(lecture_id) + ".m3u8"
if is_mod:
lecture_json["is_visible"] = bool(lecture_db["visible"])
lecture_json["internal_comment"] = lecture_db["internal"]
return lecture_json
_SQL_GET_LECTURE_AUTH_WITH_COURSE_ID = PreparedStatement("""
SELECT "perm".*
FROM "perm"
WHERE ("perm"."lecture_id" = ? OR "perm"."course_id" = ?)
AND (NOT "deleted")
""")
_SQL_GET_LECTURE_AUTH_NO_COURSE_ID = PreparedStatement("""
SELECT "perm".*
FROM "lectures"
JOIN "perm" ON ("perm"."lecture_id" = "lectures"."id" OR "perm"."course_id" = "lectures"."course_id")
WHERE
"lectures"."id" = ?
AND (NOT "perm"."deleted")
""")
def lecture_query_auth(lecture_id: int,
course_id: int = None,
transaction: AbstractTransaction = None) -> DbResultSet:
if course_id:
if transaction is None:
return db_pool.execute_read_statement_in_transaction(
_SQL_GET_LECTURE_AUTH_WITH_COURSE_ID, lecture_id, course_id)
else:
return transaction.execute_statement(_SQL_GET_LECTURE_AUTH_WITH_COURSE_ID, lecture_id, course_id)
else:
if transaction is None:
return db_pool.execute_read_statement_in_transaction(
_SQL_GET_LECTURE_AUTH_NO_COURSE_ID, lecture_id)
else:
return transaction.execute_statement(_SQL_GET_LECTURE_AUTH_NO_COURSE_ID, lecture_id)
def lecture_queue_query_auth(transaction: AbstractTransaction,
lecture_id: int,
course_id: int = None) -> FilledStatement:
if course_id:
return transaction.queue_statement(_SQL_GET_LECTURE_AUTH_WITH_COURSE_ID, lecture_id, course_id)
else:
return transaction.queue_statement(_SQL_GET_LECTURE_AUTH_NO_COURSE_ID, lecture_id)
def chapter_db_to_json(chapter_db: {}, is_mod: bool):
chapter_json = {
"start_time": chapter_db["time"],
"name": chapter_db["text"]
}
if is_mod:
chapter_json["id"] = chapter_db["id"]
chapter_json["is_visible"] = bool(chapter_db["visible"])
return chapter_json
def media_source_with_format_db_to_json(media_source_db: {}, course_is_downloadable: bool, is_mod: bool):
media_source_json = {
"quality": {
"name": media_source_db["format_description"],
"resolution": media_source_db["format_resolution"],
"aspect_ration": media_source_db["format_aspect"],
"priority": media_source_db["format_player_prio"]
},
"size": media_source_db["file_size"],
"comment": media_source_db["comment"],
"player_url": api.config["FILE_PATH_PREFIX"] + media_source_db["path"],
}
if media_source_db["downloadable"] == 1 and course_is_downloadable:
media_source_json["download_url"] = media_source_json["player_url"]
if is_mod:
media_source_json["id"] = media_source_db["id"]
media_source_json["is_visible"] = bool(media_source_db["visible"])
return media_source_json
def course_queue_search(transaction: ReadTransaction, search_term: str, is_mod: bool):
return _queue_search(
transaction,
"courses",
["title", "short", "handle", "organizer", "subject", "description"],
None,
None,
None if is_mod else 'WHERE "courses"."visible" AND "courses"."listed"',
'"courses"."semester" DESC',
20,
search_term
)
def lecture_queue_search(transaction: ReadTransaction, search_term: str, is_mod: bool):
return _queue_search(
transaction,
"lectures",
["title", "comment", "speaker"],
'JOIN "courses" ON ("lectures"."course_id" = "courses"."id")',
COURSE_SECONDARY_DB_SELECTION,
None if is_mod else 'WHERE "courses"."visible" AND "courses"."listed" AND "lectures"."visible"',
'"lectures"."time" DESC',
30,
search_term
)
def _queue_search(transaction: ReadTransaction,
table: str,
search_columns: list[str],
join_clause: str or None,
extra_select_columns: str or None,
where_clause: str or None,
extra_ordering: str or None,
limit: int,
search_term: str):
base_sub_query = f"""
SELECT "{table}"."id" AS "_id", CAST(%s AS INT) AS "_priority" FROM "{table}" WHERE {" OR ".join(
map(lambda column: f'LOWER("{table}"."{column}") LIKE ?',
search_columns))}
"""
words: list[str] = list(filter(lambda w: not w.isspace(), search_term.split(" ")))
if len(words) == 0:
raise ValueError("No words provided") # pragma: no cover
sub_queries: list[str] = []
all_values: list[DbValueType] = []
prio = len(words)
for word in words:
word = word.lower()
word = word.replace("%", "\\%").replace("_", "\\_")
word = "%" + word + "%"
sub_queries.append(base_sub_query % prio)
for _ in range(0, len(search_columns)):
all_values.append(word)
prio -= 1
return transaction.queue_statement(f"""
SELECT "{table}".* {"" if extra_select_columns is None else "," + extra_select_columns}
FROM "{table}"
JOIN (
SELECT "_id", CAST(SUM("_priority") AS INT) AS "_score"
FROM ({"UNION ALL".join(sub_queries)}) AS "_sub_result"
GROUP BY "_id"
) AS "_data" ON ("{table}"."id" = "_data"."_id")
{"" if join_clause is None else join_clause}
{"" if where_clause is None else where_clause}
ORDER BY "_data"."_score" DESC{"" if extra_ordering is None else ", " + extra_ordering} LIMIT {limit}
""", *all_values)
import sys
from datetime import datetime
from typing import Literal
from videoag_common.database import *
import api
from api.miscellaneous import DIAGNOSTICS_TRACKER, DiagnosticsCounter
DbValueType = int | str | bytes | datetime
_TransactionType = Literal["read", "write"]
def _create_diagnostics_counters(id: str) -> dict[_TransactionType, DiagnosticsCounter]:
return {
"read": DIAGNOSTICS_TRACKER.register_counter(f"database.transaction.read.{id}"),
"write": DIAGNOSTICS_TRACKER.register_counter(f"database.transaction.write.{id}")
}
_COUNTERS_TRANSACTION_COUNT = _create_diagnostics_counters("count")
_COUNTERS_TRANSACTION_ABORTED_BY_USER = _create_diagnostics_counters("aborted_by_user")
_COUNTERS_TRANSACTION_CONFLICTS = _create_diagnostics_counters("conflicts")
_COUNTERS_TRANSACTION_ABORTS_AFTER_REPEATED_CONFLICTS = _create_diagnostics_counters("aborts_after_repeated_conflicts")
class ApiDatabase(Database):
def _on_transaction_started(self, writeable: bool):
_COUNTERS_TRANSACTION_COUNT["write" if writeable else "read"].trigger()
def _on_transaction_aborted_by_user(self, writeable: bool):
_COUNTERS_TRANSACTION_ABORTED_BY_USER["write" if writeable else "read"].trigger()
def _on_transaction_conflict(self, writeable: bool):
_COUNTERS_TRANSACTION_CONFLICTS["write" if writeable else "read"].trigger()
def _on_transaction_aborted_after_repeated_conflict(self, writeable: bool):
_COUNTERS_TRANSACTION_ABORTS_AFTER_REPEATED_CONFLICTS["write" if writeable else "read"].trigger()
database = ApiDatabase(api.config["DATABASE"])
if api.config["DATABASE"].get("log_all_statements", False):
def _init_logger():
import logging
logger = logging.getLogger('sqlalchemy.engine')
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
_init_logger()
import os.path
import sqlite3
from api.database.database import (PreparedStatement, FilledStatement,
AbstractTransaction, ReadTransaction, WriteTransaction,
DbConnection,
DbConnectionPool,
TransactionIsolationLevel,
DbResult, DbResultSet, DbResultRow, DbValueType, DatabaseWarning,
DB_RESULT_EXCEPTION, DB_RESULT_WARNINGS, DB_RESULT_SET, DB_RESULT_AFFECTED_ROWS)
def __create_pool() -> DbConnectionPool:
import api
if api.config["DB_ENGINE"] == "mysql":
print("Using MySQL engine for database")
import api.database.mysql_connector
from api.miscellaneous import DEBUG_ENABLED
db_config = api.config["MYSQL"]
__factory = mysql_connector.MySqlDbConnectionFactory(
not DEBUG_ENABLED,
DEBUG_ENABLED,
db_config.get("host", None),
db_config.get("port", 3306),
db_config.get("unix_socket", None),
db_config["user"],
db_config["password"],
db_config["database"]
)
elif api.config["DB_ENGINE"] == "postgres":
print("Using PostgreSQL engine for database")
import api.database.postgres_connector
from api.miscellaneous import DEBUG_ENABLED
db_config = api.config["POSTGRES"]
__factory = postgres_connector.PostgresDbConnectionFactory(
not DEBUG_ENABLED,
DEBUG_ENABLED,
db_config["host"],
db_config.get("port", 5432),
db_config["user"],
db_config["password"],
db_config["database"]
)
elif api.config["DB_ENGINE"] == "sqlite":
print("Using SQLite engine for database")
import api.database.sqlite_connector
from pathlib import Path
db_config = api.config["SQLITE"]
db_path = Path(os.path.join(os.getcwd(), db_config["file"]))
print(f"SQLite file: {db_path}")
if db_config.get("init_schema", False) or db_config.get("init_data", False):
file_existed = db_path.exists()
db = sqlite3.connect(db_path)
cur = db.cursor()
if db_config.get("init_schema", False):
print("Initializing sqlite schema")
cur.executescript(Path(os.path.join(os.getcwd(), db_config["schema"])).read_text(encoding="UTF-8"))
if db_config.get("init_data", False) and not file_existed:
print("Initializing sqlite data")
cur.executescript(Path(os.path.join(os.getcwd(), api.config["DB_DATA"])).read_text(encoding="UTF-8"))
db.commit()
db.close()
__factory = sqlite_connector.SqLiteDbConnectionFactory(db_path)
else: # pragma: no cover
raise ValueError(f"Unknown database engine: {api.config['DB_ENGINE']}")
connection_config = api.config["DB_CONNECTIONS"]
return DbConnectionPool(
__factory,
connection_config["max_count"],
connection_config["readonly_percent"],
connection_config["max_wait_time_sec"],
connection_config["max_waiting_count"],
connection_config["max_read_attempts"],
connection_config["max_write_attempts"]
)
db_pool = __create_pool()
import time
from typing import TypeVar, Generic
from abc import ABC, ABCMeta, abstractmethod
import traceback
from functools import reduce
from api.database.database import (PreparedStatement, FilledStatement, DbConnection,
DbResult, DbResultSet, DbValueType, DbAffectedRows,
DatabaseWarning, DatabaseError,
SQL_PARAMETER_INDICATOR, TransactionConflictError)
_DEBUG_PRINT_STATEMENT_EXECUTION: bool = False
Connection = TypeVar("Connection")
Cursor = TypeVar("Cursor")
class PythonDbConnection(DbConnection, Generic[Connection, Cursor], ABC):
__metaclass__ = ABCMeta
def __init__(self,
parameter_indicator: str,
caller_exception: tuple[type, ...],
py_connection: Connection):
super().__init__()
self._parameter_indicator = parameter_indicator
self._caller_exception = caller_exception
self._py_connection = py_connection
self._closed = False
# str is statement with SQL_PARAMETER_INDICATOR replaced
self._prepared_statement_cache: dict[PreparedStatement, tuple[str, Cursor or None]] = {}
self._unprepared_cursor: Cursor or None = None
self._last_successful_request: int = time.time_ns()
def _clear_cache_on_new_connection(self):
self._prepared_statement_cache = {}
self._unprepared_cursor = None
def execute_statements(self, statements: [FilledStatement]) -> list[DbResult]:
if len(statements) > 1 and self._can_use_multi_query(statements):
return self._execute_statements_multi_query(statements)
else:
return self._execute_statements_prepared(statements)
def _execute_statements_prepared(self, statements: [FilledStatement]) -> list[DbResult]:
if self._closed:
raise RuntimeError("Already closed") # pragma: no cover
results: list[DbResult] = []
for filled_stat in statements:
if _DEBUG_PRINT_STATEMENT_EXECUTION: # pragma: no cover
start_time = time.time_ns()
print(f"Executing statement:\n{filled_stat.statement}\nwith values: {filled_stat.values}")
exception = None
try:
if isinstance(filled_stat.statement, PreparedStatement):
res = self._execute_prepared_statement(filled_stat.statement, filled_stat.values)
else:
assert isinstance(filled_stat.statement, str)
res = self._execute_unprepared_statement(filled_stat.statement, filled_stat.values)
warnings, affected_rows, result_set = res
except self._caller_exception as e: # pragma: no cover
exception = e
warnings = []
affected_rows = None
result_set = []
except Exception as e:
if self._is_transaction_conflict_exception(e):
raise TransactionConflictError("Conflict in transactions") from e
if filled_stat.queue_traceback is not None:
raise DatabaseError(f"An exception occurred while executing statement, queued at:\n\n"
f"{reduce(lambda res, s: res + s, traceback.format_list(filled_stat.queue_traceback), '')}") from e
else:
raise DatabaseError("An exception occurred while executing statement") from e
results.append((exception, warnings, result_set, affected_rows))
if _DEBUG_PRINT_STATEMENT_EXECUTION: # pragma: no cover
# noinspection PyUnboundLocalVariable
print(f"Executed statement in {((time.time_ns() - start_time )//1000 ) /1000} ms\n")
return results
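Each entry appended above is a four-tuple in the order (exception, warnings, result_set, affected_rows); the DB_RESULT_* constants exported from api.database presumably index into it. A minimal consumption sketch (illustrative, assuming connection is a PythonDbConnection and filled_statements a list of FilledStatement):

for exception, warnings, result_set, affected_rows in connection.execute_statements(filled_statements):
    if exception is not None:
        raise exception  # a statement-level error of a type in self._caller_exception
    for warning in warnings:
        print(f"DB warning: {warning}")
    print(f"{affected_rows} row(s) affected, {len(result_set)} row(s) returned")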
def _execute_statements_multi_query(self, statements: [FilledStatement]) -> list[DbResult]:
if self._closed:
raise RuntimeError("Already closed")
if _DEBUG_PRINT_STATEMENT_EXECUTION:
print("Executing multi query with following statements:")
full_query = ""
all_values = []
for filled_stat in statements:
if _DEBUG_PRINT_STATEMENT_EXECUTION:
print(f"{filled_stat.statement}\nwith values: {filled_stat.values}\n")
if full_query != "":
full_query += ";"
if isinstance(filled_stat.statement, PreparedStatement):
replaced_stat, cursor = self._get_prepared_statement_data(filled_stat.statement)
else:
replaced_stat = self._replace_parameter_indicator(filled_stat.statement)
full_query += replaced_stat
all_values.extend(filled_stat.values)
if self._unprepared_cursor is None:
self._unprepared_cursor = self._create_cursor(False)
if _DEBUG_PRINT_STATEMENT_EXECUTION:
start_time = time.time_ns()
results: list[DbResult] = []
try:
for warnings, affected_rows, result_set in self._db_execute_multiple_statements(
self._unprepared_cursor, full_query, all_values):
results.append((None, warnings, result_set, affected_rows))
except self._caller_exception as e:
for i in range(0, len(statements)):
results.append((e, [], [], None))
except Exception as e:
raise DatabaseError("An exception occurred while executing statements") from e
if _DEBUG_PRINT_STATEMENT_EXECUTION:
# noinspection PyUnboundLocalVariable
print(f"Executed query in {((time.time_ns() - start_time) // 1000) / 1000} ms\n")
return results
def _replace_parameter_indicator(self, statement: str) -> str:
return statement.replace(SQL_PARAMETER_INDICATOR, self._parameter_indicator)
def _get_prepared_statement_data(self, statement: PreparedStatement):
if statement in self._prepared_statement_cache:
return self._prepared_statement_cache[statement]
replaced_stat = self._replace_parameter_indicator(statement.statement)
data = (replaced_stat, self._create_cursor(True))
self._prepared_statement_cache[statement] = data
return data
def _execute_prepared_statement(self,
statement: PreparedStatement,
values: list[DbValueType]) -> tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]:
replaced_stat, cursor = self._get_prepared_statement_data(statement)
return self._db_execute_single_statement(cursor, replaced_stat, values)
def _execute_unprepared_statement(self,
statement: str,
values: list[DbValueType]) -> tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]:
if self._unprepared_cursor is None:
self._unprepared_cursor = self._create_cursor(False)
replaced_stat = self._replace_parameter_indicator(statement)
return self._db_execute_single_statement(self._unprepared_cursor, replaced_stat, values)
@staticmethod
def _create_column_mapping(cursor: Cursor) -> dict[str, int]:
column_mapping = {}
i = -1
for column_description in cursor.description:
i += 1
name = column_description[0] # As in the Python DB Api v2
column_mapping[name] = i
return column_mapping
@abstractmethod
def _can_use_multi_query(self, statements: [FilledStatement]):
pass # pragma: no cover
@abstractmethod
def _create_cursor(self, prepared: bool):
pass # pragma: no cover
@abstractmethod
def _is_transaction_conflict_exception(self, exception: Exception) -> bool:
pass # pragma: no cover
@abstractmethod
def _db_execute_single_statement(self,
cursor: Cursor,
statement: str,
values: list[DbValueType]) -> tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]:
pass # pragma: no cover
@abstractmethod
def _db_execute_multiple_statements(self,
cursor: Cursor,
statements: str,
values: list[DbValueType]) -> list[tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]]:
pass # pragma: no cover
def execute_script(self, script: str):
if self._closed:
raise RuntimeError("Already closed") # pragma: no cover
if _DEBUG_PRINT_STATEMENT_EXECUTION: # pragma: no cover
start_time = time.time_ns()
print(f"Executing script")
try:
if self._unprepared_cursor is None:
self._unprepared_cursor = self._create_cursor(False)
self._db_execute_script(self._unprepared_cursor, script)
except Exception as e:
raise DatabaseError("An exception occurred while executing script") from e
if _DEBUG_PRINT_STATEMENT_EXECUTION: # pragma: no cover
# noinspection PyUnboundLocalVariable
print(f"Executed script in {((time.time_ns() - start_time) // 1000) / 1000} ms\n")
@abstractmethod
def _db_execute_script(self, cursor: Cursor, script: str):
"""
See execute_script of :class:`DbConnectionPool`
"""
pass # pragma: no cover
from mysql.connector.cursor import MySQLCursor
from mysql.connector.errors import ProgrammingError, DataError, NotSupportedError, InterfaceError
from mysql.connector.version import VERSION_TEXT
from mysql.connector import MySQLConnection, NUMBER, STRING, BINARY, DATETIME, InternalError
from time import time_ns
from datetime import datetime, date
from api.database import DbValueType, DbResultSet, FilledStatement
from api.database.database import PreparedStatement, DbConnectionFactory, DatabaseWarning, DatabaseError, \
DbAffectedRows, \
DatabaseResultRow, TransactionIsolationLevel
from api.database.abstract_py_connector import PythonDbConnection
print(f"Using MySQL Connector Version: {VERSION_TEXT}")
_GRACE_TIME_NO_PING_NS: int = 10 * 1000 * 1000 * 1000
class _MySqlDbConnection(PythonDbConnection[MySQLConnection, MySQLCursor]):
def __init__(self, use_multi_query: bool, fetch_warnings: bool, py_connection: MySQLConnection):
super().__init__(
"%s",
(ProgrammingError, DataError, NotSupportedError),
py_connection)
self._use_multi_query = use_multi_query
self._get_warnings = fetch_warnings
self._disconnected = False
def _can_use_multi_query(self, statements: [FilledStatement]):
return self._use_multi_query
def is_disconnected(self, force_ping: bool = False) -> bool:
if self._closed:
raise RuntimeError("Already closed")
if self._disconnected:
return True
if not force_ping and self._last_successful_request + _GRACE_TIME_NO_PING_NS > time_ns():
return False
if self._py_connection.is_connected():
return False
self._disconnected = True
return True
def try_reconnect(self) -> bool:
# noinspection PyBroadException
try:
self._py_connection.reconnect(attempts=1)
self._py_connection.cmd_query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")
self._py_connection.cmd_query("SET SESSION sql_mode = 'ANSI_QUOTES'")
self._py_connection.get_warnings = self._get_warnings
except Exception:
return False
self._clear_cache_on_new_connection()
self._disconnected = False
return True
def close(self):
self._closed = True
self._py_connection.close()
def get_transaction_begin_statements(self, writable: bool, isolation_level: TransactionIsolationLevel) -> list[PreparedStatement or str]:
if writable:
return [
f"SET SESSION TRANSACTION ISOLATION LEVEL {isolation_level.value}",
"START TRANSACTION READ WRITE",
]
else:
return [
f"SET SESSION TRANSACTION ISOLATION LEVEL {isolation_level.value}",
"START TRANSACTION READ ONLY",
]
def get_transaction_end_statement(self, commit: bool) -> PreparedStatement or str:
if commit:
return "COMMIT"
else:
return "ROLLBACK"
def _create_cursor(self, prepared: bool):
return self._py_connection.cursor(prepared=prepared)
def _is_transaction_conflict_exception(self, exception: Exception) -> bool:
return isinstance(exception, InternalError) and "try restarting transaction" in exception.msg
def _db_execute_single_statement(self,
cursor: MySQLCursor,
statement: str,
values: list[DbValueType]) -> tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]:
# Note: Warnings seem to be broken. When a warning is generated, an error is thrown in the library:
# TypeError: expected string or bytes-like object, got 'Warning'
cursor.execute(statement, params=values)
return self._get_result(cursor)
def _db_execute_multiple_statements(self,
cursor: MySQLCursor,
statements: str,
values: list[DbValueType]) -> list[tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]]:
# With multi=True the warnings seem to break. When a warning is generated we can't seem to be able to retrieve
# it, but in later queries (without multi) the following error is thrown:
# InterfaceError: Use cmd_query_iter for statements with multiple queries.
results = []
for res_cursor in cursor.execute(statements, params=values, multi=True):
results.append(self._get_result(res_cursor))
return results
def _get_result(self, cursor) -> tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]:
row_count = cursor.rowcount
result_set = self._get_result_set(cursor)
if row_count == -1:
# Do not get rowcount after fetchall if it was valid before. If there is no result, mysql sometimes
# just sets this to 0 in fetchall
row_count = cursor.rowcount
return self._fetch_warnings(cursor), row_count, result_set
def _fetch_warnings(self, cursor: MySQLCursor) -> list[DatabaseWarning]:
if not self._get_warnings:
return []
try:
mysql_warnings = cursor.fetchwarnings()
if mysql_warnings is None:
warnings = []
else:
# mysql warning is tuple (type, id, message)
warnings = list(map(lambda warn: DatabaseWarning(f"{warn[0]} ({warn[1]}): {warn[2]}"), mysql_warnings))
return warnings
except InterfaceError as e:
if e.msg == "Failed getting warnings; No result set to fetch from.": # Why ever this throws an exception
return []
raise e
@staticmethod
def _get_result_set(cursor: MySQLCursor) -> DbResultSet:
try:
cursor_rows = cursor.fetchall()
except InterfaceError as e:
if e.msg == "No result set to fetch from.": # Why ever this throws an exception
return []
raise e
if len(cursor_rows) == 0:
return []
column_mapping = PythonDbConnection._create_column_mapping(cursor)
result_set = []
for cursor_row in cursor_rows:
result_row = []
for type_code, cursor_value in zip(
map(lambda desc: desc[1], # As in the Python DB Api v2
cursor.description),
cursor_row):
if cursor_value is None:
result_value = None
elif type_code == NUMBER:
if isinstance(cursor_value, int):
result_value = cursor_value
else:
raise TypeError(f"Unknown type {type(cursor_value).__name__} for NUMBER") # pragma: no cover
# Unfortunately there is no way to know whether it's a BLOB or a TEXT column in MySQL. As we currently don't
# use any BLOBs, we just interpret all binary columns as strings
# (Note that the flags in the cursor.description (8th entry) do not contain this information either)
elif type_code == STRING or type_code == BINARY:
if isinstance(cursor_value, str):
result_value = cursor_value
elif isinstance(cursor_value, bytes):
result_value = cursor_value.decode()
elif isinstance(cursor_value, bytearray):
result_value = cursor_value.decode()
else:
raise TypeError(
f"Unknown type {type(cursor_value).__name__} for STRING OR BINARY") # pragma: no cover
elif type_code == DATETIME:
if isinstance(cursor_value, datetime):
result_value = cursor_value
elif isinstance(cursor_value, date):
result_value = datetime(
year=cursor_value.year,
month=cursor_value.month,
day=cursor_value.day
)
else:
raise TypeError(f"Unknown type {type(cursor_value).__name__} for DATETIME") # pragma: no cover
else:
raise ValueError(f"Unknown type code {type_code}") # pragma: no cover
result_row.append(result_value)
result_set.append(DatabaseResultRow(column_mapping, tuple(result_row)))
return result_set
def _db_execute_script(self, cursor: MySQLCursor, script: str):
# See comment on multi=True in _db_execute_multiple_statements
for res_cursor in cursor.execute(script, multi=True):
warnings = self._fetch_warnings(res_cursor)
if len(warnings) > 0:
raise DatabaseError(f"Got warnings during execution of script: {warnings}")
try:
res_cursor.fetchall() # We need to always fetch these
except InterfaceError as e:
if e.msg == "No result set to fetch from.": # Why ever this throws an exception
continue
raise e
class MySqlDbConnectionFactory(DbConnectionFactory):
def __init__(self, use_multi_query: bool, fetch_warnings: bool,
host: str, port: int, unix_socket: str,
username: str, password: str, database: str):
super().__init__()
if use_multi_query and fetch_warnings:
# In this case MySQL Connector seems buggy/throws exceptions
raise ValueError("Warnings cannot be fetched with multi queries")
self._use_multi_query = use_multi_query
self._fetch_warnings = fetch_warnings
self._host = host
self._port = port
self._unix_socket = unix_socket
self._username = username
self._password = password
self._database = database
def supports_per_transaction_writeable_flag(self) -> bool:
return True
def new_connection(self, writable: bool = True) -> _MySqlDbConnection:
try:
mysql_connection = MySQLConnection(
host=self._host,
port=self._port,
unix_socket=self._unix_socket,
user=self._username,
password=self._password,
database=self._database
)
mysql_connection.cmd_query("SET SESSION sql_mode = 'ANSI_QUOTES'")
mysql_connection.get_warnings = self._fetch_warnings
except Exception as e:
raise DatabaseError("An exception occurred while connecting to database") from e
return _MySqlDbConnection(self._use_multi_query, self._fetch_warnings, mysql_connection)
from typing import Callable
from psycopg.errors import Diagnostic, SerializationFailure
from psycopg import Connection, Cursor
from psycopg.cursor import TUPLES_OK
import psycopg
from api.database import DbValueType, DbResultSet, FilledStatement
from api.database.database import PreparedStatement, DbConnectionFactory, DatabaseWarning, DatabaseError, \
DbAffectedRows, \
DatabaseResultRow, TransactionIsolationLevel
from api.database.abstract_py_connector import PythonDbConnection
print(f"Using psycopg (Postgres) Connector Version: {psycopg.version.__version__}")
class _PostgresDbConnection(PythonDbConnection[Connection, Cursor]):
def __init__(self,
use_multi_query: bool,
fetch_warnings: bool,
create_new_connection: Callable[[], Connection],
py_connection: Connection):
super().__init__(
"%s",
(), # Can't continue transaction after any exceptions
py_connection)
self._use_multi_query = use_multi_query
if fetch_warnings:
self._py_connection.add_notice_handler(self._handle_notice)
self._get_warnings = fetch_warnings
self._create_new_connection = create_new_connection
self._disconnected = False
self._last_warnings: list[DatabaseWarning] = []
def _handle_notice(self, diagnostic: Diagnostic):
self._last_warnings.append(DatabaseWarning(f"""\
severity: {diagnostic.severity}
severity_nonlocalized: {diagnostic.severity_nonlocalized}
sqlstate: {diagnostic.sqlstate}
message_primary: {diagnostic.message_primary}
message_detail: {diagnostic.message_detail}
message_hint: {diagnostic.message_hint}
statement_position: {diagnostic.statement_position}
internal_position: {diagnostic.internal_position}
internal_query: {diagnostic.internal_query}
context: {diagnostic.context}
schema_name: {diagnostic.schema_name}
table_name: {diagnostic.table_name}
column_name: {diagnostic.column_name}
datatype_name: {diagnostic.datatype_name}
constraint_name: {diagnostic.constraint_name}
source_file: {diagnostic.source_file}
source_line: {diagnostic.source_line}
source_function: {diagnostic.source_function}
"""))
def _can_use_multi_query(self, statements: [FilledStatement]):
return self._use_multi_query and all(map(lambda stat: len(stat.values) == 0, statements))
def is_disconnected(self, force_ping: bool = False) -> bool:
if self._closed:
raise RuntimeError("Already closed")
if self._disconnected:
return True
if not self._py_connection.closed: # There is no ping method
return False
self._disconnected = True
return True
def try_reconnect(self) -> bool:
# noinspection PyBroadException
try:
new_conn = self._create_new_connection()
except Exception:
return False
# noinspection PyBroadException
try:
self._py_connection.close()
except Exception:
pass
self._clear_cache_on_new_connection()
self._py_connection = new_conn
if self._get_warnings:
self._py_connection.add_notice_handler(self._handle_notice)
self._disconnected = False
return True
def close(self):
self._closed = True
self._py_connection.close()
def get_transaction_begin_statements(self, writable: bool, isolation_level: TransactionIsolationLevel) -> list[PreparedStatement or str]:
if writable:
return [f"START TRANSACTION ISOLATION LEVEL {isolation_level.value}, READ WRITE"]
else:
return [f"START TRANSACTION ISOLATION LEVEL {isolation_level.value}, READ ONLY"]
def get_transaction_end_statement(self, commit: bool) -> PreparedStatement or str:
if commit:
return "COMMIT"
else:
return "ROLLBACK"
def _create_cursor(self, prepared: bool):
return self._py_connection.cursor()
def _is_transaction_conflict_exception(self, exception: Exception) -> bool:
return isinstance(exception, SerializationFailure)
def _db_execute_single_statement(self,
cursor: Cursor,
statement: str,
values: list[DbValueType]) -> tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]:
cursor.execute(statement.encode(encoding="UTF-8"), params=values)
all_rows = self._fetch_all_rows(cursor)
if len(all_rows) > 0:
column_mapping = PythonDbConnection._create_column_mapping(cursor)
return (self._fetch_warnings(),
cursor.rowcount,
list(map(lambda row: DatabaseResultRow(column_mapping, tuple(row)), all_rows)))
def _db_execute_multiple_statements(self,
cursor: Cursor,
statements: str,
values: list[DbValueType]) -> list[tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]]:
cursor.execute(statements.encode(encoding="UTF-8"), params=values, prepare=False)
results = []
warnings = self._fetch_warnings()
while True:
all_rows = self._fetch_all_rows(cursor)
if len(all_rows) > 0:
column_mapping = PythonDbConnection._create_column_mapping(cursor)
results.append((warnings,
cursor.rowcount,
list(map(
lambda row: DatabaseResultRow(column_mapping, tuple(row)),
all_rows
))))
if not cursor.nextset():
break
return results
@staticmethod
def _fetch_all_rows(cursor: Cursor) -> list[tuple]:
if not cursor.pgresult or cursor.pgresult.status != TUPLES_OK:
return []
return cursor.fetchall()
def _fetch_warnings(self) -> list[DatabaseWarning]:
if not self._get_warnings:
return []
warnings = self._last_warnings
self._last_warnings = []
return warnings
def _db_execute_script(self, cursor: Cursor, script: str):
cursor.execute(script.encode(encoding="UTF-8"), prepare=False)
class PostgresDbConnectionFactory(DbConnectionFactory):
def __init__(self, use_multi_query: bool, fetch_warnings: bool,
host: str, port: int,
user: str, password: str,
database: str):
super().__init__()
self._use_multi_query = use_multi_query
self._fetch_warnings = fetch_warnings
self._host = host
self._port = port
self._user = user
self._password = password
self._database = database
def supports_per_transaction_writeable_flag(self) -> bool:
return True
def _create_new_connection(self) -> Connection:
return Connection.connect(
host=self._host,
port=self._port,
user=self._user,
password=self._password,
dbname=self._database,
autocommit=True # Yep, this is correct. When DISABLED, BEGINs are sent automatically, and we don't want that
)
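Because the connection is opened with autocommit=True, psycopg never issues an implicit BEGIN, so the START TRANSACTION/COMMIT statements from get_transaction_begin_statements and get_transaction_end_statement above are the only transaction control on the wire. A sketch of the resulting flow on a raw connection (example values; not how the pool actually calls it):

factory = PostgresDbConnectionFactory(False, False, "localhost", 5432, "videoag", "", "videoag")
conn = factory._create_new_connection()
conn.execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY")
rows = conn.execute('SELECT * FROM "course"').fetchall()
conn.execute("COMMIT")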
def new_connection(self, writable: bool = True) -> _PostgresDbConnection:
try:
new_conn = self._create_new_connection()
except Exception as e:
raise DatabaseError("An exception occurred while connecting to database") from e
return _PostgresDbConnection(self._use_multi_query, self._fetch_warnings, self._create_new_connection, new_conn)
import sqlite3
from sqlite3 import Connection, Cursor, DataError, ProgrammingError, NotSupportedError, OperationalError  # DatabaseError intentionally comes from api.database.database below
from datetime import datetime
from os import PathLike
from api.database import DbValueType, DbResultSet, FilledStatement
from api.database.database import PreparedStatement, DbConnectionFactory, DatabaseWarning, DatabaseError, \
DbAffectedRows, \
DatabaseResultRow, TransactionIsolationLevel
from api.database.abstract_py_connector import PythonDbConnection
print(f"Using SQLite Connector Version: {sqlite3.sqlite_version}")
_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
def _convert_datetime_from_sqlite(value: bytes or None) -> datetime or None:
if value is None:
return None
value_string: str = value.decode()
    if value_string == "0000-00-00 00:00:00":  # all-zero dates are not parseable; map them to the minimum datetime
        return datetime(1, 1, 1, 0, 0)
    datetime_string = value_string.split(".")[0]  # Datetime sometimes contains a trailing .1234 for fractional seconds
try:
return datetime.strptime(datetime_string, _DATETIME_FORMAT)
except ValueError as e: # pragma: no cover
print(f"Warning: Exception while converting datetime {value_string} from sqlite database: {str(e)}")
return datetime(1, 1, 1, 0, 0)
def _convert_datetime_to_sqlite(value: datetime or None) -> str or None:
if value is None:
return None
return value.strftime(_DATETIME_FORMAT)
sqlite3.register_converter("timestamp", _convert_datetime_from_sqlite)
sqlite3.register_converter("datetime", _convert_datetime_from_sqlite)
sqlite3.register_adapter(datetime, _convert_datetime_to_sqlite)
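# Example (sketch): with detect_types=sqlite3.PARSE_DECLTYPES (as used by the factory
# below), sqlite3 routes columns declared as TIMESTAMP/DATETIME through the converters
# registered above, so a datetime survives the round trip through the database.
def _example_datetime_round_trip() -> datetime:  # pragma: no cover
    connection = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
    connection.execute("CREATE TABLE example (created_at TIMESTAMP)")
    connection.execute("INSERT INTO example VALUES (?)", (datetime(2024, 1, 2, 3, 4, 5),))
    value = connection.execute("SELECT created_at FROM example").fetchone()[0]
    connection.close()
    return value  # datetime(2024, 1, 2, 3, 4, 5), decoded by _convert_datetime_from_sqlite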
class SqLiteDbConnection(PythonDbConnection[Connection, Cursor]):
def __init__(self, py_connection: Connection):
super().__init__(
"?",
(ProgrammingError, DataError, NotSupportedError),
py_connection)
def _can_use_multi_query(self, statements: list[FilledStatement]):
    return False  # Not supported by sqlite
def is_disconnected(self, force_ping: bool = False) -> bool:
    if self._closed:
        raise RuntimeError("Already closed")  # pragma: no cover
    return False  # SQLite cannot really lose its connection
def try_reconnect(self) -> bool:
    return True  # SQLite cannot really lose its connection
def close(self):
self._closed = True
self._py_connection.close()
def get_transaction_begin_statements(self, writable: bool, isolation_level: TransactionIsolationLevel) -> list[PreparedStatement or str]:
return ["BEGIN DEFERRED TRANSACTION"] # Transactions in SQLite are serializable by default
def get_transaction_end_statement(self, commit: bool) -> PreparedStatement or str:
if commit:
return "COMMIT"
else:
return "ROLLBACK"
def _create_cursor(self, prepared: bool):
return self._py_connection.cursor() # SQLite does not have prepared cursors
def _is_transaction_conflict_exception(self, exception: Exception) -> bool:
return isinstance(exception, OperationalError) and "database is locked" in str(exception)
def _db_execute_single_statement(self,
        cursor: Cursor,
        statement: str,
        values: list[DbValueType]) -> tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]:
    cursor.execute(statement, values)
    all_rows = cursor.fetchall()
    # Only build the column mapping when rows were returned; always return a result
    # tuple so callers can read the affected-row count
    column_mapping = {}
    if len(all_rows) > 0:
        column_mapping = PythonDbConnection._create_column_mapping(cursor)
    return ([],  # Warnings seem to be thrown as exceptions
            cursor.rowcount,
            list(map(lambda row: DatabaseResultRow(column_mapping, tuple(row)), all_rows)))
def _db_execute_multiple_statements(self,
cursor: Cursor,
statements: str,
values: list[DbValueType]) -> list[tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]]:
raise RuntimeError("Multi query is not supported. This method should not have be called") # pragma: no cover
def _db_execute_script(self, cursor: Cursor, script: str):
cursor.executescript(script)
class SqLiteDbConnectionFactory(DbConnectionFactory):
def __init__(self, database: str | PathLike[str]):
super().__init__()
self._database = database
def supports_per_transaction_writeable_flag(self) -> bool:
return False
def new_connection(self, writable: bool = True) -> SqLiteDbConnection:
try:
sqlite_connection = sqlite3.connect(
database=self._database,
detect_types=sqlite3.PARSE_DECLTYPES,
check_same_thread=False
)
except Exception as e: # pragma: no cover
raise DatabaseError("An exception occurred while connecting to database") from e
return SqLiteDbConnection(sqlite_connection)
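# Sketch: an in-memory database is convenient for tests. Note that each ":memory:"
# connection gets its own private database, so this is only meaningful while the
# single connection returned here stays open.
def _example_open_in_memory_connection() -> SqLiteDbConnection:  # pragma: no cover
    return SqLiteDbConnectionFactory(":memory:").new_connection()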
import math
import re
from api.database import *
from api.miscellaneous import *
FEEDBACK_EMAIL_MAX_LENGTH = 128
FEEDBACK_EMAIL_REGEX = "\\S+@\\S+\\.\\S+"
FEEDBACK_EMAIL_PATTERN = re.compile(FEEDBACK_EMAIL_REGEX)
FEEDBACK_TEXT_MAX_LENGTH = 16384
FEEDBACK_MIN_PAGE_SIZE = 10
FEEDBACK_MAX_PAGE_SIZE = 500
_SQL_INSERT_FEEDBACK = PreparedStatement(f"""
INSERT INTO "feedback" ("email", "text")
VALUES (?, ?)
""")
def _feedback_entry_db_to_json(feedback_entry_db: DbResultRow):
feedback_entry_json = {
"id": feedback_entry_db["id"],
"time_created": feedback_entry_db["time_created"].strftime(API_DATETIME_FORMAT),
"text": feedback_entry_db["text"]
}
if feedback_entry_db["email"] is not None:
feedback_entry_json["email"] = feedback_entry_db["email"]
return feedback_entry_json
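# Shape of the resulting JSON (values are illustrative; the exact timestamp format
# is whatever API_DATETIME_FORMAT specifies):
# {
#     "id": 42,
#     "time_created": "2024-01-02T03:04:05",
#     "text": "Great lecture recordings!",
#     "email": "user@example.com"  # only present if the submitter gave an email
# }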
def get_feedback_entries(
entries_per_page: int,
page: int) -> tuple[int, JsonTypes]:
if entries_per_page < FEEDBACK_MIN_PAGE_SIZE or entries_per_page > FEEDBACK_MAX_PAGE_SIZE or page < 0:
raise ValueError("Invalid page size or page number") # pragma: no cover
row_count_set, entries_db = db_pool.execute_read_statements_in_transaction(
(f"""
SELECT COUNT(*) AS "count" FROM "feedback"
""", []),
(f"""
SELECT * FROM "feedback"
ORDER BY "time_created" DESC, "id" DESC
LIMIT {entries_per_page} OFFSET {entries_per_page * page}
""", [])
)
return (math.ceil(row_count_set[0]["count"] / entries_per_page),
list(map(_feedback_entry_db_to_json, entries_db)))
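# Worked example: with 42 feedback entries and entries_per_page=10, the returned
# page count is math.ceil(42 / 10) == 5, and the zero-based page=4 fetches the last
# two entries via LIMIT 10 OFFSET 40.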
def put_feedback(email: str or None, text: str):
db_pool.execute_write_transaction(lambda trans: trans.execute_statement_and_commit(
_SQL_INSERT_FEEDBACK, email, text
))
import math
import re
from datetime import datetime
from api.miscellaneous import *
from api.database import *
JOBS_MIN_PAGE_SIZE = 10
JOBS_MAX_PAGE_SIZE = 500
WORKER_ID_PATTERN = re.compile("[0-9a-zA-Z._-]+")
def _job_db_to_json(job_db: DbResultRow) -> dict:
job_json = {
"id": job_db["id"],
"type": job_db["type"],
"priority": job_db["priority"],
"state": job_db["state"],
"is_canceled": bool(job_db["canceled"]),
"creation_time": job_db["time_created"].strftime(API_DATETIME_FORMAT),
"data": job_db["data"],
"status": job_db["status"],
}
if job_db["worker"] is not None:
job_json["worker_id"] = job_db["worker"]
if job_db["last_ping"] is not None:
job_json["last_worker_ping"] = job_db["last_ping"].strftime(API_DATETIME_FORMAT)
if job_db["time_scheduled"] is not None:
job_json["start_time"] = job_db["time_scheduled"].strftime(API_DATETIME_FORMAT)
if job_db["time_finished"] is not None:
job_json["finish_time"] = job_db["time_finished"].strftime(API_DATETIME_FORMAT)
return job_json
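# Shape of the resulting JSON (values are illustrative): "worker_id",
# "last_worker_ping", "start_time" and "finish_time" only appear once the job has
# been picked up, scheduled or finished, respectively.
# {
#     "id": 7, "type": "transcode", "priority": 10, "state": "running",
#     "is_canceled": False, "creation_time": "2024-01-02T03:04:05",
#     "data": "...", "status": "...", "worker_id": "worker-1"
# }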
def get_jobs(
entries_per_page: int,
page: int,
type: str or None,
worker_id: str or None,
state: str or None) -> tuple[int, JsonTypes]:
if entries_per_page < JOBS_MIN_PAGE_SIZE or entries_per_page > JOBS_MAX_PAGE_SIZE or page < 0:
raise ValueError("Invalid page size or page number") # pragma: no cover
if type is None:
type = "%"
if worker_id is None:
worker_id = "%"
if state is None:
state = "%"
if worker_id == "%":
worker_condition = ""
worker_condition_values = []
else:
worker_condition = "AND \"worker\" LIKE ? OR (\"worker\" IS NULL AND ? = 'none')"
worker_condition_values = [worker_id, worker_id]
row_count_set, entries_db = db_pool.execute_read_statements_in_transaction(
(f"""
SELECT COUNT(*) AS "count" FROM "jobs"
WHERE "type" LIKE ?
AND "state" LIKE ?
{worker_condition}
""", [type, state, *worker_condition_values]),
(f"""
SELECT * FROM "jobs"
WHERE "type" LIKE ?
AND "state" LIKE ?
{worker_condition}
ORDER BY "time_created" DESC, "id" DESC
LIMIT {entries_per_page} OFFSET {entries_per_page * page}
""", [type, state, *worker_condition_values])
)
return (math.ceil(row_count_set[0]["count"] / entries_per_page),
list(map(_job_db_to_json, entries_db)))
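# Example call (sketch): list the first page of failed jobs of one worker. Passing
# None for a filter falls back to the '%' wildcard ("match everything"); the special
# worker_id value 'none' selects jobs that have no worker assigned.
def _example_list_failed_jobs(worker_id: str) -> tuple[int, JsonTypes]:  # pragma: no cover
    return get_jobs(entries_per_page=50, page=0, type=None, worker_id=worker_id, state="failed")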
_SQL_GET_WORKERS = PreparedStatement(f"""
SELECT *
FROM "worker"
ORDER BY "last_ping" DESC
""")
def get_workers() -> list[dict]:
workers_db = db_pool.execute_read_statement_in_transaction(_SQL_GET_WORKERS)
workers_json = []
for worker_db in workers_db:
workers_json.append({
"hostname": worker_db["hostname"],
"last_ping": worker_db["last_ping"].strftime(API_DATETIME_FORMAT)
})
return workers_json
_SQL_DELETE_READY_FAILED_JOB = PreparedStatement(f"""
UPDATE "jobs"
SET "state" = 'deleted'
WHERE "id" = ?
AND ("state" = 'ready' OR "state" = 'failed')
""")
_SQL_RESTART_FAILED_JOB = PreparedStatement(f"""
UPDATE "jobs"
SET "state" = 'ready', "canceled" = false
WHERE "id" = ?
AND "state" = 'failed'
""")
_SQL_CANCEL_RUNNING_JOB = PreparedStatement(f"""
UPDATE "jobs"
SET "canceled" = true
WHERE "id" = ?
AND "state" = 'running'
""")
_SQL_COPY_FINISHED_DELETED_JOB = PreparedStatement(f"""
INSERT INTO "jobs" ("type", "priority", "queue", "state", "time_created", "data")
SELECT "type", "priority", "queue", 'ready', ?, "data"
FROM "jobs"
WHERE "id" = ?
AND ("state" = 'finished'
OR "state" = 'deleted'
)
""")
def delete_ready_failed_job(id: int) -> bool:
affected_count = db_pool.execute_write_transaction(
lambda trans: trans.execute_statement_full_result_and_commit(
_SQL_DELETE_READY_FAILED_JOB, id)[DB_RESULT_AFFECTED_ROWS])
return affected_count > 0
def restart_failed_job(id: int) -> bool:
affected_count = db_pool.execute_write_transaction(
lambda trans: trans.execute_statement_full_result_and_commit(
_SQL_RESTART_FAILED_JOB, id)[DB_RESULT_AFFECTED_ROWS])
return affected_count > 0
def cancel_running_job(id: int) -> bool:
affected_count = db_pool.execute_write_transaction(
lambda trans: trans.execute_statement_full_result_and_commit(
_SQL_CANCEL_RUNNING_JOB, id)[DB_RESULT_AFFECTED_ROWS])
return affected_count > 0
def copy_finished_deleted_job(id: int) -> bool:
affected_count = db_pool.execute_write_transaction(
lambda trans: trans.execute_statement_full_result_and_commit(
_SQL_COPY_FINISHED_DELETED_JOB, datetime.now(), id)[DB_RESULT_AFFECTED_ROWS])
return affected_count > 0
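# Summary of the state transitions implemented by the four statements above:
#   ready/failed --delete_ready_failed_job--> 'deleted'
#   failed --restart_failed_job--> 'ready' (the canceled flag is cleared)
#   running --cancel_running_job--> canceled flag set (presumably polled by the worker)
#   finished/deleted --copy_finished_deleted_job--> a fresh 'ready' job with the same
#     type, priority, queue and data, but a new creation time
# Each helper returns True iff a matching row was actually updated.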
@@ -4,7 +4,6 @@ from pathlib import Path
import api
from api.miscellaneous import *
from api.objects.types import TYPE_STRING_LONG
_LIVE_CONFIG_PATH_STRING = os.environ.get("VIDEOAG_API_LIVE_CONFIG", None)
_LIVE_CONFIG_PATH = None if _LIVE_CONFIG_PATH_STRING is None else Path(_LIVE_CONFIG_PATH_STRING).resolve()
@@ -41,7 +40,7 @@ class LiveConfig:
visibility = unsafe_announcement.get_string("visibility", 100)
if visibility not in ["only_main_page", "all_pages", "all_pages_and_embed"]:
unsafe_announcement.get("visibility").raise_error("Unknown visibility")
text = unsafe_announcement.get_string("text", TYPE_STRING_LONG.get_max_string_length())
text = unsafe_announcement.get_string("text", 8192)
if not unsafe_announcement.get_bool("is_enabled", True):
continue