diff --git a/config.ini.example b/config.ini.example deleted file mode 100644 index b361d541985c480f721df2634568e8d12c2ae4ed..0000000000000000000000000000000000000000 --- a/config.ini.example +++ /dev/null @@ -1,15 +0,0 @@ -[MAIN] -VIDEOS_RAW=/mnt/raw -VIDEOS_RELEASED=/mnt/released - -[API] -KEY= -BASE=http://localhost:5000 - -[JOBS] -TYPES=thumbnail - -[thumbnail] -width=640 -height=-1 -folder=thumbnail diff --git a/manager.py b/manager.py index 2762cc2f6c03879246fb04705f909350f884ae18..252c754482f80cb32ed5a10ea0d177dfa3427323 100755 --- a/manager.py +++ b/manager.py @@ -1,4 +1,6 @@ #!/usr/bin/env python3 +import os +import sys import time import threading import sched @@ -7,11 +9,37 @@ import traceback import configparser import psutil import subprocess -from workerapi import WorkerApi import json +import requests +from socket import gethostname -config = configparser.ConfigParser() -config.read('config.ini') +class WorkerApi(object): + def __init__(self, baseurl,apikey): + self.baseurl = baseurl + self.apikey = apikey + + def worker_ping(self): + r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/ping', params={'apikey': self.apikey}) + return r.status_code == 200 + + def worker_schedule(self, jobtypes): + r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': ",".join(jobtypes)}) + if r.status_code == 200: + return r.json() + else: + return False + +configfile = "/etc/worker.conf" +if len(sys.argv) > 1: + configfile = sys.argv[1] + +with open(configfile, "r") as f: + for line in f.readlines(): + line = line.split('#')[0] + if "=" not in line: + continue + key, value = line.split("=", 1) + os.environ["WORKER_"+key.strip()] = value.strip() scheduler = sched.scheduler() def run_scheduler(): @@ -34,7 +62,8 @@ def sched_func(delay, priority=0, firstdelay=None, args=[], kargs={}): return func return wrapper -api = WorkerApi(config['API']['BASE'],config['API']['KEY']) +api = WorkerApi(os.environ.get("WORKER_APIBASE", "http://127.0.0.1:999999/nourl"), + os.environ.get("WORKER_APIKEY", "empty")) threading.Thread(target=run_scheduler, daemon=True).start() @@ -44,6 +73,15 @@ def ping_website_for_host(): if not api.worker_ping(): print("Error sending host ping") +workerdir = os.environ.get("WORKER_WORKERDIR", "/usr/local/lib/worker") + +def get_jobtypes(): + res = [] + for name in os.listdir(workerdir): + if os.access(workerdir+"/"+name, os.X_OK): + res.append(name) + return res + procs = [] while True: @@ -56,9 +94,11 @@ while True: if psutil.cpu_percent() > 85: time.sleep(0.1) continue - j = api.worker_schedule(config['JOBS']['TYPES']) + j = api.worker_schedule(get_jobtypes()) if not j: time.sleep(1) continue print("started jobid %i"%j['id']) - procs.append(subprocess.Popen(['./worker.py', str(j['id']), str(j['type']), str(j['priority']) , str(j['data']) ] )) + if str(j['type']) in get_jobtypes(): + procs.append(subprocess.Popen([workerdir+"/"+str(j['type']), str(j['id']), + str(j['type']), str(j['priority']) , str(j['data']) ] )) diff --git a/worker.conf b/worker.conf new file mode 100644 index 0000000000000000000000000000000000000000..c7592cb05ff8b5633e933fd5c53c0a7e14566950 --- /dev/null +++ b/worker.conf @@ -0,0 +1,8 @@ +WORKERDIR=/usr/local/lib/worker + +RAW=/mnt/raw +RELEASED=/mnt/released +TMP=/mnt/video-main/kodiert + +APIKEY= +APIBASE=http://localhost:5000 diff --git a/worker.py b/worker.py deleted file mode 100755 index d81c9f7c06b5da744b143b62779abf063bd7fb9f..0000000000000000000000000000000000000000 --- a/worker.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python3 -import time -import traceback -import configparser -import json -import shlex -import sys -import subprocess -import os -from workerapi import WorkerApi - -config = configparser.ConfigParser() -config.read('config.ini') - -api = WorkerApi(config['API']['BASE'],config['API']['KEY']) - -id = int(sys.argv[1]) -jobtype = sys.argv[2] -priority = sys.argv[3] -data = json.loads(sys.argv[4]) -print(sys.argv) - -if (jobtype == 'thumbnail'): - inputfile = config['MAIN']['VIDEOS_RELEASED']+'/'+data['path'] - outputfile = config['MAIN']['VIDEOS_RELEASED']+'/'+config['thumbnail']['folder']+'/'+'l_'+str(data['lectureid'])+'.jpg' - api.job_ping(id=id) - try: - duration = float(subprocess.check_output(["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", inputfile])) - except: - duration = 0 - api.job_ping(id=id) - if subprocess.call(["ffmpeg", "-loglevel", "error", "-y", "-ss", str(duration*0.4), "-i", inputfile, "-vf", "scale=640:-1", "-frames:v", "1", outputfile]) == 0: - api.job_ping(id=id,state='finished') - else: - api.job_ping(id=id,state='failed') - print('failed') diff --git a/workerapi.py b/workerapi.py deleted file mode 100644 index 6e7c17ca6c7cfaa0485ac8b269a90dfad6aa1603..0000000000000000000000000000000000000000 --- a/workerapi.py +++ /dev/null @@ -1,24 +0,0 @@ -import requests -from socket import gethostname -import json - - -class WorkerApi(object): - def __init__(self, baseurl,apikey): - self.baseurl = baseurl - self.apikey = apikey - - def worker_ping(self): - r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/ping', params={'apikey': self.apikey}) - return r.status_code == 200 - - def worker_schedule(self, jobtypes): - r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': jobtypes}) - if r.status_code == 200: - return r.json() - else: - return False - - def job_ping(self, id, state='running', status={}): - r = requests.post(self.baseurl+'/internal/jobs/api/job/'+str(id)+'/ping', params={'apikey': self.apikey, 'host': gethostname(), 'state': state, 'status': json.dumps(status) }) - return r.status_code == 200