点此去 Tim 的博客首页浏览更多文章
1. 广播队列
在广播队列(Broadcast Queue)中,发送者发送的每个消息将被所有接收者接收。而在常见的 MPMC(Multi-Produce
Multi-Consumer)管道中,发送者发送的每个消息只能被多个消费者中的一个接收。
下面是一个使用广播队列的场景。假如在主协程中,对于每个任务,都启动一个子协程运行它,当任务结束时,子协程也随
之结束,即采用 one-task-per-coroutine 的并发模式。此时,主协程如何通知子协程结束呢?在 Go 语言中,通常将
Context 作为函数或方法的第一个参数,当 context.Done() 方法返回的 Channel 关闭时,表明函数或方法应该退出。类
似地,可以创建一个广播队列,主协程持有发送端,每个子协程持有一个接收端。当主协程希望子协程结束时,向广播队列
发送一条消息,则所有子协程都能收到该结束消息。
2. Rust 实现
下面是基于 flume 实现的 Broadcast Queue。得益于 flume,它既支持 sync 模式,也支持 async 模式。
2.1. Cargo.toml
[package]
name = "broadcast-queue"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flume = "0.10.14"
tokio = { version = "1.25.0", features = ["full"] }
2.2. src/broadcast_queue.rs
/// 使用 new type 模式,将 Flume Receiver 封装进自定义的 Receiver
Receiver T ( Receiver T )pub struct < : Send + Clone> flume:: < > ;
/// 为 Receiver 实现 Deref<Target=flume::Receiver<T>>,以直接使用 flume::Receiver 的方法
T Receiver T impl< : Send + Clone> std::ops::Deref for < > {
Target Receiver Ttype = flume:: < >;
deref( ) Target fn &self -> &Self:: {
&self.0
}
}
/// 广播队列
BroadcastQueue T pub struct < : Clone + Send> {
/// 内部持有所有 Sender
senders Sender T: Vec<flume:: < >>,
}
T BroadcastQueue T impl< : Send + Clone> < > {
/// 创建广播队列的实例
new() pub fn -> Self {
Self {
senders Sender T new(): Vec::<flume:: < >>:: ,
}
}
/// 订阅广播队列,获取 Receiver 对象
subscribe( ) Receiver T pub fn &mut self -> < > {
(tx rx) T ()let , = flume::unbounded::< > ;
senders push(tx)self. . ;
Receiver(rx)
}
/// 从 senders 中删除已经没有 Receiver 的 Sender。should_be_deleted_senders 中的索引必须是升序的。
delete_senders( should_be_deleted_senders ) fn &mut self, : Vec<usize> {
shift let mut = 0usize;
idx should_be_deleted_senders into_iter() for in . {
senders remove(idx shift)self. . - ;
// 每删除一个元素,后面的元素向前移动一个位置
shift += 1;
}
}
/// 同步地广播消息
broadcast( data T) pub fn &mut self, : {
should_be_deleted_senders new()let mut = Vec:: ;
(idx tx) senders iter() enumerate() for , in self. . . {
// 发送失败表明相应的 Sender 已经没有 Receiver,需要将这样的 Sender 清理掉
(_) tx send(data clone()) if let Err = . . {
should_be_deleted_senders push(idx). ;
}
}
delete_senders(should_be_deleted_senders)self.
}
/// 异步地广播消息
broadcast_async( data T) pub async fn &mut self, : {
should_be_deleted_senders new()let mut = Vec:: ;
(idx tx) senders iter() enumerate() for , in self. . . {
// 发送失败表明相应的 Sender 已经没有 Receiver,需要将这样的 Sender 清理掉
(_) tx send_async(data clone()) if let Err = . . .await {
should_be_deleted_senders push(idx). ;
}
}
delete_senders(should_be_deleted_senders)self.
}
/// 尝试广播消息
try_broadcast( data T) pub fn &mut self, : {
should_be_deleted_senders new()let mut = Vec:: ;
(idx tx) senders iter() enumerate() for , in self. . . {
// 因为使用的是无界队列,所以发送失败表明相应的 Sender 已经没有 Receiver,需要将这样的 Sender 清理掉
(_) tx try_send(data clone()) if let Err = . . {
should_be_deleted_senders push(idx). ;
}
}
delete_senders(should_be_deleted_senders)self.
}
/// 获取 Subscriber 的数量
get_subscriber_count( ) pub fn &self -> usize {
senders len()self. .
}
}
2.3. src/lib.rs
broadcast_queuepub mod ;
2.4. src/main.rs
BroadcastQueueuse broadcast_queue::broadcast_queue:: ;
main#[tokio:: ]
main() async fn {
b () new()let mut = BroadcastQueue::< >:: ;
rx1 b subscribe()let = . ;
rx2 b subscribe()let = . ;
rx3 b subscribe()let = . ;
( b get_subscriber_count())println! "current number of subscribers is {}", . ;
handle spawn( let = tokio:: async move {
( )println! "enter handle" ;
tokio::select! {
ret rx1 recv_async() = . => {
(_) ret if let Err = {
( )println! "there are no senders associated to current receiver" ;
} else {
( )println! "rx1 received a message"
}
},
// 只等待 1 秒
_ sleep( from_secs( )) = tokio::time:: tokio::time::Duration:: 4 => {
}
}
( )println! "exit handle" ;
)} ;
( )println! "begin to sleep" ;
sleep( from_secs( ))tokio::time:: tokio::time::Duration:: 3 .await;
( )println! "sleep ends" ;
b broadcast_async(()). .await;
( )println! "broadcast ends" ;
drop(rx2);
b broadcast(()). ;
sleep( from_secs( ))tokio::time:: tokio::time::Duration:: 1 .await;
( b get_subscriber_count())println! "current number of subscribers is {}", . ;
(()) rx3 try_recv() while let Ok = . {
( )println! "rx3 received a message" ;
}
handle unwrap().await. ;
}