Skip to content
Snippets Groups Projects
Commit d2bd0596 authored by Lars Beckers's avatar Lars Beckers
Browse files

initial commit

parents
No related branches found
No related tags found
No related merge requests found
.idea/
*.pyc
__pycache__/
# OX/LDAP Synchronisation
Syncs users and groups from LDAP (according to samba-driven ActiveDirectory)
with open-xchange.
Requires at least `python3.6` and the dependencies listed in `requirements.txt`.
Call with `python -m oxldapsync` when the module is placed in your `PYTHONPATH`.
Alternatively, call `python __main__.py` directly.
Supports `-c` for config file location and `-v` for enabling verbose output.
An example configuration file is `ldapsync.conf`. Its default location is
`/opt/oxldapsync/etc/ldapsync.conf`, because the previous `perl` implementation
used this path. (This default is subject to change.) It uses the same syntax,
because that happens to be compatible to python's `ConfigParser` (w/o sections)
as well. Option names may include underscores and shadow same options without
those. Note, that you have to specify your OX credentials via the config file,
instead of the command line.
Misses multiple features of the original `perl` version. But is at least
maintainable.
#ox_path = /opt/open-xchange/sbin
#ox_admin_username = oxadmin
#ox_admin_password = password
#ox_context_id = 1
#ox_default_timezone = Europe/Berlin
#ox_default_language = de_DE
#ox_dont_modify = oxadmin
ldap_host = ad1.example.com,ad2.example.com
ldap_user_dn = cn=users,dc=example,dc=com
ldap_group_dn = cn=users,dc=example,dc=com
#ldap_username = cn=account,cn=users,dc=example,dc=com
#ldap_password = password
#ldap_ca_cert = /etc/ssl/certs/example_ca.pem
#ldap_port = 636
#ldap_user_filter = (&(objectClass=user)(mail=*))
if __name__ == '__main__':
from . import __main__
__main__.main()
import click
import logging
from . import config
from . import sync
logger = logging.getLogger()
@click.command()
@click.option('-v', '--verbose', is_flag=True, help='print non-critical logging messages')
@click.option('-c', '--config-file', default='/opt/oxldapsync/etc/ldapsync.conf',
show_default=True, help='configuration file name')
def main(verbose, config_file):
if verbose:
logger.setLevel(logging.INFO)
logger.info(f'reading config file {config_file}')
config.read_config(config_file)
logger.info('calling do_sync()')
sync.do_sync()
logger.info('done')
main()
import configparser
from pathlib import Path
import logging
logger = logging.getLogger()
_config = {}
def get(name, default=None):
return _config.get(name, _config.get(name.replace('_', ''), default))
def read_config(filename=None):
if filename is None or not Path(filename).resolve().is_file():
logger.warning('config file not given or unreadable, trying default')
filename = '/opt/oxldapsync/etc/ldapsync.conf'
with open(filename, 'r') as f:
config_string = '[values]\n' + f.read()
cp = configparser.ConfigParser()
cp['DEFAULT'] = {
'oxpath': "/opt/open-xchange/sbin/",
'oxcontextid': 1,
'ldapport': 636,
'oxdefaultlanguage': 'de_DE',
'oxdefaulttimezone': 'Europe/Berlin',
'oxdontmodify': 'oxadmin'
}
cp.read_string(config_string)
_config.update(cp['DEFAULT'])
_config.update(cp['values'])
_config.update({'ox_new_user_args': [
'--password', '""',
'--timezone', get('ox_default_timezone'),
'--language', get('ox_default_language')
]})
import ssl
import ldap3
import sys
import logging
from . import config
logger = logging.getLogger()
class LDAPConnection:
def __init__(self):
tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED)
if config.get('ldap_ca_cert', None):
tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED,
ca_certs_file=config.get('ldap_ca_cert'))
host = config.get('ldap_host').split(',')
if len(host) == 1:
host = host[0]
self.server = ldap3.Server(host, config.get('ldap_port'),
use_ssl=True, tls=tls_config)
else:
self.server = ldap3.ServerPool([
ldap3.Server(entry, config.get('ldap_port'),
use_ssl=True, tls=tls_config)
for entry in host], ldap3.FIRST)
self.domain = config.get('ldap_domain')
self.user_dn = config.get('ldap_user_dn')
self.group_dn = config.get('ldap_group_dn')
self.username = config.get('ldap_username', None)
self.password = config.get('ldap_password', None)
logger.info(f'connecting to {host} for {self.domain}')
self.connection = self._connect()
logger.info(f'authenticating using user {self.username}')
if not self._authenticate():
logger.error(f'could not authenticate with {host} for {self.domain} using user {self.username}')
sys.exit(1)
def _connect(self):
if self.username:
password = self.password or ''
username = "{}\\{}".format(self.domain, self.username)
return ldap3.Connection(self.server, username, password)
return ldap3.Connection(self.server)
def _authenticate(self):
if not self.connection.bind():
return False
obj_def = ldap3.ObjectDef("user", self.connection)
name_filter = "cn:={}".format(self.username)
user_reader = ldap3.Reader(self.connection, obj_def, self.user_dn, name_filter)
for result in user_reader.search():
if not self.is_user_enabled(result):
return False
return True
def is_user_enabled(self, user):
if user.userAccountControl in [546, 514, 66050, 66082]:
return False
return True
def search_users(self):
obj_def = ldap3.ObjectDef("user", self.connection)
user_filter = config.get('ldap_user_filter', '(&(objectClass=user)(mail=*))')
search_obj = ldap3.Reader(self.connection, obj_def, self.user_dn, user_filter)
for entry in search_obj.search():
if not self.is_user_enabled(entry):
continue
yield entry
def search_groups(self):
obj_def = ldap3.ObjectDef("group", self.connection)
search_obj = ldap3.Reader(self.connection, obj_def, self.group_dn)
for entry in search_obj.search():
yield entry
def groups_for_user(self, username):
obj_def = ldap3.ObjectDef("user", self.connection)
name_filter = "cn:={}".format(username)
user_reader = ldap3.Reader(self.connection, obj_def, self.user_dn, name_filter)
group_def = ldap3.ObjectDef("group", self.connection)
def _yield_recursive_groups(group_dn):
group_reader = ldap3.Reader(self.connection, group_def, group_dn)
for entry in group_reader.search():
yield entry.name.value
for child in entry.memberOf:
_yield_recursive_groups(child)
for result in user_reader.search():
for group_dn in result.memberOf:
_yield_recursive_groups(group_dn)
import subprocess
import csv
import logging
from . import config
logger = logging.getLogger()
def run_cmd(cmd):
cmd[0] = config.get('ox_path') + '/' + cmd[0]
if config.get('ox_admin_username', None):
cmd.insert(1, '-A ' + config.get('ox_admin_username'))
cmd.insert(2, '-P ' + config.get('ox_admin_password'))
cmd.insert(1, '-c ' + config.get('ox_context_id'))
logger.info(str(cmd))
return subprocess.run(cmd, check=True, capture_output=True, text=True)
def add_user(entry):
cmd = ['createuser']
cmd.extend(get_user_props(entry))
cmd.extend(config.get('ox_new_user_args', []))
return run_cmd(cmd)
def modify_user(entry):
cmd = ['changeuser']
cmd.extend(get_user_props(entry))
return run_cmd(cmd)
def delete_user(username):
return run_cmd(['deleteuser', '-u', username])
entry_to_cmdline = {
'cn': '--username',
'displayName': '--displayname',
'givenName': '--givenname',
'sn': '--surname',
'mail': '--email',
}
def get_user_props(entry):
ret = []
for k, v in entry_to_cmdline:
mapped = entry[k].value
if mapped:
ret.extend([v, mapped])
return ret
def get_users():
proc = run_cmd(['listuser', '--csv'])
reader = csv.DictReader(proc.stdout)
return list(reader)
def add_group(group, display_name, members):
if display_name == 'users':
return True
cmd = ['creategroup']
cmd.extend(get_group_props(group, display_name, members))
return run_cmd(cmd)
def add_group_members(group, display_name, members):
if display_name == 'users':
return False
cmd = ['changegroup']
cmd.extend(get_group_props(group, display_name, members))
return run_cmd(cmd)
def delete_group_members(group, display_name, members):
cmd = ['changegroup']
cmd.extend(get_group_props(group, display_name, members, remove_members=True))
return run_cmd(cmd)
def delete_group(group):
if group == 'users':
return False
return run_cmd(['deletegroup', '-n', group])
def get_group_props(group, display_name, members, remove_members=False):
member_op = '-a' if not remove_members else '-r'
return [
'-n', group,
'-d', display_name,
member_op, ','.join(members),
]
def get_groups():
proc = run_cmd(['listgroup', '--csv'])
reader = csv.DictReader(proc.stdout)
return list(reader)
from . import ldap
from . import ox
from . import config
import logging
logger = logging.getLogger()
def do_sync():
ldap_conn = ldap.LDAPConnection()
sync_users(ldap_conn)
sync_groups(ldap_conn)
def _find(haystack, needle):
for cnt, entry in enumerate(haystack):
if entry['name'] == needle:
return cnt
return None
def sync_users(ldap_conn):
logger.info('syncing users')
ox_users = ox.get_users()
for entry in ldap_conn.search_users():
username = entry['cn']
element = _find(ox_users, username)
if element is None:
ox.add_user(entry)
continue
ox.modify_user(entry)
ox_users[element]['touched'] = True
dont_modify = config.get('oxdontmodify', '').split(',')
for user in ox_users:
if not user.get('touched', False) and user['name'] not in dont_modify:
ox.delete_user(user['name'])
def sync_groups(ldap_conn):
logger.info('syncing groups')
ox_groups = ox.get_groups()
ox_users = ox.get_users()
for entry in ldap_conn.search_groups():
group = entry['name'].value
display_name = entry['displayName'].value
if display_name is None:
display_name = group
element = _find(ox_groups, group)
if element is None:
members = []
for member in entry['member'].value:
member = member.split(',')[0].split('"')[1]
cnt = _find(ox_users, member)
if cnt is not None:
members.append(ox_users[cnt]['id'])
ox.add_group(group, display_name, members)
continue
current_members = ox_groups[element]['members'].split(',')
active_members = len(current_members)
new_members = []
for cnt, member in enumerate(entry['member'].value):
member = member.split(',')[0].split('"')[1]
cnt = _find(ox_users, member)
if cnt is not None:
if ox_users[cnt]['id'] in current_members:
current_members.remove(ox_users[cnt]['id'])
else:
new_members.append(ox_users[cnt]['id'])
if current_members:
ox.delete_group_members(group, display_name, current_members)
if new_members:
ox.add_group_members(group, display_name, new_members)
ox_groups[element]['touched'] = (len(current_members) < active_members or new_members)
for group in ox_groups:
if not group.get('touched', False):
ox.delete_group(group['name'].value)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment