import unittest
import server
import flask
import os
import random

class FlaskTestCase(unittest.TestCase):
    def tearDown(self):
        pass

    def setUp(self):
        server.app.testing = True
        self.requestContext = server.app.test_request_context()
        self.client = server.app.test_client()
        self.app = server.app

    def videoagLogin(self):
        self.sess_csrf_token = os.urandom(32)
        self.sess_dbid = random.randint(0, 100)
        self.sess_username = 'videoag'
        with self.client.session_transaction() as sess:
            sess['user'] = {'name': self.sess_username, '_csrf_token': self.sess_csrf_token, 'dbid': self.sess_dbid}
            sess['_csrf_token'] = self.sess_csrf_token
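
# Illustration only (hypothetical test case, not part of this change set): a
# subclass can call videoagLogin() and read the values back from the test
# client's session.
class LoginHelperExampleTestCase(FlaskTestCase):
    def test_login_helper_populates_session(self):
        self.videoagLogin()
        with self.client.session_transaction() as sess:
            # The helper stores the random token both inside the user dict and
            # at the top level of the session.
            assert sess['user']['name'] == 'videoag'
            assert sess['_csrf_token'] == self.sess_csrf_token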

from flaskunittest import FlaskTestCase
from datetime import datetime, timedelta
import jobmanagement
from server import query

class JobmanagementTestCase(FlaskTestCase):
    def getJobCount(self, state=None):
        if not state:
            data = query("SELECT count(id) AS count FROM jobs")
        else:
            data = query("SELECT count(id) AS count FROM jobs WHERE state=?", state)
        return data[0]['count']

    def getCanceledJobCount(self):
        data = query("SELECT count(id) AS count FROM jobs WHERE canceled=true")
        return data[0]['count']

    def generateTestJob(self):
        return jobmanagement.schedule_job('testjob', data={'data': 'mytestdata'})

    def moveJobScheduletimeToPast(self, id, seconds=500):
        query("UPDATE jobs SET time_scheduled = ? WHERE id = ?", datetime.now() - timedelta(seconds=seconds), id)

    def test_schedule_job(self):
        with self.requestContext:
            jobCountBefore = self.getJobCount()
            self.generateTestJob()
            assert jobCountBefore + 1 == self.getJobCount()

    def test_cancel_job(self):
        with self.requestContext:
            canceledJobCountBefore = self.getCanceledJobCount()
            jobmanagement.cancel_job(self.generateTestJob())
            canceledJobCountAfter = self.getCanceledJobCount()
            assert canceledJobCountBefore + 1 == canceledJobCountAfter

    def test_catch_broken(self):
        with self.requestContext:
            readyJobCountBefore = self.getJobCount('ready')
            jobid = self.generateTestJob()
            self.moveJobScheduletimeToPast(jobid)
            jobmanagement.job_set_state(jobid, 'scheduled')
            jobmanagement.job_catch_broken()
            readyJobCountAfter = self.getJobCount('ready')
            assert readyJobCountBefore + 1 == readyJobCountAfter

    def test_job_set_state(self):
        with self.requestContext:
            jobCountBefore = self.getJobCount('teststate')
            jobid = self.generateTestJob()
            jobmanagement.job_set_state(jobid, 'teststate')
            assert jobCountBefore + 1 == self.getJobCount('teststate')

#!/usr/bin/env python3
import os
import unittest
import server
import json
import flask
from flask import url_for

class VideoTestCase(unittest.TestCase):
    @classmethod
    def tearDownClass(cls):
        os.unlink(server.app.config['SQLITE_DB'])

    def tearDown(self):
        pass

    def setUp(self):
        server.app.testing = True
@@ -65,6 +61,12 @@ class VideoTestCase(unittest.TestCase):
            r = c.get('/15ws-einfprog')
            assert r.status_code == 200
            r = c.get('/99')
            assert r.status_code == 200
            r = c.post('/99/2330/login', data={"username": "wrong", "password": "auth"})
            assert r.status_code == 302

    def test_timetable(self):
        with self.app as c:
            r = c.get('/internal/timetable')
@@ -77,7 +79,6 @@ class VideoTestCase(unittest.TestCase):
            assert r.status_code == 200
            assert 'AfI' in r.data.decode()
            assert 'Progra' in r.data.decode()
            assert 'Bio' in r.data.decode()

    def test_faq(self):
        r = self.app.get('/faq')
@@ -122,7 +123,7 @@ class VideoTestCase(unittest.TestCase):
    def test_search(self):
        r = self.app.get('/search?q=Malo')
        assert r.status_code == 200
        assert 'Mathematische Logik II' in r.data.decode() and '4.1 Der Sequenzenkalkül' in r.data.decode()
        assert 'Mathematische Logik II' in r.data.decode()
        r = self.app.get('/search?q=Afi+Stens')
        assert r.status_code == 200
        assert 'Analysis für Informatiker' in r.data.decode() and 'Höhere Mathematik I' in r.data.decode()
@@ -181,6 +182,12 @@ class VideoTestCase(unittest.TestCase):
            assert r.status_code == 200
            assert 'Testtitle' in r.data.decode() and 'lectures.7353.title' in r.data.decode()
            r = c.post('/internal/set/responsible/20/1', data={'_csrf_token': 'asd'})
            assert r.status_code == 200
            r = c.post('/internal/unset/responsible/20/1', data={'_csrf_token': 'asd'})
            assert r.status_code == 200

    def test_legacyurl(self):
        with self.app as c:
@@ -237,8 +244,8 @@ class VideoTestCase(unittest.TestCase):
        r = self.app.post('/internal/jobs/api/worker/test/schedule', data=json.dumps({'jobtypes': ['thumbnail'], 'queues': ['default'], 'apikey': '1'}), content_type='application/json')
        assert r.status_code == 200
        jobdata = json.loads(json.loads(r.data.decode())['data'])
        assert jobdata.get('lectureid') == '6981'
        assert jobdata.get('path') == 'pub/hls/15ws-afi.m3u8'
        assert jobdata.get('filename') == 'l_6981.jpg'
        assert jobdata.get('src') == 'pub/hls/15ws-afi.m3u8'
        r = self.app.get('/internal/streaming/legacy_auth/testserver', data={'app': 'live', 'call': 'publish_done', 'pass': 'caisoh8aht0wuSu', 'lecture': 6981, 'name': '15ws-afi'}, headers={'X-Real-IP': '137.226.35.193'})
        assert r.status_code == 200
@@ -264,11 +271,28 @@ class VideoTestCase(unittest.TestCase):
            assert len(match) == 1
            assert match[0]['id'] == 6095
            self.login(c)
            r = self.app.get('/internal/sort/log')
            assert r.status_code == 200
            r = self.app.post('/internal/jobs/add/thumbnail', data={'lectureid': 10, '_csrf_token': 'asd'})
            assert r.status_code == 302
            r = self.app.get('/internal/sort/now')
            assert r.status_code == 200
            r = self.app.get('/internal/sort/encoded/something-wrong', data={'apikey': '1'})
            assert r.status_code == 400
            r = self.app.get('/internal/sort/encoded/09ss-dsal-090619-720p.mp4', data={'apikey': '1'})
            assert r.status_code == 200

    @unittest.skip("too slow")
    def test_campusimport(self):
        with self.app as c:
            self.login(c)
            r = self.app.post('/internal/import/257', data={'campus.new.url': 'https://www.campus.rwth-aachen.de/rwth/all/event.asp?gguid=0x4664DBD60E5A02479B53089BF0EB0681&tguid=0x0B473CF286B45B4984CD02565C07D6F8', 'campus.new.type': 'Vorlesung'})
            r = self.app.post('/internal/import/257', data={'campus.new.url': 'https://online.rwth-aachen.de/RWTHonline/pl/ui/%24ctx/wbLv.wbShowLVDetail?pStpSpNr=269474&pSpracheNr=1', 'campus.new.type': 'Übung'})
            assert r.status_code == 200
            r = self.app.get('/internal/import/257/now')
@@ -280,7 +304,139 @@ class VideoTestCase(unittest.TestCase):
        r = self.app.get('/internal/cutprogress')
        assert r.status_code == 200

    # Some quick tests below to execute more sql statements
    def test_encoding(self):
        with self.app as c:
            self.login(c)
            r = self.app.post('/internal/jobs/add/remux', data={'videoid': 28, '_csrf_token': 'asd'})
            assert r.status_code == 302
            r = self.app.post('/internal/jobs/add/reencode', data={'videoid': 16080, '_csrf_token': 'asd'})
            assert r.status_code == 302
            # Trigger edit handler
            r = c.get('internal/edit', data={"courses.3.title": "Test", "_csrf_token": "asd"})
            assert r.status_code == 200

    def test_feeds(self):
        with self.app as c:
            r = self.app.get('/feed')
            assert r.status_code == 200
            r = self.app.get('/07ws-buk/feed')
            assert r.status_code == 200
            r = self.app.get('/07ws-buk/rss', data={'format_id': 10})
            assert r.status_code == 200
            r = self.app.get('/courses/feed')
            assert r.status_code == 200

    def test_icalexport(self):
        with self.app as c:
            self.login(c)
            r = self.app.get('/internal/ical/user/1')
            assert r.status_code == 200
            r = self.app.get('/internal/ical/notuser/1')
            assert r.status_code == 200
            r = self.app.get('/internal/ical/course/1')
            assert r.status_code == 200

    def test_jobs(self):
        with self.app as c:
            self.login(c)
            r = self.app.get('/internal/jobs/overview')
            assert r.status_code == 200
            r = self.app.get('/internal/jobs/overview?worker=worker0')
            assert r.status_code == 200
            r = self.app.post('/internal/jobs/action/clear_failed', data={'_csrf_token': 'asd'})
            assert r.status_code == 302
            r = self.app.post('/internal/jobs/action/clear_failed/1', data={'_csrf_token': 'asd'})
            assert r.status_code == 302
            r = self.app.post('/internal/jobs/action/retry_failed', data={'_csrf_token': 'asd'})
            assert r.status_code == 302
            r = self.app.post('/internal/jobs/action/retry_failed/1', data={'_csrf_token': 'asd'})
            assert r.status_code == 302
            r = self.app.post('/internal/jobs/action/copy/1', data={'_csrf_token': 'asd'})
            assert r.status_code == 302
            r = self.app.post('/internal/jobs/action/delete/1', data={'_csrf_token': 'asd'})
            assert r.status_code == 302
            r = self.app.post('/internal/jobs/action/cancel/1', data={'_csrf_token': 'asd'})
            assert r.status_code == 302
            r = self.app.post('/internal/jobs/api/job/1/ping', data={'apikey': '1', 'host': 'test', 'status': '{}', 'state': 'finished'})
            assert r.status_code == 205
            # Test new worker
            import uuid
            r = self.app.post(f'/internal/jobs/api/worker/{uuid.uuid4()}/schedule', data=json.dumps({'jobtypes': ['probe'], 'queues': ['default'], 'apikey': '1'}), content_type='application/json')
            assert r.status_code in [200, 503]

    def test_auth(self):
        with self.app as c:
            r = self.app.get('/internal/auth', headers={'X-Original-Uri': 'https://videoag.fsmpi.rwth-aachen.de/files/pub/15ws-afi/15ws-afi-151022-720p.mp4'})
            assert r.status_code == 200
            # Not found, but sql is executed
            r = self.app.get('/internal/auth', headers={'X-Original-Uri': 'https://videoag.fsmpi.rwth-aachen.de/files/pub/hls/something'})
            assert r.status_code == 404
            r = self.app.get('/internal/auth', headers={'X-Original-Uri': 'https://videoag.fsmpi.rwth-aachen.de/files/pub/hls/42'})
            assert r.status_code == 404

    def test_stats(self):
        with self.app as c:
            self.login(c)
            r = self.app.get('/internal/stats')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/generic/formats_views')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/generic/course_count')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/generic/lectures_count')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/generic/categories_courses')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/generic/organizer_courses')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/generic/categories_lectures')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/generic/lecture_views')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/generic/live_views')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/generic/lecture_totalviews')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/viewsperday/lecture/1')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/viewsperday/course/1')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/viewsperday/global')
            assert r.status_code == 200
            r = self.app.get('/internal/stats/viewsperday/courses')
            assert r.status_code == 200

if __name__ == '__main__':
    unittest.main()

from flaskunittest import FlaskTestCase
import sorter
from datetime import datetime, date, time

class SorterTestCase(FlaskTestCase):
    def test_split_filename(self):
        testdata = [
            {'filename': 'asdasd', 'chunks': ['asdasd']},
            {'filename': 'a-b-c-d', 'chunks': ['a', 'b', 'c', 'd']},
            {'filename': 'a_', 'chunks': ['a', '']},
            {'filename': 'a-', 'chunks': ['a', '']},
            {'filename': 'a ', 'chunks': ['a', '']},
            {'filename': '', 'chunks': ['', 'ß']},
            {'filename': 'b-a.mp4', 'chunks': ['b', 'a.mp4']},
            {'filename': '', 'chunks': ['']}
        ]
        for test in testdata:
            result = sorter.split_filename(test['filename'])
            assert result == test['chunks'], 'result was {}, should be {}'.format(result, test)

    def test_extract_format_keyword_from_filename(self):
        testdata = [
            {'chunks': ['', ''], 'format': ''},
            {'chunks': ['asd', '720p'], 'format': '720p'},
            {'chunks': ['asd', '720P'], 'format': '720p'},
            {'chunks': ['asd', '#\\ää'], 'format': '#\\ää'},
            {'chunks': ['123'], 'format': '123'},
        ]
        for test in testdata:
            result = sorter.extract_format_keyword_from_filename(test['chunks'])
            assert result == test['format'], 'result was {}, should be {}'.format(result, test)
    def test_filter_formats_by_filename(self):
        testdata = [
            {'chunks': ['ääüp', 'ßääää'], 'format': 0},
            {'chunks': ['123üß', '720P'], 'format': 5},
            {'chunks': ['testvideo', '1080p'], 'format': 4},
            {'chunks': ['mp3'], 'format': 7},
        ]
        with self.requestContext:
            for test in testdata:
                result = sorter.filter_formats_by_filename(test['chunks'])
                assert result == test['format'], 'result was {}, should be {}'.format(result, test)
    def test_parse_filename(self):
        testdata = [
            {'filename': '', 'data': {'keywords': ['']}},
            {'filename': '18ss-mc-180413_720p.mp4', 'data': {'keywords': ['18ss', 'mc', '720p'], 'date': date(year=2018, month=4, day=13)}},
            {'filename': 'astaintern-astawiki-1080p.mp4', 'data': {'keywords': ['astaintern', 'astawiki', '1080p']}},
            {'filename': '15ss-zkk-extremale-codes-720p.mp4', 'data': {'keywords': ['15ss', 'zkk', 'extremale', 'codes', '720p']}},
            # TODO: missing test data for time
        ]
        for test in testdata:
            result = sorter.parse_filename(sorter.split_filename(test['filename']))
            assert result == test['data'], 'result was {}, should be {}'.format(result, test['data'])
    def test_filter_lectures_by_keywords(self):
        testdata = [
            {'lectures': [], 'keywords': []},
            {'lectures': [{'title': 'a', 'result': True}, {'title': 'b:', 'result': False}], 'keywords': ['a']},
            {'lectures': [{'speaker': 'aca', 'result': True}, {'comment': 'bbd:', 'result': False}], 'keywords': ['c']},
            {'lectures': [{'internal': 'apäöa', 'result': False}, {'comment': 'bbd:', 'result': False}], 'keywords': ['c']},
            {'lectures': [{'internal': 'alll', 'result': False}, {'comment': 'bbdäo', 'result': True}], 'keywords': ['ä']},
        ]
        for test in testdata:
            result = sorter.filter_lectures_by_keywords(test['lectures'], test['keywords'])
            for i in result:
                assert i.get('result')
            for i in test.get('lectures', []):
                assert (not i.get('result')) or (i in result)

    def test_filter_lectures_by_datetime(self):
        testdata = [
            {'lectures': [], 'date': None, 'time': None},
            {'lectures': [{'time': datetime(year=2000, month=1, day=1), 'result': True}], 'date': None, 'time': time(hour=0, minute=0)},
            {'lectures': [{'time': datetime(year=2000, month=1, day=1), 'result': False}], 'date': None, 'time': time(hour=0, minute=1)},
            {'lectures': [{'result': False}], 'date': None, 'time': time(hour=0, minute=1)},
            # TODO: add more testdata
        ]
        for test in testdata:
            result = sorter.filter_lectures_by_datetime(test['lectures'], test.get('date'), test.get('time'))
            for i in result:
                assert i.get('result')
            for i in test.get('lectures', []):
                assert (not i.get('result')) or (i in result)
    def test_sort_file(self):
        testdata = [
            {'filename': '08ws-swt-081118.mp4', 'match': [104], 'fmt': 0},
            {'filename': '15ss-zkk-extremale-codes-1080p.mp4', 'match': [6095], 'fmt': 4},
            {'filename': '15ws-afi-151027-720p.mp4', 'match': [6326], 'fmt': 5},
        ]
        with self.requestContext:
            for test in testdata:
                match, fmt = sorter.sort_file(test['filename'])
                assert len(match) == len(test['match'])
                for i in match:
                    assert i['id'] in test['match'], '{} is not supposed to match, only {} is'.format(i['id'], test['match'])
                assert fmt == test['fmt'], 'format id {} is wrong, it is supposed to be {}'.format(fmt, test['fmt'])

from server import *
from datetime import time

def get_monday(day):
    return day - timedelta(days=day.weekday())

def get_week_offset(value):
    if value is None:
        return 0
    day = None
    for pattern in ['%d-%m-%Y-1', '%Y-W%W-%w']:
        try:
            day = datetime.strptime(value+'-1', pattern)
        except ValueError:
            pass
        if day is not None:
            break
    if day is None:
        return 0
    return int((get_monday(day) - get_monday(datetime.now())).days/7)
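
# Illustration only (not part of this change set): the helper accepts either a
# 'DD-MM-YYYY' date or an ISO-week value such as '2018-W15' (the value format of
# an HTML week input) and returns the signed number of weeks between that week's
# Monday and the current week's Monday, e.g.
#
#   get_week_offset(None)                                  # -> 0
#   get_week_offset(datetime.now().strftime('%d-%m-%Y'))   # -> 0 (current week)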

def query_lectures_on_day(start, end):
    # What we want to match:
    #   lecture.time <= end AND lecture.time+lecture.duration >= start
    # But there is no SQL statement that does this and is compatible with both sqlite
    # and mysql, so we approximate the "lecture.time+lecture.duration" part
    rows = query('''SELECT lectures.*, courses.short, 'course' AS sep, courses.*
            FROM lectures
            JOIN courses ON (lectures.course_id = courses.id)
            WHERE time <= ? AND time > ?
            ORDER BY time ASC''', end, start-timedelta(weeks=2))
    lectures = []
    for lecture in rows:
        if lecture['time']+timedelta(minutes=lecture['duration']) >= start:
            lectures.append(lecture)
    return lectures
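
# Illustration only (hypothetical helper, not part of this change set): the exact
# overlap test that the Python loop above applies after the deliberately
# over-broad SQL query, written out as a standalone predicate.
def _lecture_overlaps_window(lecture, start, end):
    # Matches the comment above: lecture.time <= end AND lecture.time+duration >= start
    lecture_end = lecture['time'] + timedelta(minutes=lecture['duration'])
    return lecture['time'] <= end and lecture_end >= start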

'''Use a sweepline algorithm to find overlapping lectures
For each day the item 'maxcol' will be the number of columns required to display
the overlapping lectures. For each lecture the item 'timetable_col' will be the
index of the column the lecture is going to be rendered in.'''
def timetable_sweepline(days):
    earliest_start = time(23, 59)
    latest_end = time(0, 0)
    for day in days:
        sweeplinetupels = [(lecture['time'].time(), True, lecture) for lecture in day['lectures']]
        sweeplinetupels += [(lecture['time_end'].time(), False, lecture) for lecture in day['lectures']]
        maxcol = 0
        curcol = 0
        freecol = []
        sweeplinetupels.sort(key=lambda row: row[:2])
        for timestamp, is_start, lecture in sweeplinetupels:
            if is_start:
                curcol += 1
                maxcol = max(maxcol, curcol)
                if freecol:
                    lecture['timetable_col'] = freecol.pop()
                else:
                    lecture['timetable_col'] = maxcol
                earliest_start = min(earliest_start, timestamp)
            else:
                curcol -= 1
                freecol.append(lecture['timetable_col'])
                latest_end = max(latest_end, timestamp)
        day['maxcol'] = max(maxcol, 1)
    return earliest_start, latest_end
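
# Illustration only (not part of this change set): two lectures on the same day
# that overlap between 11:00 and 12:00 end up in separate columns, e.g.
#
#   day = {'lectures': [
#       {'time': datetime(2018, 4, 13, 10, 0), 'time_end': datetime(2018, 4, 13, 12, 0)},
#       {'time': datetime(2018, 4, 13, 11, 0), 'time_end': datetime(2018, 4, 13, 13, 0)},
#   ]}
#   timetable_sweepline([day])
#   # -> returns (time(10, 0), time(13, 0)); day['maxcol'] == 2 and the two
#   #    lectures get 'timetable_col' 1 and 2 respectively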

@register_navbar('personalisierter Drehplan', icon='calendar', userendpoint=True, endpoint='timetable_user')
@register_navbar('Drehplan', icon='calendar')
@app.route('/internal/timetable')
@app.route('/internal/user/<int:user>/timetable', endpoint='timetable_user')
@mod_required
def timetable(user=None):
    if 'kw' in request.args:
        week_offset = int(request.args['kw'])
    else:
        week_offset = get_week_offset(request.args.get('date', None))
    start_day = date.today() - timedelta(days=date.today().weekday() - 7*week_offset)
    days = [{'date': start_day, 'lectures': [], 'atonce': 0, 'index': 0}]
    for i in range(1, 7):
        days.append({'date': days[i-1]['date'] + timedelta(days=1), 'atonce': 0, 'index': i, 'lectures': []})
    for day in days:
        start = datetime.combine(day['date'], time(0, 0))
        end = datetime.combine(day['date'], time(23, 59))
        day['lectures'] = []
        for lecture in query_lectures_on_day(start, end):
            lecture['time_end'] = lecture['time']+timedelta(minutes=lecture['duration'])
            # "Crop" lecture's timespan to start/end of day
            lecture['time'] = max(start, lecture['time'])
            lecture['time_end'] = min(end, lecture['time_end'])
            # Ensure length > 0
            lecture['time_end'] = max(lecture['time_end'], lecture['time']+timedelta(minutes=1))
            lecture['duration'] = int((lecture['time_end'] - lecture['time']).total_seconds()/60)
            # Filter on responsible user if a user parameter was given
            lecture['responsible'] = query('''SELECT users.*
                    FROM responsible
                    JOIN users ON (responsible.user_id = users.id AND responsible.course_id = ?)
                    ORDER BY users.realname ASC''', lecture['course_id'])
            if len(lecture['responsible']) == 0:
                lecture['responsible'] = [{"realname": "Niemand", "id": -1}]
            if not user or user in [r['id'] for r in lecture['responsible']]:
                day['lectures'].append(lecture)
    earliest_start, latest_end = timetable_sweepline(days)
    start = min(earliest_start, time(8, 0))
    end = max(latest_end, time(19, 0))
    blocks = []
    for i in range(start.hour*4, min(int((60*end.hour/15)/4)*4+5, 24*4)):
        timestamp = i*15
        blocks.append(time(int(timestamp/60), timestamp%60))
    weekofyear = '{}-W{:02d}'.format(datetime.today().year, datetime.today().isocalendar()[1])
    return render_template('timetable.html',
            days=days,
            blocks=blocks,
            kw=week_offset,
            weekofyear=weekofyear,
            user=query('SELECT * FROM users WHERE id = ?', user)[0] if user else None)