# ffmpeg_target.py
from abc import ABC, abstractmethod
from pathlib import Path
from videoag_common.miscellaneous import *
from videoag_common.database import *
from videoag_common.ffmpeg import *
from .target import TargetProducer, _I
class JGraphContext(ABC):
"""
    Helper to build the FFilterGraph from an FFmpegFilterGraphTargetProducer.
"""
@property
@abstractmethod
def data_dir(self) -> Path:
pass
@abstractmethod
def get_tmp_file_path(self, name_hint: str) -> Path:
"""
File ending of name_hint is preserved
"""
pass
@abstractmethod
def get_stream_metadata(self, stream_jid: str) -> FStreamMetadata:
pass
@abstractmethod
def add_stream_metadata(self, stream_jid: str, metadata: FStreamMetadata):
pass
@abstractmethod
    def add_input_file(self, file: Path, streams: dict[int, FID], video_framerate: int | None = None):
pass
@abstractmethod
def add_output_file(self, file: Path, streams: list[tuple[FID, dict[str, str | list[str]]]]):
pass
@abstractmethod
def add_to_graph(self, *nodes: IONamedFFilterGraphNode):
pass
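

# --- Illustrative sketch (not part of the original file) ---
# A minimal in-memory JGraphContext, e.g. for unit tests. It only records what
# the nodes request. The bookkeeping structure and the tmp-file naming scheme
# are assumptions for this sketch; the real context is provided by the job
# runner.
import uuid


class _RecordingJGraphContext(JGraphContext):

    def __init__(self, data_dir: Path):
        self._data_dir = data_dir
        self._stream_metadata: dict[str, FStreamMetadata] = {}
        self.input_files: list[tuple[Path, dict[int, FID], int | None]] = []
        self.output_files: list[tuple[Path, list[tuple[FID, dict[str, str | list[str]]]]]] = []
        self.graph_nodes: list[IONamedFFilterGraphNode] = []

    @property
    def data_dir(self) -> Path:
        return self._data_dir

    def get_tmp_file_path(self, name_hint: str) -> Path:
        # Preserve the file extension of name_hint, as the interface requires
        return self._data_dir / f"tmp_{uuid.uuid4().hex}{Path(name_hint).suffix}"

    def get_stream_metadata(self, stream_jid: str) -> FStreamMetadata:
        return self._stream_metadata[stream_jid]

    def add_stream_metadata(self, stream_jid: str, metadata: FStreamMetadata):
        self._stream_metadata[stream_jid] = metadata

    def add_input_file(self, file: Path, streams: dict[int, FID], video_framerate: int | None = None):
        self.input_files.append((file, streams, video_framerate))

    def add_output_file(self, file: Path, streams: list[tuple[FID, dict[str, str | list[str]]]]):
        self.output_files.append((file, streams))

    def add_to_graph(self, *nodes: IONamedFFilterGraphNode):
        self.graph_nodes.extend(nodes)
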
class JGraphNode(TypedJsonDataClass[str], ABC):
"""
A node in the JGraph. The JGraph is an abstract layer above the FGraph (FFmpeg Filter Graph) from which the FGraph
can be built. Why J? Because it's the next letter after F which create a nice word 'JGraph'.
In most cases, a JGraphNode corresponds to a single FFilter. But there can be an arbitrary mapping. The only
guarantee is, that this node adds filters to the graph which produce the streams for the outputs (get_output_ids())
and these filters only need streams of the given inputs (get_input_ids()).
For clarity, we prepend 'J' to all objects, IDs, etc. which are on the layer of the JGraph, and 'F' to all things
corresponding to the FFmpeg filter graph.
"""
@abstractmethod
def get_input_ids(self) -> set[str]:
pass
@abstractmethod
def get_output_ids(self) -> set[str]:
pass
def get_target_input_ids(self) -> set[str]:
return set()
def get_target_output_ids(self) -> set[str]:
return set()
    def calculate_current_input_hash(self,
                                     session: SessionDb,
                                     lecture: "Lecture",
                                     input_media_by_id: dict[str, "MediumMetadata"]
                                     ) -> tuple[str, _I] | None:
        # Default: this node's inputs are constant, so its hash never changes
        return "0" * 64, None
def create_job_data_to_build_filter_graph(self,
session: SessionDb,
lecture: "Lecture",
input_media_by_id: dict[str, "MediumMetadata"],
output_files_by_id: dict[str, "MediumFile"],
intermediate: _I
) -> JsonTypes:
"""
Executed from the media_process_scheduler. Can return some data needed by the job running ffmpeg and building
the graph (create_job_data_to_build_filter_graph())
"""
return {}
@abstractmethod
def build_filter_graph(self, context: JGraphContext, job_data: CJsonValue):
"""
Executed from the job running ffmpeg, and can perform arbitrary work. job_data is the date returned by
create_job_data_to_build_filter_graph()
"""
pass
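

# --- Illustrative sketch (not part of the original file) ---
# A hypothetical concrete JGraphNode. `ScaleJNode` and its fields are invented
# for illustration only; real nodes live elsewhere in the project and may also
# need to register a JSON type key (that TypedJsonDataClass machinery is
# omitted here). It shows the contract: declare which JStream ids you consume
# and produce, then add the filters that realize exactly those streams.
class ScaleJNode(JGraphNode):
    input_id: str
    output_id: str
    width: int
    height: int

    def get_input_ids(self) -> set[str]:
        return {self.input_id}

    def get_output_ids(self) -> set[str]:
        return {self.output_id}

    def build_filter_graph(self, context: JGraphContext, job_data: CJsonValue):
        # In most cases a JGraphNode maps to a single FFilter (here this would
        # be FFmpeg's 'scale' filter, consuming the stream of input_id and
        # producing the stream of output_id). The concrete
        # IONamedFFilterGraphNode construction is project-specific and elided.
        ...
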
class FFmpegFilterGraphTargetProducer(TargetProducer):
nodes: list[JGraphNode]
def __post_init__(self):
super().__post_init__()
node_by_output_id: dict[str, JGraphNode] = {}
for node in self.nodes:
for output_id in node.get_output_ids():
if output_id in node_by_output_id:
raise JsonSerializableInitException(f"Duplicate output id {output_id}")
node_by_output_id[output_id] = node
        # Ordered by dependencies, i.e. all dependencies of a node come before that node
self._ordered_node_list: list[JGraphNode] = []
ids_used_in_input = set()
        def _ensure_node_dependencies_in_list(node: JGraphNode, dependency_stack: list[JGraphNode]):
            if node in dependency_stack:
                cycle = dependency_stack[dependency_stack.index(node):] + [node]
                raise JsonSerializableInitException(
                    "Detected cycle in nodes: "
                    + " -> ".join(str(sorted(n.get_output_ids())) for n in cycle)
                )
if node in self._ordered_node_list:
return
            # Sort to ensure consistent order between runs
            sorted_input_ids = sorted(node.get_input_ids())
# First ensure all dependencies are in list, then add ourselves
for input_id in sorted_input_ids:
                if input_id not in node_by_output_id:
                    raise JsonSerializableInitException(f"Node has input id {input_id} but no node produces that id"
                                                        f" as an output")
ids_used_in_input.add(input_id)
_ensure_node_dependencies_in_list(node_by_output_id[input_id], dependency_stack + [node])
self._ordered_node_list.append(node)
        # Go through the nodes in the order in which they are declared, for consistency between runs
for node in self.nodes:
_ensure_node_dependencies_in_list(node, [])
unused_output_ids = set(node_by_output_id.keys()) - ids_used_in_input
if len(unused_output_ids) > 0:
raise JsonSerializableInitException(f"Got unused output ids: {unused_output_ids}")
self._target_input_ids = set()
self._target_output_ids = set()
for node in self._ordered_node_list:
self._target_input_ids.update(node.get_target_input_ids())
self._target_output_ids.update(node.get_target_output_ids())
@classmethod
def get_type(cls) -> str:
return "ffmpeg_filter_graph"
def get_target_input_ids(self) -> set[str]:
return self._target_input_ids
def get_target_output_ids(self) -> set[str]:
return self._target_output_ids
    def calculate_current_input_hash(self,
                                     session: SessionDb,
                                     lecture: "Lecture",
                                     input_media_by_id: dict[str, "MediumMetadata"]
                                     ) -> tuple[str, _I] | None:
hash_concat = ""
intermediate = []
for node in self._ordered_node_list:
node_res = node.calculate_current_input_hash(session, lecture, input_media_by_id)
if node_res is None:
return None
node_sha256, node_intermediate = node_res
hash_concat += node_sha256
intermediate.append(node_intermediate)
return hash_sha256(hash_concat), intermediate
def create_job_data_to_produce_files(self,
session: SessionDb,
lecture: "Lecture",
input_media_by_id: dict[str, "MediumMetadata"],
output_files_by_id: dict[str, "MediumFile"],
intermediate: _I
) -> tuple[str, JsonTypes]:
        # A job should be mostly reproducible and should not query its input parameters from the database. That's why
        # we're providing all this data here
return "ffmpeg_filter_graph_target", {
"ordered_nodes_and_data": [
{
"node": node.to_json(),
"data": node.create_job_data_to_build_filter_graph(
session, lecture, input_media_by_id, output_files_by_id, node_intermediate
)
}
for node, node_intermediate in zip(self._ordered_node_list, intermediate)
]
}
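

# --- Illustrative sketch (not part of the original file) ---
# How the combined input hash above behaves: calculate_current_input_hash()
# concatenates the per-node hashes in dependency order and hashes the
# concatenation; if any node cannot compute its hash, the overall result is
# unknown. Standalone sketch using hashlib; the project's hash_sha256 is
# assumed to behave equivalently on strings.
def _combined_input_hash_sketch(ordered_node_hashes: list[str | None]) -> str | None:
    import hashlib
    if any(h is None for h in ordered_node_hashes):
        return None  # at least one node's inputs are not known yet
    return hashlib.sha256("".join(ordered_node_hashes).encode()).hexdigest()


if __name__ == "__main__":
    # E.g. two nodes with the default constant hash ("0" * 64) always combine
    # to the same value, so a target is only re-produced when some node's
    # hash actually changes; a single unknown node hash makes the whole
    # combined hash unknown.
    assert _combined_input_hash_sketch(["0" * 64, "0" * 64]) is not None
    assert _combined_input_hash_sketch(["0" * 64, None]) is None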