LDAP(轻量级目录访问协议,Lightweight Directory Access Protocol)以树状的层次结构存储数据。目录是为查询、浏览和搜索而优化的专业分布式数据库,它呈树状结构组织数据。目录服务是由目录数据库和访问协议组成的系统。
LDAP 提供复杂的不同层次的访问控制(ACL)。

条目,也叫记录项,是 LDAP 中最基本的颗粒,就像字典中的词条、数据库中的记录。通常对 LDAP 的添加、删除、更改、检索等操作都以条目为基本对象。
每个条目都有唯一的标识名(Distinguished Name,DN)如上图中的"cn=baby,ou=marketing,ou=people,dc=mydomain,dc=org"。通过 DN 的层次型语法结构,可方便地表示出条目在 LDAP 树中的位置,通常用于检索。LDAP 目录树的最顶部是所谓的 Base DN,比如"dc=mydomain,dc=org"。
每个条目可以有很多属性(Attribute),比如常见的人有姓名、地址、电话等属性。每个属性都有名称及对应的值,属性值可以有单个或多个,比如一个人可以有多个邮箱。
定义属性需要符合规则,该规则通过 schema 指定。比如,如果 entry 不包含在 inetorgperson schema 中的 ObjectClass inetOrgPerson,那么就不能为其指定 employeeNumber 属性,因为该属性是在 inetOrgPerson 中定义的。
LDAP 为人员组织机构中常见的设计了属性,比如 commonName、surname。
对象类是属性的集合,通过对象类可以方便地定义条目类型。条目通过继承对象类的方式,继承各种属性。
对象类有三种类型:结构类型(Structural)、抽象类型(Abstract)和辅助类型(Auxiliary)。结构类型是最基本的类型,它规定对象实体的基本属性,每个条目属于且仅属于一个结构型对象类。抽象类型是结构类型或其它抽象类型的父类,它将对象属性中共性的部分组织在一起,成为其它类的模版,条目不能直接继承抽象型对象类。辅助类型规定对象实体的扩展属性。虽然每个条目只属于一个结构型对象类,但可以同时属于多个辅助型对象类。
对象类本身可以相互继承,所有对象类的根类是 top。以常用的人员类型为例,它们的继承关系如下:

对象类(ObjectClass)、属性类型(AttributeType)、语法(Syntax)分别约定条目、属性、值,它们之间的关系如下图所示,所有这些构成模式(Schema) - 对象类的集合。条目数据在导入时,通常需要接受模式检查,以确保目录中所有条目的数据结构一致。
Schema(通常在 /etc/ldap/schema/ 目录)在导入时需要注意前后顺序。

LDAP 的守护进程 slapd 接收、响应请求,但实际存储、获取数据的操作由 Backend 完成,数据存放在 Database 中。一个 Backend 可以有多个 Database Instance,但每个 Database 的 suffix 和 rootdn 不一样。
LDIF(LDAP Data Interchange Format,数据交换格式)是 LDAP 数据库信息的文本格式,用于数据的导入/导出,没行是"属性:值"对。
LDAP 是以查询为主的目录结构,无论何种查询方式,最终都由过滤器确定查询的条件。过滤器相当于 SQL 中的 WHERE 子句。LDAP 的类过滤和字符串都必须放在括号内,比如 "(objectclass=*)"。
可使用 =、>=、<=、~=(约等于)进行比较,比如"(number<=100)"。合并条件必须把操作符放在两个操作对象的前面,单一操作对象用括号括起来。比如:
sudo apt install -y slapd ldap-utils说明:
- 在安装过程中,会要求为 LDAP 创建管理员密码。
使用如下命令查看管理员账号:
sudo ldapsearch -H ldapi:// -LLL -Q -Y EXTERNAL -b "cn=config" "(olcRootDN=*)" dn olcRootDN olcRootPW# 输出:# dn: olcDatabase={1}mdb,cn=config# olcRootDN: cn=admin,dc=nodomain# olcRootPW: {SSHA}mRCiS+rtBWXZnFDR700/YVqvgoA5YxRf可以看到目录信息树(DIT)的后缀是 nodomain,接下来对其进行更改。
sudo dpkg-reconfigure slapd说明:
- 第一步回答 No
- 第二步填写域名,比如 mycompany.com
- 第三步填写组织名,比如 Company
- 第四步填写管理员密码,比如 secret;第五步确认管理员密码
- 第六步选择使用的数据库后端,比如 MDB
- 第七步选择在清除 slapd 时是否移除数据库,比如 Yes
- 第八步选择是否移除旧数据库,比如 Yes
再次查看管理员账户:
sudo ldapsearch -H ldapi:// -LLL -Q -Y EXTERNAL -b "cn=config" "(olcRootDN=*)" dn olcRootDN olcRootPW# 输出:# dn: olcDatabase={1}mdb,cn=config# olcRootDN: cn=admin,dc=mycompany,dc=com# olcRootPW: {SSHA}+bYNH2SvOxLNzZbO+yC3+SP+tG4OOkkp创建 acl.ldif:
dn: olcDatabase={1}mdb,cn=configchangetype: modifyreplace: olcAccessolcAccess: to attrs=userPassword by anonymous auth by dn.base="cn=admin,dc=mycompany,dc=com" write by * noneolcAccess: to * by anonymous auth by dn.subtree="ou=admin,dc=mycompany,dc=com" write by * none
导入配置:
sudo ldapmodify -Q -Y EXTERNAL -H ldapi:/// -f acl.ldif更多关于 ACL 的细节,请移步参考文档或自行查阅资料。
数据库文件在:/var/lib/ldap/
配置目录在:/etc/ldap/slapd.d/
slapd 端口:389
查看 slapd 状态:
systemctl status slapdxxxxxxxxxxfrom typing import Dict, Union, List, Optional, Generatorimport contextlibimport unittest
import ldapfrom ldap.ldapobject import LDAPObjectimport ldap.modlist as modlist
class LDAP: """ LDAP 客户端 """
def __init__( self, ldap_host: str, user_dn: str, password: str, base_dn: str, ) -> None: """ :param ldap_host: LDAP 服务地址 :param user_dn: 用户 DN :param password: 用户密码 :param base_dn: 用户管理的 DN """ self._ldap_host: str = ldap_host self._user_dn: str = user_dn self._password: str = password self._base_dn: str = base_dn
.contextmanager def _get_connection(self) -> Generator[LDAPObject, None, None]: """ 获取 LDAPObject。 说明: * 因不确定 LDAPObject 是否 Thread Safety,故每次操作前创建新对象 """ connection: LDAPObject = ldap.initialize(self._ldap_host) connection.simple_bind_s(self._user_dn, self._password) try: yield connection finally: # 如果需要清理,那么执行清理 ...
def create_ou(self, ou_name: str) -> None: """ 创建 ou。 说明: * 该实现只创建支持一级部门 :param ou_name: ou 名称 """ attrs: Dict[str, Union[bytes, List[bytes]]] = { "objectclass": [b"top", b"organizationalUnit"], "ou": ou_name.encode(), } ou_dn: str = f"ou={ou_name},{self._base_dn}" try: with self._get_connection() as connection: connection.add_s(ou_dn, modlist.addModlist(attrs)) except ldap.ALREADY_EXISTS: print(f"{ou_name} already exists") else: print(f"created ou {ou_name} successfully")
def create_user( self, username: str, password: str, ou_dn: Optional[str] = None, employee_number: Optional[int] = None, telephone_number: Optional[str] = None ) -> None: """ 创建用户 :param username: 用户名 :param password: 密码 :param ou_dn: ou dn,其中不要包含 Base DN 部分 :param employee_number: 工号 :param telephone_number: 手机号 """ attrs: Dict[str, Union[bytes, List[bytes]]] = { "objectclass": [b"top", b"person", b"organizationalPerson", b"inetOrgPerson"], "cn": username.encode(), "sn": username.encode(), "userPassword": password.encode() } if not ou_dn: ou_dn = f"{self._base_dn}" else: ou_dn = f"{ou_dn},{self._base_dn}" if employee_number: attrs["employeeNumber"] = f"{employee_number}".encode() if telephone_number: attrs["telephoneNumber"] = f"{telephone_number}".encode() user_dn: str = f"cn={username},{ou_dn}" try: with self._get_connection() as connection: connection.add_s(user_dn, modlist.addModlist(attrs)) except ldap.ALREADY_EXISTS: print(f"{user_dn} already exists") else: print(f"created user {user_dn} successfully")
def get_user_info(self, username: str, from_dn: Optional[str] = None) -> Optional[Dict[str, str]]: """ 获取用户信息。 说明: * 返回搜索到的第一个用户 :param username: 用户名 :param from_dn: 从哪个 DN 开始搜索,其中不要包含 Base DN 部分 """ filter_str: str = f"(&(objectclass=person)(cn={username}))" if not from_dn: from_dn = self._base_dn else: from_dn = f"{from_dn},{self._base_dn}" with self._get_connection() as connection: ret = connection.search_s( from_dn, ldap.SCOPE_SUBTREE, # 搜索范围 filter_str ) if not ret: return None return { "telephone_number": ret[0][1]["telephoneNumber"][0].decode(), "employee_number": ret[0][1]["employeeNumber"][0].decode(), "username": username, "dn": ret[0][0] }
def authenticate_user(self, username: str, password: str) -> bool: """ 验证用户的账号和密码
:param username: 用户名 :param password: 密码 :return: 是否通过验证 """ connection: ldap.ldapobject = ldap.initialize(self._ldap_host) user_info: Optional[Dict[str, str]] = self.get_user_info(username) if not user_info: return False try: connection.simple_bind_s(user_info["dn"], password) except ldap.INVALID_CREDENTIALS: return False else: return True
def delete_dn(self, dn: str) -> None: """ 删除指定的 DN :param dn: 要删除的 DN,其中不要包含 Base DN 部分 """ with self._get_connection() as connection: # type: LDAPObject connection.delete_s(f"{dn},{self._base_dn}")
class TestLDAP(unittest.TestCase): def setUp(self) -> None: self.ldap_host: str = "ldap://192.168.56.101:389" self.base_dn: str = "dc=mycompany,dc=com" self.client: LDAP = LDAP( self.ldap_host, f"cn=admin,{self.base_dn}", "secret", self.base_dn )
def testLDAP(self) -> None: # 创建部门 admin self.client.create_ou("admin") # 向部门 admin 添加新用户 tim self.client.create_user("tim", "secret", "ou=admin", 1000, "17600000000") # 获取名为 tim 的用户的信息 user_info: Optional[Dict[str, str]] = self.client.get_user_info("tim") self.assertIsNotNone(user_info) self.assertEqual("1000", user_info["employee_number"]) self.assertEqual("17600000000", user_info["telephone_number"]) self.assertTrue(self.client.authenticate_user("tim", "secret")) self.assertFalse(self.client.authenticate_user("tim", "secret~"))
# 创建部门 rd self.client.create_ou("rd") # 向部门 rd 添加新用户 scott self.client.create_user("scott", "secret", "ou=rd", 1001, "17600000001")
# 假设 admin 部门里的用户都是管理员 client: LDAP = LDAP( self.ldap_host, f"cn=tim,ou=admin,{self.base_dn}", "secret", f"ou=rd,{self.base_dn}") # 获取 scott 的信息 user_info: Optional[Dict[str, str]] = client.get_user_info("scott") print(user_info) # 验证 scott self.assertTrue(client.authenticate_user("scott", "secret"))
def tearDown(self) -> None: # 删除用户 tim self.client.delete_dn("cn=tim,ou=admin") # 删除部门 admin self.client.delete_dn("ou=admin") # 删除用户 scott self.client.delete_dn("cn=scott,ou=rd") # 删除部门 rd self.client.delete_dn("ou=rd")
if __name__ == "__main__": unittest.main()
Vagrant.configure("2") do |config| config.vm.box = "generic/ubuntu1804"
vms = Array(101..101) vms.each do |seq| config.vm.define :"openldap-#{seq}" do |vagrant| vagrant.vm.hostname = "openldap-#{seq}" vagrant.vm.network "private_network", ip: "192.168.56.#{seq}" vagrant.vm.provider "virtualbox" do |vb| vb.customize ["modifyvm", :id, "--name", "openldap-#{seq}"] vb.gui = false vb.memory = "3072" vb.cpus = "4" end end endend