Skip to content
Snippets Groups Projects
Verified Commit 25488dda authored by Dorian Koch's avatar Dorian Koch
Browse files

Load config file

parent f08a1877
No related branches found
No related tags found
No related merge requests found
import os
DB_ENGINE = "dummy" # could be postgres
'''
POSTGRES = {
"host": "database",
"port": 5432,
"user": os.environ["DATABASE_username"],
"password": os.environ["DATABASE_password"],
"database": "videoag"
}
'''
from collections.abc import Generator from collections.abc import Generator
from event_queue import GeneratorRecurringEvent from event_queue import GeneratorRecurringEvent
from job_api import JobData, JobState from job_database_api import JobData, JobState
from job_controller import ControllerState from job_controller import ControllerState
import datetime import datetime
......
import types
from event_queue import EventQueue from event_queue import EventQueue
from job_api import JobData, JobDatabaseApi from job_database_api import DummyJobDatabaseApi, JobData, JobDatabaseApi
from jobs.dummy_job import DummyJob from jobs.dummy_job import DummyJob
from kubernetes_api import K8sApi from kubernetes_api import K8sApi
import os
def load_config():
cfgPath = os.environ.get("VIDEOAG_JOB_CONTROLLER_CONFIG", "../config/job_controller_config.py")
filename = os.path.join(os.getcwd(), cfgPath)
d = types.ModuleType("config")
d.__file__ = filename
try:
with open(filename, mode="rb") as config_file:
exec(compile(config_file.read(), filename, "exec"), d.__dict__)
except OSError as e:
print("Unable to load config file: ", e)
raise
cfgMap = {}
for key in dir(d):
if key.isupper():
cfgMap[key] = getattr(d, key)
return cfgMap
class ControllerState(): class ControllerState():
def __init__(self) -> None: def __init__(self) -> None:
self.config = load_config()
self.k8s = K8sApi() self.k8s = K8sApi()
self.job_api = JobDatabaseApi() db_engine = self.config.get("DB_ENGINE")
if db_engine == "dummy":
self.job_api = DummyJobDatabaseApi()
else:
raise Exception(f"Unknown DB_ENGINE: {db_engine}")
self.event_queue = EventQueue() self.event_queue = EventQueue()
def wrap_job(self, job: JobData) -> "WrappedJob": def wrap_job(self, job: JobData) -> "WrappedJob":
......
from abc import ABCMeta, abstractmethod
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional
import copy import copy
...@@ -29,7 +30,26 @@ class JobData(): ...@@ -29,7 +30,26 @@ class JobData():
# this is just a dummy implementation # this is just a dummy implementation
# this should be replaced by a real implementation that always interacts with the database (no caching!) # this should be replaced by a real implementation that always interacts with the database (no caching!)
class JobDatabaseApi(): class JobDatabaseApi(metaclass=ABCMeta):
@abstractmethod
def get_next_jobs_and_set_spawning(self, limit: int) -> list[JobData]:
pass
@abstractmethod
def get_job_by_id(self, job_id: str) -> Optional[JobData]:
pass
@abstractmethod
def create_job(self, job: JobData):
pass
@abstractmethod
def update_job_state(self, job_id: str, new_state: JobState):
pass
class DummyJobDatabaseApi(JobDatabaseApi):
def __init__(self): def __init__(self):
self.db_state = {} self.db_state = {}
......
from job_api import JobData from job_database_api import JobData
from jobs.wrapped_job import WrappedJob from jobs.wrapped_job import WrappedJob
class DummyJob(WrappedJob): class DummyJob(WrappedJob):
def __init__(self, jobData: JobData): def __init__(self, jobData: JobData):
super().__init__(jobData, image="quay.io/msrd0/render_video:latest", args=["--help"]) super().__init__(jobData, image="busybox", args=["sleep", "5"])
def prepare(self, cstate: "ControllerState"): def prepare(self, cstate: "ControllerState"):
print("Preparing dummy job") print("Preparing dummy job")
......
from abc import abstractmethod from abc import abstractmethod
from event_queue import Event, EventResult from event_queue import Event, EventResult
from job_api import JobData, JobState from job_database_api import JobData, JobState
class WrappedJob(Event): class WrappedJob(Event):
......
from actions.spawn_job import WatchJob from actions.spawn_job import WatchJob
from event_queue import EventResult from event_queue import EventResult
from actions.find_ready_jobs import FindReadyJobs from actions.find_ready_jobs import FindReadyJobs
from job_api import JobData from job_database_api import JobData
from job_controller import ControllerState from job_controller import ControllerState
import datetime import datetime
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment