From 96573fe99bdbae27853f17ef4cdaea0f28055e20 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20K=C3=BCnzel?= <simonk@fsmpi.rwth-aachen.de>
Date: Sat, 3 May 2025 20:53:33 +0200
Subject: [PATCH] Allow local docker executor to use prebuilt images

---
 .../config/job_controller_example_config.py   |   6 +
 .../config/local_docker_image_tag.txt         |   1 +
 .../executor_api/local_docker_executor.py     | 129 +++++++++++-------
 3 files changed, 83 insertions(+), 53 deletions(-)
 create mode 100644 job_controller/config/local_docker_image_tag.txt

diff --git a/job_controller/config/job_controller_example_config.py b/job_controller/config/job_controller_example_config.py
index 0a5b6e8..0945a28 100644
--- a/job_controller/config/job_controller_example_config.py
+++ b/job_controller/config/job_controller_example_config.py
@@ -39,6 +39,12 @@ JOB_EXECUTOR = "local-docker"
 
 LOCAL_DOCKER_LOG_BUILD = False
 
+# LOCAL_DOCKER_USE_PRE_BUILT_IMAGES = True
+# LOCAL_DOCKER_PRE_BUILT_IMAGE_PATH_START = "registry.git.fsmpi.rwth-aachen.de/videoag/backend/production_job_"
+# LOCAL_DOCKER_PRE_BUILT_IMAGE_TAG_FILE = "../config/local_docker_image_tag.txt"
+# LOCAL_DOCKER_MEDIA_DIRECTORY = "../../../live_media/"
+# LOCAL_DOCKER_JOB_CONTAINER_NETWORK = "host"
+
 K8S_NAMESPACE = "videoag-prod"
 K8S_JOB_LABEL = "videoag-job"
 K8S_DATA_CONTAINER_VOLUME_NAME = "video-data-volume"
diff --git a/job_controller/config/local_docker_image_tag.txt b/job_controller/config/local_docker_image_tag.txt
new file mode 100644
index 0000000..7082988
--- /dev/null
+++ b/job_controller/config/local_docker_image_tag.txt
@@ -0,0 +1 @@
+v2.0.18
\ No newline at end of file
diff --git a/job_controller/src/job_controller/executor_api/local_docker_executor.py b/job_controller/src/job_controller/executor_api/local_docker_executor.py
index 024f02f..10fe5c7 100644
--- a/job_controller/src/job_controller/executor_api/local_docker_executor.py
+++ b/job_controller/src/job_controller/executor_api/local_docker_executor.py
@@ -20,6 +20,19 @@ _PROJECT_ROOT_DIR = Path("../..").resolve()
 
 LOG_BUILD = job_controller.config["LOCAL_DOCKER_LOG_BUILD"]
 
+_MEDIA_DIRECTORY = job_controller.config.get("LOCAL_DOCKER_MEDIA_DIRECTORY")  # optional override of the host media directory
+if _MEDIA_DIRECTORY:
+    _MEDIA_DIRECTORY_PATH = Path(_MEDIA_DIRECTORY).resolve()  # a relative path is resolved against the current working directory
+else:
+    _MEDIA_DIRECTORY_PATH = _PROJECT_ROOT_DIR.joinpath(".media")  # default: ".media" below the project root
+# With pre-built images no local build is started; the image run is PRE_BUILT_IMAGE_PATH_START + job type id + ":" + tag.
+USE_PRE_BUILT_IMAGES = job_controller.config.get("LOCAL_DOCKER_USE_PRE_BUILT_IMAGES", False)
+if USE_PRE_BUILT_IMAGES:
+    PRE_BUILT_IMAGE_PATH_START = job_controller.config["LOCAL_DOCKER_PRE_BUILT_IMAGE_PATH_START"]  # mandatory ([] lookup) once enabled
+PRE_BUILT_IMAGE_TAG_FILE = job_controller.config.get("LOCAL_DOCKER_PRE_BUILT_IMAGE_TAG_FILE")  # optional; tag "latest" is used if unset
+# Value handed to `docker run --network` for job containers; the default is the previously hard-coded network name.
+JOB_CONTAINER_NETWORK = job_controller.config.get("LOCAL_DOCKER_JOB_CONTAINER_NETWORK", "backend_videoag_backend")
+
 
 class DockerJob(JobExecutionInfo):
     
@@ -30,7 +43,6 @@ class DockerJob(JobExecutionInfo):
         self._metadata = metadata
         self._input_data = input_data
         self._build_process: subprocess.Popen or None = None
-        self._run_args: list[str] or None = None
         self._run_process: subprocess.Popen or None = None
         self._failed = False
         self._start_time = None
@@ -43,6 +55,9 @@ class DockerJob(JobExecutionInfo):
         self._finish_time = get_standard_datetime_now()
     
     def spawn(self):
+        if USE_PRE_BUILT_IMAGES:
+            self._scheduler.enter(0, 0, self._check_status_from_scheduler)
+            return
         if self._build_process is not None:
             return
         try:
@@ -59,59 +74,67 @@ class DockerJob(JobExecutionInfo):
             "-t", f"videoag_job_{self._metadata.id}",
             _PROJECT_ROOT_DIR,
         ]
+        if subprocess.run([
+            _PROJECT_ROOT_DIR.joinpath("dev/generate_dockerfiles.sh")
+        ], cwd=_PROJECT_ROOT_DIR).returncode != 0:
+            raise ValueError("Error while generating dockerfiles")
         
-        self._run_args = [
+        print(f"Docker Executor: Building image {self._metadata.id} for job {self.job_id}:")
+        print(build_args)
+        # Might have missing quoting, etc. but can be useful for debugging.
+        print("UNSAFE: " + ' '.join(str(a) for a in build_args))
+        if LOG_BUILD:
+            self._build_process = subprocess.Popen(build_args)
+        else:
+            self._build_process = subprocess.Popen(build_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        
+        self._scheduler.enter(0.5, 0, self._check_status_from_scheduler)
+    
+    def _run_docker_job(self):
+        print(f"Docker Executor: Running job {self.job_id} of type {self._metadata.id}:")
+        
+        _run_args = [
             "docker", "run",
             # docker-compose networks are prefixed with the directories' name
-            "--network", "backend_videoag_backend",
-            # Not really needed, but keep it for convenience
+            "--network", JOB_CONTAINER_NETWORK,
+            # Not really needed right now, but keep it for convenience
             "--add-host", "host.docker.internal:host-gateway",
         ]
         if self._metadata.mount_common_config:
-            self._run_args.extend([
+            _run_args.extend([
                 "-e", "VIDEOAG_CONFIG=/code/config/common_job_config.py",
-                "--mount", f"type=bind,source={_PROJECT_ROOT_DIR.joinpath('job_controller/config/common_job_example_config.py')},"
-                           f"destination=/code/config/common_job_config.py",
+                "--mount",
+                f"type=bind,source={_PROJECT_ROOT_DIR.joinpath('job_controller/config/common_job_example_config.py')},"
+                f"destination=/code/config/common_job_config.py",
             ])
         
         if self._metadata.data_dir_mount is not None:
-            media_dir = _PROJECT_ROOT_DIR.joinpath(".media")
-            media_dir.mkdir(parents=True, exist_ok=True)
-            self._run_args.extend([
-                "--mount", f"type=bind,source={media_dir},"
+            _MEDIA_DIRECTORY_PATH.mkdir(parents=True, exist_ok=True)
+            _run_args.extend([
+                "--mount", f"type=bind,source={_MEDIA_DIRECTORY_PATH},"
                            f"destination={self._metadata.data_dir_mount}",
             ])
         
-        self._run_args.append(f"videoag_job_{self._metadata.id}")
+        if USE_PRE_BUILT_IMAGES:
+            # Pre-built image: "<path start><job type id>:<tag>". The tag is stripped because a trailing newline in the
+            # tag file (most editors append one) would otherwise end up inside the image reference passed to docker.
+            image_tag = "latest"
+            if PRE_BUILT_IMAGE_TAG_FILE:
+                image_tag = Path(PRE_BUILT_IMAGE_TAG_FILE).resolve().read_text().strip()
+            _run_args.append(f"{PRE_BUILT_IMAGE_PATH_START}{self._metadata.id}:{image_tag}")
+        else:  # locally built image, tagged by the build step as videoag_job_<job type id>
+            _run_args.append(f"videoag_job_{self._metadata.id}")
+        
         # Job args
-        self._run_args.extend([
+        _run_args.extend([
             "--own_job_type", str(self._metadata.id),
             "--own_job_id", str(self.job_id),
             "--input_data", json.dumps(self._input_data)
         ])
         
-        if subprocess.run([
-            _PROJECT_ROOT_DIR.joinpath("dev/generate_dockerfiles.sh")
-        ], cwd=_PROJECT_ROOT_DIR).returncode != 0:
-            raise ValueError("Error while generating dockerfiles")
-        
-        print(f"Docker Executor: Building image {self._metadata.id} for job {self.job_id}:")
-        print(build_args)
-        # Might have missing quoting, etc. but can be useful for debugging.
-        print("UNSAFE: " + ' '.join(str(a) for a in build_args))
-        if LOG_BUILD:
-            self._build_process = subprocess.Popen(build_args)
-        else:
-            self._build_process = subprocess.Popen(build_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        
-        self._scheduler.enter(0.5, 0, self._check_status_from_scheduler)
-    
-    def _run_docker_job(self):
-        print(f"Docker Executor: Running job {self.job_id} of type {self._metadata.id}:")
-        print(self._run_args)
+        print(_run_args)
         # Might have missing quoting, etc. but can be useful for debugging.
-        print("UNSAFE: " + ' '.join(str(a) for a in self._run_args))
-        self._run_process = subprocess.Popen(self._run_args)
+        print("UNSAFE: " + ' '.join(str(a) for a in _run_args))
+        self._run_process = subprocess.Popen(_run_args)
         self._start_time = get_standard_datetime_now()
     
     def _check_status_from_scheduler(self):
@@ -124,26 +147,26 @@ class DockerJob(JobExecutionInfo):
         Only call from scheduler!
         """
         
-        if self._build_process is None:
-            return
-        
         if self._run_process is None:
-            assert isinstance(self._build_process, subprocess.Popen)
-            # We are building right now
-            self._build_process.poll()
-            if self._build_process.returncode is None:
-                # Still building
-                return
-            if self._build_process.returncode != 0:
-                error_msg = f"Docker build for job {self.job_id} failed. Docker returned non-zero exitcode {self._build_process.returncode}\n"
-                if not LOG_BUILD:
-                    error_msg += f"Here's the building process stdout:\n\n"
-                    error_msg += ''.join(TextIOWrapper(self._build_process.stdout).readlines()) + "\n\n\n"
-                    error_msg += f"Here's the building process stderr:\n\n"
-                    error_msg += ''.join(TextIOWrapper(self._build_process.stderr).readlines()) + "\n\n\n"
-                print(error_msg, file=sys.stderr)
-                self._set_failed()
-                return
+            if not USE_PRE_BUILT_IMAGES:
+                if self._build_process is None:
+                    return
+                assert isinstance(self._build_process, subprocess.Popen)
+                # We are building right now
+                self._build_process.poll()
+                if self._build_process.returncode is None:
+                    # Still building
+                    return
+                if self._build_process.returncode != 0:
+                    error_msg = f"Docker build for job {self.job_id} failed. Docker returned non-zero exitcode {self._build_process.returncode}\n"
+                    if not LOG_BUILD:
+                        error_msg += f"Here's the building process stdout:\n\n"
+                        error_msg += ''.join(TextIOWrapper(self._build_process.stdout).readlines()) + "\n\n\n"
+                        error_msg += f"Here's the building process stderr:\n\n"
+                        error_msg += ''.join(TextIOWrapper(self._build_process.stderr).readlines()) + "\n\n\n"
+                    print(error_msg, file=sys.stderr)
+                    self._set_failed()
+                    return
             try:
                 self._run_docker_job()
             except Exception as e:
-- 
GitLab