Commit 8f461c1f authored by Julian Rother's avatar Julian Rother

Removed worker.py and restructured manager.py to launch ffworker binaries

parent 2e021a88
[MAIN]
VIDEOS_RAW=/mnt/raw
VIDEOS_RELEASED=/mnt/released
[API]
KEY=
BASE=http://localhost:5000
[JOBS]
TYPES=thumbnail
[thumbnail]
width=640
height=-1
folder=thumbnail
#!/usr/bin/env python3
import os
import sys
import time
import threading
import sched
......@@ -7,11 +9,37 @@ import traceback
import configparser
import psutil
import subprocess
from workerapi import WorkerApi
import json
import requests
from socket import gethostname
config = configparser.ConfigParser()
config.read('config.ini')
class WorkerApi(object):
def __init__(self, baseurl,apikey):
self.baseurl = baseurl
self.apikey = apikey
def worker_ping(self):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/ping', params={'apikey': self.apikey})
return r.status_code == 200
def worker_schedule(self, jobtypes):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': ",".join(jobtypes)})
if r.status_code == 200:
return r.json()
else:
return False
configfile = "/etc/worker.conf"
if len(sys.argv) > 1:
configfile = sys.argv[1]
with open(configfile, "r") as f:
for line in f.readlines():
line = line.split('#')[0]
if "=" not in line:
continue
key, value = line.split("=", 1)
os.environ["WORKER_"+key.strip()] = value.strip()
scheduler = sched.scheduler()
def run_scheduler():
......@@ -34,7 +62,8 @@ def sched_func(delay, priority=0, firstdelay=None, args=[], kargs={}):
return func
return wrapper
api = WorkerApi(config['API']['BASE'],config['API']['KEY'])
api = WorkerApi(os.environ.get("WORKER_APIBASE", "http://127.0.0.1:999999/nourl"),
os.environ.get("WORKER_APIKEY", "empty"))
threading.Thread(target=run_scheduler, daemon=True).start()
......@@ -44,6 +73,15 @@ def ping_website_for_host():
if not api.worker_ping():
print("Error sending host ping")
workerdir = os.environ.get("WORKER_WORKERDIR", "/usr/local/lib/worker")
def get_jobtypes():
res = []
for name in os.listdir(workerdir):
if os.access(workerdir+"/"+name, os.X_OK):
res.append(name)
return res
procs = []
while True:
......@@ -56,9 +94,11 @@ while True:
if psutil.cpu_percent() > 85:
time.sleep(0.1)
continue
j = api.worker_schedule(config['JOBS']['TYPES'])
j = api.worker_schedule(get_jobtypes())
if not j:
time.sleep(1)
continue
print("started jobid %i"%j['id'])
procs.append(subprocess.Popen(['./worker.py', str(j['id']), str(j['type']), str(j['priority']) , str(j['data']) ] ))
if str(j['type']) in get_jobtypes():
procs.append(subprocess.Popen([workerdir+"/"+str(j['type']), str(j['id']),
str(j['type']), str(j['priority']) , str(j['data']) ] ))
WORKERDIR=/usr/local/lib/worker
RAW=/mnt/raw
RELEASED=/mnt/released
TMP=/mnt/video-main/kodiert
APIKEY=
APIBASE=http://localhost:5000
#!/usr/bin/env python3
import time
import traceback
import configparser
import json
import shlex
import sys
import subprocess
import os
from workerapi import WorkerApi
config = configparser.ConfigParser()
config.read('config.ini')
api = WorkerApi(config['API']['BASE'],config['API']['KEY'])
id = int(sys.argv[1])
jobtype = sys.argv[2]
priority = sys.argv[3]
data = json.loads(sys.argv[4])
print(sys.argv)
if (jobtype == 'thumbnail'):
inputfile = config['MAIN']['VIDEOS_RELEASED']+'/'+data['path']
outputfile = config['MAIN']['VIDEOS_RELEASED']+'/'+config['thumbnail']['folder']+'/'+'l_'+str(data['lectureid'])+'.jpg'
api.job_ping(id=id)
try:
duration = float(subprocess.check_output(["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", inputfile]))
except:
duration = 0
api.job_ping(id=id)
if subprocess.call(["ffmpeg", "-loglevel", "error", "-y", "-ss", str(duration*0.4), "-i", inputfile, "-vf", "scale=640:-1", "-frames:v", "1", outputfile]) == 0:
api.job_ping(id=id,state='finished')
else:
api.job_ping(id=id,state='failed')
print('failed')
import requests
from socket import gethostname
import json
class WorkerApi(object):
def __init__(self, baseurl,apikey):
self.baseurl = baseurl
self.apikey = apikey
def worker_ping(self):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/ping', params={'apikey': self.apikey})
return r.status_code == 200
def worker_schedule(self, jobtypes):
r = requests.post(self.baseurl+'/internal/jobs/api/worker/'+gethostname()+'/schedule', json={'apikey': self.apikey, 'jobtypes': jobtypes})
if r.status_code == 200:
return r.json()
else:
return False
def job_ping(self, id, state='running', status={}):
r = requests.post(self.baseurl+'/internal/jobs/api/job/'+str(id)+'/ping', params={'apikey': self.apikey, 'host': gethostname(), 'state': state, 'status': json.dumps(status) })
return r.status_code == 200
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment