diff --git a/src/actions/find_ready_jobs.py b/src/actions/find_ready_jobs.py index 13f8a7c0306a190c8f4b3ae163ccf09465b773a4..501e1ca5069a99e330380008af9e88d1c6a4773e 100644 --- a/src/actions/find_ready_jobs.py +++ b/src/actions/find_ready_jobs.py @@ -19,7 +19,8 @@ class FindReadyJobs(RecurringEvent): if len(jobs) > 0: print("Found {} jobs to spawn".format(len(jobs))) for job in jobs: - cstate.event_queue.put(SpawnJob(job)) + wrapped = cstate.wrap_job(job) + cstate.event_queue.put(SpawnJob(wrapped)) else: return datetime.timedelta(seconds=10) # wait 10 seconds if no jobs are ready return datetime.timedelta(seconds=2) # otherwise wait 2 seconds diff --git a/src/actions/spawn_job.py b/src/actions/spawn_job.py index 1558e63f258928407eeb8772c69c31307274c206..19e902269e74fb3ca6437494dd8dd738f4d4bbe1 100644 --- a/src/actions/spawn_job.py +++ b/src/actions/spawn_job.py @@ -1,14 +1,16 @@ from collections.abc import Generator from event_queue import GeneratorRecurringEvent -from job_api import Job, JobState +from job_api import JobData, JobState from job_controller import ControllerState import datetime +from jobs.wrapped_job import WrappedJob + MAX_CONCURRENT_JOBS = 5 class SpawnJob(GeneratorRecurringEvent): - def __init__(self, job: Job): + def __init__(self, job: WrappedJob): super().__init__() self.job = job @@ -24,24 +26,24 @@ class SpawnJob(GeneratorRecurringEvent): print("Too many active jobs, waiting 10 seconds...") yield datetime.timedelta(seconds=10) - print("Spawning job with id: {}".format(self.job.job_id)) - cstate.job_api.update_job_state(self.job.job_id, JobState.SPAWNING) + print("Spawning job with id: {}".format(self.job.jobData.job_id)) + self.job.jobData.update_state(cstate, JobState.SPAWNING) ret = cstate.k8s.create_job(self.job) if ret is None: raise Exception("Failed to spawn job") # spawn a WatchJob event to watch the job finish/fail - cstate.event_queue.put(WatchJob(self.job.job_id, due_at=datetime.timedelta(seconds=5))) + cstate.event_queue.put(WatchJob(self.job, due_at=datetime.timedelta(seconds=5))) class WatchJob(GeneratorRecurringEvent): - def __init__(self, job_id: str, *args, **kwargs): + def __init__(self, job: WrappedJob, *args, **kwargs): super().__init__(*args, **kwargs) - self.job_id = job_id + self.job = job def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]: while True: - job_info = cstate.k8s.get_job(self.job_id) + job_info = cstate.k8s.get_job(self.job.jobData.job_id) if job_info.status.succeeded is not None: print("Job finished successfully") break @@ -53,19 +55,21 @@ class WatchJob(GeneratorRecurringEvent): yield datetime.timedelta(seconds=5) # look at database to see if the job has marked itself as finished/failed - job_in_db = cstate.job_api.get_job_by_id(self.job_id) - if job_in_db is None: - raise Exception(f"Could not find job in db with id: {self.job_id}") - if job_in_db.job_state == JobState.DELETED or job_in_db.job_state == JobState.FINISHED_AND_PROCESSED or job_in_db.job_state == JobState.FAILED_AND_PROCESSED: - print(f"Job already processed in db, state: {job_in_db.job_state}") + old_id = self.job.refresh_data(cstate) + if self.job.jobData is None: + raise Exception(f"Could not find job in db with id={old_id}") + if self.job.job_state == JobState.DELETED or self.job.job_state == JobState.FINISHED_AND_PROCESSED or self.job.job_state == JobState.FAILED_AND_PROCESSED: + print(f"Job already processed in db, state: {self.job.job_state}") return expected_state = JobState.FINISHED if job_info.status.succeeded is not None else JobState.FAILED - if not job_in_db.job_state == expected_state and (job_in_db.job_state == JobState.FINISHED or job_in_db.job_state == JobState.FAILED): - print(f"Job not marked as expected in db, expected={expected_state} != db={job_in_db.job_state}") + if not self.job.job_state == expected_state and (self.job.job_state == JobState.FINISHED or self.job.job_state == JobState.FAILED): + print(f"Job not marked as expected in db, expected={expected_state} != db={self.job.job_state}") print("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)") # the only state where this is expected is on startup when old k8s jobs are reconciled expected_state = JobState.FAILED - if job_in_db.job_state != expected_state: - print(f"Updating job state in db to: {expected_state}, from: {job_in_db.job_state}") - cstate.job_api.update_job_state(self.job_id, expected_state) + if self.job.job_state != expected_state: + print(f"Updating job state in db to: {expected_state}, from: {self.job.job_state}") + self.job.jobData.update_state(cstate, expected_state) + + cstate.event_queue.put(self.job) # the jobwrapper class processes its' jobs result diff --git a/src/event_queue.py b/src/event_queue.py index 6206a6e993e71c13b046a5888e496878252c07ec..782cfba66009d1933c662c9c65fd138172ab6dca 100644 --- a/src/event_queue.py +++ b/src/event_queue.py @@ -1,4 +1,4 @@ -from abc import abstractmethod +from abc import ABCMeta, abstractmethod import queue import datetime from enum import Enum @@ -10,7 +10,7 @@ class EventResult(Enum): REQUEUE = "requeue" # put back in queue -class Event(): +class Event(metaclass=ABCMeta): def __init__(self, due_at: datetime.datetime | datetime.timedelta = datetime.datetime.fromtimestamp(0)): if isinstance(due_at, datetime.timedelta): @@ -34,6 +34,19 @@ class Event(): return self.action(cstate) +class FunctionalEvent(Event): + + def __init__(self, action_func, due_at: datetime.datetime | datetime.timedelta = datetime.datetime.fromtimestamp(0)): + self.action_func = action_func + super().__init__(due_at) + + def __str__(self): + return f"FunctionalEvent(action={self.action}, due_at={self.due_at})" + + def action(self, cstate: "ControllerState"): + return self.action_func(cstate) + + class RecurringEvent(Event): def __str__(self): return f"RecurringEvent(action={self.action}, due_at={self.due_at})" @@ -77,6 +90,8 @@ class EventQueue(): self.counter += 1 # this enforces FIFO in the queue (after comparing due_at) def get(self): + if self.queue.empty(): + raise Exception("Queue is empty") return self.queue.get()[2] def empty(self): diff --git a/src/job_api.py b/src/job_api.py index b7ca6732392c494f96cf4ec4a8ae52e9b51958dd..eb0ecc75854c17a6efddfd8393584483c0d8af78 100644 --- a/src/job_api.py +++ b/src/job_api.py @@ -1,6 +1,6 @@ from enum import Enum -from typing import Optional, List +from typing import Optional import copy @@ -15,38 +15,40 @@ class JobState(Enum): DELETED = "deleted" -class Job(): +class JobData(): # this should represent an object extracted from the database - def __init__(self, job_id: str, image: str, command: List[str] | None = None, args: List[str] | None = None, job_state: JobState = JobState.READY): + def __init__(self, job_id: str, job_type: str, job_state: JobState = JobState.READY): self.job_id = job_id self.job_state = job_state - self.image = image - self.command = command - self.args = args + self.job_type = job_type + + def update_state(self, cstate: "ControllerState", new_state: JobState): + cstate.job_api.update_job_state(self.job_id, new_state) + self.job_state = new_state # this is just a dummy implementation # this should be replaced by a real implementation that always interacts with the database (no caching!) -class JobApi(): +class JobDatabaseApi(): def __init__(self): self.db_state = {} self.job_queue = [] - def get_next_jobs_and_set_spawning(self, limit: int) -> List[Job]: + def get_next_jobs_and_set_spawning(self, limit: int) -> list[JobData]: if len(self.job_queue) == 0: return [] - ret: List[Job] = [] + ret: list[JobData] = [] while len(ret) < limit and len(self.job_queue) > 0: next = self.job_queue.pop(0) next.job_state = JobState.SPAWNING ret.append(next) return ret - def get_job_by_id(self, job_id: str) -> Optional[Job]: + def get_job_by_id(self, job_id: str) -> Optional[JobData]: return copy.deepcopy(self.db_state.get(job_id, None)) - def create_job(self, job: Job): + def create_job(self, job: JobData): self.job_queue.append(job) self.db_state[job.job_id] = copy.deepcopy(job) diff --git a/src/job_controller.py b/src/job_controller.py index 77cf9515643354849a62ea9aa604f8735225ac3e..72a4de5fc06490c7da132654eba54374f98ff62f 100644 --- a/src/job_controller.py +++ b/src/job_controller.py @@ -1,5 +1,6 @@ from event_queue import EventQueue -from job_api import JobApi +from job_api import JobData, JobDatabaseApi +from jobs.dummy_job import DummyJob from kubernetes_api import K8sApi @@ -7,5 +8,10 @@ class ControllerState(): def __init__(self) -> None: self.k8s = K8sApi() - self.job_api = JobApi() + self.job_api = JobDatabaseApi() self.event_queue = EventQueue() + + def wrap_job(self, job: JobData) -> "WrappedJob": + if job.job_type == "dummy": + return DummyJob(job) + raise Exception(f"Unknown job type: {job.job_type}") diff --git a/src/jobs/dummy_job.py b/src/jobs/dummy_job.py new file mode 100644 index 0000000000000000000000000000000000000000..99521b9f2640bf49149ca5a28f6d1a9bbad5d0d8 --- /dev/null +++ b/src/jobs/dummy_job.py @@ -0,0 +1,15 @@ + +from job_api import JobData +from jobs.wrapped_job import WrappedJob + + +class DummyJob(WrappedJob): + + def __init__(self, jobData: JobData): + super().__init__(jobData, image="quay.io/msrd0/render_video:latest", args=["--help"]) + + def success(self, cstate): + print("Dummy job succeeded") + + def failure(self, cstate): + print("Dummy job failed") diff --git a/src/jobs/wrapped_job.py b/src/jobs/wrapped_job.py new file mode 100644 index 0000000000000000000000000000000000000000..a259a14a5ec4a3bb49eeab083a42b286d23660b9 --- /dev/null +++ b/src/jobs/wrapped_job.py @@ -0,0 +1,45 @@ +from abc import abstractmethod +from event_queue import Event, EventResult +from job_api import JobData, JobState + + +class WrappedJob(Event): + + def __init__(self, jobData: JobData, image: str, command: list[str] | None = None, args: list[str] | None = None): + super().__init__() + self.jobData = jobData + self.image = image + self.command = command + self.args = args + + @abstractmethod + def success(self, cstate: "ControllerState"): + pass + + @abstractmethod + def failure(self, cstate: "ControllerState"): + pass + + def action(self, cstate: "ControllerState"): + if self.job_state == JobState.FINISHED: + self.success(cstate) + self.jobData.update_state(cstate, JobState.FINISHED_AND_PROCESSED) + elif self.job_state == JobState.FAILED: + self.failure(cstate) + self.jobData.update_state(cstate, JobState.FAILED_AND_PROCESSED) + else: + raise Exception(f"Job is in unexpected state: {self.job_state}") + return EventResult.DONE + + @property + def job_id(self): + return self.jobData.job_id + + @property + def job_state(self) -> JobState: + return self.jobData.job_state + + def refresh_data(self, cstate: "ControllerState"): + old_id = self.jobData.job_id + self.jobData = cstate.job_api.get_job_by_id(self.jobData.job_id) + return old_id diff --git a/src/kubernetes_api.py b/src/kubernetes_api.py index 5abbfccec03e64ab80b575dfd93bc2b65f5614eb..851a2a2edb825350562b0483718b18b5d5959a5c 100644 --- a/src/kubernetes_api.py +++ b/src/kubernetes_api.py @@ -3,7 +3,7 @@ from kubernetes.client.models.v1_job import V1Job from kubernetes.client.models.v1_job_list import V1JobList import os -from job_api import Job +from jobs.wrapped_job import WrappedJob NAMESPACE = "videoag-prod" WORKER_LABEL = "videoag-worker" @@ -33,7 +33,7 @@ class K8sApi(): def list_worker_jobs(self) -> V1JobList: return self.batch_v1.list_namespaced_job(NAMESPACE, label_selector=f"app={WORKER_LABEL}") - def create_job(self, job: Job): + def create_job(self, job: WrappedJob): job_manifest = { "apiVersion": "batch/v1", "kind": "Job", diff --git a/src/main.py b/src/main.py index de2b7410efc968d977a4c91ac98539574b81b814..4230586c9e6ddb31f26b0810dd27eb0b3a0eb522 100644 --- a/src/main.py +++ b/src/main.py @@ -1,7 +1,7 @@ from actions.spawn_job import WatchJob from event_queue import EventResult from actions.find_ready_jobs import FindReadyJobs -from job_api import Job +from job_api import JobData from job_controller import ControllerState import datetime @@ -62,7 +62,7 @@ def main(): # make some dummy jobs start_id = int(datetime.datetime.now().timestamp()) for i in range(start_id, start_id + 4): - cstate.job_api.create_job(Job("job{}".format(i), "quay.io/msrd0/render_video:latest", args=["--help"])) + cstate.job_api.create_job(JobData("job{}".format(i), "dummy")) cstate.event_queue.put(FindReadyJobs()) @@ -70,6 +70,9 @@ def main(): def signal_handler(sig, frame): nonlocal run_event_loop + if run_event_loop == False: + print("Force quitting event loop...") + sys.exit(2) print("Stopping event loop...") run_event_loop = False signal.signal(signal.SIGINT, signal_handler)