#!/usr/bin/env python3
"""Job worker daemon.

Loads ``key = value`` settings from a config file into ``WORKER_*``
environment variables, pings the job website periodically from a background
scheduler thread, and polls the website for jobs in the main loop.  Each job
is run as a subprocess: the executable in the worker directory whose file
name equals the job type.
"""
import os
import sys
import time
import threading
import sched
import random
import traceback
import configparser
import subprocess
import json
from socket import gethostname

import psutil
import requests


class WorkerApi(object):
    """Small HTTP client for the website's internal worker job API."""

    def __init__(self, baseurl, apikey):
        """Remember the API base URL and the key sent with every request."""
        self.baseurl = baseurl
        self.apikey = apikey

    def _worker_url(self, action):
        """Return the endpoint URL for *action* on this host's worker."""
        return (self.baseurl + '/internal/jobs/api/worker/'
                + gethostname() + '/' + action)

    def worker_ping(self):
        """POST a ping for this host; return True iff the reply is HTTP 200.

        The API key is sent as a query parameter.
        """
        # NOTE(review): no timeout is set, so a stalled server blocks the
        # scheduler thread indefinitely -- consider adding ``timeout=``.
        r = requests.post(self._worker_url('ping'),
                          params={'apikey': self.apikey})
        return r.status_code == 200

    def worker_schedule(self, jobtypes):
        """Ask the website for a job of one of the given *jobtypes*.

        The API key and the comma-joined job types are sent as a JSON body.
        Returns the decoded JSON reply on HTTP 200, otherwise ``False``.
        """
        # NOTE(review): no timeout is set; left as-is because the endpoint
        # may be long-polling -- confirm before adding one.
        r = requests.post(self._worker_url('schedule'),
                          json={'apikey': self.apikey,
                                'jobtypes': ",".join(jobtypes)})
        if r.status_code == 200:
            return r.json()
        return False


configfile = "/etc/worker.conf"
if len(sys.argv) > 1:
    configfile = sys.argv[1]

# Export every "key = value" config line as WORKER_<key>.  Text after '#' is
# a comment.  The variables are read below and are also inherited by the job
# subprocesses, since Popen is called without an explicit env.
with open(configfile, "r") as f:
    for line in f:
        line = line.split('#')[0]
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        os.environ["WORKER_" + key.strip()] = value.strip()

scheduler = sched.scheduler()


def run_scheduler():
    """Run the shared scheduler forever (target of the background thread)."""
    time.sleep(1)  # weird things on startup
    while True:
        # run() returns once the event queue is empty, so keep re-entering.
        scheduler.run()
        time.sleep(1)


def sched_func(delay, priority=0, firstdelay=None, args=(), kargs=None):
    """Decorator factory: run the decorated function periodically.

    The function first runs ``firstdelay`` seconds after decoration (a random
    1-10 seconds when not given) and is then re-queued ``delay`` seconds after
    each run finishes.  Exceptions raised by the function are printed and do
    not stop the rescheduling.  The decorated function itself is returned
    unchanged.

    ``args`` / ``kargs`` are the positional and keyword arguments passed to
    the function on every run.
    """
    if firstdelay is None:
        firstdelay = random.randint(1, 10)
    # Avoid a shared mutable default for the keyword arguments.
    if kargs is None:
        kargs = {}

    def wrapper(func):
        def sched_wrapper():
            try:
                func(*args, **kargs)
            except Exception:
                traceback.print_exc()
            # Re-queue after the run so a failure never ends the cycle.
            scheduler.enter(delay, priority, sched_wrapper)

        scheduler.enter(firstdelay, priority, sched_wrapper)
        return func

    return wrapper


api = WorkerApi(
    os.environ.get("WORKER_APIBASE", "http://127.0.0.1:999999/nourl"),
    os.environ.get("WORKER_APIKEY", "empty"),
)

threading.Thread(target=run_scheduler, daemon=True).start()


@sched_func(5)
def ping_website_for_host():
    """Ping the website so it knows our host is still alive."""
    if not api.worker_ping():
        print("Error sending host ping")


workerdir = os.environ.get("WORKER_WORKERDIR", "/usr/local/lib/worker")


def get_jobtypes():
    """Return the names of the entries in ``workerdir`` that are executable.

    Each name is a job type this worker can run.
    """
    # NOTE(review): os.access(..., os.X_OK) is also true for searchable
    # directories, so sub-directories would be advertised as job types.
    return [
        name
        for name in os.listdir(workerdir)
        if os.access(workerdir + "/" + name, os.X_OK)
    ]


procs = []
while True:
    # Keep only children that are still running.  poll() reaps a finished
    # child.  Building a new list avoids removing items from the list being
    # iterated, which skipped the entry after every removed one.
    procs = [p for p in procs if p.poll() is None]

    # Do not take on more work while the machine is busy.
    if psutil.cpu_percent() > 85:
        time.sleep(0.1)
        continue

    jobtypes = get_jobtypes()
    try:
        j = api.worker_schedule(jobtypes)
    except requests.RequestException:
        # A transient network failure must not kill the worker and orphan
        # running jobs.  Report it and retry, as the ping path does.
        traceback.print_exc()
        j = False
    if not j:
        time.sleep(1)
        continue

    jobtype = str(j['type'])
    # Run only job types that exist as executables in workerdir.  This also
    # keeps a server-supplied type from naming an arbitrary path.
    if jobtype not in jobtypes:
        print("skipping jobid %s: unknown job type %s" % (j['id'], jobtype))
        continue

    print("started jobid %i" % j['id'])
    procs.append(subprocess.Popen([
        workerdir + "/" + jobtype,
        str(j['id']),
        jobtype,
        str(j['priority']),
        str(j['data']),
    ]))