Select Git revision
Forked from
protokollsystem / proto3
Source project has a limited visibility.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
authentication.py 13.55 KiB
from ipaddress import ip_address, ip_network
import requests
from api.course import lecture_query_auth
from api.errors import *
from json.decoder import JSONDecodeError
from server import config, session, request
def is_moderator():
    """Tell whether the current session belongs to a logged-in moderator.

    The only thing checked is whether the session contains a ``user`` entry.
    """
    has_user_entry = 'user' in session
    return has_user_entry
def is_rwth_ip(ip_string: str) -> bool:
    """Check whether an address lies inside one of the configured RWTH networks.

    :param ip_string: Textual IP address; parsing is done by ``ip_address``,
        which raises ``ValueError`` for malformed input.
    :return: True if any network listed in ``config['RWTH_IP_RANGES']``
        contains the address, False otherwise.
    """
    address = ip_address(ip_string)
    return any(address in ip_network(candidate) for candidate in config['RWTH_IP_RANGES'])
def get_effective_auth(auth_list_db: []) -> []:
    """Reduce a list of permission entries to the ones that actually apply.

    Lecture-level permissions win over course-level permissions. Entries that
    only carry a ``video_id`` are ignored (not supported anymore). If nothing
    remains, the result is a single public permission.

    :param auth_list_db: Permission mappings. The scope of each entry is
        detected by the presence of a ``course_id``, ``lecture_id`` or
        ``video_id`` key, checked in that order.
    :return: The lecture permissions if there are any, otherwise the course
        permissions if there are any, otherwise ``[{"type": "public"}]``.
    :raises Exception: If an entry has none of the three id keys.
    """
    by_scope = {"course_id": [], "lecture_id": []}
    for permission in auth_list_db:
        # First matching key decides the scope, mirroring the check order above.
        scope_key = next(
            (key for key in ("course_id", "lecture_id", "video_id") if key in permission),
            None,
        )
        if scope_key is None:
            raise Exception("Permission has no course, lecture or video id set")
        if scope_key != "video_id":
            by_scope[scope_key].append(permission)
    return by_scope["lecture_id"] or by_scope["course_id"] or [{"type": "public"}]
def get_authentication_methods(auth_methods_db: []) -> [str]:
    """List the names of the authentication methods that can unlock a resource.

    The permission entries are first narrowed with ``get_effective_auth``.

    :param auth_methods_db: Permission mappings as accepted by
        ``get_effective_auth``; each effective entry needs a ``type`` key.
    :return: ``["public"]`` as soon as a public permission is seen. Otherwise
        the present methods in the fixed order rwth, moodle, fsmpi, password.
        An empty list is returned if the only type present is ``none``.
    :raises ValueError: If an entry has an unknown ``type``.
    :raises Exception: If no effective permission was found at all.
    """
    ordered_methods = ("rwth", "moodle", "fsmpi", "password")
    seen = set()
    for entry in get_effective_auth(auth_methods_db):
        if entry["type"] == "public":
            return ["public"]
        if entry["type"] not in ordered_methods and entry["type"] != "none":
            raise ValueError("Unknown authentication method: " + str(entry["type"]))
        seen.add(entry["type"])

    available = [method for method in ordered_methods if method in seen]
    if available:
        return available
    if "none" in seen:
        return []
    raise Exception("Got empty array from get_effective_auth")
def is_lecture_authenticated(lecture_id: int) -> bool:
    """Check whether the current session may access the given lecture.

    :param lecture_id: Id passed on to ``lecture_query_auth`` to obtain the
        lecture's permission entries.
    :return: The result of ``is_authenticated`` for those entries.
    """
    lecture_permissions = lecture_query_auth(lecture_id)
    return is_authenticated(lecture_permissions)
def is_authenticated(auth_list: []):
    """Decide whether the current session/request satisfies any permission.

    The entries are narrowed with ``get_effective_auth`` and then tried one
    after another; the first satisfied permission grants access.

    How each type is satisfied:
      * ``public``   - always.
      * ``password`` - a username/password pair stored in
        ``session["auth_data"]`` equals ``param1``/``param2`` of the entry.
      * ``l2p``      - never (not supported anymore).
      * ``moodle``   - ``param1`` is in ``session["moodle_courses"]``.
      * ``rwth``     - ``session["rwthintern"]`` is truthy, or the request's
        ``X-Real-IP`` header passes ``is_rwth_ip``.
      * ``fsmpi``    - ``is_moderator()`` is true.

    :param auth_list: Permission mappings as accepted by ``get_effective_auth``.
    :return: True if at least one effective permission is satisfied.
    :raises Exception: If an entry has an unknown ``type``.
    """
    for permission in get_effective_auth(auth_list):
        kind = permission["type"]
        if kind == "public":
            granted = True
        elif kind == "password":
            granted = "auth_data" in session and any(
                permission["param1"] == user and permission["param2"] == secret
                for user, secret in session["auth_data"].items()
            )
        elif kind == "l2p":
            granted = False  # Not supported anymore
        elif kind == "moodle":
            granted = permission["param1"] in session.get("moodle_courses", [])
        elif kind == "rwth":
            granted = session.get("rwthintern", False) or (
                "X-Real-IP" in request.headers
                and is_rwth_ip(request.headers["X-Real-IP"])
            )
        elif kind == "fsmpi":
            granted = is_moderator()
        else:
            raise Exception("Unknown permission type: " + str(kind))
        if granted:
            return True
    return False
def authenticate_password(lecture_id: int, username: str, password: str):
    """
    Try to unlock a lecture with a username/password pair.

    The lecture's effective permissions are searched for a ``password`` entry
    whose ``param1``/``param2`` equal the given credentials. On a match the
    pair is remembered in ``session["auth_data"]``, which is what
    ``is_authenticated`` compares against.

    May throw APIClientException:
      * ERROR_LECTURE_HAS_NO_PASSWORD if no effective permission is of type
        ``password``.
      * ERROR_AUTHENTICATION_FAILED if password permissions exist but none
        matches.

    Only returns if authentication was successful.
    """
    auth_list = get_effective_auth(lecture_query_auth(lecture_id))
    found_password: bool = False
    for auth in auth_list:
        if auth["type"] != "password":
            continue
        found_password = True
        if auth["param1"] != username or auth["param2"] != password:
            continue
        # Build a new dict and assign it back instead of mutating the stored
        # one in place. Sessions that track changes through item assignment
        # (e.g. Flask's cookie session) do not notice mutations of nested
        # objects, so an additional password could otherwise be lost.
        auth_data = dict(session.get("auth_data", {}))
        auth_data[username] = password
        session["auth_data"] = auth_data
        return
    if not found_password:
        raise ApiClientException(ERROR_LECTURE_HAS_NO_PASSWORD)
    raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
def authenticate_fsmpi(username: str, password: str) -> {}:
    """
    Log a moderator in via LDAP and make sure a matching ``users`` row exists.

    May throw APIClientException (ERROR_AUTHENTICATION_FAILED) if LDAP returns
    no ``uid`` or the user is in none of the moderator groups.
    Only returns if authentication was successful.

    Side effects on success: ``session["user"]`` is set to the LDAP user info
    (extended by the database id under ``dbid``), a fresh 64 character
    ``_csrf_token`` is stored and the session is marked permanent.

    :return: Mapping with the database ``id``, the LDAP uid as ``id_string``
        and the LDAP ``givenName`` as ``name``.
    """
    select_user_sql = "SELECT * FROM users WHERE name = ? "
    insert_user_sql = (
        "INSERT INTO users "
        "(name, realname, fsacc, level, calendar_key, rfc6238) "
        'VALUES (?, ?, ?, 1, "", "") '
    )

    user_info, groups = ldapauth(username, password)
    user_id: str = user_info.get("uid")
    if not (user_id and __ldap_is_moderator(groups)):
        raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
    session["user"] = user_info

    matching_users = query(select_user_sql, user_id)
    if len(matching_users) == 0:
        # First login of this moderator: create the row, then read it back.
        modify(insert_user_sql, user_id, user_info["givenName"], user_id)
        matching_users = query(select_user_sql, user_id)
        if len(matching_users) == 0:
            raise Exception("User not present in database after insertion")

    database_id = matching_users[0]["id"]
    user_info["dbid"] = database_id

    alphabet = string.ascii_letters + string.digits
    secure_rng = random.SystemRandom()
    session["_csrf_token"] = "".join(secure_rng.choice(alphabet) for _ in range(64))
    session.permanent = True

    return {
        "id": database_id,
        "id_string": user_id,
        "name": user_info["givenName"]
    }
