Skip to content
Snippets Groups Projects
Commit cc919e54 authored by Simon Künzel's avatar Simon Künzel
Browse files

Allow job controller to only handle specific jobs

parent a6b97101
No related branches found
No related tags found
No related merge requests found
......@@ -29,6 +29,10 @@ DATABASE = {
"log_all_statements": False
}
JOB_TYPES_TO_HANDLE = "all"
# JOB_TYPES_TO_HANDLE = "workload"
# JOB_TYPES_TO_HANDLE = "non-workload"
# Only for testing local-docker!
JOB_EXECUTOR = "local-docker"
# JOB_EXECUTOR = "kubernetes"
......
......@@ -19,6 +19,8 @@ from job_controller.executor_api.job_executor_api import JobExecutorApi
_JOB_EXECUTOR_NAME = job_controller.config["JOB_EXECUTOR"]
_JOB_TYPES_TO_HANDLE = job_controller.config["JOB_TYPES_TO_HANDLE"]
class JobScheduler:
......@@ -81,13 +83,34 @@ class JobController:
# str in tuple is job type
self._event_lister_by_event: dict[str, list[tuple[str, EventListener]]] = {}
self._scheduler = Scheduler()
self._job_types_to_handle: list[str] = []
for metadata in self._job_metadata_by_id.values():
for listener in metadata.event_listener:
self._event_lister_by_event.setdefault(listener.event_type, []).append((metadata.id, listener))
handle_this_job_type = False
match _JOB_TYPES_TO_HANDLE:
case "all":
handle_this_job_type = True
case "workload":
if metadata.on_workload_node:
handle_this_job_type = True
case "non-workload":
if not metadata.on_workload_node:
handle_this_job_type = True
case _:
raise ValueError(f"Unknown value for config variable JOB_TYPES_TO_HANDLE: {_JOB_TYPES_TO_HANDLE}")
if handle_this_job_type:
self._job_types_to_handle.append(metadata.id)
if metadata.interval_value > 0:
JobScheduler(self._scheduler, metadata.interval_value * metadata.interval_unit.seconds(), metadata.id)
if _JOB_TYPES_TO_HANDLE in ["all", "non-workload"]:
self._job_types_to_handle.extend(["handle_event", "no_op"])
print(f"Found {len(self._job_metadata_by_id)} jobs: {', '.join(sorted(self._job_metadata_by_id.keys()))}")
print(f"Handling only: {', '.join(sorted(self._job_types_to_handle))}")
self._immediate_next_cycle = False
threading.Thread(target=self._run_scheduler_thread, daemon=True).start()
......@@ -117,6 +140,7 @@ class JobController:
ready_jobs = session.scalars(
Job.sudo_select()
.where(Job.state == JobState.READY)
.where(Job.type.in_(self._job_types_to_handle))
).all()
for job in ready_jobs:
print(f"Spawning job {job.id} of type {job.type}")
......@@ -166,6 +190,7 @@ class JobController:
active_jobs = session.scalars(
Job.sudo_select()
.where(Job.state.in_([JobState.SPAWNING, JobState.RUNNING]))
.where(Job.type.in_(self._job_types_to_handle))
).all()
active_count = 0
for job in active_jobs:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment