#!/usr/bin/env python3
import time
import threading
import sched
import random
import traceback
import configparser
import psutil
import subprocess
from workerapi import WorkerApi
import json

# Settings come from config.ini, looked up relative to the current working
# directory.
config = configparser.ConfigParser()
# NOTE: ConfigParser.read() silently skips files it cannot open, so a missing
# config.ini only shows up later as a KeyError on the first section lookup.
config.read('config.ini')

# Shared event scheduler: sched_func() queues periodic jobs on it and
# run_scheduler() executes them.
scheduler = sched.scheduler()
def run_scheduler():
	"""Run the shared scheduler forever; meant to be used as a thread target.

	scheduler.run() returns once the event queue is empty, so it is called
	again after a one-second pause to pick up events queued in the meantime.
	This function never returns.
	"""
	time.sleep(1) # startup delay kept from the original ("weird things on startup")
	while True:
		scheduler.run()
		time.sleep(1)

def sched_func(delay, priority=0, firstdelay=None, args=None, kargs=None):
	"""Decorator factory that runs the decorated function periodically.

	The decorated function is queued on the module-level scheduler and
	re-queued after every run, so it repeats until the process exits.

	Parameters:
		delay: time between the end of one run and the start of the next,
			in the scheduler's time units.
		priority: priority passed to scheduler.enter() for every run.
		firstdelay: delay before the first run; when None, a random integer
			from 1 to 10 is used.
		args: positional arguments passed to the function on every run
			(default: none).
		kargs: keyword arguments passed to the function on every run
			(default: none).

	Returns:
		A decorator that schedules the function and returns it unchanged, so
		the decorated name can still be called directly.
	"""
	if firstdelay is None:
		firstdelay = random.randint(1, 10)
	# None stands in for "no arguments" so no mutable default object is
	# shared between separate sched_func() calls.
	if args is None:
		args = []
	if kargs is None:
		kargs = {}
	def wrapper(func):
		def sched_wrapper():
			try:
				func(*args, **kargs)
			except Exception:
				# A failing run must not stop the periodic schedule.
				traceback.print_exc()
			scheduler.enter(delay, priority, sched_wrapper)
		scheduler.enter(firstdelay, priority, sched_wrapper)
		return func
	return wrapper

# WorkerApi client configured from BASE and KEY in the [API] section
# (presumably the base URL and API key -- confirm in workerapi).
api = WorkerApi(config['API']['BASE'],config['API']['KEY'])

# Run the scheduler in a background thread; daemon=True so this thread does
# not keep the process alive when the main thread exits.
threading.Thread(target=run_scheduler, daemon=True).start()

@sched_func(5)
def ping_website_for_host():
	"""Ping the website so it knows this host is still alive.

	Scheduled through sched_func(5), i.e. repeated with a delay of 5 between
	runs. A falsy result from api.worker_ping() is reported on stdout.
	"""
	if not api.worker_ping():
		print("Error sending host ping")

# Worker subprocesses that have been started and not yet seen to exit.
procs = []

# Main loop: reap finished workers, then start a new job when the CPU has
# headroom and the API hands one out.
while True:
	# Keep only workers that are still running. poll() returns None while the
	# child is alive and reaps it once it has exited. Building a new list
	# avoids removing items from the list being iterated, which skipped the
	# entry following each removed one.
	procs = [p for p in procs if p.poll() is None]
	# Back off briefly instead of starting more work on a busy host.
	if psutil.cpu_percent() > 85:
		time.sleep(0.1)
		continue
	j = api.worker_schedule(config['JOBS']['TYPES'])
	if not j:
		# Nothing to do right now; wait before asking again.
		time.sleep(1)
		continue
	print("started jobid %i"%j['id'])
	# NOTE(review): every field is passed through str(); if j['data'] is a dict
	# this is a Python repr rather than JSON -- confirm worker.py expects that.
	procs.append(subprocess.Popen(['./worker.py', str(j['id']), str(j['type']), str(j['priority']) , str(j['data']) ] ))