Skip to content
Snippets Groups Projects
Commit 98203e12 authored by Dorian Koch's avatar Dorian Koch
Browse files

add signal handler

parent 8d359802
No related branches found
No related tags found
No related merge requests found
...@@ -74,7 +74,10 @@ class EventQueue(): ...@@ -74,7 +74,10 @@ class EventQueue():
def put(self, event: Event): def put(self, event: Event):
self.queue.put((event.due_at, self.counter, event)) self.queue.put((event.due_at, self.counter, event))
self.counter += 1 # this enforces stability in the queue self.counter += 1 # this enforces FIFO in the queue (after comparing due_at)
def get(self): def get(self):
return self.queue.get()[2] return self.queue.get()[2]
def empty(self):
return self.queue.empty()
...@@ -15,9 +15,11 @@ class K8sApi(): ...@@ -15,9 +15,11 @@ class K8sApi():
if os.path.isfile(config.incluster_config.SERVICE_TOKEN_FILENAME): if os.path.isfile(config.incluster_config.SERVICE_TOKEN_FILENAME):
print("Using incluster config") print("Using incluster config")
config.load_incluster_config() config.load_incluster_config()
self.config_used = "incluster"
else: else:
print("Using local config") print("Using local config")
config.load_kube_config() config.load_kube_config()
self.config_used = "local"
self.api = client.ApiClient() self.api = client.ApiClient()
self.v1 = client.CoreV1Api(self.api) self.v1 = client.CoreV1Api(self.api)
self.batch_v1 = client.BatchV1Api(self.api) self.batch_v1 = client.BatchV1Api(self.api)
......
...@@ -8,14 +8,20 @@ import datetime ...@@ -8,14 +8,20 @@ import datetime
import time import time
import argparse import argparse
import sys import sys
import signal
def main(): def main():
parser = argparse.ArgumentParser(description='Run the job controller') parser = argparse.ArgumentParser(description='Run the job controller')
parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)') parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)')
parser.add_argument('--incluster', action='store_true', help='Require running in k8s cluster')
args = parser.parse_args() args = parser.parse_args()
cstate = ControllerState() cstate = ControllerState()
if args.incluster and cstate.k8s.config_used != "incluster":
print("Incluster required, but not running in k8s cluster")
sys.exit(1)
return
# check existing jobs # check existing jobs
print("Checking for existing jobs...") print("Checking for existing jobs...")
...@@ -51,6 +57,7 @@ def main(): ...@@ -51,6 +57,7 @@ def main():
cstate.event_queue.put(WatchJob(watch.metadata.labels["job_id"])) cstate.event_queue.put(WatchJob(watch.metadata.labels["job_id"]))
print("Done checking for existing jobs") print("Done checking for existing jobs")
# TODO: check for existing jobs in spawning state in db that are not in k8s and requeue them (ready state will be picked up by FindReadyJobs)
# make some dummy jobs # make some dummy jobs
start_id = int(datetime.datetime.now().timestamp()) start_id = int(datetime.datetime.now().timestamp())
...@@ -59,13 +66,26 @@ def main(): ...@@ -59,13 +66,26 @@ def main():
cstate.event_queue.put(FindReadyJobs()) cstate.event_queue.put(FindReadyJobs())
while True: run_event_loop = True
def signal_handler(sig, frame):
nonlocal run_event_loop
print("Stopping event loop...")
run_event_loop = False
signal.signal(signal.SIGINT, signal_handler)
while run_event_loop:
evt = cstate.event_queue.get() evt = cstate.event_queue.get()
while not evt.canExecute(): while not evt.canExecute() and run_event_loop:
# because the queue is sorted by due_at, we can wait until the this event is due # because the queue is sorted by due_at, we can wait until the this event is due
tts = (evt.due_at - datetime.datetime.now()).total_seconds() tts = (evt.due_at - datetime.datetime.now()).total_seconds()
print(f" >> Sleeping for {tts} seconds until next event is due: {evt}") print(f" >> Sleeping for {tts} seconds until next event is due: {evt}")
time.sleep(tts) while run_event_loop and tts > 0:
time.sleep(1)
tts = (evt.due_at - datetime.datetime.now()).total_seconds()
if not run_event_loop:
cstate.event_queue.put(evt) # put back before quitting
break
try: try:
start = datetime.datetime.now() start = datetime.datetime.now()
ret = evt(cstate) ret = evt(cstate)
...@@ -80,6 +100,12 @@ def main(): ...@@ -80,6 +100,12 @@ def main():
print("###") print("###")
print(f"Error in event {evt}: {e}") print(f"Error in event {evt}: {e}")
print("###") print("###")
print("Event loop stopped")
# print all remaining events
print("Remaining events in queue:")
while not cstate.event_queue.empty():
print(cstate.event_queue.get())
sys.exit(0)
if __name__ == "__main__": if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment