diff --git a/encoding.py b/encoding.py index 22bc8b94224c0758190a86acdcbb3dd296020027..13ad6995b883c4fb7267b81e6dfd1b2779ae22dc 100644 --- a/encoding.py +++ b/encoding.py @@ -1,5 +1,6 @@ from server import * import os.path +from jobmanagement import schedule_job def set_metadata(dest, course, lecture): chapters = query('SELECT text, time FROM chapters WHERE lecture_id = ? AND visible ORDER BY time', lecture['id']) diff --git a/jobmanagement.py b/jobmanagement.py new file mode 100644 index 0000000000000000000000000000000000000000..dd14e3b9ef0943edf7da28a7ba2ecb51a4b9cce9 --- /dev/null +++ b/jobmanagement.py @@ -0,0 +1,50 @@ +from server import modify, query, date_json_handler, sched_func +from datetime import datetime, timedelta +import json + +job_handlers = {} +def job_handler(*types, state='finished'): + def wrapper(func): + for jobtype in types: + if jobtype not in job_handlers: + job_handlers[jobtype] = {} + if state not in job_handlers[jobtype]: + job_handlers[jobtype][state] = [] + job_handlers[jobtype][state].append(func) + return func + return wrapper + +@sched_func(10) +def job_catch_broken(): + # scheduled but never pinged + query('BEGIN') + query('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', datetime.now() - timedelta(seconds=10)) + try: + query('COMMIT') + except: + pass + # no pings since 60s + query('BEGIN') + query('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', datetime.now() - timedelta(seconds=60)) + try: + query('COMMIT') + except: + pass + +def job_set_state(id, state): + query('UPDATE jobs SET state=? WHERE id=?', state, id) + +def schedule_job(jobtype, data=None, priority=0, queue="default"): + if not data: + data = {} + return modify('INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)', + jobtype, priority, queue, json.dumps(data, default=date_json_handler), datetime.now()) +def cancel_job(job_id): + modify('UPDATE jobs SET state = "deleted" WHERE id = ? AND state = "ready"', job_id) + modify('UPDATE jobs SET canceled = 1 WHERE id = ?', job_id) + +def restart_job(job_id, canceled=False): + if canceled: + modify('UPDATE jobs SET state = "ready", canceled = 0 WHERE id = ? AND state = "failed"', job_id) + else: + modify('UPDATE jobs SET state = "ready" WHERE id = ? AND state = "failed" AND NOT canceled', job_id) diff --git a/jobs.py b/jobs.py index a410e4a152d06d0e5d3514c4d32a5f8fd01e8130..05c762037d8c2d122cf02a7dc490df258fcbcda0 100644 --- a/jobs.py +++ b/jobs.py @@ -3,34 +3,7 @@ import traceback import json import random from time import sleep - -job_handlers = {} -def job_handler(*types, state='finished'): - def wrapper(func): - for jobtype in types: - if jobtype not in job_handlers: - job_handlers[jobtype] = {} - if state not in job_handlers[jobtype]: - job_handlers[jobtype][state] = [] - job_handlers[jobtype][state].append(func) - return func - return wrapper - -def schedule_job(jobtype, data=None, priority=0, queue="default"): - if not data: - data = {} - return modify('INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)', - jobtype, priority, queue, json.dumps(data, default=date_json_handler), datetime.now()) - -def cancel_job(job_id): - modify('UPDATE jobs SET state = "deleted" WHERE id = ? AND state = "ready"', job_id) - modify('UPDATE jobs SET canceled = 1 WHERE id = ?', job_id) - -def restart_job(job_id, canceled=False): - if canceled: - modify('UPDATE jobs SET state = "ready", canceled = 0 WHERE id = ? AND state = "failed"', job_id) - else: - modify('UPDATE jobs SET state = "ready" WHERE id = ? AND state = "failed" AND NOT canceled', job_id) +from jobmanagement import * @app.route('/internal/jobs/overview') @register_navbar('Jobs', iconlib='fa', icon='suitcase', group='weitere') @@ -96,23 +69,6 @@ def jobs_api_token_required(func): return func(*args, **kwargs) return decorator -@sched_func(10) -def jobs_catch_broken(): - # scheduled but never pinged - query('BEGIN') - query('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', datetime.now() - timedelta(seconds=10)) - try: - query('COMMIT') - except: - pass - # no pings since 60s - query('BEGIN') - query('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', datetime.now() - timedelta(seconds=60)) - try: - query('COMMIT') - except: - pass - @app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST']) @jobs_api_token_required def jobs_ping(id): diff --git a/livestreams.py b/livestreams.py index 2c8acf6dcfbf8fc524291b014d107fdbc75aac6d..e55eac2669e3036a69b7b7ff9fc03bfaae5c10d5 100644 --- a/livestreams.py +++ b/livestreams.py @@ -1,10 +1,5 @@ from server import * - -@sched_func(120) -def livestream_thumbnail(): - livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active') - for v in genlive(livestreams): - schedule_job('thumbnail', {'lectureid': str(v['lecture_id']), 'path': v['path']}) +from jobmanagement import schedule_job, restart_job @app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST']) @app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST']) @@ -57,3 +52,8 @@ def streamauth(server=None): def restart_failed_live_transcode(id, type, data, state, status): restart_job(id) +@sched_func(120) +def livestream_thumbnail(): + livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active') + for v in genlive(livestreams): + schedule_job('thumbnail', {'lectureid': str(v['lecture_id']), 'path': v['path']}) diff --git a/sorter.py b/sorter.py index 5db96ee4376ed933d104ff1ac45e22dfcdcbd557..c39a4dc1f705140cb26a6b5787902cea49e7c132 100644 --- a/sorter.py +++ b/sorter.py @@ -1,4 +1,5 @@ from server import * +from jobmanagement import schedule_job import traceback import os.path diff --git a/tests/test_jobmanagement.py b/tests/test_jobmanagement.py new file mode 100644 index 0000000000000000000000000000000000000000..98037365b801a717b1225b2291e3a2a10e403588 --- /dev/null +++ b/tests/test_jobmanagement.py @@ -0,0 +1,66 @@ +import unittest +import server +import flask + +from datetime import datetime, timedelta +import jobmanagement +from server import query + + +class JobmanagementTestCase(unittest.TestCase): + def tearDown(self): + pass + + def setUp(self): + server.app.testing = True + self.requestContext = server.app.test_request_context() + self.client = server.app.test_client() + self.app = server.app + + def getJobCount(self, state=None): + if not state: + data = query("SELECT count(id) AS count from jobs") + else: + data = query("SELECT count(id) AS count FROM jobs WHERE state=?", state) + return data[0]['count'] + + def getCanceledJobCount(self): + data = query("SELECT count(id) AS count from jobs WHERE canceled=1") + return data[0]['count'] + + def generateTestJob(self): + return jobmanagement.schedule_job('testjob', data={'data': 'mytestdata'}) + + def moveJobScheduletimeToPast(self, id, seconds=500): + query("UPDATE jobs SET time_scheduled = ? WHERE id = ?", datetime.now() - timedelta(seconds=seconds), id) + + def test_schedule_job(self): + with self.requestContext: + jobCountBefore = self.getJobCount() + self.generateTestJob() + assert(jobCountBefore + 1 == self.getJobCount()) + + + def test_cancel_job(self): + with self.requestContext: + canceledJobCountBefore = self.getCanceledJobCount() + jobmanagement.cancel_job(self.generateTestJob()) + canceledJobCountAfter = self.getCanceledJobCount() + assert(canceledJobCountBefore +1 == canceledJobCountAfter) + + def test_catch_broken(self): + with self.requestContext: + readyJobCountBefore = self.getJobCount('ready') + jobid = self.generateTestJob() + self.moveJobScheduletimeToPast(jobid) + jobmanagement.job_set_state(jobid, 'scheduled') + jobmanagement.job_catch_broken() + readyJobCountAfter = self.getJobCount('ready') + assert(readyJobCountBefore + 1 == readyJobCountAfter) + + def test_job_set_state(self): + with self.requestContext: + jobCountBefore = self.getJobCount('teststate') + jobid = self.generateTestJob() + jobmanagement.job_set_state(jobid, 'teststate') + assert(jobCountBefore + 1 == self.getJobCount('teststate')) diff --git a/tests/test_jobs.py b/tests/test_jobs.py deleted file mode 100644 index 245137db25acf52972210f616243bee897468c8c..0000000000000000000000000000000000000000 --- a/tests/test_jobs.py +++ /dev/null @@ -1,2 +0,0 @@ -import os -import unittest