Skip to content
Snippets Groups Projects
Verified Commit 7b912ff5 authored by Dorian Koch's avatar Dorian Koch
Browse files

Add proper logging

parent 53eae502
No related branches found
No related tags found
No related merge requests found
...@@ -8,13 +8,13 @@ from job_controller.job_controller import ControllerState ...@@ -8,13 +8,13 @@ from job_controller.job_controller import ControllerState
class FindReadyJobs(RecurringEvent): class FindReadyJobs(RecurringEvent):
def action(self, cstate: ControllerState) -> Optional[datetime.timedelta]: def action(self, cstate: ControllerState) -> Optional[datetime.timedelta]:
if cstate.event_queue.queue.qsize() > 20: if cstate.event_queue.queue.qsize() > 20:
print("Event queue is getting full, not spawning more jobs") self.logger.info("Event queue is getting full, not spawning more jobs")
return datetime.timedelta(seconds=20) return datetime.timedelta(seconds=20)
# get next jobs to spawn # get next jobs to spawn
jobs = cstate.job_api.get_next_jobs_and_set_spawning(5) jobs = cstate.job_api.get_next_jobs_and_set_spawning(5)
if len(jobs) > 0: if len(jobs) > 0:
print("Found {} jobs to spawn".format(len(jobs))) self.logger.info("Found {} jobs to spawn".format(len(jobs)))
for job in jobs: for job in jobs:
wrapped = cstate.wrap_job(job) wrapped = cstate.wrap_job(job)
cstate.event_queue.put(SpawnJob(wrapped)) cstate.event_queue.put(SpawnJob(wrapped))
......
...@@ -25,16 +25,16 @@ class RunSorter(GeneratorRecurringEvent): ...@@ -25,16 +25,16 @@ class RunSorter(GeneratorRecurringEvent):
cstate.event_queue.put(spawnJobEvt) cstate.event_queue.put(spawnJobEvt)
while not spawnJobEvt.done: # wait for spawn job to be done while not spawnJobEvt.done: # wait for spawn job to be done
yield spawnJobEvt.due_at # because the event queue is stable, we will be queued after the spawnjob event executes yield spawnJobEvt.due_at # because the event queue is stable, we will be queued after the spawnjob event executes
print("Sorter job spawned") self.logger.info("Sorter job spawned")
watchJobEvt = spawnJobEvt.watch_job_evt watchJobEvt = spawnJobEvt.watch_job_evt
if watchJobEvt is None: if watchJobEvt is None:
print("Spawning sorter job failed!") self.logger.error("Spawning sorter job failed!")
else: else:
while not watchJobEvt.done: while not watchJobEvt.done:
yield watchJobEvt.due_at yield watchJobEvt.due_at
# spawn job is done # spawn job is done
print("Sorter job done") self.logger.info("Sorter job done")
except Exception as e: except Exception as e:
print(f"Error in RunSorter: {e}") self.logger.error(f"Error in RunSorter: {e}")
yield self.interval yield self.interval
...@@ -23,13 +23,13 @@ class SpawnJob(GeneratorRecurringEvent): ...@@ -23,13 +23,13 @@ class SpawnJob(GeneratorRecurringEvent):
total_num_jobs += 1 total_num_jobs += 1
if total_num_jobs < MAX_CONCURRENT_JOBS: if total_num_jobs < MAX_CONCURRENT_JOBS:
break break
print("Too many active jobs, waiting 10 seconds...") self.logger.info("Too many active jobs, waiting 10 seconds...")
yield datetime.timedelta(seconds=10) yield datetime.timedelta(seconds=10)
print("Preparing to spawn job with id: {}".format(self.job.jobData.job_id)) self.logger.info("Preparing to spawn job with id: {}".format(self.job.jobData.job_id))
self.job.jobData.update_state(cstate, JobState.SPAWNING) self.job.jobData.update_state(cstate, JobState.SPAWNING)
self.job.prepare(cstate) self.job.prepare(cstate)
print("Job prepared, spawning...") self.logger.info("Job prepared, spawning...")
ret = cstate.k8s.create_job(self.job) ret = cstate.k8s.create_job(self.job)
if ret is None: if ret is None:
raise Exception("Failed to spawn job") raise Exception("Failed to spawn job")
...@@ -48,13 +48,13 @@ class WatchJob(GeneratorRecurringEvent): ...@@ -48,13 +48,13 @@ class WatchJob(GeneratorRecurringEvent):
while True: while True:
job_info = cstate.k8s.get_job(self.job.jobData.job_id) job_info = cstate.k8s.get_job(self.job.jobData.job_id)
if job_info.status.succeeded is not None: if job_info.status.succeeded is not None:
print("Job finished successfully") self.logger.info("Job finished successfully")
break break
if job_info.status.failed is not None: if job_info.status.failed is not None:
print("Job failed") self.logger.error("Job failed")
break break
# print("Job status:", job_info.status) # print("Job status:", job_info.status)
print("Job not finished yet, waiting 5 seconds...") self.logger.info("Job not finished yet, waiting 5 seconds...")
yield datetime.timedelta(seconds=5) yield datetime.timedelta(seconds=5)
# look at database to see if the job has marked itself as finished/failed # look at database to see if the job has marked itself as finished/failed
...@@ -62,17 +62,17 @@ class WatchJob(GeneratorRecurringEvent): ...@@ -62,17 +62,17 @@ class WatchJob(GeneratorRecurringEvent):
if self.job.jobData is None: if self.job.jobData is None:
raise Exception(f"Could not find job in db with id={old_id}") raise Exception(f"Could not find job in db with id={old_id}")
if self.job.job_state == JobState.CANCELED or self.job.job_state == JobState.FINISHED_AND_PROCESSED or self.job.job_state == JobState.FAILED_AND_PROCESSED: if self.job.job_state == JobState.CANCELED or self.job.job_state == JobState.FINISHED_AND_PROCESSED or self.job.job_state == JobState.FAILED_AND_PROCESSED:
print(f"Job already processed in db, state: {self.job.job_state}, not processing again") self.logger.info(f"Job already processed in db, state: {self.job.job_state}, not processing again")
return return
expected_state = JobState.FINISHED if job_info.status.succeeded is not None else JobState.FAILED expected_state = JobState.FINISHED if job_info.status.succeeded is not None else JobState.FAILED
if not self.job.job_state == expected_state and (self.job.job_state == JobState.FINISHED or self.job.job_state == JobState.FAILED): if not self.job.job_state == expected_state and (self.job.job_state == JobState.FINISHED or self.job.job_state == JobState.FAILED):
print(f"Job not marked as expected in db, expected={expected_state} != db={self.job.job_state}") self.logger.warning(f"Job not marked as expected in db, expected={expected_state} != db={self.job.job_state}")
print("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)") self.logger.warning("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)")
# the only state where this is expected is on startup when old k8s jobs are reconciled # the only state where this is expected is on startup when old k8s jobs are reconciled
expected_state = JobState.FAILED expected_state = JobState.FAILED
if self.job.job_state != expected_state: if self.job.job_state != expected_state:
print(f"Updating job state in db to: {expected_state}, from: {self.job.job_state}") self.logger.info(f"Updating job state in db to: {expected_state}, from: {self.job.job_state}")
self.job.jobData.update_state(cstate, expected_state) self.job.jobData.update_state(cstate, expected_state)
cstate.event_queue.put(self.job) # the jobwrapper class processes its' jobs result cstate.event_queue.put(self.job) # the jobwrapper class processes its' jobs result
import logging


