Skip to content
Snippets Groups Projects
Commit 8af4d986 authored by Simon Künzel's avatar Simon Künzel
Browse files

Add database diagnostics

parent cb272aa8
Branches
No related tags found
No related merge requests found
Pipeline #5878 passed
from threading import Condition from threading import Condition
from datetime import datetime from datetime import datetime
from typing import Callable from typing import Callable, Literal
from abc import ABC, ABCMeta, abstractmethod from abc import ABC, ABCMeta, abstractmethod
from contextlib import contextmanager from contextlib import contextmanager
from traceback import StackSummary, extract_stack from traceback import StackSummary, extract_stack
from api.miscellaneous import DEBUG_ENABLED from api.miscellaneous import DEBUG_ENABLED, DIAGNOSTICS_TRACKER, DiagnosticsCounter
DbValueType = int | str | bytes | datetime DbValueType = int | str | bytes | datetime
_TransactionType = Literal["read", "write"]
def _create_diagnostics_counters(id: str) -> dict[_TransactionType, DiagnosticsCounter]:
return {
"read": DIAGNOSTICS_TRACKER.register_counter(f"database.transaction.read.{id}"),
"write": DIAGNOSTICS_TRACKER.register_counter(f"database.transaction.write.{id}")
}
_COUNTERS_TRANSACTION_COUNT = _create_diagnostics_counters("count")
_COUNTERS_TRANSACTION_ERRORS = _create_diagnostics_counters("errors")
_COUNTERS_TRANSACTION_ATTEMPTED_RECONNECTS = _create_diagnostics_counters("attempted_reconnects")
_COUNTERS_TRANSACTION_SUCCESSFUL_RECONNECTS = _create_diagnostics_counters("successful_reconnects")
_COUNTERS_TRANSACTION_ERRORS_AFTER_RECONNECT = _create_diagnostics_counters("errors_after_reconnect")
_COUNTERS_TRANSACTION_ABORTED_BY_USER = _create_diagnostics_counters("aborted_by_user")
class DatabaseResultRow: class DatabaseResultRow:
...@@ -184,14 +201,17 @@ class DbConnectionFactory(ABC): ...@@ -184,14 +201,17 @@ class DbConnectionFactory(ABC):
class AbstractTransaction(ABC): class AbstractTransaction(ABC):
__metaclass__ = ABCMeta __metaclass__ = ABCMeta
def __init__(self, release_connection: Callable, connection: DbConnection): def __init__(self, type: _TransactionType, release_connection: Callable, connection: DbConnection):
super().__init__() super().__init__()
self._type = type
self._release_connection = release_connection self._release_connection = release_connection
self._closed = False self._closed = False
self._connection = connection self._connection = connection
self._has_executed_statements = False self._has_executed_statements = False
self._has_encountered_errors = False
self._queued_statements: list[FilledStatement] = [] self._queued_statements: list[FilledStatement] = []
self._statement_results: dict[FilledStatement, DbResult] = {} self._statement_results: dict[FilledStatement, DbResult] = {}
_COUNTERS_TRANSACTION_COUNT[type].trigger()
def is_closed(self) -> bool: def is_closed(self) -> bool:
return self._closed return self._closed
...@@ -315,15 +335,24 @@ class AbstractTransaction(ABC): ...@@ -315,15 +335,24 @@ class AbstractTransaction(ABC):
try: try:
results = self._connection.execute_statements(self._queued_statements) results = self._connection.execute_statements(self._queued_statements)
except DatabaseError as e: except DatabaseError as e:
if not self._has_encountered_errors:
_COUNTERS_TRANSACTION_ERRORS[self._type].trigger()
self._has_encountered_errors = True
if self._has_executed_statements or not self._connection.is_disconnected(force_ping=True): if self._has_executed_statements or not self._connection.is_disconnected(force_ping=True):
self.on_error_close_transaction() self.on_error_close_transaction()
raise e raise e
# All triggers after here can be triggered at most once for a single transaction because after this code block
# we have either executed statements or closed the transaction
_COUNTERS_TRANSACTION_ATTEMPTED_RECONNECTS[self._type].trigger()
if not self._connection.try_reconnect(): if not self._connection.try_reconnect():
self.on_error_close_transaction() self.on_error_close_transaction()
raise e raise e
_COUNTERS_TRANSACTION_SUCCESSFUL_RECONNECTS[self._type].trigger()
try: try:
results = self._connection.execute_statements(self._queued_statements) results = self._connection.execute_statements(self._queued_statements)
except Exception: except Exception:
_COUNTERS_TRANSACTION_ERRORS_AFTER_RECONNECT[self._type].trigger()
self.on_error_close_transaction() self.on_error_close_transaction()
raise e # Raise original exception raise e # Raise original exception
except Exception as e: except Exception as e:
...@@ -355,7 +384,7 @@ class AbstractTransaction(ABC): ...@@ -355,7 +384,7 @@ class AbstractTransaction(ABC):
class ReadTransaction(AbstractTransaction): class ReadTransaction(AbstractTransaction):
def __init__(self, release_connection: Callable, connection: DbConnection): def __init__(self, release_connection: Callable, connection: DbConnection):
super().__init__(release_connection, connection) super().__init__("read", release_connection, connection)
self.queue_statement(connection.get_transaction_begin_statement(False)) self.queue_statement(connection.get_transaction_begin_statement(False))
def execute_statement_and_close(self, def execute_statement_and_close(self,
...@@ -421,7 +450,7 @@ class ReadTransaction(AbstractTransaction): ...@@ -421,7 +450,7 @@ class ReadTransaction(AbstractTransaction):
class WriteTransaction(AbstractTransaction): class WriteTransaction(AbstractTransaction):
def __init__(self, release_connection: Callable, connection: DbConnection): def __init__(self, release_connection: Callable, connection: DbConnection):
super().__init__(release_connection, connection) super().__init__("write", release_connection, connection)
self._committed = False self._committed = False
self.queue_statement(connection.get_transaction_begin_statement(True)) self.queue_statement(connection.get_transaction_begin_statement(True))
...@@ -681,6 +710,7 @@ class DbConnectionPool: ...@@ -681,6 +710,7 @@ class DbConnectionPool:
transaction.close() transaction.close()
except Exception: except Exception:
if transaction is not None and not transaction.is_closed(): if transaction is not None and not transaction.is_closed():
_COUNTERS_TRANSACTION_ABORTED_BY_USER["read"].trigger()
transaction.on_error_close_transaction() transaction.on_error_close_transaction()
raise raise
...@@ -705,5 +735,6 @@ class DbConnectionPool: ...@@ -705,5 +735,6 @@ class DbConnectionPool:
transaction.rollback() transaction.rollback()
except Exception: except Exception:
if transaction is not None and not transaction.is_closed(): if transaction is not None and not transaction.is_closed():
_COUNTERS_TRANSACTION_ABORTED_BY_USER["write"].trigger()
transaction.on_error_close_transaction() transaction.on_error_close_transaction()
raise raise
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment