Source code for slapd

import os
import socket
import sys
import time
import subprocess
import logging
import atexit
from logging.handlers import SysLogHandler
from shutil import which
from urllib.parse import quote_plus

HERE = os.path.abspath(os.path.dirname(__file__))

SLAPD_CONF_TEMPLATE = r"""dn: cn=config
objectClass: olcGlobal
cn: config
olcServerID: %(serverid)s
olcLogLevel: stats stats2
olcAllows: bind_v2
olcAuthzRegexp: {0}"gidnumber=%(root_gid)s\+uidnumber=%(root_uid)s,cn=peercred,cn=external,cn=auth" "%(rootdn)s"
olcAuthzRegexp: {1}"C=DE, O=python-ldap, OU=slapd-test, CN=([A-Za-z]+)" "ldap://ou=people,dc=local???($1)"
olcTLSCACertificateFile: %(cafile)s
olcTLSCertificateFile: %(servercert)s
olcTLSCertificateKeyFile: %(serverkey)s
olcTLSVerifyClient: try

dn: cn=module,cn=config
objectClass: olcModuleList
cn: module
olcModuleLoad: back_%(database)s

dn: olcDatabase=config,cn=config
objectClass: olcDatabaseConfig
olcDatabase: config
olcRootDN: %(rootdn)s

dn: olcDatabase=%(database)s,cn=config
objectClass: olcDatabaseConfig
objectClass: olcMdbConfig
olcDatabase: %(database)s
olcSuffix: %(suffix)s
olcRootDN: %(rootdn)s
olcRootPW: %(rootpw)s
olcDbDirectory: %(directory)s
olcDbMaxSize: 1000000000
"""


def _add_sbin(path):
    """Add /sbin and related directories to a command search path"""
    directories = path.split(os.pathsep)
    if sys.platform != "win32":
        for sbin in "/usr/local/sbin", "/sbin", "/usr/sbin":
            if sbin not in directories:
                directories.append(sbin)
    return os.pathsep.join(directories)


[docs] def combinedlogger( log_name, log_level=logging.WARN, syslogger_format="%(levelname)s %(message)s", consolelogger_format="%(asctime)s %(levelname)s %(message)s", ): """ Returns a combined SysLogHandler/StreamHandler logging instance with formatters """ if "LOGLEVEL" in os.environ: log_level = os.environ["LOGLEVEL"] try: log_level = int(log_level) except ValueError: pass # for writing to syslog newlogger = logging.getLogger(log_name) if syslogger_format and os.path.exists("/dev/log"): my_syslog_formatter = logging.Formatter( fmt=" ".join((log_name, syslogger_format)) ) my_syslog_handler = logging.handlers.SysLogHandler( address="/dev/log", facility=SysLogHandler.LOG_DAEMON, ) my_syslog_handler.setFormatter(my_syslog_formatter) newlogger.addHandler(my_syslog_handler) if consolelogger_format: my_stream_formatter = logging.Formatter(fmt=consolelogger_format) my_stream_handler = logging.StreamHandler() my_stream_handler.setFormatter(my_stream_formatter) newlogger.addHandler(my_stream_handler) newlogger.setLevel(log_level) return newlogger # end of combinedlogger()
[docs] class Slapd: """ Controller class for a slapd instance, OpenLDAP's server. This class creates a temporary data store for slapd, runs it listening on a private Unix domain socket and TCP port, and initializes it with a top-level entry and the root user. When a reference to an instance of this class is lost, the slapd server is shut down. An instance can be used as a context manager. When exiting the context manager, the slapd server is shut down and the temporary data store is removed. :param schemas: A list of schema names or schema paths to load at startup. By default this only contains `core`. :param host: The host on which the slapd server will listen to. The default value is `127.0.0.1`. :param port: The port on which the slapd server will listen to. If `None` a random available port will be chosen. :param log_level: The verbosity of Slapd. The default value is `logging.WARNING`. :param suffix: The LDAP suffix for all objects. The default is `dc=slapd-test,dc=python-ldap,dc=org`. :param root_cn: The root user common name. The default value is `Manager`. :param root_pw: The root user password. The default value is `password`. :param configuration_template: An optional inital database template. :param datadir_prefix: The prefix of the temporary directory where the slapd configuration and data will be stored. The default value is `python-ldap-test`. :param debug: Wether to launch slapd with debug verbosity on. When `True` debug is enabled, when `False` debug is disabled, when `None`, debug is only enable when *log_level* is `logging.DEBUG`. Default value is `None`. """ TMPDIR = os.environ.get("TMP", os.getcwd()) if "SCHEMA" in os.environ: SCHEMADIR = os.environ["SCHEMA"] elif os.path.isdir("/etc/openldap/schema"): SCHEMADIR = "/etc/openldap/schema" elif os.path.isdir("/etc/ldap/schema"): SCHEMADIR = "/etc/ldap/schema" else: SCHEMADIR = None BIN_PATH = os.environ.get("BIN", os.environ.get("PATH", os.defpath)) SBIN_PATH = os.environ.get("SBIN", _add_sbin(BIN_PATH)) def __init__( self, host=None, port=None, log_level=logging.WARN, schemas=None, database="mdb", suffix="dc=slapd-test,dc=python-ldap,dc=org", root_cn="Manager", root_pw="password", configuration_template=None, datadir_prefix=None, debug=None, ): self.logger = combinedlogger("python-ldap-test", log_level=log_level) self.schemas = schemas or ("core.ldif",) self.database = database self.suffix = suffix self.root_cn = root_cn self.root_pw = root_pw self.host = host or "127.0.0.1" self._proc = None self.port = port or self._avail_tcpport() self.server_id = self.port % 4096 self.testrundir = os.path.join( self.TMPDIR, "%s-%d" % (datadir_prefix or "python-ldap-test", self.port) ) self._slapd_conf = os.path.join(self.testrundir, "slapd.d") self._db_directory = os.path.join(self.testrundir, "openldap-data") self.ldap_uri = "ldap://%s:%d/" % (self.host, self.port) self.configuration_template = configuration_template or SLAPD_CONF_TEMPLATE self.debug = debug have_ldapi = hasattr(socket, "AF_UNIX") if have_ldapi: ldapi_path = os.path.join(self.testrundir, "ldapi") self.ldapi_uri = "ldapi://%s" % quote_plus(ldapi_path) self.default_ldap_uri = self.ldapi_uri # use SASL/EXTERNAL via LDAPI when invoking OpenLDAP CLI tools self.cli_sasl_external = True else: self.ldapi_uri = None self.default_ldap_uri = self.ldap_uri # Use simple bind via LDAP uri self.cli_sasl_external = False self._find_commands() if self.SCHEMADIR is None: raise ValueError("SCHEMADIR is None, ldap schemas are missing.") self.cafile = os.path.join(HERE, "certs/ca.pem") self.servercert = os.path.join(HERE, "certs/server.pem") self.serverkey = os.path.join(HERE, "certs/server.key") self.clientcert = os.path.join(HERE, "certs/client.pem") self.clientkey = os.path.join(HERE, "certs/client.key") def __enter__(self): self.start() return self def __exit__(self, exc_type, exc_value, traceback): self.stop() @property def root_dn(self): return "cn={self.root_cn},{self.suffix}".format(self=self) def _find_commands(self): self.PATH_LDAPADD = self._find_command("ldapadd") self.PATH_LDAPDELETE = self._find_command("ldapdelete") self.PATH_LDAPMODIFY = self._find_command("ldapmodify") self.PATH_LDAPSEARCH = self._find_command("ldapsearch") self.PATH_LDAPWHOAMI = self._find_command("ldapwhoami") self.PATH_SLAPADD = self._find_command("slapadd") self.PATH_SLAPCAT = self._find_command("slapcat") self.PATH_SLAPD = os.environ.get("SLAPD", None) if not self.PATH_SLAPD: self.PATH_SLAPD = self._find_command("slapd", in_sbin=True) def _find_command(self, cmd, in_sbin=False): if in_sbin: path = self.SBIN_PATH var_name = "SBIN" else: path = self.BIN_PATH var_name = "BIN" command = which(cmd, path=path) if command is None: raise ValueError( "Command '{}' not found. Set the {} environment variable to " "override slapd's search path.".format(cmd, var_name) ) return command def _setup_rundir(self): """ creates rundir structure for setting up a custom directory structure you have to override this method """ os.mkdir(self.testrundir) os.mkdir(self._db_directory) dir_name = os.path.join(self.testrundir, "slapd.d") self.logger.debug("Create directory %s", dir_name) os.mkdir(dir_name) def _cleanup_rundir(self): """ Recursively delete whole directory specified by `path' """ if not os.path.exists(self.testrundir): return self.logger.debug("clean-up %s", self.testrundir) for dirpath, dirnames, filenames in os.walk(self.testrundir, topdown=False): for filename in filenames: self.logger.debug("remove %s", os.path.join(dirpath, filename)) os.remove(os.path.join(dirpath, filename)) for dirname in dirnames: self.logger.debug("rmdir %s", os.path.join(dirpath, dirname)) os.rmdir(os.path.join(dirpath, dirname)) os.rmdir(self.testrundir) self.logger.info("cleaned-up %s", self.testrundir) def _avail_tcpport(self): """ find an available port for TCP connection """ sock = socket.socket() try: sock.bind((self.host, 0)) port = sock.getsockname()[1] finally: sock.close() self.logger.info("Found available port %d", port) return port def _gen_config(self): """ generates a slapd.conf and returns it as one string for generating specific static configuration files you have to override this method """ config_dict = { "serverid": hex(self.server_id), "database": self.database, "directory": self._db_directory, "suffix": self.suffix, "rootdn": self.root_dn, "rootpw": self.root_pw, "root_uid": os.getuid(), "root_gid": os.getgid(), "cafile": self.cafile, "servercert": self.servercert, "serverkey": self.serverkey, } return self.configuration_template % config_dict def _write_config(self): """Loads the slapd.d configuration.""" self.logger.debug("importing configuration: %s", self._slapd_conf) self.slapadd(self._gen_config(), ["-n0"]) ldif_paths = [ schema if os.path.exists(schema) else os.path.join(self.SCHEMADIR, schema) for schema in self.schemas ] for ldif_path in ldif_paths: self.slapadd(None, ["-n0", "-l", ldif_path]) self.logger.debug("import ok: %s", self._slapd_conf) def _test_config(self): self.logger.debug("testing config %s", self._slapd_conf) popen_list = [ self.PATH_SLAPD, "-Ttest", "-F", self._slapd_conf, "-u", "-v", "-d", "config", ] p = subprocess.run(popen_list, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if p.returncode != 0: self.logger.error(p.stdout.decode("utf-8")) raise RuntimeError("configuration test failed") self.logger.info("config ok: %s", self._slapd_conf) def _start_slapd(self): """ Spawns/forks the slapd process """ urls = [self.ldap_uri] if self.ldapi_uri: urls.append(self.ldapi_uri) slapd_args = [ self.PATH_SLAPD, "-F", self._slapd_conf, "-h", " ".join(urls), ] if self.debug is True or ( self.debug is None and self.logger.isEnabledFor(logging.DEBUG) ): slapd_args.extend(["-d", "-1"]) else: slapd_args.extend(["-d", "0"]) self.logger.info("starting slapd: %r", " ".join(slapd_args)) self._proc = subprocess.Popen(slapd_args) deadline = time.monotonic() + 10 while True: if self._proc.poll() is not None: # pragma: no cover self._stopped() raise RuntimeError("slapd exited before opening port") try: self.logger.debug("slapd connection check to %s", self.default_ldap_uri) self.ldapwhoami() except RuntimeError: if time.monotonic() >= deadline: # pragma: no cover break time.sleep(0.2) else: return raise RuntimeError("slapd did not start properly") # pragma: no cover
[docs] def start(self): """ Starts the slapd server process running, and waits for it to come up. """ if self._proc is not None: return atexit.register(self.stop) self._cleanup_rundir() self._setup_rundir() self._write_config() self._test_config() self._start_slapd() self.logger.debug( "slapd with pid=%d listening on %s and %s", self._proc.pid, self.ldap_uri, self.ldapi_uri, )
[docs] def stop(self): """ Stops the slapd server, and waits for it to terminate and cleans up """ if self._proc is not None: self.logger.debug("stopping slapd with pid %d", self._proc.pid) self._proc.terminate() self.wait() self._cleanup_rundir() atexit.unregister(self.stop)
[docs] def restart(self): """ Restarts the slapd server with same data """ self._proc.terminate() self.wait() self._start_slapd()
[docs] def wait(self): """Waits for the slapd process to terminate by itself.""" if self._proc: self._proc.wait() self._stopped()
def _stopped(self): """Called when the slapd server is known to have terminated""" if self._proc is not None: self.logger.info("slapd[%d] terminated", self._proc.pid) self._proc = None def _cli_auth_args(self): if self.cli_sasl_external: authc_args = [ "-Y", "EXTERNAL", ] if not self.logger.isEnabledFor(logging.DEBUG): authc_args.append("-Q") else: authc_args = [ "-x", "-D", self.root_dn, "-w", self.root_pw, ] return authc_args def _cli_popen( self, ldapcommand, extra_args=None, ldap_uri=None, stdin_data=None, expected=0, ): if isinstance(expected, int): expected = [expected] if ldap_uri is None: ldap_uri = self.default_ldap_uri if ldapcommand.split("/")[-1].startswith("ldap"): args = [ldapcommand, "-H", ldap_uri] + self._cli_auth_args() else: args = [ldapcommand, "-F", self._slapd_conf] args += extra_args or [] self.logger.debug("Run command: %r", " ".join(args)) proc = subprocess.run(args, input=stdin_data, capture_output=True) self.logger.debug( "stdin_data=%s", stdin_data.decode("utf-8") if stdin_data else stdin_data ) if proc.stdout is not None: self.logger.debug("stdout=%s", proc.stdout.decode("utf-8")) if proc.stderr is not None: self.logger.debug("stderr=%s", proc.stderr.decode("utf-8")) if proc.returncode not in expected: raise RuntimeError( "Unexpected process return code (expected {}, got {}): {!r}".format( expected, proc.returncode, " ".join(args) ) ) return proc
[docs] def ldapwhoami(self, extra_args=None, expected=0): """ Runs ldapwhoami on this slapd instance :param extra_args: Extra argument to pass to *ldapwhoami*. :param expected: Expected return code. Defaults to `0`. :type expected: An integer or a list of integers :return: A :class:`subprocess.CompletedProcess` with the *ldapwhoami* execution data. """ return self._cli_popen( self.PATH_LDAPWHOAMI, extra_args=extra_args, expected=expected )
[docs] def ldapadd(self, ldif, extra_args=None, expected=0): """ Runs ldapadd on this slapd instance, passing it the ldif content :param ldif: The ldif content to pass to the *ldapadd* standard input. :param extra_args: Extra argument to pass to *ldapadd*. :param expected: Expected return code. Defaults to `0`. :type expected: An integer or a list of integers :return: A :class:`subprocess.CompletedProcess` with the *ldapadd* execution data. """ return self._cli_popen( self.PATH_LDAPADD, extra_args=extra_args, stdin_data=ldif.encode("utf-8") if ldif else None, expected=expected, )
[docs] def ldapmodify(self, ldif, extra_args=None, expected=0): """ Runs ldapadd on this slapd instance, passing it the ldif content :param ldif: The ldif content to pass to the *ldapmodify* standard input. :param extra_args: Extra argument to pass to *ldapmodify*. :param expected: Expected return code. Defaults to `0`. :type expected: An integer or a list of integers :return: A :class:`subprocess.CompletedProcess` with the *ldapmodify* execution data. """ return self._cli_popen( self.PATH_LDAPMODIFY, extra_args=extra_args, stdin_data=ldif.encode("utf-8") if ldif else None, expected=expected, )
[docs] def ldapdelete(self, dn, recursive=False, extra_args=None, expected=0): """ Runs ldapdelete on this slapd instance, deleting 'dn' :param dn: The distinguished name of the element to delete. :param recursive: Whether to delete sub-elements. Defaults to `False`. :param extra_args: Extra argument to pass to *ldapdelete*. :param expected: Expected return code. Defaults to `0`. :type expected: An integer or a list of integers :return: A :class:`subprocess.CompletedProcess` with the *ldapdelete* execution data. """ if extra_args is None: extra_args = [] if recursive: extra_args.append("-r") extra_args.append(dn) return self._cli_popen( self.PATH_LDAPDELETE, extra_args=extra_args, expected=expected )
[docs] def ldapsearch(self, filter, searchbase=None, extra_args=None, expected=0): """ Runs search on this slapd instance :param filter: The search filter. :param base: The starting point for the search. :param extra_args: Extra argument to pass to *ldapdelete*. :param expected: Expected return code. Defaults to `0`. :type expected: An integer or a list of integers :return: A :class:`subprocess.CompletedProcess` with the *ldapdelete* execution data. """ if extra_args is None: extra_args = [] if searchbase: extra_args.extend(["-b", searchbase]) extra_args.append(filter) return self._cli_popen( self.PATH_LDAPSEARCH, extra_args=extra_args, expected=expected )
[docs] def slapadd(self, ldif, extra_args=None, expected=0): """ Runs slapadd on this slapd instance, passing it the ldif content :param ldif: The ldif content to pass to the *slapadd* standard input. :param extra_args: Extra argument to pass to *slapadd*. :param expected: Expected return code. Defaults to `0`. :type expected: An integer or a list of integers :return: A :class:`subprocess.CompletedProcess` with the *slapadd* execution data. """ return self._cli_popen( self.PATH_SLAPADD, stdin_data=ldif.encode("utf-8") if ldif else None, extra_args=extra_args, expected=expected, )
[docs] def slapcat(self, extra_args=None, expected=0): """ Runs slapadd on this slapd instance, passing it the ldif content :param extra_args: Extra argument to pass to *slapcat*. :param expected: Expected return code. Defaults to `0`. :type expected: An integer or a list of integers :return: A :class:`subprocess.CompletedProcess` with the *slapcat* execution data. """ return self._cli_popen( self.PATH_SLAPCAT, extra_args=extra_args, expected=expected, )
[docs] def init_tree(self): """ Creates the organization and applicationProcess object. """ suffix_dc = self.suffix.split(",")[0][3:] return self.ldapadd( "\n".join( [ "dn: " + self.suffix, "objectClass: dcObject", "objectClass: organization", "dc: " + suffix_dc, "o: " + suffix_dc, "", "dn: " + self.root_dn, "objectClass: applicationProcess", "cn: " + self.root_cn, ] ) + "\n" )