Select Git revision
kubernetes_api.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
kubernetes_api.py 3.81 KiB
from kubernetes import client, config
from kubernetes.client.models.v1_job import V1Job
from kubernetes.client.models.v1_job_list import V1JobList
import os
from job_api import Job
# Kubernetes namespace in which worker jobs are created, listed, read and deleted.
NAMESPACE = "videoag-prod"
# Value of the "app" label put on worker jobs (used to select them when listing);
# also used as the prefix of every worker job name.
WORKER_LABEL = "videoag-worker"
class K8sApi:
    """Small wrapper around the Kubernetes batch API for managing worker jobs.

    Every operation targets the namespace ``NAMESPACE``. Worker jobs carry the
    label ``app=<WORKER_LABEL>`` and are named ``<WORKER_LABEL>-<job_id>``.
    Errors raised by the Kubernetes client are not caught here and propagate
    to the caller.
    """

    def __init__(self):
        """Load the Kubernetes configuration and create the API clients.

        The in-cluster configuration is used when the service-account token
        file exists; otherwise the local kube config is loaded. A banner with
        the client and cluster versions is printed afterwards so that their
        compatibility can be checked manually.
        """
        if os.path.isfile(config.incluster_config.SERVICE_TOKEN_FILENAME):
            print("Using incluster config")
            config.load_incluster_config()
        else:
            print("Using local config")
            config.load_kube_config()

        # One shared ApiClient backs both the core and the batch API objects.
        self.api = client.ApiClient()
        self.v1 = client.CoreV1Api(self.api)
        self.batch_v1 = client.BatchV1Api(self.api)

        print("#############")
        print(f"Client version: {client.__version__}")
        # get_code() queries the cluster for its version information.
        print(f"Cluster version: {client.VersionApi(self.api).get_code()}")
        print("Please verify compatibility!")
        print("https://github.com/kubernetes-client/python?tab=readme-ov-file#compatibility-matrix-of-supported-client-versions")
        print("#############")

    @staticmethod
    def _job_name(job_id) -> str:
        """Return the Kubernetes job name that belongs to ``job_id``."""
        return f"{WORKER_LABEL}-{job_id}"

    def list_worker_jobs(self) -> V1JobList:
        """Return all jobs in ``NAMESPACE`` labelled ``app=<WORKER_LABEL>``."""
        return self.batch_v1.list_namespaced_job(NAMESPACE, label_selector=f"app={WORKER_LABEL}")

    def create_job(self, job: Job):
        """Create a Kubernetes job for ``job`` and return the API response.

        The job runs a single container named ``worker`` built from
        ``job.image``, ``job.command`` and ``job.args``, and is never
        restarted on failure.

        Args:
            job: Description of the work to run; ``job_id``, ``image``,
                ``args`` and ``command`` are read from it.

        Returns:
            Whatever ``create_namespaced_job`` returns for the created job.
        """
        job_manifest = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": self._job_name(job.job_id),
                "labels": {
                    "app": WORKER_LABEL,
                    # Label values must be strings in the Kubernetes API, so
                    # coerce in case job_id is not already a str (e.g. an int).
                    "job_id": str(job.job_id),
                },
            },
            "spec": {
                "ttlSecondsAfterFinished": 5 * 60,  # automatically delete job after 5 minutes
                "activeDeadlineSeconds": 60 * 60 * 24,  # kill job after 24 hours if it's still running
                "template": {
                    "metadata": {
                        "labels": {
                            "app": WORKER_LABEL,
                        },
                    },
                    "spec": {
                        "containers": [
                            {
                                "name": "worker",
                                "image": job.image,
                                "args": job.args,
                                "command": job.command,
                            },
                        ],
                        "restartPolicy": "Never",
                        # Soft preference (weight 1) for the node whose
                        # hostname is "video-hoern"; other nodes stay eligible.
                        "affinity": {
                            "nodeAffinity": {
                                "preferredDuringSchedulingIgnoredDuringExecution": [
                                    {
                                        "weight": 1,
                                        "preference": {
                                            "matchExpressions": [
                                                {
                                                    "key": "kubernetes.io/hostname",
                                                    "operator": "In",
                                                    "values": ["video-hoern"],
                                                },
                                            ],
                                        },
                                    },
                                ],
                            },
                        },
                    },
                },
            },
        }
        return self.batch_v1.create_namespaced_job(NAMESPACE, job_manifest)

    def delete_job_by_name(self, job_name):
        """Delete the job called ``job_name`` and return the API response."""
        # cascadingly deletes pods in the background
        return self.batch_v1.delete_namespaced_job(job_name, NAMESPACE, propagation_policy="Background")

    def delete_job_by_id(self, job_id):
        """Delete the worker job that belongs to ``job_id``."""
        return self.delete_job_by_name(self._job_name(job_id))

    def get_job(self, job_id) -> V1Job:
        """Read and return the worker job that belongs to ``job_id``."""
        return self.batch_v1.read_namespaced_job(self._job_name(job_id), NAMESPACE)