Skip to content
Snippets Groups Projects
Select Git revision
  • main default protected
  • ci_test
  • v2.0.26 protected
  • v2.0.25 protected
  • v2.0.24 protected
  • v2.0.23 protected
  • v2.0.22 protected
  • v2.0.21 protected
  • v2.0.20 protected
  • v2.0.19 protected
  • v2.0.18 protected
  • v2.0.17 protected
  • v2.0.16 protected
  • v2.0.15 protected
  • v2.0.14 protected
  • v2.0.13 protected
  • v2.0.12 protected
  • v2.0.11 protected
  • v2.0.10 protected
  • v2.0.9 protected
  • v2.0.8 protected
  • v2.0.7 protected
22 results

build_pipeline_generator.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    build_pipeline_generator.py 16.31 KiB
    import os
    import re
    import sys
    from collections import defaultdict
    from pathlib import Path
    from argparse import ArgumentParser
    
    # Make the shared videoag_common package importable when running from the repo root.
    sys.path.append(str(Path("common_py/src/").resolve()))
    # NOTE(review): presumably videoag_common reads VIDEOAG_CONFIG on import; pointing it
    # at /dev/null avoids loading a real config for this generator — confirm against videoag_common.
    os.environ["VIDEOAG_CONFIG"] = "/dev/null"
    from videoag_common.miscellaneous import CJsonValue, JsonDataClass, JsonSerializableEnum, json_field, JsonSerializable, \
        JsonTypes
    
    
    class OsType(JsonSerializableEnum):
        """Base operating-system flavor used to pick the Docker base image and package manager."""
        DEBIAN = "debian"
        ALPINE = "alpine"
    
    
    class OsModuleDependencies(JsonSerializable):
        
        def __init__(self, names_by_type: dict[OsType, set[str]] or None = None):
            super().__init__()
            self._names_by_type = defaultdict(lambda: set(), names_by_type or {})
        
        @classmethod
        def from_json(cls, json: CJsonValue):
            json = json.as_object()
            names_by_type = {}
            for key in json.json_keys():
                os_type = OsType.from_json(key)
                names_list = []
                names_by_type[os_type] = names_list
                for val in json.get_array(key.as_string(None)):
                    names_list.append(val.as_string(max_length=100))
            return OsModuleDependencies(names_by_type)
        
        def to_json(self) -> JsonTypes:
            # Not supported/needed
            raise NotImplementedError()
        
        def update(self, other: "OsModuleDependencies"):
            for type, names in other._names_by_type.items():
                self._names_by_type[type].update(names)
        
        def generate_install_cmd(self, os_type: OsType) -> str:
            names = self._names_by_type[os_type]
            if len(names) == 0:
                return ""
            
            names = list(names)
            # Ensure consistent order between runs
            names.sort()
            
            match os_type:
                case OsType.DEBIAN:
                    return f"RUN apt-get update && apt-get --no-install-recommends install -y {' '.join(names)}\n"
                case OsType.ALPINE:
                    return f"RUN apk update && apk add {' '.join(names)}\n"
                case _:
                    raise AssertionError(f"Unknown os type {os_type}")
    
    
    class ModuleBuildConfig(JsonDataClass):
        """Parsed contents of a module's build_config.py, deserialized via JsonDataClass."""
        # Docker image name to build; None means the module only serves as a dependency of others
        target_image_name: str = None
        # Paths (relative to the module dir, see Module.__init__) of modules this one builds upon
        build_module_dependencies: list[str] = json_field(default_factory=lambda: [])
        # OS types this module can run on; empty list means "exactly os_type"
        supported_os_types: list[OsType] = json_field(default_factory=lambda: [])
        # Explicitly requested OS type; may be None, in which case dependencies determine it
        os_type: OsType = None
        # OS packages needed only while building (installed in the builder stage)
        os_build_dependencies: OsModuleDependencies = json_field(default_factory=lambda: OsModuleDependencies())
        # OS packages needed at runtime in the final image
        os_runtime_dependencies: OsModuleDependencies = json_field(default_factory=lambda: OsModuleDependencies())
        # Path (relative to the module dir) of a pip requirements file, if any
        pip_requirements_file: str = None
        # Raw Dockerfile snippet appended to the generated Dockerfile; "$MODULE_DIR" is substituted
        dockerfile_extra: str = ""
        # YAML snippet for this module's CI test job; None disables the test job
        ci_test_job_template: str = None
    
    
    class Module:
        """One buildable unit of the repository: a directory containing a build_config.py.
        
        Constructing a Module recursively loads its build_module_dependencies. After all
        modules are loaded, post_init() must be called to resolve the effective OS type
        and aggregate pip/OS/Dockerfile data before Dockerfiles or CI jobs are generated.
        """
        
        def __init__(self, context: "BuildContext", module_dir: Path):
            super().__init__()
            
            self.context = context
            self.module_dir = module_dir.resolve()
            # Module name is its path relative to the build root, e.g. "common_py"
            self.name = str(self.module_dir.relative_to(context.build_dir))
            
            # SECURITY NOTE: build_config.py is executed via eval(). Acceptable for trusted
            # repository content only — must never run on untrusted input.
            config_json = eval(self.module_dir.joinpath("build_config.py").read_text())
            config = ModuleBuildConfig.from_json(CJsonValue(config_json))
    
            self.target_image_name = config.target_image_name
            
            # Register before loading dependencies so duplicate image names are caught early
            context.add_module(self)
            
            self.os_type = config.os_type
            self.supported_os_types = config.supported_os_types
            
            if len(self.supported_os_types) == 0:
                self.supported_os_types.append(self.os_type)
            elif self.os_type is not None and self.os_type not in self.supported_os_types:
                raise ValueError("os_type must also be in supported_os_types if that list is specified")
            
            # Recursively load dependencies; paths in the config are relative to this module's dir
            self.dependencies: list[Module] = []
            for dep in config.build_module_dependencies:
                dependency_name = str(module_dir.joinpath(Path(dep)).resolve().relative_to(context.build_dir))
                self.dependencies.append(context.get_or_load_module(dependency_name))
            
            self.pip_req_file = None
            if config.pip_requirements_file is not None:
                self.pip_req_file = self.module_dir.joinpath(config.pip_requirements_file)
                if not self.pip_req_file.is_file():
                    raise ValueError(f"Cannot find pip requirements file {self.pip_req_file}")
            
            self.os_build_dependencies = config.os_build_dependencies
            self.os_runtime_dependencies = config.os_runtime_dependencies
            
            self.dockerfile_extra = config.dockerfile_extra
            # Substitute the module's build-root-relative path into the Dockerfile snippet
            self.dockerfile_extra = self.dockerfile_extra.replace(
                "$MODULE_DIR",
                str(self.module_dir.relative_to(context.build_dir))
            )
            self.ci_test_job_template = config.ci_test_job_template
    
            # The "_actual"/"_all" attributes aggregate this module and all of its
            # (transitive) dependencies; they are computed in post_init()
            self._post_initialized = False
            self._actual_supported_os_types: set[OsType] = set()
            self._actual_os_type: OsType or None = None
            self._all_pip_requirement_files: list[Path] = []
            self._all_dockerfile_extras: list[str] = []
            self._all_os_build_dependencies = OsModuleDependencies()
            self._all_os_runtime_dependencies = OsModuleDependencies()
        
        def check_cyclic_dependency(self, dependents_stack: list[str]):
            """Raise ValueError if this module (transitively) depends on itself.
            
            dependents_stack: names of the modules on the current dependency path.
            """
            if self.name in dependents_stack:
                raise ValueError(f"Dependency cycle involving {self.name} detected")
            dependents_stack = dependents_stack + [self.name]
            for dependency in self.dependencies:
                dependency.check_cyclic_dependency(dependents_stack)
        
        def post_init(self):
            """Finish initialization after all modules are loaded (idempotent).
            
            Post-initializes all dependencies first, then aggregates their data into
            this module via _init_after_dependency_setup().
            """
            if self._post_initialized:
                return
            self.check_cyclic_dependency([])
            for dep in self.dependencies:
                dep.post_init()
            try:
                self._init_after_dependency_setup()
            except Exception as e:
                raise Exception(f"While post initializing module {self.name}") from e
            self._post_initialized = True
        
        def _init_after_dependency_setup(self):
            """Compute the effective OS type and aggregate pip/OS/Dockerfile data from dependencies."""
            dep_specified_os_types = set()
            # Effective support is the intersection of own and all dependencies' supported types
            self._actual_supported_os_types = set(self.supported_os_types)
            for dep in self.dependencies:
                if self.os_type is not None and self.os_type not in dep._actual_supported_os_types:
                    raise ValueError(f"Specified os type {self.os_type} but this is not supported by dependency {dep.name}")
                self._actual_supported_os_types.intersection_update(dep._actual_supported_os_types)
                if dep._actual_os_type is not None:
                    dep_specified_os_types.add(dep._actual_os_type)
            
            # Own os_type wins; otherwise adopt the single type the dependencies agree on.
            # With conflicting dependency types, _actual_os_type stays None and
            # generate_dockerfile() fails with a descriptive error.
            if self.os_type is not None:
                assert self.os_type in self._actual_supported_os_types
                self._actual_os_type = self.os_type
            elif len(dep_specified_os_types) == 1:
                self._actual_os_type = list(dep_specified_os_types)[0]
            
            self._all_os_build_dependencies.update(self.os_build_dependencies)
            self._all_os_runtime_dependencies.update(self.os_runtime_dependencies)
            for dep in self.dependencies:
                self._all_os_build_dependencies.update(dep._all_os_build_dependencies)
                self._all_os_runtime_dependencies.update(dep._all_os_runtime_dependencies)
                self._all_dockerfile_extras.extend(dep._all_dockerfile_extras)
                self._all_pip_requirement_files.extend(dep._all_pip_requirement_files)
            
            if self.pip_req_file is not None:
                self._all_pip_requirement_files.append(self.pip_req_file)
            
            # Remove duplicates and ensure consistent order
            self._all_pip_requirement_files = list(set(self._all_pip_requirement_files))
            self._all_pip_requirement_files.sort(key=lambda f: str(f))
            
            # Own extra is appended after the dependencies' extras
            self._all_dockerfile_extras.append(self.dockerfile_extra)
        
        def _get_install_cmds(self, os_type: OsType, apt_deps: list[str], apk_deps: list[str]):
            # NOTE(review): unused stub, never called in this file — appears to be a
            # leftover; confirm before removing
            pass
        
        def generate_dockerfile(self):
            """Generate the multi-stage Dockerfile text for this module.
            
            Requires post_init() to have run. Raises ValueError if no effective OS
            type could be determined.
            """
            if self._actual_os_type is None:
                # NOTE(review): these f-string fragments concatenate to "moduledependencies"
                # — a space is missing at the join
                raise ValueError(f"Cannot generate Dockerfile for {self.name} because no os type is specified (and module"
                                 f"dependencies don't either or have conflicting types)")
            
            match self._actual_os_type:
                case OsType.DEBIAN:
                    base_image = "python:3.13-slim"
                case OsType.ALPINE:
                    base_image = "python:3.13-alpine"
                case _:
                    raise AssertionError(f"Unknown os type {self._actual_os_type}")
            res = f"""\
    #####################################################################
    ### WARNING: THIS FILE WAS AUTOMATICALLY GENERATED. DO NOT EDIT ! ###
    #####################################################################
    FROM {base_image} AS base
    
    RUN mkdir -p /code
    
    WORKDIR /code
    
    ENV PIP_CACHE_DIR=/tmp/pip-cache
    
    {self._all_os_runtime_dependencies.generate_install_cmd(self._actual_os_type)}
    """
            # Copy every (transitive) pip requirements file into the image under a stable name
            pip_file_count = len(self._all_pip_requirement_files)
            for path, i in zip(self._all_pip_requirement_files, range(pip_file_count)):
                res += f"COPY {str(path.relative_to(self.context.build_dir))} /tmp/req-{i}.txt\n"
            
            pip_requirement_list_arg = " ".join(f"-r /tmp/req-{i}.txt" for i in range(pip_file_count))
            
            res += f"""
    FROM base AS builder
    
    # This step builds (and installs) the requirements and also puts the build result into the pip cache
    {self._all_os_build_dependencies.generate_install_cmd(self._actual_os_type)}
    RUN pip install {pip_requirement_list_arg}
    
    FROM base AS final
    
    # Here we copy the pip cache with the built packages and install them again. Pip will use the cache and won't need the
    # apt build dependencies. This saves a lot of space, compared to leaving the build dependencies in the final image
    # (reduces the final image size by about half)
    COPY --from=builder /tmp/pip-cache /tmp/pip-cache
    RUN pip install {pip_requirement_list_arg}
    """
            for docker in self._all_dockerfile_extras:
                res += "\n" + docker + "\n"
            
            return res
        
        def generate_ci_jobs(self):
            """Generate the build, test and deploy CI job YAML for this module (CI only)."""
            self.context.ensure_in_ci()
            if self.target_image_name is None:
                raise ValueError("This module has no target image name and therefore cannot be built")
            return (self._generate_ci_build_job()
                    + "\n" + self._generate_ci_test_job()
                    + "\n" + self._generate_ci_deploy_job())
        
        def output_dockerfile_path(self) -> Path:
            """Path under <build_dir>/.dockerfiles where this module's Dockerfile is written."""
            return self.context.build_dir.joinpath(".dockerfiles").joinpath(self.target_image_name)
        
        def image_full_name(self):
            """Target image name prefixed with the environment type (production/development)."""
            return f"{self.context.env_type()}_{self.target_image_name}"
        
        def ci_build_job_name(self):
            """Name of this module's CI build job (referenced by the test job's needs)."""
            return f"build-{self.target_image_name}"
        
        @staticmethod
        def _get_auth_echo():
            # Shell command that writes kaniko's registry auth config from CI variables
            return """\
    echo "{\\"auths\\":{\\"$CI_REGISTRY\\":{\\"username\\":\\"$CI_REGISTRY_USER\\",\\"password\\":\\"$CI_REGISTRY_PASSWORD\\"}}}" > /kaniko/.docker/config.json\
    """
    
        def _generate_ci_build_job(self):
            """Generate the kaniko CI job that builds and pushes this module's image."""
            kaniko_args = [
                f"--context=git://git.fsmpi.rwth-aachen.de/videoag/backend.git#{self.context.commit_sha}",
                f"--dockerfile={str(self.output_dockerfile_path().relative_to(self.context.build_dir))}",
                # NOTE(review): emitted as "--git recurse-submodules=true" (space-separated);
                # verify this is the flag form kaniko expects
                f"--git recurse-submodules=true",
                f"--destination=$CI_REGISTRY_IMAGE/{self.image_full_name()}:{self.context.commit_sha}",
                f"--build-arg=GIT_COMMIT_SHA={self.context.commit_sha}",
                f"--cache=true",
                f"--cache-repo=$CI_REGISTRY_IMAGE/{self.context.env_type()}_cache",
                f"--cache-copy-layers=true",
                f"--cache-run-layers=true",
                f"--verbosity=info",
            ]
            
            if self.context.commit_tag is not None:
                kaniko_args.append(f"--build-arg=GIT_COMMIT_TAG={self.context.commit_tag}")
            
            return f"""
    {self.ci_build_job_name()}:
        stage: build-and-test
        timeout: 3h
        needs:
            - pipeline: $PARENT_PIPELINE_ID
              job: generate-pipeline
        image:
            name: gcr.io/kaniko-project/executor:v1.23.2-debug
            entrypoint: [""]
        script:
            - {self._get_auth_echo()}
            - echo {self.context.commit_sha}
            - >-
                /kaniko/executor
                {"\n            ".join(kaniko_args)}
    """
        
        def _generate_ci_test_job(self):
            """Generate the CI test job from the module's template, or "" if no template is set."""
            if self.ci_test_job_template is None:
                return ""
            assert isinstance(self.ci_test_job_template, str)
            res = f"""
    test-{self.target_image_name}:
        stage: build-and-test
        needs: [{self.ci_build_job_name()}]
        image:
            name: $CI_REGISTRY_IMAGE/{self.image_full_name()}:{self.context.commit_sha}
            entrypoint: [""]
    """
            # Indent the template by 4 spaces so it becomes part of the job's YAML mapping
            res += "    " + "\n    ".join(self.ci_test_job_template.splitlines()) + "\n"
            return res
        
        def _generate_ci_deploy_job(self):
            """Generate the CI job that re-tags the built image as :latest (plus the release tag in production)."""
            destination_args = [
                f"--destination=$CI_REGISTRY_IMAGE/{self.image_full_name()}:latest"
            ]
            if self.context.is_production:
                destination_args.append(f"--destination=$CI_REGISTRY_IMAGE/{self.image_full_name()}:{self.context.commit_tag}")
            
            return f"""
    deploy-{self.target_image_name}:
        stage: deploy
        timeout: 1h
        image:
            name: gcr.io/kaniko-project/executor:v1.23.2-debug
            entrypoint: [""]
        script:
            - {self._get_auth_echo()}
            - mkdir /workdir
            - echo "FROM $CI_REGISTRY_IMAGE/{self.image_full_name()}:{self.context.commit_sha}" > /workdir/Dockerfile
            - >-
                /kaniko/executor
                --context=dir:///workdir
                {"\n            ".join(destination_args)}
    """
    
    
    class BuildContext:
        
        def __init__(self, build_dir: Path, commit_sha: str or None, commit_tag: str or None):
            super().__init__()
            self.build_dir = build_dir
            self.modules: dict[str, Module] = {}
            self._target_image_names: set[str] = set()
            self.commit_sha = commit_sha or None  # Make empty string to None
            self.commit_tag = commit_tag or None
            self.is_production = commit_tag is not None and re.fullmatch("v.*", commit_tag) is not None
        
        def env_type(self) -> str:
            return "production" if self.is_production else "development"
        
        def ensure_in_ci(self):
            if self.commit_sha is None:
                raise Exception("Not in GitLab CI. No commit sha given")
        
        def add_module(self, module: Module):
            if module.target_image_name is not None:
                if module.target_image_name in self._target_image_names:
                    raise ValueError(f"Duplicate target image name {module.target_image_name}")
                self._target_image_names.add(module.target_image_name)
            self.modules[module.name] = module
        
        def get_or_load_module(self, name: str) -> Module:
            if name in self.modules:
                return self.modules[name]
            module_dir = self.build_dir.joinpath(name)
            try:
                return Module(self, module_dir)
            except Exception as e:
                raise Exception(f"Exception while loading module {module_dir}", e)
        
        def generate_ci_pipeline(self):
            self.ensure_in_ci()
            
            pipeline = """
    ####################################################################
    ##### AUTOMATICALLY GENERATED PIPELINE. DO NOT CHANGE MANUALLY! ####
    ####################################################################
    
    stages:
        - build-and-test
        - deploy
    """
            for module in self.modules.values():
                if module.target_image_name is not None:
                    pipeline += module.generate_ci_jobs()
            
            return pipeline
    
    
    def main():
        """Load the requested modules, write their Dockerfiles and optionally the CI pipeline YAML."""
        arg_parser = ArgumentParser()
        arg_parser.add_argument("--build-dir", type=Path, default=Path("."))
        arg_parser.add_argument("--commit-sha", type=str, required=False)
        arg_parser.add_argument("--commit-tag", type=str, required=False)
        arg_parser.add_argument("--ci-pipeline-dest", type=Path, required=False)
        arg_parser.add_argument("modules", nargs="+", type=Path)
        
        cli = arg_parser.parse_args()
        
        build_context = BuildContext(cli.build_dir.resolve(), cli.commit_sha, cli.commit_tag)
        # Load the explicitly requested modules (dependencies are pulled in recursively)
        for module_path in cli.modules:
            build_context.get_or_load_module(str(module_path.resolve().relative_to(build_context.build_dir)))
        
        # Resolve OS types and aggregate dependency data for everything that got loaded
        for loaded_module in build_context.modules.values():
            loaded_module.post_init()
        
        # Write a Dockerfile for every module that produces an image
        for loaded_module in build_context.modules.values():
            if loaded_module.target_image_name is None:
                continue
            dockerfile_path = loaded_module.output_dockerfile_path()
            dockerfile_path.parent.mkdir(parents=True, exist_ok=True)
            dockerfile_path.write_text(loaded_module.generate_dockerfile())
        
        if cli.ci_pipeline_dest is not None:
            cli.ci_pipeline_dest.parent.mkdir(parents=True, exist_ok=True)
            cli.ci_pipeline_dest.write_text(build_context.generate_ci_pipeline())
    
    
    # Script entry point
    if __name__ == "__main__":
        main()