import secrets import re import string from enum import StrEnum from ipaddress import ip_address, ip_network import requests from requests.exceptions import JSONDecodeError from functools import wraps from flask import session, request from api.miscellaneous import * from api.database import * import api class ViewPermissionsType(StrEnum): PUBLIC = "public" PRIVATE = "private" AUTHENTICATION = "authentication" INHERIT = "inherit" class ViewPermissions: def __init__(self, type: ViewPermissionsType, rwth_authentication: bool or None = None, fsmpi_authentication: bool or None = None, moodle_course_ids: list[int] or None = None, passwords: dict[str, str] or None = None): super().__init__() self.type = type self.rwth_authentication = rwth_authentication self.fsmpi_authentication = fsmpi_authentication self.moodle_course_ids = moodle_course_ids self.passwords = passwords is_auth = type == ViewPermissionsType.AUTHENTICATION if ((rwth_authentication is not None) != is_auth) \ or ((fsmpi_authentication is not None) != is_auth) \ or ((moodle_course_ids is not None) != is_auth) \ or ((passwords is not None) != is_auth): raise TypeError() if is_auth \ and not rwth_authentication \ and not fsmpi_authentication \ and len(moodle_course_ids) == 0 \ and len(passwords) == 0: raise RuntimeError("At least one authentication method must be present for type AUTHENTICATION") @staticmethod def from_json(json: CJsonValue or JsonTypes) -> "ViewPermissions": is_client = isinstance(json, CJsonValue) if not is_client: json = CJsonValue(json) json = json.as_object() try: type_str: str = json.get_string("type", 100) try: type = ViewPermissionsType(type_str) except ValueError: raise json.raise_error(f"Unknown type '{truncate_string(type_str)}'") rwth_authentication = None fsmpi_authentication = None moodle_course_ids = None passwords = None if type == ViewPermissionsType.AUTHENTICATION: rwth_authentication = json.get_bool("rwth_authentication") fsmpi_authentication = json.get_bool("fsmpi_authentication") moodle_course_ids = [v.as_sint32() for v in json.get_array("moodle_course_ids")] passwords_json = json.get_object("passwords") passwords = {k: passwords_json.get_string(k, 1024) for k in passwords_json.keys()} if not rwth_authentication \ and not fsmpi_authentication \ and len(moodle_course_ids) == 0 \ and len(passwords) == 0: raise ApiClientException(ERROR_BAD_REQUEST(f"For type {ViewPermissionsType.AUTHENTICATION.value} at" f" least one authentication method must be present")) else: json.raise_if_present("rwth_authentication") json.raise_if_present("fsmpi_authentication") json.raise_if_present("moodle_course_ids") json.raise_if_present("passwords") return ViewPermissions( type, rwth_authentication, fsmpi_authentication, moodle_course_ids, passwords ) except ApiClientException as e: if is_client: raise e raise RuntimeError(f"Invalid json: {e.error.message}") def to_json(self) -> dict: if self.type == ViewPermissionsType.AUTHENTICATION: return { "type": self.type.value, "rwth_authentication": self.rwth_authentication, "fsmpi_authentication": self.fsmpi_authentication, "moodle_course_ids": self.moodle_course_ids, "passwords": self.passwords } else: return { "type": self.type.value } class EffectiveViewPermissions(ViewPermissions): def __init__(self, object_permissions: ViewPermissions, inherited_permissions: ViewPermissions = ViewPermissions(ViewPermissionsType.PUBLIC)): """ :param object_permissions: The permissions set for the object :param inherited_permissions: Only relevant if `permissions.type == INHERIT`. May not have type INHERIT """ effective = object_permissions if object_permissions.type == ViewPermissionsType.INHERIT: if inherited_permissions.type == ViewPermissionsType.INHERIT: raise RuntimeError("No indirect inheritance allowed") effective = inherited_permissions super().__init__( effective.type, effective.rwth_authentication, effective.fsmpi_authentication, effective.moodle_course_ids, effective.passwords ) self.object_permissions = object_permissions def api_moderator_route(require_csrf_token: bool = False): def decorator(func): if hasattr(func, "is_api_route") and func.is_api_route: raise Exception("@api_moderator_route() seems to be applied after @api_route(). @api_moderator_route() " "should be the first (lowest) decorator!") @wraps(func) def wrapper(*args, **kwargs): if not is_moderator(): raise ApiClientException(ERROR_UNAUTHORIZED) if require_csrf_token: check_csrf_token() return func(*args, **kwargs) return wrapper return decorator def is_moderator(): return "user" in session def check_csrf_token(): if "X-Csrf-Token" not in request.headers \ or not is_valid_csrf_token(request.headers["X-Csrf-Token"]): raise ApiClientException(ERROR_INVALID_CSRF_TOKEN) def is_valid_csrf_token(csrf_token: str) -> bool: if "_csrf_token" not in session: return False return csrf_token == session["_csrf_token"] def get_csrf_token(): if "_csrf_token" not in session: raise RuntimeError("No csrf token in session. Probably not a moderator") return session["_csrf_token"] def get_user_id() -> int or None: if "user" not in session: return None return session["user"]["dbid"] def get_user_info(): if "user" not in session: return None user = session["user"] return { "id": user["dbid"], "id_string": user["uid"], "name": user["givenName"] } def is_rwth_ip(ip_string: str) -> bool: ip = ip_address(ip_string) for net in api.config['RWTH_IP_RANGES']: if ip in ip_network(net): return True return False def _get_raw_view_permissions(auth_list_db: []) -> ViewPermissions: if len(auth_list_db) == 0: return ViewPermissions(ViewPermissionsType.INHERIT) has_rwth: bool = False has_fsmpi: bool = False has_none: bool = False moodle_course_ids: list[int] = [] passwords: dict[str, str] = {} for auth_db in auth_list_db: match auth_db["type"]: case "public": return ViewPermissions(ViewPermissionsType.PUBLIC) case "rwth": has_rwth = True case "moodle": moodle_course_ids.append(int(auth_db["param1"])) case "fsmpi": has_fsmpi = True case "password": username = auth_db["param1"] password = auth_db["param2"] if username in passwords: print(f"Warning: Duplicate username in password permissions: {username} (Perm id: {auth_db['id']})") passwords[username] = password case "none": has_none = True case "l2p": pass case _: raise ValueError("Unknown authentication method: " + str(type)) if has_rwth or has_fsmpi or len(moodle_course_ids) > 0 or len(passwords) > 0: return ViewPermissions( ViewPermissionsType.AUTHENTICATION, has_rwth, has_fsmpi, moodle_course_ids, passwords ) if has_none: return ViewPermissions(ViewPermissionsType.PRIVATE) # We must only have a l2p permission. This is effectively private return ViewPermissions(ViewPermissionsType.PRIVATE) def get_effective_view_permissions(auth_list_db: [], for_lecture: bool) -> EffectiveViewPermissions: course_auth = [] lecture_auth = [] for auth_db in auth_list_db: if "course_id" in auth_db and auth_db["course_id"] is not None: course_auth.append(auth_db) elif "lecture_id" in auth_db and auth_db["lecture_id"] is not None: lecture_auth.append(auth_db) elif "video_id" in auth_db and auth_db["video_id"] is not None: pass # Not supported anymore else: raise Exception("Permission has no course or lecture id set") course_perm = EffectiveViewPermissions(_get_raw_view_permissions(course_auth)) if for_lecture: return EffectiveViewPermissions( _get_raw_view_permissions(lecture_auth), course_perm ) else: return course_perm def get_authentication_methods(auth_methods_db: [], for_lecture: bool) -> [str]: perm = get_effective_view_permissions(auth_methods_db, for_lecture) match perm.type: case ViewPermissionsType.PUBLIC: return ["public"] case ViewPermissionsType.PRIVATE: return [] case ViewPermissionsType.AUTHENTICATION: auth_list_json: list[str] = [] if perm.rwth_authentication: auth_list_json.append("rwth") if perm.fsmpi_authentication: auth_list_json.append("fsmpi") if len(perm.moodle_course_ids) > 0: auth_list_json.append("moodle") if len(perm.passwords) > 0: auth_list_json.append("password") return auth_list_json case ViewPermissionsType.INHERIT: raise ValueError("Got INHERIT permission from get_effective_view_permissions") case _: raise ValueError(f"Unknown type {perm.type}") def is_lecture_authenticated(lecture_id: int) -> bool: if is_moderator(): return True from api.course import lecture_query_auth return is_authenticated(lecture_query_auth(lecture_id), True) def is_authenticated(auth_list: [], for_lecture: bool): if is_moderator(): return True perm = get_effective_view_permissions(auth_list, for_lecture) match perm.type: case ViewPermissionsType.PUBLIC: return True case ViewPermissionsType.PRIVATE: return False case ViewPermissionsType.AUTHENTICATION: if perm.rwth_authentication and ( session.get("rwthintern", False) or ("X-Real-IP" in request.headers and is_rwth_ip(request.headers["X-Real-IP"])) ): return True if perm.fsmpi_authentication and is_moderator(): return True for moodle_id in perm.moodle_course_ids: if str(moodle_id) in session.get("moodle_courses", []): return True if "auth_data" in session: for username, password in session["auth_data"].items(): if username not in perm.passwords: continue if password == perm.passwords[username]: return True case ViewPermissionsType.INHERIT: raise ValueError("Got INHERIT permission from get_effective_view_permissions") case _: raise ValueError(f"Unknown type {perm.type}") def authenticate_password(lecture_id: int, username: str, password: str): """ May throw APIClientException. Only returns if authentication was successful. """ from api.course import lecture_query_auth perm = get_effective_view_permissions(lecture_query_auth(lecture_id), True) if perm.type != ViewPermissionsType.AUTHENTICATION or len(perm.passwords) == 0: raise ApiClientException(ERROR_LECTURE_HAS_NO_PASSWORD) if username not in perm.passwords or password != perm.passwords[username]: raise ApiClientException(ERROR_AUTHENTICATION_FAILED) if "auth_data" not in session: session["auth_data"] = {} session["auth_data"][username] = password session.modified = True if DEBUG_ENABLED: def __ldap_authenticate(user: str, password: str) -> tuple[str, str, str, list[str]] or None: if user == "videoag" and password == "videoag": return "videoag", "Video", "Video", ["fachschaft", "videoag"] return None elif "LDAP_HOST" in api.config: import ldap3 from ldap3.core.exceptions import LDAPBindError, LDAPPasswordIsMandatoryError __LDAP_USER_REGEX = re.compile("[a-z0-9]+") __LDAP_MEMBER_OF_REGEX = re.compile("CN=([a-zA-Z0-9_-]+),CN=Users,DC=fsmpi,DC=rwth-aachen,DC=de") __LDAP_HOST = api.config["LDAP_HOST"] __LDAP_PORT = api.config["LDAP_PORT"] def __ldap_authenticate(user: str, password: str) -> tuple[str, str, str, list[str]] or None: """ Returns user id, given name, surname, list of groups """ if __LDAP_USER_REGEX.fullmatch(user) is None: return None try: server = ldap3.Server(host=__LDAP_HOST, port=__LDAP_PORT, use_ssl=True) conn = ldap3.Connection(server, f"fsmpi\\{user}", password, auto_bind=True, check_names=False) conn.search("cn=users,dc=fsmpi,dc=rwth-aachen,dc=de", f"(cn={user})", attributes=["memberOf", "givenName", "sn"]) attributes = conn.response[0]["attributes"] # TODO There also exists a display name given_name = attributes["givenName"][0] # Attribute should have exactly one value surname = attributes["sn"][0] # Attribute should have exactly one value groups = [] for val in attributes["memberOf"]: match = __LDAP_MEMBER_OF_REGEX.fullmatch(val) if not match: continue groups.append(match.group(1)) return user, given_name, surname, groups except (LDAPBindError, LDAPPasswordIsMandatoryError): return None else: def __ldap_authenticate(user: str, password: str) -> tuple[str, str, str, list[str]] or None: raise ApiClientException(ERROR_AUTHENTICATION_NOT_AVAILABLE()) _SQL_GET_USER_BY_NAME = PreparedStatement(""" SELECT * FROM "users" WHERE "name" = ? """) _SQL_INSERT_USER = PreparedStatement(""" INSERT INTO "users" ("name", "realname", "fsacc", "level", "calendar_key", "rfc6238") VALUES (?, ?, ?, 1, '', '') RETURNING "id" """) def _db_get_user_id(trans: ReadTransaction, user_id: str) -> int: user_list_db = trans.execute_statement_and_close(_SQL_GET_USER_BY_NAME, user_id) if len(user_list_db) < 1: raise ApiClientException(ERROR_AUTHENTICATION_NOT_AVAILABLE( "Site is read-only and we can not create a new account for you in the database")) user_db = user_list_db[0] return user_db["id"] def _db_get_or_create_user_id(trans: WriteTransaction, user_id: str, given_name: str) -> int: user_list_db = trans.execute_statement(_SQL_GET_USER_BY_NAME, user_id) if len(user_list_db) < 1: result = trans.execute_statement_and_commit( _SQL_INSERT_USER, user_id, given_name, user_id) return result[0]["id"] else: trans.commit() user_db = user_list_db[0] return user_db["id"] def authenticate_fsmpi(username: str, password: str) -> {}: """ May throw APIClientException. Only returns if authentication was successful. """ user_data = __ldap_authenticate(username, password) if user_data is None: raise ApiClientException(ERROR_AUTHENTICATION_FAILED) user_id, given_name, surname, groups = user_data if not __ldap_is_moderator(groups): raise ApiClientException(ERROR_AUTHENTICATION_FAILED) if api.live_config.is_readonly(): user_db_id = db_pool.execute_read_transaction(_db_get_user_id, user_id) else: user_db_id = db_pool.execute_write_transaction(_db_get_or_create_user_id, user_id, given_name) session["user"] = { "uid": user_id, "dbid": user_db_id, "givenName": given_name, "sn": surname } session["_csrf_token"] = "".join( secrets.choice(string.ascii_letters + string.digits) for _ in range(64)) session.permanent = True return get_user_info() def logout_moderator(): session.pop("user", None) session.pop("_csrf_token", None) session.permanent = False def __ldap_is_moderator(user_groups: []) -> bool: for group in api.config["LDAP_GROUPS"]: if group in user_groups: return True return False def start_rwth_authentication() -> str: """ :return: Verification url """ return __start_oauth("rwth") def is_rwth_authentication_running() -> bool: return "oauthscope" in session and "rwth" in session["oauthscope"] def is_moodle_authentication_running() -> bool: return "oauthscope" in session and "moodle" in session["oauthscope"] def start_moodle_authentication() -> str: """ :return: Verification url """ return __start_oauth("moodle") if DEBUG_ENABLED: debug_running_oauth = {} def __start_oauth(scope: str) -> str: if scope not in ["rwth", "moodle"]: raise ValueError("Expected scope to be rwth or moodle") oauth_code = "".join( secrets.choice(string.ascii_letters + string.digits) for _ in range(64)) debug_running_oauth[oauth_code] = { "scope": scope } session["oauthcode"] = oauth_code session["oauthscope"] = scope return "/debug/dummy_oauth/auth/" + oauth_code def try_finish_running_authentication(): if "oauthscope" not in session or "oauthcode" not in session: return oauth_scope = session["oauthscope"] oauth_code = session["oauthcode"] running_oauth = debug_running_oauth.get(oauth_code) if running_oauth is None or not running_oauth.get("finished", False): return del session["oauthcode"] del session["oauthscope"] session["rwthintern"] = True if oauth_scope == "moodle": session["moodle_courses"] = [] if "moodle_courses" in running_oauth: session["moodle_courses"] = running_oauth["moodle_courses"] return @api.app.route("/debug/dummy_oauth/auth/<code>", methods=["GET"]) def debug_authentication_dummy_oauth_auth(code): if code not in debug_running_oauth: return "Unknown code", HTTP_404_NOT_FOUND if debug_running_oauth[code].get("scope") == "moodle": return """ <html> <h1>Dummy Moodle OAuth</h1> <form action="/debug/dummy_oauth/submit/%s" method="post"> <label for="moodle_courses">Moodle courses (ids comma separated):</label> <input type="text" name="moodle_courses" id="moodle_courses"/> <input type="submit"/> </form> </html> """ % code, HTTP_200_OK else: return """ <html> <h1>Dummy RWTH OAuth</h1> <form action="/debug/dummy_oauth/submit/%s" method="post"> <input type="submit"/> </form> </html> """ % code, HTTP_200_OK @api.app.route("/debug/dummy_oauth/submit/<code>", methods=["POST"]) def debug_authentication_dummy_oauth_submit(code): if code not in debug_running_oauth: return "Unknown code", HTTP_404_NOT_FOUND running_oauth = debug_running_oauth[code] running_oauth["finished"] = True if running_oauth.get("scope") == "moodle" and "moodle_courses" in request.form: courses_str: str = request.form["moodle_courses"] running_oauth["moodle_courses"] = [str(int(s)) for s in courses_str .replace("\r", "") .replace("\n", "") .replace(" ", "") .replace("\t", "") .split(",")] return "Successfully Authenticated", HTTP_200_OK else: def __start_oauth(scope: str) -> str: if scope not in ["rwth", "moodle"]: raise ValueError("Expected scope to be rwth or moodle") response = __make_oauth_request( "code", scope="moodle.rwth" if scope == "moodle" else "userinfo.rwth" ) session["oauthcode"] = response["device_code"] session["oauthscope"] = scope return response["verification_url"] + "?q=verify&d=" + response["user_code"] def try_finish_running_authentication(): if "oauthcode" not in session or "oauthscope" not in session: return oauth_scope = session["oauthscope"] if oauth_scope not in ["rwth", "moodle"]: raise ValueError("Session auth scope set to something different than rwth or moodle") try: token_response = __make_oauth_request("token", code=session["oauthcode"], grant_type="device") if token_response.get("status") != "ok": return del session["oauthcode"] del session["oauthscope"] session["rwthintern"] = True if oauth_scope == "moodle": moodle_response = __make_moodle_request("getmyenrolledcourses", token_response["access_token"]) if moodle_response and moodle_response.get("Data"): moodle_courses = [] session["moodle_courses"] = moodle_courses for course in moodle_response["Data"]: moodle_courses.append(str(course["id"])) else: # TODO notify pass __make_oauth_request("token", refresh_token=token_response["refresh_token"], grant_type="invalidate") except ApiClientException: pass def get_currently_authenticated_methods() -> [str]: methods = [] if session.get("rwthintern", False): methods.append("rwth") if "moodle_courses" in session: methods.append("moodle") if "auth_data" in session and len(session["auth_data"]) > 0: methods.append("password") if is_moderator(): methods.append("fsmpi") return methods OAUTH_BASE = "https://oauth.campus.rwth-aachen.de/oauth2waitress/oauth2.svc/" MOODLE_BASE = "https://moped.ecampus.rwth-aachen.de/proxy/api/v2/eLearning/Moodle/" def __make_moodle_request(endpoint, token, **args): args["token"] = token return __handle_request_result(requests.request('GET', MOODLE_BASE + endpoint, params=args, timeout=30)) def __make_oauth_request(endpoint, **args): if "RWTH_API_KEY" not in api.config: raise ApiClientException(ERROR_AUTHENTICATION_NOT_AVAILABLE()) args["client_id"] = api.config["RWTH_API_KEY"] return __handle_request_result(requests.request('POST', OAUTH_BASE + endpoint, data=args, timeout=30)) def __handle_request_result(response: requests.Response): if response.ok: return response.json() try: response = response.json() except JSONDecodeError: response = None pass if response.status_code >= 500: raise ApiClientException(ERROR_AUTHENTICATION_NOT_AVAILABLE()) raise Exception("Error during request. Status: " + str(response.status_code) + " Response: " + str(response))