livestreams.py 11.1 KB
Newer Older
1
from server import *
Julian Rother's avatar
Julian Rother committed
2
3
4
5
6
import requests
from xml.etree import ElementTree
import random
import string

7
@sched_func(120)
8
def livestream_thumbnail():
Andreas Valder's avatar
Andreas Valder committed
9
	livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
10
11
	lectures = query('SELECT * FROM lectures WHERE stream_job IS NOT NULL')
	for v in genlive(livestreams)+genlive_new(lectures):
Julian Rother's avatar
Julian Rother committed
12
		schedule_job('thumbnail', {'src': v['path'], 'filename': 'l_%i.jpg'%v['lecture_id']})
13

14
@app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
15
@app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
Julian Rother's avatar
Julian Rother committed
16
def streamauth_legacy(server=None):
17
	internal = False
18
19
20
21
	if 'X-Real-IP' in request.headers:
		for net in config.get('FSMPI_IP_RANGES', []):
			if ip_address(request.headers['X-Real-IP']) in ip_network(net):
				internal = True
22
23
24
25
26
	if request.values['app'] != 'live':
		return 'Bad request', 400
	if not internal:
		return 'Forbidden', 403
	if request.values['call'] == 'publish':
27
28
		if request.values['pass'] != 'caisoh8aht0wuSu':
			return 'Forbidden', 403
29
30
31
32
33
34
35
36
37
38
39
40
41
42
		matches = query("SELECT lectures.* FROM lectures JOIN courses ON lectures.course_id = courses.id WHERE courses.handle = ? ORDER BY lectures.time DESC", request.values['name'])
		now = datetime.now()
		match = {'id': -1}
		for lecture in matches:
			if lecture['time']-timedelta(minutes=30) <= now and \
				now <= lecture['time']+timedelta(minutes=lecture['duration']):
				match = lecture
				break
		if 'lecture' in request.values:
			match = {'id': request.values['lecture']}
		try:
			modify("INSERT INTO streams (handle, active, visible, lecture_id, description, poster) VALUES (?, 0, 1, -1, "", "")", request.values['name'])
		except:
			pass
43
44
45
46
47
48
49
50
51
		if server:
			data = {'src': 'rtmp://%s/live/%s'%(server, request.values['name']),
				'destbase': 'rtmp://%s/hls/%s'%(server, request.values['name'])}
			job_id = schedule_job('simple_live_transcode', data, priority=10)
			modify("UPDATE streams SET active = 1, lecture_id = ?, job_id = ? WHERE handle = ?",
					match['id'], job_id, request.values['name'])
		else:
			modify("UPDATE streams SET active = 1, lecture_id = ? WHERE handle = ?",
					match['id'], request.values['name'])
52
	elif request.values['call'] == 'publish_done':
53
		job_id = query('SELECT job_id FROM streams WHERE handle = ?', request.values['name'])[0]['job_id']
54
		modify("UPDATE streams SET active = 0 WHERE handle = ?", request.values['name'])
55
56
		if job_id:
			cancel_job(job_id)
57
58
59
60
	else:
		return 'Bad request', 400
	return 'OK', 200

61
62
63
64
@job_handler('simple_live_transcode', state='failed')
def restart_failed_live_transcode(id, type, data, state, status):
	restart_job(id)

Julian Rother's avatar
Julian Rother committed
65
@app.route('/internal/streaming')
66
@register_navbar('Streaming', icon='broadcast-tower', iconlib='fa')
Julian Rother's avatar
Julian Rother committed
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@mod_required
def streaming():
	sources = query('SELECT * FROM live_sources WHERE NOT deleted')
	for source in sources:
		if not source['clientid']:
			continue
		r = requests.get('http://%s:8080/stats'%source['server'])
		if r.status_code != 200:
			continue
		source['stat'] = {}
		tree = ElementTree.fromstring(r.text)
		if not tree:
			continue
		s = tree.find("./server/application/[name='src']/live/stream/[name='%i']"%source['id'])
81
82
		if not s:
			continue
Julian Rother's avatar
Julian Rother committed
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
		for e in s.find("client/[publishing='']").getchildren():
			source['stat'][e.tag] = e.text
		source['video'] = {}
		for e in s.find('meta/video').getchildren():
			source['video'][e.tag] = e.text
		source['audio'] = {}
		for e in s.find('meta/audio').getchildren():
			source['audio'][e.tag] = e.text
	return render_template("streaming.html", sources=sources)

def gentoken():
	return ''.join(random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(16))

@app.route('/internal/streaming/rekey/<int:id>')
@mod_required
def streamrekey(id):
99
	modify('UPDATE live_sources SET `key` = ? WHERE id = ? AND NOT deleted', gentoken(), id)
Julian Rother's avatar
Julian Rother committed
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
	source = query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id)[0]
	flash('Der Streamkey von <strong>'+source['name']+'</strong> wurde neu generiert: <span><input readonly type="text" style="width: 15em" value="'+source['key']+'"></span>')
	return redirect(url_for('streaming'))

@app.route('/internal/streaming/drop/<int:id>')
@mod_required
def streamdrop(id):
	source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id) or [None])[0]
	if not source:
		if 'ref' in request.values:
			flash('Streamquelle nicht gefunden')
			return redirect(request.values['ref'])
		else:
			return 'Not found', 404
	requests.get('http://%s:8080/control/drop/publisher?clientid=%i'%(source['server'], source['clientid']))
	if 'ref' in request.values:
		return redirect(request.values['ref'])
	return 'Ok', 200

119
120
121
122
@sched_func(120)
def live_source_thumbnail():
	sources = query('SELECT * FROM live_sources WHERE clientid IS NOT NULL')
	for source in sources:
Julian Rother's avatar
Julian Rother committed
123
		schedule_job('thumbnail', {'srcurl': 'rtmp://%s/src/%i'%(source['server'], source['id']), 'filename': 's_%i.jpg'%source['id']})
124

Julian Rother's avatar
Julian Rother committed
125
126
127
@app.route('/internal/streaming/auth/<server>', methods=['GET', 'POST'])
def streamauth(server):
	internal = False
128
	for net in config.get('FSMPI_IP_RANGES', []):
Julian Rother's avatar
Julian Rother committed
129
130
131
132
133
		if ip_address(request.headers['X-Real-IP']) in ip_network(net):
			internal = True
	if not internal:
		return 'Forbidden', 403
	if request.values['call'] == 'publish':
134
		sources = query('SELECT * FROM live_sources WHERE NOT deleted AND `key` = ?', request.values['name'])
Julian Rother's avatar
Julian Rother committed
135
136
		if not sources:
			return 'Not found', 404
137
		modify('UPDATE live_sources SET server = ?, server_public = ?, clientid = ?, last_active = ?, preview_key = ? WHERE id = ?', server, request.args.get('public_ip', server), request.values['clientid'], datetime.now(), gentoken(), sources[0]['id'])
138
		live_source_thumbnail()
Julian Rother's avatar
Julian Rother committed
139
140
141
142
143
144
145
		ret = Response('Redirect', 301, {'Location': '%i'%sources[0]['id']})
		ret.autocorrect_location_header = False
		return ret
	if request.values['call'] == 'play':
		source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', request.values['name']) or [None])[0]
		if not source:
			return 'Not found', 404
146
147
148
149
150
151
		for net in config.get('INTERNAL_IP_RANGES', []):
			if ip_address(request.values['addr']) in ip_network(net):
				return 'Ok', 200
		if source['preview_key'] == request.values.get('preview_key'):
			return 'Ok', 200
		return 'Forbidden', 403
Julian Rother's avatar
Julian Rother committed
152
	elif request.values['call'] == 'publish_done':
153
		source = (query('SELECT * FROM live_sources WHERE server = ? AND clientid = ?', server, request.values['clientid']) or [None])[0]
154
		modify('UPDATE live_sources SET server = NULL, clientid = NULL, preview_key = NULL, last_active = ? WHERE server = ? AND clientid = ?', datetime.now(), server, request.values['clientid'])
155
156
		if not source:
			return 'Ok', 200
Julian Rother's avatar
Julian Rother committed
157
		for lecture in query('SELECT * FROM lectures WHERE stream_job IS NOT NULL'):
158
			settings = json.loads(lecture['stream_settings'])
Julian Rother's avatar
Julian Rother committed
159
			if str(source['id']) in [str(settings.get('source1')), str(settings.get('source2'))]:
160
				cancel_job(lecture['stream_job'])
Julian Rother's avatar
Julian Rother committed
161
162
		return 'Ok', 200
	return 'Bad request', 400
163
164
165
166
167
168
169
170
171
172
173
174
175
176

def schedule_livestream(lecture_id):
	def build_filter(l):
		return ','.join(l) if l else None
	server = 'rwth.video'
	lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
	settings = json.loads(lecture['stream_settings'])
	data = {'src1': {'afilter': [], 'vfilter': []}, 'src2': {'afilter': [], 'vfilter': []}, 'afilter': [], 'videoag_logo': int(bool(settings.get('video_showlogo'))), 'lecture_id': lecture['id']}
	src1 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source1')) or [{}])[0]
	src2 = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', settings.get('source2')) or [{}])[0]
	for idx, obj in zip([1,2], [src1, src2]):
		if obj:
			server = obj['server']
			data['src%i'%idx]['url'] = 'rtmp://%s/src/%i'%(obj['server'], obj['id'])
177
178
179
			if not obj['clientid']:
				flash('Quelle „%s“ ist nicht aktiv!'%obj['name'])
				return None
180
181
		if settings.get('source%i_deinterlace'%idx):
			data['src%i'%idx]['vfilter'].append('yadif')
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
		mode = settings.get('source%i_audiomode'%idx)
		leftvol = float(settings.get('source%i_leftvolume'%idx, 100))/100.0
		rightvol = float(settings.get('source%i_rightvolume'%idx, 100))/100.0
		if mode == 'mono':
			data['src%i'%idx]['afilter'].append('pan=mono|c0=%f*c0+%f*c1'%(0.5*leftvol, 0.5*rightvol))
		elif mode == 'stereo':
			data['src%i'%idx]['afilter'].append('pan=stereo|c0=%f*c0|c1=%f*c1'%(leftvol, rightvol))
		elif mode == 'unchanged':
			pass
		elif mode == 'off':
			data['src%i'%idx]['afilter'].append('pan=mono|c0=0*c0')
		else:
			raise(Exception())
	mode = settings.get('videomode')
	if mode == '1':
		data['vmix'] = 'streamselect=map=0'
	elif mode == '2':
		data['vmix'] = 'streamselect=map=1'
Julian Rother's avatar
Julian Rother committed
200
	elif mode == 'lecture4:3':
201
202
203
		data['src1']['vfilter'].append('scale=1440:1080')
		data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
		data['vmix'] = 'hstack'
Julian Rother's avatar
Julian Rother committed
204
	elif mode == 'lecture16:9':
205
206
207
		data['src1']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135')
		data['src2']['vfilter'].append('scale=1440:810,pad=1440:1080:0:135,crop=480:1080')
		data['vmix'] = 'hstack'
Julian Rother's avatar
Julian Rother committed
208
	elif mode == 'sidebyside':
209
210
211
212
213
214
215
216
217
218
		data['src1']['vfilter'].append('scale=960:540')
		data['src2']['vfilter'].append('scale=960:540')
		data['vmix'] = 'hstack,pad=1920:1080:0:270'
	if settings.get('audio_normalize'):
		data['afilter'].append('loudnorm')
	data['afilter'] = build_filter(data['afilter'])
	data['src1']['afilter'] = build_filter(data['src1']['afilter'])
	data['src1']['vfilter'] = build_filter(data['src1']['vfilter'])
	data['src2']['afilter'] = build_filter(data['src2']['afilter'])
	data['src2']['vfilter'] = build_filter(data['src2']['vfilter'])
Julian Rother's avatar
Julian Rother committed
219
	data['destbase'] = 'rtmp://%s/hls/%i'%(server, lecture['id'])
220
221
222
	if lecture['stream_job']:
		flash('Stream läuft bereits!')
		return None
223
	job_id = schedule_job('complex_live_transcode', data, priority=10)
224
225
226
227
228
	modify('UPDATE lectures_data SET stream_job = ? WHERE id = ? AND stream_job IS NULL', job_id, lecture_id)
	if query('SELECT stream_job FROM lectures WHERE id = ?', lecture_id)[0]['stream_job'] != job_id:
		flash('Stream läuft bereits!')
		cancel_job(job_id)
		return None
229
230
	return job_id

231
232
233
234
@job_handler('complex_live_transcode', state='failed')
def restart_failed_complex_live_transcode(id, type, data, state, status):
	restart_job(id)

Julian Rother's avatar
Julian Rother committed
235
236
@job_handler('complex_live_transcode', state='failed')
@job_handler('complex_live_transcode', state='finished')
237
238
239
240
241
242
def cleanup_after_complex_live_transcode_ended(id, type, data, state, status):
	job = query('SELECT * FROM jobs WHERE id = ?', id, nlfix=False)[0]
	if state == 'finished' or (state == 'failed' and job['canceled']):
		modify('UPDATE lectures_data SET stream_job = NULL WHERE stream_job = ?', id)

@app.route('/internal/streaming/control', methods=['POST'])
243
@mod_required
244
245
def control_stream():
	action = request.values['action']
246
247
	lecture_id = int(request.values['lecture_id'])
	course = (query('SELECT courses.* FROM courses JOIN lectures ON (courses.id = lectures.course_id) WHERE lectures.id = ?', lecture_id) or [None])[0]
248
249
250
251
252
	if action == 'start':
		schedule_livestream(lecture_id)
	elif action == 'stop':
		lecture = query('SELECT * FROM lectures WHERE id = ?', lecture_id)[0]
		cancel_job(lecture['stream_job'])
253
	return redirect(url_for('course', handle=course['handle']))