Skip to content
Snippets Groups Projects
Select Git revision
  • cb12c4ca65649a9ccd26fc53cc608eff0902bd43
  • master default protected
  • forbid-save-as
  • upload-via-token
  • moodle-integration
  • patch-double-tap-seek
  • patch_datum_anzeigen
  • patch_raum_anzeigen
  • intros
  • live_sources
  • bootstrap4
  • modules
12 results

jobs.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    spawn_job.py 3.41 KiB
    from collections.abc import Generator
    from event_queue import GeneratorRecurringEvent
    from job_api import JobData, JobState
    from job_controller import ControllerState
    import datetime
    
    from jobs.wrapped_job import WrappedJob
    
    # Upper bound on concurrently watched jobs: SpawnJob counts the WatchJob events currently
    # sitting in the event queue and waits (re-checking every 10 seconds) until fewer than this
    # many are queued before spawning another k8s job.
    MAX_CONCURRENT_JOBS = 5
    
    
    class SpawnJob(GeneratorRecurringEvent):
        """Event that launches a WrappedJob on Kubernetes once a concurrency slot is free.

        When run, it waits until fewer than MAX_CONCURRENT_JOBS WatchJob events are queued,
        marks the job SPAWNING in the database, creates the k8s job and finally enqueues a
        WatchJob that follows the job until it terminates.
        """

        def __init__(self, job: WrappedJob):
            super().__init__()
            self.job = job

        def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]:
            """Throttle, spawn the job, then hand it over to a WatchJob.

            Yields:
                A ``timedelta`` each time the concurrency limit is reached; presumably the event
                loop resumes this generator after that delay (TODO confirm in GeneratorRecurringEvent).

            Raises:
                Exception: if ``cstate.k8s.create_job`` returns None.
            """
            # NOTE(review): this peeks at the event queue's internal storage (`.queue.queue`)
            # directly; confirm nothing else mutates it while it is being iterated here.
            while sum(1 for queued in cstate.event_queue.queue.queue if isinstance(queued, WatchJob)) >= MAX_CONCURRENT_JOBS:
                print("Too many active jobs, waiting 10 seconds...")
                yield datetime.timedelta(seconds=10)

            job_data = self.job.jobData
            print("Spawning job with id: {}".format(job_data.job_id))
            job_data.update_state(cstate, JobState.SPAWNING)

            if cstate.k8s.create_job(self.job) is None:
                raise Exception("Failed to spawn job")

            # Follow the freshly created k8s job until it finishes or fails.
            watcher = WatchJob(self.job, due_at=datetime.timedelta(seconds=5))
            cstate.event_queue.put(watcher)
    
    
    class WatchJob(GeneratorRecurringEvent):
        """Event that polls a spawned Kubernetes job until it terminates, then reconciles the db.

        Once k8s reports the job as completed or failed, the job's database record is refreshed
        and, if needed, its state is corrected to FINISHED/FAILED. Finally the WrappedJob itself is
        put on the event queue so it can process the job's result.
        """

        def __init__(self, job: WrappedJob, *args, **kwargs):
            # Extra arguments (e.g. ``due_at``) are forwarded to GeneratorRecurringEvent.
            super().__init__(*args, **kwargs)
            self.job = job

        @staticmethod
        def _job_outcome(job_info):
            """Return True if the k8s job completed, False if it failed, None while still running.

            The Job's terminal ``Complete``/``Failed`` conditions are used rather than the
            ``succeeded``/``failed`` pod counters: ``status.failed`` is incremented for every failed
            pod, including pods the Job controller will still retry (``backoffLimit``), so a
            non-empty counter does not mean the Job as a whole has failed.
            """
            for condition in job_info.status.conditions or []:
                if condition.status != "True":
                    continue
                if condition.type == "Complete":
                    return True
                if condition.type == "Failed":
                    return False
            return None

        def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]:
            """Poll k8s every 5 seconds until the job terminates, then reconcile the db and hand off.

            Yields:
                A 5-second ``timedelta`` while the k8s job is still running.

            Raises:
                Exception: if the k8s job cannot be fetched, or the job's db record is gone.
            """
            while True:
                job_info = cstate.k8s.get_job(self.job.jobData.job_id)
                if job_info is None:
                    # presumably get_job, like create_job, signals failure with None; surface it as
                    # a clear error instead of an AttributeError on `.status` - TODO confirm
                    raise Exception(f"Could not find k8s job with id={self.job.jobData.job_id}")
                outcome = self._job_outcome(job_info)
                if outcome is not None:
                    print("Job finished successfully" if outcome else "Job failed")
                    break
                print("Job not finished yet, waiting 5 seconds...")
                yield datetime.timedelta(seconds=5)

            # look at database to see if the job has marked itself as finished/failed
            old_id = self.job.refresh_data(cstate)
            if self.job.jobData is None:
                raise Exception(f"Could not find job in db with id={old_id}")
            db_state = self.job.job_state
            if db_state in (JobState.DELETED, JobState.FINISHED_AND_PROCESSED, JobState.FAILED_AND_PROCESSED):
                print(f"Job already processed in db, state: {db_state}")
                return

            expected_state = JobState.FINISHED if outcome else JobState.FAILED
            if db_state != expected_state and db_state in (JobState.FINISHED, JobState.FAILED):
                # k8s and the db disagree on the outcome; fall back to FAILED
                print(f"Job not marked as expected in db, expected={expected_state} != db={db_state}")
                print("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)")
                # the only state where this is expected is on startup when old k8s jobs are reconciled
                expected_state = JobState.FAILED

            if db_state != expected_state:
                print(f"Updating job state in db to: {expected_state}, from: {db_state}")
                self.job.jobData.update_state(cstate, expected_state)

            cstate.event_queue.put(self.job)  # the WrappedJob class processes its job's result