Skip to content
Snippets Groups Projects
Select Git revision
  • 34b6503fe0753feeedb6df7af9b64c021e4b7c85
  • main default
  • full_migration
  • v1.0.9 protected
  • v1.0.8 protected
  • v1.0.7 protected
  • v1.0.6 protected
  • v1.0.5 protected
  • v1.0.4 protected
  • v1.0.3 protected
  • v1.0.2 protected
  • v1.0.1 protected
  • v1.0 protected
13 results

run_tests.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    authentication.py 13.55 KiB
    from ipaddress import ip_address, ip_network
    import requests
    
    from api.course import lecture_query_auth
    from api.errors import *
    from json.decoder import JSONDecodeError
    from server import config, session, request
    
    
    def is_moderator():
        """
        Tell whether the current session carries a moderator login.

        :return: True iff the session contains a "user" entry
        """
        has_user_entry = 'user' in session
        return has_user_entry
    
    
    def is_rwth_ip(ip_string: str) -> bool:
        """
        Check whether an IP address lies inside one of the configured RWTH networks.

        :param ip_string: IPv4 or IPv6 address in textual form
        :return: True iff the address is contained in one of the networks listed in
                 config['RWTH_IP_RANGES']; False if the string is not a valid IP address
        """
        try:
            ip = ip_address(ip_string)
        except ValueError:
            # The value may come straight from a request header. Something that
            # is not an IP address at all is simply "not an RWTH address" and
            # must not abort the whole request.
            return False
        return any(ip in ip_network(net) for net in config['RWTH_IP_RANGES'])
    
    
    def get_effective_auth(auth_list_db: []) -> []:
        """
        Reduce a list of permission entries to the ones that actually apply.

        Entries bound to a lecture take precedence over entries bound to a course.
        Entries bound to a video are ignored (not supported anymore). If nothing
        remains, a single public permission is returned.

        :param auth_list_db: Permission entries; each must contain one of the keys
                             "course_id", "lecture_id" or "video_id"
        :return: The lecture entries if there are any, else the course entries if there
                 are any, else [{"type": "public"}]
        :raises Exception: If an entry has none of the three id keys
        """
        by_scope = {"course_id": [], "lecture_id": []}
        for entry in auth_list_db:
            # Same precedence as before: an entry counts for the first key it contains
            scope = next((key for key in ("course_id", "lecture_id", "video_id") if key in entry), None)
            if scope is None:
                raise Exception("Permission has no course, lecture or video id set")
            if scope != "video_id":
                by_scope[scope].append(entry)
        return by_scope["lecture_id"] or by_scope["course_id"] or [{"type": "public"}]
    
    
    def get_authentication_methods(auth_methods_db: []) -> [str]:
        """
        List the names of the authentication methods that grant access.

        :param auth_methods_db: Permission entries; they are first reduced with
                                get_effective_auth
        :return: ["public"] as soon as a public entry is encountered. Otherwise the
                 distinct methods out of "rwth", "moodle", "fsmpi", "password" (in this
                 order) that occur, or an empty list if only "none" entries occur
        :raises ValueError: If an entry has an unknown type
        :raises Exception: If there are no effective entries at all
        """
        restricting_types = ("rwth", "moodle", "fsmpi", "password")
        seen_types = set()
        for entry in get_effective_auth(auth_methods_db):
            entry_type = entry["type"]
            if entry_type == "public":
                return ["public"]
            if entry_type not in restricting_types and entry_type != "none":
                raise ValueError("Unknown authentication method: " + str(entry_type))
            seen_types.add(entry_type)

        offered = [name for name in restricting_types if name in seen_types]
        if offered:
            return offered

        if "none" in seen_types:
            return []
        raise Exception("Got empty array from get_effective_auth")
    
    
    def is_lecture_authenticated(lecture_id: int) -> bool:
        """
        Check whether the current session may access the given lecture.

        :param lecture_id: Id of the lecture, passed on to lecture_query_auth
        :return: Result of is_authenticated for the permission entries of that lecture
        """
        lecture_permissions = lecture_query_auth(lecture_id)
        return is_authenticated(lecture_permissions)
    
    
    def is_authenticated(auth_list: []):
        """
        Check whether the current session/request satisfies at least one permission.

        The list is first reduced with get_effective_auth. Each remaining entry is
        checked according to its type:

        - "public": always satisfied
        - "password": satisfied if the session holds a username/password pair matching
          "param1"/"param2" of the entry
        - "moodle": satisfied if "param1" of the entry is among the session's moodle courses
        - "rwth": satisfied if the session is marked as RWTH-internal or the X-Real-IP
          request header is an RWTH address
        - "fsmpi": satisfied for moderators
        - "l2p" (not supported anymore) and "none": never satisfied

        :param auth_list: Permission entries
        :return: True iff any effective entry is satisfied
        :raises Exception: If an entry has an unknown type
        """
        for auth in get_effective_auth(auth_list):
            auth_type = auth["type"]
            if auth_type == "public":
                return True
            elif auth_type == "password":
                if "auth_data" not in session:
                    continue
                if any(auth["param1"] == username and auth["param2"] == password
                       for username, password in session["auth_data"].items()):
                    return True
            elif auth_type == "l2p":
                continue  # Not supported anymore
            elif auth_type == "none":
                # "none" is a valid type (get_authentication_methods accepts it) which
                # nobody can satisfy; it must not be reported as an unknown type.
                continue
            elif auth_type == "moodle":
                if auth["param1"] in session.get("moodle_courses", []):
                    return True
            elif auth_type == "rwth":
                if session.get("rwthintern", False):
                    return True
                if "X-Real-IP" not in request.headers:
                    continue
                if is_rwth_ip(request.headers["X-Real-IP"]):
                    return True
            elif auth_type == "fsmpi":
                if is_moderator():
                    return True
            else:
                raise Exception("Unknown permission type: " + str(auth_type))
        return False
    
    
    def authenticate_password(lecture_id: int, username: str, password: str):
        """
        Try to authenticate the current session for a password-protected lecture.

        May throw APIClientException.

        Only returns if authentication was successful. In that case the
        username/password pair is remembered in the session under "auth_data".

        :param lecture_id: Id of the lecture whose permissions are checked
        :param username: Username to compare with "param1" of the password permissions
        :param password: Password to compare with "param2" of the password permissions
        :raises ApiClientException: ERROR_LECTURE_HAS_NO_PASSWORD if the lecture has no
                                    effective password permission, ERROR_AUTHENTICATION_FAILED
                                    if none of them matches
        """
        found_password: bool = False
        for auth in get_effective_auth(lecture_query_auth(lecture_id)):
            if auth["type"] != "password":
                continue
            found_password = True
            if auth["param1"] != username or auth["param2"] != password:
                continue
            # Store a fresh dict under the key instead of mutating the stored one in
            # place. NOTE(review): if `session` is Flask's cookie session, in-place
            # changes of nested structures are not detected and would not be saved;
            # re-assigning the key is safe for any session implementation.
            auth_data = dict(session.get("auth_data", {}))
            auth_data[username] = password
            session["auth_data"] = auth_data
            return
        if not found_password:
            raise ApiClientException(ERROR_LECTURE_HAS_NO_PASSWORD)
        raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
    
    
    def authenticate_fsmpi(username: str, password: str) -> {}:
        """
        Log a moderator in and register the login in the session.

        May throw APIClientException.

        Only returns if authentication was successful. On success the session gets a
        "user" entry and a fresh "_csrf_token" and is made permanent. A row in the
        users table is created for the user if there is none yet.

        :param username: Passed to ldapauth
        :param password: Passed to ldapauth
        :return: Dict with the database id ("id"), the uid ("id_string") and the
                 given name ("name") of the user
        :raises ApiClientException: ERROR_AUTHENTICATION_FAILED if ldapauth yields no uid
                                    or the user is in none of the moderator groups
        """
        ldap_user, ldap_groups = ldapauth(username, password)
        uid: str = ldap_user.get("uid")
        if not (uid and __ldap_is_moderator(ldap_groups)):
            raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
        session["user"] = ldap_user

        matching_rows = query("""\
            SELECT * FROM users WHERE name = ? \
            """, uid)
        if len(matching_rows) == 0:
            # First login of this user: create the row, then read it back to learn its id
            modify("""\
                INSERT INTO users \
                (name, realname, fsacc, level, calendar_key, rfc6238) \
                VALUES (?, ?, ?, 1, "", "") \
                """, uid, ldap_user["givenName"], uid)
            matching_rows = query("""\
                SELECT * FROM users WHERE name = ? \
                """, uid)
            if len(matching_rows) == 0:
                raise Exception("User not present in database after insertion")
        db_id = matching_rows[0]["id"]

        # ldap_user is the same object that was stored in session["user"] above
        ldap_user["dbid"] = db_id
        token_alphabet = string.ascii_letters + string.digits
        token_rng = random.SystemRandom()
        session["_csrf_token"] = "".join(token_rng.choice(token_alphabet) for _ in range(64))
        session.permanent = True

        result = {"id": db_id, "id_string": uid}
        result["name"] = ldap_user["givenName"]
        return result
    
    
    def __ldap_is_moderator(user_groups: []) -> bool:
        """
        Check whether a user belongs to one of the configured moderator groups.

        :param user_groups: Groups of the user
        :return: True iff any group listed in config["LDAP_GROUPS"] is in user_groups
        """
        return any(group in user_groups for group in config["LDAP_GROUPS"])
    
    
    def start_rwth_authentication() -> str:
        """
        Start an OAuth flow with the "rwth" scope.

        :return: Verification url
        """
        verification_url = __start_oauth("rwth")
        return verification_url
    
    
    def is_rwth_authentication_running() -> bool:
        """
        :return: True iff the session has an "oauthscope" entry that contains "rwth"
        """
        if "oauthscope" not in session:
            return False
        return "rwth" in session["oauthscope"]
    
    
    def is_moodle_authentication_running() -> bool:
        """
        :return: True iff the session has an "oauthscope" entry that contains "moodle"
        """
        if "oauthscope" not in session:
            return False
        return "moodle" in session["oauthscope"]
    
    
    def start_moodle_authentication() -> str:
        """
        Start an OAuth flow with the "moodle" scope.

        :return: Verification url
        """
        verification_url = __start_oauth("moodle")
        return verification_url
    
    
    if config.get("DEBUG", False):
        
        # In-memory registry of the dummy OAuth flows that are currently in progress,
        # keyed by their random oauth code. Each value is a dict holding the requested
        # "scope" and, once the dummy form was submitted, "finished" and optionally
        # "moodle_courses".
        debug_running_oauth = {}
        
        def __start_oauth(scope: str) -> str:
            """
            Start a dummy OAuth flow (debug mode only).

            Generates a random 64 character code, registers it in debug_running_oauth
            and remembers code and scope in the session.

            :param scope: "rwth" or "moodle"
            :return: Path of the dummy verification page for the generated code
            :raises ValueError: If scope is neither "rwth" nor "moodle"
            """
            if scope not in ("rwth", "moodle"):
                raise ValueError("Expected scope to be rwth or moodle")
            code_alphabet = string.ascii_letters + string.digits
            code_rng = random.SystemRandom()
            new_code = "".join(code_rng.choice(code_alphabet) for _ in range(64))

            debug_running_oauth[new_code] = {"scope": scope}
            session["oauthcode"] = new_code
            session["oauthscope"] = scope
            return "/debug/dummy_oauth/auth/" + new_code
        
        
        def try_finish_running_authentication():
            """
            Complete the dummy OAuth flow of the current session if it was submitted.

            Does nothing if the session has no running flow or the flow registered for
            the session's code is unknown or not finished yet. Otherwise the flow is
            removed from the session and from debug_running_oauth, the session is marked
            as RWTH-internal and, for the "moodle" scope, the submitted course ids (or an
            empty list) are stored in the session.
            """
            if not ("oauthscope" in session and "oauthcode" in session):
                return
            scope = session["oauthscope"]
            code = session["oauthcode"]

            pending = debug_running_oauth.get(code)
            if pending is None:
                return
            if not pending.get("finished", False):
                return

            del session["oauthcode"]
            del session["oauthscope"]

            session["rwthintern"] = True
            if scope == "moodle":
                session["moodle_courses"] = pending["moodle_courses"] if "moodle_courses" in pending else []

            del debug_running_oauth[code]
        
        
        @app.route("/debug/dummy_oauth/auth/<code>", methods=["GET"])
        def debug_authentication_dummy_oauth_auth(code):
            """
            Serve the dummy verification page for a running dummy OAuth flow.

            For the "moodle" scope the page contains a form field for the moodle course
            ids, for any other scope only a submit button. Responds with 404 if the code
            is not registered in debug_running_oauth.
            """
            pending = debug_running_oauth.get(code)
            if pending is None:
                return "Unknown code", HTTP_404_NOT_FOUND
            if pending.get("scope") != "moodle":
                return """
                    <html>
                        <h1>Dummy RWTH OAuth</h1>
                        <form action="/debug/dummy_oauth/submit/%s" method="post">
                            <input type="submit"/>
                        </form>
                    </html>
                    """ % code, HTTP_200_OK
            return """
                    <html>
                        <h1>Dummy Moodle OAuth</h1>
                        <form action="/debug/dummy_oauth/submit/%s" method="post">
                            <label for="moodle_courses">Moodle courses (ids comma separated):</label>
                            <input type="text" name="moodle_courses" id="moodle_courses"/>
                            <input type="submit"/>
                        </form>
                    </html>
                    """ % code, HTTP_200_OK
        
        
        @app.route("/debug/dummy_oauth/submit/<code>", methods=["POST"])
        def debug_authentication_dummy_oauth_submit(code):
            """
            Handle the submitted dummy verification form and mark the flow as finished.

            For the "moodle" scope the comma separated course ids from the form field
            "moodle_courses" are stored with the flow. Spaces, tabs and line breaks are
            removed and blank entries (empty field, trailing comma) are skipped.
            Responds with 404 if the code is not registered in debug_running_oauth.

            :raises ValueError: If a non-blank entry is not an integer
            """
            if code not in debug_running_oauth:
                return "Unknown code", HTTP_404_NOT_FOUND

            running_oauth = debug_running_oauth[code]

            print(str(request.form))
            if running_oauth.get("scope") == "moodle" and "moodle_courses" in request.form:
                courses_str: str = request.form["moodle_courses"]
                cleaned = courses_str.translate(str.maketrans("", "", "\r\n \t"))
                # An empty field means "no courses"; without the filter int("") would
                # raise and turn a legitimate submission into a server error.
                running_oauth["moodle_courses"] = [str(int(s)) for s in cleaned.split(",") if s]

            # Only mark the flow as finished after the input was parsed successfully, so
            # a rejected submission does not complete the login with missing data.
            running_oauth["finished"] = True
            return "Successfully Authenticated", HTTP_200_OK
    
    else:
        def __start_oauth(scope: str) -> str:
            """
            Start an OAuth flow at the remote OAuth server.

            Requests a code for the remote scope "moodle.rwth" (scope "moodle") or
            "userinfo.rwth" (scope "rwth") and remembers the returned device code and the
            scope in the session.

            :param scope: "rwth" or "moodle"
            :return: Verification url including the user code
            :raises ValueError: If scope is neither "rwth" nor "moodle"
            """
            if scope not in ("rwth", "moodle"):
                raise ValueError("Expected scope to be rwth or moodle")
            remote_scope = "userinfo.rwth"
            if scope == "moodle":
                remote_scope = "moodle.rwth"
            code_response = __make_oauth_request("code", scope=remote_scope)

            session["oauthcode"] = code_response["device_code"]
            session["oauthscope"] = scope
            verification_base = code_response["verification_url"]
            return verification_base + "?q=verify&d=" + code_response["user_code"]
        
        
        def try_finish_running_authentication():
            """
            Complete the OAuth flow of the current session if the remote server reports it
            as done.

            Does nothing if the session has no running flow or the token request does not
            return status "ok". Otherwise the flow is removed from the session and the
            session is marked as RWTH-internal. For the "moodle" scope the ids of the
            enrolled courses are requested and stored in the session as strings; if that
            response has no data, the admins are notified instead. Finally the token is
            invalidated. An ApiClientException raised by any of the requests is ignored.

            :raises ValueError: If the scope stored in the session is neither "rwth" nor
                                "moodle"
            """
            if not ("oauthcode" in session and "oauthscope" in session):
                return
            scope = session["oauthscope"]
            if scope not in ["rwth", "moodle"]:
                raise ValueError("Session auth scope set to something different than rwth or moodle")
            try:
                token = __make_oauth_request("token", code=session["oauthcode"], grant_type="device")
                if token.get("status") != "ok":
                    return
                del session["oauthcode"]
                del session["oauthscope"]

                session["rwthintern"] = True
                if scope == "moodle":
                    enrolled = __make_moodle_request("getmyenrolledcourses", token["access_token"])
                    if enrolled and enrolled.get("Data"):
                        # The list is put into the session first and filled afterwards
                        course_ids = []
                        session["moodle_courses"] = course_ids
                        course_ids.extend(str(course["id"]) for course in enrolled["Data"])
                    else:
                        failure_text = \
                            "try_finish_running_authentication failed while getting moodle courses, data={}" \
                            .format(str(enrolled))
                        notify_admins('endpoint_exception', traceback=failure_text)

                __make_oauth_request("token", refresh_token=token["refresh_token"], grant_type="invalidate")
            except ApiClientException:
                pass
    
    
    def get_currently_authenticated_methods() -> [str]:
        """
        List the authentication methods the current session has completed.

        :return: Those of "rwth", "moodle", "password" and "fsmpi" (in this order) for
                 which the session holds a login
        """
        method_states = (
            ("rwth", session.get("rwthintern", False)),
            ("moodle", "moodle_courses" in session),
            ("password", "auth_data" in session and len(session["auth_data"]) > 0),
            ("fsmpi", is_moderator()),
        )
        return [method for method, active in method_states if active]
    
    
    # Base URLs of the remote OAuth and Moodle services. The request helpers of this
    # module append the endpoint name directly, so both must end with a slash.
    OAUTH_BASE = "https://oauth.campus.rwth-aachen.de/oauth2waitress/oauth2.svc/"
    MOODLE_BASE = "https://moped.ecampus.rwth-aachen.de/proxy/api/v2/eLearning/Moodle/"
    
    
    def __make_moodle_request(endpoint, token, **args):
        """
        Send a GET request to an endpoint of the Moodle API.

        :param endpoint: Endpoint name, appended to MOODLE_BASE
        :param token: Access token, sent as the "token" query parameter
        :param args: Further query parameters
        :return: Result of __handle_request_result for the response
        """
        query_params = args
        query_params["token"] = token
        moodle_response = requests.get(MOODLE_BASE + endpoint, params=query_params, timeout=30)
        return __handle_request_result(moodle_response)
    
    
    def __make_oauth_request(endpoint, **args):
        """
        Send a POST request to an endpoint of the OAuth server.

        The configured "L2P_APIKEY" is added to the form data as "client_id".

        :param endpoint: Endpoint name, appended to OAUTH_BASE
        :param args: Form data of the request
        :return: Result of __handle_request_result for the response
        :raises ApiClientException: ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE if no
                                    "L2P_APIKEY" is configured
        """
        if "L2P_APIKEY" not in config:
            raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)

        form_data = args
        form_data["client_id"] = config["L2P_APIKEY"]
        oauth_response = requests.post(OAUTH_BASE + endpoint, data=form_data, timeout=30)
        return __handle_request_result(oauth_response)
    
    
    def __handle_request_result(response: requests.Response):
        """
        Turn the response of a remote request into its decoded JSON body or an exception.

        :param response: Response of the request
        :return: The decoded JSON body if the response is ok
        :raises ApiClientException: ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE if the
                                    status code is 500 or above
        :raises Exception: For any other unsuccessful response; the message contains the
                           status code and the decoded body (None if it is not JSON)
        """
        if response.ok:
            return response.json()

        if response.status_code >= 500:
            raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)

        # Keep the response object intact and decode the body into its own variable.
        # Rebinding `response` to the body made the status code lookups fail with an
        # AttributeError. ValueError is the common base class of the JSON decode errors.
        try:
            error_body = response.json()
        except ValueError:
            error_body = None

        raise Exception(
            "Error during request. Status: " + str(response.status_code) + " Response: " + str(error_body))