并发模型定义如何在系统中管理和协调多个并发单元(进程、线程、协程),以实现有效的资源利用、避免冲突,并且提高系统性能。下文将介绍常见的并发模型:
Go 语言提供多种同步原语,用于控制并发程序中的执行顺序、协调多个 goroutine 之间的操作,以及管理对共享资源的访问。
import "sync"
var mu sync.Mutex
mu.Lock()
defer mu.Unlock()
允许多个 goroutine 同时读取,但在写入时需要独占访问。
import "sync"
var rwmu sync.RWMutex
rwmu.RLock()
defer rwmu.RUnlock()
用于在多个 goroutine 之间同步条件,可以与互斥锁一起使用。
cond := sync.NewCond(&sync.Mutex{})
cond.L.Lock()
defer cond.L.Unlock()
cond.Wait() // 等待通知或广播
用于等待一组 goroutine 完成。
import "sync"
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// 执行任务
}()
wg.Wait() // 等待所有任务完成
保只执行一次操作,适用于初始化或单例模式。
import "sync"
var once sync.Once
once.Do(func() {
// 执行初始化
})
sync/atomic
包提供一组原子操作,用于在多个 goroutine 之间安全地操作共享变量。
import "sync/atomic"
var counter int32
atomic.AddInt32(&counter, 1)
原子操作基于底层的 CPU 原子指令实现。主要用于无锁编程,提供对单个变量的原子操作,保证操作的原子性,避免锁开销。
同步原语通常基于锁或其他协调机制实现。主要用于协调多个 goroutine 的执行顺序,确保对共享资源的安全访问。使用锁可能会引入线程调度和上下文切换的开销,有时还可能导致死锁或性能瓶颈。
Rob Pike:
Don't communicate by sharing memory, share memory by communicating.
在 CSP 模型中,并发实体之间通过共享管道(Channel)进行通信。Go 语言中实现 CSP 理论的 Process 和 Channel:
package main
import (
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func main() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
stopChan := make(chan bool, 1)
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(no int) {
defer wg.Done()
ticker := time.NewTicker(time.Second)
for {
select {
case <-stopChan:
log.Printf("goroutine %d exited\n", no)
return
case <-ticker.C:
log.Printf("goroutine %d running\n", no)
}
}
}(i)
}
log.Println("waiting for stop signal")
<-c
log.Println("stopping")
close(stopChan)
wg.Wait()
log.Println("exited")
}
Actor 模型的基础是消息传递,Actor 模型是基本计算单元,它能接收消息,基于消息执行运算,也可以向其它 Actor 模型发送消息。各个 Actor 模型之间相互隔离,不共享内存。
Actor 模型封装状态和行为,在进行并发编程时,Actor 模型只需要关注消息和其本身,并且消息是不可变对象。
Actor 模型由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是消息队列)三部分组成:
// 关于中介者模式,请参考:http://timd.cn/design-pattern/mediator/;
// 关于模板方法模式,请参考:http://timd.cn/design-pattern/template-method/
package main
import (
"fmt"
"log"
"math/rand/v2"
"sync"
"time"
)
type Registry struct {
actors map[string][]Actor
}
func (r *Registry) Start() {
for _, actors := range r.actors {
for _, actor := range actors {
go actor.Start()
}
}
}
func (r *Registry) AddActor(a Actor) {
if _, found := r.actors[a.GetName()]; found {
r.actors[a.GetName()] = append(r.actors[a.GetName()], a)
} else {
r.actors[a.GetName()] = []Actor{a}
}
}
func (r *Registry) GetActors(name string) []Actor {
if actors, found := r.actors[name]; found {
return actors
}
return nil
}
func (r *Registry) SendMessage(to string, index int, message MessageType, waitFor time.Duration) bool {
if actors, found := r.actors[to]; found {
if index < 0 {
index = rand.Int() % len(actors)
}
return actors[index].Receive(message, waitFor)
} else {
return false
}
}
func (r *Registry) Stop() {
for _, actors := range r.actors {
for _, actor := range actors {
actor.Stop()
}
}
}
func NewRegistry() *Registry {
return &Registry{actors: map[string][]Actor{}}
}
// MessageType 表示消息类型
type MessageType = any
// Actor 表示 Actor
type Actor interface {
// Start 启动 Actor
Start()
// Receive 接收消息
Receive(MessageType, time.Duration) bool
// Stop 停止 Actor
Stop()
// GetName 获取 Actor 的名字
GetName() string
}
// BaseActor 是所有 Actor 的基类
type BaseActor struct {
mailBox chan MessageType
stopCh chan struct{}
closeCallback func()
processMessageFunc func(MessageType, *Registry)
getNameFunc func() string
registry *Registry
}
func (b *BaseActor) Start() {
log.Printf("%s started", b.GetName())
defer b.closeCallback()
for {
select {
case message, ok := <-b.mailBox:
if !ok {
goto END
}
b.processMessageFunc(message, b.registry)
default:
select {
case message, ok := <-b.mailBox:
if !ok {
goto END
}
b.processMessageFunc(message, b.registry)
case <-b.stopCh:
goto END
}
}
}
END:
log.Printf("%s stopped\n", b.GetName())
}
func (b *BaseActor) Receive(message MessageType, waitFor time.Duration) bool {
select {
case b.mailBox <- message:
return true
default:
select {
case b.mailBox <- message:
return true
case <-time.After(waitFor):
return false
}
}
}
func (b *BaseActor) Stop() {
close(b.stopCh)
}
func (b *BaseActor) GetName() string {
return b.getNameFunc()
}
func NewBaseActor(mailBoxSize int, closeCallback func(),
processMessageFunc func(MessageType, *Registry), getNameFunc func() string,
registry *Registry) *BaseActor {
b := &BaseActor{
mailBox: make(chan MessageType, mailBoxSize),
stopCh: make(chan struct{}),
closeCallback: closeCallback,
processMessageFunc: processMessageFunc,
getNameFunc: getNameFunc,
registry: registry,
}
registry.AddActor(b)
return b
}
type Assembler struct {
*BaseActor
no int
}
func (a *Assembler) processMessage(message MessageType, registry *Registry) {
log.Printf("%s[%d] is processing message: %s", a.GetName(), a.no, message)
sent := registry.SendMessage("sinker", -1, message, time.Second)
log.Printf("%s[%d] sent message[%s] to sinker: %v", a.GetName(), a.no, message, sent)
}
func NewAssembler(mailBoxSize int, closeCallback func(), registry *Registry, no int) *Assembler {
a := &Assembler{no: no}
a.BaseActor = NewBaseActor(mailBoxSize, closeCallback, a.processMessage, func() string {
return "assembler"
}, registry)
return a
}
type Sinker struct {
*BaseActor
no int
}
func (s *Sinker) processMessage(message MessageType, _ *Registry) {
log.Printf("%s[%d] is processing message: %s", s.GetName(), s.no, message)
}
func NewSinker(mailBoxSize int, closeCallback func(), registry *Registry, no int) *Sinker {
s := &Sinker{no: no}
s.BaseActor = NewBaseActor(mailBoxSize, closeCallback, s.processMessage, func() string {
return "sinker"
}, registry)
return s
}
func main() {
var wg sync.WaitGroup
registry := NewRegistry()
var assemblers []*Assembler
for i := 1; i <= 3; i++ {
wg.Add(1)
assemblers = append(assemblers, NewAssembler(100, wg.Done, registry, i))
}
for i := 1; i <= 3; i++ {
wg.Add(1)
NewSinker(100, wg.Done, registry, i)
}
registry.Start()
for i := 1; i <= 100; i++ {
assemblers[i%len(assemblers)].Receive(fmt.Sprintf("message-%d", i), time.Second)
}
time.Sleep(time.Second)
registry.Stop()
wg.Wait()
}