给定一个保存数以亿计的 IP 地址的文件:
xxxxxxxxxx10.1.1.3172.16.114.99...
以及一个保存 IP 地址范围及其信息的文件(百万行):
xxxxxxxxxx起始地址 终止地址 信息10.1.1.1 10.1.1.5 中国北京172.16.114.1 172.16.115.254 日本东京...
其中,第一个文件无法完全装入内存,第二个文件可以装入内存。要求:求出第一个文件中的每个 IP 地址的信息。比如:
xxxxxxxxxx10.1.1.3 中国北京172.16.114.99 日本东京...
将第二个文件中的每个数据项里的 IP 地址转换成整数,然后将其放入一个顺序表中,并且按照起始 IP 地址进行排序。
当查询某个 IP 地址所处的地址范围时,先将 IP 地址转换成整数,然后使用二分查找算法在上面的顺序表中进行查找。
为加快效率,可以把第一个文件切片,然后并行地在每个切片上进行搜索。
xxxxxxxxxximport typing
NUMBERS: set[str] = set(list("0123456789"))ZERO: int = ord("0")
def a_to_n(a: str) -> int: """将数值格式的字符串转换成整数""" res: int = 0 for ch in a: # type: str if ch not in NUMBERS: raise ValueError(f"invalid character {ch}") n: int = ord(ch) - ZERO res = res * 10 + n return res
def ipv4_to_n(ipv4: str) -> int: """将 IPv4 地址转换成整数""" numbers: list[str] = ipv4.split(".") if len(numbers) != 4: raise ValueError("length should be 4") numbers: list[int] = list(map(a_to_n, numbers)) for number in numbers: if not 0 <= number <= 255: raise ValueError("number should be between 0 and 255") return (numbers[0] << 24) + (numbers[1] << 16) + (numbers[2] << 8) + numbers[3]
def n_to_ipv4(n: int) -> str: """将整数转换成 IPv4 地址""" first: int = n >> 24 second: int = (n & 0x00FFFFFF) >> 16 third: int = (n & 0x0000FFFF) >> 8 forth: int = n & 0x000000FF return f"{first}.{second}.{third}.{forth}"
def binary_search(array: list["Node"], target: int) -> int: """二分查找""" low = 0 high = len(array) - 1
while low <= high: mid = (low + high) // 2 if array[mid] == target: return mid if array[mid] < target: low = mid + 1 else: high = mid - 1 return -1
class Node(object): """每个 Node 对象代表一个 IP 段"""
def __init__(self, start_ip: str, end_ip: str, info: typing.Any): """ :param start_ip: IP 段的起点 :param end_ip: IP 段的终点 :param info: IP 段的信息,比如地理位置 """ self.start_ip: str = start_ip self.end_ip: str = end_ip self.info: typing.Any = info # 将 IP 段的起点和终点转换成整型,以使用二分查找 self.start_ip_number: int = ipv4_to_n(start_ip) self.end_ip_number: int = ipv4_to_n(end_ip)
def __lt__(self, other: typing.Union["Node", int]) -> bool: """ 如果 other 的类型是 Node,那么比较两个 Node 代表的 IP 段的起点; 如果 other 的类型是 int,那么当 Node 的右边界小于 other 时,认为 Node 小于 other; 如果 other 是其它类型的对象,那么抛出异常 """ if isinstance(other, self.__class__): # 这里假定区间没有交集,所以直接比较区间的起点 return self.start_ip_number < other.start_ip_number elif isinstance(other, int): return self.end_ip_number < other raise TypeError("Node or int expected")
def __eq__(self, other: int) -> bool: """ 判断 other 是否在 IP 段内 """ return self.start_ip_number <= other <= self.end_ip_number
def __str__(self) -> str: return f"Node[" \ f"start_ip={self.start_ip}, " \ f"end_ip={self.end_ip}, " \ f"info={self.info}, " \ f"range_number={self.start_ip_number}-{self.end_ip_number}" \ f"]"
__repr__ = __str__
class IpSearcher(object): def __init__(self, ip_info_list: typing.List[typing.Tuple[str, str, typing.Any]]): """ :param ip_info_list: 文本形式的 IP 信息列表。比如 [("1.1.1.1", "1.1.1.3", "China"), ...] """ # TODO: 检查 IP 段之间是否有交集。要求 IP 段之间不能有交集 self._nodes: list[Node] = [] for ip_info in ip_info_list: # type: [typing.Tuple[str, str, typing.Any] self._nodes.append(Node(ip_info[0], ip_info[1], ip_info[2])) # 对 Node 列表进行排序,排序后才能进行二分查找 self._nodes.sort()
def search(self, ipv4: str) -> typing.Any: """ 搜索附加到 IP 的信息,如果未找到,那么返回 None :param ipv4: 要查询的 IPV4 地址 :return: 附加到 IP 的信息或 None """ ip_number: int = ipv4_to_n(ipv4) index: int = binary_search(self._nodes, ip_number) if index == -1: return None return self._nodes[index].info
def test(): ip_info_list: typing.List[typing.Tuple] = [ ("10.1.1.1", "10.1.1.5", "address 1"), ("10.1.1.6", "10.1.1.100", "address 2"), ("10.1.1.101", "10.1.1.249", "address 3"), ("192.168.1.1", "192.168.2.254", "address 4"), ("192.168.3.1", "192.168.3.254", "address 5"), ] table: typing.List[typing.Tuple] = [ ("9.1.1.1", None), ("10.1.1.3", 0), ("10.1.1.5", 0), ("10.1.1.6", 1), ("10.1.1.70", 1), ("10.1.1.102", 2), ("10.1.1.249", 2), ("10.1.1.251", None), ("111.1.1.1", None), ("192.168.1.1", 3), ("192.168.1.33", 3), ("192.168.2.88", 3), ("192.168.2.254", 3), ("192.168.2.255", None), ("192.168.3.1", 4), ("192.168.3.22", 4), ("192.168.3.254", 4), ("192.168.3.255", None), ("200.1.1.1", None) ]
ip_searcher: IpSearcher = IpSearcher(ip_info_list) for itm in table: res: typing.Any = ip_searcher.search(itm[0]) # 进行断言 assert res is None and itm[1] is None or res == ip_info_list[itm[1]][2] # 打印调试信息 print(f"{itm[0]} at {res}")
if __name__ == "__main__": test()