Skip to content
Snippets Groups Projects
Commit 96573fe9 authored by Simon Künzel's avatar Simon Künzel
Browse files

Allow local docker executor to use prebuilt images

parent d6f59dc8
Branches
Tags
No related merge requests found
...@@ -39,6 +39,12 @@ JOB_EXECUTOR = "local-docker" ...@@ -39,6 +39,12 @@ JOB_EXECUTOR = "local-docker"
LOCAL_DOCKER_LOG_BUILD = False LOCAL_DOCKER_LOG_BUILD = False
# LOCAL_DOCKER_USE_PRE_BUILT_IMAGES = True
# LOCAL_DOCKER_PRE_BUILT_IMAGE_PATH_START = "registry.git.fsmpi.rwth-aachen.de/videoag/backend/production_job_"
# LOCAL_DOCKER_PRE_BUILT_IMAGE_TAG_FILE = "../config/local_docker_image_tag.txt"
# LOCAL_DOCKER_MEDIA_DIRECTORY = "../../../live_media/"
# LOCAL_DOCKER_JOB_CONTAINER_NETWORK = "host"
K8S_NAMESPACE = "videoag-prod" K8S_NAMESPACE = "videoag-prod"
K8S_JOB_LABEL = "videoag-job" K8S_JOB_LABEL = "videoag-job"
K8S_DATA_CONTAINER_VOLUME_NAME = "video-data-volume" K8S_DATA_CONTAINER_VOLUME_NAME = "video-data-volume"
......
v2.0.18
\ No newline at end of file
...@@ -20,6 +20,19 @@ _PROJECT_ROOT_DIR = Path("../..").resolve() ...@@ -20,6 +20,19 @@ _PROJECT_ROOT_DIR = Path("../..").resolve()
LOG_BUILD = job_controller.config["LOCAL_DOCKER_LOG_BUILD"] LOG_BUILD = job_controller.config["LOCAL_DOCKER_LOG_BUILD"]
_MEDIA_DIRECTORY = job_controller.config.get("LOCAL_DOCKER_MEDIA_DIRECTORY")
if _MEDIA_DIRECTORY:
_MEDIA_DIRECTORY_PATH = Path(_MEDIA_DIRECTORY).resolve()
else:
_MEDIA_DIRECTORY_PATH = _PROJECT_ROOT_DIR.joinpath(".media")
USE_PRE_BUILT_IMAGES = job_controller.config.get("LOCAL_DOCKER_USE_PRE_BUILT_IMAGES", False)
if USE_PRE_BUILT_IMAGES:
PRE_BUILT_IMAGE_PATH_START = job_controller.config["LOCAL_DOCKER_PRE_BUILT_IMAGE_PATH_START"]
PRE_BUILT_IMAGE_TAG_FILE = job_controller.config.get("LOCAL_DOCKER_PRE_BUILT_IMAGE_TAG_FILE")
JOB_CONTAINER_NETWORK = job_controller.config.get("LOCAL_DOCKER_JOB_CONTAINER_NETWORK", "backend_videoag_backend")
class DockerJob(JobExecutionInfo): class DockerJob(JobExecutionInfo):
...@@ -30,7 +43,6 @@ class DockerJob(JobExecutionInfo): ...@@ -30,7 +43,6 @@ class DockerJob(JobExecutionInfo):
self._metadata = metadata self._metadata = metadata
self._input_data = input_data self._input_data = input_data
self._build_process: subprocess.Popen or None = None self._build_process: subprocess.Popen or None = None
self._run_args: list[str] or None = None
self._run_process: subprocess.Popen or None = None self._run_process: subprocess.Popen or None = None
self._failed = False self._failed = False
self._start_time = None self._start_time = None
...@@ -43,6 +55,9 @@ class DockerJob(JobExecutionInfo): ...@@ -43,6 +55,9 @@ class DockerJob(JobExecutionInfo):
self._finish_time = get_standard_datetime_now() self._finish_time = get_standard_datetime_now()
def spawn(self): def spawn(self):
if USE_PRE_BUILT_IMAGES:
self._scheduler.enter(0, 0, self._check_status_from_scheduler)
return
if self._build_process is not None: if self._build_process is not None:
return return
try: try:
...@@ -59,59 +74,67 @@ class DockerJob(JobExecutionInfo): ...@@ -59,59 +74,67 @@ class DockerJob(JobExecutionInfo):
"-t", f"videoag_job_{self._metadata.id}", "-t", f"videoag_job_{self._metadata.id}",
_PROJECT_ROOT_DIR, _PROJECT_ROOT_DIR,
] ]
if subprocess.run([
_PROJECT_ROOT_DIR.joinpath("dev/generate_dockerfiles.sh")
], cwd=_PROJECT_ROOT_DIR).returncode != 0:
raise ValueError("Error while generating dockerfiles")
print(f"Docker Executor: Building image {self._metadata.id} for job {self.job_id}:")
print(build_args)
# Might have missing quoting, etc. but can be useful for debugging.
print("UNSAFE: " + ' '.join(str(a) for a in build_args))
if LOG_BUILD:
self._build_process = subprocess.Popen(build_args)
else:
self._build_process = subprocess.Popen(build_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self._run_args = [ self._scheduler.enter(0.5, 0, self._check_status_from_scheduler)
def _run_docker_job(self):
print(f"Docker Executor: Running job {self.job_id} of type {self._metadata.id}:")
_run_args = [
"docker", "run", "docker", "run",
# docker-compose networks are prefixed with the directories' name # docker-compose networks are prefixed with the directories' name
"--network", "backend_videoag_backend", "--network", JOB_CONTAINER_NETWORK,
# Not really needed, but keep it for convenience # Not really needed right now, but keep it for convenience
"--add-host", "host.docker.internal:host-gateway", "--add-host", "host.docker.internal:host-gateway",
] ]
if self._metadata.mount_common_config: if self._metadata.mount_common_config:
self._run_args.extend([ _run_args.extend([
"-e", "VIDEOAG_CONFIG=/code/config/common_job_config.py", "-e", "VIDEOAG_CONFIG=/code/config/common_job_config.py",
"--mount", f"type=bind,source={_PROJECT_ROOT_DIR.joinpath('job_controller/config/common_job_example_config.py')}," "--mount",
f"type=bind,source={_PROJECT_ROOT_DIR.joinpath('job_controller/config/common_job_example_config.py')},"
f"destination=/code/config/common_job_config.py", f"destination=/code/config/common_job_config.py",
]) ])
if self._metadata.data_dir_mount is not None: if self._metadata.data_dir_mount is not None:
media_dir = _PROJECT_ROOT_DIR.joinpath(".media") _MEDIA_DIRECTORY_PATH.mkdir(parents=True, exist_ok=True)
media_dir.mkdir(parents=True, exist_ok=True) _run_args.extend([
self._run_args.extend([ "--mount", f"type=bind,source={_MEDIA_DIRECTORY_PATH},"
"--mount", f"type=bind,source={media_dir},"
f"destination={self._metadata.data_dir_mount}", f"destination={self._metadata.data_dir_mount}",
]) ])
self._run_args.append(f"videoag_job_{self._metadata.id}") if USE_PRE_BUILT_IMAGES:
if PRE_BUILT_IMAGE_TAG_FILE:
image_tag = Path(PRE_BUILT_IMAGE_TAG_FILE).resolve().read_text()
else:
image_tag = "latest"
_run_args.append(f"{PRE_BUILT_IMAGE_PATH_START}{self._metadata.id}:{image_tag}")
else:
_run_args.append(f"videoag_job_{self._metadata.id}")
# Job args # Job args
self._run_args.extend([ _run_args.extend([
"--own_job_type", str(self._metadata.id), "--own_job_type", str(self._metadata.id),
"--own_job_id", str(self.job_id), "--own_job_id", str(self.job_id),
"--input_data", json.dumps(self._input_data) "--input_data", json.dumps(self._input_data)
]) ])
if subprocess.run([ print(_run_args)
_PROJECT_ROOT_DIR.joinpath("dev/generate_dockerfiles.sh")
], cwd=_PROJECT_ROOT_DIR).returncode != 0:
raise ValueError("Error while generating dockerfiles")
print(f"Docker Executor: Building image {self._metadata.id} for job {self.job_id}:")
print(build_args)
# Might have missing quoting, etc. but can be useful for debugging. # Might have missing quoting, etc. but can be useful for debugging.
print("UNSAFE: " + ' '.join(str(a) for a in build_args)) print("UNSAFE: " + ' '.join(str(a) for a in _run_args))
if LOG_BUILD: self._run_process = subprocess.Popen(_run_args)
self._build_process = subprocess.Popen(build_args)
else:
self._build_process = subprocess.Popen(build_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self._scheduler.enter(0.5, 0, self._check_status_from_scheduler)
def _run_docker_job(self):
print(f"Docker Executor: Running job {self.job_id} of type {self._metadata.id}:")
print(self._run_args)
# Might have missing quoting, etc. but can be useful for debugging.
print("UNSAFE: " + ' '.join(str(a) for a in self._run_args))
self._run_process = subprocess.Popen(self._run_args)
self._start_time = get_standard_datetime_now() self._start_time = get_standard_datetime_now()
def _check_status_from_scheduler(self): def _check_status_from_scheduler(self):
...@@ -124,10 +147,10 @@ class DockerJob(JobExecutionInfo): ...@@ -124,10 +147,10 @@ class DockerJob(JobExecutionInfo):
Only call from scheduler! Only call from scheduler!
""" """
if self._run_process is None:
if not USE_PRE_BUILT_IMAGES:
if self._build_process is None: if self._build_process is None:
return return
if self._run_process is None:
assert isinstance(self._build_process, subprocess.Popen) assert isinstance(self._build_process, subprocess.Popen)
# We are building right now # We are building right now
self._build_process.poll() self._build_process.poll()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment