Skip to content
Snippets Groups Projects
Verified Commit 961e4b4c authored by Dorian Koch
Browse files

Recover spawning jobs

parent a655872c
No related merge requests found
...@@ -4,6 +4,7 @@ from job_database_api import DummyJobDatabaseApi, JobData, JobDatabaseApi ...@@ -4,6 +4,7 @@ from job_database_api import DummyJobDatabaseApi, JobData, JobDatabaseApi
from jobs.dummy_job import DummyJob from jobs.dummy_job import DummyJob
from kubernetes_api import K8sApi from kubernetes_api import K8sApi
import os import os
import datetime
def load_config(): def load_config():
...@@ -32,6 +33,10 @@ class ControllerState(): ...@@ -32,6 +33,10 @@ class ControllerState():
db_engine = self.config.get("DB_ENGINE") db_engine = self.config.get("DB_ENGINE")
if db_engine == "dummy": if db_engine == "dummy":
self.job_api = DummyJobDatabaseApi() self.job_api = DummyJobDatabaseApi()
# make some dummy jobs
start_id = int(datetime.datetime.now().timestamp())
for i in range(start_id, start_id + 4):
self.job_api.create_job(JobData("job{}".format(i), "dummy"))
else: else:
raise Exception(f"Unknown DB_ENGINE: {db_engine}") raise Exception(f"Unknown DB_ENGINE: {db_engine}")
self.event_queue = EventQueue() self.event_queue = EventQueue()
......
...@@ -34,19 +34,23 @@ class JobDatabaseApi(metaclass=ABCMeta): ...@@ -34,19 +34,23 @@ class JobDatabaseApi(metaclass=ABCMeta):
@abstractmethod @abstractmethod
def get_next_jobs_and_set_spawning(self, limit: int) -> list[JobData]: def get_next_jobs_and_set_spawning(self, limit: int) -> list[JobData]:
pass pass # atomically retrieve and set state to SPAWNING
@abstractmethod
def get_all_spawning_jobs(self) -> list[JobData]:
    """Return every job whose state is currently SPAWNING.

    Used for recovery.
    """
@abstractmethod @abstractmethod
def get_job_by_id(self, job_id: str) -> Optional[JobData]: def get_job_by_id(self, job_id: str) -> Optional[JobData]:
pass pass # refresh state from db
@abstractmethod @abstractmethod
def create_job(self, job: JobData): def create_job(self, job: JobData):
pass pass # insert into db
@abstractmethod @abstractmethod
def update_job_state(self, job_id: str, new_state: JobState): def update_job_state(self, job_id: str, new_state: JobState):
pass pass # update state in db
class DummyJobDatabaseApi(JobDatabaseApi): class DummyJobDatabaseApi(JobDatabaseApi):
...@@ -65,6 +69,9 @@ class DummyJobDatabaseApi(JobDatabaseApi): ...@@ -65,6 +69,9 @@ class DummyJobDatabaseApi(JobDatabaseApi):
ret.append(next) ret.append(next)
return ret return ret
def get_all_spawning_jobs(self) -> list[JobData]:
    """Return snapshots of all jobs that are currently in the SPAWNING state.

    Returns:
        A list with one deep copy per job in ``self.db_state`` whose
        ``job_state`` equals ``JobState.SPAWNING``; empty if there are none.
    """
    # Hand out deep copies, exactly like get_job_by_id does, so callers that
    # modify a returned job cannot silently alter the in-memory "database".
    return [
        copy.deepcopy(job)
        for job in self.db_state.values()
        if job.job_state == JobState.SPAWNING
    ]
def get_job_by_id(self, job_id: str) -> Optional[JobData]: def get_job_by_id(self, job_id: str) -> Optional[JobData]:
return copy.deepcopy(self.db_state.get(job_id, None)) return copy.deepcopy(self.db_state.get(job_id, None))
......
from actions.spawn_job import WatchJob from actions.spawn_job import SpawnJob, WatchJob
from event_queue import EventResult from event_queue import EventResult
from actions.find_ready_jobs import FindReadyJobs from actions.find_ready_jobs import FindReadyJobs
from job_database_api import JobData from job_database_api import JobData, JobState
from job_controller import ControllerState from job_controller import ControllerState
import datetime import datetime
...@@ -60,12 +60,23 @@ def main(): ...@@ -60,12 +60,23 @@ def main():
cstate.event_queue.put(WatchJob(watch.metadata.labels["job_id"])) cstate.event_queue.put(WatchJob(watch.metadata.labels["job_id"]))
print("Done checking for existing jobs") print("Done checking for existing jobs")
# TODO: check for existing jobs in spawning state in db that are not in k8s and requeue them (ready state will be picked up by FindReadyJobs) # find spawning jobs that are not in k8s
spawning_jobs = cstate.job_api.get_all_spawning_jobs()
# make some dummy jobs num_resetted = 0
start_id = int(datetime.datetime.now().timestamp()) for job in spawning_jobs:
for i in range(start_id, start_id + 4): exists = False
cstate.job_api.create_job(JobData("job{}".format(i), "dummy")) for k8sjob in existing_worker_jobs.items:
if k8sjob.metadata.labels["job_id"] == job.job_id:
exists = True
break
if not exists:
# reset to ready
# TODO: maybe reset them to an error state?
job.update_state(cstate, JobState.READY)
num_resetted += 1
if num_resetted > 0:
print(f"Resetted {num_resetted} spawning jobs to ready state")
print("Reconcilation done")
cstate.event_queue.put(FindReadyJobs()) cstate.event_queue.put(FindReadyJobs())
...@@ -107,10 +118,17 @@ def main(): ...@@ -107,10 +118,17 @@ def main():
print(f"Error in event {evt}: {e}") print(f"Error in event {evt}: {e}")
print("###") print("###")
print("Event loop stopped") print("Event loop stopped")
# print all remaining events # set all jobs that were supposed to be spawned back to ready
print("Remaining events in queue:") num_readied = 0
while not cstate.event_queue.empty(): for evt in cstate.event_queue.queue.queue:
print(cstate.event_queue.get()) if isinstance(evt, SpawnJob):
# get up to date job data (it may have been canceled)
job = cstate.job_api.get_job_by_id(evt.job.jobData.job_id)
if job is not None and job.job_state == JobState.SPAWNING:
job.update_state(cstate, JobState.READY)
num_readied += 1
if num_readied > 0:
print(f"Readied {num_readied} jobs that were supposed to be spawned")
sys.exit(0) sys.exit(0)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment