livestreams.py 5.8 KB
Newer Older
1
from server import *
Julian Rother's avatar
Julian Rother committed
2
3
4
5
6
import requests
from xml.etree import ElementTree
import random
import string

7
@sched_func(120)
8
def livestream_thumbnail():
Andreas Valder's avatar
Andreas Valder committed
9
	livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
10
	for v in genlive(livestreams):
Julian Rother's avatar
Julian Rother committed
11
		schedule_job('thumbnail', {'lectureid': str(v['lecture_id']), 'path': v['path']})
12

13
@app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
14
@app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
Julian Rother's avatar
Julian Rother committed
15
def streamauth_legacy(server=None):
16
	internal = False
17
18
19
20
	if 'X-Real-IP' in request.headers:
		for net in config.get('FSMPI_IP_RANGES', []):
			if ip_address(request.headers['X-Real-IP']) in ip_network(net):
				internal = True
21
22
23
24
25
	if request.values['app'] != 'live':
		return 'Bad request', 400
	if not internal:
		return 'Forbidden', 403
	if request.values['call'] == 'publish':
26
27
		if request.values['pass'] != 'caisoh8aht0wuSu':
			return 'Forbidden', 403
28
29
30
31
32
33
34
35
36
37
38
39
40
41
		matches = query("SELECT lectures.* FROM lectures JOIN courses ON lectures.course_id = courses.id WHERE courses.handle = ? ORDER BY lectures.time DESC", request.values['name'])
		now = datetime.now()
		match = {'id': -1}
		for lecture in matches:
			if lecture['time']-timedelta(minutes=30) <= now and \
				now <= lecture['time']+timedelta(minutes=lecture['duration']):
				match = lecture
				break
		if 'lecture' in request.values:
			match = {'id': request.values['lecture']}
		try:
			modify("INSERT INTO streams (handle, active, visible, lecture_id, description, poster) VALUES (?, 0, 1, -1, "", "")", request.values['name'])
		except:
			pass
42
43
44
45
46
47
48
49
50
		if server:
			data = {'src': 'rtmp://%s/live/%s'%(server, request.values['name']),
				'destbase': 'rtmp://%s/hls/%s'%(server, request.values['name'])}
			job_id = schedule_job('simple_live_transcode', data, priority=10)
			modify("UPDATE streams SET active = 1, lecture_id = ?, job_id = ? WHERE handle = ?",
					match['id'], job_id, request.values['name'])
		else:
			modify("UPDATE streams SET active = 1, lecture_id = ? WHERE handle = ?",
					match['id'], request.values['name'])
51
	elif request.values['call'] == 'publish_done':
52
		job_id = query('SELECT job_id FROM streams WHERE handle = ?', request.values['name'])[0]['job_id']
53
		modify("UPDATE streams SET active = 0 WHERE handle = ?", request.values['name'])
54
55
		if job_id:
			cancel_job(job_id)
56
57
58
59
	else:
		return 'Bad request', 400
	return 'OK', 200

60
61
62
63
@job_handler('simple_live_transcode', state='failed')
def restart_failed_live_transcode(id, type, data, state, status):
	restart_job(id)

Julian Rother's avatar
Julian Rother committed
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
@app.route('/internal/streaming')
#@register_navbar('Streaming', icon='transfer')
@mod_required
def streaming():
	sources = query('SELECT * FROM live_sources WHERE NOT deleted')
	for source in sources:
		if not source['clientid']:
			continue
		r = requests.get('http://%s:8080/stats'%source['server'])
		if r.status_code != 200:
			continue
		source['stat'] = {}
		tree = ElementTree.fromstring(r.text)
		if not tree:
			continue
		s = tree.find("./server/application/[name='src']/live/stream/[name='%i']"%source['id'])
		for e in s.find("client/[publishing='']").getchildren():
			source['stat'][e.tag] = e.text
		source['video'] = {}
		for e in s.find('meta/video').getchildren():
			source['video'][e.tag] = e.text
		source['audio'] = {}
		for e in s.find('meta/audio').getchildren():
			source['audio'][e.tag] = e.text
	return render_template("streaming.html", sources=sources)

def gentoken():
	return ''.join(random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(16))

@app.route('/internal/streaming/rekey/<int:id>')
@mod_required
def streamrekey(id):
	modify('UPDATE live_sources SET key = ? WHERE id = ? AND NOT deleted', gentoken(), id)
	source = query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id)[0]
	flash('Der Streamkey von <strong>'+source['name']+'</strong> wurde neu generiert: <span><input readonly type="text" style="width: 15em" value="'+source['key']+'"></span>')
	return redirect(url_for('streaming'))

@app.route('/internal/streaming/drop/<int:id>')
@mod_required
def streamdrop(id):
	source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id) or [None])[0]
	if not source:
		if 'ref' in request.values:
			flash('Streamquelle nicht gefunden')
			return redirect(request.values['ref'])
		else:
			return 'Not found', 404
	requests.get('http://%s:8080/control/drop/publisher?clientid=%i'%(source['server'], source['clientid']))
	if 'ref' in request.values:
		return redirect(request.values['ref'])
	return 'Ok', 200

@app.route('/internal/streaming/auth/<server>', methods=['GET', 'POST'])
def streamauth(server):
	internal = False
	for net in config.get('INTERNAL_IP_RANGES', []):
		if ip_address(request.headers['X-Real-IP']) in ip_network(net):
			internal = True
	if not internal:
		return 'Forbidden', 403
	if request.values['call'] == 'publish':
		sources = query('SELECT * FROM live_sources WHERE NOT deleted AND key = ?', request.values['name'])
		if not sources:
			return 'Not found', 404
		modify('UPDATE live_sources SET server = ?, clientid = ?, last_active = ?, preview_key = ? WHERE id = ?', server, request.values['clientid'], datetime.now(), gentoken(), sources[0]['id'])
		ret = Response('Redirect', 301, {'Location': '%i'%sources[0]['id']})
		ret.autocorrect_location_header = False
		return ret
	if request.values['call'] == 'play':
		source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', request.values['name']) or [None])[0]
		if not source:
			return 'Not found', 404
		if source['preview_key'] != request.values.get('preview_key'):
			return 'Forbidden', 403
		return 'Ok', 200
	elif request.values['call'] == 'publish_done':
		modify('UPDATE live_sources SET server = NULL, clientid = NULL, preview_key = NULL WHERE server = ? AND clientid = ?', server, request.values['clientid'])
		return 'Ok', 200
	return 'Bad request', 400