import json import random import math from time import sleep from server import * @app.route('/internal/jobs/overview') @register_navbar('Jobs', iconlib='fa', icon='suitcase', group='weitere') @mod_required def jobs_overview(): page = max(0, int(request.args.get('page', 0))) pagesize = min(500, int(request.args.get('pagesize', 50))) worker = query('SELECT * FROM worker ORDER BY last_ping DESC') # get filter options filter_values = { 'type': query('SELECT distinct type FROM jobs'), 'state': query('SELECT distinct state FROM jobs'), 'worker': query('SELECT distinct worker FROM jobs')} # parse filter filter = { 'type': request.args.get('type', '%'), 'state': request.args.get('state', 'failed'), 'worker': request.args.get('worker', '%')} condition_values = [] if filter['worker'] == '%': condition = 'WHERE (type like ?) AND (state like ?)' condition_values.extend([filter['type'], filter['state']]) else: condition = 'WHERE (type like ?) AND (worker like ?) AND (state like ?)' condition_values.extend([filter['type'], filter['worker'], filter['state']]) pagecount = math.ceil(query(f'SELECT count(id) as count FROM jobs {condition}', *condition_values)[0]['count']/pagesize) jobs = query(f'SELECT * FROM jobs \ {condition} \ ORDER BY "time_created" DESC LIMIT ? OFFSET ?', *[*condition_values, pagesize, page*pagesize]) active_streams = query('SELECT lectures.*, \'course\' AS sep, courses.*, \'job\' AS sep, jobs.* FROM lectures \ JOIN courses ON (courses.id = lectures.course_id) \ JOIN jobs ON (jobs.id = lectures.stream_job) WHERE lectures.stream_job IS NOT NULL') for stream in active_streams: try: stream['destbase'] = json.loads((stream['job']['data'] or '{}')).get('destbase') except: #pylint: disable=bare-except pass return render_template('jobs_overview.html', worker=worker, jobs=jobs, filter_values=filter_values, filter=filter, page=page, pagesize=pagesize, pagecount=pagecount, active_streams=active_streams) @app.route('/internal/jobs/action/<action>', methods=['GET', 'POST']) @app.route('/internal/jobs/action/<action>/<jobid>', methods=['GET', 'POST']) @mod_required @csrf_protect def jobs_action(action, jobid=None): if action == 'clear_failed': if jobid: query('UPDATE jobs SET state = \'deleted\' WHERE state = \'failed\' AND id = ?', jobid) else: query('UPDATE jobs SET state = \'deleted\' WHERE state = \'failed\'') elif action == 'retry_failed': if jobid: query('UPDATE jobs SET state = \'ready\', canceled = false WHERE state = \'failed\' AND id = ?', jobid) else: query('UPDATE jobs SET state = \'ready\', canceled = false WHERE state = \'failed\'') elif action == 'copy' and jobid: query("INSERT INTO jobs (type, priority, queue, state, data, time_created) \ SELECT type, priority, queue, 'ready', data, ? FROM jobs where id = ?", datetime.now(), jobid) elif action == 'delete' and jobid: query('UPDATE jobs SET state = \'deleted\' WHERE id = ?', jobid) elif action == 'cancel' and jobid: cancel_job(jobid) return redirect(request.values.get('ref', url_for('jobs_overview'))) @app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST']) @api_token_required('JOBS_API_KEY') def jobs_ping(id): hostname = request.values['host'] status = json.dumps(json.loads(request.values['status']), default=date_json_handler) state = request.values['state'] if state == 'finished': query('UPDATE jobs SET time_finished = ?, status = ?, state = \'finished\' where id = ?', datetime.now(), status, id) else: query('UPDATE jobs SET worker = ?, last_ping = ?, status = ?, state = ? where id = ?', hostname, datetime.now(), status, state, id) job_handler_handle(id, state) job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0] if job['canceled']: return 'Job canceled', 205 return 'OK', 200 @app.route('/internal/jobs/api/worker/<hostname>/schedule', methods=['POST']) @api_token_required('JOBS_API_KEY') def jobs_schedule(hostname): if query("SELECT hostname FROM worker WHERE hostname = ?", hostname): query("UPDATE worker SET last_ping = ? WHERE hostname = ?", datetime.now(), hostname) else: query("INSERT INTO worker (hostname, last_ping) VALUES (?, ?)", hostname, datetime.now()) hostdata = request.get_json() if not hostdata: return 'no hostdata sent', 400 job = None tries = 0 while not job: try: modify("BEGIN") for i in query('SELECT * FROM jobs WHERE state = \'ready\' ORDER BY priority DESC'): if i['type'] in hostdata['jobtypes'] and i['queue'] in hostdata['queues']: job = i break if not job: return 'no jobs', 503 modify('UPDATE jobs SET state=\'scheduled\', worker = ?, time_scheduled = ? WHERE id = ?', hostname, datetime.now(), job['id']) modify("COMMIT") except: #pylint: disable=bare-except tries += 1 job = None sleep(random.random()) if tries > 10: return 'no jobs', 503 return Response(json.dumps(job, default=date_json_handler), mimetype='application/json') @app.route('/internal/jobs/add/forward', methods=['GET', 'POST']) @mod_required @csrf_protect def add_forward_job(): schedule_job('live_forward', {'src': request.values['src'], 'dest': request.values['dest'], 'format': 'flv'}, priority=9) return redirect(request.values.get('ref', url_for('jobs_overview')))