def __ldap_is_moderator(user_groups: []) -> bool:
    """Check whether a user is in at least one moderator group.

    :param user_groups: Groups of the user (any container supporting ``in``).
    :return: True if any entry of ``config["LDAP_GROUPS"]`` is contained in
        ``user_groups``.
    """
    return any(moderator_group in user_groups for moderator_group in config["LDAP_GROUPS"])
def start_rwth_authentication() -> str:
    """
    Begin an OAuth flow for the "rwth" scope.

    :return: Verification url
    """
    verification_url = __start_oauth("rwth")
    return verification_url
def is_rwth_authentication_running() -> bool:
    """Tell whether the session holds a pending OAuth flow for the "rwth" scope.

    :return: True if ``session["oauthscope"]`` exists and contains ``"rwth"``.
    """
    if "oauthscope" not in session:
        return False
    return "rwth" in session["oauthscope"]
def is_moodle_authentication_running() -> bool:
    """Tell whether the session holds a pending OAuth flow for the "moodle" scope.

    :return: True if ``session["oauthscope"]`` exists and contains ``"moodle"``.
    """
    if "oauthscope" not in session:
        return False
    return "moodle" in session["oauthscope"]
def start_moodle_authentication() -> str:
    """
    Begin an OAuth flow for the "moodle" scope.

    :return: Verification url
    """
    verification_url = __start_oauth("moodle")
    return verification_url
if config.get("DEBUG", False):
debug_running_oauth = {}
def __start_oauth(scope: str) -> str:
    """Debug variant: start a dummy OAuth flow that is served by this app.

    A random 64 character code is generated, registered in
    ``debug_running_oauth`` together with the scope and remembered in the
    session as ``oauthcode``/``oauthscope``.

    :param scope: Either ``"rwth"`` or ``"moodle"``.
    :return: Local URL of the dummy OAuth page for the generated code.
    :raises ValueError: If ``scope`` is neither ``"rwth"`` nor ``"moodle"``.
    """
    if scope not in ("rwth", "moodle"):
        raise ValueError("Expected scope to be rwth or moodle")

    alphabet = string.ascii_letters + string.digits
    secure_rng = random.SystemRandom()
    oauth_code = "".join(secure_rng.choice(alphabet) for _ in range(64))

    debug_running_oauth[oauth_code] = {"scope": scope}
    session["oauthcode"] = oauth_code
    session["oauthscope"] = scope
    return "/debug/dummy_oauth/auth/" + oauth_code
def try_finish_running_authentication():
    """Debug variant: complete a pending dummy OAuth flow, if it has finished.

    Does nothing when the session holds no pending flow, or when the flow is
    unknown or not yet marked ``finished`` in ``debug_running_oauth``.
    Otherwise the pending keys are removed from the session,
    ``session["rwthintern"]`` is set, and for the moodle scope
    ``session["moodle_courses"]`` is filled from the submitted courses (an
    empty list if none were recorded). The flow is then forgotten.
    """
    if not ("oauthscope" in session and "oauthcode" in session):
        return

    oauth_scope = session["oauthscope"]
    oauth_code = session["oauthcode"]
    running_oauth = debug_running_oauth.get(oauth_code)
    if running_oauth is None or not running_oauth.get("finished", False):
        return

    for pending_key in ("oauthcode", "oauthscope"):
        del session[pending_key]
    session["rwthintern"] = True
    if oauth_scope == "moodle":
        session["moodle_courses"] = running_oauth.get("moodle_courses", [])
    del debug_running_oauth[oauth_code]
@app.route("/debug/dummy_oauth/auth/<code>", methods=["GET"])
def debug_authentication_dummy_oauth_auth(code):
    """Serve the dummy OAuth confirmation page for a running debug flow.

    :param code: Code of a flow registered in ``debug_running_oauth``.
    :return: ``(body, status)``: a 404 for unknown codes, otherwise an HTML
        form posting to the submit route. Flows with scope ``moodle``
        additionally get a text field for the course ids.
    """
    moodle_page = """
<html>
<h1>Dummy Moodle OAuth</h1>
<form action="/debug/dummy_oauth/submit/%s" method="post">
<label for="moodle_courses">Moodle courses (ids comma separated):</label>
<input type="text" name="moodle_courses" id="moodle_courses"/>
<input type="submit"/>
</form>
</html>
"""
    rwth_page = """
<html>
<h1>Dummy RWTH OAuth</h1>
<form action="/debug/dummy_oauth/submit/%s" method="post">
<input type="submit"/>
</form>
</html>
"""
    if code not in debug_running_oauth:
        return "Unknown code", HTTP_404_NOT_FOUND

    wants_moodle = debug_running_oauth[code].get("scope") == "moodle"
    page = moodle_page if wants_moodle else rwth_page
    return page % code, HTTP_200_OK
@app.route("/debug/dummy_oauth/submit/<code>", methods=["POST"])
def debug_authentication_dummy_oauth_submit(code):
    """Handle the submission of the dummy OAuth form and mark the flow finished.

    For flows with scope ``moodle`` the ``moodle_courses`` form field is parsed
    as a comma separated list of integer ids. Spaces, tabs and line breaks are
    removed first, and the ids are stored as strings.

    :param code: Code of a flow registered in ``debug_running_oauth``.
    :return: ``(body, status)``: 404 for unknown codes, 200 otherwise.
    :raises ValueError: If a non-empty list item is not an integer.
    """
    if code not in debug_running_oauth:
        return "Unknown code", HTTP_404_NOT_FOUND
    running_oauth = debug_running_oauth[code]
    running_oauth["finished"] = True
    print(str(request.form))
    if running_oauth.get("scope") == "moodle" and "moodle_courses" in request.form:
        courses_str: str = request.form["moodle_courses"]
        compact = (courses_str
                   .replace("\r", "")
                   .replace("\n", "")
                   .replace(" ", "")
                   .replace("\t", ""))
        # Skip empty items: an empty field (or a trailing comma) would
        # otherwise yield "" and make int("") raise, failing the request for
        # a user who simply has no courses.
        running_oauth["moodle_courses"] = [str(int(s)) for s in compact.split(",") if s]
    return "Successfully Authenticated", HTTP_200_OK
else:
def __start_oauth(scope: str) -> str:
    """Production variant: start an OAuth device flow at the OAuth server.

    Requests a device code for ``moodle.rwth`` (scope ``"moodle"``) or
    ``userinfo.rwth`` (scope ``"rwth"``) and remembers the returned device
    code and the scope in the session as ``oauthcode``/``oauthscope``.

    :param scope: Either ``"rwth"`` or ``"moodle"``.
    :return: Verification URL including the user code.
    :raises ValueError: If ``scope`` is neither ``"rwth"`` nor ``"moodle"``.
    """
    if scope not in ("rwth", "moodle"):
        raise ValueError("Expected scope to be rwth or moodle")

    if scope == "moodle":
        requested_scope = "moodle.rwth"
    else:
        requested_scope = "userinfo.rwth"
    device_grant = __make_oauth_request("code", scope=requested_scope)

    session["oauthcode"] = device_grant["device_code"]
    session["oauthscope"] = scope
    link_prefix = device_grant["verification_url"] + "?q=verify&d="
    return link_prefix + device_grant["user_code"]
def try_finish_running_authentication() -> None:
    """
    Production variant: complete a pending OAuth device flow, if there is one.

    Does nothing when the session holds no ``oauthcode``/``oauthscope``. It
    then asks the token endpoint whether the device code has been granted; any
    status other than ``"ok"`` leaves the pending flow in the session. On
    success the pending keys are removed and ``session["rwthintern"]`` is set.
    For the moodle scope the enrolled courses are fetched and their ids stored
    as strings in ``session["moodle_courses"]``.

    ApiClientException raised by any of the requests is swallowed.

    :raises ValueError: If the scope stored in the session is neither
        ``"rwth"`` nor ``"moodle"``.
    """
    # Nothing to do unless a flow was started and stored in the session.
    if "oauthcode" not in session or "oauthscope" not in session:
        return
    oauth_scope = session["oauthscope"]
    if not oauth_scope in ["rwth", "moodle"]:
        raise ValueError("Session auth scope set to something different than rwth or moodle")
    try:
        token_response = __make_oauth_request("token", code=session["oauthcode"], grant_type="device")
        # Not granted (yet): keep the pending flow so a later call can retry.
        if token_response.get("status") != "ok":
            return
        del session["oauthcode"]
        del session["oauthscope"]
        session["rwthintern"] = True
        if oauth_scope == "moodle":
            moodle_response = __make_moodle_request("getmyenrolledcourses", token_response["access_token"])
            if moodle_response and moodle_response.get("Data"):
                # The list is stored in the session first and filled afterwards;
                # both names refer to the same list object.
                moodle_courses = []
                session["moodle_courses"] = moodle_courses
                for course in moodle_response["Data"]:
                    moodle_courses.append(str(course["id"]))
            else:
                # Missing or empty "Data" is reported to the admins; the
                # session gets no "moodle_courses" entry in this case.
                notify_admins(
                    'endpoint_exception',
                    traceback="try_finish_running_authentication failed while getting moodle courses, data={}"
                    .format(str(moodle_response)))
        # NOTE(review): indentation was lost in this view of the file. The
        # invalidation is assumed to run for both scopes (try-level) and not
        # only inside the moodle branch. Confirm against the repository.
        __make_oauth_request("token", refresh_token=token_response["refresh_token"], grant_type="invalidate")
    except ApiClientException:
        # Best effort: failures of the auth servers must not break the caller.
        pass
