Skip to content
Snippets Groups Projects
Select Git revision
  • da9e40f1697b0934ae4be4b60374b1c678615c7c
  • master default protected
  • forbid-save-as
  • upload-via-token
  • moodle-integration
  • patch-double-tap-seek
  • patch_datum_anzeigen
  • patch_raum_anzeigen
  • intros
  • live_sources
  • bootstrap4
  • modules
12 results

chapters.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    livestreams.py 13.06 KiB
    from xml.etree import ElementTree
    import random
    import string
    from ipaddress import ip_address, ip_network
    import json
    import requests
    
    from server import *
    
    @sched_func(120)
    def livestream_thumbnail():
    	"""Schedule one ``thumbnail`` job per running livestream.

    	Collects the active rows of ``streams`` and all lectures that have a
    	``stream_job`` set, passes them through ``genlive``/``genlive_new`` and
    	schedules a thumbnail job named ``l_<lecture_id>.jpg`` for every result.
    	Registered via ``sched_func(120)`` (presumably a 120 second interval --
    	confirm against the scheduler).
    	"""
    	active_streams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
    	streaming_lectures = query('SELECT * FROM lectures WHERE stream_job IS NOT NULL')
    	running = genlive(active_streams)+genlive_new(streaming_lectures)
    	for entry in running:
    		job_data = {
    			'src': entry['path'],
    			'filename': 'l_%i.jpg'%entry['lecture_id'],
    		}
    		schedule_job('thumbnail', job_data)
    
    @app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
    @app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
    def streamauth_legacy(server=None):
    	"""Handle publish/publish_done callbacks for legacy (handle based) streams.

    	The request must carry ``app=live`` and come from an address inside
    	``FSMPI_IP_RANGES`` (taken from the ``X-Real-IP`` header). Presumably this
    	is called by the streaming server's publish hooks -- TODO confirm.

    	``call=publish``: checks the shared password, picks the lecture of the
    	course ``name`` that is running right now (or the explicit ``lecture``
    	value), makes sure a ``streams`` row exists and marks it active. If
    	``server`` is given, a ``simple_live_transcode`` job is scheduled as well.

    	``call=publish_done``: marks the stream inactive and cancels its job.

    	Returns a ``(body, status)`` tuple: 200 on success, 400 for unknown
    	``app``/``call`` values, 403 for external clients or a wrong password.
    	"""
    	# pylint: disable=too-many-branches,broad-except
    	internal = False
    	if 'X-Real-IP' in request.headers:
    		# Same check as the non-legacy auth endpoint, via the shared helper
    		internal = ip_in_networks(request.headers['X-Real-IP'], config.get('FSMPI_IP_RANGES', []))
    	if request.values['app'] != 'live':
    		return 'Bad request', 400
    	if not internal:
    		return 'Forbidden', 403
    	if request.values['call'] == 'publish':
    		# NOTE(review): hard-coded shared secret in source; consider moving
    		# it to the configuration. Left unchanged to keep clients working.
    		if request.values['pass'] != 'caisoh8aht0wuSu':
    			return 'Forbidden', 403
    		matches = query('''SELECT lectures.*
    			FROM lectures
    			JOIN courses ON lectures.course_id = courses.id
    			WHERE courses.handle = ?
    			ORDER BY lectures.time DESC''', request.values['name'])
    		now = datetime.now()
    		match = {'id': -1}
    		for lecture in matches:
    			# Lecture counts as running from 30 minutes before its start
    			# until its scheduled end
    			if lecture['time']-timedelta(minutes=30) <= now <= lecture['time']+timedelta(minutes=lecture['duration']):
    				match = lecture
    				break
    		if 'lecture' in request.values:
    			match = {'id': request.values['lecture']}
    		try:
    			# The SQL empty strings must be single-quoted: with "" inside a
    			# double-quoted Python literal the pieces were concatenated to
    			# "VALUES (?, 0, 1, -1, , )", which always failed silently.
    			modify("INSERT INTO streams (handle, active, visible, lecture_id, description, poster) VALUES (?, 0, 1, -1, '', '')", request.values['name'])
    		except Exception:
    			# Best effort: presumably fails when a row for this handle
    			# already exists (unique handle) -- the UPDATE below handles it.
    			pass
    		if server:
    			data = {'src': 'rtmp://%s/live/%s'%(server, request.values['name']),
    				'destbase': 'rtmp://%s/hls/%s'%(server, request.values['name'])}
    			job_id = schedule_job('simple_live_transcode', data, priority=10)
    			modify("UPDATE streams SET active = 1, lecture_id = ?, job_id = ? WHERE handle = ?",
    					match['id'], job_id, request.values['name'])
    		else:
    			modify("UPDATE streams SET active = 1, lecture_id = ? WHERE handle = ?",
    					match['id'], request.values['name'])
    	elif request.values['call'] == 'publish_done':
    		# An unknown handle must not crash the callback with an IndexError
    		rows = query('SELECT job_id FROM streams WHERE handle = ?', request.values['name'])
    		job_id = rows[0]['job_id'] if rows else None
    		modify("UPDATE streams SET active = 0 WHERE handle = ?", request.values['name'])
    		if job_id:
    			cancel_job(job_id)
    	else:
    		return 'Bad request', 400
    	return 'OK', 200
    
    @job_handler('simple_live_transcode', state='failed')
    def restart_failed_live_transcode(id, type, data, state, status): # pylint: disable=unused-argument
    	"""Restart the ``simple_live_transcode`` job ``id``.

    	Registered through ``job_handler`` for the ``failed`` state; only ``id``
    	is used, the remaining parameters are part of the handler signature.
    	"""
    	restart_job(id)
    
    @app.route('/internal/streaming')
    @register_navbar('Streaming', icon='broadcast-tower', iconlib='fa')
    @mod_required
    def streaming():
    	"""Render the streaming overview with live statistics per source.

    	For every non-deleted live source that has a ``clientid``, the XML stats
    	page of its server (``http://<server>:8080/stats``) is fetched and the
    	publishing client's values plus the video/audio metadata are attached to
    	the source as ``stat``, ``video`` and ``audio`` dicts. Sources whose
    	server cannot be reached or returns unusable data are shown without
    	statistics instead of failing the whole page.
    	"""
    	# pylint: disable=invalid-name
    	def children_as_dict(node):
    		# Element.getchildren() was removed in Python 3.9; iterating the
    		# element yields the same children. A missing node gives no entries.
    		if node is None:
    			return {}
    		return {child.tag: child.text for child in node}

    	sources = query('SELECT * FROM live_sources WHERE NOT deleted')
    	for source in sources:
    		if not source['clientid']:
    			continue
    		try:
    			# Timeout so a hanging streaming server cannot block the page
    			req = requests.get('http://%s:8080/stats'%source['server'], timeout=5)
    		except requests.RequestException:
    			continue
    		if req.status_code != 200:
    			continue
    		source['stat'] = {}
    		try:
    			tree = ElementTree.fromstring(req.text)
    		except ElementTree.ParseError:
    			continue
    		# Explicit length test instead of the deprecated Element truth test
    		if len(tree) == 0:
    			continue
    		s = tree.find("./server/application/[name='src']/live/stream/[name='%i']"%source['id'])
    		# find() returns None when nothing matches; a childless stream
    		# element carries no usable data either
    		if s is None or len(s) == 0:
    			continue
    		source['stat'] = children_as_dict(s.find("client/[publishing='']"))
    		source['video'] = children_as_dict(s.find('meta/video'))
    		source['audio'] = children_as_dict(s.find('meta/audio'))
    	return render_template("streaming.html", sources=sources)
    
    def gentoken():
    	"""Return a random 16 character token made of ASCII letters and digits.

    	Characters are drawn with ``random.SystemRandom``.
    	"""
    	alphabet = string.ascii_letters + string.digits
    	chars = []
    	for _ in range(16):
    		chars.append(random.SystemRandom().choice(alphabet))
    	return ''.join(chars)
    
    @app.route('/internal/streaming/rekey/<int:id>')
    @mod_required
    def streamrekey(id):
    	"""Generate a new stream key for live source ``id`` and show it.

    	Stores a fresh token as the source's ``key``, flashes an HTML message
    	containing the new key and the configured streaming servers, and
    	redirects to the streaming overview. If no non-deleted source with this
    	id exists, a "not found" message is flashed instead of failing with an
    	IndexError.
    	"""
    	# Local import keeps this block self-contained
    	from html import escape as html_escape
    	modify('UPDATE live_sources SET `key` = ? WHERE id = ? AND NOT deleted', gentoken(), id)
    	rows = query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id)
    	if not rows:
    		flash('Streamquelle nicht gefunden')
    		return redirect(url_for('streaming'))
    	source = rows[0]
    	# The message is HTML, so every interpolated value is escaped to keep
    	# names containing markup from being injected into the page.
    	message = '''Der Streamkey von <strong>{name}</strong> wurde neu generiert:
    			<span><input readonly type="text" style="width: 15em" value="{key}"></span><br>
    			Trage diesen Streamkey zusammen mit einem der folgenden Streamingserver in die Streamingsoftware ein:
    			<ul>
    				<li>{server}</li>
    				<li>{backup_server}</li>
    			</ul>Insgesamt sollte die Streaming-URL z.B. so aussehen:
    			<a href="{server}{key}">{server}{key}</a>'''.format(
    		name=html_escape(str(source['name'])),
    		key=html_escape(str(source['key'])),
    		server=html_escape(str(config['STREAMING_SERVER'])),
    		backup_server=html_escape(str(config['BACKUP_STREAMING_SERVER'])))
    	flash(message)
    	return redirect(url_for('streaming'))
    
    @app.route('/internal/streaming/drop/<int:id>')
    @mod_required
    def streamdrop(id):
    	"""Ask the streaming server to drop the publisher of live source ``id``.

    	Sends a ``control/drop/publisher`` request for the source's ``clientid``
    	to the source's server. If the request carries a ``ref`` value the
    	client is redirected there, otherwise a plain ``(body, status)`` tuple is
    	returned: 200 on success, 404 if the source does not exist.
    	"""
    	ref = request.values.get('ref')
    	source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id) or [None])[0]
    	if not source:
    		if ref is not None:
    			flash('Streamquelle nicht gefunden')
    			return redirect(ref)
    		return 'Not found', 404
    	# A source that is not publishing has no clientid; formatting None with
    	# %i would raise a TypeError, and there is nothing to drop anyway.
    	if source['clientid'] is not None:
    		# Timeout so an unresponsive streaming server cannot block the request
    		requests.get('http://%s:8080/control/drop/publisher?clientid=%i'%(source['server'], source['clientid']), timeout=10)
    	if ref is not None:
    		return redirect(ref)
    	return 'Ok', 200
    
    @sched_func(120)
    def live_source_thumbnail():
    	"""Schedule one ``thumbnail`` job per live source that has a ``clientid``.

    	The job reads from ``rtmp://<server>/src/<id>`` and writes to
    	``s_<id>.jpg``. Registered via ``sched_func(120)`` (presumably a 120
    	second interval -- confirm against the scheduler).
    	"""
    	for row in query('SELECT * FROM live_sources WHERE clientid IS NOT NULL'):
    		stream_url = 'rtmp://%s/src/%i'%(row['server'], row['id'])
    		thumb_name = 's_%i.jpg'%row['id']
    		schedule_job('thumbnail', {'srcurl': stream_url, 'filename': thumb_name})
    
    def ip_in_networks(ip, networks):
    	"""Return True if address ``ip`` lies inside any of ``networks``.

    	``ip`` is an address string, ``networks`` an iterable of network
    	strings in any notation accepted by ``ipaddress.ip_network``. A
    	malformed ``ip`` is treated as belonging to no network (returns False)
    	so callers doing access checks deny the request instead of crashing.
    	Malformed entries in ``networks`` still raise ``ValueError``, as they
    	indicate a configuration error.
    	"""
    	try:
    		# Parse once instead of once per network
    		addr = ip_address(ip)
    	except ValueError:
    		return False
    	return any(addr in ip_network(net) for net in networks)
    
    @app.route('/internal/streaming/auth/<server>', methods=['GET', 'POST'])
    def streamauth(server):
    	"""Handle publish/publish_done/play callbacks for key based live sources.

    	Only requests whose ``X-Real-IP`` lies in ``FSMPI_IP_RANGES`` are
    	accepted. The ``call`` request value selects the action:

    	``publish``: looks up the source by its secret key (``name``), records
    	server, client id, activity time and a new preview key, and answers
    	with a 301 redirect to the source's numeric id.

    	``publish_done``: clears server/client id/preview key of the source and
    	cancels the stream jobs of all lectures that use it.

    	``play``: allowed for clients in ``INTERNAL_IP_RANGES`` or with the
    	source's current preview key.

    	Returns a ``(body, status)`` tuple or a redirect ``Response``; unknown
    	``call`` values give 400.
    	"""
    	# pylint: disable=too-many-return-statements
    	if not ip_in_networks(request.headers['X-Real-IP'], config.get('FSMPI_IP_RANGES', [])):
    		return 'Forbidden', 403
    	call = request.values['call']
    	# Sources publish their streams at rtmp://example.com/src/{key} and are
    	# then redirected to rtmp://example.com/src/{id} to hide the secret
    	# stream key
    	if call == 'publish':
    		sources = query('SELECT * FROM live_sources WHERE NOT deleted AND `key` = ?', request.values['name'])
    		if not sources:
    			return 'Not found', 404
    		modify('UPDATE live_sources SET server = ?, server_public = ?, clientid = ?, last_active = ?, preview_key = ? WHERE id = ?',
    			server, request.args.get('public_ip', server), request.values['clientid'],
    			datetime.now(), gentoken(), sources[0]['id'])
    		live_source_thumbnail()
    		ret = Response('Redirect', 301, {'Location': '%i'%sources[0]['id']})
    		# Keep the relative Location header as written above
    		ret.autocorrect_location_header = False
    		return ret
    	if call == 'publish_done':
    		# Fetch the source before its server/clientid columns are cleared
    		source = (query('SELECT * FROM live_sources WHERE server = ? AND clientid = ?', server, request.values['clientid']) or [None])[0]
    		modify('UPDATE live_sources SET server = NULL, clientid = NULL, preview_key = NULL, last_active = ? WHERE server = ? AND clientid = ?',
    			datetime.now(), server, request.values['clientid'])
    		if not source:
    			return 'Ok', 200
    		for lecture in query('SELECT * FROM lectures WHERE stream_job IS NOT NULL'):
    			settings = json.loads(lecture['stream_settings'])
    			if str(source['id']) in [str(settings.get('source1')), str(settings.get('source2'))]:
    				cancel_job(lecture['stream_job'])
    		return 'Ok', 200
    	if call == 'play':
    		source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', request.values['name']) or [None])[0]
    		if not source:
    			return 'Not found', 404
    		if ip_in_networks(request.values['addr'], config.get('INTERNAL_IP_RANGES', [])):
    			return 'Ok', 200
    		# A source that is not publishing has preview_key NULL. Without the
    		# truthiness check a request lacking preview_key would compare
    		# None == None and be authorised.
    		preview_key = source['preview_key']
    		if preview_key and preview_key == request.values.get('preview_key'):
    			return 'Ok', 200
    		return 'Forbidden', 403
    	return 'Bad request', 400
    
    def schedule_livestream(lecture_id):
    	"""Build the transcode job for a lecture's livestream and schedule it.

    	Reads the lecture's ``stream_settings`` (JSON), assembles the source
    	URLs and ffmpeg filter expressions and schedules a
    	``complex_live_transcode`` job whose id is stored as the lecture's
    	``stream_job``.

    	Returns the job id, or None (after flashing a message) if a configured
    	source is not active or the lecture already has a stream job.

    	Raises ValueError if a source's audio mode is not one of ``mono``,
    	``stereo``, ``unchanged`` or ``off``.
    	"""
    	# pylint: disable=too-many-branches,too-many-statements
    	lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
    	settings = json.loads(lecture['stream_settings'])
    	# Server that receives transcoded streams and generates HLS data, later
    	# (hopefully) overwritten with one of the source's ingestion servers to
    	# reduce the number of servers the stream's stability relies on
    	dest_server = 'rwth.video'
    	# Used by complex_live_transcode.c (ffworker) to open the sources and
    	# construct a ffmpeg filter graph <https://ffmpeg.org/ffmpeg-filters.html>:
    	#
    	# Audio graph
    	# src1 -> {src1.afilter} \
    	#                         amix -> {data.afilter} -> output
    	# src2 -> {src2.afilter} /
    	# Video graph
    	# src1 -> {src1.vfilter} \
    	#                         {vmix} -> scale=1920:1080 -> opt. logo overlay -> output
    	# src2 -> {src2.vfilter} /
    	data = {
    		'src1':
    		{
    			#'url': 'rtmp://...',
    			'afilter': [],
    			'vfilter': [],
    		},
    		'src2': {
    			#'url': 'rtmp://...',
    			'afilter': [],
    			'vfilter': [],
    		},
    		'afilter': [],
    		#'vmix': 'streamselect=map=0',
    		'videoag_logo': int(bool(settings.get('video_showlogo'))),
    		'lecture_id': lecture['id'],
    		#'destbase': 'rtmp://...'
    	}
    	# afilter/vfilter are lists here to simplify the code below and must be
    	# converted to a single filter expression afterwards.
    	src1 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source1')) or [{}])[0]
    	src2 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source2')) or [{}])[0]
    	for idx, src in enumerate([src1, src2], start=1):
    		srcdata = data['src%i'%idx]
    		if src:
    			dest_server = src['server']
    			srcdata['url'] = 'rtmp://%s/src/%i'%(src['server'], src['id'])
    			if not src['clientid']:
    				flash('Quelle „%s“ ist nicht aktiv!'%src['name'])
    				return None
    		if settings.get('source%i_deinterlace'%idx):
    			srcdata['vfilter'].append('yadif')
    		mode = settings.get('source%i_audiomode'%idx)
    		# Volumes are stored as percentages
    		leftvol = float(settings.get('source%i_leftvolume'%idx, 100))/100.0
    		rightvol = float(settings.get('source%i_rightvolume'%idx, 100))/100.0
    		if mode == 'mono':
    			srcdata['afilter'].append('pan=mono|c0=%f*c0+%f*c1'%(0.5*leftvol, 0.5*rightvol))
    		elif mode == 'stereo':
    			srcdata['afilter'].append('pan=stereo|c0=%f*c0|c1=%f*c1'%(leftvol, rightvol))
    		elif mode == 'unchanged':
    			pass
    		elif mode == 'off':
    			srcdata['afilter'].append('pan=mono|c0=0*c0')
    		else:
    			# Name the offending value instead of raising a bare Exception
    			raise ValueError('Unknown audio mode %r for source %i'%(mode, idx))
    	data['destbase'] = 'rtmp://%s/hls/%i'%(dest_server, lecture['id'])
    	mode = settings.get('videomode')
    	if mode == '1':
    		data['vmix'] = 'streamselect=map=0'
    	elif mode == '2':
    		data['vmix'] = 'streamselect=map=1'
    	elif mode == 'lecture4:3':
    		data['src1']['vfilter'].append('scale=1440:1080')
    		data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
    		data['vmix'] = 'hstack'
    	elif mode == 'lecture16:9':
    		data['src1']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135')
    		data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
    		data['vmix'] = 'hstack'
    	elif mode == 'sidebyside':
    		data['src1']['vfilter'].append('scale=960:540')
    		data['src2']['vfilter'].append('scale=960:540')
    		data['vmix'] = 'hstack,pad=1920:1080:0:270'
    	if settings.get('audio_normalize'):
    		data['afilter'].append('loudnorm')
    	# Filter setup done, now lists of ffmpeg filter expressions must be
    	# converted to single expressions
    	def build_filter(exprs):
    		return ','.join(exprs) if exprs else None
    	data['afilter'] = build_filter(data['afilter'])
    	for srckey in ('src1', 'src2'):
    		for filterkey in ('afilter', 'vfilter'):
    			data[srckey][filterkey] = build_filter(data[srckey][filterkey])
    	if lecture['stream_job']:
    		flash('Stream läuft bereits!')
    		return None
    	job_id = schedule_job('complex_live_transcode', data, priority=10)
    	# Only claim the lecture if no other job was stored in the meantime,
    	# then read the value back to see whether this job won
    	modify('UPDATE lectures_data SET stream_job = ? WHERE id = ? AND stream_job IS NULL', job_id, lecture_id)
    	if query('SELECT stream_job FROM lectures WHERE id = ?', lecture_id)[0]['stream_job'] != job_id:
    		flash('Stream läuft bereits!')
    		cancel_job(job_id)
    		return None
    	return job_id
    
    @job_handler('complex_live_transcode', state='failed')
    def restart_failed_complex_live_transcode(id, type, data, state, status): # pylint: disable=unused-argument
    	"""Restart the ``complex_live_transcode`` job ``id``.

    	Registered through ``job_handler`` for the ``failed`` state; only ``id``
    	is used, the remaining parameters are part of the handler signature.
    	"""
    	restart_job(id)
    
    @job_handler('complex_live_transcode', state='failed')
    @job_handler('complex_live_transcode', state='finished')
    def cleanup_after_complex_live_transcode_ended(id, type, data, state, status): # pylint: disable=unused-argument
    	"""Detach an ended ``complex_live_transcode`` job from its lecture.

    	The lecture's ``stream_job`` is reset to NULL when the job finished, or
    	when it failed and its ``canceled`` flag is set. A failed job that was
    	not canceled keeps its lecture assignment.
    	"""
    	job_row = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
    	if state == 'finished':
    		stream_ended = True
    	else:
    		stream_ended = bool(state == 'failed' and job_row['canceled'])
    	if not stream_ended:
    		return
    	modify('UPDATE lectures_data SET stream_job = NULL WHERE stream_job = ?', id)
    
    @app.route('/internal/streaming/control', methods=['POST'])
    @mod_required
    def control_stream():
    	"""Start or stop the livestream of a lecture.

    	Reads ``action`` (``start`` or ``stop``) and ``lecture_id`` from the
    	request. ``start`` schedules the livestream, ``stop`` cancels the
    	lecture's current stream job; any other action does nothing. Afterwards
    	the client is redirected to the lecture's course page.

    	Returns 400 if ``lecture_id`` is not an integer and 404 if no course
    	can be found for the lecture.
    	"""
    	action = request.values['action']
    	try:
    		lecture_id = int(request.values['lecture_id'])
    	except ValueError:
    		return 'Bad request', 400
    	course = (query('SELECT courses.* FROM courses JOIN lectures ON (courses.id = lectures.course_id) WHERE lectures.id = ?', lecture_id) or [None])[0]
    	if not course:
    		# Without a course there is no page to redirect to, and the lecture
    		# lookups below would fail as well
    		return 'Not found', 404
    	if action == 'start':
    		schedule_livestream(lecture_id)
    	elif action == 'stop':
    		lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
    		# Nothing to cancel when no stream is running
    		if lecture['stream_job'] is not None:
    			cancel_job(lecture['stream_job'])
    	return redirect(url_for('course', handle=course['handle']))