auth.py 8.09 KB
Newer Older
1
import hmac, hashlib
2
import ssl
Robin Sonnabend's avatar
Robin Sonnabend committed
3
from datetime import datetime
Robin Sonnabend's avatar
Robin Sonnabend committed
4

5
6

class User:
    """An authenticated user plus the metadata of their session.

    Instances carry ``username``, ``groups`` (a list of group names),
    ``timestamp`` (a naive local ``datetime``) and the two boolean flags
    ``obsolete`` and ``permanent``.  ``summarize`` / ``from_summary``
    convert to and from a colon-separated text form.
    """

    def __init__(self, username, groups, timestamp=None, obsolete=False,
            permanent=False):
        """Store the given fields; ``timestamp`` defaults to the current time."""
        self.username = username
        self.groups = groups
        self.timestamp = timestamp if timestamp is not None else datetime.now()
        self.obsolete = obsolete
        self.permanent = permanent

    def summarize(self):
        """Return ``username:group1,group2:unix_timestamp:obsolete:permanent``."""
        return "{}:{}:{}:{}:{}".format(self.username, ",".join(self.groups),
            str(self.timestamp.timestamp()), self.obsolete, self.permanent)

    @staticmethod
    def from_summary(summary):
        """Parse the output of ``summarize`` back into a ``User``.

        Returns ``None`` when the summary does not have five colon-separated
        fields or when the timestamp field is not a usable number.
        """
        parts = summary.split(":", 4)
        if len(parts) != 5:
            return None
        name, group_str, timestamp_str, obsolete_str, permanent_str = parts
        try:
            timestamp = datetime.fromtimestamp(float(timestamp_str))
        except (ValueError, OverflowError, OSError):
            # Not a number, or a number outside the platform's datetime range.
            return None
        # "".split(",") would give [""], i.e. one group with an empty name;
        # an empty field has to map back to "no groups".
        groups = group_str.split(",") if group_str else []
        obsolete = obsolete_str == "True"
        permanent = permanent_str == "True"
        return User(name, groups, timestamp, obsolete, permanent)

    @staticmethod
    def from_hashstring(secure_string):
        """Parse a ``summary=mac`` string, ignoring the MAC part.

        This does NOT verify the MAC.  Returns ``None`` when the string has
        no ``=`` separator or the summary part cannot be parsed.
        """
        summary, separator, _mac = secure_string.partition("=")
        if not separator:
            return None
        return User.from_summary(summary)

Robin Sonnabend's avatar
Robin Sonnabend committed
39

Robin Sonnabend's avatar
Robin Sonnabend committed
40
41
42
43
class UserManager:
    """Tries a list of authentication backends in order."""

    def __init__(self, backends):
        """Keep the backends; they are consulted in the order given."""
        self.backends = backends

    def login(self, username, password, permanent=False):
        """Return a ``User`` from the first backend accepting the credentials.

        The user's groups are that backend's groups, de-duplicated and
        sorted; ``obsolete`` is copied from the backend.  Returns ``None``
        when no backend authenticates the user.
        """
        for candidate in self.backends:
            if not candidate.authenticate(username, password):
                continue
            unique_groups = set(candidate.groups(username, password))
            return User(username, sorted(unique_groups),
                obsolete=candidate.obsolete, permanent=permanent)
        return None

    def all_groups(self):
        """Yield every group of every backend, backend by backend."""
        for candidate in self.backends:
            for group in candidate.all_groups():
                yield group


Robin Sonnabend's avatar
Robin Sonnabend committed
57
58
59
60
class SecurityManager:
    """Signs user summaries with HMAC-SHA512 and validates signed strings."""

    def __init__(self, key, max_duration=300):
        """Prepare the keyed MAC.

        ``key`` is a text key (encoded as UTF-8); ``max_duration`` is the
        maximum age in seconds of a non-permanent session.
        """
        self.maccer = hmac.new(key.encode("utf-8"), digestmod=hashlib.sha512)
        self.max_duration = max_duration

    def _hexdigest(self, summary):
        """Return the hex MAC of ``summary`` (text) under the configured key."""
        # Work on a copy so the keyed template object is never mutated.
        maccer = self.maccer.copy()
        maccer.update(summary.encode("utf-8"))
        return maccer.hexdigest()

    def hash_user(self, user):
        """Return ``summary=mac`` for ``user`` (anything with ``summarize()``)."""
        summary = user.summarize()
        return "{}={}".format(summary, self._hexdigest(summary))

    def check_user(self, string):
        """Return True if ``string`` is a correctly signed, still valid session.

        The string must have the form ``summary=mac``.  It is accepted when
        the MAC matches and the session is either permanent or younger than
        ``max_duration`` seconds.
        """
        summary, separator, claimed_mac = string.partition("=")
        if not separator:
            # wrong format, expecting summary=hash
            return False
        # Verify the MAC before parsing anything: the summary is untrusted
        # until then, and a tampered one must yield False rather than a
        # parsing exception.  Compare as bytes because compare_digest
        # rejects non-ASCII text.
        macs_equal = hmac.compare_digest(
            self._hexdigest(summary).encode("utf-8"),
            claimed_mac.encode("utf-8"))
        if not macs_equal:
            return False
        user = User.from_hashstring(string)
        if user is None:
            return False
        if user.permanent:
            return True
        session_duration = datetime.now() - user.timestamp
        return int(session_duration.total_seconds()) < self.max_duration
Robin Sonnabend's avatar
Robin Sonnabend committed
84

85
86

class StaticUserManager:
    """Authentication backend backed by a fixed, in-memory user list."""

    def __init__(self, users, obsolete=False):
        """Build the lookup tables.

        ``users`` is an iterable of ``(username, password, groups)`` triples.
        It is consumed in a single pass so that one-shot iterables
        (generators) work as well as lists.
        """
        self.passwords = {}
        self.group_map = {}
        for username, password, groups in users:
            self.passwords[username] = password
            self.group_map[username] = groups
        self.obsolete = obsolete

    def authenticate(self, username, password):
        """Return True if ``username`` is known and ``password`` matches."""
        if username not in self.passwords:
            return False
        expected = self.passwords[username]
        if isinstance(expected, str) and isinstance(password, str):
            # Constant-time comparison, so response timing does not reveal
            # how much of a guessed password was correct.  Compare bytes
            # because compare_digest rejects non-ASCII text.
            return hmac.compare_digest(expected.encode("utf-8"),
                password.encode("utf-8"))
        return expected == password

    def groups(self, username, password=None):
        """Yield the groups of ``username``; nothing for unknown users."""
        if username in self.group_map:
            yield from self.group_map[username]

    def all_groups(self):
        """Yield every group that at least one user belongs to, once each."""
        seen = set()
        for user_groups in self.group_map.values():
            for group in user_groups:
                if group not in seen:
                    seen.add(group)
                    yield group

109

Robin Sonnabend's avatar
Robin Sonnabend committed
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
try:
    import ldap3
    from ldap3.utils.dn import parse_dn

    class LdapManager:
        """Authentication backend using an LDAP server with posixGroup groups."""

        def __init__(self, host, user_dn, group_dn, port=636, use_ssl=True,
                obsolete=False):
            """Remember the server and search bases.

            ``user_dn`` is a format string into which the username is
            inserted to build the bind DN; ``group_dn`` is the search base
            for groups.
            """
            self.server = ldap3.Server(host, port=port, use_ssl=use_ssl)
            self.user_dn = user_dn
            self.group_dn = group_dn
            self.obsolete = obsolete

        def authenticate(self, username, password):
            """Return the result of binding as the user; False if unreachable."""
            if not password:
                # A bind with a name but an empty password is not a proof of
                # identity (RFC 4513 calls it an "unauthenticated bind"), so
                # never hand it to the server.
                return False
            try:
                connection = ldap3.Connection(self.server,
                    self.user_dn.format(username), password)
                return connection.bind()
            except ldap3.core.exceptions.LDAPSocketOpenError:
                return False

        def groups(self, username, password=None):
            """Yield the ``cn`` of every posixGroup listing the user as member."""
            connection = ldap3.Connection(self.server)
            obj_def = ldap3.ObjectDef("posixgroup", connection)
            group_reader = ldap3.Reader(connection, obj_def, self.group_dn)
            username = username.lower()
            for group in group_reader.search():
                # Use .values (always a list).  .value collapses a
                # single-member group to a plain string, which would turn
                # the membership test into a substring match
                # ("admin" in "administrator").
                if username in group.memberUid.values:
                    yield group.cn.value

        def all_groups(self):
            """Yield the ``cn`` of every posixGroup below ``group_dn``."""
            connection = ldap3.Connection(self.server)
            obj_def = ldap3.ObjectDef("posixgroup", connection)
            group_reader = ldap3.Reader(connection, obj_def, self.group_dn)
            for group in group_reader.search():
                yield group.cn.value
Robin Sonnabend's avatar
Robin Sonnabend committed
146
147


Robin Sonnabend's avatar
Robin Sonnabend committed
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
    class ADManager:
        """Authentication backend using an Active Directory server."""

        def __init__(self, host, domain, user_dn, group_dn,
            port=636, use_ssl=True, ca_cert=None, obsolete=False):
            """Remember the server, domain and search bases.

            The server certificate is always validated; ``ca_cert``
            optionally names a CA certificate file to validate against.
            """
            tls_options = {"validate": ssl.CERT_REQUIRED}
            if ca_cert is not None:
                tls_options["ca_certs_file"] = ca_cert
            tls_config = ldap3.Tls(**tls_options)
            self.server = ldap3.Server(host, port=port, use_ssl=use_ssl,
                tls=tls_config)
            self.domain = domain
            self.user_dn = user_dn
            self.group_dn = group_dn
            self.obsolete = obsolete

        def prepare_connection(self, username=None, password=None):
            """Return an unbound connection.

            With both credentials given the connection is set up for
            ``DOMAIN\\username``; otherwise no credentials are attached.
            """
            if username is not None and password is not None:
                ad_user = "{}\\{}".format(self.domain, username)
                return ldap3.Connection(self.server, ad_user, password)
            return ldap3.Connection(self.server)

        def authenticate(self, username, password):
            """Return the result of binding as the user; False if unreachable."""
            if not password:
                # A bind with a name but an empty password is not a proof of
                # identity (RFC 4513 calls it an "unauthenticated bind"), so
                # never hand it to the server.
                return False
            try:
                return self.prepare_connection(username, password).bind()
            except ldap3.core.exceptions.LDAPSocketOpenError:
                return False

        def groups(self, username, password):
            """Yield the names of the user's groups, including nested ones.

            Binds with the user's own credentials, looks the user up below
            ``user_dn`` and follows ``memberOf`` transitively.  Every group
            is yielded at most once.
            """
            connection = self.prepare_connection(username, password)
            connection.bind()
            obj_def = ldap3.ObjectDef("user", connection)
            name_filter = "cn:={}".format(username)
            user_reader = ldap3.Reader(connection, obj_def, self.user_dn,
                name_filter)
            group_def = ldap3.ObjectDef("group", connection)
            # Group DNs already expanded.  Without this, groups that are
            # (directly or indirectly) members of each other would recurse
            # without end.
            visited = set()

            def _yield_recursive_groups(group_dn):
                if group_dn in visited:
                    return
                visited.add(group_dn)
                group_reader = ldap3.Reader(connection, group_def, group_dn, None)
                for entry in group_reader.search():
                    yield entry.name.value
                    for child in entry.memberOf:
                        yield from _yield_recursive_groups(child)

            for result in user_reader.search():
                for group_dn in result.memberOf:
                    yield from _yield_recursive_groups(group_dn)

        def all_groups(self):
            """Yield the name of every group below ``group_dn``."""
            connection = self.prepare_connection()
            connection.bind()
            obj_def = ldap3.ObjectDef("group", connection)
            group_reader = ldap3.Reader(connection, obj_def, self.group_dn)
            for result in group_reader.search():
                yield result.name.value
except ModuleNotFoundError:
    pass


try:
    import grp, pwd, pam

    class PAMManager:
        """Authentication backend using PAM and the local group database."""

        def __init__(self, obsolete=False):
            """Create the PAM handle used by ``authenticate``."""
            self.pam = pam.pam()
            self.obsolete = obsolete

        def authenticate(self, username, password):
            """Return the result of the PAM authentication for the credentials."""
            return self.pam.authenticate(username, password)

        def groups(self, username, password=None):
            """Yield the user's primary group, then every group listing them.

            Raises ``KeyError`` (from ``pwd.getpwnam``) for an unknown user.
            """
            # No debug output here: this runs on every login and must not
            # write usernames to stdout.
            yield grp.getgrgid(pwd.getpwnam(username).pw_gid).gr_name
            for group in grp.getgrall():
                if username in group.gr_mem:
                    yield group.gr_name

        def all_groups(self):
            """Yield the name of every group in the local group database."""
            for group in grp.getgrall():
                yield group.gr_name
Robin Sonnabend's avatar
Robin Sonnabend committed
225
226
except ModuleNotFoundError:
    pass
Robin Sonnabend's avatar
Robin Sonnabend committed
227