From 7b912ff54ca4fe033a44438732b4688828097500 Mon Sep 17 00:00:00 2001
From: Dorian Koch <doriank@fsmpi.rwth-aachen.de>
Date: Sat, 28 Sep 2024 11:41:44 +0200
Subject: [PATCH] Add proper logging

---
 src/job_controller/actions/find_ready_jobs.py |  4 +-
 src/job_controller/actions/run_sorter.py      |  8 +--
 src/job_controller/actions/spawn_job.py       | 20 +++----
 src/job_controller/custom_formatter.py        | 24 ++++++++
 src/job_controller/event_queue.py             |  8 +++
 .../job_handlers/cut_and_transcode.py         |  4 +-
 src/job_controller/job_handlers/dummy_job.py  |  6 +-
 src/job_controller/job_handlers/thumbnail.py  |  4 +-
 .../job_handlers/wrapped_job.py               |  5 ++
 src/main.py                                   | 60 +++++++++++--------
 10 files changed, 94 insertions(+), 49 deletions(-)
 create mode 100644 src/job_controller/custom_formatter.py

diff --git a/src/job_controller/actions/find_ready_jobs.py b/src/job_controller/actions/find_ready_jobs.py
index 9e3ef7f..447c38c 100644
--- a/src/job_controller/actions/find_ready_jobs.py
+++ b/src/job_controller/actions/find_ready_jobs.py
@@ -8,13 +8,13 @@ from job_controller.job_controller import ControllerState
 class FindReadyJobs(RecurringEvent):
     def action(self, cstate: ControllerState) -> Optional[datetime.timedelta]:
         if cstate.event_queue.queue.qsize() > 20:
-            print("Event queue is getting full, not spawning more jobs")
+            self.logger.info("Event queue is getting full, not spawning more jobs")
             return datetime.timedelta(seconds=20)
 
         # get next jobs to spawn
         jobs = cstate.job_api.get_next_jobs_and_set_spawning(5)
         if len(jobs) > 0:
-            print("Found {} jobs to spawn".format(len(jobs)))
+            self.logger.info("Found {} jobs to spawn".format(len(jobs)))
             for job in jobs:
                 wrapped = cstate.wrap_job(job)
                 cstate.event_queue.put(SpawnJob(wrapped))
diff --git a/src/job_controller/actions/run_sorter.py b/src/job_controller/actions/run_sorter.py
index f07bb5b..0b9cc64 100644
--- a/src/job_controller/actions/run_sorter.py
+++ b/src/job_controller/actions/run_sorter.py
@@ -25,16 +25,16 @@ class RunSorter(GeneratorRecurringEvent):
                 cstate.event_queue.put(spawnJobEvt)
                 while not spawnJobEvt.done:  # wait for spawn job to be done
                     yield spawnJobEvt.due_at  # because the event queue is stable, we will be queued after the spawnjob event executes
-                print("Sorter job spawned")
+                self.logger.info("Sorter job spawned")
 
                 watchJobEvt = spawnJobEvt.watch_job_evt
                 if watchJobEvt is None:
-                    print("Spawning sorter job failed!")
+                    self.logger.error("Spawning sorter job failed!")
                 else:
                     while not watchJobEvt.done:
                         yield watchJobEvt.due_at
                     # spawn job is done
-                    print("Sorter job done")
+                    self.logger.info("Sorter job done")
             except Exception as e:
-                print(f"Error in RunSorter: {e}")
+                self.logger.error(f"Error in RunSorter: {e}")
             yield self.interval
diff --git a/src/job_controller/actions/spawn_job.py b/src/job_controller/actions/spawn_job.py
index 3fe556d..f5a3505 100644
--- a/src/job_controller/actions/spawn_job.py
+++ b/src/job_controller/actions/spawn_job.py
@@ -23,13 +23,13 @@ class SpawnJob(GeneratorRecurringEvent):
                     total_num_jobs += 1
             if total_num_jobs < MAX_CONCURRENT_JOBS:
                 break
-            print("Too many active jobs, waiting 10 seconds...")
+            self.logger.info("Too many active jobs, waiting 10 seconds...")
             yield datetime.timedelta(seconds=10)
 
-        print("Preparing to spawn job with id: {}".format(self.job.jobData.job_id))
+        self.logger.info("Preparing to spawn job with id: {}".format(self.job.jobData.job_id))
         self.job.jobData.update_state(cstate, JobState.SPAWNING)
         self.job.prepare(cstate)
-        print("Job prepared, spawning...")
+        self.logger.info("Job prepared, spawning...")
         ret = cstate.k8s.create_job(self.job)
         if ret is None:
             raise Exception("Failed to spawn job")
@@ -48,13 +48,13 @@ class WatchJob(GeneratorRecurringEvent):
         while True:
             job_info = cstate.k8s.get_job(self.job.jobData.job_id)
             if job_info.status.succeeded is not None:
-                print("Job finished successfully")
+                self.logger.info("Job finished successfully")
                 break
             if job_info.status.failed is not None:
-                print("Job failed")
+                self.logger.error("Job failed")
                 break
             # print("Job status:", job_info.status)
-            print("Job not finished yet, waiting 5 seconds...")
+            self.logger.info("Job not finished yet, waiting 5 seconds...")
             yield datetime.timedelta(seconds=5)
 
         # look at database to see if the job has marked itself as finished/failed
@@ -62,17 +62,17 @@ class WatchJob(GeneratorRecurringEvent):
         if self.job.jobData is None:
             raise Exception(f"Could not find job in db with id={old_id}")
         if self.job.job_state == JobState.CANCELED or self.job.job_state == JobState.FINISHED_AND_PROCESSED or self.job.job_state == JobState.FAILED_AND_PROCESSED:
-            print(f"Job already processed in db, state: {self.job.job_state}, not processing again")
+            self.logger.info(f"Job already processed in db, state: {self.job.job_state}, not processing again")
             return
         expected_state = JobState.FINISHED if job_info.status.succeeded is not None else JobState.FAILED
         if not self.job.job_state == expected_state and (self.job.job_state == JobState.FINISHED or self.job.job_state == JobState.FAILED):
-            print(f"Job not marked as expected in db, expected={expected_state} != db={self.job.job_state}")
-            print("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)")
+            self.logger.warning(f"Job not marked as expected in db, expected={expected_state} != db={self.job.job_state}")
+            self.logger.warning("Marking job as failed in db (this should not happen, make sure the job exits with a zero code iff. it finished)")
             # the only state where this is expected is on startup when old k8s jobs are reconciled
             expected_state = JobState.FAILED
 
         if self.job.job_state != expected_state:
-            print(f"Updating job state in db to: {expected_state}, from: {self.job.job_state}")
+            self.logger.info(f"Updating job state in db to: {expected_state}, from: {self.job.job_state}")
             self.job.jobData.update_state(cstate, expected_state)
 
         cstate.event_queue.put(self.job)  # the jobwrapper class processes its' jobs result
diff --git a/src/job_controller/custom_formatter.py b/src/job_controller/custom_formatter.py
new file mode 100644
index 0000000..d1bbea7
--- /dev/null
+++ b/src/job_controller/custom_formatter.py
@@ -0,0 +1,24 @@
+import logging
+
+
+class CustomFormatter(logging.Formatter):
+
+    grey = "\x1b[38;20m"
+    yellow = "\x1b[33;20m"
+    red = "\x1b[31;20m"
+    bold_red = "\x1b[31;1m"
+    reset = "\x1b[0m"
+    format_str = "%(asctime)s - %(name)-12s [%(levelname)-8s]: %(message)s (%(filename)s:%(lineno)s)"
+
+    FORMATS = {
+        logging.DEBUG: grey + format_str + reset,
+        logging.INFO: grey + format_str + reset,
+        logging.WARNING: yellow + format_str + reset,
+        logging.ERROR: red + format_str + reset,
+        logging.CRITICAL: bold_red + format_str + reset
+    }
+
+    def format(self, record):
+        log_fmt = self.FORMATS.get(record.levelno)
+        formatter = logging.Formatter(log_fmt)
+        return formatter.format(record)
diff --git a/src/job_controller/event_queue.py b/src/job_controller/event_queue.py
index 6d4e255..9b20bb4 100644
--- a/src/job_controller/event_queue.py
+++ b/src/job_controller/event_queue.py
@@ -3,6 +3,7 @@ import queue
 import datetime
 from enum import Enum
 from collections.abc import Generator
+import logging
 
 
 class EventResult(Enum):
@@ -18,11 +19,17 @@ class Event(metaclass=ABCMeta):
         else:
             self.due_at = due_at
         self.done = False
+        self.event_queue_id = -1
 
     @abstractmethod
     def action(self, cstate: "ControllerState"):
         pass
 
+    @property
+    def logger(self):
+        # name contains class name and id
+        return logging.getLogger(f"{self.__class__.__name__}[{self.event_queue_id}]")
+
     def __str__(self):
         return f"Event(action={self.action}, due_at={self.due_at})"
 
@@ -93,6 +100,7 @@ class EventQueue():
         self.counter = 0
 
     def put(self, event: Event):
+        event.event_queue_id = self.counter
         self.queue.put((event.due_at, self.counter, event))
         self.counter += 1  # this enforces FIFO in the queue (after comparing due_at)
 
diff --git a/src/job_controller/job_handlers/cut_and_transcode.py b/src/job_controller/job_handlers/cut_and_transcode.py
index 93a5cfe..c021f0b 100644
--- a/src/job_controller/job_handlers/cut_and_transcode.py
+++ b/src/job_controller/job_handlers/cut_and_transcode.py
@@ -60,7 +60,7 @@ class CutAndTranscode(WrappedJob):
             f.write(self.make_project_toml())
 
     def success(self, cstate):
-        print("Transcode success!")
+        self.logger.info("Transcode success!")
 
     def failure(self, cstate):
-        print("Transcode fail :(")
+        self.logger.info("Transcode fail :(")
diff --git a/src/job_controller/job_handlers/dummy_job.py b/src/job_controller/job_handlers/dummy_job.py
index 4270dc5..e668424 100644
--- a/src/job_controller/job_handlers/dummy_job.py
+++ b/src/job_controller/job_handlers/dummy_job.py
@@ -12,10 +12,10 @@ class DummyJob(WrappedJob):
         return ContainerConfig(image="busybox", args=["sleep", "60"])
 
     def prepare(self, cstate: "ControllerState"):
-        print("Preparing dummy job")
+        self.logger.info("Preparing dummy job")
 
     def success(self, cstate):
-        print("Dummy job succeeded")
+        self.logger.info("Dummy job succeeded")
 
     def failure(self, cstate):
-        print("Dummy job failed")
+        self.logger.info("Dummy job failed")
diff --git a/src/job_controller/job_handlers/thumbnail.py b/src/job_controller/job_handlers/thumbnail.py
index 04e9914..3b7efdc 100644
--- a/src/job_controller/job_handlers/thumbnail.py
+++ b/src/job_controller/job_handlers/thumbnail.py
@@ -54,7 +54,7 @@ class Thumbnail(WrappedJob):
             os.remove(os.path.join(self.in_dir, "thumb.tmp.jpg"))
 
     def success(self, cstate):
-        print("Thumbnail success!")
+        self.logger.info("Thumbnail success!")
 
     def failure(self, cstate):
-        print("Thumbnail fail :(")
+        self.logger.info("Thumbnail fail :(")
diff --git a/src/job_controller/job_handlers/wrapped_job.py b/src/job_controller/job_handlers/wrapped_job.py
index 4fab190..a522a3f 100644
--- a/src/job_controller/job_handlers/wrapped_job.py
+++ b/src/job_controller/job_handlers/wrapped_job.py
@@ -1,6 +1,7 @@
 from abc import abstractmethod
 from job_controller.event_queue import Event, EventResult
 from job_controller.job_database_api import JobData, JobState
+import logging
 
 
 class ContainerConfig:
@@ -65,6 +66,10 @@ class WrappedJob(Event):
             raise Exception(f"Job is in unexpected state: {self.job_state}")
         return EventResult.DONE
 
+    @property
+    def logger(self):
+        return logging.getLogger(f"{self.__class__.__name__}[{self.jobData.job_id}]")
+
     @property
     def job_id(self):
         return self.jobData.job_id
diff --git a/src/main.py b/src/main.py
index 975ba0d..cab1caf 100644
--- a/src/main.py
+++ b/src/main.py
@@ -4,53 +4,57 @@ from job_controller.actions.find_ready_jobs import FindReadyJobs
 from job_controller.job_database_api import JobState
 from job_controller.job_controller import ControllerState
 from job_controller.actions.run_sorter import RunSorter
+from job_controller.custom_formatter import CustomFormatter
 
 import datetime
 import time
 import argparse
 import sys
 import signal
+import logging
 
 
 def reconcile_existing_k8s_jobs(cstate: ControllerState, args, existing_worker_jobs):
-    print(f"Found {len(existing_worker_jobs.items)} existing worker jobs!")
+    logger = logging.getLogger("main_rec1")
+    logger.info(f"Found {len(existing_worker_jobs.items)} existing worker jobs!")
     marked_for_deletion = []
     marked_for_watch = []
     for job in existing_worker_jobs.items:
-        print(f" - {job.metadata.name}")
+        logger.debug(f" - {job.metadata.name}")
         job_in_db = cstate.job_api.get_job_by_id(job.metadata.labels["job_id"])
         if job_in_db is None:
-            print(f"Could not find job in db with id: {job.metadata.labels['job_id']}")
+            logger.warning(f"Could not find job in db with id: {job.metadata.labels['job_id']}")
             marked_for_deletion.append(job)
         else:
-            print(f"   - Job state in db: {job_in_db.job_state}")
+            logger.info(f"   - Job state in db: {job_in_db.job_state}")
             # TODO: figure out if job already finished by looking at db data
             marked_for_watch.append(job)
     if len(marked_for_deletion) > 0:
         if args.purge_existing_jobs:
-            print(f"Deleting {len(marked_for_deletion)} existing jobs...")
+            logger.info(f"Deleting {len(marked_for_deletion)} existing jobs...")
             for job in existing_worker_jobs.items:
                 cstate.k8s.delete_job_by_name(job.metadata.name)
-            print("Done deleting existing jobs")
+            logger.info("Done deleting existing jobs")
             time.sleep(3)  # wait a bit for k8s to delete the jobs
         elif args.ignore_unreconcilable_jobs:
-            print(f"Ignoring {len(marked_for_deletion)} unreconcilable jobs")
+            logger.info(f"Ignoring {len(marked_for_deletion)} unreconcilable jobs")
         else:
             # fail
-            print("Exiting because existing jobs were found that could not be reconciled with db state")
-            print("You can delete them by running this script with the --purge_existing_jobs flag")
+            logger.warning("Exiting because existing jobs were found that could not be reconciled with db state")
+            logger.warning("You can delete them by running this script with the --purge_existing_jobs flag")
             sys.exit(1)
             return
     for watch in marked_for_watch:
         jobData = cstate.job_api.get_job_by_id(watch.metadata.labels["job_id"])
         if jobData is None:
-            print(f"Could not find job in db with id: {watch.metadata.labels['job_id']}")
+            logger.info(f"Could not find job in db with id: {watch.metadata.labels['job_id']}")
             continue
         wrappedJob = cstate.wrap_job(jobData)
         cstate.event_queue.put(WatchJob(wrappedJob))
 
 
 def reconcile_unspawned_database_jobs(cstate: ControllerState, existing_worker_jobs):
+    logger = logging.getLogger("main_rec2")
     spawning_jobs = cstate.job_api.get_all_spawning_jobs()
     num_resetted = 0
     for job in spawning_jobs:
