diff --git a/src/actions/find_ready_jobs.py b/src/actions/find_ready_jobs.py index 86e9b08fd0a9cd3c39cdd2176b0ffaa5cd5f51c0..13f8a7c0306a190c8f4b3ae163ccf09465b773a4 100644 --- a/src/actions/find_ready_jobs.py +++ b/src/actions/find_ready_jobs.py @@ -16,10 +16,10 @@ class FindReadyJobs(RecurringEvent): # get next jobs to spawn jobs = cstate.job_api.get_next_jobs_and_set_spawning(5) - print("Found {} jobs to spawn".format(len(jobs))) - for job in jobs: - cstate.event_queue.put(SpawnJob(job)) - - if len(jobs) == 0: - return datetime.timedelta(seconds=5) # wait 5 seconds if no jobs are ready + if len(jobs) > 0: + print("Found {} jobs to spawn".format(len(jobs))) + for job in jobs: + cstate.event_queue.put(SpawnJob(job)) + else: + return datetime.timedelta(seconds=10) # wait 10 seconds if no jobs are ready return datetime.timedelta(seconds=2) # otherwise wait 2 seconds diff --git a/src/actions/spawn_job.py b/src/actions/spawn_job.py index 41c2d23daeff2c0006a91ebb5f069d0d9ec4dc05..1558e63f258928407eeb8772c69c31307274c206 100644 --- a/src/actions/spawn_job.py +++ b/src/actions/spawn_job.py @@ -1,6 +1,6 @@ from collections.abc import Generator from event_queue import GeneratorRecurringEvent -from job_api import Job +from job_api import Job, JobState from job_controller import ControllerState import datetime @@ -11,14 +11,13 @@ class SpawnJob(GeneratorRecurringEvent): def __init__(self, job: Job): super().__init__() self.job = job - self.has_active_job = False def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]: # wait for empty queue while True: total_num_jobs = 0 for evt in cstate.event_queue.queue.queue: - if isinstance(evt, SpawnJob) and evt.has_active_job: + if isinstance(evt, WatchJob): total_num_jobs += 1 if total_num_jobs < MAX_CONCURRENT_JOBS: break @@ -26,24 +25,47 @@ class SpawnJob(GeneratorRecurringEvent): yield datetime.timedelta(seconds=10) print("Spawning job with id: {}".format(self.job.job_id)) - self.has_active_job = True + cstate.job_api.update_job_state(self.job.job_id, JobState.SPAWNING) ret = cstate.k8s.create_job(self.job) if ret is None: raise Exception("Failed to spawn job") - yield datetime.timedelta(seconds=5) # wait 5 seconds before checking if job is done + # spawn a WatchJob event to watch the job finish/fail + cstate.event_queue.put(WatchJob(self.job.job_id, due_at=datetime.timedelta(seconds=5))) + + +class WatchJob(GeneratorRecurringEvent): + def __init__(self, job_id: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.job_id = job_id + + def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]: while True: - job_info = cstate.k8s.get_job(self.job.job_id) + job_info = cstate.k8s.get_job(self.job_id) if job_info.status.succeeded is not None: print("Job finished successfully") break if job_info.status.failed is not None: print("Job failed") break - print("Job status:", job_info.status) + # print("Job status:", job_info.status) print("Job not finished yet, waiting 5 seconds...") yield datetime.timedelta(seconds=5) - self.has_active_job = False - # job is done, process it - # TODO: implement processing + # look at database to see if the job has marked itself as finished/failed + job_in_db = cstate.job_api.get_job_by_id(self.job_id) + if job_in_db is None: + raise Exception(f"Could not find job in db with id: {self.job_id}") + if job_in_db.job_state == JobState.DELETED or job_in_db.job_state == JobState.FINISHED_AND_PROCESSED or job_in_db.job_state == JobState.FAILED_AND_PROCESSED: + print(f"Job already processed in db, state: {job_in_db.job_state}") + return + expected_state = JobState.FINISHED if job_info.status.succeeded is not None else JobState.FAILED + if not job_in_db.job_state == expected_state and (job_in_db.job_state == JobState.FINISHED or job_in_db.job_state == JobState.FAILED): + print(f"Job not marked as expected in db, expected={expected_state} != db={job_in_db.job_state}") + print("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)") + # the only state where this is expected is on startup when old k8s jobs are reconciled + expected_state = JobState.FAILED + + if job_in_db.job_state != expected_state: + print(f"Updating job state in db to: {expected_state}, from: {job_in_db.job_state}") + cstate.job_api.update_job_state(self.job_id, expected_state) diff --git a/src/event_queue.py b/src/event_queue.py index ccc21b228cda2a670966e43fc1b1b6b07f763d82..4f4a88fd3fe8b1973d62f433847c8d19a87d732f 100644 --- a/src/event_queue.py +++ b/src/event_queue.py @@ -12,8 +12,11 @@ class EventResult(Enum): class Event(): - def __init__(self, due_at: datetime.datetime = datetime.datetime.fromtimestamp(0)): - self.due_at = due_at + def __init__(self, due_at: datetime.datetime | datetime.timedelta = datetime.datetime.fromtimestamp(0)): + if isinstance(due_at, datetime.timedelta): + self.due_at = datetime.datetime.now() + due_at + else: + self.due_at = due_at @abstractmethod def action(self, cstate: "ControllerState"): @@ -48,7 +51,7 @@ class RecurringEvent(Event): class GeneratorRecurringEvent(RecurringEvent): def __str__(self): - return f"GeneratorRecurringEvent(action={self.action}, due_at={self.due_at})" + return f"GeneratorRecurringEvent(generator={self.generator}, due_at={self.due_at})" @abstractmethod def generator(self, cstate: "ControllerState") -> Generator[datetime.timedelta, None, None]: diff --git a/src/job_api.py b/src/job_api.py index 71e43d4553b8c2945e3fce86e92f8a1ea7816f8c..b7ca6732392c494f96cf4ec4a8ae52e9b51958dd 100644 --- a/src/job_api.py +++ b/src/job_api.py @@ -1,6 +1,7 @@ from enum import Enum from typing import Optional, List +import copy class JobState(Enum): @@ -43,8 +44,14 @@ class JobApi(): return ret def get_job_by_id(self, job_id: str) -> Optional[Job]: - return self.db_state.get(job_id, None) + return copy.deepcopy(self.db_state.get(job_id, None)) def create_job(self, job: Job): self.job_queue.append(job) - self.db_state[job.job_id] = job + self.db_state[job.job_id] = copy.deepcopy(job) + + def update_job_state(self, job_id: str, new_state: JobState): + job = self.db_state.get(job_id, None) + if job is None: + raise Exception(f"Could not find job with id: {job_id}") + job.job_state = new_state diff --git a/src/kubernetes_api.py b/src/kubernetes_api.py index 708f3f305c5cd42551b0a06d485b245af283f603..216727c3d92c4c6fe24007cc5525c02311ec0f63 100644 --- a/src/kubernetes_api.py +++ b/src/kubernetes_api.py @@ -1,6 +1,6 @@ from kubernetes import client, config from kubernetes.client.models.v1_job import V1Job -from kubernetes.client.models.v1_job_status import V1JobStatus +from kubernetes.client.models.v1_job_list import V1JobList import os from job_api import Job @@ -28,7 +28,7 @@ class K8sApi(): print("https://github.com/kubernetes-client/python?tab=readme-ov-file#compatibility-matrix-of-supported-client-versions") print("#############") - def list_worker_jobs(self): + def list_worker_jobs(self) -> V1JobList: return self.batch_v1.list_namespaced_job(NAMESPACE, label_selector=f"app={WORKER_LABEL}") def create_job(self, job: Job): diff --git a/src/main.py b/src/main.py index 71537c945301a62bfa62d34351c94bb24900de40..ffadddcacd67dfb3a39417d2f0f4d62dcd86db8f 100644 --- a/src/main.py +++ b/src/main.py @@ -1,3 +1,4 @@ +from actions.spawn_job import WatchJob from event_queue import EventResult from actions.find_ready_jobs import FindReadyJobs from job_api import Job @@ -21,25 +22,39 @@ def main(): existing_worker_jobs = cstate.k8s.list_worker_jobs() if len(existing_worker_jobs.items) > 0: print(f"Found {len(existing_worker_jobs.items)} existing worker jobs!") + marked_for_deletion = [] + marked_for_watch = [] for job in existing_worker_jobs.items: print(f" - {job.metadata.name}") - if args.purge_existing_jobs: - print("Deleting existing jobs...") - for job in existing_worker_jobs.items: - print(cstate.k8s.delete_job_by_name(job.metadata.name)) - print("Done deleting existing jobs") - time.sleep(3) # wait a bit for k8s to delete the jobs - else: - # fail - print("Exiting because existing jobs were found") - print("You can delete them by running this script with the --purge_existing_jobs flag") - sys.exit(1) - return + job_in_db = cstate.job_api.get_job_by_id(job.metadata.labels["job_id"]) + if job_in_db is None: + print(f"Could not find job in db with id: {job.metadata.labels['job_id']}") + marked_for_deletion.append(job) + else: + print(f" - Job state in db: {job_in_db.job_state}") + # TODO: figure out if job already finished by looking at db data + marked_for_watch.append(job) + if len(marked_for_deletion) > 0: + if args.purge_existing_jobs: + print(f"Deleting {len(marked_for_deletion)} existing jobs...") + for job in existing_worker_jobs.items: + cstate.k8s.delete_job_by_name(job.metadata.name) + print("Done deleting existing jobs") + time.sleep(3) # wait a bit for k8s to delete the jobs + else: + # fail + print("Exiting because existing jobs were found that could not be reconciled with db state") + print("You can delete them by running this script with the --purge_existing_jobs flag") + sys.exit(1) + return + for watch in marked_for_watch: + cstate.event_queue.put(WatchJob(watch.metadata.labels["job_id"])) print("Done checking for existing jobs") # make some dummy jobs - for i in range(2): + start_id = int(datetime.datetime.now().timestamp()) + for i in range(start_id, start_id + 4): cstate.job_api.create_job(Job("job{}".format(i), "quay.io/msrd0/render_video:latest", args=["--help"])) cstate.event_queue.put(FindReadyJobs()) @@ -49,13 +64,22 @@ def main(): while not evt.canExecute(): # because the queue is sorted by due_at, we can wait until the this event is due tts = (evt.due_at - datetime.datetime.now()).total_seconds() - print(f"Sleeping for {tts} seconds until next event is due: {evt}") + print(f" >> Sleeping for {tts} seconds until next event is due: {evt}") time.sleep(tts) - ret = evt(cstate) - if ret == EventResult.REQUEUE: - cstate.event_queue.put(evt) - elif not ret == EventResult.DONE and ret is not None: - raise Exception(f"Unexpected return value from event: {ret}, {evt}") + try: + start = datetime.datetime.now() + ret = evt(cstate) + end = datetime.datetime.now() + if (end - start).total_seconds() > 0.5: + print(f"!! Event {evt} took {(end - start).total_seconds()}s to execute!") + if ret == EventResult.REQUEUE: + cstate.event_queue.put(evt) + elif not ret == EventResult.DONE and ret is not None: + raise Exception(f"Unexpected return value from event: {ret}, {evt}") + except Exception as e: + print("###") + print(f"Error in event {evt}: {e}") + print("###") if __name__ == "__main__":