Skip to content
Snippets Groups Projects
Select Git revision
  • 730640f94915687a2b3048157434be316b8e28d4
  • main default
  • full_migration
  • v1.0.9 protected
  • v1.0.8 protected
  • v1.0.7 protected
  • v1.0.6 protected
  • v1.0.5 protected
  • v1.0.4 protected
  • v1.0.3 protected
  • v1.0.2 protected
  • v1.0.1 protected
  • v1.0 protected
13 results

database.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    authentication.py 21.53 KiB
    import random
    import string
    from enum import StrEnum
    from ipaddress import ip_address, ip_network
    import requests
    from requests.exceptions import JSONDecodeError
    from functools import wraps
    from flask import session, request
    
    from ldap import ldapauth
    
    from api.miscellaneous import *
    from api.database import *
    
    
    class ViewPermissionsType(StrEnum):
        PUBLIC = "public"
        PRIVATE = "private"
        AUTHENTICATION = "authentication"
        INHERIT = "inherit"
    
    
    class ViewPermissions:
        
        def __init__(self,
                     type: ViewPermissionsType,
                     rwth_authentication: bool or None = None,
                     fsmpi_authentication: bool or None = None,
                     moodle_course_ids: list[int] or None = None,
                     passwords: dict[str, str] or None = None):
            super().__init__()
            self.type = type
            self.rwth_authentication = rwth_authentication
            self.fsmpi_authentication = fsmpi_authentication
            self.moodle_course_ids = moodle_course_ids
            self.passwords = passwords
            
            is_auth = type == ViewPermissionsType.AUTHENTICATION
            if ((rwth_authentication is not None) != is_auth) \
                    or ((fsmpi_authentication is not None) != is_auth) \
                    or ((moodle_course_ids is not None) != is_auth) \
                    or ((passwords is not None) != is_auth):
                raise TypeError()
            if is_auth \
                    and not rwth_authentication \
                    and not fsmpi_authentication \
                    and len(moodle_course_ids) == 0 \
                    and len(passwords) == 0:
                raise RuntimeError("At least one authentication method must be present for type AUTHENTICATION")
        
        @staticmethod
        def from_json(json: CJsonValue or JsonTypes) -> "ViewPermissions":
            is_client = isinstance(json, CJsonValue)
            if not is_client:
                json = CJsonValue(json)
            json = json.as_object()
            try:
                type_str: str = json.get_string("type", 100)
                try:
                    type = ViewPermissionsType(type_str)
                except ValueError:
                    raise json.raise_error(f"Unknown type '{truncate_string(type_str)}'")
                rwth_authentication = None
                fsmpi_authentication = None
                moodle_course_ids = None
                passwords = None
                if type == ViewPermissionsType.AUTHENTICATION:
                    rwth_authentication = json.get_bool("rwth_authentication")
                    fsmpi_authentication = json.get_bool("fsmpi_authentication")
                    moodle_course_ids = [v.as_sint32() for v in json.get_array("moodle_course_ids")]
                    passwords_json = json.get_object("passwords")
                    passwords = {k: passwords_json.get_string(k, 1024) for k in passwords_json.keys()}
                    
                    if not rwth_authentication \
                            and not fsmpi_authentication \
                            and len(moodle_course_ids) == 0 \
                            and len(passwords) == 0:
                        raise ApiClientException(ERROR_BAD_REQUEST(f"For type {ViewPermissionsType.AUTHENTICATION.value} at"
                                                                   f" least one authentication method must be present"))
                else:
                    json.raise_if_present("rwth_authentication")
                    json.raise_if_present("fsmpi_authentication")
                    json.raise_if_present("moodle_course_ids")
                    json.raise_if_present("passwords")
                return ViewPermissions(
                    type,
                    rwth_authentication,
                    fsmpi_authentication,
                    moodle_course_ids,
                    passwords
                )
            except ApiClientException as e:
                if is_client:
                    raise e
                raise RuntimeError(f"Invalid json: {e.error.message}")
        
        def to_json(self) -> dict:
            if self.type == ViewPermissionsType.AUTHENTICATION:
                return {
                    "type": self.type.value,
                    "rwth_authentication": self.rwth_authentication,
                    "fsmpi_authentication": self.fsmpi_authentication,
                    "moodle_course_ids": self.moodle_course_ids,
                    "passwords": self.passwords
                }
            else:
                return {
                    "type": self.type.value
                }
    
    
    class EffectiveViewPermissions(ViewPermissions):
        
        def __init__(self,
                     object_permissions: ViewPermissions,
                     inherited_permissions: ViewPermissions = ViewPermissions(ViewPermissionsType.PUBLIC)):
            """
            :param object_permissions: The permissions set for the object
            :param inherited_permissions: Only relevant if `permissions.type == INHERIT`. May not have type INHERIT
            """
            effective = object_permissions
            if object_permissions.type == ViewPermissionsType.INHERIT:
                if inherited_permissions.type == ViewPermissionsType.INHERIT:
                    raise RuntimeError("No indirect inheritance allowed")
                effective = inherited_permissions
            super().__init__(
                effective.type,
                effective.rwth_authentication,
                effective.fsmpi_authentication,
                effective.moodle_course_ids,
                effective.passwords
            )
            self.object_permissions = object_permissions
    
    
    def api_moderator_route(require_csrf_token: bool = False):
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                if not is_moderator():
                    raise ApiClientException(ERROR_UNAUTHORIZED)
                if require_csrf_token:
                    check_csrf_token()
                return func(*args, **kwargs)
            
            return wrapper
        
        return decorator
    
    
    def is_moderator():
        return "user" in session
    
    
    def check_csrf_token():
        if "X-Csrf-Token" not in request.headers \
                or not is_valid_csrf_token(request.headers["X-Csrf-Token"]):
            raise ApiClientException(ERROR_INVALID_CSRF_TOKEN)
    
    
    def is_valid_csrf_token(csrf_token: str) -> bool:
        if "_csrf_token" not in session:
            return False
        return csrf_token == session["_csrf_token"]
    
    
    def get_csrf_token():
        if "_csrf_token" not in session:
            raise RuntimeError("No csrf token in session. Probably not a moderator")
        return session["_csrf_token"]
    
    
    def get_user_id() -> int or None:
        if "user" not in session:
            return None
        return session["user"]["dbid"]
    
    
    def get_user_info():
        if "user" not in session:
            return None
        user = session["user"]
        return {
            "id": user["dbid"],
            "id_string": user["uid"],
            "name": user["givenName"]
        }
    
    
    def is_rwth_ip(ip_string: str) -> bool:
        ip = ip_address(ip_string)
        for net in server.config['RWTH_IP_RANGES']:
            if ip in ip_network(net):
                return True
        return False
    
    
    def _get_raw_view_permissions(auth_list_db: []) -> ViewPermissions:
        if len(auth_list_db) == 0:
            return ViewPermissions(ViewPermissionsType.INHERIT)
        has_rwth: bool = False
        has_fsmpi: bool = False
        has_none: bool = False
        moodle_course_ids: list[int] = []
        passwords: dict[str, str] = {}
        for auth_db in auth_list_db:
            match auth_db["type"]:
                case "public":
                    return ViewPermissions(ViewPermissionsType.PUBLIC)
                case "rwth":
                    has_rwth = True
                case "moodle":
                    moodle_course_ids.append(int(auth_db["param1"]))
                case "fsmpi":
                    has_fsmpi = True
                case "password":
                    username = auth_db["param1"]
                    password = auth_db["param2"]
                    if username in passwords:
                        print(f"Warning: Duplicate username in password permissions: {username} (Perm id: {auth_db['id']})")
                    passwords[username] = password
                case "none":
                    has_none = True
                case "l2p":
                    pass
                case _:
                    raise ValueError("Unknown authentication method: " + str(type))
        
        if has_rwth or has_fsmpi or len(moodle_course_ids) > 0 or len(passwords) > 0:
            return ViewPermissions(
                ViewPermissionsType.AUTHENTICATION,
                has_rwth,
                has_fsmpi,
                moodle_course_ids,
                passwords
            )
        
        if has_none:
            return ViewPermissions(ViewPermissionsType.PRIVATE)
        
        # We must only have a l2p permission. This is effectively private
        return ViewPermissions(ViewPermissionsType.PRIVATE)
    
    
    def get_effective_view_permissions(auth_list_db: [], for_lecture: bool) -> EffectiveViewPermissions:
        course_auth = []
        lecture_auth = []
        for auth_db in auth_list_db:
            if "course_id" in auth_db and auth_db["course_id"] is not None:
                course_auth.append(auth_db)
            elif "lecture_id" in auth_db and auth_db["lecture_id"] is not None:
                lecture_auth.append(auth_db)
            elif "video_id" in auth_db and auth_db["video_id"] is not None:
                pass  # Not supported anymore
            else:
                raise Exception("Permission has no course or lecture id set")
        
        course_perm = EffectiveViewPermissions(_get_raw_view_permissions(course_auth))
        
        if for_lecture:
            return EffectiveViewPermissions(
                _get_raw_view_permissions(lecture_auth),
                course_perm
            )
        else:
            return course_perm
    
    
    def get_authentication_methods(auth_methods_db: [], for_lecture: bool) -> [str]:
        perm = get_effective_view_permissions(auth_methods_db, for_lecture)
        match perm.type:
            case ViewPermissionsType.PUBLIC:
                return ["public"]
            case ViewPermissionsType.PRIVATE:
                return []
            case ViewPermissionsType.AUTHENTICATION:
                auth_list_json: list[str] = []
                if perm.rwth_authentication:
                    auth_list_json.append("rwth")
                if perm.fsmpi_authentication:
                    auth_list_json.append("fsmpi")
                if len(perm.moodle_course_ids) > 0:
                    auth_list_json.append("moodle")
                if len(perm.passwords) > 0:
                    auth_list_json.append("password")
                return auth_list_json
            case ViewPermissionsType.INHERIT:
                raise ValueError("Got INHERIT permission from get_effective_view_permissions")
            case _:
                raise ValueError(f"Unknown type {perm.type}")
    
    
    def is_lecture_authenticated(lecture_id: int) -> bool:
        if is_moderator():
            return True
        from api.course import lecture_query_auth
        return is_authenticated(lecture_query_auth(lecture_id), True)
    
    
    def is_authenticated(auth_list: [], for_lecture: bool):
        if is_moderator():
            return True
        perm = get_effective_view_permissions(auth_list, for_lecture)
        match perm.type:
            case ViewPermissionsType.PUBLIC:
                return True
            case ViewPermissionsType.PRIVATE:
                return False
            case ViewPermissionsType.AUTHENTICATION:
                if perm.rwth_authentication and (
                        session.get("rwthintern", False)
                        or ("X-Real-IP" in request.headers and is_rwth_ip(request.headers["X-Real-IP"]))
                ):
                    return True
                
                if perm.fsmpi_authentication and is_moderator():
                    return True
                
                for moodle_id in perm.moodle_course_ids:
                    if moodle_id in session.get("moodle_courses", []):
                        return True
                
                if "auth_data" in session:
                    for username, password in session["auth_data"].items():
                        if username not in perm.passwords:
                            continue
                        if password == perm.passwords[username]:
                            return True
            
            case ViewPermissionsType.INHERIT:
                raise ValueError("Got INHERIT permission from get_effective_view_permissions")
            case _:
                raise ValueError(f"Unknown type {perm.type}")
    
    
    def authenticate_password(lecture_id: int, username: str, password: str):
        """
        May throw APIClientException.
        
        Only returns if authentication was successful.
        """
        from api.course import lecture_query_auth
        perm = get_effective_view_permissions(lecture_query_auth(lecture_id), True)
        if perm.type != ViewPermissionsType.AUTHENTICATION or len(perm.passwords) == 0:
            raise ApiClientException(ERROR_LECTURE_HAS_NO_PASSWORD)
        
        if username not in perm.passwords or password != perm.passwords[username]:
            raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
        
        if "auth_data" not in session:
            session["auth_data"] = {}
        session["auth_data"][username] = password
        session.modified = True
    
    
    _SQL_GET_USER_BY_NAME = PreparedStatement("""
    SELECT * FROM `users` WHERE `name` = ?
    """)
    _SQL_INSERT_USER = PreparedStatement("""
    INSERT INTO `users`
    (`name`, `realname`, `fsacc`, `level`, `calendar_key`, `rfc6238`)
    VALUES (?, ?, ?, 1, "", "")
    """)
    
    
    def authenticate_fsmpi(username: str, password: str) -> {}:
        """
        May throw APIClientException.
        
        Only returns if authentication was successful.
        """
        user_info, groups = ldapauth(username, password)
        user_id: str = user_info.get("uid")
        if not user_id or not __ldap_is_moderator(groups):
            raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
        session["user"] = user_info
        
        with db_pool.start_write_transaction() as trans:
            user_list_db = trans.execute_statement(_SQL_GET_USER_BY_NAME, user_id)
            if len(user_list_db) < 1:
                result = trans.execute_statement_full_result_and_commit(
                    _SQL_INSERT_USER, user_id, user_info["givenName"], user_id)
                user_db_id = result[DB_RESULT_AUTO_INCREMENT]
            else:
                trans.commit()
                user_db = user_list_db[0]
                user_db_id = user_db["id"]
        
        user_info["dbid"] = user_db_id
        session["_csrf_token"] = "".join(
            random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(64))
        session.permanent = True
        
        return get_user_info()
    
    
    def logout_moderator():
        session.pop("user", None)
        session.pop("_csrf_token", None)
        session.permanent = False
    
    
    def __ldap_is_moderator(user_groups: []) -> bool:
        for group in server.config["LDAP_GROUPS"]:
            if group in user_groups:
                return True
        return False
    
    
    def start_rwth_authentication() -> str:
        """
        :return: Verification url
        """
        return __start_oauth("rwth")
    
    
    def is_rwth_authentication_running() -> bool:
        return "oauthscope" in session and "rwth" in session["oauthscope"]
    
    
    def is_moodle_authentication_running() -> bool:
        return "oauthscope" in session and "moodle" in session["oauthscope"]
    
    
    def start_moodle_authentication() -> str:
        """
        :return: Verification url
        """
        return __start_oauth("moodle")
    
    
    if DEBUG_ENABLED:
        
        debug_running_oauth = {}
        
        
        def __start_oauth(scope: str) -> str:
            if scope not in ["rwth", "moodle"]:
                raise ValueError("Expected scope to be rwth or moodle")
            oauth_code = "".join(
                random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(64))
            
            debug_running_oauth[oauth_code] = {
                "scope": scope
            }
            session["oauthcode"] = oauth_code
            session["oauthscope"] = scope
            return "/debug/dummy_oauth/auth/" + oauth_code
        
        
        def try_finish_running_authentication():
            if "oauthscope" not in session or "oauthcode" not in session:
                return
            oauth_scope = session["oauthscope"]
            oauth_code = session["oauthcode"]
            
            running_oauth = debug_running_oauth.get(oauth_code)
            if running_oauth is None or not running_oauth.get("finished", False):
                return
            
            del session["oauthcode"]
            del session["oauthscope"]
            
            session["rwthintern"] = True
            if oauth_scope == "moodle":
                session["moodle_courses"] = []
                if "moodle_courses" in running_oauth:
                    session["moodle_courses"] = running_oauth["moodle_courses"]
            
            return
        
        
        @server.app.route("/debug/dummy_oauth/auth/<code>", methods=["GET"])
        def debug_authentication_dummy_oauth_auth(code):
            if code not in debug_running_oauth:
                return "Unknown code", HTTP_404_NOT_FOUND
            if debug_running_oauth[code].get("scope") == "moodle":
                return """
                    <html>
                        <h1>Dummy Moodle OAuth</h1>
                        <form action="/debug/dummy_oauth/submit/%s" method="post">
                            <label for="moodle_courses">Moodle courses (ids comma separated):</label>
                            <input type="text" name="moodle_courses" id="moodle_courses"/>
                            <input type="submit"/>
                        </form>
                    </html>
                    """ % code, HTTP_200_OK
            else:
                return """
                    <html>
                        <h1>Dummy RWTH OAuth</h1>
                        <form action="/debug/dummy_oauth/submit/%s" method="post">
                            <input type="submit"/>
                        </form>
                    </html>
                    """ % code, HTTP_200_OK
        
        
        @server.app.route("/debug/dummy_oauth/submit/<code>", methods=["POST"])
        def debug_authentication_dummy_oauth_submit(code):
            if code not in debug_running_oauth:
                return "Unknown code", HTTP_404_NOT_FOUND
            
            running_oauth = debug_running_oauth[code]
            running_oauth["finished"] = True
            
            if running_oauth.get("scope") == "moodle" and "moodle_courses" in request.form:
                courses_str: str = request.form["moodle_courses"]
                running_oauth["moodle_courses"] = [str(int(s)) for s in courses_str
                                                   .replace("\r", "")
                                                   .replace("\n", "")
                                                   .replace(" ", "")
                                                   .replace("\t", "")
                                                   .split(",")]
            return "Successfully Authenticated", HTTP_200_OK
    
    else:
        def __start_oauth(scope: str) -> str:
            if scope not in ["rwth", "moodle"]:
                raise ValueError("Expected scope to be rwth or moodle")
            response = __make_oauth_request(
                "code",
                scope="moodle.rwth" if scope == "moodle" else "userinfo.rwth"
            )
            session["oauthcode"] = response["device_code"]
            session["oauthscope"] = scope
            return response["verification_url"] + "?q=verify&d=" + response["user_code"]
        
        
        def try_finish_running_authentication():
            if "oauthcode" not in session or "oauthscope" not in session:
                return
            oauth_scope = session["oauthscope"]
            if oauth_scope not in ["rwth", "moodle"]:
                raise ValueError("Session auth scope set to something different than rwth or moodle")
            try:
                token_response = __make_oauth_request("token", code=session["oauthcode"], grant_type="device")
                if token_response.get("status") != "ok":
                    return
                del session["oauthcode"]
                del session["oauthscope"]
                
                session["rwthintern"] = True
                if oauth_scope == "moodle":
                    moodle_response = __make_moodle_request("getmyenrolledcourses", token_response["access_token"])
                    if moodle_response and moodle_response.get("Data"):
                        moodle_courses = []
                        session["moodle_courses"] = moodle_courses
                        for course in moodle_response["Data"]:
                            moodle_courses.append(str(course["id"]))
                    else:
                        server.notify_admins(
                            'endpoint_exception',
                            traceback="try_finish_running_authentication failed while getting moodle courses, data={}"
                            .format(str(moodle_response)))
                
                __make_oauth_request("token", refresh_token=token_response["refresh_token"], grant_type="invalidate")
            except ApiClientException:
                pass
    
    
    def get_currently_authenticated_methods() -> [str]:
        methods = []
        if session.get("rwthintern", False):
            methods.append("rwth")
        if "moodle_courses" in session:
            methods.append("moodle")
        if "auth_data" in session and len(session["auth_data"]) > 0:
            methods.append("password")
        if is_moderator():
            methods.append("fsmpi")
        return methods
    
    
    OAUTH_BASE = "https://oauth.campus.rwth-aachen.de/oauth2waitress/oauth2.svc/"
    MOODLE_BASE = "https://moped.ecampus.rwth-aachen.de/proxy/api/v2/eLearning/Moodle/"
    
    
    def __make_moodle_request(endpoint, token, **args):
        args["token"] = token
        return __handle_request_result(requests.request('GET', MOODLE_BASE + endpoint, params=args, timeout=30))
    
    
    def __make_oauth_request(endpoint, **args):
        if "L2P_APIKEY" not in server.config:
            raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
        
        args["client_id"] = server.config["L2P_APIKEY"]
        return __handle_request_result(requests.request('POST', OAUTH_BASE + endpoint, data=args, timeout=30))
    
    
    def __handle_request_result(response: requests.Response):
        if response.ok:
            return response.json()
        
        try:
            response = response.json()
        except JSONDecodeError:
            response = None
            pass
        
        if response.status_code >= 500:
            raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
        raise Exception("Error during request. Status: " + str(response.status_code) + " Response: " + str(response))