Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
job_controller.py 502 B
from event_queue import EventQueue
from job_api import JobData, JobDatabaseApi
from jobs.dummy_job import DummyJob
from kubernetes_api import K8sApi
class ControllerState():
def __init__(self) -> None:
self.k8s = K8sApi()
self.job_api = JobDatabaseApi()
self.event_queue = EventQueue()
def wrap_job(self, job: JobData) -> "WrappedJob":
if job.job_type == "dummy":
return DummyJob(job)
raise Exception(f"Unknown job type: {job.job_type}")