Skip to content
Snippets Groups Projects
Verified commit 8d359802, authored by Dorian Koch
Browse files

reconcile existing k8s jobs

parent 85bb527f
No related branches found
No related tags found
No related merge requests found
...@@ -16,10 +16,10 @@ class FindReadyJobs(RecurringEvent): ...@@ -16,10 +16,10 @@ class FindReadyJobs(RecurringEvent):
# get next jobs to spawn # get next jobs to spawn
jobs = cstate.job_api.get_next_jobs_and_set_spawning(5) jobs = cstate.job_api.get_next_jobs_and_set_spawning(5)
if len(jobs) > 0:
print("Found {} jobs to spawn".format(len(jobs))) print("Found {} jobs to spawn".format(len(jobs)))
for job in jobs: for job in jobs:
cstate.event_queue.put(SpawnJob(job)) cstate.event_queue.put(SpawnJob(job))
else:
if len(jobs) == 0: return datetime.timedelta(seconds=10) # wait 10 seconds if no jobs are ready
return datetime.timedelta(seconds=5) # wait 5 seconds if no jobs are ready
return datetime.timedelta(seconds=2) # otherwise wait 2 seconds return datetime.timedelta(seconds=2) # otherwise wait 2 seconds
from collections.abc import Generator from collections.abc import Generator
from event_queue import GeneratorRecurringEvent from event_queue import GeneratorRecurringEvent
from job_api import Job from job_api import Job, JobState
from job_controller import ControllerState from job_controller import ControllerState
import datetime import datetime
...@@ -11,14 +11,13 @@ class SpawnJob(GeneratorRecurringEvent): ...@@ -11,14 +11,13 @@ class SpawnJob(GeneratorRecurringEvent):
def __init__(self, job: Job): def __init__(self, job: Job):
super().__init__() super().__init__()
self.job = job self.job = job
self.has_active_job = False
def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]: def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]:
# wait for empty queue # wait for empty queue
while True: while True:
total_num_jobs = 0 total_num_jobs = 0
for evt in cstate.event_queue.queue.queue: for evt in cstate.event_queue.queue.queue:
if isinstance(evt, SpawnJob) and evt.has_active_job: if isinstance(evt, WatchJob):
total_num_jobs += 1 total_num_jobs += 1
if total_num_jobs < MAX_CONCURRENT_JOBS: if total_num_jobs < MAX_CONCURRENT_JOBS:
break break
...@@ -26,24 +25,47 @@ class SpawnJob(GeneratorRecurringEvent): ...@@ -26,24 +25,47 @@ class SpawnJob(GeneratorRecurringEvent):
yield datetime.timedelta(seconds=10) yield datetime.timedelta(seconds=10)
print("Spawning job with id: {}".format(self.job.job_id)) print("Spawning job with id: {}".format(self.job.job_id))
self.has_active_job = True cstate.job_api.update_job_state(self.job.job_id, JobState.SPAWNING)
ret = cstate.k8s.create_job(self.job) ret = cstate.k8s.create_job(self.job)
if ret is None: if ret is None:
raise Exception("Failed to spawn job") raise Exception("Failed to spawn job")
yield datetime.timedelta(seconds=5) # wait 5 seconds before checking if job is done # spawn a WatchJob event to watch the job finish/fail
cstate.event_queue.put(WatchJob(self.job.job_id, due_at=datetime.timedelta(seconds=5)))
class WatchJob(GeneratorRecurringEvent):
def __init__(self, job_id: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.job_id = job_id
def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]:
while True: while True:
job_info = cstate.k8s.get_job(self.job.job_id) job_info = cstate.k8s.get_job(self.job_id)
if job_info.status.succeeded is not None: if job_info.status.succeeded is not None:
print("Job finished successfully") print("Job finished successfully")
break break
if job_info.status.failed is not None: if job_info.status.failed is not None:
print("Job failed") print("Job failed")
break break
print("Job status:", job_info.status) # print("Job status:", job_info.status)
print("Job not finished yet, waiting 5 seconds...") print("Job not finished yet, waiting 5 seconds...")
yield datetime.timedelta(seconds=5) yield datetime.timedelta(seconds=5)
self.has_active_job = False # look at database to see if the job has marked itself as finished/failed
# job is done, process it job_in_db = cstate.job_api.get_job_by_id(self.job_id)
# TODO: implement processing if job_in_db is None:
raise Exception(f"Could not find job in db with id: {self.job_id}")
if job_in_db.job_state == JobState.DELETED or job_in_db.job_state == JobState.FINISHED_AND_PROCESSED or job_in_db.job_state == JobState.FAILED_AND_PROCESSED:
print(f"Job already processed in db, state: {job_in_db.job_state}")
return
expected_state = JobState.FINISHED if job_info.status.succeeded is not None else JobState.FAILED
if not job_in_db.job_state == expected_state and (job_in_db.job_state == JobState.FINISHED or job_in_db.job_state == JobState.FAILED):
print(f"Job not marked as expected in db, expected={expected_state} != db={job_in_db.job_state}")
print("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)")
# the only state where this is expected is on startup when old k8s jobs are reconciled
expected_state = JobState.FAILED
if job_in_db.job_state != expected_state:
print(f"Updating job state in db to: {expected_state}, from: {job_in_db.job_state}")
cstate.job_api.update_job_state(self.job_id, expected_state)
...@@ -12,7 +12,10 @@ class EventResult(Enum): ...@@ -12,7 +12,10 @@ class EventResult(Enum):
class Event(): class Event():
def __init__(self, due_at: datetime.datetime = datetime.datetime.fromtimestamp(0)): def __init__(self, due_at: datetime.datetime | datetime.timedelta = datetime.datetime.fromtimestamp(0)):
if isinstance(due_at, datetime.timedelta):
self.due_at = datetime.datetime.now() + due_at
else:
self.due_at = due_at self.due_at = due_at
@abstractmethod @abstractmethod
...@@ -48,7 +51,7 @@ class RecurringEvent(Event): ...@@ -48,7 +51,7 @@ class RecurringEvent(Event):
class GeneratorRecurringEvent(RecurringEvent): class GeneratorRecurringEvent(RecurringEvent):
def __str__(self): def __str__(self):
return f"GeneratorRecurringEvent(action={self.action}, due_at={self.due_at})" return f"GeneratorRecurringEvent(generator={self.generator}, due_at={self.due_at})"
@abstractmethod @abstractmethod
def generator(self, cstate: "ControllerState") -> Generator[datetime.timedelta, None, None]: def generator(self, cstate: "ControllerState") -> Generator[datetime.timedelta, None, None]:
......
from enum import Enum from enum import Enum
from typing import Optional, List from typing import Optional, List
import copy
class JobState(Enum): class JobState(Enum):
...@@ -43,8 +44,14 @@ class JobApi(): ...@@ -43,8 +44,14 @@ class JobApi():
return ret return ret
def get_job_by_id(self, job_id: str) -> Optional[Job]: def get_job_by_id(self, job_id: str) -> Optional[Job]:
return self.db_state.get(job_id, None) return copy.deepcopy(self.db_state.get(job_id, None))
def create_job(self, job: Job): def create_job(self, job: Job):
self.job_queue.append(job) self.job_queue.append(job)
self.db_state[job.job_id] = job self.db_state[job.job_id] = copy.deepcopy(job)
def update_job_state(self, job_id: str, new_state: "JobState") -> None:
    """Set the stored state of the job identified by ``job_id``.

    The job object held in ``self.db_state`` is mutated in place.

    Args:
        job_id: Key of the job in ``self.db_state``.
        new_state: Value assigned to the job's ``job_state`` attribute.

    Raises:
        LookupError: If ``self.db_state`` has no entry for ``job_id``.
            ``LookupError`` is a subclass of ``Exception``, so callers
            that catch ``Exception`` keep working.
    """
    job = self.db_state.get(job_id)
    if job is None:
        raise LookupError(f"Could not find job with id: {job_id}")
    job.job_state = new_state
from kubernetes import client, config from kubernetes import client, config
from kubernetes.client.models.v1_job import V1Job from kubernetes.client.models.v1_job import V1Job
from kubernetes.client.models.v1_job_status import V1JobStatus from kubernetes.client.models.v1_job_list import V1JobList
import os import os
from job_api import Job from job_api import Job
...@@ -28,7 +28,7 @@ class K8sApi(): ...@@ -28,7 +28,7 @@ class K8sApi():
print("https://github.com/kubernetes-client/python?tab=readme-ov-file#compatibility-matrix-of-supported-client-versions") print("https://github.com/kubernetes-client/python?tab=readme-ov-file#compatibility-matrix-of-supported-client-versions")
print("#############") print("#############")
def list_worker_jobs(self): def list_worker_jobs(self) -> V1JobList:
return self.batch_v1.list_namespaced_job(NAMESPACE, label_selector=f"app={WORKER_LABEL}") return self.batch_v1.list_namespaced_job(NAMESPACE, label_selector=f"app={WORKER_LABEL}")
def create_job(self, job: Job): def create_job(self, job: Job):
......
from actions.spawn_job import WatchJob
from event_queue import EventResult from event_queue import EventResult
from actions.find_ready_jobs import FindReadyJobs from actions.find_ready_jobs import FindReadyJobs
from job_api import Job from job_api import Job
...@@ -21,25 +22,39 @@ def main(): ...@@ -21,25 +22,39 @@ def main():
existing_worker_jobs = cstate.k8s.list_worker_jobs() existing_worker_jobs = cstate.k8s.list_worker_jobs()
if len(existing_worker_jobs.items) > 0: if len(existing_worker_jobs.items) > 0:
print(f"Found {len(existing_worker_jobs.items)} existing worker jobs!") print(f"Found {len(existing_worker_jobs.items)} existing worker jobs!")
marked_for_deletion = []
marked_for_watch = []
for job in existing_worker_jobs.items: for job in existing_worker_jobs.items:
print(f" - {job.metadata.name}") print(f" - {job.metadata.name}")
job_in_db = cstate.job_api.get_job_by_id(job.metadata.labels["job_id"])
if job_in_db is None:
print(f"Could not find job in db with id: {job.metadata.labels['job_id']}")
marked_for_deletion.append(job)
else:
print(f" - Job state in db: {job_in_db.job_state}")
# TODO: figure out if job already finished by looking at db data
marked_for_watch.append(job)
if len(marked_for_deletion) > 0:
if args.purge_existing_jobs: if args.purge_existing_jobs:
print("Deleting existing jobs...") print(f"Deleting {len(marked_for_deletion)} existing jobs...")
for job in existing_worker_jobs.items: for job in existing_worker_jobs.items:
print(cstate.k8s.delete_job_by_name(job.metadata.name)) cstate.k8s.delete_job_by_name(job.metadata.name)
print("Done deleting existing jobs") print("Done deleting existing jobs")
time.sleep(3) # wait a bit for k8s to delete the jobs time.sleep(3) # wait a bit for k8s to delete the jobs
else: else:
# fail # fail
print("Exiting because existing jobs were found") print("Exiting because existing jobs were found that could not be reconciled with db state")
print("You can delete them by running this script with the --purge_existing_jobs flag") print("You can delete them by running this script with the --purge_existing_jobs flag")
sys.exit(1) sys.exit(1)
return return
for watch in marked_for_watch:
cstate.event_queue.put(WatchJob(watch.metadata.labels["job_id"]))
print("Done checking for existing jobs") print("Done checking for existing jobs")
# make some dummy jobs # make some dummy jobs
for i in range(2): start_id = int(datetime.datetime.now().timestamp())
for i in range(start_id, start_id + 4):
cstate.job_api.create_job(Job("job{}".format(i), "quay.io/msrd0/render_video:latest", args=["--help"])) cstate.job_api.create_job(Job("job{}".format(i), "quay.io/msrd0/render_video:latest", args=["--help"]))
cstate.event_queue.put(FindReadyJobs()) cstate.event_queue.put(FindReadyJobs())
...@@ -49,13 +64,22 @@ def main(): ...@@ -49,13 +64,22 @@ def main():
while not evt.canExecute(): while not evt.canExecute():
# because the queue is sorted by due_at, we can wait until this event is due
tts = (evt.due_at - datetime.datetime.now()).total_seconds() tts = (evt.due_at - datetime.datetime.now()).total_seconds()
print(f"Sleeping for {tts} seconds until next event is due: {evt}") print(f" >> Sleeping for {tts} seconds until next event is due: {evt}")
time.sleep(tts) time.sleep(tts)
try:
start = datetime.datetime.now()
ret = evt(cstate) ret = evt(cstate)
end = datetime.datetime.now()
if (end - start).total_seconds() > 0.5:
print(f"!! Event {evt} took {(end - start).total_seconds()}s to execute!")
if ret == EventResult.REQUEUE: if ret == EventResult.REQUEUE:
cstate.event_queue.put(evt) cstate.event_queue.put(evt)
elif not ret == EventResult.DONE and ret is not None: elif not ret == EventResult.DONE and ret is not None:
raise Exception(f"Unexpected return value from event: {ret}, {evt}") raise Exception(f"Unexpected return value from event: {ret}, {evt}")
except Exception as e:
print("###")
print(f"Error in event {evt}: {e}")
print("###")
if __name__ == "__main__": if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment