jobs.py 4.81 KB
Newer Older
Andreas Valder's avatar
Andreas Valder committed
1
import json
Andreas Valder's avatar
Andreas Valder committed
2
import random
3
import math
4
from time import sleep
Julian Rother's avatar
Julian Rother committed
5

6 7
from server import *

8
@app.route('/internal/jobs/overview')
9
@register_navbar('Jobs', iconlib='fa', icon='suitcase', group='weitere')
Andreas Valder's avatar
Andreas Valder committed
10 11
@mod_required
def jobs_overview():
12 13
	page = max(0, int(request.args.get('page', 0)))
	pagesize = min(500, int(request.args.get('pagesize', 50)))
Andreas Valder's avatar
Andreas Valder committed
14
	worker = query('SELECT * FROM worker ORDER BY last_ping DESC')
15 16 17 18 19 20

	# get filter options
	filter_values = {
			'type': query('SELECT distinct type FROM jobs'),
			'state': query('SELECT distinct state FROM jobs'),
			'worker': query('SELECT distinct worker FROM jobs')}
21

22 23
	# parse filter
	filter = {
24 25 26
			'type': request.args.get('type', '%'),
			'state': request.args.get('state', 'failed'),
			'worker': request.args.get('worker', '%')}
27

28 29 30 31 32 33 34 35 36
	pagecount = math.ceil(query('SELECT count(id) as count FROM jobs WHERE (type like ?) AND (worker like ? OR (worker IS NULL AND ? = "%")) AND (state like ?)',
		filter['type'], filter['worker'], filter['worker'], filter['state'])[0]['count']/pagesize)
	jobs = query('SELECT * FROM jobs \
			WHERE (type like ?) AND (worker like ? OR (worker IS NULL AND ? = "%")) AND (state like ?) \
			ORDER BY `time_created` DESC LIMIT ? OFFSET ?',
			filter['type'], filter['worker'], filter['worker'], filter['state'], pagesize, page*pagesize)
	active_streams = query('SELECT lectures.*, "course" AS sep, courses.*, "job" AS sep, jobs.* FROM lectures \
			JOIN courses ON (courses.id = lectures.course_id) \
			JOIN jobs ON (jobs.id = lectures.stream_job) WHERE lectures.stream_job')
37 38 39
	for stream in active_streams:
		try:
			stream['destbase'] = json.loads((stream['job']['data'] or '{}')).get('destbase')
40
		except: #pylint: disable=bare-except
41
			pass
42 43 44 45 46 47 48 49 50
	return render_template('jobs_overview.html',
			worker=worker,
			jobs=jobs,
			filter_values=filter_values,
			filter=filter,
			page=page,
			pagesize=pagesize,
			pagecount=pagecount,
			active_streams=active_streams)
Andreas Valder's avatar
Andreas Valder committed
51

52 53
@app.route('/internal/jobs/action/<action>', methods=['GET', 'POST'])
@app.route('/internal/jobs/action/<action>/<jobid>', methods=['GET', 'POST'])
54 55
@mod_required
@csrf_protect
56
def jobs_action(action, jobid=None):
57
	if action == 'clear_failed':
58
		query('UPDATE jobs SET state = "deleted" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid)
59
	elif action == 'retry_failed':
60
		query('UPDATE jobs SET state = "ready", canceled = 0 WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid)
Julian Rother's avatar
Julian Rother committed
61
	elif action == 'copy' and jobid:
62 63 64
		query("INSERT INTO jobs (type, priority, queue, state, data, time_created) \
				SELECT type, priority, queue, 'ready', data, ? FROM jobs where id = ?",
				datetime.now(), jobid)
Julian Rother's avatar
Julian Rother committed
65
	elif action == 'delete' and jobid:
66
		query('UPDATE jobs SET state = "deleted" WHERE id = ?', jobid)
Julian Rother's avatar
Julian Rother committed
67 68
	elif action == 'cancel' and jobid:
		cancel_job(jobid)
69 70
	return redirect(request.values.get('ref', url_for('jobs_overview')))

71
@app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST'])
72
@api_token_required('JOBS_API_KEY')
Andreas Valder's avatar
Andreas Valder committed
73 74
def jobs_ping(id):
	hostname = request.values['host']
75
	status = json.dumps(json.loads(request.values['status']), default=date_json_handler)
Andreas Valder's avatar
Andreas Valder committed
76
	state = request.values['state']
77 78
	if state == 'finished':
		query('UPDATE jobs SET time_finished = ?, status = ?, state = "finished" where id = ?', datetime.now(), status, id)
Andreas Valder's avatar
Andreas Valder committed
79
	else:
80
		query('UPDATE jobs SET worker = ?, last_ping = ?, status = ?, state = ? where id = ?', hostname, datetime.now(), status, state, id)
81
	job_handler_handle(id, state)
82
	job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
Julian Rother's avatar
Julian Rother committed
83 84
	if job['canceled']:
		return 'Job canceled', 205
85
	return 'OK', 200
Andreas Valder's avatar
Andreas Valder committed
86

87
@app.route('/internal/jobs/api/worker/<hostname>/schedule', methods=['POST'])
88
@api_token_required('JOBS_API_KEY')
Andreas Valder's avatar
Andreas Valder committed
89
def jobs_schedule(hostname):
90
	query('REPLACE INTO worker (hostname, last_ping) values (?, ?)', hostname, datetime.now())
91
	hostdata = request.get_json()
Andreas Valder's avatar
Andreas Valder committed
92
	if not hostdata:
Andreas Valder's avatar
Andreas Valder committed
93
		return 'no hostdata sent', 400
Andreas Valder's avatar
Andreas Valder committed
94
	job = None
95
	tries = 0
96
	while not job:
Andreas Valder's avatar
Andreas Valder committed
97
		try:
98
			modify("BEGIN")
99
			for i in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'):
100
				if i['type'] in hostdata['jobtypes'] and i['queue'] in hostdata['queues']:
101 102 103 104
					job = i
					break
			if not job:
				return 'no jobs', 503
105 106 107
			modify('UPDATE jobs SET state="scheduled", worker = ?, time_scheduled = ? WHERE id = ?', hostname, datetime.now(), job['id'])
			modify("COMMIT")
		except: #pylint: disable=bare-except
108
			tries += 1
Andreas Valder's avatar
Andreas Valder committed
109
			job = None
110
			sleep(random.random())
111 112 113
			if tries > 10:
				return 'no jobs', 503

114
	return Response(json.dumps(job, default=date_json_handler), mimetype='application/json')
115 116 117 118 119 120

@app.route('/internal/jobs/add/forward', methods=['GET', 'POST'])
@mod_required
@csrf_protect
def add_forward_job():
	schedule_job('live_forward', {'src': request.values['src'],
121
			'dest': request.values['dest'], 'format': 'flv'}, priority=9)
122
	return redirect(request.values.get('ref', url_for('jobs_overview')))