Skip to content
Snippets Groups Projects
Verified Commit 85bb527f authored by Dorian Koch's avatar Dorian Koch
Browse files

track job status in k8s, formatting

parent a6756303
No related branches found
No related tags found
No related merge requests found
...@@ -4,6 +4,7 @@ from actions.spawn_job import SpawnJob ...@@ -4,6 +4,7 @@ from actions.spawn_job import SpawnJob
from event_queue import RecurringEvent from event_queue import RecurringEvent
from job_controller import ControllerState from job_controller import ControllerState
class FindReadyJobs(RecurringEvent): class FindReadyJobs(RecurringEvent):
def __str__(self): def __str__(self):
return f"RecurringEvent(action={self.action}, due_at={self.due_at})" return f"RecurringEvent(action={self.action}, due_at={self.due_at})"
......
from event_queue import Event, EventResult from collections.abc import Generator
from event_queue import GeneratorRecurringEvent
from job_api import Job from job_api import Job
from job_controller import ControllerState from job_controller import ControllerState
import datetime
class SpawnJob(Event): MAX_CONCURRENT_JOBS = 5
class SpawnJob(GeneratorRecurringEvent):
def __init__(self, job: Job): def __init__(self, job: Job):
super().__init__() super().__init__()
self.job = job self.job = job
self.has_active_job = False
def generator(self, cstate: ControllerState) -> Generator[datetime.timedelta, None, None]:
# wait for empty queue
while True:
total_num_jobs = 0
for evt in cstate.event_queue.queue.queue:
if isinstance(evt, SpawnJob) and evt.has_active_job:
total_num_jobs += 1
if total_num_jobs < MAX_CONCURRENT_JOBS:
break
print("Too many active jobs, waiting 10 seconds...")
yield datetime.timedelta(seconds=10)
def action(self, cstate: ControllerState):
print("Spawning job with id: {}".format(self.job.job_id)) print("Spawning job with id: {}".format(self.job.job_id))
# this is where the actual job spawning would happen self.has_active_job = True
ret = cstate.k8s.create_job(self.job) ret = cstate.k8s.create_job(self.job)
print(ret) if ret is None:
raise Exception("Failed to spawn job")
yield datetime.timedelta(seconds=5) # wait 5 seconds before checking if job is done
while True:
job_info = cstate.k8s.get_job(self.job.job_id)
if job_info.status.succeeded is not None:
print("Job finished successfully")
break
if job_info.status.failed is not None:
print("Job failed")
break
print("Job status:", job_info.status)
print("Job not finished yet, waiting 5 seconds...")
yield datetime.timedelta(seconds=5)
self.has_active_job = False
# job is done, process it
# TODO: implement processing
...@@ -2,19 +2,21 @@ from abc import abstractmethod ...@@ -2,19 +2,21 @@ from abc import abstractmethod
import queue import queue
import datetime import datetime
from enum import Enum from enum import Enum
from typing import Any from collections.abc import Generator
class EventResult(Enum): class EventResult(Enum):
DONE = "done" # remove from queue DONE = "done" # remove from queue
REQUEUE = "requeue" # put back in queue REQUEUE = "requeue" # put back in queue
class Event(): class Event():
def __init__(self, due_at: datetime.datetime = datetime.datetime.fromtimestamp(0)): def __init__(self, due_at: datetime.datetime = datetime.datetime.fromtimestamp(0)):
self.due_at = due_at self.due_at = due_at
@abstractmethod @abstractmethod
def action(self, cstate: "ControllerState") -> Any: def action(self, cstate: "ControllerState"):
pass pass
def __str__(self): def __str__(self):
...@@ -28,11 +30,8 @@ class Event(): ...@@ -28,11 +30,8 @@ class Event():
raise Exception(f"Event is not due yet: {self}") raise Exception(f"Event is not due yet: {self}")
return self.action(cstate) return self.action(cstate)
class RecurringEvent(Event):
def __init__(self, due_at: datetime.datetime = datetime.datetime.fromtimestamp(0)):
super().__init__(due_at=due_at)
class RecurringEvent(Event):
def __str__(self): def __str__(self):
return f"RecurringEvent(action={self.action}, due_at={self.due_at})" return f"RecurringEvent(action={self.action}, due_at={self.due_at})"
...@@ -40,9 +39,30 @@ class RecurringEvent(Event): ...@@ -40,9 +39,30 @@ class RecurringEvent(Event):
interval = super().__call__(cstate) interval = super().__call__(cstate)
if interval is None: if interval is None:
return EventResult.DONE return EventResult.DONE
# test if interval is a timedelta
if not isinstance(interval, datetime.timedelta):
raise Exception(f"Expected a timedelta from action, got {interval}")
self.due_at = datetime.datetime.now() + interval self.due_at = datetime.datetime.now() + interval
return EventResult.REQUEUE return EventResult.REQUEUE
class GeneratorRecurringEvent(RecurringEvent):
def __str__(self):
return f"GeneratorRecurringEvent(action={self.action}, due_at={self.due_at})"
@abstractmethod
def generator(self, cstate: "ControllerState") -> Generator[datetime.timedelta, None, None]:
pass
def action(self, cstate: "ControllerState") -> datetime.timedelta | None:
if not hasattr(self, "gen"):
self.gen = self.generator(cstate)
try:
return next(self.gen)
except StopIteration:
return None
class EventQueue(): class EventQueue():
def __init__(self): def __init__(self):
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
from enum import Enum from enum import Enum
from typing import Optional, List from typing import Optional, List
# job states:
class JobState(Enum): class JobState(Enum):
READY = "ready" READY = "ready"
SPAWNING = "spawning" SPAWNING = "spawning"
...@@ -13,12 +13,15 @@ class JobState(Enum): ...@@ -13,12 +13,15 @@ class JobState(Enum):
FAILED_AND_PROCESSED = "failed_and_processed" FAILED_AND_PROCESSED = "failed_and_processed"
DELETED = "deleted" DELETED = "deleted"
# this should represent an object extracted from the database
class Job():
def __init__(self, job_id: str, job_state: JobState = JobState.READY): class Job():
# this should represent an object extracted from the database
def __init__(self, job_id: str, image: str, command: List[str] | None = None, args: List[str] | None = None, job_state: JobState = JobState.READY):
self.job_id = job_id self.job_id = job_id
self.job_state = job_state self.job_state = job_state
self.image = image
self.command = command
self.args = args
# this is just a dummy implementation # this is just a dummy implementation
...@@ -45,5 +48,3 @@ class JobApi(): ...@@ -45,5 +48,3 @@ class JobApi():
def create_job(self, job: Job): def create_job(self, job: Job):
self.job_queue.append(job) self.job_queue.append(job)
self.db_state[job.job_id] = job self.db_state[job.job_id] = job
...@@ -2,6 +2,7 @@ from event_queue import EventQueue ...@@ -2,6 +2,7 @@ from event_queue import EventQueue
from job_api import JobApi from job_api import JobApi
from kubernetes_api import K8sApi from kubernetes_api import K8sApi
class ControllerState(): class ControllerState():
def __init__(self) -> None: def __init__(self) -> None:
......
from kubernetes import client, config from kubernetes import client, config
from kubernetes.client.models.v1_job import V1Job
from kubernetes.client.models.v1_job_status import V1JobStatus
import os import os
from job_api import Job from job_api import Job
...@@ -6,6 +8,7 @@ from job_api import Job ...@@ -6,6 +8,7 @@ from job_api import Job
NAMESPACE = "videoag-prod" NAMESPACE = "videoag-prod"
WORKER_LABEL = "videoag-worker" WORKER_LABEL = "videoag-worker"
class K8sApi(): class K8sApi():
def __init__(self): def __init__(self):
...@@ -41,6 +44,7 @@ class K8sApi(): ...@@ -41,6 +44,7 @@ class K8sApi():
}, },
"spec": { "spec": {
"ttlSecondsAfterFinished": 5 * 60, # automatically delete job after 5 minutes "ttlSecondsAfterFinished": 5 * 60, # automatically delete job after 5 minutes
"activeDeadlineSeconds": 60 * 60 * 24, # kill job after 24 hours if it's still running
"template": { "template": {
"metadata": { "metadata": {
"labels": { "labels": {
...@@ -51,14 +55,30 @@ class K8sApi(): ...@@ -51,14 +55,30 @@ class K8sApi():
"containers": [ "containers": [
{ {
"name": "worker", "name": "worker",
"image": "busybox", "image": job.image,
"args": [ "args": job.args,
"sleep", "command": job.command,
"10"
]
} }
], ],
"restartPolicy": "Never" "restartPolicy": "Never",
"affinity": {
"nodeAffinity": {
"preferredDuringSchedulingIgnoredDuringExecution": [
{
"weight": 1,
"preference": {
"matchExpressions": [
{
"key": "kubernetes.io/hostname",
"operator": "In",
"values": ["video-hoern"]
}
]
}
}
]
}
}
} }
} }
} }
...@@ -66,7 +86,11 @@ class K8sApi(): ...@@ -66,7 +86,11 @@ class K8sApi():
return self.batch_v1.create_namespaced_job(NAMESPACE, job_manifest) return self.batch_v1.create_namespaced_job(NAMESPACE, job_manifest)
def delete_job_by_name(self, job_name): def delete_job_by_name(self, job_name):
return self.batch_v1.delete_namespaced_job(job_name, NAMESPACE, propagation_policy="Background") # cascadingly deletes pods in the background # cascadingly deletes pods in the background
return self.batch_v1.delete_namespaced_job(job_name, NAMESPACE, propagation_policy="Background")
def delete_job_by_id(self, job_id): def delete_job_by_id(self, job_id):
return self.delete_job_by_name(f"{WORKER_LABEL}-{job_id}") return self.delete_job_by_name(f"{WORKER_LABEL}-{job_id}")
def get_job(self, job_id) -> V1Job:
return self.batch_v1.read_namespaced_job(f"{WORKER_LABEL}-{job_id}", NAMESPACE)
...@@ -8,6 +8,7 @@ import time ...@@ -8,6 +8,7 @@ import time
import argparse import argparse
import sys import sys
def main(): def main():
parser = argparse.ArgumentParser(description='Run the job controller') parser = argparse.ArgumentParser(description='Run the job controller')
parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)') parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)')
...@@ -39,7 +40,7 @@ def main(): ...@@ -39,7 +40,7 @@ def main():
# make some dummy jobs # make some dummy jobs
for i in range(2): for i in range(2):
cstate.job_api.create_job(Job("job{}".format(i))) cstate.job_api.create_job(Job("job{}".format(i), "quay.io/msrd0/render_video:latest", args=["--help"]))
cstate.event_queue.put(FindReadyJobs()) cstate.event_queue.put(FindReadyJobs())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment