Skip to content
Snippets Groups Projects
Select Git revision
  • ab24a7240d3738ee252a030edcb0661132945576
  • master default protected
  • forbid-save-as
  • upload-via-token
  • moodle-integration
  • patch-double-tap-seek
  • patch_datum_anzeigen
  • patch_raum_anzeigen
  • intros
  • live_sources
  • bootstrap4
  • modules
12 results

livestreams.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    livestreams.py 10.98 KiB
    from server import *
    import requests
    from xml.etree import ElementTree
    import random
    import string
    
    @sched_func(120)
    def livestream_thumbnail():
    	livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
    	lectures = query('SELECT * FROM lectures WHERE stream_job IS NOT NULL')
    	for v in genlive(livestreams)+genlive_new(lectures):
    		schedule_job('thumbnail', {'src': v['path'], 'filename': 'l_%i.jpg'%v['lecture_id']})
    
    @app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
    @app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
    def streamauth_legacy(server=None):
    	internal = False
    	if 'X-Real-IP' in request.headers:
    		for net in config.get('FSMPI_IP_RANGES', []):
    			if ip_address(request.headers['X-Real-IP']) in ip_network(net):
    				internal = True
    	if request.values['app'] != 'live':
    		return 'Bad request', 400
    	if not internal:
    		return 'Forbidden', 403
    	if request.values['call'] == 'publish':
    		if request.values['pass'] != 'caisoh8aht0wuSu':
    			return 'Forbidden', 403
    		matches = query("SELECT lectures.* FROM lectures JOIN courses ON lectures.course_id = courses.id WHERE courses.handle = ? ORDER BY lectures.time DESC", request.values['name'])
    		now = datetime.now()
    		match = {'id': -1}
    		for lecture in matches:
    			if lecture['time']-timedelta(minutes=30) <= now and \
    				now <= lecture['time']+timedelta(minutes=lecture['duration']):
    				match = lecture
    				break
    		if 'lecture' in request.values:
    			match = {'id': request.values['lecture']}
    		try:
    			modify("INSERT INTO streams (handle, active, visible, lecture_id, description, poster) VALUES (?, 0, 1, -1, "", "")", request.values['name'])
    		except:
    			pass
    		if server:
    			data = {'src': 'rtmp://%s/live/%s'%(server, request.values['name']),
    				'destbase': 'rtmp://%s/hls/%s'%(server, request.values['name'])}
    			job_id = schedule_job('simple_live_transcode', data, priority=10)
    			modify("UPDATE streams SET active = 1, lecture_id = ?, job_id = ? WHERE handle = ?",
    					match['id'], job_id, request.values['name'])
    		else:
    			modify("UPDATE streams SET active = 1, lecture_id = ? WHERE handle = ?",
    					match['id'], request.values['name'])
    	elif request.values['call'] == 'publish_done':
    		job_id = query('SELECT job_id FROM streams WHERE handle = ?', request.values['name'])[0]['job_id']
    		modify("UPDATE streams SET active = 0 WHERE handle = ?", request.values['name'])
    		if job_id:
    			cancel_job(job_id)
    	else:
    		return 'Bad request', 400
    	return 'OK', 200
    
    @job_handler('simple_live_transcode', state='failed')
    def restart_failed_live_transcode(id, type, data, state, status):
    	restart_job(id)
    
    @app.route('/internal/streaming')
    @register_navbar('Streaming', icon='broadcast-tower', iconlib='fa')
    @mod_required
    def streaming():
    	sources = query('SELECT * FROM live_sources WHERE NOT deleted')
    	for source in sources:
    		if not source['clientid']:
    			continue
    		r = requests.get('http://%s:8080/stats'%source['server'])
    		if r.status_code != 200:
    			continue
    		source['stat'] = {}
    		tree = ElementTree.fromstring(r.text)
    		if not tree:
    			continue
    		s = tree.find("./server/application/[name='src']/live/stream/[name='%i']"%source['id'])
    		if not s:
    			continue
    		for e in s.find("client/[publishing='']").getchildren():
    			source['stat'][e.tag] = e.text
    		source['video'] = {}
    		for e in s.find('meta/video').getchildren():
    			source['video'][e.tag] = e.text
    		source['audio'] = {}
    		for e in s.find('meta/audio').getchildren():
    			source['audio'][e.tag] = e.text
    	return render_template("streaming.html", sources=sources)
    
    def gentoken():
    	return ''.join(random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(16))
    
    @app.route('/internal/streaming/rekey/<int:id>')
    @mod_required
    def streamrekey(id):
    	modify('UPDATE live_sources SET `key` = ? WHERE id = ? AND NOT deleted', gentoken(), id)
    	source = query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id)[0]
    	flash('Der Streamkey von <strong>'+source['name']+'</strong> wurde neu generiert: <span><input readonly type="text" style="width: 15em" value="'+source['key']+'"></span>')
    	return redirect(url_for('streaming'))
    
    @app.route('/internal/streaming/drop/<int:id>')
    @mod_required
    def streamdrop(id):
    	source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id) or [None])[0]
    	if not source:
    		if 'ref' in request.values:
    			flash('Streamquelle nicht gefunden')
    			return redirect(request.values['ref'])
    		else:
    			return 'Not found', 404
    	requests.get('http://%s:8080/control/drop/publisher?clientid=%i'%(source['server'], source['clientid']))
    	if 'ref' in request.values:
    		return redirect(request.values['ref'])
    	return 'Ok', 200
    
    @sched_func(120)
    def live_source_thumbnail():
    	sources = query('SELECT * FROM live_sources WHERE clientid IS NOT NULL')
    	for source in sources:
    		schedule_job('thumbnail', {'srcurl': 'rtmp://%s/src/%i'%(source['server'], source['id']), 'filename': 's_%i.jpg'%source['id']})
    
    @app.route('/internal/streaming/auth/<server>', methods=['GET', 'POST'])
    def streamauth(server):
    	internal = False
    	for net in config.get('FSMPI_IP_RANGES', []):
    		if ip_address(request.headers['X-Real-IP']) in ip_network(net):
    			internal = True
    	if not internal:
    		return 'Forbidden', 403
    	if request.values['call'] == 'publish':
    		sources = query('SELECT * FROM live_sources WHERE NOT deleted AND `key` = ?', request.values['name'])
    		if not sources:
    			return 'Not found', 404
    		modify('UPDATE live_sources SET server = ?, server_public = ?, clientid = ?, last_active = ?, preview_key = ? WHERE id = ?', server, request.args.get('public_ip', server), request.values['clientid'], datetime.now(), gentoken(), sources[0]['id'])
    		live_source_thumbnail()
    		ret = Response('Redirect', 301, {'Location': '%i'%sources[0]['id']})
    		ret.autocorrect_location_header = False
    		return ret
    	if request.values['call'] == 'play':
    		source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', request.values['name']) or [None])[0]
    		if not source:
    			return 'Not found', 404
    		for net in config.get('INTERNAL_IP_RANGES', []):
    			if ip_address(request.values['addr']) in ip_network(net):
    				return 'Ok', 200
    		if source['preview_key'] == request.values.get('preview_key'):
    			return 'Ok', 200
    		return 'Forbidden', 403
    	elif request.values['call'] == 'publish_done':
    		source = (query('SELECT * FROM live_sources WHERE server = ? AND clientid = ?', server, request.values['clientid']) or [None])[0]
    		modify('UPDATE live_sources SET server = NULL, clientid = NULL, preview_key = NULL, last_active = ? WHERE server = ? AND clientid = ?', datetime.now(), server, request.values['clientid'])
    		if not source:
    			return 'Ok', 200
    		for lecture in query('SELECT * FROM lectures WHERE stream_job IS NOT NULL'):
    			settings = json.loads(lecture['stream_settings'])
    			if str(source['id']) in [str(settings.get('source1')), str(settings.get('source2'))]:
    				cancel_job(lecture['stream_job'])
    		return 'Ok', 200
    	return 'Bad request', 400
    
    def schedule_livestream(lecture_id):
    	def build_filter(l):
    		return ','.join(l) if l else None
    	server = 'rwth.video'
    	lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
    	settings = json.loads(lecture['stream_settings'])
    	data = {'src1': {'afilter': [], 'vfilter': []}, 'src2': {'afilter': [], 'vfilter': []}, 'afilter': [], 'videoag_logo': int(bool(settings.get('video_showlogo'))), 'lecture_id': lecture['id']}
    	src1 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source1')) or [{}])[0]
    	src2 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source2')) or [{}])[0]
    	for idx, obj in zip([1,2], [src1, src2]):
    		if obj:
    			server = obj['server']
    			data['src%i'%idx]['url'] = 'rtmp://%s/src/%i'%(obj['server'], obj['id'])
    			if not obj['clientid']:
    				flash('Quelle „%s“ ist nicht aktiv!'%obj['name'])
    				return None
    		mode = settings.get('source%i_audiomode'%idx)
    		leftvol = float(settings.get('source%i_leftvolume'%idx, 100))/100.0
    		rightvol = float(settings.get('source%i_rightvolume'%idx, 100))/100.0
    		if mode == 'mono':
    			data['src%i'%idx]['afilter'].append('pan=mono|c0=%f*c0+%f*c1'%(0.5*leftvol, 0.5*rightvol))
    		elif mode == 'stereo':
    			data['src%i'%idx]['afilter'].append('pan=stereo|c0=%f*c0|c1=%f*c1'%(leftvol, rightvol))
    		elif mode == 'unchanged':
    			pass
    		elif mode == 'off':
    			data['src%i'%idx]['afilter'].append('pan=mono|c0=0*c0')
    		else:
    			raise(Exception())
    	mode = settings.get('videomode')
    	if mode == '1':
    		data['vmix'] = 'streamselect=map=0'
    	elif mode == '2':
    		data['vmix'] = 'streamselect=map=1'
    	elif mode == 'lecture4:3':
    		data['src1']['vfilter'].append('scale=1440:1080')
    		data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
    		data['vmix'] = 'hstack'
    	elif mode == 'lecture16:9':
    		data['src1']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135')
    		data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
    		data['vmix'] = 'hstack'
    	elif mode == 'sidebyside':
    		data['src1']['vfilter'].append('scale=960:540')
    		data['src2']['vfilter'].append('scale=960:540')
    		data['vmix'] = 'hstack,pad=1920:1080:0:270'
    	if settings.get('audio_normalize'):
    		data['afilter'].append('loudnorm')
    	data['afilter'] = build_filter(data['afilter'])
    	data['src1']['afilter'] = build_filter(data['src1']['afilter'])
    	data['src1']['vfilter'] = build_filter(data['src1']['vfilter'])
    	data['src2']['afilter'] = build_filter(data['src2']['afilter'])
    	data['src2']['vfilter'] = build_filter(data['src2']['vfilter'])
    	data['destbase'] = 'rtmp://%s/hls/%i'%(server, lecture['id'])
    	if lecture['stream_job']:
    		flash('Stream läuft bereits!')
    		return None
    	job_id = schedule_job('complex_live_transcode', data, priority=10)
    	modify('UPDATE lectures_data SET stream_job = ? WHERE id = ? AND stream_job IS NULL', job_id, lecture_id)
    	if query('SELECT stream_job FROM lectures WHERE id = ?', lecture_id)[0]['stream_job'] != job_id:
    		flash('Stream läuft bereits!')
    		cancel_job(job_id)
    		return None
    	return job_id
    
    @job_handler('complex_live_transcode', state='failed')
    def restart_failed_complex_live_transcode(id, type, data, state, status):
    	restart_job(id)
    
    @job_handler('complex_live_transcode', state='failed')
    @job_handler('complex_live_transcode', state='finished')
    def cleanup_after_complex_live_transcode_ended(id, type, data, state, status):
    	job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
    	if state == 'finished' or (state == 'failed' and job['canceled']):
    		modify('UPDATE lectures_data SET stream_job = NULL WHERE stream_job = ?', id)
    
    @app.route('/internal/streaming/control', methods=['POST'])
    @mod_required
    def control_stream():
    	action = request.values['action']
    	lecture_id = int(request.values['lecture_id'])
    	course = (query('SELECT courses.* FROM courses JOIN lectures ON (courses.id = lectures.course_id) WHERE lectures.id = ?', lecture_id) or [None])[0]
    	if action == 'start':
    		schedule_livestream(lecture_id)
    	elif action == 'stop':
    		lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
    		cancel_job(lecture['stream_job'])
    	return redirect(url_for('course', handle=course['handle']))