class CustomFormatter(logging.Formatter):
    """Logging formatter that colorizes records by severity using ANSI escapes.

    Intended for console output (attach to a StreamHandler): every level shares
    the same layout (`format_str`) and only the surrounding color differs.
    """

    grey = "\x1b[38;20m"
    yellow = "\x1b[33;20m"
    red = "\x1b[31;20m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"
    # Shared layout: timestamp, logger name, level, message, source location.
    format_str = "%(asctime)s - %(name)-12s [%(levelname)-8s]: %(message)s (%(filename)s:%(lineno)s)"

    FORMATS = {
        logging.DEBUG: grey + format_str + reset,
        logging.INFO: grey + format_str + reset,
        logging.WARNING: yellow + format_str + reset,
        logging.ERROR: red + format_str + reset,
        logging.CRITICAL: bold_red + format_str + reset,
    }

    # Build one Formatter per level up front instead of allocating a fresh
    # Formatter for every record formatted.
    _FORMATTERS = {level: logging.Formatter(fmt) for level, fmt in FORMATS.items()}
    # Uncolored fallback for custom/unknown levels. (Previously an unknown
    # level produced logging.Formatter(None), which discarded the layout.)
    _DEFAULT_FORMATTER = logging.Formatter(format_str)

    def format(self, record):
        """Format `record` using the precompiled formatter for its level.

        Falls back to the uncolored shared layout for non-standard levels.
        """
        formatter = self._FORMATTERS.get(record.levelno, self._DEFAULT_FORMATTER)
        return formatter.format(record)
