diff --git a/src/job_controller.py b/src/job_controller.py
index afbeb4ea69f990613ff56682c2c59f444705ee39..79cb18026f6ba6e5861ef32b9ad46d128186c5c3 100644
--- a/src/job_controller.py
+++ b/src/job_controller.py
@@ -1,6 +1,7 @@
 import types
 from event_queue import EventQueue
 from job_database_api import DummyJobDatabaseApi, JobData, JobDatabaseApi
+from jobs.cut_and_transcode import CutAndTranscode
 from jobs.dummy_job import DummyJob
 from kubernetes_api import K8sApi
 import os
@@ -9,7 +10,7 @@ import datetime
 
 def load_config():
     cfgPath = os.environ.get("VIDEOAG_JOB_CONTROLLER_CONFIG", "../config/job_controller_config.py")
-    filename = os.path.join(os.getcwd(), cfgPath)
+    filename = os.path.join(os.path.dirname(__file__), cfgPath)
     d = types.ModuleType("config")
     d.__file__ = filename
     try:
@@ -33,10 +34,22 @@ class ControllerState():
         db_engine = self.config.get("DB_ENGINE")
         if db_engine == "dummy":
             self.job_api = DummyJobDatabaseApi()
+
             # make some dummy jobs
             start_id = int(datetime.datetime.now().timestamp())
             for i in range(start_id, start_id + 4):
                 self.job_api.create_job(JobData("job{}".format(i), "dummy"))
+
+            self.job_api.create_job(JobData("job{}".format(start_id + 4), "cut_and_transcode", job_args={
+                "input_file": "/mnt/video_data/240605/C01.mp4",
+                "course": "samplecourse",
+                "label": "samplelabel",
+                "docent": "sampledocent",
+                "date": "2021-06-01",
+                "start_time": "5",
+                "end_time": "30",
+                "lang": "de"
+            }))
         else:
             raise Exception(f"Unknown DB_ENGINE: {db_engine}")
         self.event_queue = EventQueue()
@@ -44,4 +57,6 @@ class ControllerState():
     def wrap_job(self, job: JobData) -> "WrappedJob":
         if job.job_type == "dummy":
             return DummyJob(job)
+        if job.job_type == "cut_and_transcode":
+            return CutAndTranscode(job)
         raise Exception(f"Unknown job type: {job.job_type}")
diff --git a/src/job_database_api.py b/src/job_database_api.py
index 93d904c5cf14f37872ef2b9827c68236490f2008..a74e3e147bbe288cb538bcf6836da99a1ee1df8d 100644
--- a/src/job_database_api.py
+++ b/src/job_database_api.py
@@ -18,10 +18,13 @@ class JobState(Enum):
 class JobData():
     # this should represent an object extracted from the database
 
-    def __init__(self, job_id: str, job_type: str, job_state: JobState = JobState.READY):
+    def __init__(self, job_id: str, job_type: str, job_args: dict | None = None, job_state: JobState = JobState.READY, job_result: dict | None = None, job_monitoring: dict | None = None):
         self.job_id = job_id
         self.job_state = job_state
         self.job_type = job_type
+        self.job_args = {} if job_args is None else job_args  # fresh dict per instance; a literal {} default would be shared by every JobData
+        self.job_result = {} if job_result is None else job_result
+        self.job_monitoring = {} if job_monitoring is None else job_monitoring
 
     def update_state(self, cstate: "ControllerState", new_state: JobState):
         cstate.job_api.update_job_state(self.job_id, new_state)
@@ -49,7 +52,7 @@ class JobDatabaseApi(metaclass=ABCMeta):
         pass  # insert into db
 
     @abstractmethod
-    def update_job_state(self, job_id: str, new_state: JobState):
+    def update_job_state(self, job_id: str, new_state: JobState, args: dict | None = None, result: dict | None = None, monitoring: dict | None = None):
         pass  # update state in db
 
 
@@ -79,8 +82,14 @@ class DummyJobDatabaseApi(JobDatabaseApi):
         self.job_queue.append(job)
         self.db_state[job.job_id] = copy.deepcopy(job)
 
-    def update_job_state(self, job_id: str, new_state: JobState):
+    def update_job_state(self, job_id: str, new_state: JobState, args: dict | None = None, result: dict | None = None, monitoring: dict | None = None):
         job = self.db_state.get(job_id, None)
         if job is None:
             raise Exception(f"Could not find job with id: {job_id}")
         job.job_state = new_state
+        if args is not None:
+            job.job_args = copy.deepcopy(args)
+        if result is not None:
+            job.job_result = copy.deepcopy(result)
+        if monitoring is not None:
+            job.job_monitoring = copy.deepcopy(monitoring)
diff --git a/src/jobs/cut_and_transcode.py b/src/jobs/cut_and_transcode.py
new file mode 100644
index 0000000000000000000000000000000000000000..b41cca27f4a841f9230b75ee298ef3cf86b42a8b
--- /dev/null
+++ b/src/jobs/cut_and_transcode.py
@@ -0,0 +1,65 @@
+
+from job_database_api import JobData
+from jobs.wrapped_job import ContainerConfig, WrappedJob
+import os
+
+
+class CutAndTranscode(WrappedJob):
+
+    def __init__(self, jobData: JobData):
+        super().__init__(jobData)
+        self.input_file = jobData.job_args["input_file"]
+        self.in_dir = os.path.dirname(self.input_file)
+
+    def make_project_toml(self):
+        return f"""
+            [lecture]
+            course = "{self.jobData.job_args['course']}"
+            label = "{self.jobData.job_args['label']}"
+            docent = "{self.jobData.job_args['docent']}"
+            date = "{self.jobData.job_args['date']}"
+            lang = "{self.jobData.job_args.get("lang", "de")}"
+
+            [source]
+            files = ["{self.input_file}"]
+            stereo = false
+            start = "{self.jobData.job_args['start_time']}"
+            end = "{self.jobData.job_args['end_time']}"
+            transcode_lowest = "720p"
+            transcode_highest = "1440p"
+            fast = []
+            questions = []
+
+            [progress]
+            preprocessed = false
+            asked_start_end = true
+            asked_fast = true
+            asked_questions = true
+            rendered_assets = false
+            rendered = false
+            transcoded = ["720p", "1080p", "1440p"]
+            """
+
+    def config(self) -> ContainerConfig:
+        cfg = ContainerConfig(image="registry.git.fsmpi.rwth-aachen.de/videoag_infra/production/dominic_render_video:latest", args=["-C", self.in_dir])
+        cfg.mount_video_fs("/mnt/video_data")
+        return cfg
+
+    def prepare(self, cstate: "ControllerState"):
+        # prepare preset
+        if not os.path.exists(self.input_file):
+            raise Exception(f"Input file {self.input_file} does not exist")
+
+        project_toml = os.path.join(self.in_dir, "project.toml")
+
+        if os.path.exists(project_toml):
+            os.remove(project_toml)  # TODO: handle previous job failure?
+
+        with open(project_toml, "w") as f:
+            f.write(self.make_project_toml())
+
+    def success(self, cstate):
+        print("Transcode success!")
+
+    def failure(self, cstate):
+        print("Transcode fail :(")
diff --git a/src/jobs/dummy_job.py b/src/jobs/dummy_job.py
index a4e58246e5fc0b1f9d52b9a08adbd37029e01758..1f67de596e6ed98d38e76c878a523858125b4d4b 100644
--- a/src/jobs/dummy_job.py
+++ b/src/jobs/dummy_job.py
@@ -1,12 +1,15 @@
 
 from job_database_api import JobData
-from jobs.wrapped_job import WrappedJob
+from jobs.wrapped_job import ContainerConfig, WrappedJob
 
 
 class DummyJob(WrappedJob):
 
     def __init__(self, jobData: JobData):
-        super().__init__(jobData, image="busybox", args=["sleep", "5"])
+        super().__init__(jobData)
+
+    def config(self) -> ContainerConfig:
+        return ContainerConfig(image="busybox", args=["sleep", "60"])
 
     def prepare(self, cstate: "ControllerState"):
         print("Preparing dummy job")
