Select Git revision
jobmanagement.py
Forked from
Video AG Infrastruktur / website
Source project has a limited visibility.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
jobmanagement.py 2.10 KiB
from server import modify, query, date_json_handler, sched_func, notify_admins
from datetime import datetime, timedelta
import traceback
import json
# Registry of job callbacks: job type -> job state -> list of handler functions.
job_handlers = {}


def job_handler(*types, state='finished'):
    """Return a decorator that registers a function as a job handler.

    The decorated function is appended to ``job_handlers[jobtype][state]``
    for every entry of ``types`` and is then returned unchanged, so it can
    still be called directly.

    Args:
        *types: Job type keys the function is registered under.
        state: Job state key the function is registered under.

    Returns:
        The decorator performing the registration.
    """
    def wrapper(func):
        for jobtype in types:
            # setdefault creates the missing nesting levels on first use.
            job_handlers.setdefault(jobtype, {}).setdefault(state, []).append(func)
        return func
    return wrapper
def job_handler_handle(id, state):
    """Invoke every handler registered for a job's type and the given state.

    The job row is read from the ``jobs`` table by ``id``; the first row of
    the query result is used. Each handler is called with the job id, the
    job type, the JSON-decoded ``data`` column, ``state`` and the
    JSON-decoded ``status`` column.

    An exception raised while decoding or inside a handler is reported via
    ``notify_admins`` and printed; the remaining handlers are still run.
    """
    job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
    jobtype = job['type']
    handlers = job_handlers.get(jobtype, {}).get(state, [])
    for handler in handlers:
        try:
            # Decoded per handler, so every handler receives fresh objects
            # and decode errors are reported like handler errors.
            data = json.loads(job['data'])
            status = json.loads(job['status'])
            handler(id, jobtype, data, state, status)
        except Exception:
            formatted = traceback.format_exc()
            notify_admins('scheduler_exception', name=handler.__name__, traceback=formatted)
            traceback.print_exc()
@sched_func(10)
def job_catch_broken():
    """Reset jobs whose timestamps are older than their allowed age.

    Two updates are issued, each wrapped in its own BEGIN/COMMIT:

    * jobs in state "scheduled" whose ``time_scheduled`` is more than 10
      seconds in the past are set back to "ready";
    * jobs in state "running" whose ``last_ping`` is more than 60 seconds in
      the past are set to "failed".
    """
    stale_transitions = (
        # scheduled but never pinged
        ('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?',
         timedelta(seconds=10)),
        # no pings since 60s
        ('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?',
         timedelta(seconds=60)),
    )
    for statement, max_age in stale_transitions:
        query('BEGIN')
        query(statement, datetime.now() - max_age)
        try:
            query('COMMIT')
        except Exception:
            # A failing COMMIT is still ignored (best effort), but the
            # handler no longer swallows KeyboardInterrupt/SystemExit the
            # way a bare ``except:`` would.
            pass
def job_set_state(id, state):
    """Write ``state`` into the ``state`` column of the job row with ``id``."""
    statement = 'UPDATE jobs SET state=? WHERE id=?'
    query(statement, state, id)
def schedule_job(jobtype, data=None, priority=0, queue="default"):
    """Insert a new row into the ``jobs`` table and return ``modify``'s result.

    Args:
        jobtype: Value stored in the ``type`` column.
        data: Payload stored JSON-encoded in the ``data`` column; any falsy
            value is stored as an empty object.
        priority: Value stored in the ``priority`` column.
        queue: Value stored in the ``queue`` column.
    """
    payload = json.dumps(data or {}, default=date_json_handler)
    created = datetime.now()
    statement = 'INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)'
    return modify(statement, jobtype, priority, queue, payload, created)
def cancel_job(job_id):
    """Mark a job as canceled.

    A job still in state "ready" is additionally moved to state "deleted";
    the ``canceled`` flag is set to 1 regardless of the job's state.
    """
    statements = (
        'UPDATE jobs SET state = "deleted" WHERE id = ? AND state = "ready"',
        'UPDATE jobs SET canceled = 1 WHERE id = ?',
    )
    for statement in statements:
        query(statement, job_id)
def restart_job(job_id, canceled=False):
    """Put a failed job back into state "ready".

    Args:
        job_id: Id of the job row to update.
        canceled: When true, the ``canceled`` flag is cleared as well and
            canceled jobs are restarted too. When false, only failed jobs
            that are not canceled are restarted.
    """
    if canceled:
        statement = 'UPDATE jobs SET state = "ready", canceled = 0 WHERE id = ? AND state = "failed"'
    else:
        statement = 'UPDATE jobs SET state = "ready" WHERE id = ? AND state = "failed" AND NOT canceled'
    query(statement, job_id)