Select Git revision
language_parser.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
authentication.py 21.53 KiB
import random
import string
from enum import StrEnum
from ipaddress import ip_address, ip_network
import requests
from requests.exceptions import JSONDecodeError
from functools import wraps
from flask import session, request
from ldap import ldapauth
from api.miscellaneous import *
from api.database import *
class ViewPermissionsType(StrEnum):
PUBLIC = "public"
PRIVATE = "private"
AUTHENTICATION = "authentication"
INHERIT = "inherit"
class ViewPermissions:
    """
    View permissions of a single object.

    Invariant (enforced by the constructor): the four authentication fields are set if and only if the type
    is AUTHENTICATION, and in that case at least one authentication method is enabled.
    """

    def __init__(self,
                 type: ViewPermissionsType,
                 rwth_authentication: bool | None = None,
                 fsmpi_authentication: bool | None = None,
                 moodle_course_ids: list[int] | None = None,
                 passwords: dict[str, str] | None = None):
        """
        :param type: The kind of permission
        :param rwth_authentication: Whether RWTH authentication grants access
        :param fsmpi_authentication: Whether FSMPI authentication grants access
        :param moodle_course_ids: Ids of the moodle courses whose members get access
        :param passwords: Maps a username to the password which grants access
        :raises TypeError: If an authentication field is given for a type other than AUTHENTICATION or is
                           missing for type AUTHENTICATION
        :raises RuntimeError: If the type is AUTHENTICATION but no authentication method is enabled
        """
        super().__init__()
        self.type = type
        self.rwth_authentication = rwth_authentication
        self.fsmpi_authentication = fsmpi_authentication
        self.moodle_course_ids = moodle_course_ids
        self.passwords = passwords

        is_auth = type == ViewPermissionsType.AUTHENTICATION
        auth_fields = (rwth_authentication, fsmpi_authentication, moodle_course_ids, passwords)
        # Every authentication field must be present exactly when the type is AUTHENTICATION
        if any((field is not None) != is_auth for field in auth_fields):
            raise TypeError("Authentication fields must be set if and only if the type is AUTHENTICATION")
        if is_auth \
                and not rwth_authentication \
                and not fsmpi_authentication \
                and len(moodle_course_ids) == 0 \
                and len(passwords) == 0:
            raise RuntimeError("At least one authentication method must be present for type AUTHENTICATION")

    @staticmethod
    def from_json(json: "CJsonValue | JsonTypes") -> "ViewPermissions":
        """
        Parse permissions from their JSON representation (see `to_json`).

        :param json: Either a CJsonValue (treated as client input) or a plain JSON value, which is wrapped
                     in a CJsonValue first
        :raises ApiClientException: If `json` was passed as CJsonValue and is invalid
        :raises RuntimeError: If `json` was passed as plain JSON value and is invalid
        """
        is_client = isinstance(json, CJsonValue)
        if not is_client:
            json = CJsonValue(json)
        json = json.as_object()
        try:
            type_str: str = json.get_string("type", 100)
            try:
                perm_type = ViewPermissionsType(type_str)
            except ValueError:
                raise json.raise_error(f"Unknown type '{truncate_string(type_str)}'")

            rwth_authentication = None
            fsmpi_authentication = None
            moodle_course_ids = None
            passwords = None
            if perm_type == ViewPermissionsType.AUTHENTICATION:
                rwth_authentication = json.get_bool("rwth_authentication")
                fsmpi_authentication = json.get_bool("fsmpi_authentication")
                moodle_course_ids = [v.as_sint32() for v in json.get_array("moodle_course_ids")]
                passwords_json = json.get_object("passwords")
                passwords = {k: passwords_json.get_string(k, 1024) for k in passwords_json.keys()}
                # Checked here as well (and not only in the constructor) so that clients get a proper API error
                if not rwth_authentication \
                        and not fsmpi_authentication \
                        and len(moodle_course_ids) == 0 \
                        and len(passwords) == 0:
                    raise ApiClientException(ERROR_BAD_REQUEST(f"For type {ViewPermissionsType.AUTHENTICATION.value} at"
                                                               f" least one authentication method must be present"))
            else:
                # The authentication fields are only allowed for type AUTHENTICATION
                json.raise_if_present("rwth_authentication")
                json.raise_if_present("fsmpi_authentication")
                json.raise_if_present("moodle_course_ids")
                json.raise_if_present("passwords")
            return ViewPermissions(
                perm_type,
                rwth_authentication,
                fsmpi_authentication,
                moodle_course_ids,
                passwords
            )
        except ApiClientException as e:
            if is_client:
                # Bare raise keeps the original traceback
                raise
            # The data did not come from a client, so broken data is an internal error
            raise RuntimeError(f"Invalid json: {e.error.message}") from e

    def to_json(self) -> dict:
        """
        :return: The JSON representation. The authentication fields are only included for type AUTHENTICATION
        """
        if self.type == ViewPermissionsType.AUTHENTICATION:
            return {
                "type": self.type.value,
                "rwth_authentication": self.rwth_authentication,
                "fsmpi_authentication": self.fsmpi_authentication,
                "moodle_course_ids": self.moodle_course_ids,
                "passwords": self.passwords
            }
        else:
            return {
                "type": self.type.value
            }