diff --git a/src/jobs/wrapped_job.py b/src/jobs/wrapped_job.py
index d93db4b2df4dc46a0bb29b67ecaab4cfe25f80fc..45e636c1092c6093635d07ef24e28c35ba005a7b 100644
--- a/src/jobs/wrapped_job.py
+++ b/src/jobs/wrapped_job.py
@@ -3,14 +3,27 @@ from event_queue import Event, EventResult
 from job_database_api import JobData, JobState
 
 
-class WrappedJob(Event):
+class ContainerConfig:
 
-    def __init__(self, jobData: JobData, image: str, command: list[str] | None = None, args: list[str] | None = None):
-        super().__init__()
-        self.jobData = jobData
+    def __init__(self, image: str, command: list[str] | None = None, args: list[str] | None = None):
         self.image = image
         self.command = command
         self.args = args
+        self.mount_video_fs_path: str | None = None
+
+    def mount_video_fs(self, mount_path: str):
+        self.mount_video_fs_path = mount_path
+
+
+class WrappedJob(Event):
+
+    def __init__(self, jobData: JobData):
+        super().__init__()
+        self.jobData = jobData
+
+    @abstractmethod
+    def config(self) -> ContainerConfig:
+        pass
 
     def prepare(self, cstate: "ControllerState"):
         pass
@@ -34,6 +47,16 @@ class WrappedJob(Event):
             raise Exception(f"Job is in unexpected state: {self.job_state}")
         # delete from k8s
         cstate.k8s.delete_job_by_id(self.job_id)
+        # update state to processed
+        if self.job_state == JobState.FINISHED_AND_PROCESSED or self.job_state == JobState.FAILED_AND_PROCESSED:
+            return EventResult.DONE
+
+        if self.job_state == JobState.FINISHED:
+            self.jobData.update_state(cstate, JobState.FINISHED_AND_PROCESSED)
+        elif self.job_state == JobState.FAILED:
+            self.jobData.update_state(cstate, JobState.FAILED_AND_PROCESSED)
+        else:
+            raise Exception(f"Job is in unexpected state: {self.job_state}")
         return EventResult.DONE
 
     @property
diff --git a/src/kubernetes_api.py b/src/kubernetes_api.py
index 1cbee85015a8b9a2b77fcb8a5a1682813aee0131..47a0f793f7fd1073b8759312c89eb2a624e04003 100644
--- a/src/kubernetes_api.py
+++ b/src/kubernetes_api.py
@@ -2,6 +2,7 @@ from kubernetes import client, config
 from kubernetes.client.models.v1_job import V1Job
 from kubernetes.client.models.v1_job_list import V1JobList
 import os
+import json
 
 from jobs.wrapped_job import WrappedJob
 
@@ -42,6 +43,8 @@ class K8sApi():
         }
         resources = {**default_res, **resources}
 
+        cfg = job.config()
+
         job_manifest = {
             "apiVersion": "batch/v1",
             "kind": "Job",
@@ -70,9 +73,9 @@ class K8sApi():
                         "containers": [
                             {
                                 "name": "worker",
-                                "image": job.image,
-                                "args": job.args,
-                                "command": job.command,
+                                "image": cfg.image,
+                                "args": cfg.args,
+                                "command": cfg.command,
                                 "resources": resources
                             }
                         ],
@@ -100,6 +103,23 @@ class K8sApi():
                 }
             }
         }
+        if cfg.mount_video_fs_path is not None:
+            job_manifest["spec"]["template"]["spec"]["containers"][0]["volumeMounts"] = [
+                {
+                    "mountPath": cfg.mount_video_fs_path,
+                    "name": "video-data"
+                }
+            ]
+            job_manifest["spec"]["template"]["spec"]["volumes"] = [
+                {
+                    "name": "video-data",
+                    "persistentVolumeClaim": {
+                        "claimName": "video-data-claim"
+                    }
+                }
+            ]
+
+        # print(f"Creating job with manifest: {json.dumps(job_manifest)}")
         return self.batch_v1.create_namespaced_job(NAMESPACE, job_manifest)
 
     def delete_job_by_name(self, job_name):