jobs.py 3.86 KB
Newer Older
Andreas Valder's avatar
Andreas Valder committed
1
2
from server import *
import traceback
Andreas Valder's avatar
Andreas Valder committed
3
import json
Andreas Valder's avatar
Andreas Valder committed
4
import random
Andreas Valder's avatar
Andreas Valder committed
5

6
@app.route('/internal/jobs/overview')
@register_navbar('Jobs', iconlib='fa',  icon='suitcase')
@mod_required
def jobs_overview():
	"""Render the jobs overview page: all workers plus a filtered, paginated job list.

	Query parameters (all optional):
		page:     zero-based page index; defaults to 0, negative values are clamped to 0.
		pagesize: rows per page; defaults to 50, clamped to the range 1..500.
		type, state, worker: SQL LIKE patterns applied to the matching job columns;
			defaults are '%', 'failed' and '%'. A worker pattern of '%' also
			matches jobs whose worker column is NULL.

	Returns the rendered 'jobs_overview.html' template.
	"""
	# type=int makes a malformed value fall back to the default instead of raising ValueError.
	page = max(0, request.args.get('page', 0, type=int))
	# Lower bound of 1 avoids a division by zero below and a non-positive LIMIT.
	pagesize = max(1, min(500, request.args.get('pagesize', 50, type=int)))

	worker = query('SELECT * FROM worker ORDER BY last_ping DESC')

	# get filter options
	filter_values = {
			'type': query('SELECT distinct type FROM jobs'),
			'state': query('SELECT distinct state FROM jobs'),
			'worker': query('SELECT distinct worker FROM jobs')}

	# parse filter
	active_filter = {
			'type': request.args.get('type','%'),
			'state': request.args.get('state','failed'),
			'worker': request.args.get('worker','%') }

	# The same WHERE clause drives both the page count and the listing so that
	# the pagination always matches the rows that are actually shown.
	where = '(type like ?) AND (worker like ? OR (worker IS NULL AND ? = "%")) AND (state like ?)'
	where_params = (active_filter['type'], active_filter['worker'], active_filter['worker'], active_filter['state'])

	total = query('SELECT count(id) as count FROM jobs WHERE ' + where, *where_params)[0]['count']
	pagecount = math.ceil(total/pagesize)

	jobs = query('SELECT * FROM jobs WHERE ' + where + ' ORDER BY `time_created` DESC LIMIT ? OFFSET ?', *where_params, pagesize, page*pagesize)
	return render_template('jobs_overview.html',worker=worker,jobs=jobs, filter_values=filter_values, filter=active_filter, page=page, pagesize=pagesize, pagecount=pagecount)
Andreas Valder's avatar
Andreas Valder committed
36

Andreas Valder's avatar
Andreas Valder committed
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def jobs_api_token_required(func):
	"""Decorator that only calls the wrapped view when a valid jobs API key is sent.

	The key is read from the 'apikey' entry of request.values or, if that is
	absent, from the 'apikey' member of a JSON object body. It must equal
	config['JOBS_API_KEY']; otherwise ('Permission denied', 403) is returned
	and the wrapped view is not called.
	"""
	@wraps(func)
	def decorator(*args, **kwargs):
		# Function-scope import keeps this block self-contained.
		import hmac

		token = request.values.get('apikey')
		if token is None:
			# Parse the body once; only a JSON object can carry an 'apikey' member.
			payload = request.get_json()
			if isinstance(payload, dict):
				token = payload.get('apikey')

		expected = config['JOBS_API_KEY']
		# Fail closed on a missing/non-string token or a non-string configured key,
		# so that e.g. None never compares equal to an unset key.
		# NOTE(review): assumes JOBS_API_KEY is configured as a str - confirm.
		if not isinstance(token, str) or not isinstance(expected, str):
			return 'Permission denied', 403
		# Constant-time comparison; bytes are used because compare_digest
		# rejects non-ASCII str arguments.
		if not hmac.compare_digest(token.encode('utf-8'), expected.encode('utf-8')):
			return 'Permission denied', 403
		return func(*args, **kwargs)
	return decorator

def date_json_handler(obj):
	"""Convert objects that offer an isoformat() method to their ISO string.

	Used in this file as the `default` hook of json.dumps. Objects without
	an `isoformat` attribute are handed back unchanged.
	"""
	if not hasattr(obj, 'isoformat'):
		return obj
	return obj.isoformat()

Andreas Valder's avatar
Andreas Valder committed
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@sched_func(10)
def jobs_catch_broken():
	"""Reset or fail jobs whose worker stopped reporting.

	Two independent clean-up steps, each in its own BEGIN/COMMIT pair:
	  * jobs still 'scheduled' more than 10 seconds after time_scheduled
	    are put back to 'ready';
	  * jobs 'running' whose last_ping is older than 60 seconds are
	    marked 'failed'.
	A failing COMMIT is ignored (best effort).
	"""
	cleanups = (
		# scheduled but never pinged
		('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', timedelta(seconds=10)),
		# no pings since 60s
		('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', timedelta(seconds=60)),
	)
	for statement, max_age in cleanups:
		query('BEGIN')
		query(statement, datetime.now() - max_age)
		try:
			query('COMMIT')
		except Exception:
			# Deliberately best effort: a failed commit is skipped.
			# Only Exception is caught so that SystemExit and
			# KeyboardInterrupt are no longer swallowed.
			pass

73
@app.route('/internal/jobs/api/worker/<hostname>/ping', methods=['GET', 'POST'])
@jobs_api_token_required
def jobs_worker_ping(hostname):
	"""Store the current time as last_ping for the worker row of `hostname`.

	Uses REPLACE INTO, so the worker row is written whether or not it
	already exists. Always answers ('OK', 200).
	"""
	pinged_at = datetime.now()
	query('REPLACE INTO worker (hostname, last_ping) values (?, ?)', hostname, pinged_at)
	return 'OK', 200

79
@app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST'])
@jobs_api_token_required
def jobs_ping(id):
	"""Update a job row from a ping sent for job `id`.

	Reads 'host', 'status' and 'state' from request.values. The status value
	is stored JSON-encoded. For state 'finished' the row gets time_finished,
	status and state "finished"; for any other state it gets worker,
	last_ping, status and the reported state. Always answers ('OK', 200).
	"""
	params = request.values
	hostname = params['host']
	status = json.dumps(params['status'], default=date_json_handler)
	state = params['state']

	if state != 'finished':
		query('UPDATE jobs SET worker = ?, last_ping = ?, status = ?, state = ? where id = ?', hostname, datetime.now(), status, state, id)
	else:
		query('UPDATE jobs SET time_finished = ?, status = ?, state = "finished" where id = ?', datetime.now(), status, id)
	return 'OK', 200

91
@app.route('/internal/jobs/api/worker/<hostname>/schedule', methods=['POST'])
@jobs_api_token_required
def jobs_schedule(hostname):
	"""Assign the highest-priority ready job of an accepted type to `hostname`.

	The JSON request body may contain 'jobtypes', a comma-separated string
	of job types the worker accepts.

	Returns:
		('no hostdata sent', 400) when the request has no (or an empty) JSON body,
		('no jobs', 503) when no ready job has one of the accepted types,
		otherwise a JSON response with the job row, after marking the job
		"scheduled" for this worker.
	"""
	hostdata = request.get_json()
	if not hostdata:
		return 'no hostdata sent', 400

	# Split once up front; a request without 'jobtypes' accepts nothing and so
	# ends in 'no jobs' instead of a KeyError.
	jobtypes = hostdata['jobtypes'].split(',') if 'jobtypes' in hostdata else []

	job = None
	while not job:
		query("BEGIN")
		for candidate in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'):
			if candidate['type'] in jobtypes:
				job = candidate
				break
		if not job:
			return 'no jobs', 503
		modify('UPDATE jobs SET state="scheduled", worker = ?, time_scheduled = ? WHERE id = ?', hostname, datetime.now(), job['id'])
		try:
			query("COMMIT")
		except Exception:
			# The commit failed, so the job was not claimed: forget it,
			# back off for a random time and try again.
			job = None
			# NOTE(review): if `sleep` is time.sleep this waits up to 50 seconds
			# inside a request - confirm the intended unit/range.
			sleep(random.randint(0, 50))
	return Response(json.dumps(job, default=date_json_handler),  mimetype='application/json')