From cc919e543a81379fb4962bf9b938ba933b5b5dfd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20K=C3=BCnzel?= <simonk@fsmpi.rwth-aachen.de>
Date: Sat, 3 May 2025 20:46:53 +0200
Subject: [PATCH] Allow job controller to only handle specific jobs

---
 .../config/job_controller_example_config.py   |  4 +++
 .../src/job_controller/job_controller.py      | 29 +++++++++++++++++--
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/job_controller/config/job_controller_example_config.py b/job_controller/config/job_controller_example_config.py
index dd7877f..0a5b6e8 100644
--- a/job_controller/config/job_controller_example_config.py
+++ b/job_controller/config/job_controller_example_config.py
@@ -29,6 +29,10 @@ DATABASE = {
     "log_all_statements": False
 }
 
+JOB_TYPES_TO_HANDLE = "all"
+# JOB_TYPES_TO_HANDLE = "workload"
+# JOB_TYPES_TO_HANDLE = "non-workload"
+
 # Only for testing local-docker!
 JOB_EXECUTOR = "local-docker"
 # JOB_EXECUTOR = "kubernetes"
diff --git a/job_controller/src/job_controller/job_controller.py b/job_controller/src/job_controller/job_controller.py
index 0d79dae..62b8dca 100644
--- a/job_controller/src/job_controller/job_controller.py
+++ b/job_controller/src/job_controller/job_controller.py
@@ -19,6 +19,8 @@ from job_controller.executor_api.job_executor_api import JobExecutorApi
 
 _JOB_EXECUTOR_NAME = job_controller.config["JOB_EXECUTOR"]
 
+_JOB_TYPES_TO_HANDLE = job_controller.config["JOB_TYPES_TO_HANDLE"]
+
 
 class JobScheduler:
     
@@ -81,13 +83,34 @@ class JobController:
         # str in tuple is job type
         self._event_lister_by_event: dict[str, list[tuple[str, EventListener]]] = {}
         self._scheduler = Scheduler()
+        self._job_types_to_handle: list[str] = []
         for metadata in self._job_metadata_by_id.values():
             for listener in metadata.event_listener:
                 self._event_lister_by_event.setdefault(listener.event_type, []).append((metadata.id, listener))
-            if metadata.interval_value > 0:
-                JobScheduler(self._scheduler, metadata.interval_value * metadata.interval_unit.seconds(), metadata.id)
+            
+            handle_this_job_type = False
+            match _JOB_TYPES_TO_HANDLE:
+                case "all":
+                    handle_this_job_type = True
+                case "workload":
+                    if metadata.on_workload_node:
+                        handle_this_job_type = True
+                case "non-workload":
+                    if not metadata.on_workload_node:
+                        handle_this_job_type = True
+                case _:
+                    raise ValueError(f"Unknown value for config variable JOB_TYPES_TO_HANDLE: {_JOB_TYPES_TO_HANDLE}")
+            
+            if handle_this_job_type:
+                self._job_types_to_handle.append(metadata.id)
+                if metadata.interval_value > 0:
+                    JobScheduler(self._scheduler, metadata.interval_value * metadata.interval_unit.seconds(), metadata.id)
+        
+        if _JOB_TYPES_TO_HANDLE in ["all", "non-workload"]:
+            self._job_types_to_handle.extend(["handle_event", "no_op"])
         
         print(f"Found {len(self._job_metadata_by_id)} jobs: {', '.join(sorted(self._job_metadata_by_id.keys()))}")
+        print(f"Handling only: {', '.join(sorted(self._job_types_to_handle))}")
         
         self._immediate_next_cycle = False
         threading.Thread(target=self._run_scheduler_thread, daemon=True).start()
@@ -117,6 +140,7 @@ class JobController:
         ready_jobs = session.scalars(
             Job.sudo_select()
             .where(Job.state == JobState.READY)
+            .where(Job.type.in_(self._job_types_to_handle))
         ).all()
         for job in ready_jobs:
             print(f"Spawning job {job.id} of type {job.type}")
@@ -166,6 +190,7 @@ class JobController:
         active_jobs = session.scalars(
             Job.sudo_select()
             .where(Job.state.in_([JobState.SPAWNING, JobState.RUNNING]))
+            .where(Job.type.in_(self._job_types_to_handle))
         ).all()
         active_count = 0
         for job in active_jobs:
-- 
GitLab