1. 并发模型是什么?

并发模型定义如何在系统中管理和协调多个并发单元(进程、线程、协程),以实现有效的资源利用、避免冲突,并且提高系统性能。下文将介绍常见的并发模型:


2. 共享内存

2.1. 同步原语

Go 语言提供多种同步原语,用于控制并发程序中的执行顺序、协调多个 goroutine 之间的操作,以及管理对共享资源的访问。

2.1.1. 互斥锁(Mutex)

import "sync"

var mu sync.Mutex
mu.Lock()
defer mu.Unlock()

2.1.2. 读写锁(RWMutex)

允许多个 goroutine 同时读取,但在写入时需要独占访问。

import "sync"

var rwmu sync.RWMutex
rwmu.RLock()
defer rwmu.RUnlock()

2.1.3. 条件变量(Cond)

用于在多个 goroutine 之间同步条件,可以与互斥锁一起使用。

cond := sync.NewCond(&sync.Mutex{})
cond.L.Lock()
defer cond.L.Unlock()
cond.Wait() // 等待通知或广播

2.1.4. 等待组(WaitGroup)

用于等待一组 goroutine 完成。

import "sync"

var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    // 执行任务
}()
wg.Wait() // 等待所有任务完成

2.1.5. Once

保只执行一次操作,适用于初始化或单例模式。

import "sync"

var once sync.Once
once.Do(func() {
    // 执行初始化
})

2.2. 原子操作

sync/atomic 包提供一组原子操作,用于在多个 goroutine 之间安全地操作共享变量。

import "sync/atomic"

var counter int32
atomic.AddInt32(&counter, 1)

2.3. 同步原语和原子操作的区别

原子操作基于底层的 CPU 原子指令实现。主要用于无锁编程,提供对单个变量的原子操作,保证操作的原子性,避免锁开销。

同步原语通常基于锁或其他协调机制实现。主要用于协调多个 goroutine 的执行顺序,确保对共享资源的安全访问。使用锁可能会引入线程调度和上下文切换的开销,有时还可能导致死锁或性能瓶颈。


3. CSP 模型

Rob Pike:

Don't communicate by sharing memory, share memory by communicating.

在 CSP 模型中,并发实体之间通过共享管道(Channel)进行通信。Go 语言中实现 CSP 理论的 Process 和 Channel:

3.1. 示例 - 捕获信号

package main

import (
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
    stopChan := make(chan bool, 1)
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
       wg.Add(1)
       go func(no int) {
          defer wg.Done()
          ticker := time.NewTicker(time.Second)
          for {
             select {
             case <-stopChan:
                log.Printf("goroutine %d exited\n", no)
                return
             case <-ticker.C:
                log.Printf("goroutine %d running\n", no)
             }
          }
       }(i)
    }
    log.Println("waiting for stop signal")
    <-c
    log.Println("stopping")
    close(stopChan)
    wg.Wait()
    log.Println("exited")
}

4. Actor 模型

Actor 模型的基础是消息传递,Actor 模型是基本计算单元,它能接收消息,基于消息执行运算,也可以向其它 Actor 模型发送消息。各个 Actor 模型之间相互隔离,不共享内存。

Actor 模型封装状态和行为,在进行并发编程时,Actor 模型只需要关注消息和其本身,并且消息是不可变对象。

Actor 模型由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是消息队列)三部分组成:

actor.png

4.1. 示例

// 关于中介者模式,请参考:http://timd.cn/design-pattern/mediator/;
// 关于模板方法模式,请参考:http://timd.cn/design-pattern/template-method/

package main

import (
    "fmt"
    "log"
    "math/rand/v2"
    "sync"
    "time"
)

type Registry struct {
    actors map[string][]Actor
}

func (r *Registry) Start() {
    for _, actors := range r.actors {
        for _, actor := range actors {
            go actor.Start()
        }
    }
}

func (r *Registry) AddActor(a Actor) {
    if _, found := r.actors[a.GetName()]; found {
        r.actors[a.GetName()] = append(r.actors[a.GetName()], a)
    } else {
        r.actors[a.GetName()] = []Actor{a}
    }
}

func (r *Registry) GetActors(name string) []Actor {
    if actors, found := r.actors[name]; found {
        return actors
    }
    return nil
}

func (r *Registry) SendMessage(to string, index int, message MessageType, waitFor time.Duration) bool {
    if actors, found := r.actors[to]; found {
        if index < 0 {
            index = rand.Int() % len(actors)
        }
        return actors[index].Receive(message, waitFor)
    } else {
        return false
    }
}

func (r *Registry) Stop() {
    for _, actors := range r.actors {
        for _, actor := range actors {
            actor.Stop()
        }
    }
}

func NewRegistry() *Registry {
    return &Registry{actors: map[string][]Actor{}}
}

// MessageType 表示消息类型
type MessageType = any

// Actor 表示 Actor
type Actor interface {
    // Start 启动 Actor
    Start()
    // Receive 接收消息
    Receive(MessageType, time.Duration) bool
    // Stop 停止 Actor
    Stop()
    // GetName 获取 Actor 的名字
    GetName() string
}

// BaseActor 是所有 Actor 的基类
type BaseActor struct {
    mailBox            chan MessageType
    stopCh             chan struct{}
    closeCallback      func()
    processMessageFunc func(MessageType, *Registry)
    getNameFunc        func() string
    registry           *Registry
}

func (b *BaseActor) Start() {
    log.Printf("%s started", b.GetName())
    defer b.closeCallback()
    for {
        select {
        case message, ok := <-b.mailBox:
            if !ok {
                goto END
            }
            b.processMessageFunc(message, b.registry)
        default:
            select {
            case message, ok := <-b.mailBox:
                if !ok {
                    goto END
                }
                b.processMessageFunc(message, b.registry)
            case <-b.stopCh:
                goto END
            }
        }
    }

END:
    log.Printf("%s stopped\n", b.GetName())
}

func (b *BaseActor) Receive(message MessageType, waitFor time.Duration) bool {
    select {
    case b.mailBox <- message:
        return true
    default:
        select {
        case b.mailBox <- message:
            return true
        case <-time.After(waitFor):
            return false
        }
    }
}

func (b *BaseActor) Stop() {
    close(b.stopCh)
}

func (b *BaseActor) GetName() string {
    return b.getNameFunc()
}

func NewBaseActor(mailBoxSize int, closeCallback func(),
    processMessageFunc func(MessageType, *Registry), getNameFunc func() string,
    registry *Registry) *BaseActor {
    b := &BaseActor{
        mailBox:            make(chan MessageType, mailBoxSize),
        stopCh:             make(chan struct{}),
        closeCallback:      closeCallback,
        processMessageFunc: processMessageFunc,
        getNameFunc:        getNameFunc,
        registry:           registry,
    }
    registry.AddActor(b)
    return b
}

type Assembler struct {
    *BaseActor
    no int
}

func (a *Assembler) processMessage(message MessageType, registry *Registry) {
    log.Printf("%s[%d] is processing message: %s", a.GetName(), a.no, message)
    sent := registry.SendMessage("sinker", -1, message, time.Second)
    log.Printf("%s[%d] sent message[%s] to sinker: %v", a.GetName(), a.no, message, sent)
}

func NewAssembler(mailBoxSize int, closeCallback func(), registry *Registry, no int) *Assembler {
    a := &Assembler{no: no}
    a.BaseActor = NewBaseActor(mailBoxSize, closeCallback, a.processMessage, func() string {
        return "assembler"
    }, registry)
    return a
}

type Sinker struct {
    *BaseActor
    no int
}

func (s *Sinker) processMessage(message MessageType, _ *Registry) {
    log.Printf("%s[%d] is processing message: %s", s.GetName(), s.no, message)
}

func NewSinker(mailBoxSize int, closeCallback func(), registry *Registry, no int) *Sinker {
    s := &Sinker{no: no}
    s.BaseActor = NewBaseActor(mailBoxSize, closeCallback, s.processMessage, func() string {
        return "sinker"
    }, registry)
    return s
}

func main() {
    var wg sync.WaitGroup
    registry := NewRegistry()

    var assemblers []*Assembler
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        assemblers = append(assemblers, NewAssembler(100, wg.Done, registry, i))
    }
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        NewSinker(100, wg.Done, registry, i)
    }

    registry.Start()

    for i := 1; i <= 100; i++ {
        assemblers[i%len(assemblers)].Receive(fmt.Sprintf("message-%d", i), time.Second)
    }

    time.Sleep(time.Second)
    registry.Stop()
    wg.Wait()
}