Skip to content
Snippets Groups Projects
Commit df33117e authored by Julian Rother's avatar Julian Rother
Browse files

Removed code for pinging the worker_ping endpoint

parent d1bc9ae7
No related branches found
No related tags found
No related merge requests found
...@@ -18,10 +18,6 @@ class WorkerApi(object): ...@@ -18,10 +18,6 @@ class WorkerApi(object):
self.baseurl = baseurl self.baseurl = baseurl
self.apikey = apikey self.apikey = apikey
def worker_ping(self):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/ping', params={'apikey': self.apikey})
return r.status_code == 200
def worker_schedule(self, jobtypes, queues): def worker_schedule(self, jobtypes, queues):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': jobtypes, 'queues': queues}) r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': jobtypes, 'queues': queues})
if r.status_code == 200: if r.status_code == 200:
...@@ -41,38 +37,9 @@ with open(configfile, "r") as f: ...@@ -41,38 +37,9 @@ with open(configfile, "r") as f:
key, value = line.split("=", 1) key, value = line.split("=", 1)
os.environ["WORKER_"+key.strip()] = value.strip() os.environ["WORKER_"+key.strip()] = value.strip()
scheduler = sched.scheduler()
def run_scheduler():
time.sleep(1) # weird things on startup
while True:
scheduler.run()
time.sleep(1)
def sched_func(delay, priority=0, firstdelay=None, args=[], kargs={}):
if firstdelay == None:
firstdelay = random.randint(1, 10)
def wrapper(func):
def sched_wrapper():
try:
func(*args, **kargs)
except Exception:
traceback.print_exc()
scheduler.enter(delay, priority, sched_wrapper)
scheduler.enter(firstdelay, priority, sched_wrapper)
return func
return wrapper
api = WorkerApi(os.environ.get("WORKER_APIBASE", "http://127.0.0.1:999999/nourl"), api = WorkerApi(os.environ.get("WORKER_APIBASE", "http://127.0.0.1:999999/nourl"),
os.environ.get("WORKER_APIKEY", "empty")) os.environ.get("WORKER_APIKEY", "empty"))
threading.Thread(target=run_scheduler, daemon=True).start()
@sched_func(15)
def ping_website_for_host():
# ping so the website knows our host is still alive
if not api.worker_ping():
print("Error sending host ping")
workerdir = os.environ.get("WORKER_WORKERDIR", "/usr/local/lib/worker") workerdir = os.environ.get("WORKER_WORKERDIR", "/usr/local/lib/worker")
def get_jobtypes(): def get_jobtypes():
...@@ -82,7 +49,6 @@ def get_jobtypes(): ...@@ -82,7 +49,6 @@ def get_jobtypes():
res.append(name) res.append(name)
return res return res
proc_queues = {'default': {}, 'background': {}} proc_queues = {'default': {}, 'background': {}}
queue_sizes = {'default': int(os.environ.get("WORKER_DEFAULT_QUEUE", "2")), queue_sizes = {'default': int(os.environ.get("WORKER_DEFAULT_QUEUE", "2")),
'background': int(os.environ.get("WORKER_BACKGROUND_QUEUE", "2"))} 'background': int(os.environ.get("WORKER_BACKGROUND_QUEUE", "2"))}
...@@ -106,13 +72,9 @@ while True: ...@@ -106,13 +72,9 @@ while True:
for name, queue in proc_queues.items(): for name, queue in proc_queues.items():
if len(queue) < queue_sizes[name]: if len(queue) < queue_sizes[name]:
queues.append(name) queues.append(name)
if not queues:
time.sleep(30)
continue
j = api.worker_schedule(get_jobtypes(), queues) j = api.worker_schedule(get_jobtypes(), queues)
if not j: if not j:
time.sleep(30) time.sleep(15)
continue continue
if str(j['type']) in get_jobtypes() and str(j['queue']) in queues: if str(j['type']) in get_jobtypes() and str(j['queue']) in queues:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment