"""Flask views, scheduled tasks and job handlers for live streaming.

Contains the streaming-server callbacks (``publish`` / ``play`` /
``publish_done``) for the legacy ``streams`` table and for ``live_sources``,
the moderator streaming overview, and helpers to start and stop the
per-lecture live transcode job.

NOTE(review): ``app``, ``config``, ``query``, ``modify``, ``schedule_job``,
``cancel_job``, ``restart_job``, ``sched_func``, ``job_handler``,
``mod_required``, ``register_navbar``, ``genlive``, ``genlive_new`` and the
Flask/stdlib names used unqualified below are presumably provided by the
``server`` star import -- confirm against that module.
"""
# Kept first (same order as before) so that the explicit imports below win
# over any identically named object re-exported by ``server``.
from server import *

import requests
from xml.etree import ElementTree
import random
import string

# Seconds to wait for the streaming server's HTTP endpoints before giving up,
# so an unreachable server cannot block a request indefinitely.
_STREAM_SERVER_TIMEOUT = 5


def _is_internal_request():
    """Return True if the ``X-Real-IP`` header is inside ``FSMPI_IP_RANGES``.

    A missing or unparsable header counts as "not internal".
    """
    real_ip = request.headers.get('X-Real-IP')
    if real_ip is None:
        return False
    try:
        addr = ip_address(real_ip)
    except ValueError:
        return False
    return any(addr in ip_network(net) for net in config.get('FSMPI_IP_RANGES', []))


def _children_as_dict(element):
    """Return ``{tag: text}`` for the direct children of *element*.

    ``None`` (node not present in the stats document) yields an empty dict.
    """
    if element is None:
        return {}
    return {child.tag: child.text for child in element}


@sched_func(120)
def livestream_thumbnail():
    """Schedule a ``thumbnail`` job for every currently running livestream."""
    livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
    lectures = query('SELECT * FROM lectures WHERE stream_job IS NOT NULL')
    for v in genlive(livestreams) + genlive_new(lectures):
        schedule_job('thumbnail', {'src': v['path'], 'filename': 'l_%i.jpg' % v['lecture_id']})


@app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
@app.route('/internal/streaming/legacy_auth/', methods=['GET', 'POST'])
@app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
def streamauth_legacy(server=None):
    """Handle ``publish`` / ``publish_done`` callbacks for legacy streams.

    The stream handle is taken from the ``name`` request value. On ``publish``
    the stream row is marked active and linked to a lecture; if *server* is
    given, a ``simple_live_transcode`` job is scheduled as well. On
    ``publish_done`` the stream is marked inactive and its job is cancelled.

    Returns a ``(body, status)`` tuple: 400 for a wrong ``app`` or unknown
    ``call``, 403 for non-internal requests or a wrong password, else 200.
    """
    internal = _is_internal_request()
    if request.values['app'] != 'live':
        return 'Bad request', 400
    if not internal:
        return 'Forbidden', 403

    call = request.values['call']
    name = request.values['name']
    if call == 'publish':
        # NOTE(review): hard-coded shared secret in source control -- consider
        # moving it into the configuration.
        if request.values['pass'] != 'caisoh8aht0wuSu':
            return 'Forbidden', 403
        matches = query("SELECT lectures.* FROM lectures JOIN courses ON lectures.course_id = courses.id WHERE courses.handle = ? ORDER BY lectures.time DESC", name)
        now = datetime.now()
        # Default to lecture id -1 unless a lecture of the course is running
        # now (from 30 minutes before its start until its end).
        match = {'id': -1}
        for lecture in matches:
            begin = lecture['time'] - timedelta(minutes=30)
            end = lecture['time'] + timedelta(minutes=lecture['duration'])
            if begin <= now <= end:
                match = lecture
                break
        # An explicit lecture id in the request overrides the time-based match.
        if 'lecture' in request.values:
            match = {'id': request.values['lecture']}
        try:
            # The previous statement used unescaped "" inside a double-quoted
            # Python string, which concatenated to "VALUES (?, 0, 1, -1, , )"
            # and therefore always failed; use SQL '' literals instead.
            modify("INSERT INTO streams (handle, active, visible, lecture_id, description, poster) VALUES (?, 0, 1, -1, '', '')", name)
        except Exception:
            # Best effort: presumably fails when a row for this handle already
            # exists, in which case the UPDATE below is all that is needed.
            pass
        if server:
            data = {'src': 'rtmp://%s/live/%s' % (server, name),
                    'destbase': 'rtmp://%s/hls/%s' % (server, name)}
            job_id = schedule_job('simple_live_transcode', data, priority=10)
            modify("UPDATE streams SET active = 1, lecture_id = ?, job_id = ? WHERE handle = ?", match['id'], job_id, name)
        else:
            modify("UPDATE streams SET active = 1, lecture_id = ? WHERE handle = ?", match['id'], name)
    elif call == 'publish_done':
        rows = query('SELECT job_id FROM streams WHERE handle = ?', name)
        # An unknown handle has no job to cancel; do not fail on it.
        job_id = rows[0]['job_id'] if rows else None
        modify("UPDATE streams SET active = 0 WHERE handle = ?", name)
        if job_id:
            cancel_job(job_id)
    else:
        return 'Bad request', 400
    return 'OK', 200


@job_handler('simple_live_transcode', state='failed')
def restart_failed_live_transcode(id, type, data, state, status):
    """Restart a ``simple_live_transcode`` job when it enters state ``failed``."""
    restart_job(id)


@app.route('/internal/streaming')
@register_navbar('Streaming', icon='broadcast-tower', iconlib='fa')
@mod_required
def streaming():
    """Render the streaming overview with live statistics per source.

    For every non-deleted source that has a ``clientid``, the ``/stats`` XML
    document of its server is fetched and the publishing client's fields and
    the video/audio metadata are attached to the source as ``stat``,
    ``video`` and ``audio`` dicts. Sources whose statistics cannot be fetched
    or parsed are rendered without these extras.
    """
    sources = query('SELECT * FROM live_sources WHERE NOT deleted')
    for source in sources:
        if not source['clientid']:
            continue
        try:
            r = requests.get('http://%s:8080/stats' % source['server'], timeout=_STREAM_SERVER_TIMEOUT)
        except requests.RequestException:
            # An unreachable streaming server must not break the whole page.
            continue
        if r.status_code != 200:
            continue
        source['stat'] = {}
        try:
            tree = ElementTree.fromstring(r.text)
        except ElementTree.ParseError:
            continue
        s = tree.find("./server/application/[name='src']/live/stream/[name='%i']" % source['id'])
        # Explicit None/len checks: truth-testing an Element is deprecated.
        if s is None or len(s) == 0:
            continue
        # Iterate elements directly; Element.getchildren() no longer exists
        # since Python 3.9.
        source['stat'] = _children_as_dict(s.find("client/[publishing='']"))
        source['video'] = _children_as_dict(s.find('meta/video'))
        source['audio'] = _children_as_dict(s.find('meta/audio'))
    return render_template("streaming.html", sources=sources)


def gentoken():
    """Return a random 16-character token of ASCII letters and digits.

    Uses ``random.SystemRandom`` (OS entropy source).
    """
    rng = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(16))


@app.route('/internal/streaming/rekey/<int:id>')
@mod_required
def streamrekey(id):
    """Generate a new stream key for live source *id* and show it once.

    Redirects to the streaming overview in all cases.
    """
    modify('UPDATE live_sources SET `key` = ? WHERE id = ? AND NOT deleted', gentoken(), id)
    rows = query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id)
    if not rows:
        flash('Streamquelle nicht gefunden')
        return redirect(url_for('streaming'))
    source = rows[0]
    # The message announces the new key, so include it.
    flash('Der Streamkey von ' + source['name'] + ' wurde neu generiert: ' + str(source['key']))
    return redirect(url_for('streaming'))


@app.route('/internal/streaming/drop/<int:id>')
@mod_required
def streamdrop(id):
    """Ask the streaming server to drop the publisher of live source *id*.

    If the request carries a ``ref`` value the response redirects there,
    otherwise a plain ``(body, status)`` tuple is returned.
    """
    source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id) or [None])[0]
    # NOTE(review): ``ref`` is used as redirect target without validation.
    ref = request.values.get('ref')
    if not source:
        if ref is not None:
            flash('Streamquelle nicht gefunden')
            return redirect(ref)
        return 'Not found', 404
    # Without a client id there is no publisher to drop (and the %i
    # formatting would fail on None).
    if source['clientid'] is not None:
        requests.get('http://%s:8080/control/drop/publisher?clientid=%i' % (source['server'], source['clientid']),
                     timeout=_STREAM_SERVER_TIMEOUT)
    if ref is not None:
        return redirect(ref)
    return 'Ok', 200


@sched_func(120)
def live_source_thumbnail():
    """Schedule a ``thumbnail`` job for every live source with a client id."""
    sources = query('SELECT * FROM live_sources WHERE clientid IS NOT NULL')
    for source in sources:
        schedule_job('thumbnail', {'srcurl': 'rtmp://%s/src/%i' % (source['server'], source['id']),
                                   'filename': 's_%i.jpg' % source['id']})


@app.route('/internal/streaming/auth/<server>', methods=['GET', 'POST'])
def streamauth(server):
    """Handle ``publish`` / ``play`` / ``publish_done`` callbacks for live sources.

    * ``publish``: looks up the source by stream key (``name``), records the
      server, client id and a fresh preview key, and answers with a 301 whose
      ``Location`` is the numeric source id.
    * ``play``: allowed for addresses in ``INTERNAL_IP_RANGES`` or with the
      source's current preview key.
    * ``publish_done``: clears the source's connection data and cancels the
      stream jobs of lectures using this source.

    Requests from outside ``FSMPI_IP_RANGES`` get 403, unknown calls 400.
    """
    if not _is_internal_request():
        return 'Forbidden', 403

    call = request.values['call']
    if call == 'publish':
        sources = query('SELECT * FROM live_sources WHERE NOT deleted AND `key` = ?', request.values['name'])
        if not sources:
            return 'Not found', 404
        modify('UPDATE live_sources SET server = ?, server_public = ?, clientid = ?, last_active = ?, preview_key = ? WHERE id = ?',
               server, request.args.get('public_ip', server), request.values['clientid'], datetime.now(), gentoken(), sources[0]['id'])
        live_source_thumbnail()
        ret = Response('Redirect', 301, {'Location': '%i' % sources[0]['id']})
        # Keep the bare id in the Location header instead of an absolute URL.
        ret.autocorrect_location_header = False
        return ret
    if call == 'play':
        source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', request.values['name']) or [None])[0]
        if not source:
            return 'Not found', 404
        for net in config.get('INTERNAL_IP_RANGES', []):
            if ip_address(request.values['addr']) in ip_network(net):
                return 'Ok', 200
        preview_key = request.values.get('preview_key')
        # Require that a key was actually supplied: a NULL stored key must not
        # match a request that carries no key at all.
        if preview_key is not None and source['preview_key'] == preview_key:
            return 'Ok', 200
        return 'Forbidden', 403
    if call == 'publish_done':
        source = (query('SELECT * FROM live_sources WHERE server = ? AND clientid = ?', server, request.values['clientid']) or [None])[0]
        modify('UPDATE live_sources SET server = NULL, clientid = NULL, preview_key = NULL, last_active = ? WHERE server = ? AND clientid = ?',
               datetime.now(), server, request.values['clientid'])
        if not source:
            return 'Ok', 200
        source_id = str(source['id'])
        for lecture in query('SELECT * FROM lectures WHERE stream_job IS NOT NULL'):
            settings = json.loads(lecture['stream_settings'])
            if source_id in (str(settings.get('source1')), str(settings.get('source2'))):
                cancel_job(lecture['stream_job'])
        return 'Ok', 200
    return 'Bad request', 400


def schedule_livestream(lecture_id):
    """Schedule the ``complex_live_transcode`` job for a lecture.

    Builds the job data (source URLs, per-source audio/video filter strings,
    video mix and destination) from the lecture's ``stream_settings`` JSON.

    Returns the job id, or ``None`` (after a flash message) if a configured
    source is not active or a stream job already exists for the lecture.

    Raises:
        ValueError: if a source's audio mode is not one of ``mono``,
            ``stereo``, ``unchanged`` or ``off``.
    """
    def build_filter(l):
        # Filter lists become one comma-separated expression, or None if empty.
        return ','.join(l) if l else None

    server = 'rwth.video'
    lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
    settings = json.loads(lecture['stream_settings'])
    data = {'src1': {'afilter': [], 'vfilter': []},
            'src2': {'afilter': [], 'vfilter': []},
            'afilter': [],
            'videoag_logo': int(bool(settings.get('video_showlogo'))),
            'lecture_id': lecture['id']}
    src1 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source1')) or [{}])[0]
    src2 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source2')) or [{}])[0]
    for idx, obj in zip([1, 2], [src1, src2]):
        srcdata = data['src%i' % idx]
        if obj:
            # The last configured source determines the destination server.
            server = obj['server']
            srcdata['url'] = 'rtmp://%s/src/%i' % (obj['server'], obj['id'])
            if not obj['clientid']:
                flash('Quelle „%s“ ist nicht aktiv!' % obj['name'])
                return None
        if settings.get('source%i_deinterlace' % idx):
            srcdata['vfilter'].append('yadif')
        mode = settings.get('source%i_audiomode' % idx)
        # Volumes are stored as percentages.
        leftvol = float(settings.get('source%i_leftvolume' % idx, 100)) / 100.0
        rightvol = float(settings.get('source%i_rightvolume' % idx, 100)) / 100.0
        if mode == 'mono':
            srcdata['afilter'].append('pan=mono|c0=%f*c0+%f*c1' % (0.5 * leftvol, 0.5 * rightvol))
        elif mode == 'stereo':
            srcdata['afilter'].append('pan=stereo|c0=%f*c0|c1=%f*c1' % (leftvol, rightvol))
        elif mode == 'unchanged':
            pass
        elif mode == 'off':
            srcdata['afilter'].append('pan=mono|c0=0*c0')
        else:
            raise ValueError('Unknown audio mode %r for source %i' % (mode, idx))

    mode = settings.get('videomode')
    if mode == '1':
        data['vmix'] = 'streamselect=map=0'
    elif mode == '2':
        data['vmix'] = 'streamselect=map=1'
    elif mode == 'lecture4:3':
        data['src1']['vfilter'].append('scale=1440:1080')
        data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
        data['vmix'] = 'hstack'
    elif mode == 'lecture16:9':
        data['src1']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135')
        data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
        data['vmix'] = 'hstack'
    elif mode == 'sidebyside':
        data['src1']['vfilter'].append('scale=960:540')
        data['src2']['vfilter'].append('scale=960:540')
        data['vmix'] = 'hstack,pad=1920:1080:0:270'
    if settings.get('audio_normalize'):
        data['afilter'].append('loudnorm')

    data['afilter'] = build_filter(data['afilter'])
    for key in ('src1', 'src2'):
        data[key]['afilter'] = build_filter(data[key]['afilter'])
        data[key]['vfilter'] = build_filter(data[key]['vfilter'])
    data['destbase'] = 'rtmp://%s/hls/%i' % (server, lecture['id'])

    if lecture['stream_job']:
        flash('Stream läuft bereits!')
        return None
    job_id = schedule_job('complex_live_transcode', data, priority=10)
    # Only claim the lecture if no job was stored in the meantime, then
    # re-read to find out whether this job actually won.
    modify('UPDATE lectures_data SET stream_job = ? WHERE id = ? AND stream_job IS NULL', job_id, lecture_id)
    if query('SELECT stream_job FROM lectures WHERE id = ?', lecture_id)[0]['stream_job'] != job_id:
        flash('Stream läuft bereits!')
        cancel_job(job_id)
        return None
    return job_id


@job_handler('complex_live_transcode', state='failed')
def restart_failed_complex_live_transcode(id, type, data, state, status):
    """Restart a ``complex_live_transcode`` job when it enters state ``failed``."""
    restart_job(id)


@job_handler('complex_live_transcode', state='failed')
@job_handler('complex_live_transcode', state='finished')
def cleanup_after_complex_live_transcode_ended(id, type, data, state, status):
    """Clear ``stream_job`` on lectures once their transcode job has ended.

    The reference is removed when the job finished, or when it failed and is
    marked as canceled in the ``jobs`` table.
    """
    job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
    if state == 'finished' or (state == 'failed' and job['canceled']):
        modify('UPDATE lectures_data SET stream_job = NULL WHERE stream_job = ?', id)


@app.route('/internal/streaming/control', methods=['POST'])
@mod_required
def control_stream():
    """Start or stop the livestream of a lecture and return to its course page.

    Reads ``action`` (``start`` or ``stop``) and ``lecture_id`` from the
    request. Responds with 404 if no course is found for the lecture.
    """
    action = request.values['action']
    lecture_id = int(request.values['lecture_id'])
    course = (query('SELECT courses.* FROM courses JOIN lectures ON (courses.id = lectures.course_id) WHERE lectures.id = ?', lecture_id) or [None])[0]
    if course is None:
        # Without a course there is nothing to act on and nowhere to redirect.
        return 'Not found', 404
    if action == 'start':
        schedule_livestream(lecture_id)
    elif action == 'stop':
        lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
        # Nothing to cancel if no stream job is running.
        if lecture['stream_job']:
            cancel_job(lecture['stream_job'])
    return redirect(url_for('course', handle=course['handle']))