Commit 8609f969 authored by Andreas Valder's avatar Andreas Valder

redesigned worker

parent 8d0b85cc
Dependencies:
python3
python-psutil
Struktur:
Management process
ping for host
get and start new jobs
detect failed jobs
kill failed jobs
Worker process
executes job
ping for job
#!/usr/bin/env python3
import time
import threading
import sched
import random
import traceback
import configparser
import psutil
import subprocess
from workerapi import WorkerApi
config = configparser.ConfigParser()
config.read('config.ini')
scheduler = sched.scheduler()
def run_scheduler():
time.sleep(1) # weird things on startup
while True:
scheduler.run()
time.sleep(10)
def sched_func(delay, priority=0, firstdelay=None, args=[], kargs={}):
if firstdelay == None:
firstdelay = random.randint(1, 10)
def wrapper(func):
def sched_wrapper():
try:
func(*args, **kargs)
except Exception:
traceback.print_exc()
scheduler.enter(delay, priority, sched_wrapper)
scheduler.enter(firstdelay, priority, sched_wrapper)
return func
return wrapper
api = WorkerApi(config['API']['BASE'],config['API']['KEY'])
threading.Thread(target=run_scheduler, daemon=True).start()
@sched_func(5)
def ping_website_for_host():
# ping so the website knows our host is still alive
if not api.worker_ping():
print("Error sending host ping")
@sched_func(10)
def get_job():
if psutil.cpu_percent(interval=3) > 85:
return
j = api.worker_schedule(config['JOBS']['TYPES'])
if not j:
return
print("id: %i, data: %s, all: %s"%(j['id'],j['data'],j))
subprocess.Popen(['./worker.py', str(j['id']), str(j['type']), str(j['priority']) , str(j['data']) ] )
while True:
time.sleep(10)
#!/usr/bin/env python3
import requests
import time
import threading
import sched
import random
import traceback
import configparser
import psutil
import os
import json
import shlex
import sys
from workerapi import WorkerApi
config = configparser.ConfigParser()
config.read('config.ini')
scheduler = sched.scheduler()
def run_scheduler():
time.sleep(1) # weird things on startup
while True:
scheduler.run()
time.sleep(10)
api = WorkerApi(config['API']['BASE'],config['API']['KEY'])
def sched_func(delay, priority=0, firstdelay=None, args=[], kargs={}):
print(delay)
if firstdelay == None:
firstdelay = random.randint(1, 10)
def wrapper(func):
def sched_wrapper():
try:
func(*args, **kargs)
except Exception:
traceback.print_exc()
scheduler.enter(delay, priority, sched_wrapper)
scheduler.enter(firstdelay, priority, sched_wrapper)
return func
return wrapper
id = int(sys.argv[1])
jobtype = sys.argv[2]
priority = sys.argv[3]
data = json.loads(sys.argv[4])
print(sys.argv)
threading.Thread(target=run_scheduler, daemon=True).start()
@sched_func(5)
def ping_website():
r = requests.post(config['API']['BASE']+'/jobs/api/worker/'+config['MAIN']['HOST']+'/ping', params={'apikey': config['API']['KEY']})
if not r.status_code == 200:
print("Error sending ping: ",r)
@sched_func(10)
def get_jobs():
if psutil.cpu_percent(interval=3) < 70:
r = requests.post(config['API']['BASE']+'/jobs/api/worker/'+config['MAIN']['HOST']+'/schedule', json={'apikey': config['API']['KEY'], 'jobtypes': 'thumbnail'})
if r.status_code == 200:
threading.Thread(target=executejob, daemon=True, kwargs={'data': r.json()}).start()
def executejob(data):
print(data)
param = json.loads(data['data'])
if (data['type'] == 'thumbnail'):
inputfile = '/mnt/sshfs/video-main/'+param['path']
outputfile = '/tmp/l_'+str(param['lectureid'])+'.png'
r = requests.post(config['API']['BASE']+'/jobs/api/job/'+str(data['id'])+'/ping', params={'apikey': config['API']['KEY'], 'host': config['MAIN']['HOST'], 'state': 'running', 'status': '{}'})
if (jobtype == 'thumbnail'):
inputfile = '/mnt/sshfs/video-main/'+data['path']
outputfile = '/tmp/l_'+str(data['lectureid'])+'.png'
api.job_ping(id=id)
os.system("ffmpeg -loglevel error -i '" + shlex.quote(inputfile) + "' -ss '10:00' -vf 'scale=640:360' -frames:v 1 '" + shlex.quote(outputfile) + "'")
r = requests.post(config['API']['BASE']+'/jobs/api/job/'+str(data['id'])+'/finished', json={'apikey': config['API']['KEY'], 'status': '{}'})
print(r)
while True:
time.sleep(10)
api.job_ping(id=id,state='finished')
import requests
from socket import gethostname
import json
class WorkerApi(object):
def __init__(self, baseurl,apikey):
self.baseurl = baseurl
self.apikey = apikey
def worker_ping(self):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/ping', params={'apikey': self.apikey})
return r.status_code == 200
def worker_schedule(self, jobtypes):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': jobtypes})
if r.status_code == 200:
return r.json()
else:
return False
def job_ping(self, id, state='running', status={}):
r = requests.post(self.baseurl+'/internal/jobs/api/job/'+str(id)+'/ping', params={'apikey': self.apikey, 'host': gethostname(), 'state': state, 'status': json.dumps(status) })
return r.status_code == 200
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment