1. 异步任务管理器示例

Cargo.toml:

src/main.rs:


2. tokio::select! 介绍

在多个并发分支上等待,当第一个分支完成时返回,取消剩余分支。

必须在 async 函数、闭包、块的内部使用 select! 宏。

select! 宏接受具有如下模式的一或多个分支:

另外,select! 宏可以包含单个可选的 else 分支,如果其它分支都不匹配它们的模式,那么计算该分支:

该宏聚合所有 <async expression> 表达式,然后在当前任务中,并发地运行它们。一旦第一个表达式完成,并且其值匹配它的 <pattern>,select! 宏返回计算已完成分支的 <handler> 表达式的结果。

另外,每个分支可以包含可选的 if 前置条件。如果前置条件返回 false,那么禁用分支。仍然计算 <async expression>,但是不轮询结果 Future。当在循环中使用 select! 时,该功能很有用。

select! 表达式的完整生命周期如下所示:

  1. 计算所有 <precondition> 表达式。如果前置条件返回 false,那么在当前 select! 调用中,禁用该分支。由于循环重新进入 select! 将清除“禁用”状态。
  2. 聚合每个分支的 <async expression>,包括禁用的分支。如果分支被禁用,仍然计算 <async expression>,但是不轮询结果 Future。
  3. 同时等待所有剩余的 <async expression> 的结果。
  4. 一旦某个 <async expression> 返回值,那么尝试将该值应用到提供的 <pattern>,如果模式匹配,那么计算 <handler>,并返回。如果模式匹配,那么在当前 select! 调用中,禁用当前分支。从步骤 3 继续。
  5. 如果所有分支都被禁用,那么计算 else 表达式。如果未提供 else 分支,那么 Panic。

2.1. 运行时特点

通过在当前任务上运行所有异步表达式的方式,能够并发地,而非并行地运行表达式。这意味着在相同线程上运行所有表达式,如果一个分支阻塞线程,那么所有其它表达式都将不能继续。如果需要并行,那么使用 tokio::spawn 启动每个异步表达式,然后将 JoinHandle 传递给 select!。

2.2. 公平性

默认情况下,select! 首先随机地选择分支检查。当在循环中调用 select!,并且拥有已经就绪的分支时,这样提供一定程度的公平性。

通过将 biased; 添加到宏使用的开头的方式,重写该行为。查看下面的示例,获取细节。这将导致 select 按照 Future 从上到下出现的顺序轮询它们。这样做的原因包括:

但是使用 biased 模式时,有一个重要的注意事项。你需要确保 Future 的轮询顺序是公平的。比如,如果在流和关闭 Future 之间 select,而该流拥有大量消息,并且它们之间的时间几乎为 0,那么应该将关闭 Future 放到 select! 列表的前面,以确保它始终被轮询,不会因为流不断地准备就绪而被忽略。

2.3. Panic

如果所有分支都被禁用,并且未提供 else 分支,那么 select! 宏将 Panic。当提供的 if 前置条件返回 false ,或者模式不匹配 <async expression> 的结果时,分支被禁用。

2.4. 取消安全性(Cancellation safety)

当使用 select! 宏循环接收来自多个源的消息时,应该确保接收调用是取消安全的,以避免丢失消息。本节将介绍各种常见的方法,并且描述它们是否具备取消安全性。这里列出的列表并不全面,仅供参考。

如下方法是取消安全的:

以下方法不具备取消安全性,可能导致数据丢失:

以下方法不具备取消安全性,因为它们使用队列保证公平性,而取消操作将使你在队列中丢失位置:

为确定自己的方法是否具备取消安全性,请查看使用 .await 的位置。这是因为当异步方法被取消时,始终发生在 .await。如果你的函数在等待 .await 时,即使被重启,也能正确运行,那么它是取消安全的。

可以用如下方式定义取消安全性:如果你有一个尚未完成的 Future,那么删除并重新创建该 Future 必须是空操作。该定义基于在循环中使用 select! 的情况。如无此保证,当另一个分支完成,并且通过绕过循环的方式,重启 select!,那么将失去进度。

注意,取消不具备取消安全性的操作不一定是错误的。比如,如果取消任务是因为应用程序正在关闭,那么可能不关心部分读取数据丢失。

2.5. 示例

带两个分支的基础 select:

基础 Stream select:

收集两个流的内容。在该示例中,我们依赖模式匹配,以及 stream::iter 是 "fused" 的事实,即一旦流完成,所有对 next() 的调用返回 None:

通过传递 Future 引用的方式,在多个 select! 表达式中使用相同的 Future。这样做需要 Future 是 Unpin 的。通过使用 Box::pin 或栈固定(stack pinning)的方式,使 Future 变成 Unpin 的。

在下面的例子中,流最多被消费 1 秒钟的时间:

使用 select! 连接两个值:

使用 biased; 模式控制轮询顺序:

2.6. 避免不雅的 if 前置条件

考虑到 if 前置条件用于禁用 select! 分支,必须谨慎使用,以避免丢失值。

比如,下面是带有 if 的 sleep 的不正确用法。目标是重复地运行异步任务,最多运行 50 毫秒。然而,存在错过 sleep 完成的可能性。

在上面的示例中,即使 sleep.poll() 从未返回 Ready,sleep.is_elapsed() 可能返回 true。这将引起潜在的竞态条件,即在 while !sleep.is_elapsed() 检查和调用 select! 之间,sleep 到期,将导致 some_async_work() 调用在睡眠时间已过期的情况下仍然运行,而不被中断。

以下是一种避免竞态条件的重写示例: