diff --git a/api/config/db_test_data.sql b/api/config/db_test_data.sql index 1407fb2a989f7a472365e4e6f753b795ed6e3f7c..15992fc91c735f139ede8e1d0adbea477c730eee 100644 --- a/api/config/db_test_data.sql +++ b/api/config/db_test_data.sql @@ -67,7 +67,7 @@ INSERT INTO chapter (id,lecture_id,start_time,"name",deleted,visible) VALUES SELECT setval('chapter_id_seq', 1000); -INSERT INTO medium_file (id,file_path,lecture_id,process_sha256,process_target_id,input_data_sha256,producer_job_id,to_be_replaced,deleted) VALUES +INSERT INTO medium_file (id,file_path,lecture_id,producer_sha256,process_target_id,input_data_sha256,producer_job_id,to_be_replaced,deleted) VALUES (1,'thumbnail/l_3.jpg',3,'0000000000000000000000000000000000000000000000000000000000000000','thumbnail','0000000000000000000000000000000000000000000000000000000000000000',NULL,false,false), (2,'thumbnail/l_25.jpg',25,'0000000000000000000000000000000000000000000000000000000000000000','thumbnail','0000000000000000000000000000000000000000000000000000000000000000',NULL,false,false), (3,'thumbnail/l_186.jpg',186,'0000000000000000000000000000000000000000000000000000000000000000','thumbnail','0000000000000000000000000000000000000000000000000000000000000000',NULL,false,false), diff --git a/common_py/src/videoag_common/media_process/basic_targets.py b/common_py/src/videoag_common/media_process/basic_targets.py index fe103a57dd9739ab4e9e840efcdffce8156c3fbb..be5f11603da5017f7b3b20543cfcc1e9507e976a 100644 --- a/common_py/src/videoag_common/media_process/basic_targets.py +++ b/common_py/src/videoag_common/media_process/basic_targets.py @@ -34,7 +34,6 @@ class SourceFileTargetProducer(SingleOutputTargetProducer["SourceMedium"]): def calculate_current_input_hash(self, session: SessionDb, lecture: "Lecture", - process_sha256: str, input_media_by_id: dict[str, "MediumMetadata"]) -> tuple[str, "MediumMetadata"] or None: from videoag_common.objects import SorterFile sorter_file = session.scalar( diff --git a/common_py/src/videoag_common/media_process/ffmpeg_target.py b/common_py/src/videoag_common/media_process/ffmpeg_target.py index 74d4b9b299535dc988e49a7224dafad0ba85b811..43fe33a157f0faa14d87fe41c013c4c0aa76ceb9 100644 --- a/common_py/src/videoag_common/media_process/ffmpeg_target.py +++ b/common_py/src/videoag_common/media_process/ffmpeg_target.py @@ -75,7 +75,6 @@ class JGraphNode(TypedJsonDataClass[str], ABC): def calculate_current_input_hash(self, session: SessionDb, lecture: "Lecture", - process_sha256: str, input_media_by_id: dict[str, "MediumMetadata"] ) -> tuple[str, _I] or None: return "0" * 64, None @@ -106,6 +105,7 @@ class FFmpegFilterGraphTargetProducer(TargetProducer): nodes: list[JGraphNode] def __post_init__(self): + super().__post_init__() node_by_output_id: dict[str, JGraphNode] = {} for node in self.nodes: for output_id in node.get_output_ids(): @@ -114,7 +114,7 @@ class FFmpegFilterGraphTargetProducer(TargetProducer): node_by_output_id[output_id] = node # Ordered by dependencies. E.g. all dependencies of a node come before that node - self._ordered_node_list = [] + self._ordered_node_list: list[JGraphNode] = [] ids_used_in_input = set() def _ensure_node_dependencies_in_list(node: JGraphNode, dependency_stack: list[JGraphNode]): @@ -165,13 +165,12 @@ class FFmpegFilterGraphTargetProducer(TargetProducer): def calculate_current_input_hash(self, session: SessionDb, lecture: "Lecture", - process_sha256: str, input_media_by_id: dict[str, "MediumMetadata"] ) -> tuple[str, _I] or None: hash_concat = "" intermediate = [] for node in self._ordered_node_list: - node_res = node.calculate_current_input_hash(session, lecture, process_sha256, input_media_by_id) + node_res = node.calculate_current_input_hash(session, lecture, input_media_by_id) if node_res is None: return None node_sha256, node_intermediate = node_res diff --git a/common_py/src/videoag_common/media_process/jnode/file_jnode.py b/common_py/src/videoag_common/media_process/jnode/file_jnode.py index b59f7648c7a52acebaa0c2e09aea91970d3d1759..4771a2e918681329aa80cfe3a73246f0536d51ce 100644 --- a/common_py/src/videoag_common/media_process/jnode/file_jnode.py +++ b/common_py/src/videoag_common/media_process/jnode/file_jnode.py @@ -52,7 +52,6 @@ class InputFileJNode(JGraphNode): def calculate_current_input_hash(self, session: SessionDb, lecture: "Lecture", - process_sha256: str, input_media_by_id: dict[str, "MediumMetadata"] ) -> tuple[str, _I] or None: from videoag_common.objects import MediumMetadata diff --git a/common_py/src/videoag_common/media_process/jnode/video_slide_jnode.py b/common_py/src/videoag_common/media_process/jnode/video_slide_jnode.py index 3d8370e6c3dbf4901615fdafb09a373bf9aec4aa..c345005271cdf4651aac948b1180d2807ec41ba4 100644 --- a/common_py/src/videoag_common/media_process/jnode/video_slide_jnode.py +++ b/common_py/src/videoag_common/media_process/jnode/video_slide_jnode.py @@ -152,7 +152,6 @@ class VideoSlideJGraphNode(EqualInOutNodeJGraphNode): def calculate_current_input_hash(self, session: SessionDb, lecture: "Lecture", - process_sha256: str, input_media_by_id: dict[str, "MediumMetadata"] ) -> tuple[str, dict[str, str]] or None: lecture_text_vars = get_lecture_text_vars(lecture) diff --git a/common_py/src/videoag_common/media_process/target.py b/common_py/src/videoag_common/media_process/target.py index 9352279d9fab11cdd8214cab5efebcf1f08d90b3..ce150e81d8699e079e8676d5e086cbc4a9df8d7d 100644 --- a/common_py/src/videoag_common/media_process/target.py +++ b/common_py/src/videoag_common/media_process/target.py @@ -17,9 +17,8 @@ _I = TypeVar("_I") class TargetProducer(TypedJsonDataClass[str], Generic[_I]): - def __init__(self, **kwargs): - if len(kwargs) > 0: - raise ValueError(f"Got unknown kwargs: {kwargs.keys()}") + def __post_init__(self, **kwargs): + self._hash: str or None = None def __str__(self): return f"{type(self).__name__}[output ids: {self.get_target_output_ids()}]" @@ -27,6 +26,26 @@ class TargetProducer(TypedJsonDataClass[str], Generic[_I]): def __hash__(self): return hash(self.get_target_output_ids()) + def sha256(self) -> str: + if self._hash is None: + self._hash = self._calculate_sha256() + return self._hash + + def _calculate_sha256(self) -> str: + return hash_sha256( + hash_json_sha256(self.to_json()) + + hash_json_sha256(type(self).get_version()) + ) + + @classmethod + def get_version(cls) -> int: + """ + Value to identify the current code version. This is included in the hash to identify changes in the producer. It + should be increased when there have been significant code changes and the producer should be run again even if + the json arguments and input dependencies have not changed. + """ + return 0 + @classmethod @abstractmethod def get_type(cls) -> str: @@ -44,7 +63,6 @@ class TargetProducer(TypedJsonDataClass[str], Generic[_I]): def calculate_current_input_hash(self, session: SessionDb, lecture: "Lecture", - process_sha256: str, input_media_by_id: dict[str, "MediumMetadata"]) -> tuple[str, _I] or None: """ This is intended to be only called from the process scheduler job. Note that this method is executed inside a @@ -97,7 +115,6 @@ class SingleInputTargetProducer(TargetProducer[None], ABC): def calculate_current_input_hash(self, session: SessionDb, lecture: "Lecture", - process_sha256: str, input_media_by_id: dict[str, "MediumMetadata"]) -> tuple[str, _I] or None: return input_media_by_id[self.input_id].file.input_data_sha256, None diff --git a/common_py/src/videoag_common/objects/medium.py b/common_py/src/videoag_common/objects/medium.py index 9fbabc0b36facddb17d73c9556770d135137b46d..ffc6a64f30c7a5f42dd13f6f4c7c881763657869 100644 --- a/common_py/src/videoag_common/objects/medium.py +++ b/common_py/src/videoag_common/objects/medium.py @@ -174,7 +174,7 @@ class MediumFile(DeletableApiObject, Base): # Note that these four may NOT be unique. E.g. if one output is generated twice (because another output of the same # producer was deleted, etc.) lecture_id: Mapped[int] = mapped_column(ForeignKey("lecture.id"), nullable=False, index=True) - process_sha256: Mapped[str] = api_mapped( + producer_sha256: Mapped[str] = api_mapped( mapped_column(String(length=64, collation=STRING_COLLATION), nullable=False), ApiStringField( include_in_data=True, diff --git a/common_py/src/videoag_common/test/object_data.py b/common_py/src/videoag_common/test/object_data.py index 75ac52015e7f67961c3ff72d19cafb10d79b81bd..9830e9f81a4199a1714355bee2e658ddce946a01 100644 --- a/common_py/src/videoag_common/test/object_data.py +++ b/common_py/src/videoag_common/test/object_data.py @@ -70,7 +70,7 @@ TEST_DATA_MEDIUM_FILE_1 = \ "id": 1, "file_path": "thumbnail/l_3.jpg", "lecture": 3, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "thumbnail", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, @@ -81,7 +81,7 @@ TEST_DATA_MEDIUM_FILE_2 = \ "id": 2, "file_path": "thumbnail/l_25.jpg", "lecture": 25, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "thumbnail", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, @@ -92,7 +92,7 @@ TEST_DATA_MEDIUM_FILE_3 = \ "id": 3, "file_path": "thumbnail/l_186.jpg", "lecture": 186, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "thumbnail", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, @@ -103,7 +103,7 @@ TEST_DATA_MEDIUM_FILE_4 = \ "id": 4, "file_path": "thumbnail/l_1186.jpg", "lecture": 1186, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "thumbnail", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, @@ -114,7 +114,7 @@ TEST_DATA_MEDIUM_FILE_5 = \ "id": 5, "file_path": "pub/09ss-fosap/09ss-fosap-090421.mp4", "lecture": 186, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "video_1080", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, @@ -125,7 +125,7 @@ TEST_DATA_MEDIUM_FILE_6 = \ "id": 6, "file_path": "vpnonline/07ws-buk/07ws-buk-071026.mp4", "lecture": 3, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "video", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, @@ -136,7 +136,7 @@ TEST_DATA_MEDIUM_FILE_7 = \ "id": 7, "file_path": "pub/07ws-diskrete/07ws-diskrete-071211.mp4", "lecture": 25, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "video", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, @@ -147,7 +147,7 @@ TEST_DATA_MEDIUM_FILE_8 = \ "id": 8, "file_path": "vpnonline/11ws-infin/11ws-infin-111010.mp4", "lecture": 1186, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "video", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, @@ -158,7 +158,7 @@ TEST_DATA_MEDIUM_FILE_9 = \ "id": 9, "file_path": "pub/09ss-fosap/09ss-fosap-090421-720p.mp4", "lecture": 186, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "video_720", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, @@ -169,7 +169,7 @@ TEST_DATA_MEDIUM_FILE_10 = \ "id": 10, "file_path": "pub/09ss-fosap/09ss-fosap-090421-480p.mp4", "lecture": 186, - "process_sha256": "0000000000000000000000000000000000000000000000000000000000000000", + "producer_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "process_target_id": "video_480", "input_data_sha256": "0000000000000000000000000000000000000000000000000000000000000000", "producer_job_id": None, diff --git a/job_controller/jobs/media_process_scheduler/job.py b/job_controller/jobs/media_process_scheduler/job.py index 012fc4615e44e8d50ad3d24e8ef380761f833681..3509ed76f8dea961c4448e7acd81cbcb6a9357a3 100644 --- a/job_controller/jobs/media_process_scheduler/job.py +++ b/job_controller/jobs/media_process_scheduler/job.py @@ -68,12 +68,16 @@ class ProcessScheduler: assert isinstance(file, MediumFile) if file.to_be_replaced: continue - if file.process_sha256 != self._process.sha256(): - # Mark for deletion if it's from an old process + producer = self._process.producer_by_output_id.get(file.process_target_id) + if producer is None: file.to_be_replaced = True - logger.info( - f"Discarding medium file {file.process_target_id} ({file.id}) because it is from a different " - f"process: {file.process_sha256}") + logger.info(f"Discarding medium file {file.process_target_id} ({file.id}) because the current process" + f" doesn't have this target") + continue + if file.producer_sha256 != producer.sha256(): + file.to_be_replaced = True + logger.info(f"Discarding medium file {file.process_target_id} ({file.id}) because its current producer" + f" has hash {producer.sha256()} while the file was produced by a producer with {file.producer_sha256}") continue self._media_files[file.process_target_id] = file @@ -295,7 +299,7 @@ class ProcessScheduler: return try: - hash_tuple = producer.calculate_current_input_hash(self._session, self._lecture, self._process.sha256(), self._produced_media) + hash_tuple = producer.calculate_current_input_hash(self._session, self._lecture, self._produced_media) except MediaProcessException as e: self._process_errors.append(f"In producer for {producer.get_target_output_ids()}: {e}") logger.info(f"MediaProcessException for producer {producer}:", exc_info=e) @@ -335,7 +339,7 @@ class ProcessScheduler: for output_id in producer.get_target_output_ids(): output_file = MediumFile( lecture=self._lecture, - process_sha256=self._process.sha256(), + producer_sha256=producer.sha256(), process_target_id=output_id, input_data_sha256=input_hash, )