Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
jobs.py 4.85 KiB
from server import *
import json
import random
from time import sleep
@app.route('/internal/jobs/overview')
@register_navbar('Jobs', iconlib='fa', icon='suitcase', group='weitere')
@mod_required
def jobs_overview():
    """Render the paginated, filterable job list together with the worker list.

    Query parameters:
        page:     zero-based page index, clamped to >= 0 (default 0).
        pagesize: rows per page, clamped to the range 1..500 (default 50).
        type, state, worker: SQL LIKE patterns used to filter the jobs;
            ``state`` defaults to ``'failed'``, the other two to ``'%'``.

    Values for ``page``/``pagesize`` that are not integers fall back to the
    defaults instead of raising.
    """
    # With type=int the lookup returns the default when the conversion fails,
    # so garbage in the query string no longer ends in an unhandled ValueError.
    page = max(0, request.args.get('page', default=0, type=int))
    # Lower bound of 1: pagesize is the divisor for pagecount and the SQL
    # LIMIT, so zero or negative values must not get through.
    pagesize = max(1, min(500, request.args.get('pagesize', default=50, type=int)))

    worker = query('SELECT * FROM worker ORDER BY last_ping DESC')

    # Distinct values present in the table, offered as filter options.
    filter_values = {
        'type': query('SELECT distinct type FROM jobs'),
        'state': query('SELECT distinct state FROM jobs'),
        'worker': query('SELECT distinct worker FROM jobs'),
    }

    # Currently active filter (named active_filter locally to avoid shadowing
    # the builtin; the template still receives it as ``filter``).
    active_filter = {
        'type': request.args.get('type', '%'),
        'state': request.args.get('state', 'failed'),
        'worker': request.args.get('worker', '%'),
    }

    # Shared by the count and the page query so both always agree.
    # The worker pattern is bound twice: jobs with a NULL worker only match
    # when the worker filter is the catch-all '%'.
    where = '(type like ?) AND (worker like ? OR (worker IS NULL AND ? = "%")) AND (state like ?)'
    where_args = (
        active_filter['type'],
        active_filter['worker'],
        active_filter['worker'],
        active_filter['state'],
    )

    total = query('SELECT count(id) as count FROM jobs WHERE ' + where, *where_args)[0]['count']
    pagecount = math.ceil(total / pagesize)
    jobs = query(
        'SELECT * FROM jobs WHERE ' + where + ' ORDER BY `time_created` DESC LIMIT ? OFFSET ?',
        *where_args, pagesize, page * pagesize)

    return render_template(
        'jobs_overview.html',
        worker=worker,
        jobs=jobs,
        filter_values=filter_values,
        filter=active_filter,
        page=page,
        pagesize=pagesize,
        pagecount=pagecount)
@app.route('/internal/jobs/action/<action>', methods=['GET', 'POST'])
@app.route('/internal/jobs/action/<action>/<jobid>', methods=['GET', 'POST'])
@mod_required
@csrf_protect
def jobs_action(action, jobid=None):
    """Apply a moderator action to jobs and redirect back.

    ``clear_failed`` and ``retry_failed`` affect the failed job ``jobid``, or
    every failed job when no ``jobid`` is given. ``copy``, ``delete`` and
    ``cancel`` only act when a ``jobid`` is present. Any other action is
    ignored. Afterwards the client is redirected to the ``ref`` request value,
    or to the jobs overview when ``ref`` is absent.
    """
    if action == 'clear_failed':
        query('UPDATE jobs SET state = "deleted" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid)
    elif action == 'retry_failed':
        query('UPDATE jobs SET state = "ready", canceled = 0 WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid)
    elif jobid:
        # The remaining actions all target one specific job.
        if action == 'copy':
            # Re-queue a fresh 'ready' copy of the job with a new creation time.
            query("INSERT INTO jobs (type, priority, queue, state, data, time_created) SELECT type, priority, queue, 'ready', data, ? FROM jobs where id = ?", datetime.now(), jobid)
        elif action == 'delete':
            query('UPDATE jobs SET state = "deleted" WHERE id = ?', jobid)
        elif action == 'cancel':
            cancel_job(jobid)

    fallback = url_for('jobs_overview')
    return redirect(request.values.get('ref', fallback))
def jobs_api_token_required(func):
    """Decorator: only run ``func`` when the request carries the jobs API key.

    The key is read from the ``apikey`` request value (query string or form)
    or, failing that, from the ``apikey`` member of a JSON object body. It
    must equal ``config['JOBS_API_KEY']``. In every other case the wrapped
    view is not called and ``('Permission denied', 403)`` is returned.

    When ``JOBS_API_KEY`` is unset or empty, every request is rejected. The
    previous fallback value ``[None]`` could be matched by a JSON body of
    ``{"apikey": [null]}``, which granted access on unconfigured systems.
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        expected = config.get('JOBS_API_KEY')

        token = request.values.get('apikey')
        if token is None:
            # silent=True: a missing or malformed JSON body must end in the
            # 403 below rather than abort the request from inside get_json().
            payload = request.get_json(silent=True)
            # Only a JSON object can carry the key; strings and lists would
            # otherwise pass an ``in`` test and then fail on indexing.
            if isinstance(payload, dict):
                token = payload.get('apikey')

        if not expected or token is None or token != expected:
            return 'Permission denied', 403
        return func(*args, **kwargs)
    return decorator
@app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST'])
@jobs_api_token_required
def jobs_ping(id):
    """Record a worker's status report for job ``id``.

    Request values:
        host:   hostname of the reporting worker.
        status: JSON document describing the job status; stored re-serialized.
        state:  new job state. ``'finished'`` stores the finish time,
                anything else stores worker, ping time and the state as given.

    Returns:
        ``('Invalid status', 400)`` when ``status`` is not valid JSON,
        ``('Job not found', 404)`` when no job row with this id exists,
        ``('Job canceled', 205)`` when the job's ``canceled`` flag is set,
        ``('OK', 200)`` otherwise.

    The parameter name ``id`` shadows the builtin but is fixed by the URL rule.
    """
    hostname = request.values['host']
    raw_status = request.values['status']
    state = request.values['state']

    try:
        parsed_status = json.loads(raw_status)
    except ValueError:
        # json.JSONDecodeError is a ValueError; reject instead of a 500.
        return 'Invalid status', 400
    status = json.dumps(parsed_status, default=date_json_handler)

    if state == 'finished':
        query('UPDATE jobs SET time_finished = ?, status = ?, state = "finished" where id = ?', datetime.now(), status, id)
    else:
        query('UPDATE jobs SET worker = ?, last_ping = ?, status = ?, state = ? where id = ?', hostname, datetime.now(), status, state, id)
    job_handler_handle(id, state)

    # Re-read the row after the update and the handlers so the current
    # ``canceled`` flag is what gets reported back to the worker.
    rows = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)
    if not rows:
        # Unknown job id: previously an IndexError (HTTP 500).
        return 'Job not found', 404
    if rows[0]['canceled']:
        return 'Job canceled', 205
    return 'OK', 200
@app.route('/internal/jobs/api/worker/<hostname>/schedule', methods=['POST'])
@jobs_api_token_required
def jobs_schedule(hostname):
    """Hand the highest-priority ready job to the worker ``hostname``.

    The worker's ``last_ping`` is refreshed first. The JSON object body may
    contain ``jobtypes`` and ``queues`` (lists). Only a job whose ``type``
    and ``queue`` are both listed is eligible; a missing list is treated as
    empty, so nothing matches.

    Returns:
        ``('no hostdata sent', 400)`` when the body is empty or not a JSON
        object, ``('no jobs', 503)`` when nothing is eligible or claiming a
        job kept failing, otherwise a JSON response with the selected job row.
    """
    query('REPLACE INTO worker (hostname, last_ping) values (?, ?)', hostname, datetime.now())
    hostdata = request.get_json()
    if not hostdata or not isinstance(hostdata, dict):
        return 'no hostdata sent', 400

    # Read once, with defaults: a missing key used to raise inside the retry
    # loop below, where it was swallowed and retried with sleeps in between.
    jobtypes = hostdata.get('jobtypes', [])
    queues = hostdata.get('queues', [])

    max_attempts = 11  # same number of attempts as the former ``tries > 10`` check
    for _ in range(max_attempts):
        job = None
        try:
            query("BEGIN")
            for candidate in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'):
                if candidate['type'] in jobtypes and candidate['queue'] in queues:
                    job = candidate
                    break
            if job is not None:
                modify('UPDATE jobs SET state="scheduled", worker = ?, time_scheduled = ? WHERE id = ?', hostname, datetime.now(), job['id'])
            # Always close the transaction, also when nothing was eligible.
            query("COMMIT")
        except Exception:
            # Any database failure: abandon the transaction, back off for a
            # random interval below one second and try again.
            try:
                query("ROLLBACK")
            except Exception:
                # Deliberately ignored: there may be no open transaction
                # (e.g. BEGIN itself failed), in which case ROLLBACK errors.
                pass
            sleep(random.random())
            continue

        if job is None:
            return 'no jobs', 503
        return Response(json.dumps(job, default=date_json_handler), mimetype='application/json')

    return 'no jobs', 503
@app.route('/internal/jobs/add/forward', methods=['GET', 'POST'])
@mod_required
@csrf_protect
def add_forward_job():
    """Schedule a ``live_forward`` job from the request and redirect back.

    The job data takes ``src`` and ``dest`` from the request values and a
    fixed ``format`` of ``'flv'``. The job is scheduled with priority 9. The
    redirect goes to the ``ref`` request value, or to the jobs overview when
    ``ref`` is absent.
    """
    job_data = {
        'src': request.values['src'],
        'dest': request.values['dest'],
        'format': 'flv',
    }
    schedule_job('live_forward', job_data, priority=9)

    fallback = url_for('jobs_overview')
    return redirect(request.values.get('ref', fallback))