jobmanagement.py 1.67 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from server import modify, query, date_json_handler, sched_func
from datetime import datetime, timedelta
import json

# Registry of callbacks: job type -> job state -> list of handler functions.
job_handlers = {}
def job_handler(*types, state='finished'):
	"""Return a decorator that registers a function in ``job_handlers``.

	The decorated function is appended to ``job_handlers[jobtype][state]``
	for every job type given in ``types``; missing dictionary levels are
	created on demand. The function itself is returned unchanged, so the
	decorator can be stacked.
	"""
	def register(callback):
		for kind in types:
			by_state = job_handlers.setdefault(kind, {})
			by_state.setdefault(state, []).append(callback)
		return callback
	return register

def _job_expire(sql, max_age):
	"""Run one UPDATE with an age cutoff inside its own BEGIN/COMMIT pair.

	``sql`` must contain a single ``?`` placeholder; it is bound to
	``datetime.now() - max_age``. A failing COMMIT is treated as best effort:
	it is logged at debug level and otherwise ignored.
	"""
	import logging
	query('BEGIN')
	query(sql, datetime.now() - max_age)
	try:
		query('COMMIT')
	except Exception:
		# Deliberately non-fatal, but no longer a bare ``except`` so that
		# KeyboardInterrupt/SystemExit are not swallowed, and the failure
		# leaves a trace for debugging.
		logging.getLogger(__name__).debug('COMMIT failed while expiring jobs', exc_info=True)

@sched_func(10)
def job_catch_broken():
	"""Reset or fail jobs whose worker seems to have disappeared.

	Registered through ``sched_func(10)`` (presumably a 10 second interval;
	confirm against ``server.sched_func``).
	"""
	# scheduled but never pinged: hand the job back to the queue
	_job_expire('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', timedelta(seconds=10))
	# no pings since 60s: consider the running job failed
	_job_expire('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', timedelta(seconds=60))

def job_set_state(id, state):
	"""Write ``state`` into the ``state`` column of the job with the given ``id``."""
	# NOTE(review): goes through query() rather than modify(); presumably the
	# caller is responsible for the surrounding transaction. Confirm.
	statement = 'UPDATE jobs SET state=? WHERE id=?'
	query(statement, state, id)

def schedule_job(jobtype, data=None, priority=0, queue="default"):
	"""Insert a new row into ``jobs`` and return the result of ``modify``.

	``data`` is stored as JSON, with ``date_json_handler`` as the fallback
	serializer; any falsy ``data`` is stored as an empty object. The creation
	time is taken from ``datetime.now()``.
	"""
	payload = json.dumps(data or {}, default=date_json_handler)
	insert_sql = 'INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)'
	return modify(insert_sql, jobtype, priority, queue, payload, datetime.now())
def cancel_job(job_id):
	"""Cancel the job ``job_id``.

	A job still in state "ready" is moved to state "deleted"; in every case
	the job's ``canceled`` flag is set to 1.
	"""
	delete_if_ready = 'UPDATE jobs SET state = "deleted" WHERE id = ? AND state = "ready"'
	flag_canceled = 'UPDATE jobs SET canceled = 1 WHERE id = ?'
	# Order matters for readers of the table: drop the queued job first, then flag it.
	modify(delete_if_ready, job_id)
	modify(flag_canceled, job_id)

def restart_job(job_id, canceled=False):
	"""Move a failed job back to state "ready".

	With a truthy ``canceled`` the ``canceled`` flag is cleared as well, so
	canceled jobs are restarted too. Otherwise only failed jobs that are not
	canceled are touched.
	"""
	restart_sql = (
		'UPDATE jobs SET state = "ready", canceled = 0 WHERE id = ? AND state = "failed"'
		if canceled else
		'UPDATE jobs SET state = "ready" WHERE id = ? AND state = "failed" AND NOT canceled'
	)
	modify(restart_sql, job_id)