Skip to content
Snippets Groups Projects
Commit 99982a28 authored by Simon Künzel's avatar Simon Künzel
Browse files

Rename job 'status' to 'state'

parent 687cc138
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,7 @@ from ..routes import *
("entries_per_page", "?int", "Must be between 10 and 100"),
("page", "?int", "Zero-indexed. Must not be negative. If this is bigger than the page count, an empty "
"page is returned"),
("status", "?string", f"Only return jobs with this status. Must be one of "
("state", "?string", f"Only return jobs with this state. Must be one of "
f"{list(map(lambda m: m.value, JobState.__members__.values()))}"),
("type", "?string", f"Only return jobs of this type. May be any string"),
],
......@@ -25,17 +25,17 @@ def api_route_get_jobs():
entries_per_page = api_request_get_query_int("entries_per_page", 20, 10, 50)
page_num = api_request_get_query_int("page", 0)
status: str or None = api_request_get_query_string("status", 100)
state: str or None = api_request_get_query_string("state", 100)
type: str or None = api_request_get_query_string("type", 100)
if status is not None and not any(s.value == status for s in JobState.__members__.values()):
raise ApiClientException(ERROR_REQUEST_INVALID_PARAMETER("URL.status", "Unknown status"))
if state is not None and not any(s.value == state for s in JobState.__members__.values()):
raise ApiClientException(ERROR_REQUEST_INVALID_PARAMETER("URL.state", "Unknown state"))
def _trans(session: SessionDb):
query = Job.select(api_user_ac(), [])
if status is not None:
query = query.where(Job.status == status)
if state is not None:
query = query.where(Job.state == state)
if type is not None:
query = query.where(Job.type == type)
......
......@@ -199,7 +199,7 @@ def api_route_run_media_process_scheduler(lecture_id: int):
if session.scalar(
Job.sudo_select()
.where(Job.type == "media_process_scheduler")
.where(Job.status.in_([JobState.READY, JobState.SPAWNING, JobState.RUNNING]))
.where(Job.state.in_([JobState.READY, JobState.SPAWNING, JobState.RUNNING]))
.where(Job.input_data["lecture_id"].astext.cast(sql.Integer) == lecture_id)
.with_only_columns(sql.func.count())
) > 0:
......
......@@ -7,7 +7,7 @@ from videoag_common.api_object import *
from .user import User
class JobState(Enum):
class JobState(JsonSerializableEnum):
READY = "ready"
SPAWNING = "spawning" # Job is in K8s but wasn't scheduled yet
RUNNING = "running" # Job is running in K8s
......@@ -35,7 +35,7 @@ class Job(ApiObject, Base):
)
)
status: Mapped[JobState] = api_mapped(
state: Mapped[JobState] = api_mapped(
mapped_column(_JOB_STATE_ENUM, nullable=False, index=True, default=JobState.READY),
ApiEnumField(
include_in_data=True
......@@ -125,7 +125,7 @@ class Job(ApiObject, Base):
)
def set_error(self, error_code: str, error_message: str or None = None):
self.status = JobState.FAILED
self.state = JobState.FAILED
self.output_data = {
"error_code": error_code,
"error_message": error_message
......
......@@ -258,7 +258,7 @@ class ProcessScheduler:
f"{output_id} ({output_file.id}) has no associated producer job")
return True
if output_file.producer_job.status == JobState.FINISHED:
if output_file.producer_job.state == JobState.FINISHED:
logger.info(f"Job for producer of {producer.get_target_output_ids()} finished")
if not self._try_create_metadata_for_file(output_file):
output_file.to_be_replaced = True
......@@ -271,7 +271,7 @@ class ProcessScheduler:
assert isinstance(output_file.medium_metadata, MediumMetadata)
self._newly_produced_media.append(output_file.medium_metadata)
return False
elif output_file.producer_job.status == JobState.FAILED:
elif output_file.producer_job.state == JobState.FAILED:
output_file.to_be_replaced = True
logger.info(f"Job for producer of {producer.get_target_output_ids()} required because producer job"
f" {output_file.producer_job.id} for output {output_id} ({output_file.id}) has failed."
......
......@@ -39,7 +39,7 @@ class EventListener:
return False
session.add(Job(
status=JobState.READY,
state=JobState.READY,
type=job_type,
input_data=replace_json_arguments(self.job_data, event_data),
cause_job_id=cause_job_id
......
......@@ -39,7 +39,7 @@ class JobScheduler:
if session.scalar(
Job.sudo_select()
.where(Job.type == self._job_type)
.where(Job.status.in_([JobState.READY, JobState.SPAWNING, JobState.RUNNING]))
.where(Job.state.in_([JobState.READY, JobState.SPAWNING, JobState.RUNNING]))
.with_only_columns(sql.func.count())
) > 0:
print(f"Warning: Skipping a scheduled execution of job {self._job_type} because one is already ready/running")
......@@ -115,7 +115,7 @@ class JobController:
# This just blocks all ready jobs (whose state we want to change anyway)
ready_jobs = session.scalars(
Job.sudo_select()
.where(Job.status == JobState.READY)
.where(Job.state == JobState.READY)
).all()
for job in ready_jobs:
print(f"Spawning job {job.id} of type {job.type}")
......@@ -123,7 +123,7 @@ class JobController:
if job.type == "handle_event":
self._handle_event_job(session, job)
elif job.type == "no_op":
job.status = JobState.FINISHED
job.state = JobState.FINISHED
self._handle_on_end_event(session, job)
else:
self._spawn_ready_job(job)
......@@ -143,7 +143,7 @@ class JobController:
job.input_data
):
print(f"Job {job.id} is already spawned")
job.status = JobState.SPAWNING
job.state = JobState.SPAWNING
# Special job type which is handled directly without spawning an actual Job (K8s)
def _handle_event_job(self, session: SessionDb, event_job: Job):
......@@ -154,7 +154,7 @@ class JobController:
for job_type, listener in self._event_lister_by_event.get(event_type, []):
if listener.on_event(session, job_type, event_job.id, data):
self._immediate_next_cycle = True
event_job.status = JobState.FINISHED
event_job.state = JobState.FINISHED
def _check_all_active(self, session: SessionDb) -> int:
"""
......@@ -164,12 +164,12 @@ class JobController:
# transaction might fail but then the next execution will find the failed/finished jobs and update them in the database
active_jobs = session.scalars(
Job.sudo_select()
.where(Job.status.in_([JobState.SPAWNING, JobState.RUNNING]))
.where(Job.state.in_([JobState.SPAWNING, JobState.RUNNING]))
).all()
active_count = 0
for job in active_jobs:
self._check_active_job(session, job)
if job.status in [JobState.SPAWNING, JobState.RUNNING]:
if job.state in [JobState.SPAWNING, JobState.RUNNING]:
active_count += 1
return active_count
......@@ -179,20 +179,20 @@ class JobController:
job_info = self.execution_api.get_job_info(job.id)
if job_info is None:
# Well this is interesting.
if job.status == JobState.RUNNING:
if job.state == JobState.RUNNING:
# Job definitely existed once in executor (K8s). Just assume it failed
failed = True
else:
assert job.status == JobState.SPAWNING
assert job.state == JobState.SPAWNING
# The job controller probably failed to actually spawn it after setting it SPAWNING
job.status = JobState.READY
job.state = JobState.READY
self._immediate_next_cycle = True
return
else:
if job.run_start_time is None:
job.run_start_time = job_info.get_start_time()
if job.run_start_time is not None and job.status == JobState.SPAWNING:
job.status = JobState.RUNNING
if job.run_start_time is not None and job.state == JobState.SPAWNING:
job.state = JobState.RUNNING
is_success = job_info.is_success()
if is_success is None:
# Not finished yet
......@@ -211,11 +211,11 @@ class JobController:
job.run_end_time = datetime.now()
if failed:
job.status = JobState.FAILED
job.state = JobState.FAILED
# TODO some error context
return
job.status = JobState.FINISHED
job.state = JobState.FINISHED
self._handle_on_end_event(session, job)
def _handle_on_end_event(self, session: SessionDb, job: Job):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment