diff --git a/config/job_controller_config.py b/config/job_controller_config.py new file mode 100644 index 0000000000000000000000000000000000000000..c33c2d8f3e45da6d1ab0829162ec14fa570644ca --- /dev/null +++ b/config/job_controller_config.py @@ -0,0 +1,13 @@ +import os + +DB_ENGINE = "dummy" # could be postgres + +''' +POSTGRES = { + "host": "database", + "port": 5432, + "user": os.environ["DATABASE_username"], + "password": os.environ["DATABASE_password"], + "database": "videoag" +} +''' diff --git a/src/actions/spawn_job.py b/src/actions/spawn_job.py index 3fdcd5a9995ac921f9d8a567e19b6f8d07f73ec7..d1b550fad156d780456b100e9104775b0330b1e1 100644 --- a/src/actions/spawn_job.py +++ b/src/actions/spawn_job.py @@ -1,6 +1,6 @@ from collections.abc import Generator from event_queue import GeneratorRecurringEvent -from job_api import JobData, JobState +from job_database_api import JobData, JobState from job_controller import ControllerState import datetime diff --git a/src/job_controller.py b/src/job_controller.py index 72a4de5fc06490c7da132654eba54374f98ff62f..294638a809294dd4a3d1a5b07f8da4a5565247c1 100644 --- a/src/job_controller.py +++ b/src/job_controller.py @@ -1,14 +1,39 @@ +import types from event_queue import EventQueue -from job_api import JobData, JobDatabaseApi +from job_database_api import DummyJobDatabaseApi, JobData, JobDatabaseApi from jobs.dummy_job import DummyJob from kubernetes_api import K8sApi +import os + + +def load_config(): + cfgPath = os.environ.get("VIDEOAG_JOB_CONTROLLER_CONFIG", "../config/job_controller_config.py") + filename = os.path.join(os.getcwd(), cfgPath) + d = types.ModuleType("config") + d.__file__ = filename + try: + with open(filename, mode="rb") as config_file: + exec(compile(config_file.read(), filename, "exec"), d.__dict__) + except OSError as e: + print("Unable to load config file: ", e) + raise + cfgMap = {} + for key in dir(d): + if key.isupper(): + cfgMap[key] = getattr(d, key) + return cfgMap class ControllerState(): def __init__(self) -> None: + self.config = load_config() self.k8s = K8sApi() - self.job_api = JobDatabaseApi() + db_engine = self.config.get("DB_ENGINE") + if db_engine == "dummy": + self.job_api = DummyJobDatabaseApi() + else: + raise Exception(f"Unknown DB_ENGINE: {db_engine}") self.event_queue = EventQueue() def wrap_job(self, job: JobData) -> "WrappedJob": diff --git a/src/job_api.py b/src/job_database_api.py similarity index 78% rename from src/job_api.py rename to src/job_database_api.py index eb0ecc75854c17a6efddfd8393584483c0d8af78..6271e9470971fc6124e5caae9e3b27400d78507a 100644 --- a/src/job_api.py +++ b/src/job_database_api.py @@ -1,4 +1,5 @@ +from abc import ABCMeta, abstractmethod from enum import Enum from typing import Optional import copy @@ -29,7 +30,26 @@ class JobData(): # this is just a dummy implementation # this should be replaced by a real implementation that always interacts with the database (no caching!) -class JobDatabaseApi(): +class JobDatabaseApi(metaclass=ABCMeta): + + @abstractmethod + def get_next_jobs_and_set_spawning(self, limit: int) -> list[JobData]: + pass + + @abstractmethod + def get_job_by_id(self, job_id: str) -> Optional[JobData]: + pass + + @abstractmethod + def create_job(self, job: JobData): + pass + + @abstractmethod + def update_job_state(self, job_id: str, new_state: JobState): + pass + + +class DummyJobDatabaseApi(JobDatabaseApi): def __init__(self): self.db_state = {} diff --git a/src/jobs/dummy_job.py b/src/jobs/dummy_job.py index 25eaa9a04f4d25f696601998d896f1c5f624626b..a4e58246e5fc0b1f9d52b9a08adbd37029e01758 100644 --- a/src/jobs/dummy_job.py +++ b/src/jobs/dummy_job.py @@ -1,12 +1,12 @@ -from job_api import JobData +from job_database_api import JobData from jobs.wrapped_job import WrappedJob class DummyJob(WrappedJob): def __init__(self, jobData: JobData): - super().__init__(jobData, image="quay.io/msrd0/render_video:latest", args=["--help"]) + super().__init__(jobData, image="busybox", args=["sleep", "5"]) def prepare(self, cstate: "ControllerState"): print("Preparing dummy job") diff --git a/src/jobs/wrapped_job.py b/src/jobs/wrapped_job.py index c8c760af91ee0bae296a7cbb53ac59fd3be56cde..d93db4b2df4dc46a0bb29b67ecaab4cfe25f80fc 100644 --- a/src/jobs/wrapped_job.py +++ b/src/jobs/wrapped_job.py @@ -1,6 +1,6 @@ from abc import abstractmethod from event_queue import Event, EventResult -from job_api import JobData, JobState +from job_database_api import JobData, JobState class WrappedJob(Event): diff --git a/src/main.py b/src/main.py index 3f10a4823a31147420da3b0100914afb3b8b0604..e0cf58d74228d32dc1281b202b8a877d731ae4d8 100644 --- a/src/main.py +++ b/src/main.py @@ -1,7 +1,7 @@ from actions.spawn_job import WatchJob from event_queue import EventResult from actions.find_ready_jobs import FindReadyJobs -from job_api import JobData +from job_database_api import JobData from job_controller import ControllerState import datetime