Skip to content
Snippets Groups Projects
Select Git revision
  • 7d19efc219f0e8638b4cc6ded8b104292c047efd
  • master default protected
  • forbid-save-as
  • upload-via-token
  • moodle-integration
  • patch-double-tap-seek
  • patch_datum_anzeigen
  • patch_raum_anzeigen
  • intros
  • live_sources
  • bootstrap4
  • modules
12 results

jobmanagement.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    jobmanagement.py 2.10 KiB
    from server import modify, query, date_json_handler, sched_func, notify_admins
    from datetime import datetime, timedelta
    import traceback
    import json
    
    # Registry of job callbacks: job type -> job state -> list of handler functions.
    job_handlers = {}
    def job_handler(*types, state='finished'):
    	"""Decorator factory that registers a function as a job handler.

    	The decorated function is appended to job_handlers[jobtype][state] for
    	every jobtype given in types, and is returned unchanged.
    	"""
    	def register(func):
    		for jobtype in types:
    			# setdefault creates the per-type dict and per-state list on first use
    			job_handlers.setdefault(jobtype, {}).setdefault(state, []).append(func)
    		return func
    	return register
    
    def job_handler_handle(id, state):
    	"""Call every handler registered for the given job's type and state.

    	The job row is loaded from the jobs table by id. Each handler is called as
    	handler(id, type, data, state, status), with data and status decoded from
    	the row's JSON columns. If decoding or the handler raises an Exception, it
    	is reported through notify_admins and printed before the loop moves on to
    	the next handler.
    	"""
    	job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
    	handlers = job_handlers.get(job['type'], {}).get(state, [])
    	for handler in handlers:
    		try:
    			# decoded inside the try, once per handler, so a bad payload is reported too
    			data = json.loads(job['data'])
    			status = json.loads(job['status'])
    			handler(id, job['type'], data, state, status)
    		except Exception:
    			notify_admins('scheduler_exception', name=handler.__name__, traceback=traceback.format_exc())
    			traceback.print_exc()
    
    @sched_func(10)
    def job_catch_broken():
    	"""Reset or fail jobs that have not made progress in time.

    	Registered through sched_func(10). Two clean-up passes run, each inside
    	its own BEGIN/COMMIT pair:
    	  * jobs in state 'scheduled' whose time_scheduled is more than 10 seconds
    	    in the past are put back to 'ready';
    	  * jobs in state 'running' whose last_ping is more than 60 seconds in the
    	    past are set to 'failed'.
    	A failing COMMIT is printed and does not stop the remaining pass.
    	"""
    	# SQL string literals use single quotes; double quotes denote identifiers in
    	# standard SQL and only work as strings through engine-specific leniency.
    	stale_rules = (
    		# scheduled but never pinged
    		("UPDATE jobs SET state='ready' WHERE state='scheduled' and time_scheduled < ?", timedelta(seconds=10)),
    		# no pings since 60s
    		("UPDATE jobs SET state='failed' WHERE state='running' and last_ping < ?", timedelta(seconds=60)),
    	)
    	for sql, max_age in stale_rules:
    		query('BEGIN')
    		query(sql, datetime.now() - max_age)
    		try:
    			query('COMMIT')
    		except Exception:
    			# Still best effort, but no longer silent and no longer catching
    			# SystemExit/KeyboardInterrupt like the former bare except did.
    			traceback.print_exc()
    
    def job_set_state(id, state):
    	"""Write the given state into the jobs row with the given id."""
    	update_sql = 'UPDATE jobs SET state=? WHERE id=?'
    	query(update_sql, state, id)
    
    def schedule_job(jobtype, data=None, priority=0, queue="default"):
    	"""Insert a new row into the jobs table and return the result of modify().

    	A falsy data value (None, empty dict, ...) is stored as an empty JSON
    	object. Values json cannot encode natively are passed to
    	date_json_handler. time_created is set to the current local time.
    	"""
    	payload = json.dumps(data or {}, default=date_json_handler)
    	insert_sql = 'INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)'
    	return modify(insert_sql, jobtype, priority, queue, payload, datetime.now())
    
    def cancel_job(job_id):
    	"""Cancel the job with the given id.

    	A job still in state 'ready' is moved to state 'deleted'. Regardless of
    	its state, the row's canceled flag is set to 1.
    	"""
    	# SQL string literals use single quotes; double quotes denote identifiers in
    	# standard SQL and only work as strings through engine-specific leniency.
    	query("UPDATE jobs SET state = 'deleted' WHERE id = ? AND state = 'ready'", job_id)
    	query('UPDATE jobs SET canceled = 1 WHERE id = ?', job_id)
    
    def restart_job(job_id, canceled=False):
    	"""Put a failed job back into state 'ready'.

    	With canceled=True the job's canceled flag is reset to 0 as well.
    	Otherwise only failed jobs that are not canceled are restarted. Jobs in
    	any state other than 'failed' are left untouched.
    	"""
    	# SQL string literals use single quotes; double quotes denote identifiers in
    	# standard SQL and only work as strings through engine-specific leniency.
    	if canceled:
    		query("UPDATE jobs SET state = 'ready', canceled = 0 WHERE id = ? AND state = 'failed'", job_id)
    	else:
    		query("UPDATE jobs SET state = 'ready' WHERE id = ? AND state = 'failed' AND NOT canceled", job_id)