Skip to content
Snippets Groups Projects
Select Git revision
  • fe69fa2395363dcba16581a00fb32e1832829e13
  • master default protected
  • intros
  • live_sources
  • bootstrap4
  • modules
6 results

importer.py

Blame
  • Forked from Video AG Infrastruktur / website
    Source project has a limited visibility.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    importer.py 7.40 KiB
    from server import *
    
    import urllib.request
    import urllib.parse
    
    @app.route('/internal/import/<int:id>', methods=['GET', 'POST'])
    @mod_required
    def list_import_sources(id):
    	"""Show and edit the import sources configured for one course.
    
    	Request values named ``campus.<importid>.<field>`` are grouped by
    	``importid``. An ``importid`` starting with ``new`` inserts a source
    	when its url is non-empty; any other ``importid`` is taken as the row
    	id of an existing source, which is updated when the url is non-empty
    	and deleted when the url is empty.
    
    	Request values that do not follow the three-part naming scheme, and
    	submissions lacking the fields an operation needs, are ignored.
    
    	id -- id of the course (from the route).
    	Returns the rendered ``import_campus.html`` template.
    	Raises IndexError when no course with this id exists.
    	"""
    	courses = query('SELECT * FROM courses WHERE id = ?', id)[0]
    
    	campus = {}
    	for key in request.values:
    		parts = key.split('.', 2)
    		# Unrelated parameters (fewer than two dots) must not abort the request
    		if len(parts) != 3:
    			continue
    		group, importid, field = parts
    		if group == 'campus':
    			campus.setdefault(importid, {})[field] = request.values[key]
    
    	for importid, fields in campus.items():
    		# A missing url field is an incomplete submission, not a delete request
    		if 'url' not in fields:
    			continue
    		url = fields['url']
    		if importid.startswith('new'):
    			if url != '' and 'type' in fields:
    				modify('INSERT INTO import_campus (url, type, course_id, last_checked, changed) VALUES (?, ?, ?, ?, 1)',url,fields['type'],id,datetime.now())
    			continue
    		try:
    			source_id = int(importid)
    		except ValueError:
    			# Neither a "new" entry nor a numeric row id: nothing to act on
    			continue
    		# NOTE(review): UPDATE/DELETE go through query() while INSERT uses
    		# modify() -- confirm that query() persists writes.
    		if url == '':
    			query('DELETE FROM import_campus WHERE (id = ?) AND (course_id = ?)',source_id,id)
    		elif 'type' in fields:
    			query('UPDATE import_campus SET url = ?, `type` = ? WHERE (course_id = ?) AND (id = ?)', url,fields['type'],id,source_id)
    	import_campus = query('SELECT * FROM import_campus WHERE course_id = ?',id)
    
    	return render_template('import_campus.html', course=courses, import_campus=import_campus, events=[])
    
    def fetch_co_course_events(i):
    	"""Scrape the appointments of one CAMPUS Office course page.
    
    	i -- import source row; ``i['url']`` is the page to fetch and
    	     ``i['type']`` becomes the title of every returned event.
    
    	Returns a list of dicts with the keys 'time' (start, parsed with
    	datetime.strptime), 'duration' (whole minutes), 'place' and 'title'.
    	If the page at ``i['url']`` cannot be downloaded, a message is flashed
    	and an empty list is returned.
    
    	Raises ImportError when lxml is not installed, and IndexError when the
    	page does not contain the expected table layout.
    	"""
    	import http.client
    	from lxml import html
    	events = []
    	try:
    		with urllib.request.urlopen(i['url']) as response:
    			remote_html = response.read()
    	except (OSError, ValueError, http.client.HTTPException):
    		# Without the page there is nothing to parse: report the source and
    		# return no events instead of failing on the missing document below.
    		flash("Ungültige URL: '"+i['url']+"'")
    		return events
    	tablexpath = "//td[text()='Termine und Ort']/following::table[1]"
    	basetable = html.fromstring(remote_html).xpath(tablexpath)[0]
    
    	# parse recurring events: the base page plus every linked period page
    	toparse = [i['url']]
    	for j in basetable.xpath("//table[@cellpadding='5']//tr[@class='hierarchy4' and td[@name='togglePeriodApp']]"):
    		url = str(j.xpath("td[@name='togglePeriodApp']/a/@href")[0])
    		toparse.append(url)
    	events_raw = []
    	for j in toparse:
    		# links starting with 'event' are relative to the campus base URL
    		if j.startswith('event'):
    			url = 'https://www.campus.rwth-aachen.de/rwth/all/'+j
    		else:
    			url = j
    		with urllib.request.urlopen(url) as response:
    			text = response.read()
    		dom = html.fromstring(text).xpath(tablexpath)[0]
    		#we get the "heading" row, from it extract the room and time. best way to get it is to match on the picture -.-
    		baserow = dom.xpath("//table[@cellpadding='5']//tr[@class='hierarchy4' and td[@name='togglePeriodApp']/*/img[@src='../../server/img/minus.gif']]")
    		if not baserow:
    			continue
    		baserow = baserow[0]
    		rowdata = {'dates': []}
    
    		# "kein raum vergeben" is a special case, else use campus id
    		if baserow.xpath("td[6]/text()")[0] == 'Kein Raum vergeben':
    			rowdata['place'] = ''
    		elif baserow.xpath("td[6]/a"):
    			rowdata['place'] = baserow.xpath("td[6]/a")[0].text_content()
    		else:
    			rowdata['place'] = baserow.xpath("td[6]/text()")[0].split(' ',1)[0]
    
    		rowdata['start'] = baserow.xpath("td[3]/text()")[0]
    		rowdata['end'] = baserow.xpath("td[5]/text()")[0]
    		rowdata['dates'] = baserow.getparent().xpath("tr[@class='hierarchy5']//td[@colspan='3']/text()")
    		events_raw.append(rowdata)
    
    	# parse single appointments
    	singlecells = basetable.xpath("//table[@cellpadding='3']/tr/td[text()='Einmalige Termine:']")
    	if singlecells:
    		singletable = singlecells[0].getparent().getparent()
    		for row in singletable.xpath("tr/td[2]"):
    			rowdata = {}
    			if row.xpath("text()[2]")[0] == 'Kein Raum vergeben':
    				rowdata['place'] = ''
    			elif row.xpath("a"):
    				rowdata['place'] = row.xpath("a")[0].text_content()
    			else:
    				rowdata['place'] = row.xpath("text()[2]")[0].split(' ',1)[0]
    
    			# date and times sit at fixed offsets of the first text node
    			firsttext = row.xpath("text()[1]")[0]
    			rowdata['dates'] = [firsttext[4:14]]
    			rowdata['start'] = firsttext[17:22]
    			rowdata['end'] = firsttext[27:32]
    			events_raw.append(rowdata)
    
    	#now we have to filter our data and do some lookups
    	fmt = "%d.%m.%Y %H:%M"
    	for raw in events_raw:
    		if not raw['dates']:
    			continue
    		# The place is the same for every date of this entry, so resolve it once
    		campus_place = str(raw['place'])
    		if campus_place != '':
    			dbplace = query("SELECT name FROM places WHERE (campus_room = ?) OR (campus_name = ?) OR ((NOT campus_name) AND name = ?)",campus_place,campus_place,campus_place)
    			if dbplace:
    				place = dbplace[0]['name']
    			else:
    				place = 'Unbekannter Ort ('+campus_place+')'
    		else:
    			place = ''
    		for day in raw['dates']:
    			e = {}
    			e['time'] = datetime.strptime("%s %s"%(day,raw['start']) ,fmt)
    			e['duration'] = int((datetime.strptime("%s %s"%(day,raw['end']) ,fmt) - e['time']).seconds/60)
    			e['place'] = place
    			e['title'] = i['type']
    			events.append(e)
    	# it is parsed.
    	return events
    
    def fetch_ro_event_ical(ids):
    	"""Download the iCal export for the given RWTHonline appointment ids.
    
    	ids -- iterable of appointment numbers; each one is sent as a separate
    	       ``pTerminNr`` form field.
    
    	Returns the response body of the export request decoded as UTF-8.
    	"""
    	form_fields = [
    		('pMode', 'T'),
    		('pInclPruef', 'N'),
    		('pInclPruefGepl', 'N'),
    		('pOutputFormat', '99'),
    		('pCharset', 'UTF8'),
    		('pMaskAction', 'DOWNLOAD'),
    	]
    	form_fields.extend(('pTerminNr', termin) for termin in ids)
    	payload = urllib.parse.urlencode(form_fields).encode('utf-8')
    	export_request = urllib.request.Request(
    			'https://online.rwth-aachen.de/RWTHonline/pl/ui/%24ctx/wbKalender.wbExport',
    			data=payload, method='POST')
    	with urllib.request.urlopen(export_request) as response:
    		body = response.read()
    	return body.decode('utf-8')
    
    def fetch_ro_course_ical(id):
    	"""Fetch the iCal export of all appointments of one RWTHonline course.
    
    	id -- course number (``pStpSpNr``); anything accepted by ``int()``.
    
    	Loads the course's appointment list page, collects the value of every
    	``pTerminNr`` input on it and returns what fetch_ro_event_ical()
    	returns for those ids.
    
    	Raises ValueError when id is not numeric and ImportError when lxml is
    	not installed.
    	"""
    	from lxml import html
    	url = 'https://online.rwth-aachen.de/RWTHonline/pl/ui/%24ctx/wbTermin_List.wbLehrveranstaltung?pStpSpNr='+'%i'%(int(id))
    	# Release the HTTP connection as soon as the page has been read
    	with urllib.request.urlopen(url) as response:
    		dom = html.fromstring(response.read())
    	event_ids = [x.value for x in dom.xpath('//input[@name="pTerminNr"]')]
    	return fetch_ro_event_ical(event_ids)
    
    def fetch_ro_course_events(item):
    	"""Fetch the confirmed events of one RWTHonline course.
    
    	item -- import source row; ``item['url']`` identifies the course and
    	        ``item['type']`` becomes the title of every returned event.
    
    	The course id is taken from the ``pStpSpNr`` query argument or, failing
    	that, from the path segment following ``courses``. If neither is
    	present, a message is flashed and an empty list is returned.
    
    	Returns a list of dicts with the keys 'place', 'time', 'duration'
    	(whole minutes) and 'title', one per VEVENT whose STATUS is CONFIRMED.
    
    	Raises ImportError when icalendar is not installed.
    	"""
    	import icalendar
    	# First fix crappy javascript fragment-Paths
    	url = urllib.parse.urlparse(item['url'].replace('#/', ''))
    	args = urllib.parse.parse_qs(url.query)
    	path_parts = url.path.split('/')
    	if 'pStpSpNr' in args: # Legacy URLs
    		course_id = args['pStpSpNr'][0]
    	elif len(path_parts) >= 2 and path_parts[-2] == 'courses': # New URLs
    		course_id = path_parts[-1]
    	else:
    		# No course id can be derived: report the source and import nothing
    		flash("Ungültige URL: '"+item['url']+"'")
    		return []
    	cal = icalendar.Calendar().from_ical(fetch_ro_course_ical(course_id))
    	events = []
    	for comp in cal.subcomponents:
    		if comp.name != 'VEVENT':
    			continue
    		if comp.get('STATUS') != 'CONFIRMED':
    			continue
    		e = {}
    		e['place'] = str(comp.get('LOCATION', ''))
    		e['time'] = comp['DTSTART'].dt # TODO: tz
    		e['duration'] = int((comp['DTEND'].dt - comp['DTSTART'].dt).seconds/60)
    		e['title'] = item['type']
    		events.append(e)
    	return events
    
    @app.route('/internal/import/<int:id>/now', methods=['GET', 'POST'])
    @mod_required
    def import_from(id):
    	"""Compare the events of a course's import sources with its lectures.
    
    	Fetches the events of every import source of the course and renders
    	``import_campus.html`` with two lists: ``newevents`` (fetched events
    	with no matching lecture, duplicates removed) and ``deletedlectures``
    	(lectures with no matching fetched event). Two entries match when
    	their place, time and duration are all equal.
    
    	id -- id of the course (from the route).
    	"""
    	def same_slot(a, b):
    		# Entries are identified by place, start time and duration
    		return (a['place'] == b['place']) and (a['time'] == b['time']) and (a['duration'] == b['duration'])
    
    	courses = query('SELECT * FROM courses WHERE id = ?', id)[0]
    	lectures = query('SELECT * FROM lectures WHERE course_id = ?', courses['id'])
    	import_campus = query('SELECT * FROM import_campus WHERE course_id = ?',id)
    
    	events = []
    	try:
    		# if u have to port this to anything new, god be with you.
    		for source in import_campus:
    			if 'www.campus.rwth-aachen.de' in source['url']:
    				events.extend(fetch_co_course_events(source))
    			else:
    				events.extend(fetch_ro_course_events(source))
    	except ImportError:
    		flash('python-lxml not found, campus import will not work.')
    
    	# events to add: not yet collected and not already stored as a lecture
    	newevents = []
    	for candidate in events + lectures:
    		duplicate = any(same_slot(candidate, seen) for seen in newevents)
    		exists = any(same_slot(candidate, lecture) for lecture in lectures)
    		if not (duplicate or exists):
    			newevents.append(candidate)
    
    	# deleted events: lectures that no longer appear in any import source
    	deletedlectures = [
    		lecture for lecture in lectures
    		if not any(same_slot(lecture, event) for event in events)
    	]
    
    	return render_template('import_campus.html', course=courses, import_campus=import_campus, newevents=newevents, deletedlectures=deletedlectures)