memberlist 是 Hashicorp 公司开源的 Gossip 库,该库被 Consul 使用,它是 SWIM(THE SWIM MEMBERSHIP PROTOCOL)的扩展实现。
main.go:
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"strings"
	"sync"

	"github.com/hashicorp/memberlist"
	"github.com/pborman/uuid"
)
var ( mtx sync.RWMutex items = map[string]string{} broadcasts *memberlist.TransmitLimitedQueue
members = flag.String("members", "", "comma seperated list of members") port = flag.Int("port", 4001, "http port"))
// init parses the command-line flags (-members, -port) during package
// initialization, before main runs.
//
// NOTE(review): since Go 1.13 the testing flags are registered after package
// init, so calling flag.Parse here makes `go test` on this package fail;
// consider moving the call to the top of main.
func init() {
	flag.Parse()
}
// update is one replicated change as it travels between nodes: it is
// JSON-encoded by the HTTP handlers and decoded again in NotifyMsg.
type update struct {
	// Action is "add" or "del"; NotifyMsg ignores any other value.
	Action string
	// Data holds the affected entries. For "del" only the keys are used.
	Data map[string]string
}
// delegate carries the memberlist callback methods defined below (NodeMeta,
// NotifyMsg, GetBroadcasts, LocalState, MergeRemoteState). It has no fields;
// the state it works on lives in the package-level items map.
type delegate struct{}
// NodeMeta returns the metadata advertised for this node. This program has
// none, so the result is always an empty, non-nil slice and limit is unused.
func (d *delegate) NodeMeta(limit int) []byte {
	return make([]byte, 0)
}
func (d *delegate) NotifyMsg(b []byte) { if len(b) == 0 { return }
switch b[0] { case 'd': var updates []*update if err := json.Unmarshal(b[1:], &updates); err != nil { return } mtx.Lock() for _, u := range updates { for k, v := range u.Data { switch u.Action { case "add": items[k] = v case "del": delete(items, k) } } } mtx.Unlock() }}
// GetBroadcasts hands memberlist the queued update messages that fit within
// limit bytes, given overhead bytes of framing per message. It forwards to
// the package-level broadcasts queue.
//
// start() assigns broadcasts only after the node has been created and has
// joined the cluster, so a callback that arrives before that sees a nil
// queue. Report "nothing to send" in that case instead of dereferencing nil.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
	if broadcasts == nil {
		return nil
	}
	return broadcasts.GetBroadcasts(overhead, limit)
}
func (d *delegate) LocalState(join bool) []byte { mtx.RLock() m := items mtx.RUnlock() b, _ := json.Marshal(m) return b}
func (d *delegate) MergeRemoteState(buf []byte, join bool) { if len(buf) == 0 { return } if !join { return } var m map[string]string if err := json.Unmarshal(buf, &m); err != nil { return } mtx.Lock() for k, v := range m { items[k] = v } mtx.Unlock()}
// broadcast is one queued gossip message. Its methods (Invalidates, Message,
// Finished) are defined below.
type broadcast struct {
	// msg is the encoded message: a one-byte type tag followed by the payload.
	msg []byte
	// notify, when non-nil, is closed by Finished.
	notify chan<- struct{}
}
// Invalidates reports whether b makes the queued broadcast other obsolete.
// It always answers false, so queued messages never replace one another.
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
	return false
}
// Message returns the encoded bytes to put on the wire. The slice is the
// one stored in b, not a copy.
func (b *broadcast) Message() []byte {
	payload := b.msg
	return payload
}
// Finished closes the notify channel, if one was supplied, to signal that
// the queue is done with this broadcast. With a nil notify it does nothing.
func (b *broadcast) Finished() {
	if b.notify == nil {
		return
	}
	close(b.notify)
}
func addHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { http.Error(w, err.Error(), 500) return } key := r.Form.Get("key") val := r.Form.Get("val") mtx.Lock() items[key] = val mtx.Unlock() b, err := json.Marshal([]*update{ { Action: "add", Data: map[string]string{ key: val, }, }, }) if err != nil { http.Error(w, err.Error(), 500) return } // 广播数据 broadcasts.QueueBroadcast(&broadcast{ msg: append([]byte("d"), b...), notify: nil, })}
func delHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { http.Error(w, err.Error(), 500) return } key := r.Form.Get("key") mtx.Lock() delete(items, key) mtx.Unlock()
b, err := json.Marshal([]*update{ { Action: "del", Data: map[string]string{ key: "", }, }, }) if err != nil { http.Error(w, err.Error(), 500) return } broadcasts.QueueBroadcast(&broadcast{ msg: append([]byte("d"), b...), notify: nil, })}
func getHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { http.Error(w, err.Error(), 500) return } key := r.Form.Get("key") mtx.RLock() val := items[key] mtx.RUnlock() if _, err := w.Write([]byte(val)); err != nil { log.Printf("fail to write response, err: %s.\n", err) }}
func start() error { hostname, _ := os.Hostname() c := memberlist.DefaultLocalConfig() c.Delegate = &delegate{} c.BindPort = 0 c.Name = hostname + "-" + uuid.NewUUID().String() // 创建 Gossip 网络 m, err := memberlist.Create(c) if err != nil { return err } // 第一个节点没有 member,从第二个开始有 member if len(*members) > 0 { parts := strings.Split(*members, ",") _, err := m.Join(parts) if err != nil { return err } } broadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: func() int { return m.NumMembers() }, RetransmitMult: 3, } node := m.LocalNode() log.Printf("Local member %s:%d\n", node.Addr, node.Port) return nil}
func main() { if err := start(); err != nil { panic(err) } http.HandleFunc("/add", addHandler) http.HandleFunc("/del", delHandler) http.HandleFunc("/get", getHandler) fmt.Printf("Listening on :%d\n", *port) if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil { panic(err) }}go.mod:
go 1.19

require (
	github.com/hashicorp/memberlist v0.5.0
	github.com/pborman/uuid v1.2.1
)

在第一个窗口执行:
go run main.go --port=1112

可看到类似下面的输出:
2022/12/19 16:23:25 [DEBUG] memberlist: Using dynamic bind port 57735
2022/12/19 16:23:25 Local member 10.1.1.32:57735
Listening on :1112
注意其中的 Local member,后面要用到
在第二个窗口执行:
go run main.go --port=1113 --members=10.1.1.32:57735

在第三个窗口执行:
go run main.go --port=1114 --members=10.1.1.32:57735

在第四个窗口执行:
curl 'http://localhost:1112/add?key=foo&val=bar'
curl 'http://localhost:1113/get?key=foo'
# 输出:bar
curl 'http://localhost:1112/del?key=foo'
curl 'http://localhost:1114/get?key=foo'
# 无输出

节点的状态有三种:
alive - 节点是“活的”
suspect - 对于 PingMsg 没有应答或应答超时时,节点的状态是“可疑的”
dead - 节点“已死亡”
1)如果节点 B 无法对节点 A 发出的 PingMsg 进行响应,或响应超时,那么它将被节点 A 标记为 suspect。如果 suspect 持续一段时间(或 A 收到足够多的其它节点关于 B 的 SuspectMsg),节点 A 会在集群中广播 SuspectMsg,告知集群中其它节点:节点 B 很可疑
2)如果节点 B 收到针对它的 SuspectMsg,那么它通过发送 AliveMsg,告知对方:“I'm alive”。在对方节点看来,B 的状态从 suspect 变为 alive
3)如果一段时间内,节点 B 的状态仍然是 suspect,那么对于节点 A 而言,B 的状态会被置为 dead
4)一段时间后,如果节点 B 重新上线,那么它可以通过与种子节点的 Gossip(Push/Pull)重新被认为 alive
memberlist 的所有动作都可以在 schedule() 中看到,动作共有 3 种。
state.go:
xxxxxxxxxx// Schedule is used to ensure the Tick is performed periodically. This// function is safe to call multiple times. If the memberlist is already// scheduled, then it won't do anything.func (m *Memberlist) schedule() { m.tickerLock.Lock() defer m.tickerLock.Unlock()
// If we already have tickers, then don't do anything, since we're // scheduled if len(m.tickers) > 0 { return }
// Create the stop tick channel, a blocking channel. We close this // when we should stop the tickers. stopCh := make(chan struct{})
// Create a new probeTicker if m.config.ProbeInterval > 0 { t := time.NewTicker(m.config.ProbeInterval) // 动作 1 go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe) m.tickers = append(m.tickers, t) }
// Create a push pull ticker if needed if m.config.PushPullInterval > 0 { // 动作 2 go m.pushPullTrigger(stopCh) }
// Create a gossip ticker if needed if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 { t := time.NewTicker(m.config.GossipInterval) // 动作 3 go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip) m.tickers = append(m.tickers, t) }
// If we made any tickers, then record the stopTick channel for // later. if len(m.tickers) > 0 { m.stopTick = stopCh }}周期性地探测集群中状态为 alive 和 suspect 的节点。每个周期只探测一个节点。
周期性地从已知 alive 的集群节点中选择 1 个节点,进行 Push/Pull 交换信息。交换的信息包括两种:
广播所有 dead 节点(只广播一次)
集群中有 A、B、C 三个节点,其中 A 为种子节点,B 和 C 为普通节点。

为避免该问题,需要多配置种子节点,并保证所有种子节点不同时宕机。