给定一个保存数以亿计的 IP 地址的文件:
xxxxxxxxxx
10.1.1.3
172.16.114.99
...
以及一个保存 IP 地址范围及其信息的文件(百万行):
xxxxxxxxxx
起始地址 终止地址 信息
10.1.1.1 10.1.1.5 中国北京
172.16.114.1 172.16.115.254 日本东京
...
其中,第一个文件无法完全装入内存,第二个文件可以装入内存。要求:求出第一个文件中的每个 IP 地址的信息。比如:
xxxxxxxxxx
10.1.1.3 中国北京
172.16.114.99 日本东京
...
将第二个文件中的每个数据项里的 IP 地址转换成整数,然后将其放入一个顺序表中,并且按照起始 IP 地址进行排序。
当查询某个 IP 地址所处的地址范围时,先将 IP 地址转换成整数,然后使用二分查找算法在上面的顺序表中进行查找。
为加快效率,可以把第一个文件切片,然后并行地在每个切片上进行搜索。
xxxxxxxxxx
import typing
NUMBERS: set[str] = set(list("0123456789"))
ZERO: int = ord("0")
def a_to_n(a: str) -> int:
"""将数值格式的字符串转换成整数"""
res: int = 0
for ch in a: # type: str
if ch not in NUMBERS:
raise ValueError(f"invalid character {ch}")
n: int = ord(ch) - ZERO
res = res * 10 + n
return res
def ipv4_to_n(ipv4: str) -> int:
"""将 IPv4 地址转换成整数"""
numbers: list[str] = ipv4.split(".")
if len(numbers) != 4:
raise ValueError("length should be 4")
numbers: list[int] = list(map(a_to_n, numbers))
for number in numbers:
if not 0 <= number <= 255:
raise ValueError("number should be between 0 and 255")
return (numbers[0] << 24) + (numbers[1] << 16) + (numbers[2] << 8) + numbers[3]
def n_to_ipv4(n: int) -> str:
"""将整数转换成 IPv4 地址"""
first: int = n >> 24
second: int = (n & 0x00FFFFFF) >> 16
third: int = (n & 0x0000FFFF) >> 8
forth: int = n & 0x000000FF
return f"{first}.{second}.{third}.{forth}"
def binary_search(array: list["Node"], target: int) -> int:
"""二分查找"""
low = 0
high = len(array) - 1
while low <= high:
mid = (low + high) // 2
if array[mid] == target:
return mid
if array[mid] < target:
low = mid + 1
else:
high = mid - 1
return -1
class Node(object):
"""每个 Node 对象代表一个 IP 段"""
def __init__(self, start_ip: str, end_ip: str, info: typing.Any):
"""
:param start_ip: IP 段的起点
:param end_ip: IP 段的终点
:param info: IP 段的信息,比如地理位置
"""
self.start_ip: str = start_ip
self.end_ip: str = end_ip
self.info: typing.Any = info
# 将 IP 段的起点和终点转换成整型,以使用二分查找
self.start_ip_number: int = ipv4_to_n(start_ip)
self.end_ip_number: int = ipv4_to_n(end_ip)
def __lt__(self, other: typing.Union["Node", int]) -> bool:
"""
如果 other 的类型是 Node,那么比较两个 Node 代表的 IP 段的起点;
如果 other 的类型是 int,那么当 Node 的右边界小于 other 时,认为 Node 小于 other;
如果 other 是其它类型的对象,那么抛出异常
"""
if isinstance(other, self.__class__):
# 这里假定区间没有交集,所以直接比较区间的起点
return self.start_ip_number < other.start_ip_number
elif isinstance(other, int):
return self.end_ip_number < other
raise TypeError("Node or int expected")
def __eq__(self, other: int) -> bool:
"""
判断 other 是否在 IP 段内
"""
return self.start_ip_number <= other <= self.end_ip_number
def __str__(self) -> str:
return f"Node[" \
f"start_ip={self.start_ip}, " \
f"end_ip={self.end_ip}, " \
f"info={self.info}, " \
f"range_number={self.start_ip_number}-{self.end_ip_number}" \
f"]"
__repr__ = __str__
class IpSearcher(object):
def __init__(self, ip_info_list: typing.List[typing.Tuple[str, str, typing.Any]]):
"""
:param ip_info_list: 文本形式的 IP 信息列表。比如 [("1.1.1.1", "1.1.1.3", "China"), ...]
"""
# TODO: 检查 IP 段之间是否有交集。要求 IP 段之间不能有交集
self._nodes: list[Node] = []
for ip_info in ip_info_list: # type: [typing.Tuple[str, str, typing.Any]
self._nodes.append(Node(ip_info[0], ip_info[1], ip_info[2]))
# 对 Node 列表进行排序,排序后才能进行二分查找
self._nodes.sort()
def search(self, ipv4: str) -> typing.Any:
"""
搜索附加到 IP 的信息,如果未找到,那么返回 None
:param ipv4: 要查询的 IPV4 地址
:return: 附加到 IP 的信息或 None
"""
ip_number: int = ipv4_to_n(ipv4)
index: int = binary_search(self._nodes, ip_number)
if index == -1:
return None
return self._nodes[index].info
def test():
ip_info_list: typing.List[typing.Tuple] = [
("10.1.1.1", "10.1.1.5", "address 1"),
("10.1.1.6", "10.1.1.100", "address 2"),
("10.1.1.101", "10.1.1.249", "address 3"),
("192.168.1.1", "192.168.2.254", "address 4"),
("192.168.3.1", "192.168.3.254", "address 5"),
]
table: typing.List[typing.Tuple] = [
("9.1.1.1", None),
("10.1.1.3", 0),
("10.1.1.5", 0),
("10.1.1.6", 1),
("10.1.1.70", 1),
("10.1.1.102", 2),
("10.1.1.249", 2),
("10.1.1.251", None),
("111.1.1.1", None),
("192.168.1.1", 3),
("192.168.1.33", 3),
("192.168.2.88", 3),
("192.168.2.254", 3),
("192.168.2.255", None),
("192.168.3.1", 4),
("192.168.3.22", 4),
("192.168.3.254", 4),
("192.168.3.255", None),
("200.1.1.1", None)
]
ip_searcher: IpSearcher = IpSearcher(ip_info_list)
for itm in table:
res: typing.Any = ip_searcher.search(itm[0])
# 进行断言
assert res is None and itm[1] is None or res == ip_info_list[itm[1]][2]
# 打印调试信息
print(f"{itm[0]} at {res}")
if __name__ == "__main__":
test()