Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
spawn_job.py 3.41 KiB
from collections.abc import Generator
from event_queue import GeneratorRecurringEvent
from job_api import JobData, JobState
from job_controller import ControllerState
import datetime
from jobs.wrapped_job import WrappedJob
# Concurrency cap used by SpawnJob.generator: a new job is only spawned once the
# number of WatchJob events found in the controller's event queue is strictly
# below this value.
MAX_CONCURRENT_JOBS = 5
class SpawnJob(GeneratorRecurringEvent):
    """Event that creates a job via ``cstate.k8s`` once a concurrency slot is free.

    A slot counts as free when fewer than ``MAX_CONCURRENT_JOBS`` ``WatchJob``
    events are present in the controller's event queue.
    """

    def __init__(self, job: WrappedJob):
        """Remember the wrapped job that should be spawned."""
        super().__init__()
        self.job = job

    def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]:
        """Wait for a free slot, spawn the job, then enqueue a ``WatchJob`` for it.

        Yields:
            A 10 second ``timedelta`` each time the concurrency cap is reached
            (presumably the delay before this generator is resumed -- confirm
            against ``GeneratorRecurringEvent``).

        Raises:
            Exception: if ``cstate.k8s.create_job`` returns ``None``.
        """

        def queued_watchers() -> int:
            # Each queued WatchJob stands for one job that has been spawned and
            # is still being polled; the count is taken from the raw container
            # behind the controller's event queue.
            return sum(
                1
                for queued in cstate.event_queue.queue.queue
                if isinstance(queued, WatchJob)
            )

        # Back off for as long as the concurrency cap is reached.
        while queued_watchers() >= MAX_CONCURRENT_JOBS:
            print("Too many active jobs, waiting 10 seconds...")
            yield datetime.timedelta(seconds=10)

        print("Spawning job with id: {}".format(self.job.jobData.job_id))
        # The SPAWNING state is recorded before the create call is issued.
        self.job.jobData.update_state(cstate, JobState.SPAWNING)

        if cstate.k8s.create_job(self.job) is None:
            raise Exception("Failed to spawn job")

        # Hand over to a WatchJob event, which polls the job until it finishes
        # or fails (due_at is forwarded to the base event; presumably it delays
        # the first poll by five seconds -- confirm).
        watcher = WatchJob(self.job, due_at=datetime.timedelta(seconds=5))
        cstate.event_queue.put(watcher)
class WatchJob(GeneratorRecurringEvent):
    """Event that polls a spawned job until it ends, then reconciles the db state.

    Once the job status reports a ``succeeded`` or ``failed`` value, the job's
    database record is refreshed, brought in line with that outcome, and the
    wrapped job is put back on the event queue.
    """

    def __init__(self, job: WrappedJob, *args, **kwargs):
        """Store the wrapped job; remaining arguments go to the base event."""
        super().__init__(*args, **kwargs)
        self.job = job

    def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]:
        """Poll the job, then reconcile and re-queue it.

        Yields:
            A 5 second ``timedelta`` after every poll that finds the job still
            unfinished.

        Raises:
            Exception: if the job data is ``None`` after refreshing it from the
                database.
        """
        # Phase 1: poll until the status carries a succeeded or failed value.
        while True:
            job_info = cstate.k8s.get_job(self.job.jobData.job_id)
            succeeded = job_info.status.succeeded is not None
            if succeeded:
                print("Job finished successfully")
                break
            if job_info.status.failed is not None:
                print("Job failed")
                break
            print("Job not finished yet, waiting 5 seconds...")
            yield datetime.timedelta(seconds=5)

        # Phase 2: look at the database to see whether the job has marked
        # itself as finished/failed.
        old_id = self.job.refresh_data(cstate)
        if self.job.jobData is None:
            raise Exception(f"Could not find job in db with id={old_id}")

        # Nothing left to do when the record was already processed or deleted.
        if self.job.job_state in (
            JobState.DELETED,
            JobState.FINISHED_AND_PROCESSED,
            JobState.FAILED_AND_PROCESSED,
        ):
            print(f"Job already processed in db, state: {self.job.job_state}")
            return

        # State the db should hold according to the polled job status.
        if succeeded:
            expected_state = JobState.FINISHED
        else:
            expected_state = JobState.FAILED

        # The job reported a final state of its own that contradicts the polled
        # status: treat the run as failed. The only situation where this is
        # expected is on startup, when old k8s jobs are reconciled.
        if self.job.job_state != expected_state and self.job.job_state in (
            JobState.FINISHED,
            JobState.FAILED,
        ):
            print(f"Job not marked as expected in db, expected={expected_state} != db={self.job.job_state}")
            print("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)")
            expected_state = JobState.FAILED

        if self.job.job_state != expected_state:
            print(f"Updating job state in db to: {expected_state}, from: {self.job.job_state}")
            self.job.jobData.update_state(cstate, expected_state)

        # Re-queue the wrapped job itself: the job wrapper processes its own
        # job's result.
        cstate.event_queue.put(self.job)