Skip to content
Snippets Groups Projects
Commit 801a38ff authored by Simon Künzel's avatar Simon Künzel
Browse files

Add hash calculation for source file producer

parent 4d05105a
Branches
No related tags found
No related merge requests found
...@@ -10,7 +10,7 @@ from .target import TargetProducer, SingleInputTargetProducer, SingleOutputTarge ...@@ -10,7 +10,7 @@ from .target import TargetProducer, SingleInputTargetProducer, SingleOutputTarge
SOURCE_FILE_TAG_PATTERN = re.compile("[a-zA-Z0-9_]*") SOURCE_FILE_TAG_PATTERN = re.compile("[a-zA-Z0-9_]*")
class SourceFileTargetProducer(SingleOutputTargetProducer): class SourceFileTargetProducer(SingleOutputTargetProducer["SourceMedium"]):
def __init__(self, tag: str, **kwargs): def __init__(self, tag: str, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
...@@ -36,18 +36,10 @@ class SourceFileTargetProducer(SingleOutputTargetProducer): ...@@ -36,18 +36,10 @@ class SourceFileTargetProducer(SingleOutputTargetProducer):
# Can't import Lecture, TargetMedium due to circular dependency # Can't import Lecture, TargetMedium due to circular dependency
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
def calculate_current_input_hash(self, lecture: "Lecture", input_media_by_id: dict[str, "TargetMedium"]) -> str: def calculate_current_input_hash(self,
return "0" * 64
# Can't import Lecture, TargetMedium due to circular dependency
# noinspection PyUnresolvedReferences
def create_new_target_media_and_job_data(
self,
session: SessionDb, session: SessionDb,
lecture: "Lecture", lecture: "Lecture",
input_media_by_id: dict[str, "TargetMedium"], input_media_by_id: dict[str, "TargetMedium"]) -> [str, "SourceMedium"]:
common_new_medium_kwargs: dict[str, Any]
) -> tuple[list["TargetMedium"], tuple[str, JsonTypes] or None] or None:
from videoag_common.objects import SourceMedium from videoag_common.objects import SourceMedium
source_medium = session.scalar( source_medium = session.scalar(
SourceMedium.basic_select().where( SourceMedium.basic_select().where(
...@@ -58,6 +50,21 @@ class SourceFileTargetProducer(SingleOutputTargetProducer): ...@@ -58,6 +50,21 @@ class SourceFileTargetProducer(SingleOutputTargetProducer):
.order_by(SourceMedium.update_time.desc()) .order_by(SourceMedium.update_time.desc())
.limit(1) .limit(1)
) )
if source_medium is None:
return "0" * 64, None
return source_medium.sha256, source_medium
# Can't import Lecture, TargetMedium due to circular dependency
# noinspection PyUnresolvedReferences
def create_new_target_media_and_job_data(
self,
session: SessionDb,
lecture: "Lecture",
input_media_by_id: dict[str, "TargetMedium"],
common_new_medium_kwargs: dict[str, Any],
source_medium: "SourceMedium",
) -> tuple[list["TargetMedium"], tuple[str, JsonTypes] or None] or None:
from videoag_common.objects import SourceMedium, TargetMedium, TargetMediumType
if source_medium is None: if source_medium is None:
return None return None
if source_medium.file_metadata is None: if source_medium.file_metadata is None:
...@@ -71,9 +78,11 @@ class SourceFileTargetProducer(SingleOutputTargetProducer): ...@@ -71,9 +78,11 @@ class SourceFileTargetProducer(SingleOutputTargetProducer):
"file_path": source_medium.file_path, "file_path": source_medium.file_path,
"is_produced": True, "is_produced": True,
} }
medium = TargetMedium(**kwargs) medium_type = TargetMediumType(kwargs["type"])
kwargs.pop("type")
target_medium = orm.class_mapper(TargetMedium).polymorphic_map[medium_type].class_(**kwargs)
return medium, None return [target_medium], None
class DownscaleVideoTarget(SingleInputTargetProducer): class DownscaleVideoTarget(SingleInputTargetProducer):
...@@ -105,7 +114,8 @@ class DownscaleVideoTarget(SingleInputTargetProducer): ...@@ -105,7 +114,8 @@ class DownscaleVideoTarget(SingleInputTargetProducer):
session: SessionDb, session: SessionDb,
lecture: "Lecture", lecture: "Lecture",
input_media_by_id: dict[str, "TargetMedium"], input_media_by_id: dict[str, "TargetMedium"],
common_new_medium_kwargs: dict[str, Any] common_new_medium_kwargs: dict[str, Any],
intermediate: None,
) -> tuple[list["TargetMedium"], tuple[str, JsonTypes] or None] or None: ) -> tuple[list["TargetMedium"], tuple[str, JsonTypes] or None] or None:
# TODO # TODO
pass pass
......
import re import re
from abc import abstractmethod, ABC from abc import abstractmethod, ABC
from typing import Any from typing import Any, TypeVar, Generic
from videoag_common.miscellaneous import * from videoag_common.miscellaneous import *
from videoag_common.database import * from videoag_common.database import *
...@@ -8,8 +8,10 @@ from videoag_common.database import * ...@@ -8,8 +8,10 @@ from videoag_common.database import *
TARGET_ID_PATTERN = re.compile("[a-zA-Z0-9_]{1,100}") TARGET_ID_PATTERN = re.compile("[a-zA-Z0-9_]{1,100}")
TARGET_ID_MAX_LENGTH = 100 TARGET_ID_MAX_LENGTH = 100
_I = TypeVar("_I")
class TargetProducer:
class TargetProducer(Generic[_I]):
def __init__(self, **kwargs): def __init__(self, **kwargs):
if len(kwargs) > 0: if len(kwargs) > 0:
...@@ -49,7 +51,10 @@ class TargetProducer: ...@@ -49,7 +51,10 @@ class TargetProducer:
# Can't import Lecture, TargetMedium due to circular dependency # Can't import Lecture, TargetMedium due to circular dependency
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
@abstractmethod @abstractmethod
def calculate_current_input_hash(self, lecture: "Lecture", input_media_by_id: dict[str, "TargetMedium"]) -> str: def calculate_current_input_hash(self,
session: SessionDb,
lecture: "Lecture",
input_media_by_id: dict[str, "TargetMedium"]) -> tuple[str, _I]:
""" """
This is intended to be only called from the process scheduler job This is intended to be only called from the process scheduler job
...@@ -64,7 +69,8 @@ class TargetProducer: ...@@ -64,7 +69,8 @@ class TargetProducer:
session: SessionDb, session: SessionDb,
lecture: "Lecture", lecture: "Lecture",
input_media_by_id: dict[str, "TargetMedium"], input_media_by_id: dict[str, "TargetMedium"],
common_new_medium_kwargs: dict[str, Any] common_new_medium_kwargs: dict[str, Any],
intermediate: _I,
) -> tuple[list["TargetMedium"], tuple[str, JsonTypes] or None] or None: ) -> tuple[list["TargetMedium"], tuple[str, JsonTypes] or None] or None:
""" """
This is intended to be only called from the process scheduler job. This is intended to be only called from the process scheduler job.
...@@ -85,11 +91,12 @@ class TargetProducer: ...@@ -85,11 +91,12 @@ class TargetProducer:
:param input_media_by_id: Contains all the media this producer depends on :param input_media_by_id: Contains all the media this producer depends on
:param common_new_medium_kwargs: Contains all the values required by all target media except the :param common_new_medium_kwargs: Contains all the values required by all target media except the
process_target_id (e.g. lecture id, process sha, etc.) process_target_id (e.g. lecture id, process sha, etc.)
:param intermediate: The value returned by calculate_current_input_hash. May be used to prevent duplicate code
""" """
pass pass
class SingleInputTargetProducer(TargetProducer, ABC): class SingleInputTargetProducer(TargetProducer[None], ABC):
def __init__(self, source_id: str, **kwargs): def __init__(self, source_id: str, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
...@@ -112,11 +119,14 @@ class SingleInputTargetProducer(TargetProducer, ABC): ...@@ -112,11 +119,14 @@ class SingleInputTargetProducer(TargetProducer, ABC):
# Can't import Lecture, TargetMedium due to circular dependency # Can't import Lecture, TargetMedium due to circular dependency
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
def calculate_current_input_hash(self, lecture: "Lecture", input_media_by_id: dict[str, "TargetMedium"]) -> str: def calculate_current_input_hash(self,
return input_media_by_id[self.source_id].input_data_sha256 session: SessionDb,
lecture: "Lecture",
input_media_by_id: dict[str, "TargetMedium"]) -> [str, None]:
return input_media_by_id[self.source_id].input_data_sha256, None
class SingleOutputTargetProducer(TargetProducer, ABC): class SingleOutputTargetProducer(TargetProducer[_I], Generic[_I], ABC):
def __init__(self, output_id: str, **kwargs): def __init__(self, output_id: str, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment