template_helper.py 5.2 KB
Newer Older
1
from server import *
2
3
4
5
6
7
8
9
10
11
12
13
14
import subprocess

# Jinja whitespace control: remove the first newline after a block tag and
# strip leading whitespace in front of block tags.
app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True
# Make a handful of helpers callable from every template under the given names.
app.add_template_global(random.randint, name='randint')
app.add_template_global(datetime, name='datetime')
app.add_template_global(timedelta, name='timedelta')
app.add_template_global(gethostname, name='gethostname')
app.add_template_global(min, name='min')
app.add_template_global(max, name='max')

# Look up the git commit the application runs from and expose it to all
# templates as the global `gitversion`.
# The fields are separated by NUL bytes (git expands %x00) instead of '#', so
# a '#' inside a ref name in %d cannot shift the fields, and each field is
# stripped so no padding or trailing newline ends up in the values.
# NOTE: this still runs at import time and raises if git or the repository
# is not available.
output = [
	field.strip()
	for field in subprocess.check_output(
		['git', 'log', '-g', '-1', '--pretty=%H%x00%h%x00%d%x00%s']
	).decode('UTF-8').split('\x00', 3)
]
app.jinja_env.globals['gitversion'] = {'hash': output[1], 'longhash': output[0], 'branch': output[2], 'msg': output[3]}
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188

@app.template_global()
def ismod(*args):
	"""Return True when the current session contains a 'user' entry.

	Any positional arguments are accepted and ignored.
	"""
	logged_in = 'user' in session
	return logged_in

# Names of all view functions wrapped with csrf_protect.
csrf_endpoints = []

def csrf_protect(func):
	"""Decorator that rejects requests lacking the session's CSRF token.

	The name of the wrapped function is recorded in ``csrf_endpoints``.
	The token is read from ``request.values`` or, if absent there, from
	the '_csrf_token' key of a JSON object body.

	Returns the wrapped view; the wrapper answers
	``('csrf test failed', 403)`` when the token is missing, empty or
	different from ``session['_csrf_token']``.
	"""
	csrf_endpoints.append(func.__name__)
	@wraps(func)
	def wrapper(*args, **kwargs):
		if '_csrf_token' in request.values:
			token = request.values['_csrf_token']
		else:
			# silent=True: a missing, non-JSON or malformed body yields None
			# instead of an HTTP error, so the request is rejected with 403
			# below. The body is parsed only once.
			payload = request.get_json(silent=True)
			# Only a JSON object can carry the token; lists, strings etc.
			# are treated as "no token".
			token = payload.get('_csrf_token') if isinstance(payload, dict) else None
		if not token or '_csrf_token' not in session or session['_csrf_token'] != token:
			return 'csrf test failed', 403
		return func(*args, **kwargs)
	return wrapper

@app.url_defaults
def csrf_inject(endpoint, values):
	"""Add the session's CSRF token to URL values of protected endpoints.

	Nothing is changed when the endpoint is not listed in
	``csrf_endpoints`` or the session holds no (non-empty) token.
	"""
	if endpoint in csrf_endpoints:
		token = session.get('_csrf_token')
		if token:
			values['_csrf_token'] = token

@app.template_filter()
def base64encode(str):
	"""Return the Base64 encoding of the UTF-8 bytes of ``str`` as text.

	Best effort: any value that cannot be encoded (e.g. ``None``) yields
	an empty string.
	"""
	try:
		return base64.b64encode(str.encode('UTF-8')).decode('UTF-8')
	except Exception:
		# Deliberately lenient for templates, but no longer swallows
		# SystemExit/KeyboardInterrupt like a bare except did.
		return ''

@app.template_filter()
def checkperm(perms, username=None, password=None):
	"""Decide whether the current request satisfies any of ``perms``.

	``perms`` is passed through ``evalperm`` and the resulting entries are
	checked in order; the first one that matches grants access:

	* 'public'   - always matches
	* 'password' - param1/param2 equal ``username``/``password``
	* 'l2p'      - param1 is listed in the session's 'l2p_courses'
	* 'rwth'     - session flag 'rwthintern' is set, or the X-Real-IP
	               header lies in one of config['RWTH_IP_RANGES']

	Sessions for which ``ismod()`` is true are always granted access.
	Returns True or False.
	"""
	if ismod():
		return True
	perms = evalperm(perms)
	for perm in perms:
		if perm['type'] == 'public':
			return True
		elif perm['type'] == 'password':
			if perm['param1'] == username and perm['param2'] == password:
				return True
		elif perm['type'] == 'l2p':
			if perm['param1'] in session.get('l2p_courses', []):
				return True
		elif perm['type'] == 'rwth':
			if session.get('rwthintern', False):
				return True
			if 'X-Real-IP' not in request.headers:
				continue
			try:
				ip = ip_address(request.headers['X-Real-IP'])
			except ValueError:
				# A malformed address cannot be inside any range; treat it
				# as "no match" instead of failing the whole request.
				continue
			for net in config['RWTH_IP_RANGES']:
				if ip in ip_network(net):
					return True
	return False

@app.template_filter()
def permdescr(perms):
	"""Summarise a permission list as a ``(key, description)`` pair.

	``perms`` is passed through ``evalperm`` first. The key is one of
	'public', 'rwth', 'fsmpi', 'l2p', 'password' or 'none'; the
	description is the matching human-readable (German) text. An empty
	permission list counts as public.
	"""
	perms = evalperm(perms)
	kinds = []
	courses = []
	for entry in perms:
		kind = entry['type']
		if kind == 'l2p':
			courses.append(entry['param1'])
		else:
			kinds.append(kind)
	if 'public' in kinds or not perms:
		return 'public', 'Öffentlich verfügbar'
	with_password = 'password' in kinds
	if 'rwth' in kinds:
		if with_password:
			return 'rwth', 'Nur für RWTH-Angehörige und Nutzer mit Passwort verfügbar'
		return 'rwth', 'Nur für RWTH-Angehörige verfügbar'
	if 'fsmpi' in kinds:
		return 'fsmpi', 'Nur für Fachschaftler verfügbar'
	if courses:
		if with_password:
			return 'l2p', 'Nur für Teilnehmer der Veranstaltung und Nutzer mit Passwort verfügbar'
		return 'l2p', 'Nur für Teilnehmer der Veranstaltung verfügbar'
	if with_password:
		return 'password', 'Nur für Nutzer mit Passwort verfügbar'
	return 'none', 'Nicht verfügbar'

# Debian ships a jinja2 that lacks this test, so provide it ourselves.
@app.template_test(name='equalto')
def equalto(a, b):
	"""Template test: true when ``a`` compares equal to ``b``."""
	same = a == b
	return same

@app.template_filter(name='filterdict')
def jinja2_filterdict(value, attrdel):
	"""Return a dict copy of ``value`` without the keys listed in ``attrdel``.

	Keys in ``attrdel`` that are not present are ignored; ``value`` itself
	is not modified.
	"""
	remaining = dict(value)
	for key in attrdel:
		remaining.pop(key, None)
	return remaining

@app.template_filter(name='semester')
def human_semester(s, long=False):
	"""Format a semester string such as '2016ws' for display.

	``s`` must be four year digits followed by 'ss' or 'ws' (any case).
	Empty values, 'zeitlos' and anything not exactly six characters long
	give 'Zeitlos'; other malformed strings are reported on stdout and
	give '??'. The short form looks like 'WS16', the long form (with
	``long=True``) like 'Wintersemester 2016/17'.
	"""
	if not s or s == 'zeitlos' or len(s) != 6:
		return 'Zeitlos'
	year, term = s[:4], s[4:].upper()
	well_formed = year.isdigit() and term in ('SS', 'WS')
	if not well_formed:
		print('Invalid semester string "%s"'%s)
		return '??'
	if long:
		if term == 'SS':
			return 'Sommersemester %s'%year
		# A winter semester spans two calendar years.
		following = str(int(year)+1)[2:]
		return 'Wintersemester %s/%s'%(year, following)
	return term+year[2:]

@app.template_filter(name='date')
def human_date(d):
	"""Format ``d`` as day.month.year, e.g. '24.12.2016'."""
	layout = '%d.%m.%Y'
	return d.strftime(layout)

@app.template_filter(name='fulldate')
def human_fulldate(d):
	"""Format ``d`` with abbreviated weekday, date and time, followed by 'Uhr'."""
	layout = '%a, %d.%m.%Y, %H:%M Uhr'
	return d.strftime(layout)

@app.template_filter(name='time')
def human_time(d):
	"""Format the time of ``d`` as hours:minutes, e.g. '14:30'."""
	layout = '%H:%M'
	return d.strftime(layout)

@app.template_filter()
def rfc3339(d):
	"""Format ``d`` as an RFC 3339 style timestamp with a fixed +02:00 offset."""
	# NOTE(review): the UTC offset is hard-coded to +02:00 regardless of the
	# date or of any tzinfo on d; presumably the values are Central European
	# local times, in which case winter dates get the wrong offset - confirm.
	layout = '%Y-%m-%dT%H:%M:%S+02:00'
	return d.strftime(layout)

@app.template_global()
def get_announcements(minlevel=0):
	"""Return the announcements to display, highest level first.

	Selected are non-deleted announcements with ``level >= minlevel`` that
	have no expiry time or have not expired yet. Sessions for which
	``ismod()`` is true additionally see announcements that are invisible
	or not yet published, and keep seeing expired ones for 24 hours.

	Best effort: returns an empty list if the query fails.
	"""
	mod = ismod()
	now = datetime.now()
	offset = timedelta(hours=24) if mod else timedelta()
	try:
		# "time_expire IS NULL": a comparison with "= NULL" is never true in
		# SQL, which hid every announcement without an expiry time.
		return query('SELECT * FROM announcements WHERE NOT deleted AND ((time_expire IS NULL) OR time_expire > ?) AND (? OR (visible AND time_publish < ?)) AND level >= ? ORDER BY level DESC', now-offset, mod, now, minlevel)
	except Exception:
		# Announcements are optional page decoration; never fail a page
		# render because of them.
		return []

@app.template_filter()
def fixnl(s):
	"""Return ``str(s)`` with every newline replaced by '<br>'."""
	# To be removed as soon as the db schema is cleaned up
	lines = str(s).split('\n')
	return '<br>'.join(lines)

@app.template_filter()
def tagid(s):
	"""Turn ``s`` into an identifier of lowercase ASCII letters, digits and '_'.

	Spaces become underscores, the text is lowercased and every other
	character is dropped. Empty input gives 'EMPTY'.
	"""
	if not s:
		return 'EMPTY'
	allowed = string.ascii_lowercase+string.digits+'_'
	normalized = s.replace(' ', '_').lower()
	return ''.join(ch for ch in normalized if ch in allowed)

@app.template_global()
def is_readonly():
	"""Return True unless the database reports wsrep_ready = 'ON'.

	The global status variable 'wsrep_ready' is queried via ``show``; if
	the lookup fails for any reason the site is treated as read-only.
	"""
	try:
		return show('SHOW GLOBAL STATUS LIKE "wsrep_ready"')['wsrep_ready'] != 'ON'
	except Exception:
		# Fail safe: without a usable status, assume writes are not possible.
		# Unlike a bare except this lets SystemExit/KeyboardInterrupt through.
		return True