LDAP(轻量级目录访问协议,Lightweight Directory Access Protocol)以树状的层次结构存储数据。目录是为查询、浏览和搜索而优化的专业分布式数据库,它呈树状结构组织数据。目录服务是由目录数据库和访问协议组成的系统。
LDAP 提供复杂的不同层次的访问控制(ACL)。
条目,也叫记录项,是 LDAP 中最基本的颗粒,就像字典中的词条、数据库中的记录。通常对 LDAP 的添加、删除、更改、检索等操作都以条目为基本对象。
每个条目都有唯一的标识名(Distinguished Name,DN)如上图中的"cn=baby,ou=marketing,ou=people,dc=mydomain,dc=org"。通过 DN 的层次型语法结构,可方便地表示出条目在 LDAP 树中的位置,通常用于检索。LDAP 目录树的最顶部是所谓的 Base DN,比如"dc=mydomain,dc=org"。
每个条目可以有很多属性(Attribute),比如常见的人有姓名、地址、电话等属性。每个属性都有名称及对应的值,属性值可以有单个或多个,比如一个人可以有多个邮箱。
定义属性需要符合规则,该规则通过 schema 指定。比如,如果 entry 不包含在 inetorgperson schema 中的 ObjectClass inetOrgPerson,那么就不能为其指定 employeeNumber 属性,因为该属性是在 inetOrgPerson 中定义的。
LDAP 为人员组织机构中常见的设计了属性,比如 commonName、surname。
对象类是属性的集合,通过对象类可以方便地定义条目类型。条目通过继承对象类的方式,继承各种属性。
对象类有三种类型:结构类型(Structural)、抽象类型(Abstract)和辅助类型(Auxiliary)。结构类型是最基本的类型,它规定对象实体的基本属性,每个条目属于且仅属于一个结构型对象类。抽象类型是结构类型或其它抽象类型的父类,它将对象属性中共性的部分组织在一起,成为其它类的模版,条目不能直接继承抽象型对象类。辅助类型规定对象实体的扩展属性。虽然每个条目只属于一个结构型对象类,但可以同时属于多个辅助型对象类。
对象类本身可以相互继承,所有对象类的根类是 top。以常用的人员类型为例,它们的继承关系如下:
对象类(ObjectClass)、属性类型(AttributeType)、语法(Syntax)分别约定条目、属性、值,它们之间的关系如下图所示,所有这些构成模式(Schema) - 对象类的集合。条目数据在导入时,通常需要接受模式检查,以确保目录中所有条目的数据结构一致。
Schema(通常在 /etc/ldap/schema/ 目录)在导入时需要注意前后顺序。
LDAP 的守护进程 slapd 接收、响应请求,但实际存储、获取数据的操作由 Backend 完成,数据存放在 Database 中。一个 Backend 可以有多个 Database Instance,但每个 Database 的 suffix 和 rootdn 不一样。
LDIF(LDAP Data Interchange Format,数据交换格式)是 LDAP 数据库信息的文本格式,用于数据的导入/导出,没行是"属性:值"对。
LDAP 是以查询为主的目录结构,无论何种查询方式,最终都由过滤器确定查询的条件。过滤器相当于 SQL 中的 WHERE 子句。LDAP 的类过滤和字符串都必须放在括号内,比如 "(objectclass=*)"。
可使用 =、>=、<=、~=(约等于)进行比较,比如"(number<=100)"。合并条件必须把操作符放在两个操作对象的前面,单一操作对象用括号括起来。比如:
sudo apt install -y slapd ldap-utils
说明:
- 在安装过程中,会要求为 LDAP 创建管理员密码。
使用如下命令查看管理员账号:
sudo ldapsearch -H ldapi:// -LLL -Q -Y EXTERNAL -b "cn=config" "(olcRootDN=*)" dn olcRootDN olcRootPW
# 输出:
# dn: olcDatabase={1}mdb,cn=config
# olcRootDN: cn=admin,dc=nodomain
# olcRootPW: {SSHA}mRCiS+rtBWXZnFDR700/YVqvgoA5YxRf
可以看到目录信息树(DIT)的后缀是 nodomain,接下来对其进行更改。
sudo dpkg-reconfigure slapd
说明:
- 第一步回答 No
- 第二步填写域名,比如 mycompany.com
- 第三步填写组织名,比如 Company
- 第四步填写管理员密码,比如 secret;第五步确认管理员密码
- 第六步选择使用的数据库后端,比如 MDB
- 第七步选择在清除 slapd 时是否移除数据库,比如 Yes
- 第八步选择是否移除旧数据库,比如 Yes
再次查看管理员账户:
sudo ldapsearch -H ldapi:// -LLL -Q -Y EXTERNAL -b "cn=config" "(olcRootDN=*)" dn olcRootDN olcRootPW
# 输出:
# dn: olcDatabase={1}mdb,cn=config
# olcRootDN: cn=admin,dc=mycompany,dc=com
# olcRootPW: {SSHA}+bYNH2SvOxLNzZbO+yC3+SP+tG4OOkkp
创建 acl.ldif:
dn: olcDatabase={1}mdb,cn=config
changetype: modify
replace: olcAccess
olcAccess: to attrs=userPassword by anonymous auth by dn.base="cn=admin,dc=mycompany,dc=com" write by * none
olcAccess: to * by anonymous auth by dn.subtree="ou=admin,dc=mycompany,dc=com" write by * none
导入配置:
sudo ldapmodify -Q -Y EXTERNAL -H ldapi:/// -f acl.ldif
更多关于 ACL 的细节,请移步参考文档或自行查阅资料。
数据库文件在:/var/lib/ldap/
配置目录在:/etc/ldap/slapd.d/
slapd 端口:389
查看 slapd 状态:
systemctl status slapd
xxxxxxxxxx
from typing import Dict, Union, List, Optional, Generator
import contextlib
import unittest
import ldap
from ldap.ldapobject import LDAPObject
import ldap.modlist as modlist
class LDAP:
"""
LDAP 客户端
"""
def __init__(
self,
ldap_host: str,
user_dn: str,
password: str,
base_dn: str,
) -> None:
"""
:param ldap_host: LDAP 服务地址
:param user_dn: 用户 DN
:param password: 用户密码
:param base_dn: 用户管理的 DN
"""
self._ldap_host: str = ldap_host
self._user_dn: str = user_dn
self._password: str = password
self._base_dn: str = base_dn
contextmanager .
def _get_connection(self) -> Generator[LDAPObject, None, None]:
"""
获取 LDAPObject。
说明:
* 因不确定 LDAPObject 是否 Thread Safety,故每次操作前创建新对象
"""
connection: LDAPObject = ldap.initialize(self._ldap_host)
connection.simple_bind_s(self._user_dn, self._password)
try:
yield connection
finally:
# 如果需要清理,那么执行清理
...
def create_ou(self, ou_name: str) -> None:
"""
创建 ou。
说明:
* 该实现只创建支持一级部门
:param ou_name: ou 名称
"""
attrs: Dict[str, Union[bytes, List[bytes]]] = {
"objectclass": [b"top", b"organizationalUnit"],
"ou": ou_name.encode(),
}
ou_dn: str = f"ou={ou_name},{self._base_dn}"
try:
with self._get_connection() as connection:
connection.add_s(ou_dn, modlist.addModlist(attrs))
except ldap.ALREADY_EXISTS:
print(f"{ou_name} already exists")
else:
print(f"created ou {ou_name} successfully")
def create_user(
self,
username: str,
password: str,
ou_dn: Optional[str] = None,
employee_number: Optional[int] = None,
telephone_number: Optional[str] = None
) -> None:
"""
创建用户
:param username: 用户名
:param password: 密码
:param ou_dn: ou dn,其中不要包含 Base DN 部分
:param employee_number: 工号
:param telephone_number: 手机号
"""
attrs: Dict[str, Union[bytes, List[bytes]]] = {
"objectclass": [b"top", b"person", b"organizationalPerson", b"inetOrgPerson"],
"cn": username.encode(),
"sn": username.encode(),
"userPassword": password.encode()
}
if not ou_dn:
ou_dn = f"{self._base_dn}"
else:
ou_dn = f"{ou_dn},{self._base_dn}"
if employee_number:
attrs["employeeNumber"] = f"{employee_number}".encode()
if telephone_number:
attrs["telephoneNumber"] = f"{telephone_number}".encode()
user_dn: str = f"cn={username},{ou_dn}"
try:
with self._get_connection() as connection:
connection.add_s(user_dn, modlist.addModlist(attrs))
except ldap.ALREADY_EXISTS:
print(f"{user_dn} already exists")
else:
print(f"created user {user_dn} successfully")
def get_user_info(self, username: str, from_dn: Optional[str] = None) -> Optional[Dict[str, str]]:
"""
获取用户信息。
说明:
* 返回搜索到的第一个用户
:param username: 用户名
:param from_dn: 从哪个 DN 开始搜索,其中不要包含 Base DN 部分
"""
filter_str: str = f"(&(objectclass=person)(cn={username}))"
if not from_dn:
from_dn = self._base_dn
else:
from_dn = f"{from_dn},{self._base_dn}"
with self._get_connection() as connection:
ret = connection.search_s(
from_dn,
ldap.SCOPE_SUBTREE, # 搜索范围
filter_str
)
if not ret:
return None
return {
"telephone_number": ret[0][1]["telephoneNumber"][0].decode(),
"employee_number": ret[0][1]["employeeNumber"][0].decode(),
"username": username,
"dn": ret[0][0]
}
def authenticate_user(self, username: str, password: str) -> bool:
"""
验证用户的账号和密码
:param username: 用户名
:param password: 密码
:return: 是否通过验证
"""
connection: ldap.ldapobject = ldap.initialize(self._ldap_host)
user_info: Optional[Dict[str, str]] = self.get_user_info(username)
if not user_info:
return False
try:
connection.simple_bind_s(user_info["dn"], password)
except ldap.INVALID_CREDENTIALS:
return False
else:
return True
def delete_dn(self, dn: str) -> None:
"""
删除指定的 DN
:param dn: 要删除的 DN,其中不要包含 Base DN 部分
"""
with self._get_connection() as connection: # type: LDAPObject
connection.delete_s(f"{dn},{self._base_dn}")
class TestLDAP(unittest.TestCase):
def setUp(self) -> None:
self.ldap_host: str = "ldap://192.168.56.101:389"
self.base_dn: str = "dc=mycompany,dc=com"
self.client: LDAP = LDAP(
self.ldap_host,
f"cn=admin,{self.base_dn}",
"secret",
self.base_dn
)
def testLDAP(self) -> None:
# 创建部门 admin
self.client.create_ou("admin")
# 向部门 admin 添加新用户 tim
self.client.create_user("tim", "secret", "ou=admin", 1000, "17600000000")
# 获取名为 tim 的用户的信息
user_info: Optional[Dict[str, str]] = self.client.get_user_info("tim")
self.assertIsNotNone(user_info)
self.assertEqual("1000", user_info["employee_number"])
self.assertEqual("17600000000", user_info["telephone_number"])
self.assertTrue(self.client.authenticate_user("tim", "secret"))
self.assertFalse(self.client.authenticate_user("tim", "secret~"))
# 创建部门 rd
self.client.create_ou("rd")
# 向部门 rd 添加新用户 scott
self.client.create_user("scott", "secret", "ou=rd", 1001, "17600000001")
# 假设 admin 部门里的用户都是管理员
client: LDAP = LDAP(
self.ldap_host,
f"cn=tim,ou=admin,{self.base_dn}",
"secret",
f"ou=rd,{self.base_dn}")
# 获取 scott 的信息
user_info: Optional[Dict[str, str]] = client.get_user_info("scott")
print(user_info)
# 验证 scott
self.assertTrue(client.authenticate_user("scott", "secret"))
def tearDown(self) -> None:
# 删除用户 tim
self.client.delete_dn("cn=tim,ou=admin")
# 删除部门 admin
self.client.delete_dn("ou=admin")
# 删除用户 scott
self.client.delete_dn("cn=scott,ou=rd")
# 删除部门 rd
self.client.delete_dn("ou=rd")
if __name__ == "__main__":
unittest.main()
Vagrant.configure("2") do |config|
config.vm.box = "generic/ubuntu1804"
vms = Array(101..101)
vms.each do |seq|
config.vm.define :"openldap-#{seq}" do |vagrant|
vagrant.vm.hostname = "openldap-#{seq}"
vagrant.vm.network "private_network", ip: "192.168.56.#{seq}"
vagrant.vm.provider "virtualbox" do |vb|
vb.customize ["modifyvm", :id, "--name", "openldap-#{seq}"]
vb.gui = false
vb.memory = "3072"
vb.cpus = "4"
end
end
end
end