Commit d1bc9ae7 authored by Julian Rother's avatar Julian Rother
Browse files

Extended jobmanager to support default and background queue

parent 02bd0d08
...@@ -22,8 +22,8 @@ class WorkerApi(object): ...@@ -22,8 +22,8 @@ class WorkerApi(object):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/ping', params={'apikey': self.apikey}) r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/ping', params={'apikey': self.apikey})
return r.status_code == 200 return r.status_code == 200
def worker_schedule(self, jobtypes): def worker_schedule(self, jobtypes, queues):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': ",".join(jobtypes)}) r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': jobtypes, 'queues': queues})
if r.status_code == 200: if r.status_code == 200:
return r.json() return r.json()
else: else:
...@@ -82,23 +82,44 @@ def get_jobtypes(): ...@@ -82,23 +82,44 @@ def get_jobtypes():
res.append(name) res.append(name)
return res return res
procs = []
proc_queues = {'default': {}, 'background': {}}
queue_sizes = {'default': int(os.environ.get("WORKER_DEFAULT_QUEUE", "2")),
'background': int(os.environ.get("WORKER_BACKGROUND_QUEUE", "2"))}
def pop_pidinfo(pid):
for name, queue in proc_queues.items():
if pid in queue:
return queue.pop(pid)
return None
while True: while True:
for p in procs: while max(map(len, proc_queues.values())):
p.poll() pid, status = os.waitpid(0, os.WNOHANG)
if p.returncode == None: if not pid:
continue break
p.wait() info = pop_pidinfo(pid)
procs.remove(p) if status != 0:
if psutil.cpu_percent() > 85: print('Job %i (pid %i) terminated with non-zero exit code %i'%(info, pid, status))
time.sleep(0.1)
queues = []
for name, queue in proc_queues.items():
if len(queue) < queue_sizes[name]:
queues.append(name)
if not queues:
time.sleep(30)
continue continue
j = api.worker_schedule(get_jobtypes())
j = api.worker_schedule(get_jobtypes(), queues)
if not j: if not j:
time.sleep(30) time.sleep(30)
continue continue
print("started jobid %i"%j['id'])
if str(j['type']) in get_jobtypes(): if str(j['type']) in get_jobtypes() and str(j['queue']) in queues:
procs.append(subprocess.Popen([workerdir+'/'+str(j['type']), str(j['id']), str(j['type']), str(j['priority']) , str(j['data']) ] )) path = workerdir+'/'+str(j['type'])
print("Started Job %i"%j['id'])
pid = os.spawnv(os.P_NOWAIT, path, [path, str(j['id']), str(j['type']), str(j['priority']), str(j['data'])])
proc_queues[str(j['queue'])][pid] = j['id']
if str(j['queue']) == 'background':
os.sched_setscheduler(pid, os.SCHED_IDLE, os.sched_param(0))
time.sleep(1) time.sleep(1)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment