diff --git a/job_controller/jobs/source_file_sorter/job.py b/job_controller/jobs/source_file_sorter/job.py
index c089b951b29624d9cc998f8e6d5d1a52c6ddc650..8d897e24e4b644d9bd1882cf0cd32d308a383fbc 100644
--- a/job_controller/jobs/source_file_sorter/job.py
+++ b/job_controller/jobs/source_file_sorter/job.py
@@ -140,7 +140,14 @@ def _check_files(session: SessionDb) -> list[str]:
     return to_sort_file_db_paths
 
 
-def _sort_file(session: SessionDb, own_job_id: int, db_path: str):
+class FunctionContainer:
+    """Mutable holder for a rollback callback to invoke if the enclosing transaction fails."""
+    def __init__(self):
+        super().__init__()
+        self.func = None
+
+
+def _sort_file(session: SessionDb, own_job_id: int, db_path: str, on_transaction_failure: FunctionContainer):
     sorter_file = session.scalar(
         SorterFile.sudo_select().where(SorterFile.file_path == db_path)
     )
@@ -248,6 +255,7 @@ def _sort_file(session: SessionDb, own_job_id: int, db_path: str):
     else:
         dest_file.parent.mkdir(parents=True, exist_ok=True)
         shutil.move(file, dest_file)
+        on_transaction_failure.func = lambda: shutil.move(dest_file, file)  # Undo the move if the surrounding transaction fails
     # Set AFTER moving the file, as moving is more likely to throw exception than setting the variable
     # Note that with an exception the file's variables will still be updated in the database
     sorter_file.file_path = _get_db_path(dest_file)
@@ -279,6 +287,17 @@ def execute(database, own_job_id, input_data: CJsonObject):
     to_sort_file_db_paths = database.execute_write_transaction_and_commit(_check_files)
 
     for db_path in to_sort_file_db_paths:
+        on_transaction_failure = FunctionContainer()  # Collects a rollback action registered by _sort_file
         # Sort every file in its own transaction to reduce blocking the database
-        database.execute_write_transaction_and_commit(_sort_file, own_job_id, db_path)
+        try:
+            database.execute_write_transaction_and_commit(
+                _sort_file, own_job_id, db_path, on_transaction_failure,
+            )
+        except Exception:
+            if on_transaction_failure.func is not None:
+                try:
+                    on_transaction_failure.func()
+                except Exception as e:
+                    logger.warning("Error while calling failed transaction handler: %s", e)
+            raise
 