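# main.py (4.94 KiB)
# NOTE: the text below is page chrome captured from the repository web viewer
# when this file was extracted; it is not part of the program.
#   Select Git revision / shib_client.py / Code owners
#   Assign users and groups as approvers for specific file changes. Learn more.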
from actions.spawn_job import WatchJob
from event_queue import EventResult
from actions.find_ready_jobs import FindReadyJobs
from job_database_api import JobData
from job_controller import ControllerState
import datetime
import time
import argparse
import sys
import signal
def _reconcile_existing_jobs(cstate: "ControllerState", args: argparse.Namespace) -> None:
    """Reconcile worker jobs already present in k8s with the job database.

    Every worker job returned by ``cstate.k8s.list_worker_jobs()`` is looked
    up in the database by its ``job_id`` label.  Jobs with a database record
    get a ``WatchJob`` event queued; jobs without one are "unreconcilable"
    and are handled according to the command line flags:

    * ``--purge_existing_jobs``: the unreconcilable jobs are deleted in k8s.
    * ``--ignore_unreconcilable_jobs``: they are left alone.
    * neither flag: the process exits with status 1.

    Args:
        cstate: Controller state providing ``k8s``, ``job_api`` and
            ``event_queue``.
        args: Parsed command line arguments.

    Raises:
        SystemExit: With code 1 if unreconcilable jobs exist and neither
            handling flag was given.
    """
    print("Checking for existing jobs...")
    existing_worker_jobs = cstate.k8s.list_worker_jobs()
    if existing_worker_jobs.items:
        print(f"Found {len(existing_worker_jobs.items)} existing worker jobs!")
        marked_for_deletion = []
        marked_for_watch = []
        for job in existing_worker_jobs.items:
            print(f" - {job.metadata.name}")
            # NOTE(review): assumes every worker job carries a "job_id" label;
            # a job without it raises KeyError here -- confirm against
            # list_worker_jobs().
            job_id = job.metadata.labels["job_id"]
            job_in_db = cstate.job_api.get_job_by_id(job_id)
            if job_in_db is None:
                print(f"Could not find job in db with id: {job_id}")
                marked_for_deletion.append(job)
            else:
                print(f" - Job state in db: {job_in_db.job_state}")
                # TODO: figure out if job already finished by looking at db data
                marked_for_watch.append(job)

        if marked_for_deletion:
            if args.purge_existing_jobs:
                print(f"Deleting {len(marked_for_deletion)} existing jobs...")
                # Only delete the jobs that have no database record.  Deleting
                # every listed job would also remove the jobs that get a
                # WatchJob event queued below.
                for job in marked_for_deletion:
                    cstate.k8s.delete_job_by_name(job.metadata.name)
                print("Done deleting existing jobs")
                time.sleep(3)  # wait a bit for k8s to delete the jobs
            elif args.ignore_unreconcilable_jobs:
                print(f"Ignoring {len(marked_for_deletion)} unreconcilable jobs")
            else:
                print("Exiting because existing jobs were found that could not be reconciled with db state")
                print("You can delete them by running this script with the --purge_existing_jobs flag")
                sys.exit(1)

        for watch in marked_for_watch:
            cstate.event_queue.put(WatchJob(watch.metadata.labels["job_id"]))
    print("Done checking for existing jobs")


def _run_event_loop(cstate: "ControllerState") -> None:
    """Execute events from ``cstate.event_queue`` until SIGINT is received.

    Installs a SIGINT handler: the first signal asks the loop to stop after
    the current step, a second signal exits the process with status 2.

    Each event is called with ``cstate``.  A return value of
    ``EventResult.REQUEUE`` puts the event back on the queue;
    ``EventResult.DONE`` or ``None`` drops it; anything else is reported as
    an error.  Exceptions raised by an event are printed and the loop
    continues with the next event.

    Args:
        cstate: Controller state providing ``event_queue``.
    """
    keep_running = True

    def signal_handler(sig, frame):
        nonlocal keep_running
        if not keep_running:
            # Second SIGINT while a stop is already pending: give up waiting.
            print("Force quitting event loop...")
            sys.exit(2)
        print("Stopping event loop...")
        keep_running = False

    signal.signal(signal.SIGINT, signal_handler)

    while keep_running:
        evt = cstate.event_queue.get()
        while not evt.canExecute() and keep_running:
            # because the queue is sorted by due_at, we can wait until this event is due
            tts = (evt.due_at - datetime.datetime.now()).total_seconds()
            print(f" >> Sleeping for {tts} seconds until next event is due: {evt}")
            while keep_running and tts > 0:
                # Sleep in slices of at most one second so a stop request is
                # noticed promptly, without sleeping past the due time.
                time.sleep(min(1.0, tts))
                tts = (evt.due_at - datetime.datetime.now()).total_seconds()
        if not keep_running:
            cstate.event_queue.put(evt)  # put back before quitting
            break
        try:
            start = datetime.datetime.now()
            ret = evt(cstate)
            elapsed = (datetime.datetime.now() - start).total_seconds()
            if elapsed > 0.5:
                print(f"!! Event {evt} took {elapsed}s to execute!")
            if ret == EventResult.REQUEUE:
                cstate.event_queue.put(evt)
            elif ret is not None and ret != EventResult.DONE:
                raise Exception(f"Unexpected return value from event: {ret}, {evt}")
        except Exception as e:
            # Deliberately best-effort: a failing event is reported and
            # dropped so the controller keeps processing the queue.
            print("###")
            print(f"Error in event {evt}: {e}")
            print("###")


def main():
    """Run the job controller.

    Parses the command line, builds the ``ControllerState``, reconciles
    worker jobs that already exist in k8s with the database, seeds a few
    dummy jobs plus a ``FindReadyJobs`` event, and then runs the event loop
    until interrupted.  Events still queued at shutdown are printed.

    Exit codes: 0 after a normal stop, 1 if ``--incluster`` was required but
    not satisfied or unreconcilable jobs were found, 2 on a forced quit
    (second SIGINT).
    """
    parser = argparse.ArgumentParser(description='Run the job controller')
    parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)')
    parser.add_argument('--ignore_unreconcilable_jobs', action='store_true', help='Ignore unreconcilable jobs in k8s')
    parser.add_argument('--incluster', action='store_true', help='Require running in k8s cluster')
    args = parser.parse_args()

    cstate = ControllerState()
    if args.incluster and cstate.k8s.config_used != "incluster":
        print("Incluster required, but not running in k8s cluster")
        sys.exit(1)

    # check existing jobs
    _reconcile_existing_jobs(cstate, args)

    # TODO: check for existing jobs in spawning state in db that are not in k8s and requeue them (ready state will be picked up by FindReadyJobs)

    # make some dummy jobs
    start_id = int(datetime.datetime.now().timestamp())
    for i in range(start_id, start_id + 4):
        cstate.job_api.create_job(JobData(f"job{i}", "dummy"))
    cstate.event_queue.put(FindReadyJobs())

    _run_event_loop(cstate)

    print("Event loop stopped")
    # print all remaining events
    print("Remaining events in queue:")
    while not cstate.event_queue.empty():
        print(cstate.event_queue.get())
    sys.exit(0)
# Script entry point: run the controller only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()