From fa55effc62bf97a89adeafab4c9eebfb26e0e45e Mon Sep 17 00:00:00 2001 From: Julian Rother <julianr@fsmpi.rwth-aachen.de> Date: Fri, 29 Dec 2017 04:11:25 +0100 Subject: [PATCH] Extended job api to support different queues --- db_schema.sql | 1 + encoding.py | 2 +- jobs.py | 7 ++++--- server.py | 6 +++--- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/db_schema.sql b/db_schema.sql index 8a6bc10..be32730 100644 --- a/db_schema.sql +++ b/db_schema.sql @@ -283,6 +283,7 @@ CREATE TABLE IF NOT EXISTS `jobs` ( `id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, `type` text NOT NULL, `priority` INTEGER NOT NULL DEFAULT 0, + `queue` text NOT NULL DEFAULT 'default', `state` text NOT NULL DEFAULT 'ready', `time_finished` datetime DEFAULT '', `time_scheduled` datetime DEFAULT '', diff --git a/encoding.py b/encoding.py index 3906f60..3de335b 100644 --- a/encoding.py +++ b/encoding.py @@ -58,7 +58,7 @@ def schedule_transcode(source, fmt_id=None, video=None): data['lecture_id'] = lecture['id'] data['format_id'] = fmt['id'] data['source_id'] = source['id'] - schedule_job('transcode', data) + schedule_job('transcode', data, queue="background") @job_handler('probe-raw') def update_lecture_videos(jobid, jobtype, data, state, status): diff --git a/jobs.py b/jobs.py index 25a9a30..23fc2ed 100644 --- a/jobs.py +++ b/jobs.py @@ -46,7 +46,7 @@ def jobs_action(action, jobid=None): query('UPDATE jobs SET state="ready" WHERE state = "failed" AND (id = ? OR ? IS NULL)', jobid, jobid) if action == 'copy': if jobid: - query("INSERT INTO jobs SELECT NULL, type, priority, 'ready', '', '' , ?, '', NULL, data, '{}' FROM jobs where ID=?;", datetime.now(), jobid) + query("INSERT INTO jobs SELECT NULL, type, priority, queue, 'ready', '', '' , ?, '', NULL, data, '{}' FROM jobs where ID=?;", datetime.now(), jobid) if action == 'delete': if jobid: query('UPDATE jobs SET state="deleted" WHERE id = ?', jobid) @@ -125,8 +125,9 @@ def jobs_schedule(hostname): while (not job): query("BEGIN") for i in query('SELECT * FROM jobs WHERE state = "ready" ORDER BY priority DESC'): - if i['type'] in hostdata['jobtypes'].split(','): - job=i + if i['type'] in hostdata['jobtypes'] and \ + i['queue'] in hostdata['queues']: + job = i break if not job: return 'no jobs', 503 diff --git a/server.py b/server.py index 7898f58..63e7859 100644 --- a/server.py +++ b/server.py @@ -497,11 +497,11 @@ def job_handler(*types, state='finished'): def date_json_handler(obj): return obj.isoformat() if hasattr(obj, 'isoformat') else obj -def schedule_job(jobtype, data=None, priority=0): +def schedule_job(jobtype, data=None, priority=0, queue="default"): if not data: data = {} - modify('INSERT INTO jobs (type, priority, data, time_created) VALUES (?, ?, ?, ?)', - jobtype, priority, json.dumps(data, default=date_json_handler), datetime.now()) + modify('INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)', + jobtype, priority, queue, json.dumps(data, default=date_json_handler), datetime.now()) edit_handlers = {} def edit_handler(*tables, field=None): -- GitLab