Skip to content
Snippets Groups Projects
Commit b802d3ea authored by Simon Künzel's avatar Simon Künzel
Browse files

Included new files for authentication

parent 4548132f
No related branches found
No related tags found
No related merge requests found
from ipaddress import ip_address, ip_network
import requests
from api.course import lecture_query_auth
from api.errors import *
from json.decoder import JSONDecodeError
from server import config, session, request
def is_moderator():
return 'user' in session
def is_rwth_ip(ip_string: str) -> bool:
ip = ip_address(ip_string)
for net in config['RWTH_IP_RANGES']:
if ip in ip_network(net):
return True
return False
def get_effective_auth(auth_list_db: []) -> []:
course_auth = []
lecture_auth = []
for auth_db in auth_list_db:
if "course_id" in auth_db:
course_auth.append(auth_db)
elif "lecture_id" in auth_db:
lecture_auth.append(auth_db)
elif "video_id" in auth_db:
pass # Not supported anymore
else:
raise Exception("Permission has no course, lecture or video id set")
if len(lecture_auth) > 0:
return lecture_auth
if len(course_auth) > 0:
return course_auth
return [{"type": "public"}]
def get_authentication_methods(auth_methods_db: []) -> [str]:
auth_methods_db = get_effective_auth(auth_methods_db)
has_rwth: bool = False
has_moodle: bool = False
has_fsmpi: bool = False
has_password: bool = False
has_none: bool = False
for auth_method_db in auth_methods_db:
if auth_method_db["type"] == "public":
return ["public"]
elif auth_method_db["type"] == "rwth":
has_rwth = True
elif auth_method_db["type"] == "moodle":
has_moodle = True
elif auth_method_db["type"] == "fsmpi":
has_fsmpi = True
elif auth_method_db["type"] == "password":
has_password = True
elif auth_method_db["type"] == "none":
has_none = True
else:
raise ValueError("Unknown authentication method: " + str(auth_method_db["type"]))
auth_list_json = []
if has_rwth:
auth_list_json.append("rwth")
if has_moodle:
auth_list_json.append("moodle")
if has_fsmpi:
auth_list_json.append("fsmpi")
if has_password:
auth_list_json.append("password")
if len(auth_list_json) > 0:
return auth_list_json
if has_none:
return []
raise Exception("Got empty array from get_effective_auth")
def is_lecture_authenticated(lecture_id: int) -> bool:
return is_authenticated(lecture_query_auth(lecture_id))
def is_authenticated(auth_list: []):
auth_list = get_effective_auth(auth_list)
for auth in auth_list:
auth_type = auth["type"]
if auth_type == "public":
return True
elif auth_type == "password":
if not "auth_data" in session:
continue
for username, password in session["auth_data"].items():
if auth["param1"] == username and auth["param2"] == password:
return True
elif auth_type == "l2p":
continue # Not supported anymore
elif auth_type == "moodle":
if auth["param1"] in session.get("moodle_courses", []):
return True
elif auth_type == "rwth":
if session.get("rwthintern", False):
return True
if "X-Real-IP" not in request.headers:
continue
if is_rwth_ip(request.headers["X-Real-IP"]):
return True
else:
raise Exception("Unknown permission type: " + str(auth_type))
return False
def authenticate_password(lecture_id: int, username: str, password: str):
"""
May throw APIClientException.
Only returns if authentication was successful.
"""
auth_list = get_effective_auth(lecture_query_auth(lecture_id))
found_password: bool = False
for auth in auth_list:
if auth["type"] != "password":
continue
found_password = True
if auth["param1"] != username or auth["param2"] != password:
continue
if "auth_data" not in session:
session["auth_data"] = {}
session["auth_data"][username] = password
return
if not found_password:
raise ApiClientException(ERROR_LECTURE_HAS_NO_PASSWORD)
raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
def authenticate_fsmpi(username: str, password: str) -> {}:
"""
May throw APIClientException.
Only returns if authentication was successful.
"""
user_info, groups = ldapauth(username, password)
user_id: str = user_info.get("uid")
if not user_id or not __ldap_is_moderator(groups):
raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
session["user"] = user_info
user_list_db = query("""\
SELECT * FROM users WHERE name = ? \
""", user_id)
if len(user_list_db) < 1:
modify("""\
INSERT INTO users \
(name, realname, fsacc, level, calendar_key, rfc6238) \
VALUES (?, ?, ?, 1, "", "") \
""", user_id, user_info["givenName"], user_id)
user_list_db = query("""\
SELECT * FROM users WHERE name = ? \
""", user_id)
if len(user_list_db) < 1:
raise Exception("User not present in database after insertion")
user_db = user_list_db[0]
database_id = user_db["id"]
user_info["dbid"] = database_id
session["_csrf_token"] = "".join(
random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(64))
session.permanent = True
return {
"id": database_id,
"id_string": user_id,
"name": user_info["givenName"]
}
def __ldap_is_moderator(user_groups: []) -> bool:
for group in config["LDAP_GROUPS"]:
if group in user_groups:
return True
return False
def start_rwth_authentication() -> str:
"""
:return: Verification url
"""
return __start_oauth("rwth")
def is_rwth_authentication_running() -> bool:
return "oauthscope" in session and "rwth" in session["oauthscope"]
def is_moodle_authentication_running() -> bool:
return "oauthscope" in session and "moodle" in session["oauthscope"]
def start_moodle_authentication() -> str:
"""
:return: Verification url
"""
return __start_oauth("moodle")
if config.get("DEBUG", False):
debug_running_oauth = {}
def __start_oauth(scope: str) -> str:
if scope not in ["rwth", "moodle"]:
raise ValueError("Expected scope to be rwth or moodle")
oauth_code = "".join(
random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(64))
debug_running_oauth[oauth_code] = {
"scope": scope
}
session["oauthcode"] = oauth_code
session["oauthscope"] = scope
return "/debug/dummy_oauth/auth/" + oauth_code
def try_finish_running_authentication():
if "oauthscope" not in session or "oauthcode" not in session:
return
oauth_scope = session["oauthscope"]
oauth_code = session["oauthcode"]
running_oauth = debug_running_oauth.get(oauth_code)
if running_oauth is None or not running_oauth.get("finished", False):
return
del session["oauthcode"]
del session["oauthscope"]
session["rwthintern"] = True
if oauth_scope == "moodle":
session["moodle_courses"] = []
if "moodle_courses" in running_oauth:
session["moodle_courses"] = running_oauth["moodle_courses"]
del debug_running_oauth[oauth_code]
return
@app.route("/debug/dummy_oauth/auth/<code>", methods=["GET"])
def debug_authentication_dummy_oauth_auth(code):
if code not in debug_running_oauth:
return "Unknown code", HTTP_404_NOT_FOUND
if debug_running_oauth[code].get("scope") == "moodle":
return """
<html>
<h1>Dummy Moodle OAuth</h1>
<form action="/debug/dummy_oauth/submit/%s" method="post">
<label for="moodle_courses">Moodle courses (ids comma separated):</label>
<input type="text" name="moodle_courses" id="moodle_courses"/>
<input type="submit"/>
</form>
</html>
""" % code, HTTP_200_OK
else:
return """
<html>
<h1>Dummy RWTH OAuth</h1>
<form action="/debug/dummy_oauth/submit/%s" method="post">
<input type="submit"/>
</form>
</html>
""" % code, HTTP_200_OK
@app.route("/debug/dummy_oauth/submit/<code>", methods=["POST"])
def debug_authentication_dummy_oauth_submit(code):
if code not in debug_running_oauth:
return "Unknown code", HTTP_404_NOT_FOUND
running_oauth = debug_running_oauth[code]
running_oauth["finished"] = True
print(str(request.form))
if running_oauth.get("scope") == "moodle" and "moodle_courses" in request.form:
courses_str: str = request.form["moodle_courses"]
running_oauth["moodle_courses"] = [str(int(s)) for s in courses_str
.replace("\r", "")
.replace("\n", "")
.replace(" ", "")
.replace("\t", "")
.split(",")]
return "Successfully Authenticated", HTTP_200_OK
else:
def __start_oauth(scope: str) -> str:
if scope not in ["rwth", "moodle"]:
raise ValueError("Expected scope to be rwth or moodle")
response = __make_oauth_request(
"code",
scope="moodle.rwth" if scope == "moodle" else "userinfo.rwth"
)
session["oauthcode"] = response["device_code"]
session["oauthscope"] = scope
return response["verification_url"] + "?q=verify&d=" + response["user_code"]
def try_finish_running_authentication():
if "oauthcode" not in session or "oauthscope" not in session:
return
oauth_scope = session["oauthscope"]
if not oauth_scope in ["rwth", "moodle"]:
raise ValueError("Session auth scope set to something different than rwth or moodle")
try:
token_response = __make_oauth_request("token", code=session["oauthcode"], grant_type="device")
if token_response.get("status") != "ok":
return
del session["oauthcode"]
del session["oauthscope"]
session["rwthintern"] = True
if oauth_scope == "moodle":
moodle_response = __make_moodle_request("getmyenrolledcourses", token_response["access_token"])
if moodle_response and moodle_response.get("Data"):
moodle_courses = []
session["moodle_courses"] = moodle_courses
for course in moodle_response["Data"]:
moodle_courses.append(str(course["id"]))
else:
notify_admins(
'endpoint_exception',
traceback="try_finish_running_authentication failed while getting moodle courses, data={}"
.format(str(moodle_response)))
__make_oauth_request("token", refresh_token=token_response["refresh_token"], grant_type="invalidate")
except ApiClientException:
pass
def get_currently_authenticated_methods() -> [str]:
methods = []
if session.get("rwthintern", False):
methods.append("rwth")
if "moodle_courses" in session:
methods.append("moodle")
if "auth_data" in session and len(session["auth_data"]) > 0:
methods.append("password")
if is_moderator():
methods.append("fsmpi")
return methods
OAUTH_BASE = "https://oauth.campus.rwth-aachen.de/oauth2waitress/oauth2.svc/"
MOODLE_BASE = "https://moped.ecampus.rwth-aachen.de/proxy/api/v2/eLearning/Moodle/"
def __make_moodle_request(endpoint, token, **args):
args["token"] = token
return __handle_request_result(requests.request('GET', MOODLE_BASE + endpoint, params=args, timeout=30))
def __make_oauth_request(endpoint, **args):
if "L2P_APIKEY" not in config:
raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
args["client_id"] = config["L2P_APIKEY"]
return __handle_request_result(requests.request('POST', OAUTH_BASE + endpoint, data=args, timeout=30))
def __handle_request_result(response: requests.Response):
if response.ok:
return response.json()
try:
response = response.json()
except JSONDecodeError:
response = None
pass
if response.status_code >= 500:
raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
raise Exception("Error during request. Status: " + str(response.status_code) + " Response: " + str(response))
from api.api import *
import json
class ApiError:
def __init__(self, error_code: str, http_status_code: int, message: str):
self.error_code = error_code
self.http_status_code = http_status_code
self.message = message
ERROR_MALFORMED_JSON = ApiError("malformed_json", HTTP_400_BAD_REQUEST,
"The request json is malformed or missing")
def ERROR_REQUEST_MISSING_PARAMETER(parameter_name: str) -> ApiError:
return ApiError("request_missing_parameter", HTTP_400_BAD_REQUEST,
"Missing parameter " + parameter_name + " in request")
def ERROR_REQUEST_INVALID_PARAMETER(parameter_name: str, invalid_message: str) -> ApiError:
return ApiError("request_invalid_parameter", HTTP_400_BAD_REQUEST,
"Parameter " + parameter_name + " in request is invalid: " + invalid_message)
ERROR_UNKNOWN_REQUEST_PATH = ApiError("unknown_request_path", HTTP_404_NOT_FOUND,
"The specified request path does not exist")
ERROR_UNKNOWN_COURSE = ApiError("unknown_course", HTTP_404_NOT_FOUND,
"The specified course cannot be found")
ERROR_UNKNOWN_LECTURE = ApiError("unknown_lecture", HTTP_404_NOT_FOUND,
"The specified lecture cannot be found")
ERROR_INTERNAL_SERVER_ERROR = ApiError("internal_server_error", HTTP_500_INTERNAL_SERVER_ERROR,
"An unknown internal server error occurred")
ERROR_LECTURE_HAS_NO_PASSWORD = ApiError("lecture_has_no_password", HTTP_400_BAD_REQUEST,
"The specified lecture has no password authentication")
ERROR_AUTHENTICATION_FAILED = ApiError("authentication_failed", HTTP_403_FORBIDDEN,
"Authentication failed")
ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE = ApiError("authentication_servers_not_available", HTTP_503_SERVICE_UNAVAILABLE,
"Authentication servers are currently not available")
ERROR_UNAUTHORIZED = ApiError("unauthorized", HTTP_401_UNAUTHORIZED,
"You are not authorized")
ERROR_ACCESS_FORBIDDEN = ApiError("access_forbidden", HTTP_403_FORBIDDEN,
"You do not not access to this resource")
class ApiClientException(Exception):
def __init__(self, error: ApiError):
self.error = error
def api_handle_exceptions(func):
@wraps(func)
def decorator(*args, **kwargs):
try:
return func(*args, **kwargs)
except ApiClientException as e:
return api_on_error(e.error)
except Exception as e:
print("An exception occurred while handling api request:")
traceback.print_exception(e)
return api_on_error(ERROR_INTERNAL_SERVER_ERROR)
return decorator
def api_on_error(error: ApiError) -> tuple[str, int]:
error_json = {
"error_code": error.error_code,
"message": error.message
}
return json.dumps(error_json), error.http_status_code
from api.api import *
from api.authentication import *
@app.route(API_BASE_PATH + "/authentication/password", methods=["POST"])
@api_handle_exceptions
def authentication_password():
json_request = get_client_json(request)
lecture_id: int = json_request.get_int("lecture_id")
username: str = json_request.get_string("username", 128)
password: str = json_request.get_string("password", 128)
authenticate_password(lecture_id, username, password)
return json.dumps({}), HTTP_200_OK
@app.route(API_BASE_PATH + "/authentication/fsmpi", methods=["POST"])
@api_handle_exceptions
def authentication_fsmpi():
json_request = get_client_json(request)
username: str = json_request.get_string("username", 128)
password: str = json_request.get_string("password", 128)
user_info = authenticate_fsmpi(username, password)
response = {}
if user_info is not None:
response["user"] = user_info
if json_request.has("lecture_id"):
response["is_lecture_authenticated"] = is_lecture_authenticated(json_request.get_int("lecture_id"))
return json.dumps(response), HTTP_200_OK
@app.route(API_BASE_PATH + "/authentication/start_oauth", methods=["POST"])
@api_handle_exceptions
def authentication_start_oauth():
json_request = get_client_json(request)
login_type: str = json_request.get_string("type")
if login_type == "rwth":
verification_url = start_rwth_authentication()
elif login_type == "moodle":
verification_url = start_moodle_authentication()
else:
return api_on_error(ERROR_REQUEST_INVALID_PARAMETER("type", "Unknown type"))
return json.dumps({
"verification_url": verification_url
}), HTTP_202_ACCEPTED
@app.route(API_BASE_PATH + "/authentication/status", methods=["GET"])
@api_handle_exceptions
def authentication_status():
json_request = get_client_json(request)
try_finish_running_authentication()
json_response = {
"authenticated_methods": get_currently_authenticated_methods()
}
if is_rwth_authentication_running():
json_response["in_progress_authentication"] = "rwth"
elif is_moodle_authentication_running():
json_response["in_progress_authentication"] = "moodle"
if json_request.has("lecture_id"):
json_response["is_lecture_authenticated"] = is_lecture_authenticated(json_request.get_int("lecture_id"))
return json.dumps(json_response), HTTP_200_OK
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment