Skip to content
Snippets Groups Projects
Verified Commit a6756303 authored by Dorian Koch's avatar Dorian Koch
Browse files

create jobs in k8s

parent f1907201
No related branches found
No related tags found
No related merge requests found
import datetime
from typing import Optional
from actions.spawn_job import spawn_job
from event_queue import Event, EventResult
from actions.spawn_job import SpawnJob
from event_queue import RecurringEvent
from job_controller import ControllerState
def find_ready_jobs(cstate: ControllerState) -> Optional[datetime.timedelta]:
class FindReadyJobs(RecurringEvent):
def __str__(self):
return f"RecurringEvent(action={self.action}, due_at={self.due_at})"
def action(self, cstate: ControllerState) -> Optional[datetime.timedelta]:
if cstate.event_queue.queue.qsize() > 20:
print("Event queue is getting full, not spawning more jobs")
return datetime.timedelta(seconds=20)
# get next jobs to spawn
jobs = cstate.job_api.get_next_jobs_and_set_spawning(5)
print("Found {} jobs to spawn".format(len(jobs)))
for job in jobs:
cstate.event_queue.put(Event(action=spawn_job, args=(job,)))
cstate.event_queue.put(SpawnJob(job))
if len(jobs) == 0:
return datetime.timedelta(seconds=5) # wait 5 seconds if no jobs are ready
......
from event_queue import EventResult
from event_queue import Event, EventResult
from job_api import Job
from job_controller import ControllerState
def spawn_job(cstate: ControllerState, job: Job):
print("Spawning job with id: {}".format(job.job_id))
class SpawnJob(Event):
def __init__(self, job: Job):
super().__init__()
self.job = job
def action(self, cstate: ControllerState):
print("Spawning job with id: {}".format(self.job.job_id))
# this is where the actual job spawning would happen
ret = cstate.k8s.create_job(self.job)
print(ret)
from abc import abstractmethod
import queue
import datetime
from enum import Enum
from typing import Any, Callable, Optional
from typing import Any
class EventResult(Enum):
DONE = "done" # remove from queue
......@@ -9,11 +10,12 @@ class EventResult(Enum):
class Event():
def __init__(self, action: Callable[["ControllerState"], Any], due_at: datetime.datetime = datetime.datetime.fromtimestamp(0), args: tuple = (), kwargs: dict = {}):
self.action = action
def __init__(self, due_at: datetime.datetime = datetime.datetime.fromtimestamp(0)):
self.due_at = due_at
self.args = args
self.kwargs = kwargs
@abstractmethod
def action(self, cstate: "ControllerState") -> Any:
pass
def __str__(self):
return f"Event(action={self.action}, due_at={self.due_at})"
......@@ -24,12 +26,12 @@ class Event():
def __call__(self, cstate: "ControllerState"):
if not self.canExecute():
raise Exception(f"Event is not due yet: {self}")
return self.action(cstate, *self.args, **self.kwargs)
return self.action(cstate)
class RecurringEvent(Event):
def __init__(self, action: Callable[["ControllerState"], Optional[datetime.timedelta]], due_at: datetime.datetime = datetime.datetime.fromtimestamp(0), args: tuple = (), kwargs: dict = {}):
super().__init__(action=action, due_at=due_at, args=args, kwargs=kwargs)
def __init__(self, due_at: datetime.datetime = datetime.datetime.fromtimestamp(0)):
super().__init__(due_at=due_at)
def __str__(self):
return f"RecurringEvent(action={self.action}, due_at={self.due_at})"
......
import queue
from event_queue import EventQueue, Event
from event_queue import EventQueue
from job_api import JobApi
from runner_api import K8sApi
from kubernetes_api import K8sApi
class ControllerState():
......
from kubernetes import client, config
import os
from job_api import Job
NAMESPACE = "videoag-prod"
WORKER_LABEL = "videoag-worker"
class K8sApi():
def __init__(self):
......@@ -12,6 +17,7 @@ class K8sApi():
config.load_kube_config()
self.api = client.ApiClient()
self.v1 = client.CoreV1Api(self.api)
self.batch_v1 = client.BatchV1Api(self.api)
print("#############")
print("Client version: {}".format(client.__version__))
print("Cluster version: {}".format(client.VersionApi(self.api).get_code()))
......@@ -19,6 +25,48 @@ class K8sApi():
print("https://github.com/kubernetes-client/python?tab=readme-ov-file#compatibility-matrix-of-supported-client-versions")
print("#############")
def list_pods(self):
ret = self.v1.list_pod_for_all_namespaces(watch=False)
return ret.items
\ No newline at end of file
def list_worker_jobs(self):
return self.batch_v1.list_namespaced_job(NAMESPACE, label_selector=f"app={WORKER_LABEL}")
def create_job(self, job: Job):
job_manifest = {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": f"{WORKER_LABEL}-{job.job_id}",
"labels": {
"app": WORKER_LABEL,
"job_id": job.job_id
}
},
"spec": {
"ttlSecondsAfterFinished": 5 * 60, # automatically delete job after 5 minutes
"template": {
"metadata": {
"labels": {
"app": WORKER_LABEL
}
},
"spec": {
"containers": [
{
"name": "worker",
"image": "busybox",
"args": [
"sleep",
"10"
]
}
],
"restartPolicy": "Never"
}
}
}
}
return self.batch_v1.create_namespaced_job(NAMESPACE, job_manifest)
def delete_job_by_name(self, job_name):
return self.batch_v1.delete_namespaced_job(job_name, NAMESPACE, propagation_policy="Background") # cascadingly deletes pods in the background
def delete_job_by_id(self, job_id):
return self.delete_job_by_name(f"{WORKER_LABEL}-{job_id}")
\ No newline at end of file
from event_queue import EventResult, RecurringEvent
from actions.find_ready_jobs import find_ready_jobs
from event_queue import EventResult
from actions.find_ready_jobs import FindReadyJobs
from job_api import Job
from job_controller import ControllerState
import datetime
import time
from job_api import Job
from job_controller import ControllerState
import argparse
import sys
def main():
parser = argparse.ArgumentParser(description='Run the job controller')
parser.add_argument('--purge_existing_jobs', action='store_true', help='Delete existing jobs in k8s (dangerous!)')
args = parser.parse_args()
cstate = ControllerState()
# check existing jobs
print("Checking for existing jobs...")
existing_worker_jobs = cstate.k8s.list_worker_jobs()
if len(existing_worker_jobs.items) > 0:
print(f"Found {len(existing_worker_jobs.items)} existing worker jobs!")
for job in existing_worker_jobs.items:
print(f" - {job.metadata.name}")
if args.purge_existing_jobs:
print("Deleting existing jobs...")
for job in existing_worker_jobs.items:
print(cstate.k8s.delete_job_by_name(job.metadata.name))
print("Done deleting existing jobs")
time.sleep(3) # wait a bit for k8s to delete the jobs
else:
# fail
print("Exiting because existing jobs were found")
print("You can delete them by running this script with the --purge_existing_jobs flag")
sys.exit(1)
return
print("Done checking for existing jobs")
# make some dummy jobs
for i in range(10):
for i in range(2):
cstate.job_api.create_job(Job("job{}".format(i)))
cstate.event_queue.put(RecurringEvent(action=find_ready_jobs))
cstate.event_queue.put(FindReadyJobs())
while True:
evt = cstate.event_queue.get()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment