From a607687e1898393c691b7dfe0ea0674edeb15661 Mon Sep 17 00:00:00 2001
From: Dorian Koch <doriank@fsmpi.rwth-aachen.de>
Date: Thu, 26 Sep 2024 15:39:40 +0200
Subject: [PATCH] Add CutAndTranscode Job

---
 src/job_controller.py         | 17 ++++++++-
 src/job_database_api.py       | 15 ++++++--
 src/jobs/cut_and_transcode.py | 65 +++++++++++++++++++++++++++++++++++
 src/jobs/dummy_job.py         |  7 ++--
 src/jobs/wrapped_job.py       | 31 ++++++++++++++---
 src/kubernetes_api.py         | 26 ++++++++++++--
 6 files changed, 148 insertions(+), 13 deletions(-)
 create mode 100644 src/jobs/cut_and_transcode.py

diff --git a/src/job_controller.py b/src/job_controller.py
index afbeb4e..79cb180 100644
--- a/src/job_controller.py
+++ b/src/job_controller.py
@@ -1,6 +1,7 @@
 import types
 from event_queue import EventQueue
 from job_database_api import DummyJobDatabaseApi, JobData, JobDatabaseApi
+from jobs.cut_and_transcode import CutAndTranscode
 from jobs.dummy_job import DummyJob
 from kubernetes_api import K8sApi
 import os
@@ -9,7 +10,7 @@ import datetime
 
 def load_config():
     cfgPath = os.environ.get("VIDEOAG_JOB_CONTROLLER_CONFIG", "../config/job_controller_config.py")
-    filename = os.path.join(os.getcwd(), cfgPath)
+    filename = os.path.join(os.path.dirname(__file__), cfgPath)
     d = types.ModuleType("config")
     d.__file__ = filename
     try:
@@ -33,10 +34,22 @@ class ControllerState():
         db_engine = self.config.get("DB_ENGINE")
         if db_engine == "dummy":
             self.job_api = DummyJobDatabaseApi()
+
             # make some dummy jobs
             start_id = int(datetime.datetime.now().timestamp())
             for i in range(start_id, start_id + 4):
                 self.job_api.create_job(JobData("job{}".format(i), "dummy"))
+
+            self.job_api.create_job(JobData("job{}".format(start_id + 4), "cut_and_transcode", job_args={
+                "input_file": "/mnt/video_data/240605/C01.mp4",
+                "course": "samplecourse",
+                "label": "samplelabel",
+                "docent": "sampledocent",
+                "date": "2021-06-01",
+                "start_time": "5",
+                "end_time": "30",
+                "lang": "de"
+            }))
         else:
             raise Exception(f"Unknown DB_ENGINE: {db_engine}")
         self.event_queue = EventQueue()
@@ -44,4 +57,6 @@ class ControllerState():
     def wrap_job(self, job: JobData) -> "WrappedJob":
         if job.job_type == "dummy":
             return DummyJob(job)
+        if job.job_type == "cut_and_transcode":
+            return CutAndTranscode(job)
         raise Exception(f"Unknown job type: {job.job_type}")
diff --git a/src/job_database_api.py b/src/job_database_api.py
index 93d904c..a74e3e1 100644
--- a/src/job_database_api.py
+++ b/src/job_database_api.py
@@ -18,10 +18,13 @@ class JobState(Enum):
 
 class JobData():
     # this should represent an object extracted from the database
-    def __init__(self, job_id: str, job_type: str, job_state: JobState = JobState.READY):
+    def __init__(self, job_id: str, job_type: str, job_args: dict | None = None, job_state: JobState = JobState.READY, job_result: dict | None = None, job_monitoring: dict | None = None):
         self.job_id = job_id
         self.job_state = job_state
         self.job_type = job_type
+        self.job_args = {} if job_args is None else job_args  # None -> fresh dict; a literal {} default would be one dict shared by every job
+        self.job_result = {} if job_result is None else job_result  # same reason: never share the default between instances
+        self.job_monitoring = {} if job_monitoring is None else job_monitoring
 
     def update_state(self, cstate: "ControllerState", new_state: JobState):
         cstate.job_api.update_job_state(self.job_id, new_state)
@@ -49,7 +52,7 @@ class JobDatabaseApi(metaclass=ABCMeta):
         pass  # insert into db
 
     @abstractmethod
-    def update_job_state(self, job_id: str, new_state: JobState):
+    def update_job_state(self, job_id: str, new_state: JobState, args: dict | None = None, result: dict | None = None, monitoring: dict | None = None):
         pass  # update state in db
 
 
@@ -79,8 +82,14 @@ class DummyJobDatabaseApi(JobDatabaseApi):
         self.job_queue.append(job)
         self.db_state[job.job_id] = copy.deepcopy(job)
 
