Skip to content
Snippets Groups Projects
Select Git revision
  • master default protected
  • postgres_integration
  • s3compatible
  • intros
  • bootstrap4
  • modules
6 results

jobs.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    jobs.py 5.13 KiB
    from server import *
    import traceback
    import json
    import random
    from sorter import schedule_thumbnail
    
    @app.route('/internal/jobs/overview')
    @register_navbar('Jobs', iconlib='fa', icon='suitcase')
    @mod_required
    def jobs_overview():
    	"""Render a paginated, filterable list of jobs together with all workers.

    	Query parameters:
    		page:     zero-based page index (default 0, negative values become 0)
    		pagesize: rows per page (default 50, clamped to 1..500)
    		type, state, worker: SQL LIKE patterns used as filters; type and
    			worker default to '%', state defaults to 'failed'.

    	Returns the rendered 'jobs_overview.html' template.
    	"""
    	# Malformed numbers fall back to the defaults instead of raising ValueError.
    	try:
    		page = max(0, int(request.args.get('page', 0)))
    	except ValueError:
    		page = 0
    	try:
    		# Lower bound of 1: a pagesize of 0 would divide by zero when computing
    		# pagecount, and a negative one would produce a meaningless LIMIT.
    		pagesize = max(1, min(500, int(request.args.get('pagesize', 50))))
    	except ValueError:
    		pagesize = 50

    	worker = query('SELECT * FROM worker ORDER BY last_ping DESC')

    	# get filter options
    	filter_values = {
    			'type': query('SELECT distinct type FROM jobs'),
    			'state': query('SELECT distinct state FROM jobs'),
    			'worker': query('SELECT distinct worker FROM jobs')}

    	# parse filter
    	active_filter = {
    			'type': request.args.get('type','%'),
    			'state': request.args.get('state','failed'),
    			'worker': request.args.get('worker','%') }

    	# Shared by the count and the page query so both always select the same rows.
    	# The worker pattern is bound twice: rows with worker IS NULL only match
    	# when the worker filter is the catch-all '%'.
    	where = '(type like ?) AND (worker like ? OR (worker IS NULL AND ? = "%")) AND (state like ?)'
    	where_args = (active_filter['type'], active_filter['worker'], active_filter['worker'], active_filter['state'])

    	total = query('SELECT count(id) as count FROM jobs WHERE ' + where, *where_args)[0]['count']
    	pagecount = math.ceil(total/pagesize)
    	jobs = query('SELECT * FROM jobs WHERE ' + where + ' ORDER BY `time_created` DESC LIMIT ? OFFSET ?', *where_args, pagesize, page*pagesize)
    	return render_template('jobs_overview.html',worker=worker,jobs=jobs, filter_values=filter_values, filter=active_filter, page=page, pagesize=pagesize, pagecount=pagecount)
    
    @app.route('/internal/jobs/action/<action>', methods=['GET', 'POST'])
    @app.route('/internal/jobs/action/<action>/<jobid>', methods=['GET', 'POST'])
    @mod_required
    @csrf_protect
    def jobs_action(action, jobid=None):
    	"""Apply a job action and redirect back.

    	Supported actions:
    		clear_failed: mark failed jobs as deleted (only job `jobid` if given)
    		retry_failed: set failed jobs back to ready (only job `jobid` if given)
    		copy:         insert a fresh 'ready' copy of job `jobid`
    		delete:       mark job `jobid` as deleted
    		add:          with request value type=thumbnail, schedule a thumbnail
    		              job for the lecture given by `lecture_id`

    	Unknown actions, and copy/delete without a jobid, change nothing.
    	Redirects to the `ref` request value, or to the jobs overview if absent.
    	"""
    	# Exactly one branch can match, since `action` is never reassigned.
    	if action == 'clear_failed':
    		query('UPDATE jobs SET state="deleted" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid)
    	elif action == 'retry_failed':
    		query('UPDATE jobs SET state="ready" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid)
    	elif action == 'copy' and jobid:
    		query("INSERT INTO jobs SELECT NULL, type, priority, queue, 'ready', '', '' , ?, '', NULL, data, '{}' FROM jobs where ID=?;", datetime.now(), jobid)
    	elif action == 'delete' and jobid:
    		query('UPDATE jobs SET state="deleted" WHERE id = ?', jobid)
    	elif action == 'add' and request.values.get('type', None) == 'thumbnail':
    		lectureid = int(request.values.get('lecture_id', -1))
    		schedule_thumbnail(lectureid)

    	target = request.values.get('ref', url_for('jobs_overview'))
    	return redirect(target)
    
    def jobs_api_token_required(func):
    	"""Decorator that only calls `func` when the request carries the jobs API key.

    	The key is read from the request value 'apikey', or failing that from the
    	'apikey' member of the JSON body. It must equal config['JOBS_API_KEY'].
    	Otherwise the wrapped view returns ('Permission denied', 403).
    	"""
    	@wraps(func)
    	def guarded(*args, **kwargs):
    		if 'apikey' in request.values:
    			supplied = request.values['apikey']
    		else:
    			payload = request.get_json()
    			if payload and ('apikey' in payload):
    				supplied = payload['apikey']
    			else:
    				supplied = None

    		# The [None] default never equals a supplied token (not even a missing
    		# one), so every request is refused while no key is configured.
    		if supplied != config.get('JOBS_API_KEY', [None]):
    			return 'Permission denied', 403
    		return func(*args, **kwargs)
    	return guarded
    
    @sched_func(10)
    def jobs_catch_broken():
    	"""Reset jobs whose worker appears to have vanished.

    	Registered through sched_func(10) (presumably a 10 second interval --
    	confirm against sched_func). Two passes, each in its own transaction:
    		* 'scheduled' jobs older than 10 seconds go back to 'ready'
    		* 'running' jobs without a ping for 60 seconds become 'failed'
    	"""
    	stale_rules = (
    		# scheduled but never pinged
    		('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', timedelta(seconds=10)),
    		# no pings since 60s
    		('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', timedelta(seconds=60)),
    	)
    	for statement, max_age in stale_rules:
    		query('BEGIN')
    		query(statement, datetime.now() - max_age)
    		try:
    			query('COMMIT')
    		except Exception:
    			# Best effort: a failed commit is simply retried on the next run.
    			# Only Exception is swallowed so that SystemExit/KeyboardInterrupt
    			# still propagate.
    			pass
    
    @app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST'])
    @jobs_api_token_required
    def jobs_ping(id):
    	"""Record a worker's progress report for job `id` and run the job handlers.

    	Request values:
    		host:   hostname of the reporting worker
    		status: JSON document describing the job status
    		state:  new job state; 'finished' also stores the finish time

    	After the update, every handler registered in job_handlers for the job's
    	type and the reported state is called. Handler exceptions are printed and
    	do not affect the response.

    	Returns ('OK', 200), or ('job not found', 404) if no job has this id.
    	"""
    	hostname = request.values['host']
    	# Round-trip through the parser so only well-formed JSON gets stored.
    	status = json.dumps(json.loads(request.values['status']), default=date_json_handler)
    	state = request.values['state']
    	if state == 'finished':
    		query('UPDATE jobs SET time_finished = ?, status = ?, state = "finished" where id = ?', datetime.now(), status, id)
    	else:
    		query('UPDATE jobs SET worker = ?, last_ping = ?, status = ?, state = ? where id = ?', hostname, datetime.now(), status, state, id)
    	rows = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)
    	if not rows:
    		# Unknown id: the UPDATE above matched nothing; report it instead of
    		# failing with an IndexError.
    		return 'job not found', 404
    	job = rows[0]
    	for func in job_handlers.get(job['type'], {}).get(state, []):
    		try:
    			func(id, job['type'], json.loads(job['data']), state, json.loads(job['status']))
    		except Exception:
    			traceback.print_exc()
    	return 'OK', 200
    
    @app.route('/internal/jobs/api/worker/<hostname>/schedule', methods=['POST'])
    @jobs_api_token_required
    def jobs_schedule(hostname):
    	"""Hand the highest-priority ready job to the worker `hostname`.

    	The JSON body lists the 'jobtypes' and 'queues' the worker accepts; a
    	missing list is treated as empty. The worker's last_ping is refreshed on
    	every call.

    	Returns the chosen job as JSON after marking it 'scheduled',
    	('no hostdata sent', 400) if the body is empty or missing, or
    	('no jobs', 503) if no ready job matches.
    	"""
    	# Imported locally so the retry path does not depend on `sleep` being
    	# re-exported by `from server import *`.
    	from time import sleep

    	query('REPLACE INTO worker (hostname, last_ping) values (?, ?)', hostname, datetime.now())
    	hostdata = request.get_json()
    	if not hostdata:
    		return 'no hostdata sent', 400
    	jobtypes = hostdata['jobtypes'] if 'jobtypes' in hostdata else []
    	queues = hostdata['queues'] if 'queues' in hostdata else []
    	job = None
    	while not job:
    		query("BEGIN")
    		for candidate in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'):
    			if candidate['type'] in jobtypes and candidate['queue'] in queues:
    				job = candidate
    				break
    		if not job:
    			# NOTE(review): returns with the BEGIN above still open -- confirm
    			# that query() or the request teardown closes the transaction.
    			return 'no jobs', 503
    		modify('UPDATE jobs SET state="scheduled", worker = ?, time_scheduled = ? WHERE id = ?', hostname, datetime.now(), job['id'])
    		try:
    			query("COMMIT")
    		except Exception:
    			# Commit failed (presumably a concurrent scheduler won the race):
    			# forget the candidate, back off for a random time and retry.
    			job = None
    			# NOTE(review): this waits up to 50 *seconds* inside the request;
    			# confirm whether a sub-second backoff was intended.
    			sleep(random.randint(0, 50))
    	return Response(json.dumps(job, default=date_json_handler), mimetype='application/json')