diff --git a/job_controller/config/job_controller_example_config.py b/job_controller/config/job_controller_example_config.py index dd7877f463f43f2a7da3592ca0ed0b3a24562c79..0a5b6e84e0e50e71de87a1b954b3ef05c61ab2cc 100644 --- a/job_controller/config/job_controller_example_config.py +++ b/job_controller/config/job_controller_example_config.py @@ -29,6 +29,10 @@ DATABASE = { "log_all_statements": False } +JOB_TYPES_TO_HANDLE = "all" +# JOB_TYPES_TO_HANDLE = "workload" +# JOB_TYPES_TO_HANDLE = "non-workload" + # Only for testing local-docker! JOB_EXECUTOR = "local-docker" # JOB_EXECUTOR = "kubernetes" diff --git a/job_controller/src/job_controller/job_controller.py b/job_controller/src/job_controller/job_controller.py index 0d79daecdcc803c1da1045656d4a5f4f765e24bc..62b8dca361c1ae96e253ff635104c0fbe55cc483 100644 --- a/job_controller/src/job_controller/job_controller.py +++ b/job_controller/src/job_controller/job_controller.py @@ -19,6 +19,8 @@ from job_controller.executor_api.job_executor_api import JobExecutorApi _JOB_EXECUTOR_NAME = job_controller.config["JOB_EXECUTOR"] +_JOB_TYPES_TO_HANDLE = job_controller.config["JOB_TYPES_TO_HANDLE"] + class JobScheduler: @@ -81,13 +83,34 @@ class JobController: # str in tuple is job type self._event_lister_by_event: dict[str, list[tuple[str, EventListener]]] = {} self._scheduler = Scheduler() + self._job_types_to_handle: list[str] = [] for metadata in self._job_metadata_by_id.values(): for listener in metadata.event_listener: self._event_lister_by_event.setdefault(listener.event_type, []).append((metadata.id, listener)) - if metadata.interval_value > 0: - JobScheduler(self._scheduler, metadata.interval_value * metadata.interval_unit.seconds(), metadata.id) + + handle_this_job_type = False + match _JOB_TYPES_TO_HANDLE: + case "all": + handle_this_job_type = True + case "workload": + if metadata.on_workload_node: + handle_this_job_type = True + case "non-workload": + if not metadata.on_workload_node: + handle_this_job_type = True + case _: + raise ValueError(f"Unknown value for config variable JOB_TYPES_TO_HANDLE: {_JOB_TYPES_TO_HANDLE}") + + if handle_this_job_type: + self._job_types_to_handle.append(metadata.id) + if metadata.interval_value > 0: + JobScheduler(self._scheduler, metadata.interval_value * metadata.interval_unit.seconds(), metadata.id) + + if _JOB_TYPES_TO_HANDLE in ["all", "non-workload"]: + self._job_types_to_handle.extend(["handle_event", "no_op"]) print(f"Found {len(self._job_metadata_by_id)} jobs: {', '.join(sorted(self._job_metadata_by_id.keys()))}") + print(f"Handling only: {', '.join(sorted(self._job_types_to_handle))}") self._immediate_next_cycle = False threading.Thread(target=self._run_scheduler_thread, daemon=True).start() @@ -117,6 +140,7 @@ class JobController: ready_jobs = session.scalars( Job.sudo_select() .where(Job.state == JobState.READY) + .where(Job.type.in_(self._job_types_to_handle)) ).all() for job in ready_jobs: print(f"Spawning job {job.id} of type {job.type}") @@ -166,6 +190,7 @@ class JobController: active_jobs = session.scalars( Job.sudo_select() .where(Job.state.in_([JobState.SPAWNING, JobState.RUNNING])) + .where(Job.type.in_(self._job_types_to_handle)) ).all() active_count = 0 for job in active_jobs: