from server import modify, query, date_json_handler, sched_func, notify_admins
from datetime import datetime, timedelta
import traceback
import json

# Registry of job callbacks: {job type: {job state: [handler, ...]}}.
job_handlers = {}


def job_handler(*types, state='finished'):
    """Return a decorator that registers a function as a job handler.

    The decorated function is appended to ``job_handlers`` once for every
    job type in ``types``, under the given ``state``. The function itself is
    returned unchanged, so it can still be called directly.
    """
    def wrapper(func):
        for jobtype in types:
            job_handlers.setdefault(jobtype, {}).setdefault(state, []).append(func)
        return func
    return wrapper


def job_handler_handle(id, state):
    """Invoke every handler registered for the job's type and ``state``.

    The job row is loaded by ``id``; each handler is called with
    ``(id, type, data, state, status)`` where ``data`` and ``status`` are the
    JSON-decoded ``data`` and ``status`` columns of that row.

    An exception raised by a handler is reported through ``notify_admins``
    and printed, and the remaining handlers still run.

    NOTE(review): the ``[0]`` lookup assumes ``query`` returns a non-empty
    sequence of rows; an unknown ``id`` presumably fails here -- confirm.
    """
    job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
    # Local deliberately not called ``type`` so the builtin is not shadowed.
    jobtype = job['type']
    for func in job_handlers.get(jobtype, {}).get(state, []):
        try:
            func(id, jobtype, json.loads(job['data']), state, json.loads(job['status']))
        except Exception:
            notify_admins('scheduler_exception', name=func.__name__, traceback=traceback.format_exc())
            traceback.print_exc()


@sched_func(10)
def job_catch_broken():
    """Reset jobs that look stuck.

    Jobs still in state ``scheduled`` whose ``time_scheduled`` is more than
    10 seconds old are put back to ``ready``; jobs in state ``running`` whose
    ``last_ping`` is more than 60 seconds old are marked ``failed``.
    """
    # scheduled but never pinged
    query('BEGIN')
    query('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', datetime.now() - timedelta(seconds=10))
    try:
        query('COMMIT')
    except Exception:
        # Best effort: a failing COMMIT is deliberately ignored, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        pass
    # no pings since 60s
    query('BEGIN')
    query('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', datetime.now() - timedelta(seconds=60))
    try:
        query('COMMIT')
    except Exception:
        # Best effort, same as above.
        pass


def job_set_state(id, state):
    """Set the ``state`` column of the job with the given ``id``."""
    query('UPDATE jobs SET state=? WHERE id=?', state, id)


def schedule_job(jobtype, data=None, priority=0, queue="default"):
    """Insert a new job row and return the result of ``modify``.

    ``data`` is serialized to JSON (using ``date_json_handler`` for values
    the encoder cannot serialize on its own); any falsy ``data`` is stored
    as an empty object. The creation time is set to the current local time.
    """
    if not data:
        data = {}
    return modify('INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)',
                  jobtype, priority, queue, json.dumps(data, default=date_json_handler), datetime.now())


def cancel_job(job_id):
    """Cancel a job.

    A job that is still ``ready`` is moved to state ``deleted``; in every
    case the job's ``canceled`` flag is set to 1.
    """
    query('UPDATE jobs SET state = "deleted" WHERE id = ? AND state = "ready"', job_id)
    query('UPDATE jobs SET canceled = 1 WHERE id = ?', job_id)


def restart_job(job_id, canceled=False):
    """Put a failed job back into state ``ready``.

    With ``canceled=True`` the ``canceled`` flag is also cleared, so a
    canceled job is restarted as well. With the default ``canceled=False``
    only failed jobs that are not canceled are restarted.
    """
    if canceled:
        query('UPDATE jobs SET state = "ready", canceled = 0 WHERE id = ? AND state = "failed"', job_id)
    else:
        query('UPDATE jobs SET state = "ready" WHERE id = ? AND state = "failed" AND NOT canceled', job_id)