manager.py 3.33 KB
Newer Older
Andreas Valder's avatar
Andreas Valder committed
1
#!/usr/bin/env python3
2
3
import os
import sys
Andreas Valder's avatar
Andreas Valder committed
4
5
6
7
8
9
10
11
import time
import threading
import sched
import random
import traceback
import configparser
import psutil
import subprocess
Julian Rother's avatar
Julian Rother committed
12
import json
13
14
import requests
from socket import gethostname
Andreas Valder's avatar
Andreas Valder committed
15

16
17
18
19
20
21
22
23
24
class WorkerApi(object):
	"""Small HTTP client for the website's internal job API.

	All requests are made on behalf of this machine: the local hostname
	(as returned by gethostname()) is part of every request URL.
	"""

	def __init__(self, baseurl, apikey, timeout=30):
		"""Store the connection settings.

		baseurl -- URL prefix; the API path is appended to it verbatim
		apikey  -- key sent along with every request
		timeout -- seconds to wait for the server before a request is
		           abandoned (None waits forever, like the requests default)
		"""
		self.baseurl = baseurl
		self.apikey = apikey
		self.timeout = timeout

	def _url(self, action):
		"""Build the URL of a per-worker API endpoint for this host."""
		return self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/'+action

	def worker_ping(self):
		"""Tell the website that this host is alive.

		Returns True if the server answered with HTTP 200, False on any
		other status or if the request could not be completed.
		"""
		try:
			r = requests.post(self._url('ping'), params={'apikey': self.apikey}, timeout=self.timeout)
		except requests.RequestException as err:
			# Network trouble is an expected condition, report it as a failed ping
			print("Worker API ping request failed: %s" % err)
			return False
		return r.status_code == 200

	def worker_schedule(self, jobtypes, queues):
		"""Ask the website for a job to run.

		jobtypes -- names of the job types this worker can execute
		queues   -- names of the queues that currently have a free slot

		Returns the decoded JSON response if the server answered with
		HTTP 200, otherwise False. False is also returned if the request
		could not be completed or the response body is not valid JSON.
		"""
		payload = {'apikey': self.apikey, 'jobtypes': jobtypes, 'queues': queues}
		try:
			r = requests.post(self._url('schedule'), json=payload, timeout=self.timeout)
		except requests.RequestException as err:
			print("Worker API schedule request failed: %s" % err)
			return False
		if r.status_code != 200:
			return False
		try:
			return r.json()
		except ValueError as err:
			# A 200 response without a JSON body is treated like any other failure
			print("Worker API returned an invalid schedule response: %s" % err)
			return False

# Configuration file path: the first command line argument overrides the default
configfile = sys.argv[1] if len(sys.argv) > 1 else "/etc/worker.conf"

# Export every "key = value" line of the config file as environment variable
# WORKER_<key>. Everything after a '#' is ignored, as are lines without '='.
with open(configfile, "r") as f:
	for line in f:
		line = line.partition('#')[0]
		if "=" in line:
			key, value = line.split("=", 1)
			os.environ["WORKER_"+key.strip()] = value.strip()
Andreas Valder's avatar
Andreas Valder committed
43
44
45

# Global scheduler that all periodic tasks are registered on
scheduler = sched.scheduler()

def run_scheduler():
	"""Run the global scheduler forever (never returns).

	scheduler.run() returns once its queue is empty, so it is called
	again after a one second pause. The pause also precedes the very
	first run (weird things on startup).
	"""
	while True:
		time.sleep(1)
		scheduler.run()
Andreas Valder's avatar
Andreas Valder committed
50
51

def sched_func(delay, priority=0, firstdelay=None, args=None, kargs=None):
	"""Decorator factory that runs the decorated function periodically.

	The function is registered on the global scheduler as soon as it is
	decorated and is re-registered after every run, whether or not it
	raised. Exceptions raised by the function are printed, not propagated.

	delay      -- seconds between the end of one run and the next run
	priority   -- scheduler priority of the task
	firstdelay -- seconds until the first run; a random value between
	              1 and 10 is used if omitted
	args       -- positional arguments passed to the function on every run
	kargs      -- keyword arguments passed to the function on every run

	The decorator returns the original function unchanged.
	"""
	if firstdelay is None:
		firstdelay = random.randint(1, 10)
	# None instead of mutable default values, so no list/dict object is
	# shared between all users of the decorator
	call_args = () if args is None else args
	call_kargs = {} if kargs is None else kargs
	def wrapper(func):
		def sched_wrapper():
			try:
				func(*call_args, **call_kargs)
			except Exception:
				# A failing run must not stop the task from being rescheduled
				traceback.print_exc()
			scheduler.enter(delay, priority, sched_wrapper)
		scheduler.enter(firstdelay, priority, sched_wrapper)
		return func
	return wrapper
Andreas Valder's avatar
Andreas Valder committed
64

65
66
# API client used by the periodic tasks and the main loop. The fallback
# values apply when the config file does not set APIBASE/APIKEY.
api = WorkerApi(
	baseurl=os.environ.get("WORKER_APIBASE", "http://127.0.0.1:999999/nourl"),
	apikey=os.environ.get("WORKER_APIKEY", "empty"),
)

# Periodic tasks run on a daemon thread, the main thread handles the jobs
threading.Thread(daemon=True, target=run_scheduler).start()

70
@sched_func(15)
def ping_website_for_host():
	"""Periodic task: ping the website so it knows our host is still alive."""
	if api.worker_ping():
		return
	print("Error sending host ping")
Andreas Valder's avatar
Andreas Valder committed
75

76
77
78
79
80
81
82
83
84
# Directory containing one executable per job type
workerdir = os.environ.get("WORKER_WORKERDIR", "/usr/local/lib/worker")

def get_jobtypes(directory=None):
	"""Return the names of all executable files in the worker directory.

	directory -- directory to scan, defaults to the configured workerdir

	Only regular files (or symlinks to them) count. Sub-directories are
	skipped even though os.access() reports searchable directories as
	executable, because they cannot be started as a job.
	"""
	if directory is None:
		directory = workerdir
	res = []
	for name in os.listdir(directory):
		path = os.path.join(directory, name)
		if os.path.isfile(path) and os.access(path, os.X_OK):
			res.append(name)
	return res

85
86
87
88
89
90
91
92
93
94

# Running jobs per queue: queue name -> {pid of the job process: job info}
proc_queues = {'default': {}, 'background': {}}

# Maximum number of concurrently running jobs per queue
queue_sizes = {
	'default': int(os.environ.get("WORKER_DEFAULT_QUEUE", "2")),
	'background': int(os.environ.get("WORKER_BACKGROUND_QUEUE", "2")),
}

def pop_pidinfo(pid):
	"""Remove pid from the queue tracking it and return the stored info.

	Returns None if no queue contains the pid.
	"""
	owner = next((running for running in proc_queues.values() if pid in running), None)
	if owner is None:
		return None
	return owner.pop(pid)
Andreas Valder's avatar
Andreas Valder committed
95
96

# Main loop: reap finished jobs, then request a new job from the website
# whenever at least one queue has a free slot.
while True:
	# Collect all children that have exited, without blocking. Only poll
	# while at least one job is tracked.
	while any(proc_queues.values()):
		pid, status = os.waitpid(0, os.WNOHANG)
		if not pid:
			break
		info = pop_pidinfo(pid)
		if status == 0:
			continue
		# pop_pidinfo() returns None for children we are not tracking
		label = 'Job %i'%info if info is not None else 'Untracked child'
		# status is a wait status, not an exit code: decode it before printing
		if os.WIFSIGNALED(status):
			print('%s (pid %i) was terminated by signal %i'%(label, pid, os.WTERMSIG(status)))
		else:
			print('%s (pid %i) terminated with non-zero exit code %i'%(label, pid, os.WEXITSTATUS(status)))

	# Names of the queues that can take another job
	queues = [name for name, queue in proc_queues.items() if len(queue) < queue_sizes[name]]
	if not queues:
		time.sleep(30)
		continue

	# List the worker directory once and use the same result for the
	# request and for validating the response
	jobtypes = get_jobtypes()
	j = api.worker_schedule(jobtypes, queues)
	if not j:
		time.sleep(30)
		continue

	jobtype = str(j['type'])
	jobqueue = str(j['queue'])
	# Only start jobs we advertised support for, in a queue we offered
	if jobtype in jobtypes and jobqueue in queues:
		path = workerdir+'/'+jobtype
		print("Started Job %i"%j['id'])
		pid = os.spawnv(os.P_NOWAIT, path, [path, str(j['id']), jobtype, str(j['priority']), str(j['data'])])
		proc_queues[jobqueue][pid] = j['id']
		if jobqueue == 'background':
			try:
				os.sched_setscheduler(pid, os.SCHED_IDLE, os.sched_param(0))
			except OSError:
				# E.g. the job already exited. Failing to lower the priority
				# must not take down the manager and its other jobs.
				traceback.print_exc()
	time.sleep(1)