diff --git a/manager.py b/manager.py index a8bee06f9ba5114120b63a4c41b0a380af755c72..e4b62de71fd227a0a391b7cdd9480631f5152b34 100755 --- a/manager.py +++ b/manager.py @@ -18,10 +18,6 @@ class WorkerApi(object): self.baseurl = baseurl self.apikey = apikey - def worker_ping(self): - r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/ping', params={'apikey': self.apikey}) - return r.status_code == 200 - def worker_schedule(self, jobtypes, queues): r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': jobtypes, 'queues': queues}) if r.status_code == 200: @@ -41,38 +37,9 @@ with open(configfile, "r") as f: key, value = line.split("=", 1) os.environ["WORKER_"+key.strip()] = value.strip() -scheduler = sched.scheduler() -def run_scheduler(): - time.sleep(1) # weird things on startup - while True: - scheduler.run() - time.sleep(1) - -def sched_func(delay, priority=0, firstdelay=None, args=[], kargs={}): - if firstdelay == None: - firstdelay = random.randint(1, 10) - def wrapper(func): - def sched_wrapper(): - try: - func(*args, **kargs) - except Exception: - traceback.print_exc() - scheduler.enter(delay, priority, sched_wrapper) - scheduler.enter(firstdelay, priority, sched_wrapper) - return func - return wrapper - api = WorkerApi(os.environ.get("WORKER_APIBASE", "http://127.0.0.1:999999/nourl"), os.environ.get("WORKER_APIKEY", "empty")) -threading.Thread(target=run_scheduler, daemon=True).start() - -@sched_func(15) -def ping_website_for_host(): - # ping so the website knows our host is still alive - if not api.worker_ping(): - print("Error sending host ping") - workerdir = os.environ.get("WORKER_WORKERDIR", "/usr/local/lib/worker") def get_jobtypes(): @@ -82,7 +49,6 @@ def get_jobtypes(): res.append(name) return res - proc_queues = {'default': {}, 'background': {}} queue_sizes = {'default': int(os.environ.get("WORKER_DEFAULT_QUEUE", "2")), 'background': int(os.environ.get("WORKER_BACKGROUND_QUEUE", "2"))} @@ -106,13 +72,9 @@ while True: for name, queue in proc_queues.items(): if len(queue) < queue_sizes[name]: queues.append(name) - if not queues: - time.sleep(30) - continue - j = api.worker_schedule(get_jobtypes(), queues) if not j: - time.sleep(30) + time.sleep(15) continue if str(j['type']) in get_jobtypes() and str(j['queue']) in queues: