网上有很多关于一致性哈希的文章,比如:
https://www.cnblogs.com/lpfuture/p/5796398.html。
下面主要提供一个带“虚拟节点”的Python一致性哈希实现。
# coding: utf8
from hashlib import md5
import bisect
class Node(object):
def __init__(self):
self._key = None
self._weight = 1
self._meta = {}
def set_key(self, key):
self._key = key
return self
def get_key(self):
return self._key
def set_weight(self, weight):
self._weight = weight
return self
def get_weight(self):
return self._weight
def with_meta(self, key, value):
self._meta[key] = value
return self
def has_meta(self, key):
return key in self._meta
def get_meta(self, key):
return self._meta[key]
def __str__(self):
return "Node{key=%s, weight=%d, meta=%s}" % \
(self._key, self._weight, self._meta)
__repr__ = __str__
class ConsistentHash(object):
def __init__(self,
nodes,
hash_algorithm,
ring_length,
name_virtual_node_func):
self._nodes = nodes
self._hash_algorithm = hash_algorithm
self._ring_length = ring_length
self._name_virtual_node_func = name_virtual_node_func
self._initialize()
def _initialize(self):
self._value_to_node = {}
for node in self._nodes:
for ind in range(1, node.get_weight() + 1):
virtual_node_name = self._name_virtual_node_func(node.get_key(), ind)
hash_value = self._get_hash_value(virtual_node_name)
if hash_value in self._value_to_node:
raise RuntimeError("hash value collision")
self._value_to_node[hash_value] = node
self._node_values = sorted(self._value_to_node.keys())
def _get_hash_value(self, key):
return self._hash_algorithm(key) % self._ring_length
def get_node(self, key):
# 形成环
hash_value = self._get_hash_value(key)
if hash_value >= self._node_values[-1]:
index = 0
else:
index = bisect.bisect_right(self._node_values, hash_value)
return self._value_to_node[self._node_values[index]]
class Builder(object):
def __init__(self):
self._nodes = None
self._hash_algorithm = lambda string: long(md5(string).hexdigest(), 16)
self._ring_length = 2 ** 32
self._name_virtual_node_func = self._name_virtual_node
def _name_virtual_node(self, node_name, node_index):
return "%100s#%10d" % (node_name, node_index)
def with_nodes(self, nodes):
self._nodes = nodes
return self
def with_hash_algorithm(self, hash_algorithm):
self._hash_algorithm = hash_algorithm
return self
def with_ring_length(self, ring_length):
self._ring_length = ring_length
return self
def with_name_virtual_node_func(self, name_virtual_node_func):
self._name_virtual_node_func = name_virtual_node_func
return self
def build(self):
if self._nodes is None:
raise RuntimeError("missing parameter nodes")
if self._hash_algorithm is None:
raise RuntimeError("missing parameter hash_algorithm")
if self._ring_length is None:
raise RuntimeError("missing parameter ring_length")
if self._name_virtual_node_func is None:
raise RuntimeError("missing parameter name_virtual_node_func")
return ConsistentHash(
self._nodes,
self._hash_algorithm,
self._ring_length,
self._name_virtual_node_func)
if __name__ == "__main__":
import os
nodes = []
node1 = Node().set_key("192.168.0.100:20000").set_weight(1).with_meta("name", "node1")
nodes.append(node1)
node2 = Node().set_key("172.16.0.100:10000").set_weight(1).with_meta("name", "node2")
nodes.append(node2)
node3 = Node().set_key("127.0.0.1:30000").set_weight(2).with_meta("name", "node3")
nodes.append(node3)
consistent_hash = ConsistentHash.Builder() \
.with_nodes(nodes) \
.build()
for i in range(10):
print(consistent_hash.get_node(os.urandom(100)).get_meta("name"))