Select Git revision
chapters.py
Forked from
Video AG Infrastruktur / website
Source project has a limited visibility.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
livestreams.py 13.06 KiB
from xml.etree import ElementTree
import random
import string
from ipaddress import ip_address, ip_network
import json
import requests
from server import *
@sched_func(120)
def livestream_thumbnail():
    """Schedule a thumbnail job for every running livestream.

    Considers the rows of ``streams`` that are flagged active as well as all
    lectures that currently have a ``stream_job``.
    """
    active_streams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
    streaming_lectures = query('SELECT * FROM lectures WHERE stream_job IS NOT NULL')
    running = genlive(active_streams) + genlive_new(streaming_lectures)
    for entry in running:
        job_data = {
            'src': entry['path'],
            'filename': 'l_%i.jpg'%entry['lecture_id'],
        }
        schedule_job('thumbnail', job_data)
@app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
@app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
def streamauth_legacy(server=None):
    """Handle publish/publish_done callbacks for the legacy ``live`` application.

    The request must carry ``app``, ``call`` and ``name`` values and has to
    originate (according to the ``X-Real-IP`` header) from one of the
    networks in ``FSMPI_IP_RANGES``.

    * ``call=publish``: checks the shared password, makes sure a ``streams``
      row exists for the handle, associates it with the lecture of the course
      ``name`` that is running right now (or with the explicit ``lecture``
      value) and marks it active. If ``server`` is given, a
      ``simple_live_transcode`` job is scheduled as well.
    * ``call=publish_done``: marks the stream inactive and cancels its job.

    Returns a ``(body, status)`` tuple.
    """
    # pylint: disable=too-many-branches,broad-except
    internal = False
    if 'X-Real-IP' in request.headers:
        for net in config.get('FSMPI_IP_RANGES', []):
            if ip_address(request.headers['X-Real-IP']) in ip_network(net):
                internal = True
                break
    if request.values['app'] != 'live':
        return 'Bad request', 400
    if not internal:
        return 'Forbidden', 403
    if request.values['call'] == 'publish':
        # NOTE(review): the shared secret is hard-coded in the source; it
        # should be moved to the configuration and rotated.
        if request.values['pass'] != 'caisoh8aht0wuSu':
            return 'Forbidden', 403
        matches = query('''SELECT lectures.*
FROM lectures
JOIN courses ON lectures.course_id = courses.id
WHERE courses.handle = ?
ORDER BY lectures.time DESC''', request.values['name'])
        now = datetime.now()
        # -1 means "no matching lecture"
        match = {'id': -1}
        for lecture in matches:
            # A lecture matches from 30 minutes before its start until its end
            if lecture['time']-timedelta(minutes=30) <= now <= lecture['time']+timedelta(minutes=lecture['duration']):
                match = lecture
                break
        # An explicitly passed lecture id overrides the time based match
        if 'lecture' in request.values:
            match = {'id': request.values['lecture']}
        try:
            # The empty strings must be written with single quotes: with
            # double quotes inside a double-quoted Python literal the adjacent
            # literals were concatenated to "... VALUES (?, 0, 1, -1, , )",
            # which is invalid SQL, so the row was never created.
            modify("INSERT INTO streams (handle, active, visible, lecture_id, description, poster) VALUES (?, 0, 1, -1, '', '')", request.values['name'])
        except Exception:
            # Best effort: presumably the insert fails when a row for this
            # handle already exists, which is fine (TODO confirm the unique
            # constraint on streams.handle).
            pass
        if server:
            data = {'src': 'rtmp://%s/live/%s'%(server, request.values['name']),
                    'destbase': 'rtmp://%s/hls/%s'%(server, request.values['name'])}
            job_id = schedule_job('simple_live_transcode', data, priority=10)
            modify("UPDATE streams SET active = 1, lecture_id = ?, job_id = ? WHERE handle = ?",
                   match['id'], job_id, request.values['name'])
        else:
            modify("UPDATE streams SET active = 1, lecture_id = ? WHERE handle = ?",
                   match['id'], request.values['name'])
    elif request.values['call'] == 'publish_done':
        # The stream row may not exist; do not fail the callback in that case
        rows = query('SELECT job_id FROM streams WHERE handle = ?', request.values['name'])
        job_id = rows[0]['job_id'] if rows else None
        modify("UPDATE streams SET active = 0 WHERE handle = ?", request.values['name'])
        if job_id:
            cancel_job(job_id)
    else:
        return 'Bad request', 400
    return 'OK', 200
@job_handler('simple_live_transcode', state='failed')
def restart_failed_live_transcode(id, type, data, state, status): # pylint: disable=unused-argument
    """Job handler: restart a ``simple_live_transcode`` job that failed."""
    failed_job_id = id
    restart_job(failed_job_id)
def _element_children_to_dict(element):
    """Map the tag of every direct child of ``element`` to its text.

    Returns an empty dict if ``element`` is ``None``.
    """
    if element is None:
        return {}
    return {child.tag: child.text for child in element}


@app.route('/internal/streaming')
@register_navbar('Streaming', icon='broadcast-tower', iconlib='fa')
@mod_required
def streaming():
    """Render the streaming overview with live statistics for each source.

    For every non-deleted live source that has a ``clientid`` the statistics
    XML of its server is fetched and, if the source's stream is found in it,
    the dicts ``stat`` (publishing client), ``video`` and ``audio`` (stream
    metadata) are attached to the source before rendering. Sources whose
    server cannot be reached or returns unusable data are shown without
    statistics.
    """
    sources = query('SELECT * FROM live_sources WHERE NOT deleted')
    for source in sources:
        if not source['clientid']:
            continue
        try:
            # Never wait indefinitely for a stats endpoint; an unreachable
            # server must not break the whole overview page.
            req = requests.get('http://%s:8080/stats'%source['server'], timeout=5)
        except requests.RequestException:
            continue
        if req.status_code != 200:
            continue
        source['stat'] = {}
        try:
            tree = ElementTree.fromstring(req.text)
        except ElementTree.ParseError:
            continue
        # Explicit length checks instead of the deprecated truth test of
        # Element objects (an element without children is falsy).
        if len(tree) == 0:
            continue
        stream = tree.find("./server/application/[name='src']/live/stream/[name='%i']"%source['id'])
        if stream is None or len(stream) == 0:
            continue
        # Element.getchildren() was removed in Python 3.9; iterate the
        # elements directly and tolerate missing sub-elements.
        source['stat'] = _element_children_to_dict(stream.find("client/[publishing='']"))
        source['video'] = _element_children_to_dict(stream.find('meta/video'))
        source['audio'] = _element_children_to_dict(stream.find('meta/audio'))
    return render_template("streaming.html", sources=sources)
def gentoken():
    """Return a random 16 character token of ASCII letters and digits.

    The characters are drawn using ``random.SystemRandom``.
    """
    alphabet = string.ascii_letters + string.digits
    rng = random.SystemRandom()
    picked = [rng.choice(alphabet) for _ in range(16)]
    return ''.join(picked)
@app.route('/internal/streaming/rekey/<int:id>')
@mod_required
def streamrekey(id):
    """Generate a new stream key for a live source and show it to the user.

    The new key is stored for the non-deleted source ``id`` and then flashed
    together with the configured streaming servers. If no such source
    exists, a "not found" message is flashed instead. In both cases the user
    is redirected to the streaming overview.
    """
    # Function-scope import: the flashed message is HTML markup, so the
    # source name (free text from the database) must be escaped.
    from html import escape as html_escape
    modify('UPDATE live_sources SET `key` = ? WHERE id = ? AND NOT deleted', gentoken(), id)
    rows = query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id)
    if not rows:
        # Unknown or deleted source: nothing was rekeyed
        flash('Streamquelle nicht gefunden')
        return redirect(url_for('streaming'))
    source = rows[0]
    flash('''Der Streamkey von <strong>{name}</strong> wurde neu generiert:
<span><input readonly type="text" style="width: 15em" value="{key}"></span><br>
Trage diesen Streamkey zusammen mit einem der folgenden Streamingserver in die Streamingsoftware ein:
<ul>
<li>{server}</li>
<li>{backup_server}</li>
</ul>Insgesamt sollte die Streaming-URL z.B. so aussehen:
<a href="{server}{key}">{server}{key}</a>'''.format(name=html_escape(str(source['name'])),
                                                   key=source['key'], server=config['STREAMING_SERVER'],
                                                   backup_server=config['BACKUP_STREAMING_SERVER']))
    return redirect(url_for('streaming'))
@app.route('/internal/streaming/drop/<int:id>')
@mod_required
def streamdrop(id):
    """Ask the streaming server to drop the publisher of a live source.

    Looks up the non-deleted source ``id`` and, if it currently has a
    ``clientid``, calls the drop-publisher control URL on the source's
    server. If the request carries a ``ref`` value the user is redirected
    there, otherwise a plain ``(body, status)`` tuple is returned (404 if the
    source does not exist).
    """
    # NOTE(review): `ref` is taken from the request and used as redirect
    # target without validation (open redirect) - consider restricting it to
    # local URLs.
    source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id) or [None])[0]
    if not source:
        if 'ref' in request.values:
            flash('Streamquelle nicht gefunden')
            return redirect(request.values['ref'])
        return 'Not found', 404
    # A source that is not publishing has no clientid; formatting None with
    # %i would raise a TypeError, and there is nothing to drop anyway.
    if source['clientid'] is not None:
        # Bounded wait so a hanging streaming server cannot block the worker
        requests.get('http://%s:8080/control/drop/publisher?clientid=%i'%(source['server'], source['clientid']),
                     timeout=5)
    if 'ref' in request.values:
        return redirect(request.values['ref'])
    return 'Ok', 200
@sched_func(120)
def live_source_thumbnail():
    """Schedule a thumbnail job for every live source that has a client id."""
    for src in query('SELECT * FROM live_sources WHERE clientid IS NOT NULL'):
        job_data = {
            'srcurl': 'rtmp://%s/src/%i'%(src['server'], src['id']),
            'filename': 's_%i.jpg'%src['id'],
        }
        schedule_job('thumbnail', job_data)
def ip_in_networks(ip, networks):
    """Return True if the address ``ip`` lies in at least one of ``networks``.

    ``ip`` is parsed with ``ip_address`` and every entry of ``networks`` with
    ``ip_network``; an empty ``networks`` yields False.
    """
    return any(ip_address(ip) in ip_network(candidate) for candidate in networks)
@app.route('/internal/streaming/auth/<server>', methods=['GET', 'POST'])
def streamauth(server):
    """Handle publish/publish_done/play callbacks for live sources.

    Only requests whose ``X-Real-IP`` header lies in ``FSMPI_IP_RANGES`` are
    accepted. Depending on the ``call`` value:

    * ``publish``: looks up the source by its secret key (``name``), records
      server, client id and a fresh preview key and redirects to the
      source's id.
    * ``publish_done``: clears the source's server/client data and cancels
      the stream jobs of all lectures that use the source.
    * ``play``: allows playback for clients in ``INTERNAL_IP_RANGES`` or
      with the correct ``preview_key``.

    Returns a ``(body, status)`` tuple or a redirect ``Response``.
    """
    # pylint: disable=too-many-return-statements
    # A request without the header cannot be attributed to an allowed network
    real_ip = request.headers.get('X-Real-IP')
    if not real_ip or not ip_in_networks(real_ip, config.get('FSMPI_IP_RANGES', [])):
        return 'Forbidden', 403
    # Sources publish their streams at rtmp://example.com/src/{key} and are
    # then redirected to rtmp://example.com/src/{id} to hide the secret stream key
    if request.values['call'] == 'publish':
        sources = query('SELECT * FROM live_sources WHERE NOT deleted AND `key` = ?', request.values['name'])
        if not sources:
            return 'Not found', 404
        modify('UPDATE live_sources SET server = ?, server_public = ?, clientid = ?, last_active = ?, preview_key = ? WHERE id = ?',
               server, request.args.get('public_ip', server), request.values['clientid'],
               datetime.now(), gentoken(), sources[0]['id'])
        live_source_thumbnail()
        ret = Response('Redirect', 301, {'Location': '%i'%sources[0]['id']})
        # Keep the relative Location header exactly as set above
        ret.autocorrect_location_header = False
        return ret
    elif request.values['call'] == 'publish_done':
        # Fetch the source before its server/clientid are cleared below
        source = (query('SELECT * FROM live_sources WHERE server = ? AND clientid = ?', server, request.values['clientid']) or [None])[0]
        modify('UPDATE live_sources SET server = NULL, clientid = NULL, preview_key = NULL, last_active = ? WHERE server = ? AND clientid = ?',
               datetime.now(), server, request.values['clientid'])
        if not source:
            return 'Ok', 200
        for lecture in query('SELECT * FROM lectures WHERE stream_job IS NOT NULL'):
            settings = json.loads(lecture['stream_settings'])
            if str(source['id']) in [str(settings.get('source1')), str(settings.get('source2'))]:
                cancel_job(lecture['stream_job'])
        return 'Ok', 200
    elif request.values['call'] == 'play':
        source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', request.values['name']) or [None])[0]
        if not source:
            return 'Not found', 404
        if ip_in_networks(request.values['addr'], config.get('INTERNAL_IP_RANGES', [])):
            return 'Ok', 200
        # preview_key is NULL while the source is not publishing; without the
        # truthiness check a request that omits preview_key would compare
        # None == None and be authorized.
        if source['preview_key'] and source['preview_key'] == request.values.get('preview_key'):
            return 'Ok', 200
        return 'Forbidden', 403
    return 'Bad request', 400
def schedule_livestream(lecture_id):
    """Build and schedule the ``complex_live_transcode`` job of a lecture.

    Reads the lecture's JSON ``stream_settings`` and translates them into the
    job data: source URLs, per-source and global ffmpeg filter expressions,
    the video mix expression and the destination URL.

    Returns the id of the scheduled job. Returns ``None`` (after flashing a
    message) if a configured source has no ``clientid`` or if the lecture
    already has a ``stream_job``.

    Raises ``Exception`` if the audio mode setting of a source is not one of
    ``mono``, ``stereo``, ``unchanged`` or ``off``.
    """
    # pylint: disable=too-many-branches,too-many-statements
    lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
    settings = json.loads(lecture['stream_settings'])
    # Server that receives transcoded streams and generates HLS data, later
    # (hopefully) overwritten with one of the source's ingestion servers to
    # reduce the number of servers the stream's stability relies on
    dest_server = 'rwth.video'
    # Used by complex_live_transcode.c (ffworker) to open the sources and
    # construct a ffmpeg filter graph <https://ffmpeg.org/ffmpeg-filters.html>:
    #
    # Audio graph
    #   src1 -> {src1.afilter} \
    #                           amix -> {data.afilter} -> output
    #   src2 -> {src2.afilter} /
    # Video graph
    #   src1 -> {src1.vfilter} \
    #                           {vmix} -> scale=1920:1080 -> opt. logo overlay -> output
    #   src2 -> {src2.vfilter} /
    data = {
        'src1':
        {
            #'url': 'rtmp://...',
            'afilter': [],
            'vfilter': [],
        },
        'src2': {
            #'url': 'rtmp://...',
            'afilter': [],
            'vfilter': [],
        },
        'afilter': [],
        #'vmix': 'streamselect=map=0',
        'videoag_logo': int(bool(settings.get('video_showlogo'))),
        'lecture_id': lecture['id'],
        #'destbase': 'rtmp://...'
    }
    # afilter/vfilter are lists here to simplify the code below and must be
    # converted to a single filter expression afterwards.
    # A source that is not configured or not found is represented by {}
    src1 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source1')) or [{}])[0]
    src2 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source2')) or [{}])[0]
    for idx, src in zip([1, 2], [src1, src2]):
        if src:
            # The server of the last found source becomes the destination
            dest_server = src['server']
            data['src%i'%idx]['url'] = 'rtmp://%s/src/%i'%(src['server'], src['id'])
            if not src['clientid']:
                flash('Quelle „%s“ ist nicht aktiv!'%src['name'])
                return None
        # NOTE(review): the per-source settings below are evaluated for both
        # indices, also when no source row was found - confirm this nesting.
        if settings.get('source%i_deinterlace'%idx):
            data['src%i'%idx]['vfilter'].append('yadif')
        mode = settings.get('source%i_audiomode'%idx)
        # Volumes are stored as percentages (default 100) and scaled to factors
        leftvol = float(settings.get('source%i_leftvolume'%idx, 100))/100.0
        rightvol = float(settings.get('source%i_rightvolume'%idx, 100))/100.0
        if mode == 'mono':
            # Downmix both channels, each weighted with half of its volume
            data['src%i'%idx]['afilter'].append('pan=mono|c0=%f*c0+%f*c1'%(0.5*leftvol, 0.5*rightvol))
        elif mode == 'stereo':
            data['src%i'%idx]['afilter'].append('pan=stereo|c0=%f*c0|c1=%f*c1'%(leftvol, rightvol))
        elif mode == 'unchanged':
            pass
        elif mode == 'off':
            # Silence the source by multiplying its first channel with zero
            data['src%i'%idx]['afilter'].append('pan=mono|c0=0*c0')
        else:
            # Unknown or missing audio mode
            raise Exception()
    data['destbase'] = 'rtmp://%s/hls/%i'%(dest_server, lecture['id'])
    # Select how the two video sources are combined; for any other videomode
    # value no 'vmix' entry is set
    mode = settings.get('videomode')
    if mode == '1':
        data['vmix'] = 'streamselect=map=0'
    elif mode == '2':
        data['vmix'] = 'streamselect=map=1'
    elif mode == 'lecture4:3':
        data['src1']['vfilter'].append('scale=1440:1080')
        data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
        data['vmix'] = 'hstack'
    elif mode == 'lecture16:9':
        data['src1']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135')
        data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
        data['vmix'] = 'hstack'
    elif mode == 'sidebyside':
        data['src1']['vfilter'].append('scale=960:540')
        data['src2']['vfilter'].append('scale=960:540')
        data['vmix'] = 'hstack,pad=1920:1080:0:270'
    if settings.get('audio_normalize'):
        data['afilter'].append('loudnorm')
    # Filter setup done, now lists of ffmpeg filter expressions must be
    # converted to single expressions
    def build_filter(exprs):
        # Join the expressions with ','; an empty list becomes None
        return ','.join(exprs) if exprs else None
    data['afilter'] = build_filter(data['afilter'])
    data['src1']['afilter'] = build_filter(data['src1']['afilter'])
    data['src1']['vfilter'] = build_filter(data['src1']['vfilter'])
    data['src2']['afilter'] = build_filter(data['src2']['afilter'])
    data['src2']['vfilter'] = build_filter(data['src2']['vfilter'])
    if lecture['stream_job']:
        flash('Stream läuft bereits!')
        return None
    job_id = schedule_job('complex_live_transcode', data, priority=10)
    # The UPDATE only applies while stream_job is still NULL. Reading the
    # value back shows whether this job was stored; if a different job id is
    # found, the job scheduled above is cancelled again.
    modify('UPDATE lectures_data SET stream_job = ? WHERE id = ? AND stream_job IS NULL', job_id, lecture_id)
    if query('SELECT stream_job FROM lectures WHERE id = ?', lecture_id)[0]['stream_job'] != job_id:
        flash('Stream läuft bereits!')
        cancel_job(job_id)
        return None
    return job_id
@job_handler('complex_live_transcode', state='failed')
def restart_failed_complex_live_transcode(id, type, data, state, status): # pylint: disable=unused-argument
    """Job handler: restart a ``complex_live_transcode`` job that failed."""
    failed_job_id = id
    restart_job(failed_job_id)
@job_handler('complex_live_transcode', state='failed')
@job_handler('complex_live_transcode', state='finished')
def cleanup_after_complex_live_transcode_ended(id, type, data, state, status): # pylint: disable=unused-argument
    """Job handler: detach an ended ``complex_live_transcode`` job from its lecture.

    The ``stream_job`` reference is cleared when the job finished, or when it
    failed and its ``canceled`` flag is set.
    """
    job_row = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
    if state == 'finished':
        ended_for_good = True
    elif state == 'failed':
        ended_for_good = bool(job_row['canceled'])
    else:
        ended_for_good = False
    if not ended_for_good:
        return
    modify('UPDATE lectures_data SET stream_job = NULL WHERE stream_job = ?', id)
@app.route('/internal/streaming/control', methods=['POST'])
@mod_required
def control_stream():
    """Start or stop the livestream of a lecture.

    Expects the request values ``action`` (``start`` or ``stop``) and
    ``lecture_id``. Returns 400 if ``lecture_id`` is not an integer and 404
    if no course is found for the lecture; otherwise performs the action and
    redirects to the page of the lecture's course. Any other ``action`` value
    only redirects.
    """
    action = request.values['action']
    try:
        lecture_id = int(request.values['lecture_id'])
    except ValueError:
        return 'Bad request', 400
    course = (query('SELECT courses.* FROM courses JOIN lectures ON (courses.id = lectures.course_id) WHERE lectures.id = ?', lecture_id) or [None])[0]
    if not course:
        # Without a course there is neither a lecture to act on nor a page
        # to redirect to
        return 'Not found', 404
    if action == 'start':
        schedule_livestream(lecture_id)
    elif action == 'stop':
        lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
        # Only cancel if a stream job is actually registered for the lecture
        if lecture['stream_job']:
            cancel_job(lecture['stream_job'])
    return redirect(url_for('course', handle=course['handle']))