Select Git revision
database.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
authentication.py 21.53 KiB
import random
import string
from enum import StrEnum
from ipaddress import ip_address, ip_network
import requests
from requests.exceptions import JSONDecodeError
from functools import wraps
from flask import session, request
from ldap import ldapauth
from api.miscellaneous import *
from api.database import *
class ViewPermissionsType(StrEnum):
PUBLIC = "public"
PRIVATE = "private"
AUTHENTICATION = "authentication"
INHERIT = "inherit"
class ViewPermissions:
def __init__(self,
type: ViewPermissionsType,
rwth_authentication: bool or None = None,
fsmpi_authentication: bool or None = None,
moodle_course_ids: list[int] or None = None,
passwords: dict[str, str] or None = None):
super().__init__()
self.type = type
self.rwth_authentication = rwth_authentication
self.fsmpi_authentication = fsmpi_authentication
self.moodle_course_ids = moodle_course_ids
self.passwords = passwords
is_auth = type == ViewPermissionsType.AUTHENTICATION
if ((rwth_authentication is not None) != is_auth) \
or ((fsmpi_authentication is not None) != is_auth) \
or ((moodle_course_ids is not None) != is_auth) \
or ((passwords is not None) != is_auth):
raise TypeError()
if is_auth \
and not rwth_authentication \
and not fsmpi_authentication \
and len(moodle_course_ids) == 0 \
and len(passwords) == 0:
raise RuntimeError("At least one authentication method must be present for type AUTHENTICATION")
@staticmethod
def from_json(json: CJsonValue or JsonTypes) -> "ViewPermissions":
is_client = isinstance(json, CJsonValue)
if not is_client:
json = CJsonValue(json)
json = json.as_object()
try:
type_str: str = json.get_string("type", 100)
try:
type = ViewPermissionsType(type_str)
except ValueError:
raise json.raise_error(f"Unknown type '{truncate_string(type_str)}'")
rwth_authentication = None
fsmpi_authentication = None
moodle_course_ids = None
passwords = None
if type == ViewPermissionsType.AUTHENTICATION:
rwth_authentication = json.get_bool("rwth_authentication")
fsmpi_authentication = json.get_bool("fsmpi_authentication")
moodle_course_ids = [v.as_sint32() for v in json.get_array("moodle_course_ids")]
passwords_json = json.get_object("passwords")
passwords = {k: passwords_json.get_string(k, 1024) for k in passwords_json.keys()}
if not rwth_authentication \
and not fsmpi_authentication \
and len(moodle_course_ids) == 0 \
and len(passwords) == 0:
raise ApiClientException(ERROR_BAD_REQUEST(f"For type {ViewPermissionsType.AUTHENTICATION.value} at"
f" least one authentication method must be present"))
else:
json.raise_if_present("rwth_authentication")
json.raise_if_present("fsmpi_authentication")
json.raise_if_present("moodle_course_ids")
json.raise_if_present("passwords")
return ViewPermissions(
type,
rwth_authentication,
fsmpi_authentication,
moodle_course_ids,
passwords
)
except ApiClientException as e:
if is_client:
raise e
raise RuntimeError(f"Invalid json: {e.error.message}")
def to_json(self) -> dict:
if self.type == ViewPermissionsType.AUTHENTICATION:
return {
"type": self.type.value,
"rwth_authentication": self.rwth_authentication,
"fsmpi_authentication": self.fsmpi_authentication,
"moodle_course_ids": self.moodle_course_ids,
"passwords": self.passwords
}
else:
return {
"type": self.type.value
}
class EffectiveViewPermissions(ViewPermissions):
def __init__(self,
object_permissions: ViewPermissions,
inherited_permissions: ViewPermissions = ViewPermissions(ViewPermissionsType.PUBLIC)):
"""
:param object_permissions: The permissions set for the object
:param inherited_permissions: Only relevant if `permissions.type == INHERIT`. May not have type INHERIT
"""
effective = object_permissions
if object_permissions.type == ViewPermissionsType.INHERIT:
if inherited_permissions.type == ViewPermissionsType.INHERIT:
raise RuntimeError("No indirect inheritance allowed")
effective = inherited_permissions
super().__init__(
effective.type,
effective.rwth_authentication,
effective.fsmpi_authentication,
effective.moodle_course_ids,
effective.passwords
)
self.object_permissions = object_permissions
def api_moderator_route(require_csrf_token: bool = False):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not is_moderator():
raise ApiClientException(ERROR_UNAUTHORIZED)
if require_csrf_token:
check_csrf_token()
return func(*args, **kwargs)
return wrapper
return decorator
def is_moderator():
return "user" in session
def check_csrf_token():
if "X-Csrf-Token" not in request.headers \
or not is_valid_csrf_token(request.headers["X-Csrf-Token"]):
raise ApiClientException(ERROR_INVALID_CSRF_TOKEN)
def is_valid_csrf_token(csrf_token: str) -> bool:
if "_csrf_token" not in session:
return False
return csrf_token == session["_csrf_token"]
def get_csrf_token():
if "_csrf_token" not in session:
raise RuntimeError("No csrf token in session. Probably not a moderator")
return session["_csrf_token"]
def get_user_id() -> int or None:
if "user" not in session:
return None
return session["user"]["dbid"]
def get_user_info():
if "user" not in session:
return None
user = session["user"]
return {
"id": user["dbid"],
"id_string": user["uid"],
"name": user["givenName"]
}
def is_rwth_ip(ip_string: str) -> bool:
ip = ip_address(ip_string)
for net in server.config['RWTH_IP_RANGES']:
if ip in ip_network(net):
return True
return False
def _get_raw_view_permissions(auth_list_db: []) -> ViewPermissions:
if len(auth_list_db) == 0:
return ViewPermissions(ViewPermissionsType.INHERIT)
has_rwth: bool = False
has_fsmpi: bool = False
has_none: bool = False
moodle_course_ids: list[int] = []
passwords: dict[str, str] = {}
for auth_db in auth_list_db:
match auth_db["type"]:
case "public":
return ViewPermissions(ViewPermissionsType.PUBLIC)
case "rwth":
has_rwth = True
case "moodle":
moodle_course_ids.append(int(auth_db["param1"]))
case "fsmpi":
has_fsmpi = True
case "password":
username = auth_db["param1"]
password = auth_db["param2"]
if username in passwords:
print(f"Warning: Duplicate username in password permissions: {username} (Perm id: {auth_db['id']})")
passwords[username] = password
case "none":
has_none = True
case "l2p":
pass
case _:
raise ValueError("Unknown authentication method: " + str(type))
if has_rwth or has_fsmpi or len(moodle_course_ids) > 0 or len(passwords) > 0:
return ViewPermissions(
ViewPermissionsType.AUTHENTICATION,
has_rwth,
has_fsmpi,
moodle_course_ids,
passwords
)
if has_none:
return ViewPermissions(ViewPermissionsType.PRIVATE)
# We must only have a l2p permission. This is effectively private
return ViewPermissions(ViewPermissionsType.PRIVATE)
def get_effective_view_permissions(auth_list_db: [], for_lecture: bool) -> EffectiveViewPermissions:
course_auth = []
lecture_auth = []
for auth_db in auth_list_db:
if "course_id" in auth_db and auth_db["course_id"] is not None:
course_auth.append(auth_db)
elif "lecture_id" in auth_db and auth_db["lecture_id"] is not None:
lecture_auth.append(auth_db)
elif "video_id" in auth_db and auth_db["video_id"] is not None:
pass # Not supported anymore
else:
raise Exception("Permission has no course or lecture id set")
course_perm = EffectiveViewPermissions(_get_raw_view_permissions(course_auth))
if for_lecture:
return EffectiveViewPermissions(
_get_raw_view_permissions(lecture_auth),
course_perm
)
else:
return course_perm
def get_authentication_methods(auth_methods_db: [], for_lecture: bool) -> [str]:
perm = get_effective_view_permissions(auth_methods_db, for_lecture)
match perm.type:
case ViewPermissionsType.PUBLIC:
return ["public"]
case ViewPermissionsType.PRIVATE:
return []
case ViewPermissionsType.AUTHENTICATION:
auth_list_json: list[str] = []
if perm.rwth_authentication:
auth_list_json.append("rwth")
if perm.fsmpi_authentication:
auth_list_json.append("fsmpi")
if len(perm.moodle_course_ids) > 0:
auth_list_json.append("moodle")
if len(perm.passwords) > 0:
auth_list_json.append("password")
return auth_list_json
case ViewPermissionsType.INHERIT:
raise ValueError("Got INHERIT permission from get_effective_view_permissions")
case _:
raise ValueError(f"Unknown type {perm.type}")
def is_lecture_authenticated(lecture_id: int) -> bool:
if is_moderator():
return True
from api.course import lecture_query_auth
return is_authenticated(lecture_query_auth(lecture_id), True)
def is_authenticated(auth_list: [], for_lecture: bool):
if is_moderator():
return True
perm = get_effective_view_permissions(auth_list, for_lecture)
match perm.type:
case ViewPermissionsType.PUBLIC:
return True
case ViewPermissionsType.PRIVATE:
return False
case ViewPermissionsType.AUTHENTICATION:
if perm.rwth_authentication and (
session.get("rwthintern", False)
or ("X-Real-IP" in request.headers and is_rwth_ip(request.headers["X-Real-IP"]))
):
return True
if perm.fsmpi_authentication and is_moderator():
return True
for moodle_id in perm.moodle_course_ids:
if moodle_id in session.get("moodle_courses", []):
return True
if "auth_data" in session:
for username, password in session["auth_data"].items():
if username not in perm.passwords:
continue
if password == perm.passwords[username]:
return True
case ViewPermissionsType.INHERIT:
raise ValueError("Got INHERIT permission from get_effective_view_permissions")
case _:
raise ValueError(f"Unknown type {perm.type}")
def authenticate_password(lecture_id: int, username: str, password: str):
"""
May throw APIClientException.
Only returns if authentication was successful.
"""
from api.course import lecture_query_auth
perm = get_effective_view_permissions(lecture_query_auth(lecture_id), True)
if perm.type != ViewPermissionsType.AUTHENTICATION or len(perm.passwords) == 0:
raise ApiClientException(ERROR_LECTURE_HAS_NO_PASSWORD)
if username not in perm.passwords or password != perm.passwords[username]:
raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
if "auth_data" not in session:
session["auth_data"] = {}
session["auth_data"][username] = password
session.modified = True
_SQL_GET_USER_BY_NAME = PreparedStatement("""
SELECT * FROM `users` WHERE `name` = ?
""")
_SQL_INSERT_USER = PreparedStatement("""
INSERT INTO `users`
(`name`, `realname`, `fsacc`, `level`, `calendar_key`, `rfc6238`)
VALUES (?, ?, ?, 1, "", "")
""")
def authenticate_fsmpi(username: str, password: str) -> {}:
"""
May throw APIClientException.
Only returns if authentication was successful.
"""
user_info, groups = ldapauth(username, password)
user_id: str = user_info.get("uid")
if not user_id or not __ldap_is_moderator(groups):
raise ApiClientException(ERROR_AUTHENTICATION_FAILED)
session["user"] = user_info
with db_pool.start_write_transaction() as trans:
user_list_db = trans.execute_statement(_SQL_GET_USER_BY_NAME, user_id)
if len(user_list_db) < 1:
result = trans.execute_statement_full_result_and_commit(
_SQL_INSERT_USER, user_id, user_info["givenName"], user_id)
user_db_id = result[DB_RESULT_AUTO_INCREMENT]
else:
trans.commit()
user_db = user_list_db[0]
user_db_id = user_db["id"]
user_info["dbid"] = user_db_id
session["_csrf_token"] = "".join(
random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(64))
session.permanent = True
return get_user_info()
def logout_moderator():
session.pop("user", None)
session.pop("_csrf_token", None)
session.permanent = False
def __ldap_is_moderator(user_groups: []) -> bool:
for group in server.config["LDAP_GROUPS"]:
if group in user_groups:
return True
return False
def start_rwth_authentication() -> str:
"""
:return: Verification url
"""
return __start_oauth("rwth")
def is_rwth_authentication_running() -> bool:
return "oauthscope" in session and "rwth" in session["oauthscope"]
def is_moodle_authentication_running() -> bool:
return "oauthscope" in session and "moodle" in session["oauthscope"]
def start_moodle_authentication() -> str:
"""
:return: Verification url
"""
return __start_oauth("moodle")
if DEBUG_ENABLED:
debug_running_oauth = {}
def __start_oauth(scope: str) -> str:
if scope not in ["rwth", "moodle"]:
raise ValueError("Expected scope to be rwth or moodle")
oauth_code = "".join(
random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(64))
debug_running_oauth[oauth_code] = {
"scope": scope
}
session["oauthcode"] = oauth_code
session["oauthscope"] = scope
return "/debug/dummy_oauth/auth/" + oauth_code
def try_finish_running_authentication():
if "oauthscope" not in session or "oauthcode" not in session:
return
oauth_scope = session["oauthscope"]
oauth_code = session["oauthcode"]
running_oauth = debug_running_oauth.get(oauth_code)
if running_oauth is None or not running_oauth.get("finished", False):
return
del session["oauthcode"]
del session["oauthscope"]
session["rwthintern"] = True
if oauth_scope == "moodle":
session["moodle_courses"] = []
if "moodle_courses" in running_oauth:
session["moodle_courses"] = running_oauth["moodle_courses"]
return
@server.app.route("/debug/dummy_oauth/auth/<code>", methods=["GET"])
def debug_authentication_dummy_oauth_auth(code):
if code not in debug_running_oauth:
return "Unknown code", HTTP_404_NOT_FOUND
if debug_running_oauth[code].get("scope") == "moodle":
return """
<html>
<h1>Dummy Moodle OAuth</h1>
<form action="/debug/dummy_oauth/submit/%s" method="post">
<label for="moodle_courses">Moodle courses (ids comma separated):</label>
<input type="text" name="moodle_courses" id="moodle_courses"/>
<input type="submit"/>
</form>
</html>
""" % code, HTTP_200_OK
else:
return """
<html>
<h1>Dummy RWTH OAuth</h1>
<form action="/debug/dummy_oauth/submit/%s" method="post">
<input type="submit"/>
</form>
</html>
""" % code, HTTP_200_OK
@server.app.route("/debug/dummy_oauth/submit/<code>", methods=["POST"])
def debug_authentication_dummy_oauth_submit(code):
if code not in debug_running_oauth:
return "Unknown code", HTTP_404_NOT_FOUND
running_oauth = debug_running_oauth[code]
running_oauth["finished"] = True
if running_oauth.get("scope") == "moodle" and "moodle_courses" in request.form:
courses_str: str = request.form["moodle_courses"]
running_oauth["moodle_courses"] = [str(int(s)) for s in courses_str
.replace("\r", "")
.replace("\n", "")
.replace(" ", "")
.replace("\t", "")
.split(",")]
return "Successfully Authenticated", HTTP_200_OK
else:
def __start_oauth(scope: str) -> str:
if scope not in ["rwth", "moodle"]:
raise ValueError("Expected scope to be rwth or moodle")
response = __make_oauth_request(
"code",
scope="moodle.rwth" if scope == "moodle" else "userinfo.rwth"
)
session["oauthcode"] = response["device_code"]
session["oauthscope"] = scope
return response["verification_url"] + "?q=verify&d=" + response["user_code"]
def try_finish_running_authentication():
if "oauthcode" not in session or "oauthscope" not in session:
return
oauth_scope = session["oauthscope"]
if oauth_scope not in ["rwth", "moodle"]:
raise ValueError("Session auth scope set to something different than rwth or moodle")
try:
token_response = __make_oauth_request("token", code=session["oauthcode"], grant_type="device")
if token_response.get("status") != "ok":
return
del session["oauthcode"]
del session["oauthscope"]
session["rwthintern"] = True
if oauth_scope == "moodle":
moodle_response = __make_moodle_request("getmyenrolledcourses", token_response["access_token"])
if moodle_response and moodle_response.get("Data"):
moodle_courses = []
session["moodle_courses"] = moodle_courses
for course in moodle_response["Data"]:
moodle_courses.append(str(course["id"]))
else:
server.notify_admins(
'endpoint_exception',
traceback="try_finish_running_authentication failed while getting moodle courses, data={}"
.format(str(moodle_response)))
__make_oauth_request("token", refresh_token=token_response["refresh_token"], grant_type="invalidate")
except ApiClientException:
pass
def get_currently_authenticated_methods() -> [str]:
methods = []
if session.get("rwthintern", False):
methods.append("rwth")
if "moodle_courses" in session:
methods.append("moodle")
if "auth_data" in session and len(session["auth_data"]) > 0:
methods.append("password")
if is_moderator():
methods.append("fsmpi")
return methods
OAUTH_BASE = "https://oauth.campus.rwth-aachen.de/oauth2waitress/oauth2.svc/"
MOODLE_BASE = "https://moped.ecampus.rwth-aachen.de/proxy/api/v2/eLearning/Moodle/"
def __make_moodle_request(endpoint, token, **args):
args["token"] = token
return __handle_request_result(requests.request('GET', MOODLE_BASE + endpoint, params=args, timeout=30))
def __make_oauth_request(endpoint, **args):
if "L2P_APIKEY" not in server.config:
raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
args["client_id"] = server.config["L2P_APIKEY"]
return __handle_request_result(requests.request('POST', OAUTH_BASE + endpoint, data=args, timeout=30))
def __handle_request_result(response: requests.Response):
if response.ok:
return response.json()
try:
response = response.json()
except JSONDecodeError:
response = None
pass
if response.status_code >= 500:
raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
raise Exception("Error during request. Status: " + str(response.status_code) + " Response: " + str(response))