@@ -65,7 +69,7 @@ def reconcile_unspawned_database_jobs(cstate: ControllerState, existing_worker_j
             job.update_state(cstate, JobState.READY)
             num_resetted += 1
     if num_resetted > 0:
-        print(f"Resetted {num_resetted} spawning jobs to ready state")
+        logger.info(f"Reset {num_resetted} spawning jobs to ready state")
 
 
 def initial_jobs(cstate: ControllerState):
@@ -74,6 +78,10 @@ def initial_jobs(cstate: ControllerState):
 
 
 def main():
+    stream_handler = logging.StreamHandler()
+    stream_handler.setFormatter(CustomFormatter())
+    logging.basicConfig(level=logging.DEBUG, handlers=[stream_handler])
+    logger = logging.getLogger("main")
     parser = argparse.ArgumentParser(description='Run the job controller')
     parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)')
     parser.add_argument('--ignore_unreconcilable_jobs', action='store_true', help='Ignore unreconcilable jobs in k8s')
@@ -82,20 +90,20 @@ def main():
 
     cstate = ControllerState()
     if args.incluster and cstate.k8s.config_used != "incluster":
-        print("Incluster required by commandline flag, but not running in k8s cluster")
+        logger.error("Incluster required by commandline flag, but not running in k8s cluster")
         sys.exit(1)
         return
 
     # check existing jobs
-    print("Checking for existing jobs...")
+    logger.info("Checking for existing jobs...")
     existing_worker_jobs = cstate.k8s.list_worker_jobs()
     if len(existing_worker_jobs.items) > 0:
         reconcile_existing_k8s_jobs(cstate, args, existing_worker_jobs)
 
-    print("Done checking for existing jobs")
+    logger.info("Done checking for existing jobs")
     # find spawning jobs that are not in k8s
     reconcile_unspawned_database_jobs(cstate, existing_worker_jobs)
-    print("Reconcilation done")
+    logger.info("Reconciliation done")
 
     initial_jobs(cstate)
 
@@ -104,9 +112,9 @@ def main():
     def signal_handler(sig, frame):
         nonlocal run_event_loop
         if run_event_loop == False:
-            print("Force quitting event loop...")
+            logger.warning("Force quitting event loop...")
             sys.exit(2)
-        print("Stopping event loop...")
+        logger.warning("Stopping event loop...")
         run_event_loop = False
     signal.signal(signal.SIGINT, signal_handler)
 
@@ -115,7 +123,7 @@ def main():
         while not evt.canExecute() and run_event_loop:
             # because the queue is sorted by due_at, we can wait until the this event is due
             tts = (evt.due_at - datetime.datetime.now()).total_seconds()
-            print(f" >> Sleeping for {tts} seconds until next event is due: {evt}")
+            logger.debug(f" >> Sleeping for {tts} seconds until next event is due: {evt}")
             while run_event_loop and tts > 0:
                 time.sleep(1)
                 tts = (evt.due_at - datetime.datetime.now()).total_seconds()
@@ -127,20 +135,20 @@ def main():
             ret = evt(cstate)
             end = datetime.datetime.now()
             if (end - start).total_seconds() > 0.5:
-                print(f"!! Event {evt} took {(end - start).total_seconds()}s to execute!")
+                logger.warning(f"!! Event {evt} took {(end - start).total_seconds()}s to execute!")
             if ret == EventResult.REQUEUE:
                 cstate.event_queue.put(evt)
             elif not ret == EventResult.DONE and ret is not None:
                 raise Exception(f"Unexpected return value from event: {ret}, {evt}")
         except Exception as e:
-            print("###")
-            print(f"Error in event {evt}: {e}")
-            print("###")
-    print("Event loop stopped")
+            logger.error("###")
+            logger.error(f"Error in event {evt}: {e}")
+            logger.error("###")
+    logger.info("Event loop stopped")
     # set all jobs that were supposed to be spawned back to ready
-    print("Remaining events in queue:")
+    logger.info("Remaining events in queue:")
     for evt in cstate.event_queue.queue.queue:
-        print(f" - {evt}")
+        logger.info(f" - {evt}")
     num_readied = 0
     for evt in cstate.event_queue.queue.queue:
         if isinstance(evt, SpawnJob):
@@ -150,7 +158,7 @@ def main():
                 job.update_state(cstate, JobState.READY)
                 num_readied += 1
     if num_readied > 0:
-        print(f"Readied {num_readied} jobs that were supposed to be spawned")
+        logger.info(f"Readied {num_readied} jobs that were supposed to be spawned")
     sys.exit(0)
 
 
-- 
GitLab