memberlist 是 Hashicorp 公司开源的 Gossip 库,该库被 Consul 使用,它是 SWIM(THE SWIM MEMBERSHIP PROTOCOL)的扩展实现。
main.go:
xpackage main
import (
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os"
"strings"
"sync"
"github.com/hashicorp/memberlist"
"github.com/pborman/uuid"
)
var (
mtx sync.RWMutex
items = map[string]string{}
broadcasts *memberlist.TransmitLimitedQueue
members = flag.String("members", "", "comma seperated list of members")
port = flag.Int("port", 4001, "http port")
)
func init() {
flag.Parse()
}
type update struct {
Action string
Data map[string]string
}
type delegate struct{}
func (d *delegate) NodeMeta(limit int) []byte {
return []byte{}
}
func (d *delegate) NotifyMsg(b []byte) {
if len(b) == 0 {
return
}
switch b[0] {
case 'd':
var updates []*update
if err := json.Unmarshal(b[1:], &updates); err != nil {
return
}
mtx.Lock()
for _, u := range updates {
for k, v := range u.Data {
switch u.Action {
case "add":
items[k] = v
case "del":
delete(items, k)
}
}
}
mtx.Unlock()
}
}
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
return broadcasts.GetBroadcasts(overhead, limit)
}
func (d *delegate) LocalState(join bool) []byte {
mtx.RLock()
m := items
mtx.RUnlock()
b, _ := json.Marshal(m)
return b
}
func (d *delegate) MergeRemoteState(buf []byte, join bool) {
if len(buf) == 0 {
return
}
if !join {
return
}
var m map[string]string
if err := json.Unmarshal(buf, &m); err != nil {
return
}
mtx.Lock()
for k, v := range m {
items[k] = v
}
mtx.Unlock()
}
type broadcast struct {
msg []byte
notify chan<- struct{}
}
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
return false
}
func (b *broadcast) Message() []byte {
return b.msg
}
func (b *broadcast) Finished() {
if b.notify != nil {
close(b.notify)
}
}
func addHandler(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
http.Error(w, err.Error(), 500)
return
}
key := r.Form.Get("key")
val := r.Form.Get("val")
mtx.Lock()
items[key] = val
mtx.Unlock()
b, err := json.Marshal([]*update{
{
Action: "add",
Data: map[string]string{
key: val,
},
},
})
if err != nil {
http.Error(w, err.Error(), 500)
return
}
// 广播数据
broadcasts.QueueBroadcast(&broadcast{
msg: append([]byte("d"), b...),
notify: nil,
})
}
func delHandler(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
http.Error(w, err.Error(), 500)
return
}
key := r.Form.Get("key")
mtx.Lock()
delete(items, key)
mtx.Unlock()
b, err := json.Marshal([]*update{
{
Action: "del",
Data: map[string]string{
key: "",
},
},
})
if err != nil {
http.Error(w, err.Error(), 500)
return
}
broadcasts.QueueBroadcast(&broadcast{
msg: append([]byte("d"), b...),
notify: nil,
})
}
func getHandler(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
http.Error(w, err.Error(), 500)
return
}
key := r.Form.Get("key")
mtx.RLock()
val := items[key]
mtx.RUnlock()
if _, err := w.Write([]byte(val)); err != nil {
log.Printf("fail to write response, err: %s.\n", err)
}
}
func start() error {
hostname, _ := os.Hostname()
c := memberlist.DefaultLocalConfig()
c.Delegate = &delegate{}
c.BindPort = 0
c.Name = hostname + "-" + uuid.NewUUID().String()
// 创建 Gossip 网络
m, err := memberlist.Create(c)
if err != nil {
return err
}
// 第一个节点没有 member,从第二个开始有 member
if len(*members) > 0 {
parts := strings.Split(*members, ",")
_, err := m.Join(parts)
if err != nil {
return err
}
}
broadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return m.NumMembers()
},
RetransmitMult: 3,
}
node := m.LocalNode()
log.Printf("Local member %s:%d\n", node.Addr, node.Port)
return nil
}
func main() {
if err := start(); err != nil {
panic(err)
}
http.HandleFunc("/add", addHandler)
http.HandleFunc("/del", delHandler)
http.HandleFunc("/get", getHandler)
fmt.Printf("Listening on :%d\n", *port)
if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil {
panic(err)
}
}
go.mod:
xxxxxxxxxx
go 1.19
require (
github.com/hashicorp/memberlist v0.5.0
github.com/pborman/uuid v1.2.1
)
在第一个窗口执行:
go run main.go --port=1112
可看到类似下面的输出:
x2022/12/19 16:23:25 [DEBUG] memberlist: Using dynamic bind port 57735
2022/12/19 16:23:25 Local member 10.1.1.32:57735
Listening on :1112
注意其中的 Local member,后面要用到
在第二个窗口执行:
go run main.go --port=1113 --members=10.1.1.32:57735
在第三个窗口执行:
go run main.go --port=1114 --members=10.1.1.32:57735
在第四个窗口执行:
x
curl 'http://localhost:1112/add?key=foo&val=bar'
curl 'http://localhost:1113/get?key=foo'
# 输出:bar
curl 'http://localhost:1112/del?key=foo'
curl 'http://localhost:1114/get?key=foo'
# 无输出
节点的状态有三种:
alive
- 节点是“活的”suspect
- 对于 PingMsg 没有应答或应答超时时,节点的状态是“可疑的”dead
- 节点“已死亡”1)如果节点 B 无法对节点 A 发出的 PingMsg 进行响应,或响应超时,那么它将被节点 A 标记为 suspect。如果 suspect 持续一段时间(或 A 收到足够多的其它节点关于 B 的 SuspectMsg),节点 A 会在集群中广播 SuspectMsg,告知集群中其它节点:节点 B 很可疑
2)如果节点 B 收到针对它的 SuspectMsg,那么它通过发送 AliveMsg,告知对方:“I'm alive”。在对方节点看来,B 的状态从 suspect 变为 alive
3)如果一段时间内,节点 B 的状态仍然是 suspect,那么对于节点 A 而言,B 的状态会被置为 dead
4)一段时间后,如果节点 B 重新上线,那么它可以通过与种子节点的 Gossip(Push/Pull)重新被认为 alive
memberlist 的所有动作都可以在 schedule() 中看到,动作共有 3 种。
state.go:
xxxxxxxxxx
// Schedule is used to ensure the Tick is performed periodically. This
// function is safe to call multiple times. If the memberlist is already
// scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
// If we already have tickers, then don't do anything, since we're
// scheduled
if len(m.tickers) > 0 {
return
}
// Create the stop tick channel, a blocking channel. We close this
// when we should stop the tickers.
stopCh := make(chan struct{})
// Create a new probeTicker
if m.config.ProbeInterval > 0 {
t := time.NewTicker(m.config.ProbeInterval)
// 动作 1
go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
m.tickers = append(m.tickers, t)
}
// Create a push pull ticker if needed
if m.config.PushPullInterval > 0 {
// 动作 2
go m.pushPullTrigger(stopCh)
}
// Create a gossip ticker if needed
if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
t := time.NewTicker(m.config.GossipInterval)
// 动作 3
go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
m.tickers = append(m.tickers, t)
}
// If we made any tickers, then record the stopTick channel for
// later.
if len(m.tickers) > 0 {
m.stopTick = stopCh
}
}
周期性地探测集群中状态为 alive 和 suspect 的节点。每个周期只探测一个节点。
周期性地从已知 alive 的集群节点中选择 1 个节点,进行 Push/Pull 交换信息。交换的信息包括两种:
广播所有 dead 节点(只广播一次)
集群中有 A、B、C 三个节点,其中 A 为种子节点,B 和 C 为普通节点。
为避免该问题,需要多配置种子节点,并保证所有种子节点不同时宕机。