jobmanagement.py 2.1 KB
Newer Older
1
from server import modify, query, date_json_handler, sched_func, notify_admins
2
from datetime import datetime, timedelta
3
import traceback
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import json

job_handlers = {}
def job_handler(*types, state='finished'):
	def wrapper(func):
		for jobtype in types:
			if jobtype not in job_handlers:
				job_handlers[jobtype] = {}
			if state not in job_handlers[jobtype]:
				job_handlers[jobtype][state] = []
			job_handlers[jobtype][state].append(func)
		return func
	return wrapper

18
19
20
21
22
23
24
def job_handler_handle(id, state):
	job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
	type = job['type']
	for func in job_handlers.get(type, {}).get(state, []):
		try:
			func(id, job['type'], json.loads(job['data']), state, json.loads(job['status']))
		except Exception:
25
			notify_admins('scheduler_exception', name=func.__name__, traceback=traceback.format_exc())
26
27
			traceback.print_exc()

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@sched_func(10)
def job_catch_broken():
	# scheduled but never pinged
	query('BEGIN')
	query('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', datetime.now() - timedelta(seconds=10))
	try:
		query('COMMIT')
	except:
		pass
	# no pings since 60s
	query('BEGIN')
	query('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', datetime.now() - timedelta(seconds=60))
	try:
		query('COMMIT')
	except:
		pass

def job_set_state(id, state):
	query('UPDATE jobs SET state=? WHERE id=?', state, id)

def schedule_job(jobtype, data=None, priority=0, queue="default"):
	if not data:
		data = {}
	return modify('INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)',
			jobtype, priority, queue, json.dumps(data, default=date_json_handler), datetime.now())
53

54
def cancel_job(job_id):
Andreas Valder's avatar
Andreas Valder committed
55
56
	query('UPDATE jobs SET state = "deleted" WHERE id = ? AND state = "ready"', job_id)
	query('UPDATE jobs SET canceled = 1 WHERE id = ?', job_id)
57
58
59

def restart_job(job_id, canceled=False):
	if canceled:
Andreas Valder's avatar
Andreas Valder committed
60
		query('UPDATE jobs SET state = "ready", canceled = 0 WHERE id = ? AND state = "failed"', job_id)
61
	else:
Andreas Valder's avatar
Andreas Valder committed
62
		query('UPDATE jobs SET state = "ready" WHERE id = ? AND state = "failed" AND NOT canceled', job_id)