-    def update_job_state(self, job_id: str, new_state: JobState):
+    def update_job_state(self, job_id: str, new_state: JobState, args: dict | None = None, result: dict | None = None, monitoring: dict | None = None):
         job = self.db_state.get(job_id, None)
         if job is None:
             raise Exception(f"Could not find job with id: {job_id}")
         job.job_state = new_state
+        # Fields left as None keep their stored value; supplied ones are deep-copied
+        # so the stored job does not alias the caller's dicts.
+        supplied = (("job_args", args), ("job_result", result), ("job_monitoring", monitoring))
+        for attr_name, payload in supplied:
+            if payload is not None:
+                setattr(job, attr_name, copy.deepcopy(payload))
diff --git a/src/jobs/cut_and_transcode.py b/src/jobs/cut_and_transcode.py
new file mode 100644
index 0000000..b41cca2
--- /dev/null
+++ b/src/jobs/cut_and_transcode.py
@@ -0,0 +1,110 @@
+
+import json
+import os
+
+from job_database_api import JobData
+from jobs.wrapped_job import ContainerConfig, WrappedJob
+
+
+def _toml_string(value) -> str:
+    """Render ``value`` as a quoted TOML basic string.
+
+    The escapes ``json.dumps`` emits (quote, backslash, control characters)
+    are all valid inside a TOML basic string. DEL is the one character TOML
+    requires escaped that JSON leaves alone, so it is handled explicitly.
+    """
+    return json.dumps(str(value), ensure_ascii=False).replace("\x7f", "\\u007f")
+
+
+class CutAndTranscode(WrappedJob):
+    """Job that runs the render_video container on a single input file.
+
+    Reads ``input_file``, ``course``, ``label``, ``docent``, ``date``,
+    ``start_time`` and ``end_time`` (all required) and ``lang`` (optional,
+    defaults to ``"de"``) from ``jobData.job_args``.
+    """
+
+    def __init__(self, jobData: JobData):
+        super().__init__(jobData)
+        # Required argument: a missing key raises KeyError right here.
+        self.input_file = jobData.job_args["input_file"]
+        # Directory of the input file; project.toml is written there and the
+        # container is given the args ["-C", in_dir].
+        self.in_dir = os.path.dirname(self.input_file)
+
+    def make_project_toml(self):
+        """Return the text of the project.toml for this job.
+
+        Every value taken from the job arguments is escaped, so a quote or
+        backslash in e.g. a label or file path cannot produce invalid TOML.
+
+        Raises:
+            KeyError: if a required job argument is missing.
+        """
+        args = self.jobData.job_args
+        return f"""
+        [lecture]
+        course = {_toml_string(args['course'])}
+        label = {_toml_string(args['label'])}
+        docent = {_toml_string(args['docent'])}
+        date = {_toml_string(args['date'])}
+        lang = {_toml_string(args.get("lang", "de"))}
+
+        [source]
+        files = [{_toml_string(self.input_file)}]
+        stereo = false
+        start = {_toml_string(args['start_time'])}
+        end = {_toml_string(args['end_time'])}
+        transcode_lowest = "720p"
+        transcode_highest = "1440p"
+        fast = []
+        questions = []
+
+        [progress]
+        preprocessed = false
+        asked_start_end = true
+        asked_fast = true
+        asked_questions = true
+        rendered_assets = false
+        rendered = false
+        transcoded = ["720p", "1080p", "1440p"]
+        """
+
+    def config(self) -> ContainerConfig:
+        """Return the container config: render_video image, ``-C <in_dir>`` args, video fs mounted."""
+        cfg = ContainerConfig(image="registry.git.fsmpi.rwth-aachen.de/videoag_infra/production/dominic_render_video:latest", args=["-C", self.in_dir])
+        cfg.mount_video_fs("/mnt/video_data")
+        return cfg
+
+    def prepare(self, cstate: "ControllerState"):
+        """Write a fresh project.toml next to the input file.
+
+        Raises:
+            Exception: if the input file does not exist.
+            KeyError: if a required job argument is missing.
+        """
+        if not os.path.exists(self.input_file):
+            raise Exception(f"Input file {self.input_file} does not exist")
+
+        # Render first so that a missing argument cannot delete an existing
+        # project.toml without writing its replacement.
+        contents = self.make_project_toml()
+        project_toml = os.path.join(self.in_dir, "project.toml")
+
+        # Drop any file left by an earlier run; a missing file is fine.
+        try:
+            os.remove(project_toml)  # TODO: handle previous job failure?
+        except FileNotFoundError:
+            pass
+
+        # TOML files must be UTF-8, independent of the process locale.
+        with open(project_toml, "w", encoding="utf-8") as f:
+            f.write(contents)
+
+    def success(self, cstate):
+        """Print a success message."""
+        print("Transcode success!")
+
+    def failure(self, cstate):
+        """Print a failure message."""
+        print("Transcode fail :(")
diff --git a/src/jobs/dummy_job.py b/src/jobs/dummy_job.py
index a4e5824..1f67de5 100644
--- a/src/jobs/dummy_job.py
+++ b/src/jobs/dummy_job.py
@@ -1,12 +1,15 @@
 
 from job_database_api import JobData
-from jobs.wrapped_job import WrappedJob
+from jobs.wrapped_job import ContainerConfig, WrappedJob
 
 
 class DummyJob(WrappedJob):
 
     def __init__(self, jobData: JobData):
-        super().__init__(jobData, image="busybox", args=["sleep", "5"])
+        super().__init__(jobData)
+
+    def config(self) -> ContainerConfig:
+        return ContainerConfig(image="busybox", args=["sleep", "60"])
 
     def prepare(self, cstate: "ControllerState"):
         print("Preparing dummy job")
diff --git a/src/jobs/wrapped_job.py b/src/jobs/wrapped_job.py
index d93db4b..45e636c 100644
--- a/src/jobs/wrapped_job.py
+++ b/src/jobs/wrapped_job.py
@@ -3,14 +3,27 @@ from event_queue import Event, EventResult
 from job_database_api import JobData, JobState
 
 
-class WrappedJob(Event):
+class ContainerConfig:
 
-    def __init__(self, jobData: JobData, image: str, command: list[str] | None = None, args: list[str] | None = None):
-        super().__init__()
-        self.jobData = jobData
+    def __init__(self, image: str, command: list[str] | None = None, args: list[str] | None = None):
         self.image = image
         self.command = command
         self.args = args
+        self.mount_video_fs_path: str | None = None
+
+    def mount_video_fs(self, mount_path: str):
+        self.mount_video_fs_path = mount_path
+
+
+class WrappedJob(Event):
+
+    def __init__(self, jobData: JobData):
+        super().__init__()
+        self.jobData = jobData
+
+    @abstractmethod
+    def config(self) -> ContainerConfig:
+        pass
 
     def prepare(self, cstate: "ControllerState"):
         pass
@@ -34,6 +47,16 @@ class WrappedJob(Event):
             raise Exception(f"Job is in unexpected state: {self.job_state}")
         # delete from k8s
         cstate.k8s.delete_job_by_id(self.job_id)
+        # update state to processed
+        if self.job_state == JobState.FINISHED_AND_PROCESSED or self.job_state == JobState.FAILED_AND_PROCESSED:
+            return EventResult.DONE
+
+        if self.job_state == JobState.FINISHED:
+            self.jobData.update_state(cstate, JobState.FINISHED_AND_PROCESSED)
+        elif self.job_state == JobState.FAILED:
+            self.jobData.update_state(cstate, JobState.FAILED_AND_PROCESSED)
+        else:
+            raise Exception(f"Job is in unexpected state: {self.job_state}")
         return EventResult.DONE
 
     @property
diff --git a/src/kubernetes_api.py b/src/kubernetes_api.py
index 1cbee85..47a0f79 100644
--- a/src/kubernetes_api.py
+++ b/src/kubernetes_api.py
@@ -2,6 +2,7 @@ from kubernetes import client, config
 from kubernetes.client.models.v1_job import V1Job
 from kubernetes.client.models.v1_job_list import V1JobList
 import os
+import json
 
 from jobs.wrapped_job import WrappedJob
 
@@ -42,6 +43,8 @@ class K8sApi():
         }
         resources = {**default_res, **resources}
 
+        cfg = job.config()
+
         job_manifest = {
             "apiVersion": "batch/v1",
             "kind": "Job",
@@ -70,9 +73,9 @@ class K8sApi():
                         "containers": [
                             {
                                 "name": "worker",
-                                "image": job.image,
-                                "args": job.args,
-                                "command": job.command,
+                                "image": cfg.image,
+                                "args": cfg.args,
+                                "command": cfg.command,
                                 "resources": resources
                             }
                         ],
@@ -100,6 +103,23 @@ class K8sApi():
                 }
             }
         }
+        if cfg.mount_video_fs_path is not None:
+            job_manifest["spec"]["template"]["spec"]["containers"][0]["volumeMounts"] = [
+                {
+                    "mountPath": cfg.mount_video_fs_path,
+                    "name": "video-data"
+                }
+            ]
+            job_manifest["spec"]["template"]["spec"]["volumes"] = [
+                {
+                    "name": "video-data",
+                    "persistentVolumeClaim": {
+                        "claimName": "video-data-claim"
+                    }
+                }
+            ]
+
+        # print(f"Creating job with manifest: {json.dumps(job_manifest)}")
         return self.batch_v1.create_namespaced_job(NAMESPACE, job_manifest)
 
     def delete_job_by_name(self, job_name):
-- 
GitLab