jobs.py 5.24 KB
Newer Older
Andreas Valder's avatar
Andreas Valder committed
1
from server import *
2
import json
3
import random
4
from time import sleep
5

6
@app.route('/internal/jobs/overview')
@register_navbar('Jobs', iconlib='fa', icon='suitcase', group='weitere')
@mod_required
def jobs_overview():
	"""Render the paginated, filterable job overview page.

	Query parameters read from ``request.args``:
	  page      zero-based page index, negative values are clamped to 0 (default 0)
	  pagesize  rows per page, clamped to the range 1..500 (default 50)
	  type, state, worker
	            SQL LIKE patterns applied to the job list
	            (defaults: '%', 'failed', '%')

	A missing or non-integer ``page``/``pagesize`` falls back to its default
	instead of raising.

	Returns the rendered ``jobs_overview.html`` template, which receives the
	worker list, the selected jobs, the filter state, the pagination values
	and the lectures that currently have a stream job.
	"""
	# type=int makes the lookup return the default when the value is not an integer
	page = max(0, request.args.get('page', 0, type=int))
	# Lower bound of 1: a page size of 0 would divide by zero in the page count below
	pagesize = max(1, min(500, request.args.get('pagesize', 50, type=int)))

	worker = query('SELECT * FROM worker ORDER BY last_ping DESC')

	# get filter options
	filter_values = {
			'type': query('SELECT distinct type FROM jobs'),
			'state': query('SELECT distinct state FROM jobs'),
			'worker': query('SELECT distinct worker FROM jobs')}

	# parse filter (named job_filter locally so the builtin filter() is not shadowed)
	job_filter = {
			'type': request.args.get('type', '%'),
			'state': request.args.get('state', 'failed'),
			'worker': request.args.get('worker', '%')}

	# The same condition drives both the count and the page query. The worker
	# pattern is bound twice: jobs without a worker only match the catch-all '%'.
	where = '(type like ?) AND (worker like ? OR (worker IS NULL AND ? = "%")) AND (state like ?)'
	where_args = (job_filter['type'], job_filter['worker'], job_filter['worker'], job_filter['state'])

	total = query('SELECT count(id) as count FROM jobs WHERE ' + where, *where_args)[0]['count']
	pagecount = math.ceil(total/pagesize)
	jobs = query('SELECT * FROM jobs WHERE ' + where + ' ORDER BY `time_created` DESC LIMIT ? OFFSET ?', *where_args, pagesize, page*pagesize)

	active_streams = query('SELECT lectures.*, "course" AS sep, courses.*, "job" AS sep, jobs.* FROM lectures JOIN courses ON (courses.id = lectures.course_id) JOIN jobs ON (jobs.id = lectures.stream_job) WHERE lectures.stream_job')
	for stream in active_streams:
		# Best effort: a stream whose job data is missing or malformed simply
		# gets no 'destbase' entry.
		try:
			stream['destbase'] = json.loads((stream['job']['data'] or '{}')).get('destbase')
		except (KeyError, TypeError, ValueError, AttributeError):
			pass
	return render_template('jobs_overview.html', worker=worker, jobs=jobs, filter_values=filter_values, filter=job_filter, page=page, pagesize=pagesize, pagecount=pagecount, active_streams=active_streams)
42

43 44
@app.route('/internal/jobs/action/<action>', methods=['GET', 'POST'])
@app.route('/internal/jobs/action/<action>/<jobid>', methods=['GET', 'POST'])
@mod_required
@csrf_protect
def jobs_action(action, jobid=None):
	"""Apply a moderator action to one job or to all failed jobs, then redirect.

	action  one of 'clear_failed', 'retry_failed', 'copy', 'delete', 'cancel';
	        any other value performs no change
	jobid   id of the job to act on. 'clear_failed' and 'retry_failed' apply to
	        every failed job when it is omitted; the other actions are skipped
	        without it.

	Redirects to the ``ref`` request value if given, otherwise to the job overview.
	"""
	# Statements that work with or without a job id (the id is bound twice:
	# once for the comparison, once for the "IS NULL" catch-all).
	failed_job_statements = {
		'clear_failed': 'UPDATE jobs SET state = "deleted" WHERE state = "failed" AND (id = ? OR ? IS NULL)',
		'retry_failed': 'UPDATE jobs SET state = "ready", canceled = 0 WHERE state = "failed" AND (id = ? OR ? IS NULL)',
	}
	if action in failed_job_statements:
		query(failed_job_statements[action], jobid, jobid)
	elif jobid:
		# The remaining actions always target a single job
		if action == 'copy':
			query("INSERT INTO jobs (type, priority, queue, state, data, time_created) SELECT type, priority, queue, 'ready', data, ? FROM jobs where id = ?", datetime.now(), jobid)
		elif action == 'delete':
			query('UPDATE jobs SET state = "deleted" WHERE id = ?', jobid)
		elif action == 'cancel':
			cancel_job(jobid)
	fallback_target = url_for('jobs_overview')
	return redirect(request.values.get('ref', fallback_target))

60 61 62 63 64 65 66 67 68 69
def jobs_api_token_required(func):
	"""Decorator that only runs the view if the request carries the jobs API key.

	The key is taken from the ``apikey`` request value or, if that is absent,
	from the ``apikey`` member of a JSON object in the request body. It is
	compared with ``config['JOBS_API_KEY']``.

	Returns the wrapped view's result on a match and
	``('Permission denied', 403)`` otherwise.
	"""
	@wraps(func)
	def decorator(*args, **kwargs):
		import hmac

		if 'apikey' in request.values:
			token = request.values['apikey']
		else:
			# silent=True: a missing, non-JSON or malformed body yields None, so
			# the request is answered with the 403 below instead of a parser error.
			payload = request.get_json(silent=True)
			token = payload.get('apikey') if isinstance(payload, dict) else None

		# The [None] default never equals a submitted token, so an unconfigured
		# key denies every request.
		expected = config.get('JOBS_API_KEY', [None])
		if isinstance(token, str) and isinstance(expected, str):
			# Constant-time comparison so response timing does not leak the key.
			# Encoded to bytes because compare_digest rejects non-ASCII str.
			granted = hmac.compare_digest(
					token.encode('utf-8', 'surrogatepass'),
					expected.encode('utf-8', 'surrogatepass'))
		else:
			granted = token == expected

		if not granted:
			return 'Permission denied', 403
		return func(*args, **kwargs)
	return decorator

76
@app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST'])
@jobs_api_token_required
def jobs_ping(id):
	"""Record a status report for job ``id`` and report whether it was canceled.

	Request values used: ``host`` (reporting worker), ``status`` (JSON text)
	and ``state``.

	Returns ``('Job canceled', 205)`` if the job row has its ``canceled`` flag
	set, ``('OK', 200)`` otherwise.
	"""
	values = request.values
	hostname = values['host']
	# Parse and re-serialise the status so only well-formed JSON is stored
	status = json.dumps(json.loads(values['status']), default=date_json_handler)
	state = values['state']

	if state != 'finished':
		# Regular ping: remember which worker reported and when
		query('UPDATE jobs SET worker = ?, last_ping = ?, status = ?, state = ? where id = ?', hostname, datetime.now(), status, state, id)
	else:
		# Final report: store the finishing time instead of the ping time
		query('UPDATE jobs SET time_finished = ?, status = ?, state = "finished" where id = ?', datetime.now(), status, id)

	job_handler_handle(id, state)

	job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
	return ('Job canceled', 205) if job['canceled'] else ('OK', 200)
92

93
@app.route('/internal/jobs/api/worker/<hostname>/schedule', methods=['POST'])
@jobs_api_token_required
def jobs_schedule(hostname):
	"""Hand the highest-priority matching ready job to the worker ``hostname``.

	The JSON request body must be an object; its ``jobtypes`` and ``queues``
	members list what the worker accepts (a missing member matches nothing).
	The worker's ``last_ping`` is refreshed on every call.

	Returns
	  the scheduled job as JSON on success,
	  ('no hostdata sent', 400) if the body is empty or not a JSON object,
	  ('no jobs', 503) if nothing matches or scheduling kept failing.
	"""
	query('REPLACE INTO worker (hostname, last_ping) values (?, ?)', hostname, datetime.now())
	hostdata = request.get_json()
	if not hostdata or not isinstance(hostdata, dict):
		return 'no hostdata sent', 400

	# Read the capabilities once, tolerating missing or null members, so that
	# incomplete hostdata means "no jobs" right away instead of raising on
	# every retry below.
	jobtypes = hostdata.get('jobtypes') or []
	queues = hostdata.get('queues') or []

	job = None
	tries = 0
	while not job:
		try:
			query("BEGIN")
			for candidate in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'):
				if candidate['type'] in jobtypes and candidate['queue'] in queues:
					job = candidate
					break
			if not job:
				# End the transaction opened above before bailing out
				query("COMMIT")
				return 'no jobs', 503
			modify('UPDATE jobs SET state="scheduled", worker = ?, time_scheduled = ? WHERE id = ?', hostname, datetime.now(), job['id'])
			query("COMMIT")
		except Exception:
			# Presumably a conflict with a concurrent scheduler call; back off
			# for a random interval and try again.
			# NOTE(review): the failed transaction is not rolled back here - confirm
			# how query()/modify() deal with that.
			job = None
			tries += 1
			if tries > 10:
				return 'no jobs', 503
			sleep(random.random())

	return Response(json.dumps(job, default=date_json_handler), mimetype='application/json')
122 123 124 125 126 127

@app.route('/internal/jobs/add/forward', methods=['GET', 'POST'])
@mod_required
@csrf_protect
def add_forward_job():
	"""Schedule a 'live_forward' job from the ``src`` and ``dest`` request values.

	The job is created with format 'flv' and priority 9. Afterwards the client
	is redirected to the ``ref`` request value if given, otherwise to the job
	overview.
	"""
	job_data = {
		'src': request.values['src'],
		'dest': request.values['dest'],
		'format': 'flv',
	}
	schedule_job('live_forward', job_data, priority=9)
	fallback_target = url_for('jobs_overview')
	return redirect(request.values.get('ref', fallback_target))