Skip to content
Snippets Groups Projects
Select Git revision
  • a7a6ebde8ef99f258171908204b923bee21e764f
  • master default protected
  • intros
  • live_sources
  • bootstrap4
  • modules
6 results

server.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    kubernetes_api.py 3.81 KiB
    from kubernetes import client, config
    from kubernetes.client.models.v1_job import V1Job
    from kubernetes.client.models.v1_job_list import V1JobList
    import os
    
    from job_api import Job
    
    # Kubernetes namespace passed to every BatchV1Api call made by K8sApi.
    NAMESPACE = "videoag-prod"
    # Value of the "app" label on worker jobs and their pods; also used as the
    # prefix of worker job names ("<WORKER_LABEL>-<job_id>").
    WORKER_LABEL = "videoag-worker"
    
    
    class K8sApi:
        """Thin wrapper around the Kubernetes batch API for worker jobs.

        Every call targets ``NAMESPACE``. Worker jobs are named
        ``"<WORKER_LABEL>-<job_id>"`` and carry the label ``app=<WORKER_LABEL>``.
        """

        def __init__(self):
            """Load the cluster configuration and create the API clients.

            The in-cluster configuration is used when the service account token
            file exists; otherwise the local kube config is loaded. The client and
            cluster versions are printed so compatibility can be checked by hand.
            """
            if os.path.isfile(config.incluster_config.SERVICE_TOKEN_FILENAME):
                print("Using incluster config")
                config.load_incluster_config()
            else:
                print("Using local config")
                config.load_kube_config()
            self.api = client.ApiClient()
            self.v1 = client.CoreV1Api(self.api)
            self.batch_v1 = client.BatchV1Api(self.api)
            print("#############")
            print(f"Client version: {client.__version__}")
            print(f"Cluster version: {client.VersionApi(self.api).get_code()}")
            print("Please verify compatibility!")
            print("https://github.com/kubernetes-client/python?tab=readme-ov-file#compatibility-matrix-of-supported-client-versions")
            print("#############")

        @staticmethod
        def _job_name(job_id) -> str:
            """Return the Kubernetes job name used for the given job id."""
            # Single source of truth so create/get/delete always agree on the name.
            return f"{WORKER_LABEL}-{job_id}"

        @staticmethod
        def _build_job_manifest(job: Job) -> dict:
            """Build the ``batch/v1`` Job manifest for ``job``.

            Reads ``job.job_id``, ``job.image``, ``job.args`` and ``job.command``.
            """
            # Soft preference (weight 1) for one node; the scheduler may still
            # place the pod elsewhere.
            node_preference = {
                "weight": 1,
                "preference": {
                    "matchExpressions": [
                        {
                            "key": "kubernetes.io/hostname",
                            "operator": "In",
                            "values": ["video-hoern"],
                        }
                    ]
                },
            }
            worker_container = {
                "name": "worker",
                "image": job.image,
                "args": job.args,
                "command": job.command,
            }
            return {
                "apiVersion": "batch/v1",
                "kind": "Job",
                "metadata": {
                    "name": K8sApi._job_name(job.job_id),
                    "labels": {
                        "app": WORKER_LABEL,
                        # Label values must be strings; str() is a no-op when
                        # job_id already is one.
                        "job_id": str(job.job_id),
                    },
                },
                "spec": {
                    "ttlSecondsAfterFinished": 5 * 60,  # automatically delete job after 5 minutes
                    "activeDeadlineSeconds": 60 * 60 * 24,  # kill job after 24 hours if it's still running
                    "template": {
                        "metadata": {
                            "labels": {
                                "app": WORKER_LABEL,
                            }
                        },
                        "spec": {
                            "containers": [worker_container],
                            "restartPolicy": "Never",
                            "affinity": {
                                "nodeAffinity": {
                                    "preferredDuringSchedulingIgnoredDuringExecution": [
                                        node_preference
                                    ]
                                }
                            },
                        },
                    },
                },
            }

        def list_worker_jobs(self) -> V1JobList:
            """Return all jobs in ``NAMESPACE`` labelled ``app=<WORKER_LABEL>``."""
            return self.batch_v1.list_namespaced_job(NAMESPACE, label_selector=f"app={WORKER_LABEL}")

        def create_job(self, job: Job):
            """Create a Kubernetes job for ``job`` and return the API response."""
            return self.batch_v1.create_namespaced_job(NAMESPACE, self._build_job_manifest(job))

        def delete_job_by_name(self, job_name):
            """Delete the job called ``job_name`` and return the API response."""
            # cascadingly deletes pods in the background
            return self.batch_v1.delete_namespaced_job(job_name, NAMESPACE, propagation_policy="Background")

        def delete_job_by_id(self, job_id):
            """Delete the worker job belonging to ``job_id``."""
            return self.delete_job_by_name(self._job_name(job_id))

        def get_job(self, job_id) -> V1Job:
            """Read and return the worker job belonging to ``job_id``."""
            return self.batch_v1.read_namespaced_job(self._job_name(job_id), NAMESPACE)