livestreams.py 6.31 KB
Newer Older
1
from server import *
Julian Rother's avatar
Julian Rother committed
2
3
4
5
6
import requests
from xml.etree import ElementTree
import random
import string

7
@sched_func(120)
def livestream_thumbnail():
	"""Schedule a 'thumbnail' job for every currently active livestream.

	Registered through ``sched_func(120)`` (presumably a 120 second
	interval -- confirm against the scheduler implementation).

	Active rows of the ``streams`` table are passed through ``genlive()``;
	for each resulting entry a thumbnail job is scheduled that reads from
	the entry's ``path`` and writes to ``l_<lecture id>.jpg``.
	"""
	livestreams = query('SELECT streams.lecture_id, streams.handle AS livehandle FROM streams WHERE streams.active')
	for v in genlive(livestreams):
		# The original code referenced an undefined local ``lecture`` here.
		# NOTE(review): assumes genlive() keeps/sets 'lecture_id' on every
		# entry it yields -- confirm against genlive().
		schedule_job('thumbnail', {'src': v['path'], 'filename': 'l_%i.jpg'%v['lecture_id']})
12

13
@app.route('/internal/streaming/legacy_auth', methods=['GET', 'POST'])
@app.route('/internal/streaming/legacy_auth/<server>', methods=['GET', 'POST'])
def streamauth_legacy(server=None):
	"""Authorize and track legacy livestream publish callbacks.

	Reads the request values ``app``, ``call``, ``name``, ``pass`` and the
	optional ``lecture`` (presumably sent by the RTMP server's publish
	hooks -- confirm against the streaming server configuration).

	* The request must carry an ``X-Real-IP`` header inside one of the
	  configured ``FSMPI_IP_RANGES``, otherwise 403 is returned.
	* ``call == 'publish'``: checks the shared password, picks the lecture
	  of the course ``name`` that is running now (from 30 minutes before
	  its start until its end) unless ``lecture`` overrides it, makes sure
	  a ``streams`` row exists and marks it active. If ``server`` is given,
	  a 'simple_live_transcode' job is scheduled and stored with the stream.
	* ``call == 'publish_done'``: marks the stream inactive and cancels the
	  stored transcode job, if any.

	Returns a ``(body, status)`` tuple: 200 on success, 400 for an
	unexpected ``app``/``call``, 403 when unauthorized.
	"""
	internal = False
	if 'X-Real-IP' in request.headers:
		internal = any(
			ip_address(request.headers['X-Real-IP']) in ip_network(net)
			for net in config.get('FSMPI_IP_RANGES', []))
	if request.values['app'] != 'live':
		return 'Bad request', 400
	if not internal:
		return 'Forbidden', 403
	if request.values['call'] == 'publish':
		# NOTE(review): hard-coded shared secret in source; should be moved
		# to the configuration. Left unchanged so existing encoders keep working.
		if request.values['pass'] != 'caisoh8aht0wuSu':
			return 'Forbidden', 403
		handle = request.values['name']
		matches = query("SELECT lectures.* FROM lectures JOIN courses ON lectures.course_id = courses.id WHERE courses.handle = ? ORDER BY lectures.time DESC", handle)
		now = datetime.now()
		# -1 means "no matching lecture"
		match = {'id': -1}
		for lecture in matches:
			starts = lecture['time']-timedelta(minutes=30)
			ends = lecture['time']+timedelta(minutes=lecture['duration'])
			if starts <= now <= ends:
				match = lecture
				break
		if 'lecture' in request.values:
			# Explicit override by the publisher
			match = {'id': request.values['lecture']}
		# Make sure a row for this handle exists. The original statement used
		# "" inside a double-quoted Python string, which concatenated to the
		# invalid SQL "VALUES (?, 0, 1, -1, , )" and always failed silently.
		if not query('SELECT handle FROM streams WHERE handle = ?', handle):
			try:
				modify("INSERT INTO streams (handle, active, visible, lecture_id, description, poster) VALUES (?, 0, 1, -1, '', '')", handle)
			except Exception:
				# Still best effort (e.g. a concurrent publish created the
				# row first), but no longer invisible.
				app.logger.exception('Could not create streams row for handle %r', handle)
		if server:
			data = {'src': 'rtmp://%s/live/%s'%(server, handle),
				'destbase': 'rtmp://%s/hls/%s'%(server, handle)}
			job_id = schedule_job('simple_live_transcode', data, priority=10)
			modify("UPDATE streams SET active = 1, lecture_id = ?, job_id = ? WHERE handle = ?",
					match['id'], job_id, handle)
		else:
			modify("UPDATE streams SET active = 1, lecture_id = ? WHERE handle = ?",
					match['id'], handle)
	elif request.values['call'] == 'publish_done':
		# The stream row may not exist; do not fail with an IndexError then.
		rows = query('SELECT job_id FROM streams WHERE handle = ?', request.values['name'])
		job_id = rows[0]['job_id'] if rows else None
		modify("UPDATE streams SET active = 0 WHERE handle = ?", request.values['name'])
		if job_id:
			cancel_job(job_id)
	else:
		return 'Bad request', 400
	return 'OK', 200

60
61
62
63
@job_handler('simple_live_transcode', state='failed')
def restart_failed_live_transcode(id, type, data, state, status):
	"""Restart a 'simple_live_transcode' job that was reported as failed.

	Registered through ``job_handler`` for the job type
	'simple_live_transcode' with ``state='failed'``. Only ``id`` is used;
	the remaining parameters are part of the handler signature and ignored.
	"""
	failed_job_id = id
	restart_job(failed_job_id)

Julian Rother's avatar
Julian Rother committed
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def _child_texts(element):
	"""Return {child tag: child text} for *element*, or {} if it is None."""
	if element is None:
		return {}
	return {child.tag: child.text for child in element}

@app.route('/internal/streaming')
#@register_navbar('Streaming', icon='transfer')
@mod_required
def streaming():
	"""Render the streaming overview with live statistics per source.

	For every non-deleted live source that has a ``clientid``, the XML
	statistics are fetched from ``http://<server>:8080/stats`` and the
	entry of the stream named after the source id is looked up. If found,
	the source dict gets three extra keys filled from the XML:
	``stat`` (children of the publishing client element), ``video`` and
	``audio`` (children of ``meta/video`` and ``meta/audio``).

	Sources whose server is unreachable, answers with a non-200 status or
	returns unusable XML are rendered without these statistics instead of
	breaking the whole page.
	"""
	sources = query('SELECT * FROM live_sources WHERE NOT deleted')
	for source in sources:
		if not source['clientid']:
			continue
		try:
			# Bounded wait so a hanging streaming server cannot block the page
			r = requests.get('http://%s:8080/stats'%source['server'], timeout=5)
		except requests.RequestException:
			continue
		if r.status_code != 200:
			continue
		source['stat'] = {}
		try:
			tree = ElementTree.fromstring(r.text)
		except ElementTree.ParseError:
			continue
		s = tree.find("./server/application/[name='src']/live/stream/[name='%i']"%source['id'])
		# Explicit checks instead of Element truthiness (deprecated); a stream
		# element without children is skipped like before.
		if s is None or len(s) == 0:
			continue
		# Element.getchildren() was removed in Python 3.9; iterate directly.
		source['stat'].update(_child_texts(s.find("client/[publishing='']")))
		source['video'] = _child_texts(s.find('meta/video'))
		source['audio'] = _child_texts(s.find('meta/audio'))
	return render_template("streaming.html", sources=sources)

def gentoken():
	"""Return a random 16-character token of ASCII letters and digits.

	The characters are drawn with ``random.SystemRandom``.
	"""
	alphabet = string.ascii_letters + string.digits
	rng = random.SystemRandom()
	chars = [rng.choice(alphabet) for _ in range(16)]
	return ''.join(chars)

@app.route('/internal/streaming/rekey/<int:id>')
@mod_required
def streamrekey(id):
	"""Generate a new stream key for the live source *id*.

	Stores a fresh token from ``gentoken()`` as the source's ``key``,
	flashes a message showing the new key and redirects to the streaming
	overview. Returns ``('Not found', 404)`` if there is no non-deleted
	source with this id.
	"""
	# Function-scope import: only this view needs HTML escaping
	from html import escape
	modify('UPDATE live_sources SET `key` = ? WHERE id = ? AND NOT deleted', gentoken(), id)
	sources = query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id)
	if not sources:
		# Unknown or deleted id: the UPDATE above matched nothing either
		return 'Not found', 404
	source = sources[0]
	# The message is raw HTML, so database values must be escaped before
	# they are embedded in it.
	flash('Der Streamkey von <strong>'+escape(source['name'])+'</strong> wurde neu generiert: <span><input readonly type="text" style="width: 15em" value="'+escape(source['key'])+'"></span>')
	return redirect(url_for('streaming'))

@app.route('/internal/streaming/drop/<int:id>')
@mod_required
def streamdrop(id):
	"""Ask the streaming server to drop the publisher of live source *id*.

	Looks up the non-deleted source and, if it currently has a
	``clientid``, calls ``/control/drop/publisher`` on the source's server.
	When the request carries a ``ref`` value the response is a redirect to
	it, otherwise a plain ``('Ok', 200)``. An unknown source yields a flash
	message plus redirect (with ``ref``) or ``('Not found', 404)``.
	"""
	source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', id) or [None])[0]
	if not source:
		if 'ref' in request.values:
			flash('Streamquelle nicht gefunden')
			return redirect(request.values['ref'])
		else:
			return 'Not found', 404
	# A source that is not publishing has clientid NULL; '%i' % None would
	# raise a TypeError, and there is nothing to drop in that case anyway.
	if source['clientid'] is not None:
		requests.get('http://%s:8080/control/drop/publisher?clientid=%i'%(source['server'], source['clientid']), timeout=5)
	if 'ref' in request.values:
		# NOTE(review): redirect target is taken from the request unchecked.
		return redirect(request.values['ref'])
	return 'Ok', 200

118
119
120
121
@sched_func(120)
def live_source_thumbnail():
	"""Schedule a 'thumbnail' job for every live source that has a clientid.

	Each job reads from ``rtmp://<server>/src/<id>`` and writes to
	``s_<id>.jpg``. Registered through ``sched_func(120)``.
	"""
	publishing = query('SELECT * FROM live_sources WHERE clientid IS NOT NULL')
	for entry in publishing:
		stream_url = 'rtmp://%s/src/%i'%(entry['server'], entry['id'])
		target = 's_%i.jpg'%entry['id']
		job_data = {'srcurl': stream_url, 'filename': target}
		schedule_job('thumbnail', job_data)
123

Julian Rother's avatar
Julian Rother committed
124
125
126
@app.route('/internal/streaming/auth/<server>', methods=['GET', 'POST'])
def streamauth(server):
	"""Authorize publish/play callbacks for live sources on *server*.

	The request must carry an ``X-Real-IP`` header inside one of the
	configured ``FSMPI_IP_RANGES``, otherwise 403 is returned. The request
	value ``call`` selects the action:

	* ``publish``: ``name`` must be the ``key`` of a non-deleted source.
	  The source is bound to *server* and the given ``clientid``, gets a
	  fresh ``preview_key``, thumbnails are scheduled and a 301 response
	  with the source id as ``Location`` is returned.
	* ``play``: ``name`` is a source id. Allowed if ``addr`` lies in one of
	  the ``INTERNAL_IP_RANGES`` or ``preview_key`` matches the source's
	  current preview key; otherwise 403.
	* ``publish_done``: clears server, clientid and preview key of the
	  source that was published with this ``clientid`` on *server*.

	Any other ``call`` yields ``('Bad request', 400)``.
	"""
	# Same guard as in streamauth_legacy: a missing header is simply "not internal"
	internal = False
	if 'X-Real-IP' in request.headers:
		internal = any(
			ip_address(request.headers['X-Real-IP']) in ip_network(net)
			for net in config.get('FSMPI_IP_RANGES', []))
	if not internal:
		return 'Forbidden', 403
	call = request.values['call']
	if call == 'publish':
		sources = query('SELECT * FROM live_sources WHERE NOT deleted AND `key` = ?', request.values['name'])
		if not sources:
			return 'Not found', 404
		modify('UPDATE live_sources SET server = ?, server_public = ?, clientid = ?, last_active = ?, preview_key = ? WHERE id = ?', server, request.args.get('public_ip', server), request.values['clientid'], datetime.now(), gentoken(), sources[0]['id'])
		# Kick off thumbnail generation right away instead of waiting for the scheduler
		live_source_thumbnail()
		# The Location header is the bare source id, so it must not be
		# rewritten into an absolute URL.
		ret = Response('Redirect', 301, {'Location': '%i'%sources[0]['id']})
		ret.autocorrect_location_header = False
		return ret
	if call == 'play':
		source = (query('SELECT * FROM live_sources WHERE NOT deleted AND id = ?', request.values['name']) or [None])[0]
		if not source:
			return 'Not found', 404
		for net in config.get('INTERNAL_IP_RANGES', []):
			if ip_address(request.values['addr']) in ip_network(net):
				return 'Ok', 200
		# An unset (NULL/empty) preview key must never match: previously a
		# request without preview_key compared None == None and was allowed.
		expected_key = source['preview_key']
		if expected_key and expected_key == request.values.get('preview_key'):
			return 'Ok', 200
		return 'Forbidden', 403
	if call == 'publish_done':
		modify('UPDATE live_sources SET server = NULL, clientid = NULL, preview_key = NULL WHERE server = ? AND clientid = ?', server, request.values['clientid'])
		return 'Ok', 200
	return 'Bad request', 400