Skip to content
Snippets Groups Projects
Commit cfe2643d authored by Simon Künzel
Browse files

Fix bugs in media_process_scheduler

parent a49fb216
Branches
No related tags found
No related merge requests found
import logging
import random
import string
from pathlib import Path
from videoag_common.database import *
......@@ -28,7 +30,7 @@ class ProcessScheduler:
self._produced_media: dict[str, MediumMetadata] = {}
self._newly_produced_media: list[MediumMetadata] = []
self._session = None
self._session: SessionDb = None
self._lecture = None
self._process = None
......@@ -39,6 +41,7 @@ class ProcessScheduler:
Lecture.course,
Lecture.media_files,
MediumFile.medium_metadata,
MediumFile.producer_job,
MediumMetadata.publish_medium,
])
.where(Lecture.id == self._lecture_id)
......@@ -69,8 +72,8 @@ class ProcessScheduler:
continue
self._media_files[file.process_target_id] = file
print("Relevant Media files:")
print(self._media_files)
logger.info("Relevant Media files:")
logger.info(self._media_files)
# Check and schedule jobs
for producer in self._process.producers_in_order:
......@@ -122,25 +125,28 @@ class ProcessScheduler:
probe_context = MediumProbeContext(_DATA_DIR, file)
errors_by_type = {}
for type in MediumMetadataType:
type_mapper = MediumMetadata.__mapper__.polymorphic_map.get(type, None)
for medium_type in MediumMetadataType:
type_mapper = MediumMetadata.__mapper__.polymorphic_map.get(medium_type, None)
if type_mapper is None:
continue
res = type_mapper.class_.try_create_for_file(probe_context)
if res is None:
if res is None or isinstance(res, str):
# Also add to dict if no error, so it can be seen which types were tried
errors_by_type[medium_type] = res
continue
if isinstance(res, str):
errors_by_type[type] = res
continue
assert isinstance(file.medium_metadata, type_mapper.class_)
assert file.medium_metadata == res
logger.info(f"Assigned metadata {file.medium_metadata.id} of type {type} to medium file {file.file_path}"
if not isinstance(res, type_mapper.class_):
raise ValueError(f"Got wrong metadata of type {type(res)} from try_create_for_file"
f"for {medium_type}")
file.medium_metadata = res
assert isinstance(file.medium_metadata, MediumMetadata)
logger.info(f"Assigned metadata {file.medium_metadata.id} of type {medium_type} to medium file {file.file_path}"
f" ({file.process_target_id}, {file.id})")
return True
err = (f"Could not get metadata for medium file {file.file_path} ({file.process_target_id}, {file.id})."
f" Generated Errors by type:\n{errors_by_type}")
err = (f"Could not get metadata for medium file {file.file_path} ({file.process_target_id}, {file.id}). Probe"
f" results by type (None means the type is not applicable and it was not attempted to create metadata):"
f"\n{errors_by_type}")
logger.info(err)
self._process_errors.append(err)
return False
......@@ -170,7 +176,7 @@ class ProcessScheduler:
f"{output_id} ({output_file.id}) has no associated producer job")
return True
if output_file.producer_job.status == JobState.FINISHED_AND_PROCESSED:
if output_file.producer_job.status == JobState.FINISHED:
logger.info(f"Job for producer of {producer.get_target_output_ids()} finished")
if not self._try_create_metadata_for_file(output_file):
output_file.to_be_replaced = True
......@@ -183,13 +189,13 @@ class ProcessScheduler:
assert isinstance(output_file.medium_metadata, MediumMetadata)
self._newly_produced_media.append(output_file.medium_metadata)
return False
elif output_file.producer_job.status == JobState.FAILED_AND_PROCESSED:
elif output_file.producer_job.status == JobState.FAILED:
output_file.to_be_replaced = True
logger.info(f"Job for producer of {producer.get_target_output_ids()} required because producer job"
f"{output_file.producer_job.id} for output {output_id} ({output_file.id}) has failed")
# It might look that this ends in an infinite loop of failed jobs, but a failed job should not trigger
# the process scheduler to be executed. Only if some other condition changed (which maybe causes the job
# to not fail next time)
f" {output_file.producer_job.id} for output {output_id} ({output_file.id}) has failed."
f" While it looks like this might result in an infinite loop, a failed job should not trigger the"
f" process scheduler to be executed. Only if some other condition changed (which maybe causes the"
f" job to not fail next time)")
return True
logger.info(
......@@ -247,13 +253,19 @@ class ProcessScheduler:
input_data_sha256=input_hash,
)
new_output_files[output_id] = output_file
output_file.file_path = output_file.get_default_file_path_no_ending()
# The file path cannot be null (which is good) but we need the objects id to determine the file path
# So we provide an invalid path for now. This should never pop up anywhere since we change it to the
# correct one, in the same transaction below.
# Note that deferred constraints do not work for NOT NULL in postgres
output_file.file_path = f'invalid-path/lecture/{self._lecture_id}/'.join(random.choices(string.ascii_letters, k=64))
self._media_files[output_id] = output_file
assert isinstance(self._session, SessionDb)
self._session.add_all(new_output_files.values())
self._session.flush() # Ensure ids for new files are available
for output_file in new_output_files.values():
output_file.file_path = output_file.get_default_file_path_no_ending()
job_type, job_input_data = producer.create_job_data_to_produce_files(
self._session,
self._lecture,
......@@ -283,7 +295,7 @@ class ProcessScheduler:
def execute(database, own_job_id, input_data: CJsonObject):
return database.execute_write_transaction(
return database.execute_write_transaction_and_commit(
ProcessScheduler(
own_job_id,
input_data.get_sint32("lecture_id"),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment