From 05e0fb676223fabe34fb920fd1140eb078c5aa81 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20K=C3=BCnzel?= <simonk@fsmpi.rwth-aachen.de>
Date: Fri, 2 May 2025 02:26:40 +0200
Subject: [PATCH] Handle sorter transaction failure better

---
 job_controller/jobs/source_file_sorter/job.py | 23 +++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/job_controller/jobs/source_file_sorter/job.py b/job_controller/jobs/source_file_sorter/job.py
index c089b95..8d897e2 100644
--- a/job_controller/jobs/source_file_sorter/job.py
+++ b/job_controller/jobs/source_file_sorter/job.py
@@ -140,7 +140,14 @@ def _check_files(session: SessionDb) -> list[str]:
     return to_sort_file_db_paths
 
 
-def _sort_file(session: SessionDb, own_job_id: int, db_path: str):
+class FunctionContainer:
+    
+    def __init__(self):
+        super().__init__()
+        self.func = None
+
+
+def _sort_file(session: SessionDb, own_job_id: int, db_path: str, on_transaction_failure: FunctionContainer):
     sorter_file = session.scalar(
         SorterFile.sudo_select().where(SorterFile.file_path == db_path)
     )
@@ -248,6 +255,7 @@ def _sort_file(session: SessionDb, own_job_id: int, db_path: str):
         else:
             dest_file.parent.mkdir(parents=True, exist_ok=True)
             shutil.move(file, dest_file)
+            on_transaction_failure.func = lambda: shutil.move(dest_file, file)
             # Set AFTER moving the file, as moving is more likely to throw exception than setting the variable
             # Note that with an exception the file's variables will still be updated in the database
             sorter_file.file_path = _get_db_path(dest_file)
@@ -279,6 +287,17 @@ def execute(database, own_job_id, input_data: CJsonObject):
     to_sort_file_db_paths = database.execute_write_transaction_and_commit(_check_files)
     
     for db_path in to_sort_file_db_paths:
+        on_transaction_failure = FunctionContainer()
         # Sort every file in its own transaction to reduce blocking the database
-        database.execute_write_transaction_and_commit(_sort_file, own_job_id, db_path)
+        try:
+            database.execute_write_transaction_and_commit(
+                _sort_file, own_job_id, db_path,
+            )
+        except Exception:
+            if on_transaction_failure.func is not None:
+                try:
+                    on_transaction_failure.func()
+                except Exception as e:
+                    logger.warning("Error while calling failed transaction handler: ", e)
+            raise
         
-- 
GitLab