Skip to content
Snippets Groups Projects
Select Git revision
  • d646043564f3bb30f9de81094d2acf1606282ddf
  • master default protected
  • postgres_integration
  • s3compatible
  • intros
  • bootstrap4
  • modules
7 results

jobmanagement.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    jobmanagement.py 2.28 KiB
    from datetime import datetime, timedelta
    import traceback
    import json
    
    from server import modify, query, date_json_handler, sched_func, notify_admins
    
    # Registry of handler functions, laid out as {jobtype: {state: [func, ...]}}.
    job_handlers = {} #pylint: disable=invalid-name
    def job_handler(*types, state='finished'):
    	"""Build a decorator that registers a function as a job handler.

    	The decorated function is appended to ``job_handlers[jobtype][state]``
    	for every entry in ``types`` and is handed back unchanged, so it stays
    	directly callable under its own name.

    	:param types: job type names the function should be registered for
    	:param state: job state the function should be registered for
    	:returns: the registering decorator
    	"""
    	def wrapper(handler):
    		for jobtype in types:
    			# setdefault creates the missing nesting levels on first use
    			job_handlers.setdefault(jobtype, {}).setdefault(state, []).append(handler)
    		return handler
    	return wrapper
    
    def job_handler_handle(id, state):
    	"""Invoke every handler registered for the type of job ``id`` and for ``state``.

    	The job row is fetched from the ``jobs`` table by id. Each handler is
    	called as ``handler(id, jobtype, data, state, status)``, where ``data``
    	and ``status`` are the JSON-decoded ``data`` and ``status`` columns.
    	An exception raised while calling one handler is reported through
    	``notify_admins`` and printed; the remaining handlers are still run.

    	:param id: value matched against the ``id`` column of ``jobs``
    	:param state: state key used to select the handlers
    	:raises IndexError: if the lookup returns no row
    	"""
    	job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
    	jobtype = job['type']
    	handlers = job_handlers.get(jobtype, {}).get(state, [])
    	for handler in handlers:
    		try:
    			# Decode inside the try and per call: a decoding error is reported
    			# like a handler error, and every handler gets its own objects.
    			handler(id, jobtype, json.loads(job['data']), state, json.loads(job['status']))
    		except Exception: #pylint: disable=broad-except
    			notify_admins('scheduler_exception', name=handler.__name__, traceback=traceback.format_exc())
    			traceback.print_exc()
    
    @sched_func(10)
    def job_catch_broken():
    	"""Recover jobs that look abandoned.

    	Two clean-up statements are run, each wrapped in its own BEGIN/COMMIT:

    	* jobs in state ``scheduled`` whose ``time_scheduled`` is more than
    	  10 seconds in the past are put back to ``ready``;
    	* jobs in state ``running`` whose ``last_ping`` is more than 60 seconds
    	  in the past are marked ``failed``.

    	A failing COMMIT is ignored on purpose (best effort). Only ``Exception``
    	subclasses are swallowed, so ``KeyboardInterrupt`` and ``SystemExit``
    	still propagate.
    	"""
    	stale_rules = (
    		# scheduled but never pinged
    		("UPDATE jobs SET state='ready' WHERE state='scheduled' and time_scheduled < ?", timedelta(seconds=10)),
    		# no pings since 60s
    		("UPDATE jobs SET state='failed' WHERE state='running' and last_ping < ?", timedelta(seconds=60)),
    	)
    	for statement, max_age in stale_rules:
    		modify("BEGIN")
    		# The cut-off is computed per statement, right before it is executed.
    		query(statement, datetime.now() - max_age)
    		try:
    			modify("COMMIT")
    		except Exception: #pylint: disable=broad-except
    			# Deliberately best effort: a failed commit is not treated as an error here.
    			pass
    
    def job_set_state(id, state):
    	"""Write ``state`` into the ``state`` column of the job row matching ``id``."""
    	statement = 'UPDATE jobs SET state=? WHERE id=?'
    	query(statement, state, id)
    
    def schedule_job(jobtype, data=None, priority=0, queue="default"):
    	"""Insert a new row into the ``jobs`` table.

    	:param jobtype: value stored in the ``type`` column
    	:param data: job payload; any falsy value is stored as an empty JSON
    		object. Values ``json`` cannot encode natively are passed to
    		``date_json_handler``.
    	:param priority: value stored in the ``priority`` column
    	:param queue: value stored in the ``queue`` column
    	:returns: whatever ``modify(..., get_id=True)`` returns
    		(presumably the id of the new row -- confirm against ``server.modify``)
    	"""
    	payload = json.dumps(data or {}, default=date_json_handler)
    	return modify(
    			'INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)',
    			jobtype, priority, queue, payload, datetime.now(),
    			get_id=True)
    
    def cancel_job(job_id):
    	"""Cancel the job with id ``job_id``.

    	A job still in state ``ready`` is moved to ``deleted``. Afterwards the
    	``canceled`` flag is set on the row regardless of its state.
    	"""
    	statements = (
    		"UPDATE jobs SET state = 'deleted' WHERE id = ? AND state = 'ready'",
    		"UPDATE jobs SET canceled = true WHERE id = ?",
    	)
    	for statement in statements:
    		query(statement, job_id)
    
    def restart_job(job_id, canceled=False):
    	"""Put a ``failed`` job back into state ``ready``.

    	:param job_id: id of the job row to restart
    	:param canceled: when true, the ``canceled`` flag is cleared as part of
    		the restart; when false, only rows whose ``canceled`` flag is not
    		set are restarted
    	"""
    	# info: sql no test cover
    	if not canceled:
    		query("UPDATE jobs SET state = 'ready' WHERE id = ? AND state = 'failed' AND NOT canceled", job_id)
    		return
    	query("UPDATE jobs SET state = 'ready', canceled = false WHERE id = ? AND state = 'failed'", job_id)