def get_currently_authenticated_methods() -> [str]:
    """List the authentication methods the current session has completed.

    :return: Subset of ``["rwth", "moodle", "password", "fsmpi"]`` in that
        order: ``rwth`` if ``session["rwthintern"]`` is truthy, ``moodle`` if
        the session has a ``moodle_courses`` entry, ``password`` if
        ``session["auth_data"]`` is non-empty, and ``fsmpi`` if
        ``is_moderator()`` is true.
    """
    checks = (
        ("rwth", session.get("rwthintern", False)),
        ("moodle", "moodle_courses" in session),
        ("password", "auth_data" in session and len(session["auth_data"]) > 0),
        ("fsmpi", is_moderator()),
    )
    return [method for method, active in checks if active]
# Base URL of the OAuth service; __make_oauth_request appends the endpoint name.
OAUTH_BASE = "https://oauth.campus.rwth-aachen.de/oauth2waitress/oauth2.svc/"
# Base URL of the Moodle proxy API; __make_moodle_request appends the endpoint name.
MOODLE_BASE = "https://moped.ecampus.rwth-aachen.de/proxy/api/v2/eLearning/Moodle/"
def __make_moodle_request(endpoint, token, **args):
    """Send a GET request to a Moodle API endpoint and return the decoded reply.

    :param endpoint: Endpoint name appended to ``MOODLE_BASE``.
    :param token: Access token, sent as the ``token`` query parameter.
    :param args: Further query parameters.
    :return: Whatever ``__handle_request_result`` returns for the response.
    """
    query_params = dict(args, token=token)
    reply = requests.request('GET', MOODLE_BASE + endpoint, params=query_params, timeout=30)
    return __handle_request_result(reply)
def __make_oauth_request(endpoint, **args):
    """Send a POST request to an OAuth endpoint and return the decoded reply.

    The configured ``L2P_APIKEY`` is always sent as ``client_id``.

    :param endpoint: Endpoint name appended to ``OAUTH_BASE``.
    :param args: Form fields to send.
    :return: Whatever ``__handle_request_result`` returns for the response.
    :raises ApiClientException: ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE if
        no ``L2P_APIKEY`` is configured.
    """
    if "L2P_APIKEY" not in config:
        raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
    form_fields = {**args, "client_id": config["L2P_APIKEY"]}
    reply = requests.request('POST', OAUTH_BASE + endpoint, data=form_fields, timeout=30)
    return __handle_request_result(reply)
def __handle_request_result(response: requests.Response):
    """Turn an HTTP response of the auth servers into data or an exception.

    :param response: Response of a request to the OAuth or Moodle service.
    :return: The decoded JSON body if the response is OK.
    :raises ApiClientException: ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE for
        status codes of 500 and above.
    :raises Exception: For every other non-OK status. The message contains the
        status code and the decoded body (``None`` if it is not valid JSON).
    """
    if response.ok:
        return response.json()
    if response.status_code >= 500:
        raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
    # Keep the decoded body in its own variable: the response object is still
    # needed for the status code. ValueError is the common base of all JSON
    # decode errors that Response.json() may raise.
    try:
        error_body = response.json()
    except ValueError:
        error_body = None
    raise Exception(
        "Error during request. Status: " + str(response.status_code) + " Response: " + str(error_body))