Select Git revision
.dockerignore
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
livestreams.py 5.80 KiB
from server import *
import requests
from xml.etree import ElementTree
import random
import string
@sched_func(120)
def livestream_thumbnail():
    """Queue a 'thumbnail' job for every stream that is currently active.

    Registered through ``sched_func(120)``; presumably this makes it run
    every 120 seconds -- confirm against the scheduler implementation.
    """
    active_streams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
    # genlive() is defined elsewhere; the rows it yields are expected to
    # carry a 'path' entry in addition to 'lecture_id'.
    for stream in genlive(active_streams):
        job_data = {
            'lectureid': str(stream['lecture_id']),
            'path': stream['path'],
        }
        schedule_job('thumbnail', job_data)
@app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
@app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
def streamauth_legacy(server=None):
    """Handle 'publish' / 'publish_done' callbacks for the legacy 'live' app.

    The request must carry ``app=live`` and originate (according to the
    ``X-Real-IP`` header) from one of the networks in the
    ``FSMPI_IP_RANGES`` config entry.

    On ``call=publish`` the stream named by ``name`` is marked active and
    linked to a lecture:

    * the first lecture of the course whose handle equals ``name`` for which
      "now" lies between 30 minutes before its start and ``duration`` minutes
      after its start, or ``-1`` if there is none;
    * an explicit ``lecture`` request parameter overrides that choice.

    If ``server`` is given, a 'simple_live_transcode' job from
    ``rtmp://<server>/live/<name>`` to ``rtmp://<server>/hls/<name>`` is
    scheduled and its id stored with the stream.

    On ``call=publish_done`` the stream is marked inactive and its stored
    transcode job (if any) is cancelled.

    Returns:
        A ``(body, status)`` tuple: 200 on success, 400 for a wrong app or
        unknown call, 403 for a non-internal client or wrong password.
    """
    internal = False
    if 'X-Real-IP' in request.headers:
        internal = any(
            ip_address(request.headers['X-Real-IP']) in ip_network(net)
            for net in config.get('FSMPI_IP_RANGES', [])
        )
    if request.values['app'] != 'live':
        return 'Bad request', 400
    if not internal:
        return 'Forbidden', 403

    call = request.values['call']
    if call == 'publish':
        # NOTE(review): hard-coded shared secret; consider moving it into
        # the configuration instead of the source tree.
        if request.values['pass'] != 'caisoh8aht0wuSu':
            return 'Forbidden', 403
        handle = request.values['name']
        matches = query("SELECT lectures.* FROM lectures JOIN courses ON lectures.course_id = courses.id WHERE courses.handle = ? ORDER BY lectures.time DESC", handle)
        now = datetime.now()
        match = {'id': -1}
        for lecture in matches:
            window_start = lecture['time'] - timedelta(minutes=30)
            window_end = lecture['time'] + timedelta(minutes=lecture['duration'])
            if window_start <= now <= window_end:
                match = lecture
                break
        if 'lecture' in request.values:
            match = {'id': request.values['lecture']}

        # Make sure a row for this handle exists before updating it.
        # The previous statement embedded "" inside a double-quoted Python
        # literal, which concatenated to "VALUES (?, 0, 1, -1, , )" -- invalid
        # SQL whose error was silently swallowed, so no row was ever created.
        if not query('SELECT handle FROM streams WHERE handle = ?', handle):
            try:
                modify("INSERT INTO streams (handle, active, visible, lecture_id, description, poster) VALUES (?, 0, 1, -1, '', '')", handle)
            except Exception:
                # Best effort (e.g. a concurrent publish created the row
                # first); log instead of hiding the failure completely.
                app.logger.exception('Could not create stream row for handle %r', handle)

        if server:
            data = {'src': 'rtmp://%s/live/%s'%(server, handle),
                    'destbase': 'rtmp://%s/hls/%s'%(server, handle)}
            job_id = schedule_job('simple_live_transcode', data, priority=10)
            modify("UPDATE streams SET active = 1, lecture_id = ?, job_id = ? WHERE handle = ?",
                   match['id'], job_id, handle)
        else:
            modify("UPDATE streams SET active = 1, lecture_id = ? WHERE handle = ?",
                   match['id'], handle)
    elif call == 'publish_done':
        handle = request.values['name']
        rows = query('SELECT job_id FROM streams WHERE handle = ?', handle)
        # An unknown handle simply has no job to cancel.
        job_id = rows[0]['job_id'] if rows else None
        modify("UPDATE streams SET active = 0 WHERE handle = ?", handle)
        if job_id:
            cancel_job(job_id)
    else:
        return 'Bad request', 400
    return 'OK', 200
@job_handler('simple_live_transcode', state='failed')
def restart_failed_live_transcode(id, type, data, state, status):
    """Restart a 'simple_live_transcode' job once it enters state 'failed'.

    Only ``id`` is used; the remaining parameters are part of the handler
    signature and are ignored here.
    """
    failed_job_id = id
    restart_job(failed_job_id)
def _stream_children_to_dict(node):
    """Map each direct child's tag to its text; a missing node yields {}."""
    if node is None:
        return {}
    return {child.tag: child.text for child in node}


@app.route('/internal/streaming')
#@register_navbar('Streaming', icon='transfer')
@mod_required
def streaming():
    """Render the overview of live sources, enriched with server statistics.

    For every non-deleted live source that has a ``clientid``, the XML
    document at ``http://<server>:8080/stats`` is fetched and the entry of
    the stream named after the source id (application ``src``) is looked up.
    The children of its publishing client, ``meta/video`` and ``meta/audio``
    elements are stored as tag -> text dicts under ``source['stat']``,
    ``source['video']`` and ``source['audio']``.

    A source whose server is unreachable, answers with a non-200 status or
    returns unparsable/empty XML is shown without statistics instead of
    failing the whole page.
    """
    sources = query('SELECT * FROM live_sources WHERE NOT deleted')
    for source in sources:
        if not source['clientid']:
            continue
        try:
            r = requests.get('http://%s:8080/stats'%source['server'], timeout=5)
        except requests.RequestException:
            # One unreachable streaming server must not break the page.
            continue
        if r.status_code != 200:
            continue
        source['stat'] = {}
        try:
            tree = ElementTree.fromstring(r.text)
        except ElementTree.ParseError:
            continue
        # Explicit length check instead of the deprecated Element truthiness.
        if len(tree) == 0:
            continue
        s = tree.find("./server/application/[name='src']/live/stream/[name='%i']"%source['id'])
        if s is None:
            # The server does not (or no longer) list this stream.
            continue
        # Element.getchildren() was removed in Python 3.9; iterate directly.
        source['stat'] = _stream_children_to_dict(s.find("client/[publishing='']"))
        source['video'] = _stream_children_to_dict(s.find('meta/video'))
        source['audio'] = _stream_children_to_dict(s.find('meta/audio'))
    return render_template("streaming.html", sources=sources)
def gentoken():
    """Return a random 16-character token of ASCII letters and digits.

    Characters are drawn with ``random.SystemRandom``, i.e. from the
    operating system's randomness source.
    """
    alphabet = string.ascii_letters + string.digits
    rng = random.SystemRandom()
    chars = [rng.choice(alphabet) for _ in range(16)]
    return ''.join(chars)
@app.route('/internal/streaming/rekey/<int:id>')
@mod_required
def streamrekey(id):
    """Generate a new stream key for a live source and show it once.

    The key of the non-deleted live source ``id`` is replaced by a fresh
    token from ``gentoken()``; the new key is then flashed to the user and
    the browser is redirected to the streaming overview.

    Returns:
        A redirect to the ``streaming`` view, or ``('Not found', 404)`` if
        no non-deleted source with this id exists.
    """
    from html import escape

    modify('UPDATE live_sources SET `key` = ? WHERE id = ? AND NOT deleted', gentoken(), id)
    rows = query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id)
    if not rows:
        # Unknown or deleted source: the UPDATE above matched nothing.
        return 'Not found', 404
    source = rows[0]
    # The message is emitted as HTML, so database values must be escaped
    # before they are interpolated into it.
    flash('Der Streamkey von <strong>'+escape(source['name'])+'</strong> wurde neu generiert: <span><input readonly type="text" style="width: 15em" value="'+escape(source['key'])+'"></span>')
    return redirect(url_for('streaming'))
@app.route('/internal/streaming/drop/<int:id>')
@mod_required
def streamdrop(id):
    """Ask the streaming server to drop the publisher of a live source.

    Looks up the non-deleted live source ``id`` and, if it currently has a
    server and client id recorded, requests
    ``/control/drop/publisher?clientid=<clientid>`` on port 8080 of that
    server.

    Returns:
        A redirect to the ``ref`` request parameter when it is present
        (with a flash message if the source was not found); otherwise
        ``('Ok', 200)`` or ``('Not found', 404)``.
    """
    # NOTE(review): 'ref' is taken from the request and used as redirect
    # target without validation -- consider restricting it to local URLs.
    source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id) or [None])[0]
    if not source:
        if 'ref' in request.values:
            flash('Streamquelle nicht gefunden')
            return redirect(request.values['ref'])
        return 'Not found', 404
    # A source that is not publishing has no client id (NULL in the table);
    # formatting None with %i would raise TypeError, and there is nothing
    # to drop in that case anyway.
    if source['server'] is not None and source['clientid'] is not None:
        requests.get('http://%s:8080/control/drop/publisher?clientid=%i'%(source['server'], source['clientid']))
    if 'ref' in request.values:
        return redirect(request.values['ref'])
    return 'Ok', 200
@app.route('/internal/streaming/auth/<server>', methods=['GET', 'POST'])
def streamauth(server):
    """Handle 'publish', 'play' and 'publish_done' callbacks for live sources.

    Only requests whose ``X-Real-IP`` header lies in one of the networks of
    the ``FSMPI_IP_RANGES`` config entry are accepted.

    * ``publish``: ``name`` must be the key of a non-deleted live source.
      The source is updated with the server, client id, current time and a
      fresh preview key, and a 301 response whose ``Location`` is the source
      id is returned.
    * ``play``: ``name`` is the source id; playback is allowed only if the
      request's ``preview_key`` equals the source's current preview key.
    * ``publish_done``: clears server, client id and preview key of the
      source matching this server and client id.

    Returns:
        A response or ``(body, status)`` tuple: 403 for non-internal clients
        or a wrong preview key, 404 for an unknown source, 400 for any other
        ``call`` value.
    """
    # Like streamauth_legacy: a missing header means "not internal" instead
    # of failing on the header lookup.
    client_ip = request.headers.get('X-Real-IP')
    internal = client_ip is not None and any(
        ip_address(client_ip) in ip_network(net)
        for net in config.get('FSMPI_IP_RANGES', [])
    )
    if not internal:
        return 'Forbidden', 403

    call = request.values['call']
    if call == 'publish':
        sources = query('SELECT * FROM live_sources WHERE NOT deleted AND `key` = ?', request.values['name'])
        if not sources:
            return 'Not found', 404
        modify('UPDATE live_sources SET server = ?, clientid = ?, last_active = ?, preview_key = ? WHERE id = ?', server, request.values['clientid'], datetime.now(), gentoken(), sources[0]['id'])
        ret = Response('Redirect', 301, {'Location': '%i'%sources[0]['id']})
        # Keep the bare id as Location instead of letting it be rewritten
        # into an absolute URL.
        ret.autocorrect_location_header = False
        return ret
    if call == 'play':
        source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', request.values['name']) or [None])[0]
        if not source:
            return 'Not found', 404
        # A source without a preview key (not publishing) must never match;
        # previously a request that omitted preview_key compared None to
        # None and was let through.
        if not source['preview_key'] or source['preview_key'] != request.values.get('preview_key'):
            return 'Forbidden', 403
        return 'Ok', 200
    if call == 'publish_done':
        modify('UPDATE live_sources SET server = NULL, clientid = NULL, preview_key = NULL WHERE server = ? AND clientid = ?', server, request.values['clientid'])
        return 'Ok', 200
    return 'Bad request', 400