Skip to content
Snippets Groups Projects
Select Git revision
  • ca71e995423414eeacb0d7fb95dc509fd56869fe
  • master default protected
2 results

edit.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    ffmpeg_target.py 8.13 KiB
    from abc import ABC, abstractmethod
    from pathlib import Path
    
    from videoag_common.miscellaneous import *
    from videoag_common.database import *
    from videoag_common.ffmpeg import *
    
    from .target import TargetProducer, _I
    
    
    class JGraphContext(ABC):
        """
        Helper to build the FFilterGraph (FFmpeg filter graph) from a FFmpegFilterGraphTargetProducer.

        Implementations provide file locations, per-stream metadata bookkeeping, and the means to
        register input/output files and filter nodes with the graph being assembled.
        """
        
        @property
        @abstractmethod
        def data_dir(self) -> Path:
            # Base directory for the job's data files.
            pass
        
        @abstractmethod
        def get_tmp_file_path(self, name_hint: str) -> Path:
            """
            Returns a path for a fresh temporary file.
            The file ending of name_hint is preserved.
            """
            pass
    
        @abstractmethod
        def get_stream_metadata(self, stream_jid: str) -> FStreamMetadata:
            # Look up metadata previously registered via add_stream_metadata() for this JGraph stream id.
            pass
    
        @abstractmethod
        def add_stream_metadata(self, stream_jid: str, metadata: FStreamMetadata):
            # Register metadata for a JGraph stream id.
            pass
    
        @abstractmethod
        def add_input_file(self, file: Path, streams: dict[int, FID], video_framerate: int | None = None):
            # `streams` maps stream indexes within the file to FGraph stream ids.
            # Annotation fixed: `int or None` evaluated to just `int`; `int | None` is the intended type.
            pass
        
        @abstractmethod
        def add_output_file(self, file: Path, streams: list[tuple[FID, dict[str, str | list[str]]]]):
            # Each entry pairs an FGraph stream id with its ffmpeg output options.
            pass
        
        @abstractmethod
        def add_to_graph(self, *nodes: IONamedFFilterGraphNode):
            # Append filter nodes to the FGraph under construction.
            pass
    
    
    class JGraphNode(TypedJsonDataClass[str], ABC):
        """
        A node in the JGraph. The JGraph is an abstract layer above the FGraph (FFmpeg Filter Graph) from which the FGraph
        can be built. Why J? Because it's the next letter after F, which creates a nice word 'JGraph'.
        In most cases, a JGraphNode corresponds to a single FFilter. But there can be an arbitrary mapping. The only
        guarantee is that this node adds filters to the graph which produce the streams for the outputs (get_output_ids())
        and these filters only need streams of the given inputs (get_input_ids()).
        
        For clarity, we prepend 'J' to all objects, IDs, etc. which are on the layer of the JGraph, and 'F' to all things
        corresponding to the FFmpeg filter graph.
        """
        
        @abstractmethod
        def get_input_ids(self) -> set[str]:
            # JGraph stream ids this node consumes; each must be produced by some other node.
            pass
        
        @abstractmethod
        def get_output_ids(self) -> set[str]:
            # JGraph stream ids this node produces; must be unique across the whole JGraph.
            pass
        
        def get_target_input_ids(self) -> set[str]:
            # Target-level input media ids this node depends on, if any.
            return set()
        
        def get_target_output_ids(self) -> set[str]:
            # Target-level output file ids this node writes, if any.
            return set()
        
        def calculate_current_input_hash(self,
                                         session: SessionDb,
                                         lecture: "Lecture",
                                         input_media_by_id: dict[str, "MediumMetadata"]
                                         ) -> tuple[str, _I] | None:
            """
            Returns a (sha256 hex digest, intermediate) pair describing this node's current inputs, or None if
            the hash cannot be computed yet. Default: constant hash and no intermediate data.
            Annotation fixed: `tuple[str, _I] or None` evaluated to just `tuple[str, _I]`, silently dropping
            the documented None alternative.
            """
            return "0" * 64, None
        
        def create_job_data_to_build_filter_graph(self,
                                                  session: SessionDb,
                                                  lecture: "Lecture",
                                                  input_media_by_id: dict[str, "MediumMetadata"],
                                                  output_files_by_id: dict[str, "MediumFile"],
                                                  intermediate: _I
                                                  ) -> JsonTypes:
            """
            Executed from the media_process_scheduler. Can return some data needed by the job running ffmpeg and building
            the graph (build_filter_graph())
            """
            return {}
        
        @abstractmethod
        def build_filter_graph(self, context: JGraphContext, job_data: CJsonValue):
            """
            Executed from the job running ffmpeg, and can perform arbitrary work. job_data is the data returned by
            create_job_data_to_build_filter_graph()
            """
            pass
    
    
    class FFmpegFilterGraphTargetProducer(TargetProducer):
        nodes: list[JGraphNode]
        
        def __post_init__(self):
            super().__post_init__()
            node_by_output_id: dict[str, JGraphNode] = {}
            for node in self.nodes:
                for output_id in node.get_output_ids():
                    if output_id in node_by_output_id:
                        raise JsonSerializableInitException(f"Duplicate output id {output_id}")
                    node_by_output_id[output_id] = node
            
            # Ordered by dependencies. E.g. all dependencies of a node come before that node
            self._ordered_node_list: list[JGraphNode] = []
            ids_used_in_input = set()
            
            def _ensure_node_dependencies_in_list(node: JGraphNode, dependency_stack: list[JGraphNode]):
                if node in dependency_stack:
                    raise JsonSerializableInitException(f"Detected cycle in nodes: {' -> '.join(
                        str(sorted(n.get_output_ids()))
                        for n in dependency_stack[dependency_stack.index(node):] + [node]
                    )}")
                if node in self._ordered_node_list:
                    return
                # Sort to ensure consistent order between runs
                sorted_input_ids = list(node.get_input_ids())
                sorted_input_ids.sort()
                # First ensure all dependencies are in list, then add ourselves
                for input_id in sorted_input_ids:
                    if input_id not in node_by_output_id:
                        raise JsonSerializableInitException(f"Node has input id {input_id} but no node exists which produces"
                                                            f" that output")
                    ids_used_in_input.add(input_id)
                    _ensure_node_dependencies_in_list(node_by_output_id[input_id], dependency_stack + [node])
                self._ordered_node_list.append(node)
            
            # Go through nodes in the order which they are declared for consistency between runs
            for node in self.nodes:
                _ensure_node_dependencies_in_list(node, [])
            
            unused_output_ids = set(node_by_output_id.keys()) - ids_used_in_input
            if len(unused_output_ids) > 0:
                raise JsonSerializableInitException(f"Got unused output ids: {unused_output_ids}")
            
            self._target_input_ids = set()
            self._target_output_ids = set()
    
            for node in self._ordered_node_list:
                self._target_input_ids.update(node.get_target_input_ids())
                self._target_output_ids.update(node.get_target_output_ids())
        
        @classmethod
        def get_type(cls) -> str:
            return "ffmpeg_filter_graph"
        
        def get_target_input_ids(self) -> set[str]:
            return self._target_input_ids
        
        def get_target_output_ids(self) -> set[str]:
            return self._target_output_ids
        
        def calculate_current_input_hash(self,
                                         session: SessionDb,
                                         lecture: "Lecture",
                                         input_media_by_id: dict[str, "MediumMetadata"]
                                         ) -> tuple[str, _I] or None:
            hash_concat = ""
            intermediate = []
            for node in self._ordered_node_list:
                node_res = node.calculate_current_input_hash(session, lecture, input_media_by_id)
                if node_res is None:
                    return None
                node_sha256, node_intermediate = node_res
                hash_concat += node_sha256
                intermediate.append(node_intermediate)
            
            return hash_sha256(hash_concat), intermediate
        
        def create_job_data_to_produce_files(self,
                                             session: SessionDb,
                                             lecture: "Lecture",
                                             input_media_by_id: dict[str, "MediumMetadata"],
                                             output_files_by_id: dict[str, "MediumFile"],
                                             intermediate: _I
                                             ) -> tuple[str, JsonTypes]:
            # A job should be mostly reproducible and not querying it's input parameters from the database. That's why where
            # providing all this data here
            return "ffmpeg_filter_graph_target", {
                "ordered_nodes_and_data": [
                    {
                        "node": node.to_json(),
                        "data": node.create_job_data_to_build_filter_graph(
                            session, lecture, input_media_by_id, output_files_by_id, node_intermediate
                        )
                    }
                    for node, node_intermediate in zip(self._ordered_node_list, intermediate)
                ]
            }