Skip to content
Snippets Groups Projects
Select Git revision
  • 99b75708aeaa1c9f6bc988d6fa4aa957d3187062
  • main default
  • full_migration
  • v1.0.9 protected
  • v1.0.8 protected
  • v1.0.7 protected
  • v1.0.6 protected
  • v1.0.5 protected
  • v1.0.4 protected
  • v1.0.3 protected
  • v1.0.2 protected
  • v1.0.1 protected
  • v1.0 protected
13 results

sqlite_connector.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    sqlite_connector.py 4.83 KiB
    import sqlite3
    from sqlite3 import Connection, Cursor, DatabaseError, DataError, ProgrammingError, NotSupportedError, OperationalError
    from datetime import datetime
    from os import PathLike
    
    from api.database import DbValueType, DbResultSet, FilledStatement
    from api.database.database import PreparedStatement, DbConnectionFactory, DatabaseWarning, DatabaseError, DbAffectedRows, \
        DatabaseResultRow
    from api.database.abstract_py_connector import PythonDbConnection
    
    # Import-time side effect: report the version of the SQLite library that the sqlite3 module uses at runtime.
    print(f"Using SQLite Connector Version: {sqlite3.sqlite_version}")
    
    # strftime/strptime pattern (second precision) used by the datetime adapter and converter functions in this module.
    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    
    
    def _convert_datetime_from_sqlite(value: bytes or None) -> datetime or None:
        if value is None:
            return None
        value_string: str = value.decode()
        if value_string == "0000-00-00 00:00:00":
            return datetime(1, 1, 1, 0, 0)
        datetime_string = value_string.split(".")[0]  # Datetime sometimes contains a trailing .1234 for milliseconds, etc.
        try:
            return datetime.strptime(datetime_string, _DATETIME_FORMAT)
        except ValueError as e:  # pragma: no cover
            print(f"Warning: Exception while converting datetime {value_string} from sqlite database: {str(e)}")
            return datetime(1, 1, 1, 0, 0)
    
    
    def _convert_datetime_to_sqlite(value: datetime or None) -> str or None:
        if value is None:
            return None
        return value.strftime(_DATETIME_FORMAT)
    
    
    # Writing: every datetime passed as a query parameter is stored in its text representation
    sqlite3.register_adapter(datetime, _convert_datetime_to_sqlite)
    # Reading: columns declared as "datetime" or "timestamp" are parsed back into datetime objects
    sqlite3.register_converter("datetime", _convert_datetime_from_sqlite)
    sqlite3.register_converter("timestamp", _convert_datetime_from_sqlite)
    
    
    class SqLiteDbConnection(PythonDbConnection[Connection, Cursor]):
        """
        Connection implementation which wraps a sqlite3 Connection.

        Multi queries are not supported and the connection is never reported as disconnected.
        """
        
        def __init__(self, py_connection: Connection):
            """
            :param py_connection: The already opened sqlite3 connection which is wrapped by this object.
            """
            super().__init__(
                "?",  # sqlite3 uses the "qmark" style for parameters
                # NOTE(review): presumably the exception types which the base class treats as caller errors - confirm there
                (ProgrammingError, DataError, NotSupportedError),
                py_connection)
        
        def _can_use_multi_query(self, statements: list[FilledStatement]) -> bool:
            """Always False; several statements are never combined into one query for SQLite."""
            return False  # Not supported by sqlite
        
        def is_disconnected(self, force_ping: bool = False) -> bool:
            """
            :param force_ping: Ignored by this implementation.
            :return: Always False.
            :raises RuntimeError: If this connection was already closed.
            """
            if self._closed:
                raise RuntimeError("Already closed")  # pragma: no cover
            return False  # SQLite can't really lose connection
        
        def try_reconnect(self) -> bool:
            """Does nothing and reports success."""
            return True  # SQLite can't really lose connection
    
        def close(self):
            """Mark this connection as closed and close the wrapped sqlite3 connection."""
            self._closed = True
            self._py_connection.close()
        
        def get_transaction_begin_statement(self, writable: bool) -> PreparedStatement | str:
            """
            :param writable: Ignored; the same statement is used for read-only and writable transactions.
            :return: The statement which starts a deferred transaction.
            """
            return "BEGIN DEFERRED TRANSACTION"
        
        def get_transaction_end_statement(self, commit: bool) -> PreparedStatement | str:
            """
            :param commit: Whether the transaction is committed (True) or rolled back (False).
            :return: "COMMIT" or "ROLLBACK" respectively.
            """
            if commit:
                return "COMMIT"
            else:
                return "ROLLBACK"
        
        def _create_cursor(self, prepared: bool):
            """
            :param prepared: Ignored.
            :return: A new plain cursor of the wrapped connection.
            """
            return self._py_connection.cursor()  # SQLite does not have prepared cursors
        
        def _is_transaction_conflict_exception(self, exception: Exception) -> bool:
            """Return True if the exception is an OperationalError whose message contains "database is locked"."""
            return isinstance(exception, OperationalError) and "database is locked" in str(exception)
        
        def _db_execute_single_statement(self,
                                         cursor: Cursor,
                                         statement: str,
                                         values: list[DbValueType]) -> tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]:
            """
            Execute one statement and fetch all of its rows.

            :param cursor: The cursor to execute the statement with.
            :param statement: The SQL text, using "?" for the parameters.
            :param values: The values for the parameters.
            :return: A tuple of: an always empty warning list, the rowcount of the cursor and the fetched rows wrapped
                     in DatabaseResultRow objects (empty list if no rows were returned).
            """
            cursor.execute(statement, values)
            all_rows = cursor.fetchall()
            result_rows = []
            if all_rows:
                # The column mapping is only created (and only needed) if there is at least one row. It is bound
                # inside this branch so that no code path can ever reference an unassigned name.
                column_mapping = PythonDbConnection._create_column_mapping(cursor)
                result_rows = [DatabaseResultRow(column_mapping, tuple(row)) for row in all_rows]
            return ([],  # Warnings seem to be thrown as exceptions
                    cursor.rowcount,
                    result_rows)
        
        def _db_execute_multiple_statements(self,
                                            cursor: Cursor,
                                            statements: str,
                                            values: list[DbValueType]) -> list[tuple[list[DatabaseWarning], DbAffectedRows, DbResultSet]]:
            """
            Not supported, see _can_use_multi_query.

            :raises RuntimeError: Always.
            """
            raise RuntimeError("Multi query is not supported. This method should not have be called")  # pragma: no cover
        
        def _db_execute_script(self, cursor: Cursor, script: str):
            """Execute the given SQL script with the executescript method of the cursor."""
            cursor.executescript(script)
    
    
    class SqLiteDbConnectionFactory(DbConnectionFactory):
        """Factory which opens a new SqLiteDbConnection to one fixed database for every request."""

        def __init__(self, database: str | PathLike[str]):
            """
            :param database: Passed as-is as the `database` argument to sqlite3.connect for every new connection.
            """
            super().__init__()
            self._database = database

        def supports_per_transaction_writeable_flag(self) -> bool:
            """Always False for SQLite."""
            return False

        def new_connection(self, writable: bool = True) -> SqLiteDbConnection:
            """
            Open a new connection to the configured database.

            :param writable: Accepted but not used; every connection is opened the same way.
            :return: The new connection.
            :raises DatabaseError: If sqlite3.connect fails for any reason. The original exception is chained.
            """
            try:
                raw_connection = sqlite3.connect(
                    database=self._database,
                    # Apply the registered converters according to the declared type of the columns
                    detect_types=sqlite3.PARSE_DECLTYPES,
                    # Allow usage of the connection from other threads than the one creating it
                    check_same_thread=False
                )
            except Exception as error:  # pragma: no cover
                raise DatabaseError("An exception occurred while connecting to database") from error
            else:
                return SqLiteDbConnection(raw_connection)