senders Sender T new(): Vec::<flume:: < >>:: ,
}
}
/// 订阅广播队列,获取 Receiver 对象
subscribe( ) Receiver T pub fn &mut self -> < > {
(tx rx) T ()let , = flume::unbounded::< > ;
senders push(tx)self. . ;
Receiver(rx)
}
/// 从 senders 中删除已经没有 Receiver 的 Sender。should_be_deleted_senders 中的索引必须是升序的。
delete_senders( should_be_deleted_senders ) fn &mut self, : Vec<usize> {
shift let mut = 0usize;
idx should_be_deleted_senders into_iter() for in . {
senders remove(idx shift)self. . - ;
// 每删除一个元素,后面的元素向前移动一个位置
shift += 1;
}
}
/// 同步地广播消息
broadcast( data T) pub fn &mut self, : {
should_be_deleted_senders new()let mut = Vec:: ;
(idx tx) senders iter() enumerate() for , in self. . . {
// 发送失败表明相应的 Sender 已经没有 Receiver,需要将这样的 Sender 清理掉
(_) tx send(data clone()) if let Err = . . {
should_be_deleted_senders push(idx). ;
}
}
delete_senders(should_be_deleted_senders)self.
}
/// 异步地广播消息
broadcast_async( data T) pub async fn &mut self, : {
should_be_deleted_senders new()let mut = Vec:: ;
(idx tx) senders iter() enumerate() for , in self. . . {
// 发送失败表明相应的 Sender 已经没有 Receiver,需要将这样的 Sender 清理掉
(_) tx send_async(data clone()) if let Err = . . .await {
should_be_deleted_senders push(idx). ;
}
}
delete_senders(should_be_deleted_senders)self.
}
/// 尝试广播消息
try_broadcast( data T) pub fn &mut self, : {
should_be_deleted_senders new()let mut = Vec:: ;
(idx tx) senders iter() enumerate() for , in self. . . {
// 因为使用的是无界队列,所以发送失败表明相应的 Sender 已经没有 Receiver,需要将这样的 Sender 清理掉
(_) tx try_send(data clone()) if let Err = . . {
should_be_deleted_senders push(idx). ;
}
}
delete_senders(should_be_deleted_senders)self.
}
/// 获取 Subscriber 的数量
get_subscriber_count( ) pub fn &self -> usize {
senders len()self. .
}
}
2.3. src/lib.rs