Skip to content
Snippets Groups Projects
Verified Commit 7465cc19 authored by Dorian Koch's avatar Dorian Koch
Browse files

consolidate interface for handling job success/failure

parent 98203e12
No related branches found
No related tags found
No related merge requests found
...@@ -19,7 +19,8 @@ class FindReadyJobs(RecurringEvent): ...@@ -19,7 +19,8 @@ class FindReadyJobs(RecurringEvent):
if len(jobs) > 0: if len(jobs) > 0:
print("Found {} jobs to spawn".format(len(jobs))) print("Found {} jobs to spawn".format(len(jobs)))
for job in jobs: for job in jobs:
cstate.event_queue.put(SpawnJob(job)) wrapped = cstate.wrap_job(job)
cstate.event_queue.put(SpawnJob(wrapped))
else: else:
return datetime.timedelta(seconds=10) # wait 10 seconds if no jobs are ready return datetime.timedelta(seconds=10) # wait 10 seconds if no jobs are ready
return datetime.timedelta(seconds=2) # otherwise wait 2 seconds return datetime.timedelta(seconds=2) # otherwise wait 2 seconds
from collections.abc import Generator from collections.abc import Generator
from event_queue import GeneratorRecurringEvent from event_queue import GeneratorRecurringEvent
from job_api import Job, JobState from job_api import JobData, JobState
from job_controller import ControllerState from job_controller import ControllerState
import datetime import datetime
from jobs.wrapped_job import WrappedJob
MAX_CONCURRENT_JOBS = 5 MAX_CONCURRENT_JOBS = 5
class SpawnJob(GeneratorRecurringEvent): class SpawnJob(GeneratorRecurringEvent):
def __init__(self, job: Job): def __init__(self, job: WrappedJob):
super().__init__() super().__init__()
self.job = job self.job = job
...@@ -24,24 +26,24 @@ class SpawnJob(GeneratorRecurringEvent): ...@@ -24,24 +26,24 @@ class SpawnJob(GeneratorRecurringEvent):
print("Too many active jobs, waiting 10 seconds...") print("Too many active jobs, waiting 10 seconds...")
yield datetime.timedelta(seconds=10) yield datetime.timedelta(seconds=10)
print("Spawning job with id: {}".format(self.job.job_id)) print("Spawning job with id: {}".format(self.job.jobData.job_id))
cstate.job_api.update_job_state(self.job.job_id, JobState.SPAWNING) self.job.jobData.update_state(cstate, JobState.SPAWNING)
ret = cstate.k8s.create_job(self.job) ret = cstate.k8s.create_job(self.job)
if ret is None: if ret is None:
raise Exception("Failed to spawn job") raise Exception("Failed to spawn job")
# spawn a WatchJob event to watch the job finish/fail # spawn a WatchJob event to watch the job finish/fail
cstate.event_queue.put(WatchJob(self.job.job_id, due_at=datetime.timedelta(seconds=5))) cstate.event_queue.put(WatchJob(self.job, due_at=datetime.timedelta(seconds=5)))
class WatchJob(GeneratorRecurringEvent): class WatchJob(GeneratorRecurringEvent):
def __init__(self, job_id: str, *args, **kwargs): def __init__(self, job: WrappedJob, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.job_id = job_id self.job = job
def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]: def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]:
while True: while True:
job_info = cstate.k8s.get_job(self.job_id) job_info = cstate.k8s.get_job(self.job.jobData.job_id)
if job_info.status.succeeded is not None: if job_info.status.succeeded is not None:
print("Job finished successfully") print("Job finished successfully")
break break
...@@ -53,19 +55,21 @@ class WatchJob(GeneratorRecurringEvent): ...@@ -53,19 +55,21 @@ class WatchJob(GeneratorRecurringEvent):
yield datetime.timedelta(seconds=5) yield datetime.timedelta(seconds=5)
# look at database to see if the job has marked itself as finished/failed # look at database to see if the job has marked itself as finished/failed
job_in_db = cstate.job_api.get_job_by_id(self.job_id) old_id = self.job.refresh_data(cstate)
if job_in_db is None: if self.job.jobData is None:
raise Exception(f"Could not find job in db with id: {self.job_id}") raise Exception(f"Could not find job in db with id={old_id}")
if job_in_db.job_state == JobState.DELETED or job_in_db.job_state == JobState.FINISHED_AND_PROCESSED or job_in_db.job_state == JobState.FAILED_AND_PROCESSED: if self.job.job_state == JobState.DELETED or self.job.job_state == JobState.FINISHED_AND_PROCESSED or self.job.job_state == JobState.FAILED_AND_PROCESSED:
print(f"Job already processed in db, state: {job_in_db.job_state}") print(f"Job already processed in db, state: {self.job.job_state}")
return return
expected_state = JobState.FINISHED if job_info.status.succeeded is not None else JobState.FAILED expected_state = JobState.FINISHED if job_info.status.succeeded is not None else JobState.FAILED
if not job_in_db.job_state == expected_state and (job_in_db.job_state == JobState.FINISHED or job_in_db.job_state == JobState.FAILED): if not self.job.job_state == expected_state and (self.job.job_state == JobState.FINISHED or self.job.job_state == JobState.FAILED):
print(f"Job not marked as expected in db, expected={expected_state} != db={job_in_db.job_state}") print(f"Job not marked as expected in db, expected={expected_state} != db={self.job.job_state}")
print("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)") print("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)")
# the only state where this is expected is on startup when old k8s jobs are reconciled # the only state where this is expected is on startup when old k8s jobs are reconciled
expected_state = JobState.FAILED expected_state = JobState.FAILED
if job_in_db.job_state != expected_state: if self.job.job_state != expected_state:
print(f"Updating job state in db to: {expected_state}, from: {job_in_db.job_state}") print(f"Updating job state in db to: {expected_state}, from: {self.job.job_state}")
cstate.job_api.update_job_state(self.job_id, expected_state) self.job.jobData.update_state(cstate, expected_state)
cstate.event_queue.put(self.job) # the jobwrapper class processes its' jobs result
from abc import abstractmethod from abc import ABCMeta, abstractmethod
import queue import queue
import datetime import datetime
from enum import Enum from enum import Enum
...@@ -10,7 +10,7 @@ class EventResult(Enum): ...@@ -10,7 +10,7 @@ class EventResult(Enum):
REQUEUE = "requeue" # put back in queue REQUEUE = "requeue" # put back in queue
class Event(): class Event(metaclass=ABCMeta):
def __init__(self, due_at: datetime.datetime | datetime.timedelta = datetime.datetime.fromtimestamp(0)): def __init__(self, due_at: datetime.datetime | datetime.timedelta = datetime.datetime.fromtimestamp(0)):
if isinstance(due_at, datetime.timedelta): if isinstance(due_at, datetime.timedelta):
...@@ -34,6 +34,19 @@ class Event(): ...@@ -34,6 +34,19 @@ class Event():
return self.action(cstate) return self.action(cstate)
class FunctionalEvent(Event):
def __init__(self, action_func, due_at: datetime.datetime | datetime.timedelta = datetime.datetime.fromtimestamp(0)):
self.action_func = action_func
super().__init__(due_at)
def __str__(self):
return f"FunctionalEvent(action={self.action}, due_at={self.due_at})"
def action(self, cstate: "ControllerState"):
return self.action_func(cstate)
class RecurringEvent(Event): class RecurringEvent(Event):
def __str__(self): def __str__(self):
return f"RecurringEvent(action={self.action}, due_at={self.due_at})" return f"RecurringEvent(action={self.action}, due_at={self.due_at})"
...@@ -77,6 +90,8 @@ class EventQueue(): ...@@ -77,6 +90,8 @@ class EventQueue():
self.counter += 1 # this enforces FIFO in the queue (after comparing due_at) self.counter += 1 # this enforces FIFO in the queue (after comparing due_at)
def get(self): def get(self):
if self.queue.empty():
raise Exception("Queue is empty")
return self.queue.get()[2] return self.queue.get()[2]
def empty(self): def empty(self):
......
from enum import Enum from enum import Enum
from typing import Optional, List from typing import Optional
import copy import copy
...@@ -15,38 +15,40 @@ class JobState(Enum): ...@@ -15,38 +15,40 @@ class JobState(Enum):
DELETED = "deleted" DELETED = "deleted"
class Job(): class JobData():
# this should represent an object extracted from the database # this should represent an object extracted from the database
def __init__(self, job_id: str, image: str, command: List[str] | None = None, args: List[str] | None = None, job_state: JobState = JobState.READY): def __init__(self, job_id: str, job_type: str, job_state: JobState = JobState.READY):
self.job_id = job_id self.job_id = job_id
self.job_state = job_state self.job_state = job_state
self.image = image self.job_type = job_type
self.command = command
self.args = args def update_state(self, cstate: "ControllerState", new_state: JobState):
cstate.job_api.update_job_state(self.job_id, new_state)
self.job_state = new_state
# this is just a dummy implementation # this is just a dummy implementation
# this should be replaced by a real implementation that always interacts with the database (no caching!) # this should be replaced by a real implementation that always interacts with the database (no caching!)
class JobApi(): class JobDatabaseApi():
def __init__(self): def __init__(self):
self.db_state = {} self.db_state = {}
self.job_queue = [] self.job_queue = []
def get_next_jobs_and_set_spawning(self, limit: int) -> List[Job]: def get_next_jobs_and_set_spawning(self, limit: int) -> list[JobData]:
if len(self.job_queue) == 0: if len(self.job_queue) == 0:
return [] return []
ret: List[Job] = [] ret: list[JobData] = []
while len(ret) < limit and len(self.job_queue) > 0: while len(ret) < limit and len(self.job_queue) > 0:
next = self.job_queue.pop(0) next = self.job_queue.pop(0)
next.job_state = JobState.SPAWNING next.job_state = JobState.SPAWNING
ret.append(next) ret.append(next)
return ret return ret
def get_job_by_id(self, job_id: str) -> Optional[Job]: def get_job_by_id(self, job_id: str) -> Optional[JobData]:
return copy.deepcopy(self.db_state.get(job_id, None)) return copy.deepcopy(self.db_state.get(job_id, None))
def create_job(self, job: Job): def create_job(self, job: JobData):
self.job_queue.append(job) self.job_queue.append(job)
self.db_state[job.job_id] = copy.deepcopy(job) self.db_state[job.job_id] = copy.deepcopy(job)
......
from event_queue import EventQueue from event_queue import EventQueue
from job_api import JobApi from job_api import JobData, JobDatabaseApi
from jobs.dummy_job import DummyJob
from kubernetes_api import K8sApi from kubernetes_api import K8sApi
...@@ -7,5 +8,10 @@ class ControllerState(): ...@@ -7,5 +8,10 @@ class ControllerState():
def __init__(self) -> None: def __init__(self) -> None:
self.k8s = K8sApi() self.k8s = K8sApi()
self.job_api = JobApi() self.job_api = JobDatabaseApi()
self.event_queue = EventQueue() self.event_queue = EventQueue()
def wrap_job(self, job: JobData) -> "WrappedJob":
if job.job_type == "dummy":
return DummyJob(job)
raise Exception(f"Unknown job type: {job.job_type}")
from job_api import JobData
from jobs.wrapped_job import WrappedJob
class DummyJob(WrappedJob):
def __init__(self, jobData: JobData):
super().__init__(jobData, image="quay.io/msrd0/render_video:latest", args=["--help"])
def success(self, cstate):
print("Dummy job succeeded")
def failure(self, cstate):
print("Dummy job failed")
from abc import abstractmethod
from event_queue import Event, EventResult
from job_api import JobData, JobState
class WrappedJob(Event):
def __init__(self, jobData: JobData, image: str, command: list[str] | None = None, args: list[str] | None = None):
super().__init__()
self.jobData = jobData
self.image = image
self.command = command
self.args = args
@abstractmethod
def success(self, cstate: "ControllerState"):
pass
@abstractmethod
def failure(self, cstate: "ControllerState"):
pass
def action(self, cstate: "ControllerState"):
if self.job_state == JobState.FINISHED:
self.success(cstate)
self.jobData.update_state(cstate, JobState.FINISHED_AND_PROCESSED)
elif self.job_state == JobState.FAILED:
self.failure(cstate)
self.jobData.update_state(cstate, JobState.FAILED_AND_PROCESSED)
else:
raise Exception(f"Job is in unexpected state: {self.job_state}")
return EventResult.DONE
@property
def job_id(self):
return self.jobData.job_id
@property
def job_state(self) -> JobState:
return self.jobData.job_state
def refresh_data(self, cstate: "ControllerState"):
old_id = self.jobData.job_id
self.jobData = cstate.job_api.get_job_by_id(self.jobData.job_id)
return old_id
...@@ -3,7 +3,7 @@ from kubernetes.client.models.v1_job import V1Job ...@@ -3,7 +3,7 @@ from kubernetes.client.models.v1_job import V1Job
from kubernetes.client.models.v1_job_list import V1JobList from kubernetes.client.models.v1_job_list import V1JobList
import os import os
from job_api import Job from jobs.wrapped_job import WrappedJob
NAMESPACE = "videoag-prod" NAMESPACE = "videoag-prod"
WORKER_LABEL = "videoag-worker" WORKER_LABEL = "videoag-worker"
...@@ -33,7 +33,7 @@ class K8sApi(): ...@@ -33,7 +33,7 @@ class K8sApi():
def list_worker_jobs(self) -> V1JobList: def list_worker_jobs(self) -> V1JobList:
return self.batch_v1.list_namespaced_job(NAMESPACE, label_selector=f"app={WORKER_LABEL}") return self.batch_v1.list_namespaced_job(NAMESPACE, label_selector=f"app={WORKER_LABEL}")
def create_job(self, job: Job): def create_job(self, job: WrappedJob):
job_manifest = { job_manifest = {
"apiVersion": "batch/v1", "apiVersion": "batch/v1",
"kind": "Job", "kind": "Job",
......
from actions.spawn_job import WatchJob from actions.spawn_job import WatchJob
from event_queue import EventResult from event_queue import EventResult
from actions.find_ready_jobs import FindReadyJobs from actions.find_ready_jobs import FindReadyJobs
from job_api import Job from job_api import JobData
from job_controller import ControllerState from job_controller import ControllerState
import datetime import datetime
...@@ -62,7 +62,7 @@ def main(): ...@@ -62,7 +62,7 @@ def main():
# make some dummy jobs # make some dummy jobs
start_id = int(datetime.datetime.now().timestamp()) start_id = int(datetime.datetime.now().timestamp())
for i in range(start_id, start_id + 4): for i in range(start_id, start_id + 4):
cstate.job_api.create_job(Job("job{}".format(i), "quay.io/msrd0/render_video:latest", args=["--help"])) cstate.job_api.create_job(JobData("job{}".format(i), "dummy"))
cstate.event_queue.put(FindReadyJobs()) cstate.event_queue.put(FindReadyJobs())
...@@ -70,6 +70,9 @@ def main(): ...@@ -70,6 +70,9 @@ def main():
def signal_handler(sig, frame): def signal_handler(sig, frame):
nonlocal run_event_loop nonlocal run_event_loop
if run_event_loop == False:
print("Force quitting event loop...")
sys.exit(2)
print("Stopping event loop...") print("Stopping event loop...")
run_event_loop = False run_event_loop = False
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment