diff --git a/api/src/api/routes/job.py b/api/src/api/routes/job.py
index 80ec6bd6ad50be6f6baa40b2e945146d1ff36b74..778b25da884eaba9e923eb5562d8be2e95c0bae2 100644
--- a/api/src/api/routes/job.py
+++ b/api/src/api/routes/job.py
@@ -11,7 +11,7 @@ from ..routes import *
         ("entries_per_page", "?int", "Must be between 10 and 100"),
         ("page", "?int", "Zero-indexed. Must not be negative. If this is bigger than the page count, an empty "
                          "page is returned"),
-        ("status", "?string", f"Only return jobs with this status. Must be one of "
+        ("state", "?string", f"Only return jobs with this state. Must be one of "
                               f"{list(map(lambda m: m.value, JobState.__members__.values()))}"),
         ("type", "?string", f"Only return jobs of this type. May be any string"),
     ],
@@ -25,17 +25,17 @@ def api_route_get_jobs():
     entries_per_page = api_request_get_query_int("entries_per_page", 20, 10, 50)
     page_num = api_request_get_query_int("page", 0)
 
-    status: str or None = api_request_get_query_string("status", 100)
+    state: str or None = api_request_get_query_string("state", 100)
     type: str or None = api_request_get_query_string("type", 100)
 
-    if status is not None and not any(s.value == status for s in JobState.__members__.values()):
-        raise ApiClientException(ERROR_REQUEST_INVALID_PARAMETER("URL.status", "Unknown status"))
+    if state is not None and not any(s.value == state for s in JobState.__members__.values()):
+        raise ApiClientException(ERROR_REQUEST_INVALID_PARAMETER("URL.state", "Unknown state"))
 
     def _trans(session: SessionDb):
         query = Job.select(api_user_ac(), [])
 
-        if status is not None:
-            query = query.where(Job.status == status)
+        if state is not None:
+            query = query.where(Job.state == state)
         if type is not None:
             query = query.where(Job.type == type)
diff --git a/api/src/api/routes/media_process.py b/api/src/api/routes/media_process.py
index a918587f533cae53f98d2c6d5dc30de47f39d79b..7d228250f1bd068d589aac71108854e1fd563974 100644
--- a/api/src/api/routes/media_process.py
+++ b/api/src/api/routes/media_process.py
@@ -199,7 +199,7 @@ def api_route_run_media_process_scheduler(lecture_id: int):
         if session.scalar(
             Job.sudo_select()
             .where(Job.type == "media_process_scheduler")
-            .where(Job.status.in_([JobState.READY, JobState.SPAWNING, JobState.RUNNING]))
+            .where(Job.state.in_([JobState.READY, JobState.SPAWNING, JobState.RUNNING]))
             .where(Job.input_data["lecture_id"].astext.cast(sql.Integer) == lecture_id)
             .with_only_columns(sql.func.count())
         ) > 0:
diff --git a/common_py/src/videoag_common/objects/job.py b/common_py/src/videoag_common/objects/job.py
index 7af6dda99d45159434c3301c67ec68bb26e35a55..80d60d5909cf77180471afc4f66edd455ecf9a67 100644
--- a/common_py/src/videoag_common/objects/job.py
+++ b/common_py/src/videoag_common/objects/job.py
@@ -7,7 +7,7 @@ from videoag_common.api_object import *
 from .user import User
 
 
-class JobState(Enum):
+class JobState(JsonSerializableEnum):
     READY = "ready"
     SPAWNING = "spawning"  # Job is in K8s but wasn't scheduled yet
     RUNNING = "running"  # Job is running in K8s
@@ -35,7 +35,7 @@ class Job(ApiObject, Base):
         )
     )
 
-    status: Mapped[JobState] = api_mapped(
+    state: Mapped[JobState] = api_mapped(
         mapped_column(_JOB_STATE_ENUM, nullable=False, index=True, default=JobState.READY),
         ApiEnumField(
             include_in_data=True
@@ -125,7 +125,7 @@ class Job(ApiObject, Base):
         )
 
     def set_error(self, error_code: str, error_message: str or None = None):
-        self.status = JobState.FAILED
+        self.state = JobState.FAILED
         self.output_data = {
             "error_code": error_code,
             "error_message": error_message
diff --git a/job_controller/jobs/media_process_scheduler/job.py b/job_controller/jobs/media_process_scheduler/job.py
index 91849ca19aabe4057081e776751d978154580f4e..e7952bca675cdf345cdf8141c2109da7458704d5 100644
--- a/job_controller/jobs/media_process_scheduler/job.py
+++ b/job_controller/jobs/media_process_scheduler/job.py
@@ -258,7 +258,7 @@ class ProcessScheduler:
                         f"{output_id} ({output_file.id}) has no associated producer job")
             return True
 
-        if output_file.producer_job.status == JobState.FINISHED:
+        if output_file.producer_job.state == JobState.FINISHED:
             logger.info(f"Job for producer of {producer.get_target_output_ids()} finished")
             if not self._try_create_metadata_for_file(output_file):
                 output_file.to_be_replaced = True
@@ -271,7 +271,7 @@ class ProcessScheduler:
             assert isinstance(output_file.medium_metadata, MediumMetadata)
             self._newly_produced_media.append(output_file.medium_metadata)
             return False
-        elif output_file.producer_job.status == JobState.FAILED:
+        elif output_file.producer_job.state == JobState.FAILED:
             output_file.to_be_replaced = True
             logger.info(f"Job for producer of {producer.get_target_output_ids()} required because producer job"
                         f" {output_file.producer_job.id} for output {output_id} ({output_file.id}) has failed."
diff --git a/job_controller/src/job_controller/job/event_listener.py b/job_controller/src/job_controller/job/event_listener.py
index 2d62c2870753c15210be1e3c093e43f78cfa0f24..e61a63ab2bf3fb86699a212f9838e99c2aa65a0d 100644
--- a/job_controller/src/job_controller/job/event_listener.py
+++ b/job_controller/src/job_controller/job/event_listener.py
@@ -39,7 +39,7 @@ class EventListener:
             return False
 
         session.add(Job(
-            status=JobState.READY,
+            state=JobState.READY,
            type=job_type,
            input_data=replace_json_arguments(self.job_data, event_data),
            cause_job_id=cause_job_id
diff --git a/job_controller/src/job_controller/job_controller.py b/job_controller/src/job_controller/job_controller.py
index 26f1c99edfae2611db4c3ece8a42d75307e33c56..d8733ce2021f08b6d8ccde63ca51e7a512738546 100644
--- a/job_controller/src/job_controller/job_controller.py
+++ b/job_controller/src/job_controller/job_controller.py
@@ -39,7 +39,7 @@ class JobScheduler:
         if session.scalar(
             Job.sudo_select()
             .where(Job.type == self._job_type)
-            .where(Job.status.in_([JobState.READY, JobState.SPAWNING, JobState.RUNNING]))
+            .where(Job.state.in_([JobState.READY, JobState.SPAWNING, JobState.RUNNING]))
             .with_only_columns(sql.func.count())
         ) > 0:
             print(f"Warning: Skipping a scheduled execution of job {self._job_type} because one is already ready/running")
@@ -115,7 +115,7 @@ class JobController:
         # This just blocks all ready jobs (whose state we want to change anyway)
         ready_jobs = session.scalars(
             Job.sudo_select()
-            .where(Job.status == JobState.READY)
+            .where(Job.state == JobState.READY)
         ).all()
         for job in ready_jobs:
             print(f"Spawning job {job.id} of type {job.type}")
@@ -123,7 +123,7 @@ class JobController:
             if job.type == "handle_event":
                 self._handle_event_job(session, job)
             elif job.type == "no_op":
-                job.status = JobState.FINISHED
+                job.state = JobState.FINISHED
                 self._handle_on_end_event(session, job)
             else:
                 self._spawn_ready_job(job)
@@ -143,7 +143,7 @@ class JobController:
                 job.input_data
             ):
                 print(f"Job {job.id} is already spawned")
-            job.status = JobState.SPAWNING
+            job.state = JobState.SPAWNING
 
     # Special job type which is handled directly without spawning an actual Job (K8s)
     def _handle_event_job(self, session: SessionDb, event_job: Job):
@@ -154,7 +154,7 @@ class JobController:
         for job_type, listener in self._event_lister_by_event.get(event_type, []):
             if listener.on_event(session, job_type, event_job.id, data):
                 self._immediate_next_cycle = True
-        event_job.status = JobState.FINISHED
+        event_job.state = JobState.FINISHED
 
     def _check_all_active(self, session: SessionDb) -> int:
         """
@@ -164,12 +164,12 @@ class JobController:
         # transaction might fail but then the next execution will find the failed/finished jobs and update them in the database
         active_jobs = session.scalars(
             Job.sudo_select()
-            .where(Job.status.in_([JobState.SPAWNING, JobState.RUNNING]))
+            .where(Job.state.in_([JobState.SPAWNING, JobState.RUNNING]))
         ).all()
         active_count = 0
         for job in active_jobs:
             self._check_active_job(session, job)
-            if job.status in [JobState.SPAWNING, JobState.RUNNING]:
+            if job.state in [JobState.SPAWNING, JobState.RUNNING]:
                 active_count += 1
 
         return active_count
@@ -179,20 +179,20 @@ class JobController:
         job_info = self.execution_api.get_job_info(job.id)
         if job_info is None:
             # Well this is interesting.
-            if job.status == JobState.RUNNING:
+            if job.state == JobState.RUNNING:
                 # Job definitely existed once in executor (K8s). Just assume it failed
                 failed = True
             else:
-                assert job.status == JobState.SPAWNING
+                assert job.state == JobState.SPAWNING
                 # The job controller probably failed to actually spawn it after setting it SPAWNING
-                job.status = JobState.READY
+                job.state = JobState.READY
                 self._immediate_next_cycle = True
                 return
         else:
             if job.run_start_time is None:
                 job.run_start_time = job_info.get_start_time()
-            if job.run_start_time is not None and job.status == JobState.SPAWNING:
-                job.status = JobState.RUNNING
+            if job.run_start_time is not None and job.state == JobState.SPAWNING:
+                job.state = JobState.RUNNING
             is_success = job_info.is_success()
             if is_success is None:
                 # Not finished yet
@@ -211,11 +211,11 @@ class JobController:
         job.run_end_time = datetime.now()
 
         if failed:
-            job.status = JobState.FAILED
+            job.state = JobState.FAILED
             # TODO some error context
             return
 
-        job.status = JobState.FINISHED
+        job.state = JobState.FINISHED
         self._handle_on_end_event(session, job)
 
     def _handle_on_end_event(self, session: SessionDb, job: Job):