...@@ -3,6 +3,7 @@ import queue ...@@ -3,6 +3,7 @@ import queue
import datetime import datetime
from enum import Enum from enum import Enum
from collections.abc import Generator from collections.abc import Generator
import logging
class EventResult(Enum): class EventResult(Enum):
...@@ -18,11 +19,17 @@ class Event(metaclass=ABCMeta): ...@@ -18,11 +19,17 @@ class Event(metaclass=ABCMeta):
else: else:
self.due_at = due_at self.due_at = due_at
self.done = False self.done = False
self.event_queue_id = -1
@abstractmethod @abstractmethod
def action(self, cstate: "ControllerState"): def action(self, cstate: "ControllerState"):
pass pass
@property
def logger(self):
    # Per-event logger whose name combines the concrete event class name with
    # the event's queue id, e.g. "SpawnJob[42]", so log lines from concurrent
    # events of the same type can be told apart.
    # NOTE(review): event_queue_id is only meaningful once the event has been
    # put on the EventQueue (it assigns the counter); before that the name
    # presumably renders as "ClassName[-1]" — confirm against EventQueue.put.
    return logging.getLogger(f"{self.__class__.__name__}[{self.event_queue_id}]")
def __str__(self): def __str__(self):
return f"Event(action={self.action}, due_at={self.due_at})" return f"Event(action={self.action}, due_at={self.due_at})"
...@@ -93,6 +100,7 @@ class EventQueue(): ...@@ -93,6 +100,7 @@ class EventQueue():
self.counter = 0 self.counter = 0
def put(self, event: Event): def put(self, event: Event):
event.event_queue_id = self.counter
self.queue.put((event.due_at, self.counter, event)) self.queue.put((event.due_at, self.counter, event))
self.counter += 1 # this enforces FIFO in the queue (after comparing due_at) self.counter += 1 # this enforces FIFO in the queue (after comparing due_at)
......
...@@ -60,7 +60,7 @@ class CutAndTranscode(WrappedJob): ...@@ -60,7 +60,7 @@ class CutAndTranscode(WrappedJob):
f.write(self.make_project_toml()) f.write(self.make_project_toml())
def success(self, cstate): def success(self, cstate):
print("Transcode success!") self.logger.info("Transcode success!")
def failure(self, cstate): def failure(self, cstate):
print("Transcode fail :(") self.logger.info("Transcode fail :(")
...@@ -12,10 +12,10 @@ class DummyJob(WrappedJob): ...@@ -12,10 +12,10 @@ class DummyJob(WrappedJob):
return ContainerConfig(image="busybox", args=["sleep", "60"]) return ContainerConfig(image="busybox", args=["sleep", "60"])
def prepare(self, cstate: "ControllerState"): def prepare(self, cstate: "ControllerState"):
print("Preparing dummy job") self.logger.info("Preparing dummy job")
def success(self, cstate): def success(self, cstate):
print("Dummy job succeeded") self.logger.info("Dummy job succeeded")
def failure(self, cstate): def failure(self, cstate):
print("Dummy job failed") self.logger.info("Dummy job failed")
...@@ -54,7 +54,7 @@ class Thumbnail(WrappedJob): ...@@ -54,7 +54,7 @@ class Thumbnail(WrappedJob):
os.remove(os.path.join(self.in_dir, "thumb.tmp.jpg")) os.remove(os.path.join(self.in_dir, "thumb.tmp.jpg"))
def success(self, cstate): def success(self, cstate):
print("Thumbnail success!") self.logger.info("Thumbnail success!")
def failure(self, cstate): def failure(self, cstate):
print("Thumbnail fail :(") self.logger.info("Thumbnail fail :(")
from abc import abstractmethod from abc import abstractmethod
from job_controller.event_queue import Event, EventResult from job_controller.event_queue import Event, EventResult
from job_controller.job_database_api import JobData, JobState from job_controller.job_database_api import JobData, JobState
import logging
class ContainerConfig: class ContainerConfig:
...@@ -65,6 +66,10 @@ class WrappedJob(Event): ...@@ -65,6 +66,10 @@ class WrappedJob(Event):
raise Exception(f"Job is in unexpected state: {self.job_state}") raise Exception(f"Job is in unexpected state: {self.job_state}")
return EventResult.DONE return EventResult.DONE
@property
def logger(self):
    # Overrides Event.logger: wrapped jobs are identified by their database
    # job id rather than the event-queue counter, e.g. "DummyJob[7]", so log
    # lines can be correlated with the job record in the database.
    return logging.getLogger(f"{self.__class__.__name__}[{self.jobData.job_id}]")
@property @property
def job_id(self): def job_id(self):
return self.jobData.job_id return self.jobData.job_id
......
...@@ -4,53 +4,57 @@ from job_controller.actions.find_ready_jobs import FindReadyJobs ...@@ -4,53 +4,57 @@ from job_controller.actions.find_ready_jobs import FindReadyJobs
from job_controller.job_database_api import JobState from job_controller.job_database_api import JobState
from job_controller.job_controller import ControllerState from job_controller.job_controller import ControllerState
from job_controller.actions.run_sorter import RunSorter from job_controller.actions.run_sorter import RunSorter
from job_controller.custom_formatter import CustomFormatter
import datetime import datetime
import time import time
import argparse import argparse
import sys import sys
import signal import signal
import logging
def reconcile_existing_k8s_jobs(cstate: ControllerState, args, existing_worker_jobs): def reconcile_existing_k8s_jobs(cstate: ControllerState, args, existing_worker_jobs):
print(f"Found {len(existing_worker_jobs.items)} existing worker jobs!") logger = logging.getLogger("main_rec1")
logger.info(f"Found {len(existing_worker_jobs.items)} existing worker jobs!")
marked_for_deletion = [] marked_for_deletion = []
marked_for_watch = [] marked_for_watch = []
for job in existing_worker_jobs.items: for job in existing_worker_jobs.items:
print(f" - {job.metadata.name}") logger.debug(f" - {job.metadata.name}")
job_in_db = cstate.job_api.get_job_by_id(job.metadata.labels["job_id"]) job_in_db = cstate.job_api.get_job_by_id(job.metadata.labels["job_id"])
if job_in_db is None: if job_in_db is None:
print(f"Could not find job in db with id: {job.metadata.labels['job_id']}") logger.warning(f"Could not find job in db with id: {job.metadata.labels['job_id']}")
marked_for_deletion.append(job) marked_for_deletion.append(job)
else: else:
print(f" - Job state in db: {job_in_db.job_state}") logger.info(f" - Job state in db: {job_in_db.job_state}")
# TODO: figure out if job already finished by looking at db data # TODO: figure out if job already finished by looking at db data
marked_for_watch.append(job) marked_for_watch.append(job)
if len(marked_for_deletion) > 0: if len(marked_for_deletion) > 0:
if args.purge_existing_jobs: if args.purge_existing_jobs:
print(f"Deleting {len(marked_for_deletion)} existing jobs...") logger.info(f"Deleting {len(marked_for_deletion)} existing jobs...")
for job in existing_worker_jobs.items: for job in existing_worker_jobs.items:
cstate.k8s.delete_job_by_name(job.metadata.name) cstate.k8s.delete_job_by_name(job.metadata.name)
print("Done deleting existing jobs") logger.info("Done deleting existing jobs")
time.sleep(3) # wait a bit for k8s to delete the jobs time.sleep(3) # wait a bit for k8s to delete the jobs
elif args.ignore_unreconcilable_jobs: elif args.ignore_unreconcilable_jobs:
print(f"Ignoring {len(marked_for_deletion)} unreconcilable jobs") logger.info(f"Ignoring {len(marked_for_deletion)} unreconcilable jobs")
else: else:
# fail # fail
print("Exiting because existing jobs were found that could not be reconciled with db state") logger.warning("Exiting because existing jobs were found that could not be reconciled with db state")
print("You can delete them by running this script with the --purge_existing_jobs flag") logger.warning("You can delete them by running this script with the --purge_existing_jobs flag")
sys.exit(1) sys.exit(1)
return return
for watch in marked_for_watch: for watch in marked_for_watch:
jobData = cstate.job_api.get_job_by_id(watch.metadata.labels["job_id"]) jobData = cstate.job_api.get_job_by_id(watch.metadata.labels["job_id"])
if jobData is None: if jobData is None:
print(f"Could not find job in db with id: {watch.metadata.labels['job_id']}") logger.info(f"Could not find job in db with id: {watch.metadata.labels['job_id']}")
continue continue
wrappedJob = cstate.wrap_job(jobData) wrappedJob = cstate.wrap_job(jobData)
cstate.event_queue.put(WatchJob(wrappedJob)) cstate.event_queue.put(WatchJob(wrappedJob))
def reconcile_unspawned_database_jobs(cstate: ControllerState, existing_worker_jobs): def reconcile_unspawned_database_jobs(cstate: ControllerState, existing_worker_jobs):
logger = logging.getLogger("main_rec2")
spawning_jobs = cstate.job_api.get_all_spawning_jobs() spawning_jobs = cstate.job_api.get_all_spawning_jobs()
num_resetted = 0 num_resetted = 0
for job in spawning_jobs: for job in spawning_jobs:
...@@ -65,7 +69,7 @@ def reconcile_unspawned_database_jobs(cstate: ControllerState, existing_worker_j ...@@ -65,7 +69,7 @@ def reconcile_unspawned_database_jobs(cstate: ControllerState, existing_worker_j
job.update_state(cstate, JobState.READY) job.update_state(cstate, JobState.READY)
num_resetted += 1 num_resetted += 1
if num_resetted > 0: if num_resetted > 0:
print(f"Resetted {num_resetted} spawning jobs to ready state") logger.info(f"Resetted {num_resetted} spawning jobs to ready state")
def initial_jobs(cstate: ControllerState): def initial_jobs(cstate: ControllerState):
...@@ -74,6 +78,10 @@ def initial_jobs(cstate: ControllerState): ...@@ -74,6 +78,10 @@ def initial_jobs(cstate: ControllerState):
def main(): def main():
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(CustomFormatter())
logging.basicConfig(level=logging.DEBUG, handlers=[stream_handler])
logger = logging.getLogger("main")
parser = argparse.ArgumentParser(description='Run the job controller') parser = argparse.ArgumentParser(description='Run the job controller')
parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)') parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)')
parser.add_argument('--ignore_unreconcilable_jobs', action='store_true', help='Ignore unreconcilable jobs in k8s') parser.add_argument('--ignore_unreconcilable_jobs', action='store_true', help='Ignore unreconcilable jobs in k8s')
...@@ -82,20 +90,20 @@ def main(): ...@@ -82,20 +90,20 @@ def main():
cstate = ControllerState() cstate = ControllerState()
if args.incluster and cstate.k8s.config_used != "incluster": if args.incluster and cstate.k8s.config_used != "incluster":
print("Incluster required by commandline flag, but not running in k8s cluster") logger.error("Incluster required by commandline flag, but not running in k8s cluster")
sys.exit(1) sys.exit(1)
return return
# check existing jobs # check existing jobs
print("Checking for existing jobs...") logger.info("Checking for existing jobs...")
existing_worker_jobs = cstate.k8s.list_worker_jobs() existing_worker_jobs = cstate.k8s.list_worker_jobs()
if len(existing_worker_jobs.items) > 0: if len(existing_worker_jobs.items) > 0:
reconcile_existing_k8s_jobs(cstate, args, existing_worker_jobs) reconcile_existing_k8s_jobs(cstate, args, existing_worker_jobs)
print("Done checking for existing jobs") logger.info("Done checking for existing jobs")
# find spawning jobs that are not in k8s # find spawning jobs that are not in k8s
reconcile_unspawned_database_jobs(cstate, existing_worker_jobs) reconcile_unspawned_database_jobs(cstate, existing_worker_jobs)
print("Reconcilation done") logger.info("Reconcilation done")
initial_jobs(cstate) initial_jobs(cstate)
...@@ -104,9 +112,9 @@ def main(): ...@@ -104,9 +112,9 @@ def main():
def signal_handler(sig, frame): def signal_handler(sig, frame):
nonlocal run_event_loop nonlocal run_event_loop
if run_event_loop == False: if run_event_loop == False:
print("Force quitting event loop...") logger.warning("Force quitting event loop...")
sys.exit(2) sys.exit(2)
print("Stopping event loop...") logger.warning("Stopping event loop...")
run_event_loop = False run_event_loop = False
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
...@@ -115,7 +123,7 @@ def main(): ...@@ -115,7 +123,7 @@ def main():
while not evt.canExecute() and run_event_loop: while not evt.canExecute() and run_event_loop:
# because the queue is sorted by due_at, we can wait until the this event is due # because the queue is sorted by due_at, we can wait until the this event is due
tts = (evt.due_at - datetime.datetime.now()).total_seconds() tts = (evt.due_at - datetime.datetime.now()).total_seconds()
print(f" >> Sleeping for {tts} seconds until next event is due: {evt}") logger.debug(f" >> Sleeping for {tts} seconds until next event is due: {evt}")
while run_event_loop and tts > 0: while run_event_loop and tts > 0:
time.sleep(1) time.sleep(1)
tts = (evt.due_at - datetime.datetime.now()).total_seconds() tts = (evt.due_at - datetime.datetime.now()).total_seconds()
...@@ -127,20 +135,20 @@ def main(): ...@@ -127,20 +135,20 @@ def main():
ret = evt(cstate) ret = evt(cstate)
end = datetime.datetime.now() end = datetime.datetime.now()
if (end - start).total_seconds() > 0.5: if (end - start).total_seconds() > 0.5:
print(f"!! Event {evt} took {(end - start).total_seconds()}s to execute!") logger.warning(f"!! Event {evt} took {(end - start).total_seconds()}s to execute!")
if ret == EventResult.REQUEUE: if ret == EventResult.REQUEUE:
cstate.event_queue.put(evt) cstate.event_queue.put(evt)
elif not ret == EventResult.DONE and ret is not None: elif not ret == EventResult.DONE and ret is not None:
raise Exception(f"Unexpected return value from event: {ret}, {evt}") raise Exception(f"Unexpected return value from event: {ret}, {evt}")
except Exception as e: except Exception as e:
print("###") logger.error("###")
print(f"Error in event {evt}: {e}") logger.error(f"Error in event {evt}: {e}")
print("###") logger.error("###")
print("Event loop stopped") logger.info("Event loop stopped")
# set all jobs that were supposed to be spawned back to ready # set all jobs that were supposed to be spawned back to ready
print("Remaining events in queue:") logger.info("Remaining events in queue:")
for evt in cstate.event_queue.queue.queue: for evt in cstate.event_queue.queue.queue:
print(f" - {evt}") logger.info(f" - {evt}")
num_readied = 0 num_readied = 0
for evt in cstate.event_queue.queue.queue: for evt in cstate.event_queue.queue.queue:
if isinstance(evt, SpawnJob): if isinstance(evt, SpawnJob):
...@@ -150,7 +158,7 @@ def main(): ...@@ -150,7 +158,7 @@ def main():
job.update_state(cstate, JobState.READY) job.update_state(cstate, JobState.READY)
num_readied += 1 num_readied += 1
if num_readied > 0: if num_readied > 0:
print(f"Readied {num_readied} jobs that were supposed to be spawned") logger.info(f"Readied {num_readied} jobs that were supposed to be spawned")
sys.exit(0) sys.exit(0)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment