......@@ -59,7 +59,9 @@ class UserManager:
class SecurityManager:
def __init__(self, key, max_duration=300):
self.maccer = hmac.new(key.encode("utf-8"), digestmod=hashlib.sha512)
if isinstance(key, str):
key = key.encode("utf-8")
self.maccer = hmac.new(key, digestmod=hashlib.sha512)
self.max_duration = max_duration
def hash_user(self, user):
......@@ -93,10 +95,17 @@ class StaticUserManager:
for (username, password, groups) in users
}
self.group_map = {
username: groups
username: tuple(groups)
for (username, password, groups) in users
}
def __repr__(self):
users = [
(username, self.passwords[username], self.group_map[username])
for username in self.passwords
]
return "StaticUserManager({})".format(users)
def authenticate(self, username, password):
return (username in self.passwords
and self.passwords[username] == password)
......@@ -106,7 +115,10 @@ class StaticUserManager:
yield from self.group_map[username]
def all_groups(self, username, password):
yield from list(set(group for group in self.group_map.values()))
yield from list(set(
group
for groups in self.group_map.values()
for group in groups))
try:
......@@ -118,6 +130,17 @@ try:
self.user_dn = user_dn
self.group_dn = group_dn
def __repr__(self):
return (
"LdapManager(host='{host}', user_dn='{user_dn}', "
"group_dn='{group_dn}', port={port}, use_ssl={use_ssl})"
.format(
host=self.server.host,
user_dn=self.user_dn,
group_dn=self.group_dn,
port=self.server.port,
use_ssl=self.server.ssl))
def authenticate(self, username, password):
try:
connection = ldap3.Connection(
......@@ -163,6 +186,24 @@ try:
self.domain = domain
self.user_dn = user_dn
self.group_dn = group_dn
self.ca_cert = ca_cert
self.host = host
self.port = port
self.use_ssl = use_ssl
def __repr__(self):
return (
"ADManager(host='{host}', domain='{domain}', "
"user_dn='{user_dn}', group_dn='{group_dn}', "
"port={port}, use_ssl={use_ssl}, ca_cert='{ca_cert}')"
.format(
host=self.host,
domain=self.domain,
user_dn=self.user_dn,
group_dn=self.group_dn,
port=self.port,
use_ssl=self.use_ssl,
ca_cert=self.ca_cert))
def prepare_connection(self, username=None, password=None):
if username is not None and password is not None:
......@@ -229,6 +270,9 @@ try:
def __init__(self):
self.pam = pam.pam()
def __repr__(self):
return "PAMManager()"
def authenticate(self, username, password):
return self.pam.authenticate(username, password)
......@@ -241,5 +285,6 @@ try:
def all_groups(self, username, password):
for group in grp.getgrall():
yield group.gr_name
except ImportError:
pass
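Review bot: The `__repr__` added to `StaticUserManager` (hunk at `-93,10 +95,17`) puts `self.passwords[username]` into its output, so every configured password is exposed wherever the object is printed, logged or shown in a traceback. Because `authenticate` compares `self.passwords[username] == password` directly, these values appear to be stored in clear text, which makes the exposure a full credential leak rather than a hash leak. I would build the tuple without the secret, e.g. `(username, "<redacted>", self.group_map[username])`. If a round-trippable dump is needed for writing configuration, it belongs in a separately named method that callers invoke deliberately. The `LdapManager`, `ADManager` and `PAMManager` representations in the later hunks contain no credentials and look fine.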
......@@ -5,16 +5,15 @@
import functools
from flask import session, request, redirect as flask_redirect, url_for
import config
cookie = getattr(config, "REDIRECT_BACK_COOKIE", "back")
default_view = getattr(config, "REDIRECT_BACK_DEFAULT", "index")
COOKIE_NAME = "back"
DEFAULT_VIEW = "index"
def anchor(func, cookie=cookie):
def anchor(func, COOKIE_NAME=COOKIE_NAME):
@functools.wraps(func)
def result(*args, **kwargs):
session[cookie] = request.url
session[COOKIE_NAME] = request.url
return func(*args, **kwargs)
return result
......@@ -23,12 +22,12 @@ def default_url(default, **url_args):
return url_for(default, **url_args)
def url(default=default_view, cookie=cookie, **url_args):
return session.get(cookie, default_url(default, **url_args))
def url(default=DEFAULT_VIEW, COOKIE_NAME=COOKIE_NAME, **url_args):
return session.get(COOKIE_NAME, default_url(default, **url_args))
def redirect(default=default_view, cookie=cookie, **url_args):
target = url(default, cookie, **url_args)
def redirect(default=DEFAULT_VIEW, COOKIE_NAME=COOKIE_NAME, **url_args):
target = url(default, COOKIE_NAME, **url_args)
if target == request.url:
target = default_url(default, **url_args)
return flask_redirect(target)
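Review bot: Renaming the keyword parameter to `COOKIE_NAME` on `anchor`, `url` and `redirect` shadows the module constant of the same name and changes the public keyword that callers previously spelled `cookie`. A caller that still passes `cookie=...` to `anchor` will get a `TypeError`. In `url` and `redirect` the argument falls into `**url_args` instead and is forwarded to `url_for`, so the session key silently reverts to the default. Keeping the lowercase parameter and changing only its default preserves the interface: `def anchor(func, cookie=COOKIE_NAME):` and `def url(default=DEFAULT_VIEW, cookie=COOKIE_NAME, **url_args):`, likewise for `redirect`. A one-line comment on `COOKIE_NAME` saying it is a key in the Flask `session` rather than a separate cookie would also help readers.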