from server import * import json import random from time import sleep @app.route('/internal/jobs/overview') @register_navbar('Jobs', iconlib='fa', icon='suitcase', group='weitere') @mod_required def jobs_overview(): if 'page' in request.args: page = max(0, int(request.args['page'])) else: page = 0 if 'pagesize' in request.args: pagesize = min(500, int(request.args['pagesize'])) else: pagesize = 50 worker = query('SELECT * FROM worker ORDER BY last_ping DESC') # get filter options filter_values = { 'type': query('SELECT distinct type FROM jobs'), 'state': query('SELECT distinct state FROM jobs'), 'worker': query('SELECT distinct worker FROM jobs')} # parse filter filter = { 'type': request.args.get('type','%'), 'state': request.args.get('state','failed'), 'worker': request.args.get('worker','%') } pagecount = math.ceil(query('SELECT count(id) as count FROM jobs WHERE (type like ?) AND (worker like ? OR (worker IS NULL AND ? = "%")) AND (state like ?)', filter['type'], filter['worker'], filter['worker'], filter['state'])[0]['count']/pagesize) jobs = query('SELECT * FROM jobs WHERE (type like ?) AND (worker like ? OR (worker IS NULL AND ? = "%")) AND (state like ?) ORDER BY `time_created` DESC LIMIT ? OFFSET ?', filter['type'], filter['worker'], filter['worker'], filter['state'], pagesize, page*pagesize) return render_template('jobs_overview.html',worker=worker,jobs=jobs, filter_values=filter_values, filter=filter, page=page, pagesize=pagesize, pagecount=pagecount) @app.route('/internal/jobs/action/<action>', methods=['GET', 'POST']) @app.route('/internal/jobs/action/<action>/<jobid>', methods=['GET', 'POST']) @mod_required @csrf_protect def jobs_action(action, jobid=None): if action == 'clear_failed': query('UPDATE jobs SET state = "deleted" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid) elif action == 'retry_failed': query('UPDATE jobs SET state = "ready", canceled = 0 WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid) elif action == 'copy' and jobid: query("INSERT INTO jobs (type, priority, queue, state, data, time_created) SELECT type, priority, queue, 'ready', data, ? FROM jobs where id = ?", datetime.now(), jobid) elif action == 'delete' and jobid: query('UPDATE jobs SET state = "deleted" WHERE id = ?', jobid) elif action == 'cancel' and jobid: cancel_job(jobid) return redirect(request.values.get('ref', url_for('jobs_overview'))) def jobs_api_token_required(func): @wraps(func) def decorator(*args, **kwargs): if 'apikey' in request.values: token = request.values['apikey'] elif request.get_json() and ('apikey' in request.get_json()): token = request.get_json()['apikey'] else: token = None if not token == config.get('JOBS_API_KEY', [None]): return 'Permission denied', 403 else: return func(*args, **kwargs) return decorator @app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST']) @jobs_api_token_required def jobs_ping(id): hostname = request.values['host'] status = json.dumps(json.loads(request.values['status']), default=date_json_handler) state = request.values['state'] if state == 'finished': query('UPDATE jobs SET time_finished = ?, status = ?, state = "finished" where id = ?', datetime.now(), status, id) else: query('UPDATE jobs SET worker = ?, last_ping = ?, status = ?, state = ? where id = ?', hostname, datetime.now(), status, state, id) job_handler_handle(id, state) job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0] if job['canceled']: return 'Job canceled', 205 else: return 'OK', 200 @app.route('/internal/jobs/api/worker/<hostname>/schedule', methods=['POST']) @jobs_api_token_required def jobs_schedule(hostname): query('REPLACE INTO worker (hostname, last_ping) values (?, ?)', hostname, datetime.now()) hostdata = request.get_json() if not hostdata: return 'no hostdata sent', 400 job = None tries = 0 jobtypes = hostdata['jobtypes'] if 'jobtypes' in hostdata else [] while (not job): try: query("BEGIN") for i in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'): if i['type'] in hostdata['jobtypes'] and i['queue'] in hostdata['queues']: job = i break if not job: return 'no jobs', 503 modify('UPDATE jobs SET state="scheduled", worker = ?, time_scheduled = ? WHERE id = ?', hostname, datetime.now(), job['id']) query("COMMIT") except: tries += 1 job = None sleep(random.random()) if tries > 10: return 'no jobs', 503 return Response(json.dumps(job, default=date_json_handler), mimetype='application/json') @app.route('/internal/jobs/add/forward', methods=['GET', 'POST']) @mod_required @csrf_protect def add_forward_job(): schedule_job('live_forward', {'src': request.values['src'], 'dest': request.values['dest'], 'format': 'flv'}, priority=9) return redirect(request.values.get('ref', url_for('jobs_overview')))