manager.py 2.57 KB
Newer Older
Andreas Valder's avatar
Andreas Valder committed
1
#!/usr/bin/env python3
2
3
import os
import sys
Andreas Valder's avatar
Andreas Valder committed
4
5
6
7
8
9
10
11
import time
import threading
import sched
import random
import traceback
import configparser
import psutil
import subprocess
Julian Rother's avatar
Julian Rother committed
12
import json
13
14
import requests
from socket import gethostname
Andreas Valder's avatar
Andreas Valder committed
15

16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class WorkerApi(object):
	"""Small HTTP client for the job server's per-worker endpoints.

	Every request goes to
	``<baseurl>/internal/jobs/api/worker/<hostname>/<action>``, where the
	hostname is whatever ``socket.gethostname()`` reports at call time.

	baseurl -- URL prefix of the job server (no trailing slash expected,
	           the path is appended verbatim)
	apikey  -- shared secret sent with every request
	timeout -- seconds to wait for the server before giving up; without a
	           timeout a stalled server would block the caller forever
	"""

	def __init__(self, baseurl, apikey, timeout=30):
		self.baseurl = baseurl
		self.apikey = apikey
		self.timeout = timeout

	def _post(self, action, **kwargs):
		"""POST to this host's endpoint; return the response, or None on a transport error."""
		url = self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/'+action
		try:
			return requests.post(url, timeout=self.timeout, **kwargs)
		except requests.RequestException:
			# Connection refused, DNS failure, timeout, ...: report it and let
			# the caller treat it like any other unsuccessful request.
			traceback.print_exc()
			return None

	def worker_ping(self):
		"""Announce that this host is alive.

		The API key is sent as a query parameter. Returns True only when the
		server answers with HTTP 200, False otherwise (including when the
		server could not be reached).
		"""
		r = self._post('ping', params={'apikey': self.apikey})
		return r is not None and r.status_code == 200

	def worker_schedule(self, jobtypes):
		"""Ask the server for a job this host may run.

		jobtypes -- iterable of job type names (strings); sent comma-joined
		            in the JSON request body together with the API key.

		Returns the decoded JSON response body on HTTP 200, otherwise False
		(also when the server is unreachable or the body is not valid JSON).
		"""
		r = self._post('schedule', json={'apikey': self.apikey, 'jobtypes': ",".join(jobtypes)})
		if r is None or r.status_code != 200:
			return False
		try:
			return r.json()
		except ValueError:
			# A 200 with an undecodable body is treated as "no job".
			traceback.print_exc()
			return False

# Path of the config file: first command-line argument if given, otherwise
# the system-wide default.
configfile = sys.argv[1] if len(sys.argv) > 1 else "/etc/worker.conf"

# Export every "key = value" entry as the environment variable WORKER_<key>.
# Anything after a '#' is discarded and lines without '=' are skipped.
with open(configfile, "r") as f:
	for line in f:
		line = line.partition('#')[0]
		if "=" in line:
			key, value = line.split("=", 1)
			os.environ["WORKER_"+key.strip()] = value.strip()
Andreas Valder's avatar
Andreas Valder committed
43
44
45

# Event queue shared by everything registered through the scheduling decorator.
scheduler = sched.scheduler()
def run_scheduler():
	"""Drive the shared scheduler forever; meant to run on its own thread.

	scheduler.run() returns as soon as the queue is empty, so it is called
	again in a loop with a one second pause between attempts. Never returns.
	"""
	time.sleep(1) # weird things on startup
	while True:
		scheduler.run()
		time.sleep(1)
Andreas Valder's avatar
Andreas Valder committed
50
51

def sched_func(delay, priority=0, firstdelay=None, args=None, kargs=None):
	"""Decorator factory: run the decorated function periodically on the scheduler.

	delay      -- delay passed to scheduler.enter() between the end of one
	              run and the start of the next
	priority   -- priority passed to scheduler.enter()
	firstdelay -- delay before the first run; a random integer from 1 to 10
	              when omitted
	args       -- positional arguments for every call (default: none)
	kargs      -- keyword arguments for every call (default: none)

	The decorated function is returned unchanged, so it can still be called
	directly. An exception raised by a run is printed with its traceback and
	does not stop the function from being scheduled again.
	"""
	if firstdelay is None:
		firstdelay = random.randint(1, 10)
	# None instead of []/{} defaults: avoids sharing one mutable object
	# between every use of the decorator.
	args = () if args is None else args
	kargs = {} if kargs is None else kargs
	def wrapper(func):
		def sched_wrapper():
			try:
				func(*args, **kargs)
			except Exception:
				traceback.print_exc()
			# Re-arm only after the call finished, so runs never overlap.
			scheduler.enter(delay, priority, sched_wrapper)
		scheduler.enter(firstdelay, priority, sched_wrapper)
		return func
	return wrapper
Andreas Valder's avatar
Andreas Valder committed
64

65
66
# API client built from WORKER_APIBASE / WORKER_APIKEY. The fallback URL is
# not a usable address (port 999999 is out of range).
api = WorkerApi(
		baseurl=os.environ.get("WORKER_APIBASE", "http://127.0.0.1:999999/nourl"),
		apikey=os.environ.get("WORKER_APIKEY", "empty"))
Andreas Valder's avatar
Andreas Valder committed
67
68
69
70
71

# Run the scheduler loop in the background; as a daemon thread it does not
# keep the process alive on exit.
_scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
_scheduler_thread.start()

@sched_func(5)
def ping_website_for_host():
	"""Send a keep-alive ping for this host; print an error when it is not accepted.

	Registered with sched_func(5), i.e. re-run with a delay of 5 between runs.
	"""
	# ping so the website knows our host is still alive
	if not api.worker_ping():
		print("Error sending host ping")
Andreas Valder's avatar
Andreas Valder committed
75

76
77
78
79
80
81
82
83
84
# Directory holding one executable per job type (overridable via WORKER_WORKERDIR).
workerdir = os.environ.get("WORKER_WORKERDIR", "/usr/local/lib/worker")

def get_jobtypes(directory=None):
	"""Return the names of the executable files in the worker directory.

	directory -- directory to scan; defaults to the module-level workerdir

	Each returned name is a job type this host can run. Subdirectories are
	skipped: os.access(..., X_OK) is also true for a searchable directory,
	but a directory cannot be launched as a job. The order is whatever
	os.listdir() yields. Raises OSError if the directory cannot be listed.
	"""
	if directory is None:
		directory = workerdir
	res = []
	for name in os.listdir(directory):
		path = os.path.join(directory, name)
		if os.path.isfile(path) and os.access(path, os.X_OK):
			res.append(name)
	return res

Julian Rother's avatar
Julian Rother committed
85
# Child processes of jobs that were started here and have not exited yet.
procs = []

# Main loop: reap finished jobs, then ask the server for another job unless
# the machine is already busy. Runs until the process is killed.
while True:
	# Keep only the children that are still running. poll() also reaps a
	# child that has exited. A new list is built because removing items from
	# a list while iterating over it skips elements.
	procs = [p for p in procs if p.poll() is None]
	# Back off briefly while the CPU is heavily loaded instead of taking on
	# more work.
	if psutil.cpu_percent() > 85:
		time.sleep(0.1)
		continue
	jobtypes = get_jobtypes()
	j = api.worker_schedule(jobtypes)
	if not j:
		# No job available (or the request failed): wait before asking again.
		time.sleep(1)
		continue
	print("started jobid %i"%j['id'])
	jobtype = str(j['type'])
	# Only launch executables that were found in the worker directory.
	# NOTE(review): a job whose type is not available here is dropped without
	# any feedback to the server - confirm that this is intended.
	if jobtype in jobtypes:
		procs.append(subprocess.Popen([workerdir+"/"+jobtype, str(j['id']),
					jobtype, str(j['priority']), str(j['data'])]))