Skip to content
Snippets Groups Projects
Select Git revision
  • 0c36fe63db62956c44a9809416e76daf9fa42287
  • master default protected
2 results

shib_client.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    main.py 4.94 KiB
    from actions.spawn_job import WatchJob
    from event_queue import EventResult
    from actions.find_ready_jobs import FindReadyJobs
    from job_database_api import JobData
    from job_controller import ControllerState
    
    import datetime
    import time
    import argparse
    import sys
    import signal
    
    
    def main():
        """Run the job controller.

        Steps, in order:

        1. Parse command line flags and build the ``ControllerState``.
        2. Reconcile worker jobs that already exist in k8s against the job
           database: jobs with a matching db entry are queued for watching,
           jobs without one are "unreconcilable" and are deleted, ignored, or
           cause an abort, depending on the flags.
        3. Create a few dummy jobs and queue a ``FindReadyJobs`` event.
        4. Process events from the queue until SIGINT is received.

        Exit codes (via ``sys.exit``):
            0 -- event loop stopped after a SIGINT.
            1 -- ``--incluster`` was requested but the k8s config in use is not
                 "incluster", or unreconcilable jobs were found and neither
                 ``--purge_existing_jobs`` nor ``--ignore_unreconcilable_jobs``
                 was given.
            2 -- a second SIGINT arrived while the loop was already stopping.
        """
        parser = argparse.ArgumentParser(description='Run the job controller')
        parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)')
        parser.add_argument('--ignore_unreconcilable_jobs', action='store_true', help='Ignore unreconcilable jobs in k8s')
        parser.add_argument('--incluster', action='store_true', help='Require running in k8s cluster')
        args = parser.parse_args()

        cstate = ControllerState()
        if args.incluster and cstate.k8s.config_used != "incluster":
            print("Incluster required, but not running in k8s cluster")
            sys.exit(1)

        # check existing jobs
        print("Checking for existing jobs...")
        existing_worker_jobs = cstate.k8s.list_worker_jobs()
        if len(existing_worker_jobs.items) > 0:
            print(f"Found {len(existing_worker_jobs.items)} existing worker jobs!")
            marked_for_deletion = []  # k8s jobs without a matching db entry
            marked_for_watch = []     # k8s jobs that are known to the db
            for job in existing_worker_jobs.items:
                print(f" - {job.metadata.name}")
                job_id = job.metadata.labels["job_id"]
                job_in_db = cstate.job_api.get_job_by_id(job_id)
                if job_in_db is None:
                    print(f"Could not find job in db with id: {job_id}")
                    marked_for_deletion.append(job)
                else:
                    print(f"   - Job state in db: {job_in_db.job_state}")
                    # TODO: figure out if job already finished by looking at db data
                    marked_for_watch.append(job)
            if marked_for_deletion:
                if args.purge_existing_jobs:
                    print(f"Deleting {len(marked_for_deletion)} existing jobs...")
                    # Only delete the unreconcilable jobs. Jobs that are known
                    # to the db are queued for watching below and must survive.
                    for job in marked_for_deletion:
                        cstate.k8s.delete_job_by_name(job.metadata.name)
                    print("Done deleting existing jobs")
                    time.sleep(3)  # wait a bit for k8s to delete the jobs
                elif args.ignore_unreconcilable_jobs:
                    print(f"Ignoring {len(marked_for_deletion)} unreconcilable jobs")
                else:
                    # fail
                    print("Exiting because existing jobs were found that could not be reconciled with db state")
                    print("You can delete them by running this script with the --purge_existing_jobs flag")
                    sys.exit(1)
            for watch in marked_for_watch:
                cstate.event_queue.put(WatchJob(watch.metadata.labels["job_id"]))

        print("Done checking for existing jobs")
        # TODO: check for existing jobs in spawning state in db that are not in k8s and requeue them (ready state will be picked up by FindReadyJobs)

        # make some dummy jobs
        start_id = int(datetime.datetime.now().timestamp())
        for i in range(start_id, start_id + 4):
            cstate.job_api.create_job(JobData("job{}".format(i), "dummy"))

        cstate.event_queue.put(FindReadyJobs())

        run_event_loop = True

        def signal_handler(sig, frame):
            """First SIGINT requests a graceful stop; a second one force quits."""
            nonlocal run_event_loop
            if not run_event_loop:
                print("Force quitting event loop...")
                sys.exit(2)
            print("Stopping event loop...")
            run_event_loop = False
        signal.signal(signal.SIGINT, signal_handler)

        while run_event_loop:
            evt = cstate.event_queue.get()
            while not evt.canExecute() and run_event_loop:
                # because the queue is sorted by due_at, we can wait until this event is due
                tts = (evt.due_at - datetime.datetime.now()).total_seconds()
                print(f" >> Sleeping for {tts} seconds until next event is due: {evt}")
                while run_event_loop and tts > 0:
                    # Sleep in slices of at most one second so a stop request is
                    # noticed quickly, without sleeping past the due time.
                    time.sleep(min(1, tts))
                    tts = (evt.due_at - datetime.datetime.now()).total_seconds()
            if not run_event_loop:
                cstate.event_queue.put(evt)  # put back before quitting
                break
            try:
                start = datetime.datetime.now()
                ret = evt(cstate)
                end = datetime.datetime.now()
                if (end - start).total_seconds() > 0.5:
                    print(f"!! Event {evt} took {(end - start).total_seconds()}s to execute!")
                if ret == EventResult.REQUEUE:
                    cstate.event_queue.put(evt)
                elif ret != EventResult.DONE and ret is not None:
                    raise Exception(f"Unexpected return value from event: {ret}, {evt}")
            except Exception as e:
                # Top-level boundary: report the failure and keep the loop alive
                # so one failing event does not take down the controller.
                print("###")
                print(f"Error in event {evt}: {e}")
                print("###")
        print("Event loop stopped")
        # print all remaining events
        print("Remaining events in queue:")
        while not cstate.event_queue.empty():
            print(cstate.event_queue.get())
        sys.exit(0)
    
    
    # Run the controller only when this file is executed as a script, not
    # when it is imported as a module.
    if __name__ == "__main__":
        main()