Select Git revision
common_functions.sh
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
livestreams.py 13.06 KiB
from xml.etree import ElementTree
import random
import string
from ipaddress import ip_address, ip_network
import json
import requests
from server import *
@sched_func(120)
def livestream_thumbnail():
livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
lectures = query('SELECT * FROM lectures WHERE stream_job IS NOT NULL')
for stream in genlive(livestreams)+genlive_new(lectures):
schedule_job('thumbnail', {'src': stream['path'], 'filename': 'l_%i.jpg'%stream['lecture_id']})
@app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
@app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
def streamauth_legacy(server=None):
# pylint: disable=too-many-branches,bare-except,chained-comparison
internal = False
if 'X-Real-IP' in request.headers:
for net in config.get('FSMPI_IP_RANGES', []):
if ip_address(request.headers['X-Real-IP']) in ip_network(net):
internal = True
if request.values['app'] != 'live':
return 'Bad request', 400
if not internal:
return 'Forbidden', 403
if request.values['call'] == 'publish':
if request.values['pass'] != 'caisoh8aht0wuSu':
return 'Forbidden', 403
matches = query('''SELECT lectures.*
FROM lectures
JOIN courses ON lectures.course_id = courses.id
WHERE courses.handle = ?
ORDER BY lectures.time DESC''', request.values['name'])
now = datetime.now()
match = {'id': -1}
for lecture in matches:
if lecture['time']-timedelta(minutes=30) <= now and \
now <= lecture['time']+timedelta(minutes=lecture['duration']):
match = lecture
break
if 'lecture' in request.values:
match = {'id': request.values['lecture']}
try:
modify("INSERT INTO streams (handle, active, visible, lecture_id, description, poster) VALUES (?, 0, 1, -1, "", "")", request.values['name'])
except:
pass
if server:
data = {'src': 'rtmp://%s/live/%s'%(server, request.values['name']),
'destbase': 'rtmp://%s/hls/%s'%(server, request.values['name'])}
job_id = schedule_job('simple_live_transcode', data, priority=10)
modify("UPDATE streams SET active = 1, lecture_id = ?, job_id = ? WHERE handle = ?",
match['id'], job_id, request.values['name'])
else:
modify("UPDATE streams SET active = 1, lecture_id = ? WHERE handle = ?",
match['id'], request.values['name'])
elif request.values['call'] == 'publish_done':
job_id = query('SELECT job_id FROM streams WHERE handle = ?', request.values['name'])[0]['job_id']
modify("UPDATE streams SET active = 0 WHERE handle = ?", request.values['name'])
if job_id:
cancel_job(job_id)
else:
return 'Bad request', 400
return 'OK', 200
@job_handler('simple_live_transcode', state='failed')
def restart_failed_live_transcode(id, type, data, state, status): # pylint: disable=unused-argument
restart_job(id)
@app.route('/internal/streaming')
@register_navbar('Streaming', icon='broadcast-tower', iconlib='fa')
@mod_required
def streaming():
# pylint: disable=invalid-name
sources = query('SELECT * FROM live_sources WHERE NOT deleted')
for source in sources:
if not source['clientid']:
continue
req = requests.get('http://%s:8080/stats'%source['server'])
if req.status_code != 200:
continue
source['stat'] = {}
tree = ElementTree.fromstring(req.text)
if not tree:
continue
s = tree.find("./server/application/[name='src']/live/stream/[name='%i']"%source['id'])
if not s:
continue
for e in s.find("client/[publishing='']").getchildren():
source['stat'][e.tag] = e.text
source['video'] = {}
for e in s.find('meta/video').getchildren():
source['video'][e.tag] = e.text
source['audio'] = {}
for e in s.find('meta/audio').getchildren():
source['audio'][e.tag] = e.text
return render_template("streaming.html", sources=sources)
def gentoken():
return ''.join(random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(16))
@app.route('/internal/streaming/rekey/<int:id>')
@mod_required
def streamrekey(id):
modify('UPDATE live_sources SET `key` = ? WHERE id = ? AND NOT deleted', gentoken(), id)
source = query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id)[0]
flash('''Der Streamkey von <strong>{name}</strong> wurde neu generiert:
<span><input readonly type="text" style="width: 15em" value="{key}"></span><br>
Trage diesen Streamkey zusammen mit einem der folgenden Streamingserver in die Streamingsoftware ein:
<ul>
<li>{server}</li>
<li>{backup_server}</li>
</ul>Insgesamt sollte die Streaming-URL z.B. so aussehen:
<a href="{server}{key}">{server}{key}</a>'''.format(name=source['name'],
key=source['key'], server=config['STREAMING_SERVER'],
backup_server=config['BACKUP_STREAMING_SERVER']))
return redirect(url_for('streaming'))
@app.route('/internal/streaming/drop/<int:id>')
@mod_required
def streamdrop(id):
source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id) or [None])[0]
if not source:
if 'ref' in request.values:
flash('Streamquelle nicht gefunden')
return redirect(request.values['ref'])
else:
return 'Not found', 404
requests.get('http://%s:8080/control/drop/publisher?clientid=%i'%(source['server'], source['clientid']))
if 'ref' in request.values:
return redirect(request.values['ref'])
return 'Ok', 200
@sched_func(120)
def live_source_thumbnail():
sources = query('SELECT * FROM live_sources WHERE clientid IS NOT NULL')
for source in sources:
schedule_job('thumbnail', {'srcurl': 'rtmp://%s/src/%i'%(source['server'], source['id']), 'filename': 's_%i.jpg'%source['id']})
def ip_in_networks(ip, networks):
for net in networks:
if ip_address(ip) in ip_network(net):
return True
return False
@app.route('/internal/streaming/auth/<server>', methods=['GET', 'POST'])
def streamauth(server):
# pylint: disable=too-many-return-statements
if not ip_in_networks(request.headers['X-Real-IP'], config.get('FSMPI_IP_RANGES', [])):
return 'Forbidden', 403
# Sources publish their streams at rtmp://example.com/src/{key} and are
# the redirected to rtmp://example.com/src/{id} to hide the secret stream key
if request.values['call'] == 'publish':
sources = query('SELECT * FROM live_sources WHERE NOT deleted AND `key` = ?', request.values['name'])
if not sources:
return 'Not found', 404
modify('UPDATE live_sources SET server = ?, server_public = ?, clientid = ?, last_active = ?, preview_key = ? WHERE id = ?',
server, request.args.get('public_ip', server), request.values['clientid'],
datetime.now(), gentoken(), sources[0]['id'])
live_source_thumbnail()
ret = Response('Redirect', 301, {'Location': '%i'%sources[0]['id']})
ret.autocorrect_location_header = False
return ret
elif request.values['call'] == 'publish_done':
source = (query('SELECT * FROM live_sources WHERE server = ? AND clientid = ?', server, request.values['clientid']) or [None])[0]
modify('UPDATE live_sources SET server = NULL, clientid = NULL, preview_key = NULL, last_active = ? WHERE server = ? AND clientid = ?',
datetime.now(), server, request.values['clientid'])
if not source:
return 'Ok', 200
for lecture in query('SELECT * FROM lectures WHERE stream_job IS NOT NULL'):
settings = json.loads(lecture['stream_settings'])
if str(source['id']) in [str(settings.get('source1')), str(settings.get('source2'))]:
cancel_job(lecture['stream_job'])
return 'Ok', 200
elif request.values['call'] == 'play':
source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', request.values['name']) or [None])[0]
if not source:
return 'Not found', 404
if ip_in_networks(request.values['addr'], config.get('INTERNAL_IP_RANGES', [])):
return 'Ok', 200
if source['preview_key'] == request.values.get('preview_key'):
return 'Ok', 200
return 'Forbidden', 403
return 'Bad request', 400
def schedule_livestream(lecture_id):
# pylint: disable=too-many-branches,too-many-statements
lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
settings = json.loads(lecture['stream_settings'])
# Server that receives transcoded streams and generates HLS data, later
# (hopefully) overwritten with one of the source's ingestion servers to
# reduce the number of servers the stream' stability relies on
dest_server = 'rwth.video'
# Used by complex_live_transcode.c (ffworker) to open the sources and
# construct a ffmpeg filter graph <https://ffmpeg.org/ffmpeg-filters.html>:
#
# Audio graph
# src1 -> {src1.afilter} \
# amix -> {data.afilter} -> output
# src2 -> {src2.afilter} /
# Video graph
# src1 -> {src1.vfilter} \
# {vmix} -> scale=1920:1080 -> opt. logo overlay -> output
# src2 -> {src2.vfilter} /
data = {
'src1':
{
#'url': 'rtmp://...',
'afilter': [],
'vfilter': [],
},
'src2': {
#'url': 'rtmp://...',
'afilter': [],
'vfilter': [],
},
'afilter': [],
#'vmix': 'streamselect=map=0',
'videoag_logo': int(bool(settings.get('video_showlogo'))),
'lecture_id': lecture['id'],
#'destbase': 'rtmp://...'
}
# afilter/vfilter are lists here to simplify the code below and must be
# converted to a single filter expression afterwards.
src1 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source1')) or [{}])[0]
src2 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source2')) or [{}])[0]
for idx, src in zip([1, 2], [src1, src2]):
if src:
dest_server = src['server']
data['src%i'%idx]['url'] = 'rtmp://%s/src/%i'%(src['server'], src['id'])
if not src['clientid']:
flash('Quelle „%s“ ist nicht aktiv!'%src['name'])
return None
if settings.get('source%i_deinterlace'%idx):
data['src%i'%idx]['vfilter'].append('yadif')
mode = settings.get('source%i_audiomode'%idx)
leftvol = float(settings.get('source%i_leftvolume'%idx, 100))/100.0
rightvol = float(settings.get('source%i_rightvolume'%idx, 100))/100.0
if mode == 'mono':
data['src%i'%idx]['afilter'].append('pan=mono|c0=%f*c0+%f*c1'%(0.5*leftvol, 0.5*rightvol))
elif mode == 'stereo':
data['src%i'%idx]['afilter'].append('pan=stereo|c0=%f*c0|c1=%f*c1'%(leftvol, rightvol))
elif mode == 'unchanged':
pass
elif mode == 'off':
data['src%i'%idx]['afilter'].append('pan=mono|c0=0*c0')
else:
raise Exception()
data['destbase'] = 'rtmp://%s/hls/%i'%(dest_server, lecture['id'])
mode = settings.get('videomode')
if mode == '1':
data['vmix'] = 'streamselect=map=0'
elif mode == '2':
data['vmix'] = 'streamselect=map=1'
elif mode == 'lecture4:3':
data['src1']['vfilter'].append('scale=1440:1080')
data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
data['vmix'] = 'hstack'
elif mode == 'lecture16:9':
data['src1']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135')
data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
data['vmix'] = 'hstack'
elif mode == 'sidebyside':
data['src1']['vfilter'].append('scale=960:540')
data['src2']['vfilter'].append('scale=960:540')
data['vmix'] = 'hstack,pad=1920:1080:0:270'
if settings.get('audio_normalize'):
data['afilter'].append('loudnorm')
# Filter setup done, now lists of ffmpeg filter expressions must be
# converted to single expressions
def build_filter(exprs):
return ','.join(exprs) if exprs else None
data['afilter'] = build_filter(data['afilter'])
data['src1']['afilter'] = build_filter(data['src1']['afilter'])
data['src1']['vfilter'] = build_filter(data['src1']['vfilter'])
data['src2']['afilter'] = build_filter(data['src2']['afilter'])
data['src2']['vfilter'] = build_filter(data['src2']['vfilter'])
if lecture['stream_job']:
flash('Stream läuft bereits!')
return None
job_id = schedule_job('complex_live_transcode', data, priority=10)
modify('UPDATE lectures_data SET stream_job = ? WHERE id = ? AND stream_job IS NULL', job_id, lecture_id)
if query('SELECT stream_job FROM lectures WHERE id = ?', lecture_id)[0]['stream_job'] != job_id:
flash('Stream läuft bereits!')
cancel_job(job_id)
return None
return job_id
@job_handler('complex_live_transcode', state='failed')
def restart_failed_complex_live_transcode(id, type, data, state, status): # pylint: disable=unused-argument
restart_job(id)
@job_handler('complex_live_transcode', state='failed')
@job_handler('complex_live_transcode', state='finished')
def cleanup_after_complex_live_transcode_ended(id, type, data, state, status): # pylint: disable=unused-argument
job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
if state == 'finished' or (state == 'failed' and job['canceled']):
modify('UPDATE lectures_data SET stream_job = NULL WHERE stream_job = ?', id)
@app.route('/internal/streaming/control', methods=['POST'])
@mod_required
def control_stream():
action = request.values['action']
lecture_id = int(request.values['lecture_id'])
course = (query('SELECT courses.* FROM courses JOIN lectures ON (courses.id = lectures.course_id) WHERE lectures.id = ?', lecture_id) or [None])[0]
if action == 'start':
schedule_livestream(lecture_id)
elif action == 'stop':
lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
cancel_job(lecture['stream_job'])
return redirect(url_for('course', handle=course['handle']))