jobs.py 5.05 KB
Newer Older
Andreas Valder's avatar
Andreas Valder committed
1
2
from server import *
import traceback
Andreas Valder's avatar
Andreas Valder committed
3
import json
Andreas Valder's avatar
Andreas Valder committed
4
import random
5
from sorter import schedule_thumbnail
Andreas Valder's avatar
Andreas Valder committed
6

7
@app.route('/internal/jobs/overview')
@register_navbar('Jobs', iconlib='fa',  icon='suitcase')
@mod_required
def jobs_overview():
	"""Render the overview page listing workers and a filtered, paginated job list.

	Query parameters:
		page: zero-based page index. Negative values are raised to 0 and
			non-numeric values fall back to 0.
		pagesize: number of jobs per page, clamped to 1..500 (default 50).
			Non-numeric values fall back to 50.
		type, state, worker: SQL LIKE patterns used to filter the jobs.
			``type`` and ``worker`` default to '%', ``state`` defaults to
			'failed'.

	Returns:
		The rendered ``jobs_overview.html`` template.
	"""
	def int_arg(name, default):
		# A malformed number in the URL should not end in a server error;
		# fall back to the default instead.
		try:
			return int(request.args.get(name, default))
		except (TypeError, ValueError):
			return default

	page = max(0, int_arg('page', 0))
	# The lower bound of 1 prevents a division by zero when computing the
	# page count and keeps LIMIT from receiving a zero or negative value.
	pagesize = max(1, min(500, int_arg('pagesize', 50)))

	# all known workers, most recently seen first
	worker = query('SELECT * FROM worker ORDER BY last_ping DESC')

	# get filter options
	filter_values = {
			'type': query('SELECT distinct type FROM jobs'),
			'state': query('SELECT distinct state FROM jobs'),
			'worker': query('SELECT distinct worker FROM jobs')}

	# parse filter (named job_filter locally to avoid shadowing the builtin filter())
	job_filter = {
			'type': request.args.get('type','%'),
			'state': request.args.get('state','failed'),
			'worker': request.args.get('worker','%') }

	# Jobs without a worker (NULL) only match when the worker filter is the catch-all '%'.
	where = '(type like ?) AND (worker like ? OR (worker IS NULL AND ? = "%")) AND (state like ?)'
	where_args = (job_filter['type'], job_filter['worker'], job_filter['worker'], job_filter['state'])

	total = query('SELECT count(id) as count FROM jobs WHERE ' + where, *where_args)[0]['count']
	pagecount = math.ceil(total/pagesize)
	jobs = query('SELECT * FROM jobs WHERE ' + where + ' ORDER BY `time_created` DESC LIMIT ? OFFSET ?', *(where_args + (pagesize, page*pagesize)))
	return render_template('jobs_overview.html',worker=worker,jobs=jobs, filter_values=filter_values, filter=job_filter, page=page, pagesize=pagesize, pagecount=pagecount)
Andreas Valder's avatar
Andreas Valder committed
37

38
39
@app.route('/internal/jobs/action/<action>', methods=['GET', 'POST'])
@app.route('/internal/jobs/action/<action>/<jobid>', methods=['GET', 'POST'])
@mod_required
@csrf_protect
def jobs_action(action, jobid=None):
	"""Apply a moderator action to jobs, then redirect back.

	Supported values of ``action``:
		clear_failed: mark failed jobs as deleted (only job ``jobid`` if given).
		retry_failed: set failed jobs back to ready (only job ``jobid`` if given).
		copy: insert a new ready job with the type, priority and data of job
			``jobid``; does nothing without a ``jobid``.
		delete: mark job ``jobid`` as deleted; does nothing without a ``jobid``.
		add: create a job of the request value ``type``; only 'thumbnail' is
			handled, using the request value ``lecture_id`` (default -1).

	Any other action is ignored. The response always redirects to the request
	value ``ref``, or to the jobs overview if ``ref`` is absent.
	"""
	if action == 'clear_failed':
		query('UPDATE jobs SET state="deleted" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid)
	elif action == 'retry_failed':
		query('UPDATE jobs SET state="ready" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid)
	elif action == 'copy':
		if jobid:
			query("INSERT INTO jobs SELECT NULL, type, priority, 'ready', '', '' , ?, '', NULL, data, '{}' FROM jobs where ID=?;", datetime.now(), jobid)
	elif action == 'delete':
		if jobid:
			query('UPDATE jobs SET state="deleted" WHERE id = ?', jobid)
	elif action == 'add':
		if request.values.get('type', None) == 'thumbnail':
			schedule_thumbnail(int(request.values.get('lecture_id', -1)))

	fallback_target = url_for('jobs_overview')
	return redirect(request.values.get('ref', fallback_target))

Andreas Valder's avatar
Andreas Valder committed
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def jobs_api_token_required(func):
	"""Decorator that only runs ``func`` when the request carries the jobs API key.

	The key is taken from ``request.values['apikey']`` or, if that is absent,
	from the ``apikey`` member of a JSON request body. It must equal
	``config['JOBS_API_KEY']``; otherwise the wrapped view is not called and
	``('Permission denied', 403)`` is returned.
	"""
	import hmac

	@wraps(func)
	def decorator(*args, **kwargs):
		if 'apikey' in request.values:
			token = request.values['apikey']
		else:
			# silent=True: a missing or malformed JSON body simply means
			# "no token supplied" and must end in the 403 below.
			payload = request.get_json(silent=True)
			token = payload.get('apikey') if isinstance(payload, dict) else None

		expected = config['JOBS_API_KEY']
		# Fail closed: a missing token must never match an unset (None) key,
		# and only strings can be compared below.
		if not isinstance(token, str) or not isinstance(expected, str):
			return 'Permission denied', 403
		# Constant-time comparison so response timing does not leak how much
		# of the key was guessed correctly. Comparing bytes allows non-ASCII
		# input; surrogatepass keeps odd JSON escapes from raising here.
		supplied = token.encode('utf-8', 'surrogatepass')
		wanted = expected.encode('utf-8', 'surrogatepass')
		if not hmac.compare_digest(supplied, wanted):
			return 'Permission denied', 403
		return func(*args, **kwargs)
	return decorator

def date_json_handler(obj):
	"""``default`` hook for ``json.dumps``: serialize date-like objects.

	Objects that have an ``isoformat`` attribute are converted by calling it;
	every other object is returned unchanged.
	"""
	if not hasattr(obj, 'isoformat'):
		return obj
	return obj.isoformat()

Andreas Valder's avatar
Andreas Valder committed
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@sched_func(10)
def jobs_catch_broken():
	"""Reset or fail jobs whose worker has stopped reporting.

	Two independent clean-up statements are run, each in its own
	BEGIN/COMMIT pair:

	* jobs still in state "scheduled" more than 10 seconds after
	  ``time_scheduled`` are put back to "ready";
	* jobs in state "running" whose ``last_ping`` is older than 60 seconds
	  are marked "failed".

	A failing COMMIT is logged and otherwise ignored, so that one failed
	clean-up does not prevent the other from running.
	"""
	cleanups = (
		# scheduled but never pinged
		('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', timedelta(seconds=10)),
		# no pings since 60s
		('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', timedelta(seconds=60)),
	)
	for statement, max_age in cleanups:
		query('BEGIN')
		# the cutoff is computed per statement, right before it is executed
		query(statement, datetime.now() - max_age)
		try:
			query('COMMIT')
		except Exception:
			# Best effort: keep going, but leave a trace instead of hiding
			# the failure. Exception (not a bare except) lets SystemExit and
			# KeyboardInterrupt propagate.
			traceback.print_exc()

96
@app.route('/internal/jobs/api/worker/<hostname>/ping', methods=['GET', 'POST'])
@jobs_api_token_required
def jobs_worker_ping(hostname):
	"""Record that worker ``hostname`` is alive.

	Inserts or replaces the worker's row with the current time as
	``last_ping`` and answers ``('OK', 200)``.
	"""
	seen_at = datetime.now()
	query('REPLACE INTO worker (hostname, last_ping) values (?, ?)', hostname, seen_at)
	return 'OK', 200

102
@app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST'])
@jobs_api_token_required
def jobs_ping(id):
	"""Store a progress report for job ``id``.

	Reads the request values ``host``, ``status`` and ``state``. The status
	is stored JSON-encoded. A report with state 'finished' sets
	``time_finished`` and the state "finished"; any other state updates the
	job's worker, ``last_ping``, status and state. Answers ``('OK', 200)``.
	"""
	hostname = request.values['host']
	status = json.dumps(request.values['status'], default=date_json_handler)
	state = request.values['state']

	if state != 'finished':
		# regular progress ping: remember who is working on the job and when it was last heard from
		query('UPDATE jobs SET worker = ?, last_ping = ?, status = ?, state = ? where id = ?', hostname, datetime.now(), status, state, id)
	else:
		query('UPDATE jobs SET time_finished = ?, status = ?, state = "finished" where id = ?', datetime.now(), status, id)
	return 'OK', 200

114
@app.route('/internal/jobs/api/worker/<hostname>/schedule', methods=['POST'])
@jobs_api_token_required
def jobs_schedule(hostname):
	"""Assign the highest-priority ready job the worker can handle to ``hostname``.

	Expects a JSON request body. Its ``jobtypes`` member lists the job types
	the worker accepts, either as a comma-separated string or as a list; a
	missing member means no type is accepted.

	Returns:
		The selected job row as a JSON response, after the job has been
		marked "scheduled" for ``hostname``;
		``('no hostdata sent', 400)`` if the request has no JSON body;
		``('no jobs', 503)`` if no ready job has an accepted type.
	"""
	hostdata = request.get_json()
	if not hostdata:
		return 'no hostdata sent', 400

	# Parse the accepted types once instead of re-splitting for every row,
	# and honour the empty default when the member is missing.
	jobtypes = hostdata['jobtypes'] if 'jobtypes' in hostdata else []
	if isinstance(jobtypes, str):
		jobtypes = jobtypes.split(',')

	job = None
	while not job:
		query("BEGIN")
		for candidate in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'):
			if candidate['type'] in jobtypes:
				job = candidate
				break
		if not job:
			# NOTE(review): this returns after BEGIN without COMMIT/ROLLBACK;
			# confirm that the query helper cleans up the open transaction.
			return 'no jobs', 503
		modify('UPDATE jobs SET state="scheduled", worker = ?, time_scheduled = ? WHERE id = ?', hostname, datetime.now(), job['id'])
		try:
			query("COMMIT")
		except Exception:
			# The commit failed, so the job was not claimed: back off for a
			# random time and try again.
			job = None
			# NOTE(review): random.randint(0, 50) is passed to sleep() as-is;
			# if sleep() is time.sleep this waits up to 50 seconds - confirm the intended unit.
			sleep(random.randint(0, 50))
	return Response(json.dumps(job, default=date_json_handler),  mimetype='application/json')