diff --git a/db_schema.sql b/db_schema.sql index 8a6bc103a8dd53bc10f9cec0d4b6987e3539a430..be3273035733da2fcd5bbc0ad6d6ac3c64c48cfb 100644 --- a/db_schema.sql +++ b/db_schema.sql @@ -283,6 +283,7 @@ CREATE TABLE IF NOT EXISTS `jobs` ( `id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, `type` text NOT NULL, `priority` INTEGER NOT NULL DEFAULT 0, + `queue` text NOT NULL DEFAULT 'default', `state` text NOT NULL DEFAULT 'ready', `time_finished` datetime DEFAULT '', `time_scheduled` datetime DEFAULT '', diff --git a/encoding.py b/encoding.py index 3906f602253404c3d96b39ebe9927b41a4d57e64..3de335b96462cdb272f9872785a73f8658958390 100644 --- a/encoding.py +++ b/encoding.py @@ -58,7 +58,7 @@ def schedule_transcode(source, fmt_id=None, video=None): data['lecture_id'] = lecture['id'] data['format_id'] = fmt['id'] data['source_id'] = source['id'] - schedule_job('transcode', data) + schedule_job('transcode', data, queue="background") @job_handler('probe-raw') def update_lecture_videos(jobid, jobtype, data, state, status): diff --git a/jobs.py b/jobs.py index 25a9a309715974c5a0c29a064db4afd1170263b5..23fc2edc49ebb4392a83cb60489d8740b93afa0a 100644 --- a/jobs.py +++ b/jobs.py @@ -46,7 +46,7 @@ def jobs_action(action, jobid=None): query('UPDATE jobs SET state="ready" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid) if action == 'copy': if jobid: - query("INSERT INTO jobs SELECT NULL, type, priority, 'ready', '', '' , ?, '', NULL, data, '{}' FROM jobs where ID=?;", datetime.now(), jobid) + query("INSERT INTO jobs SELECT NULL, type, priority, queue, 'ready', '', '' , ?, '', NULL, data, '{}' FROM jobs where ID=?;", datetime.now(), jobid) if action == 'delete': if jobid: query('UPDATE jobs SET state="deleted" WHERE id = ?', jobid) @@ -125,8 +125,9 @@ def jobs_schedule(hostname): while (not job): query("BEGIN") for i in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'): - if i['type'] in hostdata['jobtypes'].split(','): - job=i + if i['type'] in hostdata['jobtypes'] and \ + i['queue'] in hostdata['queues']: + job = i break if not job: return 'no jobs', 503 diff --git a/server.py b/server.py index 7898f588e6efec2a0eb7d43b266ac07ccf20e72a..63e78593bdbd3c2ead88e6cf568b54ae47755505 100644 --- a/server.py +++ b/server.py @@ -497,11 +497,11 @@ def job_handler(*types, state='finished'): def date_json_handler(obj): return obj.isoformat() if hasattr(obj, 'isoformat') else obj -def schedule_job(jobtype, data=None, priority=0): +def schedule_job(jobtype, data=None, priority=0, queue="default"): if not data: data = {} - modify('INSERT INTO jobs (type, priority, data, time_created) VALUES (?, ?, ?, ?)', - jobtype, priority, json.dumps(data, default=date_json_handler), datetime.now()) + modify('INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)', + jobtype, priority, queue, json.dumps(data, default=date_json_handler), datetime.now()) edit_handlers = {} def edit_handler(*tables, field=None):