class EffectiveViewPermissions(ViewPermissions):
    """
    The permissions which actually apply to an object once inheritance is resolved.

    The instance itself carries the resolved values. The permissions configured directly on the object
    remain available as `object_permissions`.
    """

    def __init__(self,
                 object_permissions: ViewPermissions,
                 inherited_permissions: ViewPermissions = ViewPermissions(ViewPermissionsType.PUBLIC)):
        """
        :param object_permissions: The permissions set for the object
        :param inherited_permissions: Only relevant if `object_permissions.type == INHERIT`. May not have type
                                      INHERIT in that case
        :raises RuntimeError: If both permissions have type INHERIT
        """
        inherits = object_permissions.type == ViewPermissionsType.INHERIT
        if inherits and inherited_permissions.type == ViewPermissionsType.INHERIT:
            raise RuntimeError("No indirect inheritance allowed")

        source = inherited_permissions if inherits else object_permissions
        super().__init__(
            source.type,
            source.rwth_authentication,
            source.fsmpi_authentication,
            source.moodle_course_ids,
            source.passwords
        )
        self.object_permissions = object_permissions
def api_moderator_route(require_csrf_token: bool = False):
    """
    Decorator factory restricting a route function to moderators.

    :param require_csrf_token: If True, the request's csrf token is checked (after the moderator check)
    :return: A decorator. The decorated function raises ApiClientException(ERROR_UNAUTHORIZED) when called
             without a moderator session
    """
    def decorate(route_func):
        @wraps(route_func)
        def guarded(*args, **kwargs):
            if is_moderator():
                if require_csrf_token:
                    check_csrf_token()
                return route_func(*args, **kwargs)
            raise ApiClientException(ERROR_UNAUTHORIZED)

        return guarded

    return decorate
def is_moderator() -> bool:
    """
    :return: True if the current session has a "user" entry, i.e. a moderator is logged in
    """
    return "user" in session
def check_csrf_token():
    """
    Verify the "X-Csrf-Token" header of the current request.

    :raises ApiClientException: With ERROR_INVALID_CSRF_TOKEN if the header is missing or its token is invalid
    """
    headers = request.headers
    if "X-Csrf-Token" in headers and is_valid_csrf_token(headers["X-Csrf-Token"]):
        return
    raise ApiClientException(ERROR_INVALID_CSRF_TOKEN)
def is_valid_csrf_token(csrf_token: str) -> bool:
    """
    Check a client supplied csrf token against the token stored in the session.

    :param csrf_token: The token sent by the client
    :return: True only if the session has a csrf token and it equals `csrf_token`
    """
    import hmac

    if "_csrf_token" not in session:
        return False
    expected = session["_csrf_token"]
    if not isinstance(csrf_token, str) or not isinstance(expected, str):
        return False
    # Constant-time comparison so that the response time does not leak how many leading characters matched.
    # compare_digest only accepts ASCII-only str, so compare the encoded bytes. surrogatepass makes encoding
    # succeed for every str the client could send.
    return hmac.compare_digest(
        csrf_token.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass")
    )
def get_csrf_token():
    """
    :return: The csrf token stored in the current session
    :raises RuntimeError: If the session has no csrf token
    """
    if "_csrf_token" in session:
        return session["_csrf_token"]
    raise RuntimeError("No csrf token in session. Probably not a moderator")
def get_user_id() -> int | None:
    """
    :return: The "dbid" of the moderator stored in the session, or None if no moderator is logged in
    """
    if "user" in session:
        return session["user"]["dbid"]
    return None
def get_user_info():
    """
    :return: A dict with "id", "id_string" and "name" of the moderator stored in the session, or None if no
             moderator is logged in
    """
    if "user" in session:
        moderator = session["user"]
        return {
            "id": moderator["dbid"],
            "id_string": moderator["uid"],
            "name": moderator["givenName"]
        }
    return None
def is_rwth_ip(ip_string: str) -> bool:
    """
    Check whether an address lies in one of the networks configured in `RWTH_IP_RANGES`.

    :param ip_string: Textual IPv4 or IPv6 address
    :return: True if the address is inside a configured network. False otherwise, including when
             `ip_string` is not a valid address
    """
    try:
        ip = ip_address(ip_string)
    except ValueError:
        # The value is taken from a request header. A malformed address is simply not an RWTH address and
        # must not turn into a server error.
        return False
    return any(ip in ip_network(net) for net in server.config['RWTH_IP_RANGES'])
def _get_raw_view_permissions(auth_list_db: []) -> ViewPermissions:
if len(auth_list_db) == 0:
return ViewPermissions(ViewPermissionsType.INHERIT)
has_rwth: bool = False
has_fsmpi: bool = False
has_none: bool = False
moodle_course_ids: list[int] = []
passwords: dict[str, str] = {}
for auth_db in auth_list_db:
match auth_db["type"]:
case "public":
return ViewPermissions(ViewPermissionsType.PUBLIC)
case "rwth":
has_rwth = True
case "moodle":
moodle_course_ids.append(int(auth_db["param1"]))
case "fsmpi":
has_fsmpi = True
case "password":
username = auth_db["param1"]
password = auth_db["param2"]
if username in passwords:
print(f"Warning: Duplicate username in password permissions: {username} (Perm id: {auth_db['id']})")
passwords[username] = password
case "none":
has_none = True
case "l2p":
pass
case _:
raise ValueError("Unknown authentication method: " + str(type))
if has_rwth or has_fsmpi or len(moodle_course_ids) > 0 or len(passwords) > 0:
return ViewPermissions(
ViewPermissionsType.AUTHENTICATION,
has_rwth,
has_fsmpi,
moodle_course_ids,
passwords
)
if has_none:
return ViewPermissions(ViewPermissionsType.PRIVATE)
# We must only have a l2p permission. This is effectively private
return ViewPermissions(ViewPermissionsType.PRIVATE)
def get_effective_view_permissions(auth_list_db: list, for_lecture: bool) -> EffectiveViewPermissions:
    """
    Resolve the permissions which apply to a course or to one of its lectures.

    :param auth_list_db: Permission rows of the course and, if `for_lecture`, of the lecture
    :param for_lecture: If True the lecture permissions are returned (inheriting from the course), otherwise
                        the course permissions. Lecture rows are ignored in the latter case
    :raises Exception: If a row has neither a course, lecture nor video id
    """
    def refers_to(entry, key: str) -> bool:
        return key in entry and entry[key] is not None

    course_entries = []
    lecture_entries = []
    for entry in auth_list_db:
        if refers_to(entry, "course_id"):
            course_entries.append(entry)
            continue
        if refers_to(entry, "lecture_id"):
            lecture_entries.append(entry)
            continue
        if not refers_to(entry, "video_id"):
            raise Exception("Permission has no course or lecture id set")
        # Permissions on videos are not supported anymore and are ignored

    course_perm = EffectiveViewPermissions(_get_raw_view_permissions(course_entries))
    if not for_lecture:
        return course_perm
    return EffectiveViewPermissions(
        _get_raw_view_permissions(lecture_entries),
        course_perm
    )
def get_authentication_methods(auth_methods_db: list, for_lecture: bool) -> list[str]:
    """
    List the names of the methods which grant access according to the effective permissions.

    :return: ["public"] for public objects, [] for private ones, otherwise the enabled methods out of
             "rwth", "fsmpi", "moodle" and "password" (in this order)
    :raises ValueError: If the effective permissions have an unexpected type
    """
    perm = get_effective_view_permissions(auth_methods_db, for_lecture)
    kind = perm.type
    if kind == ViewPermissionsType.PUBLIC:
        return ["public"]
    if kind == ViewPermissionsType.PRIVATE:
        return []
    if kind == ViewPermissionsType.AUTHENTICATION:
        enabled = {
            "rwth": perm.rwth_authentication,
            "fsmpi": perm.fsmpi_authentication,
            "moodle": len(perm.moodle_course_ids) > 0,
            "password": len(perm.passwords) > 0,
        }
        return [method for method, active in enabled.items() if active]
    if kind == ViewPermissionsType.INHERIT:
        raise ValueError("Got INHERIT permission from get_effective_view_permissions")
    raise ValueError(f"Unknown type {perm.type}")
def is_lecture_authenticated(lecture_id: int) -> bool:
    """
    :return: Whether the current session may view the lecture. Moderators always may
    """
    if not is_moderator():
        # Imported here and not at module level, as in the other users of api.course in this file
        from api.course import lecture_query_auth
        return is_authenticated(lecture_query_auth(lecture_id), True)
    return True
def is_authenticated(auth_list: [], for_lecture: bool):
if is_moderator():
return True
perm = get_effective_view_permissions(auth_list, for_lecture)
match perm.type:
case ViewPermissionsType.PUBLIC:
return True
case ViewPermissionsType.PRIVATE:
return False
case ViewPermissionsType.AUTHENTICATION:
if perm.rwth_authentication and (
session.get("rwthintern", False)
or ("X-Real-IP" in request.headers and is_rwth_ip(request.headers["X-Real-IP"]))
):
return True
if perm.fsmpi_authentication and is_moderator():
return True
for moodle_id in perm.moodle_course_ids:
if moodle_id in session.get("moodle_courses", []):
return True
if "auth_data" in session:
for username, password in session["auth_data"].items():
if username not in perm.passwords:
continue
if password == perm.passwords[username]:
return True
case ViewPermissionsType.INHERIT:
raise ValueError("Got INHERIT permission from get_effective_view_permissions")
case _:
raise ValueError(f"Unknown type {perm.type}")
def authenticate_password(lecture_id: int, username: str, password: str):
    """
    Authenticate the current session for a lecture with a username and password.

    May throw APIClientException.
    Only returns if authentication was successful. The credentials are then stored in the session.
    """
    from api.course import lecture_query_auth

    perm = get_effective_view_permissions(lecture_query_auth(lecture_id), True)

    has_passwords = perm.type == ViewPermissionsType.AUTHENTICATION and len(perm.passwords) > 0
    if not has_passwords:
        raise ApiClientException(ERROR_LECTURE_HAS_NO_PASSWORD)

    credentials_match = username in perm.passwords and perm.passwords[username] == password
    if not credentials_match:
        raise ApiClientException(ERROR_AUTHENTICATION_FAILED)

    session.setdefault("auth_data", {})[username] = password
    # Set explicitly because the nested dict is changed in place
    session.modified = True
# Selects all `users` rows with the given name. Parameter: name
_SQL_GET_USER_BY_NAME = PreparedStatement("""
SELECT * FROM `users` WHERE `name` = ?
""")
# Creates a `users` row with level 1 and empty `calendar_key` and `rfc6238`. Parameters: name, realname, fsacc
_SQL_INSERT_USER = PreparedStatement("""
INSERT INTO `users`
(`name`, `realname`, `fsacc`, `level`, `calendar_key`, `rfc6238`)
VALUES (?, ?, ?, 1, "", "")
""")
def authenticate_fsmpi(username: str, password: str) -> dict:
    """
    Log the current session in as moderator using the LDAP credentials.

    May throw APIClientException.
    Only returns if authentication was successful.

    On the first login a `users` row is created for the user. The session is only modified after all
    steps succeeded.

    :return: The user info, see `get_user_info`
    """
    user_info, groups = ldapauth(username, password)
    user_id: str = user_info.get("uid")
    if not user_id or not __ldap_is_moderator(groups):
        raise ApiClientException(ERROR_AUTHENTICATION_FAILED)

    with db_pool.start_write_transaction() as trans:
        user_list_db = trans.execute_statement(_SQL_GET_USER_BY_NAME, user_id)
        if len(user_list_db) < 1:
            result = trans.execute_statement_full_result_and_commit(
                _SQL_INSERT_USER, user_id, user_info["givenName"], user_id)
            user_db_id = result[DB_RESULT_AUTO_INCREMENT]
        else:
            trans.commit()
            user_db_id = user_list_db[0]["id"]

    user_info["dbid"] = user_db_id

    rng = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    csrf_token = "".join(rng.choice(alphabet) for _ in range(64))

    # Write to the session last. If the database step above fails, no half-initialised moderator session
    # (without "dbid" and csrf token) is left behind.
    session["user"] = user_info
    session["_csrf_token"] = csrf_token
    session.permanent = True
    return get_user_info()
def logout_moderator():
    """Remove the moderator login (user and csrf token) from the session. Does nothing if not logged in."""
    for key in ("user", "_csrf_token"):
        session.pop(key, None)
    session.permanent = False
def __ldap_is_moderator(user_groups: list) -> bool:
    """
    :return: True if at least one of the groups configured in `LDAP_GROUPS` is among `user_groups`
    """
    return any(group in user_groups for group in server.config["LDAP_GROUPS"])
def start_rwth_authentication() -> str:
    """
    Start an OAuth flow with scope "rwth" for the current session.

    :return: Verification url
    """
    verification_url = __start_oauth("rwth")
    return verification_url
def is_rwth_authentication_running() -> bool:
    """
    :return: True if the session has a started OAuth flow whose scope contains "rwth"
    """
    if "oauthscope" not in session:
        return False
    return "rwth" in session["oauthscope"]
def is_moodle_authentication_running() -> bool:
    """
    :return: True if the session has a started OAuth flow whose scope contains "moodle"
    """
    if "oauthscope" not in session:
        return False
    return "moodle" in session["oauthscope"]
def start_moodle_authentication() -> str:
    """
    Start an OAuth flow with scope "moodle" for the current session.

    :return: Verification url
    """
    verification_url = __start_oauth("moodle")
    return verification_url
if DEBUG_ENABLED:
    # Debug mode: a dummy OAuth flow served by this application is used instead of the real OAuth servers.

    # Maps the oauth code of every started dummy flow to its state ("scope", "finished", "moodle_courses")
    debug_running_oauth = {}

    def __start_oauth(scope: str) -> str:
        """
        Start a dummy OAuth flow and remember its code and scope in the session.

        :param scope: "rwth" or "moodle"
        :return: Url of the dummy verification page
        :raises ValueError: If the scope is unknown
        """
        if scope not in ["rwth", "moodle"]:
            raise ValueError("Expected scope to be rwth or moodle")
        rng = random.SystemRandom()
        alphabet = string.ascii_letters + string.digits
        oauth_code = "".join(rng.choice(alphabet) for _ in range(64))
        debug_running_oauth[oauth_code] = {
            "scope": scope
        }
        session["oauthcode"] = oauth_code
        session["oauthscope"] = scope
        return "/debug/dummy_oauth/auth/" + oauth_code

    def try_finish_running_authentication():
        """
        Complete the dummy OAuth flow of the session if its form has been submitted.

        Does nothing if no flow is running or it is not finished yet. Otherwise the flow is removed from the
        session and the session is marked as RWTH authenticated (and gets its moodle courses for scope moodle).
        """
        if "oauthscope" not in session or "oauthcode" not in session:
            return
        oauth_scope = session["oauthscope"]
        oauth_code = session["oauthcode"]
        running_oauth = debug_running_oauth.get(oauth_code)
        if running_oauth is None or not running_oauth.get("finished", False):
            return
        del session["oauthcode"]
        del session["oauthscope"]
        session["rwthintern"] = True
        if oauth_scope == "moodle":
            session["moodle_courses"] = running_oauth.get("moodle_courses", [])

    @server.app.route("/debug/dummy_oauth/auth/<code>", methods=["GET"])
    def debug_authentication_dummy_oauth_auth(code):
        """Serve the dummy verification page of a started flow. Responds with 404 for unknown codes."""
        if code not in debug_running_oauth:
            return "Unknown code", HTTP_404_NOT_FOUND
        # `code` is a key of debug_running_oauth at this point, i.e. it was generated by __start_oauth
        if debug_running_oauth[code].get("scope") == "moodle":
            return """
<html>
<h1>Dummy Moodle OAuth</h1>
<form action="/debug/dummy_oauth/submit/%s" method="post">
<label for="moodle_courses">Moodle courses (ids comma separated):</label>
<input type="text" name="moodle_courses" id="moodle_courses"/>
<input type="submit"/>
</form>
</html>
""" % code, HTTP_200_OK
        else:
            return """
<html>
<h1>Dummy RWTH OAuth</h1>
<form action="/debug/dummy_oauth/submit/%s" method="post">
<input type="submit"/>
</form>
</html>
""" % code, HTTP_200_OK

    @server.app.route("/debug/dummy_oauth/submit/<code>", methods=["POST"])
    def debug_authentication_dummy_oauth_submit(code):
        """
        Mark a dummy flow as finished. For scope moodle the submitted course ids are stored with the flow.

        Responds with 404 for unknown codes. A course id which is not an integer raises ValueError.
        """
        if code not in debug_running_oauth:
            return "Unknown code", HTTP_404_NOT_FOUND
        running_oauth = debug_running_oauth[code]
        if running_oauth.get("scope") == "moodle" and "moodle_courses" in request.form:
            courses_str: str = request.form["moodle_courses"]
            # Drop all whitespace, then split. Empty segments (empty field, trailing comma) are skipped
            # instead of failing in int().
            cleaned = courses_str.translate(str.maketrans("", "", "\r\n \t"))
            running_oauth["moodle_courses"] = [str(int(s)) for s in cleaned.split(",") if s]
        # Only mark the flow as finished once the input was parsed successfully
        running_oauth["finished"] = True
        return "Successfully Authenticated", HTTP_200_OK
else:
    def __start_oauth(scope: str) -> str:
        """
        Start an OAuth flow at the OAuth server and remember its device code and scope in the session.

        :param scope: "rwth" or "moodle"
        :return: Verification url for the user
        :raises ValueError: If the scope is unknown
        """
        if scope not in ["rwth", "moodle"]:
            raise ValueError("Expected scope to be rwth or moodle")
        response = __make_oauth_request(
            "code",
            scope="moodle.rwth" if scope == "moodle" else "userinfo.rwth"
        )
        session["oauthcode"] = response["device_code"]
        session["oauthscope"] = scope
        return response["verification_url"] + "?q=verify&d=" + response["user_code"]

    def try_finish_running_authentication():
        """
        Complete the OAuth flow of the session if the OAuth server reports it as done.

        Does nothing if no flow is running or the token request does not have status "ok". Otherwise the
        flow is removed from the session, the session is marked as RWTH authenticated and, for scope moodle,
        the ids of the enrolled moodle courses are stored in the session.

        :raises ValueError: If the session contains an unknown scope
        """
        if "oauthcode" not in session or "oauthscope" not in session:
            return
        oauth_scope = session["oauthscope"]
        if oauth_scope not in ["rwth", "moodle"]:
            raise ValueError("Session auth scope set to something different than rwth or moodle")
        try:
            token_response = __make_oauth_request("token", code=session["oauthcode"], grant_type="device")
            if token_response.get("status") != "ok":
                return
            del session["oauthcode"]
            del session["oauthscope"]
            session["rwthintern"] = True
            if oauth_scope == "moodle":
                moodle_response = __make_moodle_request("getmyenrolledcourses", token_response["access_token"])
                if moodle_response and moodle_response.get("Data"):
                    # Stored as strings
                    session["moodle_courses"] = [str(course["id"]) for course in moodle_response["Data"]]
                else:
                    server.notify_admins(
                        'endpoint_exception',
                        traceback="try_finish_running_authentication failed while getting moodle courses, data={}"
                        .format(str(moodle_response)))
            # The token is not needed any longer
            __make_oauth_request("token", refresh_token=token_response["refresh_token"], grant_type="invalidate")
        except ApiClientException:
            # Deliberate best effort: if the authentication servers are not available the flow simply stays
            # unfinished (or the token stays valid) and the current request continues normally.
            pass
def get_currently_authenticated_methods() -> list[str]:
    """
    :return: The names of the authentication methods the current session has passed, out of "rwth", "moodle",
             "password" and "fsmpi" (in this order)
    """
    checks = (
        ("rwth", session.get("rwthintern", False)),
        ("moodle", "moodle_courses" in session),
        ("password", "auth_data" in session and len(session["auth_data"]) > 0),
        ("fsmpi", is_moderator()),
    )
    return [method for method, active in checks if active]
# Base url of the OAuth endpoints; __make_oauth_request appends the endpoint name ("code", "token")
OAUTH_BASE = "https://oauth.campus.rwth-aachen.de/oauth2waitress/oauth2.svc/"
# Base url of the moodle endpoints; __make_moodle_request appends the endpoint name ("getmyenrolledcourses")
MOODLE_BASE = "https://moped.ecampus.rwth-aachen.de/proxy/api/v2/eLearning/Moodle/"
def __make_moodle_request(endpoint, token, **args):
    """
    Send a GET request to a moodle endpoint.

    :param endpoint: Name of the endpoint, appended to MOODLE_BASE
    :param token: Access token, sent as query parameter "token"
    :param args: Further query parameters
    :return: The result of `__handle_request_result` for the response
    """
    query = {**args, "token": token}
    moodle_response = requests.request('GET', MOODLE_BASE + endpoint, params=query, timeout=30)
    return __handle_request_result(moodle_response)
def __make_oauth_request(endpoint, **args):
    """
    Send a POST request to an OAuth endpoint. The configured `L2P_APIKEY` is sent as "client_id".

    :param endpoint: Name of the endpoint, appended to OAUTH_BASE
    :param args: Form fields of the request
    :return: The result of `__handle_request_result` for the response
    :raises ApiClientException: With ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE if no api key is configured
    """
    if "L2P_APIKEY" not in server.config:
        raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
    form = {**args, "client_id": server.config["L2P_APIKEY"]}
    oauth_response = requests.request('POST', OAUTH_BASE + endpoint, data=form, timeout=30)
    return __handle_request_result(oauth_response)
def __handle_request_result(response: "requests.Response"):
    """
    Turn the response of an authentication server into its JSON body or an exception.

    :param response: The received response
    :return: The parsed JSON body if the response is ok
    :raises ApiClientException: With ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE for status codes >= 500
    :raises Exception: For every other failed response. The message contains the status code and the JSON
                       body (None if the body is not valid JSON)
    """
    if response.ok:
        return response.json()
    if response.status_code >= 500:
        raise ApiClientException(ERROR_AUTHENTICATION_SERVERS_NOT_AVAILABLE)
    # The body is kept in its own variable: `response` is still needed for the status code
    try:
        body = response.json()
    except JSONDecodeError:
        body = None
    raise Exception("Error during request. Status: " + str(response.status_code) + " Response: " + str(body))