Commit 1d32e6d8 authored by Andreas Valder's avatar Andreas Valder

moved jobmanagement functions to own file and added unittests

parent 4378d229
from server import *
import os.path
from jobmanagement import schedule_job
def set_metadata(dest, course, lecture):
chapters = query('SELECT text, time FROM chapters WHERE lecture_id = ? AND visible ORDER BY time', lecture['id'])
......
from server import modify, query, date_json_handler, sched_func
from datetime import datetime, timedelta
import json
job_handlers = {}
def job_handler(*types, state='finished'):
def wrapper(func):
for jobtype in types:
if jobtype not in job_handlers:
job_handlers[jobtype] = {}
if state not in job_handlers[jobtype]:
job_handlers[jobtype][state] = []
job_handlers[jobtype][state].append(func)
return func
return wrapper
@sched_func(10)
def job_catch_broken():
# scheduled but never pinged
query('BEGIN')
query('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', datetime.now() - timedelta(seconds=10))
try:
query('COMMIT')
except:
pass
# no pings since 60s
query('BEGIN')
query('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', datetime.now() - timedelta(seconds=60))
try:
query('COMMIT')
except:
pass
def job_set_state(id, state):
query('UPDATE jobs SET state=? WHERE id=?', state, id)
def schedule_job(jobtype, data=None, priority=0, queue="default"):
if not data:
data = {}
return modify('INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)',
jobtype, priority, queue, json.dumps(data, default=date_json_handler), datetime.now())
def cancel_job(job_id):
modify('UPDATE jobs SET state = "deleted" WHERE id = ? AND state = "ready"', job_id)
modify('UPDATE jobs SET canceled = 1 WHERE id = ?', job_id)
def restart_job(job_id, canceled=False):
if canceled:
modify('UPDATE jobs SET state = "ready", canceled = 0 WHERE id = ? AND state = "failed"', job_id)
else:
modify('UPDATE jobs SET state = "ready" WHERE id = ? AND state = "failed" AND NOT canceled', job_id)
......@@ -3,34 +3,7 @@ import traceback
import json
import random
from time import sleep
job_handlers = {}
def job_handler(*types, state='finished'):
def wrapper(func):
for jobtype in types:
if jobtype not in job_handlers:
job_handlers[jobtype] = {}
if state not in job_handlers[jobtype]:
job_handlers[jobtype][state] = []
job_handlers[jobtype][state].append(func)
return func
return wrapper
def schedule_job(jobtype, data=None, priority=0, queue="default"):
if not data:
data = {}
return modify('INSERT INTO jobs (type, priority, queue, data, time_created) VALUES (?, ?, ?, ?, ?)',
jobtype, priority, queue, json.dumps(data, default=date_json_handler), datetime.now())
def cancel_job(job_id):
modify('UPDATE jobs SET state = "deleted" WHERE id = ? AND state = "ready"', job_id)
modify('UPDATE jobs SET canceled = 1 WHERE id = ?', job_id)
def restart_job(job_id, canceled=False):
if canceled:
modify('UPDATE jobs SET state = "ready", canceled = 0 WHERE id = ? AND state = "failed"', job_id)
else:
modify('UPDATE jobs SET state = "ready" WHERE id = ? AND state = "failed" AND NOT canceled', job_id)
from jobmanagement import *
@app.route('/internal/jobs/overview')
@register_navbar('Jobs', iconlib='fa', icon='suitcase', group='weitere')
......@@ -96,23 +69,6 @@ def jobs_api_token_required(func):
return func(*args, **kwargs)
return decorator
@sched_func(10)
def jobs_catch_broken():
# scheduled but never pinged
query('BEGIN')
query('UPDATE jobs SET state="ready" WHERE state="scheduled" and time_scheduled < ?', datetime.now() - timedelta(seconds=10))
try:
query('COMMIT')
except:
pass
# no pings since 60s
query('BEGIN')
query('UPDATE jobs SET state="failed" WHERE state="running" and last_ping < ?', datetime.now() - timedelta(seconds=60))
try:
query('COMMIT')
except:
pass
@app.route('/internal/jobs/api/job/<int:id>/ping', methods=['GET', 'POST'])
@jobs_api_token_required
def jobs_ping(id):
......
from server import *
@sched_func(120)
def livestream_thumbnail():
livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
for v in genlive(livestreams):
schedule_job('thumbnail', {'lectureid': str(v['lecture_id']), 'path': v['path']})
from jobmanagement import schedule_job, restart_job
@app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
@app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
......@@ -57,3 +52,8 @@ def streamauth(server=None):
def restart_failed_live_transcode(id, type, data, state, status):
restart_job(id)
@sched_func(120)
def livestream_thumbnail():
livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
for v in genlive(livestreams):
schedule_job('thumbnail', {'lectureid': str(v['lecture_id']), 'path': v['path']})
from server import *
from jobmanagement import schedule_job
import traceback
import os.path
......
import unittest
import server
import flask
from datetime import datetime, timedelta
import jobmanagement
from server import query
class JobmanagementTestCase(unittest.TestCase):
def tearDown(self):
pass
def setUp(self):
server.app.testing = True
self.requestContext = server.app.test_request_context()
self.client = server.app.test_client()
self.app = server.app
def getJobCount(self, state=None):
if not state:
data = query("SELECT count(id) AS count from jobs")
else:
data = query("SELECT count(id) AS count FROM jobs WHERE state=?", state)
return data[0]['count']
def getCanceledJobCount(self):
data = query("SELECT count(id) AS count from jobs WHERE canceled=1")
return data[0]['count']
def generateTestJob(self):
return jobmanagement.schedule_job('testjob', data={'data': 'mytestdata'})
def moveJobScheduletimeToPast(self, id, seconds=500):
query("UPDATE jobs SET time_scheduled = ? WHERE id = ?", datetime.now() - timedelta(seconds=seconds), id)
def test_schedule_job(self):
with self.requestContext:
jobCountBefore = self.getJobCount()
self.generateTestJob()
assert(jobCountBefore + 1 == self.getJobCount())
def test_cancel_job(self):
with self.requestContext:
canceledJobCountBefore = self.getCanceledJobCount()
jobmanagement.cancel_job(self.generateTestJob())
canceledJobCountAfter = self.getCanceledJobCount()
assert(canceledJobCountBefore +1 == canceledJobCountAfter)
def test_catch_broken(self):
with self.requestContext:
readyJobCountBefore = self.getJobCount('ready')
jobid = self.generateTestJob()
self.moveJobScheduletimeToPast(jobid)
jobmanagement.job_set_state(jobid, 'scheduled')
jobmanagement.job_catch_broken()
readyJobCountAfter = self.getJobCount('ready')
assert(readyJobCountBefore + 1 == readyJobCountAfter)
def test_job_set_state(self):
with self.requestContext:
jobCountBefore = self.getJobCount('teststate')
jobid = self.generateTestJob()
jobmanagement.job_set_state(jobid, 'teststate')
assert(jobCountBefore + 1 == self.getJobCount('teststate'))
import os
import unittest
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment