Select Git revision
lecture.html
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
jobmanagement.py 2.28 KiB
from datetime import datetime, timedelta
import traceback
import json
from server import modify, query, date_json_handler, sched_func, notify_admins
# Registry of job callbacks: job type -> job state -> list of handler functions.
job_handlers = {} #pylint: disable=invalid-name


def job_handler(*types, state='finished'):
    """Return a decorator that registers a function as a job handler.

    The decorated function is appended to ``job_handlers[jobtype][state]``
    for every job type given in ``types`` and is returned unchanged, so it
    stays callable under its own name.
    """
    def register(handler):
        for jobtype in types:
            # setdefault creates the per-type dict / per-state list on first use.
            job_handlers.setdefault(jobtype, {}).setdefault(state, []).append(handler)
        return handler
    return register
def job_handler_handle(id, state):
    """Invoke every handler registered for a job's type and the given state.

    The job row is read from the ``jobs`` table by ``id``. Each handler is
    called as ``handler(id, type, data, state, status)``, where ``data`` and
    ``status`` are the JSON-decoded ``data`` and ``status`` columns of the row.

    Any exception raised while decoding or inside a handler is reported via
    ``notify_admins`` and printed; the remaining handlers are still run.
    Indexing with ``[0]`` raises if the query returns no row.
    """
    job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
    jobtype = job['type']
    handlers = job_handlers.get(jobtype, {}).get(state, [])
    for handler in handlers:
        try:
            # Decoded anew for every handler, inside the try, so a decoding
            # error is reported like a handler failure and each handler gets
            # its own fresh objects.
            payload = json.loads(job['data'])
            status = json.loads(job['status'])
            handler(id, jobtype, payload, state, status)
        except Exception: #pylint: disable=broad-except
            notify_admins('scheduler_exception', name=handler.__name__, traceback=traceback.format_exc())
            traceback.print_exc()
def _update_stale_jobs(sql, max_age):
    """Run one UPDATE on stale jobs between BEGIN and a best-effort COMMIT.

    ``sql`` must contain a single ``?`` placeholder; it is bound to the
    current time minus ``max_age`` (a ``timedelta``).
    """
    modify("BEGIN")
    query(sql, datetime.now() - max_age)
    try:
        modify("COMMIT")
    except Exception: #pylint: disable=broad-except
        # A failing COMMIT is deliberately ignored (best effort), but only
        # ordinary errors: KeyboardInterrupt/SystemExit must still propagate.
        # NOTE(review): presumably COMMIT can fail when no transaction is
        # active any more -- confirm against the server.modify/query helpers.
        pass


@sched_func(10)
def job_catch_broken():
    """Repair jobs that got stuck in the 'scheduled' or 'running' state.

    Registered through ``sched_func(10)`` (presumably a periodic run; the
    unit of the interval is defined by ``sched_func`` -- confirm there).
    """
    # scheduled but never pinged: hand the job back to the queue
    _update_stale_jobs(
        "UPDATE jobs SET state='ready' WHERE state='scheduled' and time_scheduled < ?",
        timedelta(seconds=10))
    # no pings since 60s: consider the job failed
    _update_stale_jobs(
        "UPDATE jobs SET state='failed' WHERE state='running' and last_ping < ?",
        timedelta(seconds=60))
def job_set_state(id, state):
    """Set the ``state`` column of the job row with the given ``id``."""
    sql = 'UPDATE jobs SET state=? WHERE id=?'
    query(sql, state, id)
def schedule_job(jobtype, data=None, priority=0, queue="default"):
    """Insert a new row into ``jobs`` and return the result of ``modify``.

    ``data`` is stored JSON-encoded (dates are serialised through
    ``date_json_handler``); any falsy ``data`` is stored as an empty object.
    ``time_created`` is set to the current local time. ``modify`` is called
    with ``get_id=True``, so the return value is presumably the id of the
    new row -- confirm against ``server.modify``.
    """
    payload = json.dumps(data or {}, default=date_json_handler)
    created = datetime.now()
    return modify(
        'INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)',
        jobtype, priority, queue, payload, created,
        get_id=True)
def cancel_job(job_id):
    """Cancel a job: delete it if still 'ready' and flag it as canceled."""
    # A job that is still in state 'ready' is moved to state 'deleted'.
    mark_deleted = "UPDATE jobs SET state = 'deleted' WHERE id = ? AND state = 'ready'"
    query(mark_deleted, job_id)
    # The canceled flag is set regardless of the job's current state.
    mark_canceled = "UPDATE jobs SET canceled = true WHERE id = ?"
    query(mark_canceled, job_id)
def restart_job(job_id, canceled=False):
    """Put a failed job back into state 'ready'.

    With ``canceled`` true, a failed job is restarted and its ``canceled``
    flag is cleared. Otherwise only failed jobs that are not flagged as
    canceled are restarted.
    """
    # NOTE: per the original author's remark, this SQL has no test coverage.
    if canceled:
        sql = "UPDATE jobs SET state = 'ready', canceled = false WHERE id = ? AND state = 'failed'"
    else:
        sql = "UPDATE jobs SET state = 'ready' WHERE id = ? AND state = 'failed' AND NOT canceled"
    query(sql, job_id)