Skip to content
Snippets Groups Projects
Commit fa55effc authored by Julian Rother's avatar Julian Rother
Browse files

Extended job api to support different queues

parent fc99fba9
No related branches found
No related tags found
No related merge requests found
...@@ -283,6 +283,7 @@ CREATE TABLE IF NOT EXISTS `jobs` ( ...@@ -283,6 +283,7 @@ CREATE TABLE IF NOT EXISTS `jobs` (
`id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, `id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
`type` text NOT NULL, `type` text NOT NULL,
`priority` INTEGER NOT NULL DEFAULT 0, `priority` INTEGER NOT NULL DEFAULT 0,
`queue` text NOT NULL DEFAULT 'default',
`state` text NOT NULL DEFAULT 'ready', `state` text NOT NULL DEFAULT 'ready',
`time_finished` datetime DEFAULT '', `time_finished` datetime DEFAULT '',
`time_scheduled` datetime DEFAULT '', `time_scheduled` datetime DEFAULT '',
......
...@@ -58,7 +58,7 @@ def schedule_transcode(source, fmt_id=None, video=None): ...@@ -58,7 +58,7 @@ def schedule_transcode(source, fmt_id=None, video=None):
data['lecture_id'] = lecture['id'] data['lecture_id'] = lecture['id']
data['format_id'] = fmt['id'] data['format_id'] = fmt['id']
data['source_id'] = source['id'] data['source_id'] = source['id']
schedule_job('transcode', data) schedule_job('transcode', data, queue="background")
@job_handler('probe-raw') @job_handler('probe-raw')
def update_lecture_videos(jobid, jobtype, data, state, status): def update_lecture_videos(jobid, jobtype, data, state, status):
......
...@@ -46,7 +46,7 @@ def jobs_action(action, jobid=None): ...@@ -46,7 +46,7 @@ def jobs_action(action, jobid=None):
query('UPDATE jobs SET state="ready" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid) query('UPDATE jobs SET state="ready" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid)
if action == 'copy': if action == 'copy':
if jobid: if jobid:
query("INSERT INTO jobs SELECT NULL, type, priority, 'ready', '', '' , ?, '', NULL, data, '{}' FROM jobs where ID=?;", datetime.now(), jobid) query("INSERT INTO jobs SELECT NULL, type, priority, queue, 'ready', '', '' , ?, '', NULL, data, '{}' FROM jobs where ID=?;", datetime.now(), jobid)
if action == 'delete': if action == 'delete':
if jobid: if jobid:
query('UPDATE jobs SET state="deleted" WHERE id = ?', jobid) query('UPDATE jobs SET state="deleted" WHERE id = ?', jobid)
...@@ -125,7 +125,8 @@ def jobs_schedule(hostname): ...@@ -125,7 +125,8 @@ def jobs_schedule(hostname):
while (not job): while (not job):
query("BEGIN") query("BEGIN")
for i in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'): for i in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'):
if i['type'] in hostdata['jobtypes'].split(','): if i['type'] in hostdata['jobtypes'] and \
i['queue'] in hostdata['queues']:
job = i job = i
break break
if not job: if not job:
......
...@@ -497,11 +497,11 @@ def job_handler(*types, state='finished'): ...@@ -497,11 +497,11 @@ def job_handler(*types, state='finished'):
def date_json_handler(obj): def date_json_handler(obj):
return obj.isoformat() if hasattr(obj, 'isoformat') else obj return obj.isoformat() if hasattr(obj, 'isoformat') else obj
def schedule_job(jobtype, data=None, priority=0): def schedule_job(jobtype, data=None, priority=0, queue="default"):
if not data: if not data:
data = {} data = {}
modify('INSERT INTO jobs (type, priority, data, time_created) VALUES (?, ?, ?, ?)', modify('INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)',
jobtype, priority, json.dumps(data, default=date_json_handler), datetime.now()) jobtype, priority, queue, json.dumps(data, default=date_json_handler), datetime.now())
edit_handlers = {} edit_handlers = {}
def edit_handler(*tables, field=None): def edit_handler(*tables, field=None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment