浅析 Rust 异步运行时 tokio
现在!运行!任务!
什么是 Task
我们使用 task 来定义一个用来运行的异步任务。
它是一个轻量的异步执行单位。某种程度上类似于 OS 提供的线程,但是有如下区别:
- 由运行时调度。Task 并不是由 OS 的调度器管理的,而是由 tokio 运行时自行管理的,我们将其称为绿色线程。类似于 Go 的协程、Kotlin 的协程和 Erlang 的进程。不过,不同的绿色线程的实现上有很大的不同,比如 tokio 的 task 没有独立的调用栈,而是在编译期编译为状态机,我们将其称为无栈协程。
- 更充分地利用计算资源。Task 不会一直使用 CPU,在不需要计算资源的时候它需要主动让出。这一点上和线程是一致的,即线程在执行 I/O 操作的时候也会让出 CPU,这个过程中,我们会执行上下文切换,而 OS 会选择另外一个任务执行(不一定是当前进程的另一个线程)。不过,task 相比线程而言,没有昂贵的 OS 线程上下文切换;task 无法继续推进时,运行时可以调度另一个 task 继续执行。对于 I/O 而言,tokio 会利用非堵塞 I/O 和 reactor/driver 的方式,或者委托给 blocking 线程的方式来避免阻塞工作线程。
我们可以使用三个关键字来描述 task,分别是:
- 轻量。Task 是轻量的。因为由 tokio runtime 而非 OS 管理,所以创建 task 或者切换 task 并不需要像 OS 线程切换时那样的昂贵上下文切换,并且这些操作完全在用户态闭环完成了,因此 overhead 相对小很多。
- Tokio 的 task 占用内存大小非常小。它占用的内存主要来自 future 生成的状态机,以及一些 tokio 相关的轻量元信息和状态。编译器能够提前确定 future 状态机的大小,因此不需要像 OS 线程那样,为每个执行单位预留一块较大的线程栈(一个典型值是 2 MiB)。
- 切换 task 本质上可以简单地视为更新调度器的队列,同时拿到下一个待运行的 task 并 poll 它。当然,实现上会稍微更复杂一些,但是这相比 OS 线程的上下文切换的 overhead 显得非常廉价。
- 协作调度。Task 是协作调度的。不同于抢占式调度,协作式调度会持续运行直到它无法继续推进并把控制权交还给运行时。常见位置是
.await:如果被 await 的 future 立即返回Ready,当前 task 会继续执行;如果返回Pending,当前 task 会被挂起,等待之后被唤醒。我们可以主动调用yield_now,或者使用 tokio 提供的定时器、I/O 函数,从而在相应时机把执行权交还给运行时,使得运行时可以切换到另一个 task 继续运行。切换过程中通常不需要 OS 线程上下文切换。- 协作调度的优点是基本上没有调度相关的开销(虽然还是有一点成本),缺点是使用者需要有较大的心智负担,即我们需要保证 task 是非阻塞的。
- 非阻塞。Task 必须是非阻塞的。当 task 无法继续运行的时候,它必须让出线程,使得 tokio runtime 能够运行另外的 task。Task 不应该直接执行可能长期阻塞线程的同步系统调用或其他阻塞操作。因此,tokio 提供了多种 APIs 来避免阻塞。
- 我们必须识别出哪些地方会阻塞异步任务、必须使用异步 API 来代替标准库提供的同步 API,等等。
- 例如,如果 task 会长时间运行,那么必须使用
spawn_blocking这种方法来避免阻塞工作线程,或者在需要的时候主动 yield 让出。 - 例如,我们需要运行 tokio 提供的
sleep函数来代替标准库的sleep函数。使用 tokio 的sleep函数,task 会让出工作线程,并让 runtime 在时间到了的时候唤醒自己。
举例来说,我们有代码如下(运行在 tokio 的异步运行时下的一个 task 中):
println!("1");
tokio::spawn(async {
println!("2");
});
这个 task 的逻辑为:
- 打印一个
"1\n"到标准输出上。 - Spawn 一个新的 task。在新的 task 中我们打印一个
"2\n"到标准输出上。
我们称前 task 为 task A,之后新生成的 task 为 task B,那么在 task A 去 spawn task B 的时候,tokio 就同时托管了这两个 tasks。Tokio 会 poll 这些 task,将 runnable 的 task 放置到可用的 worker 上推进。只要运行时仍然存活且 task 没有被取消,它们就有机会继续运行;但 tokio 并不保证每个已经 spawn 的 task 都一定自然执行到完成,例如运行时关闭时,尚未完成的 task 会被取消。
值得注意的是,通过 tokio::spawn 创建的普通 task 必须实现 Send trait。这是因为一个 task 在其整个生命周期,即从创建开始,到执行完毕或被取消的这一段时间内,可能在不止一个线程中被执行。即,在 task 的生命周期中,task 可以从一个线程中发送到另外一个线程中然后继续执行。
Task 能够在不同线程上执行的这个性质比较重要,这使得 worker 线程池能够均衡负载,不至于 “一核有难,七核围观”。不过,如果某个 task 已经把当前线程阻塞住了,运行时无法抢占这个正在执行的 task,从而会导致能处理 task 的线程减少,则会导致负载失衡。因此,避免在 async task 中执行阻塞操作仍然很重要。
后文的 “内部原理” 进一步探讨了 task 内部实现。
Task 的生命周期
一个 task 的生命周期分为如下几个阶段:
- 创建。通过
tokio::spawn、spawn_local、spawn_blocking等 API 创建。 - 运行。task 会被 runtime 调度和 poll;当被 poll 的 future 返回
Pending,或者 task 显式 yield 时,它会暂时让出执行权,等待之后被唤醒再继续推进。 - 结束。要么 future 正常返回,要么 panic 被 tokio 捕获,要么被外部取消。
本节我们讨论 task 的整个生命周期,从创建直到结束。
使用 Spawn 创建 Task
如前,我们可以通过 tokio::spawn 来添加一个异步任务。类似于 thread::spawn 一样。不同的是,后者会接受一个闭包并创建一个 OS 线程,而前者接受一个 future(即 async 块求值后得到的值),并使用这个 future 创建一个 task。
值得注意的是,如果我们当前环境中没有 tokio 上下文,那么如果调用 tokio::spawn,则会直接导致 panic(在前文 “Runtime” 中我们已经讨论了 tokio 的运行时上下文,这里不再赘述)。
当我们使用 tokio::spawn 的时候,它会返回一个 JoinHandle 结构体,就像 thread::spawn 会返回一个 handle 一样。例如我们可以:
let join_handle = tokio::spawn(async {
// Some code here...
});
这个由 future 构建得到的 task 在 spawn 返回后就已经交给 runtime 运行了。不过,我们能通过 await 这个 JoinHandle 来等待这个 task 完成。例如:
let join_handle = tokio::spawn(async {
print!("Hello ");
});
let join_handle_res = join_handle.await;
println!("World");
这个例子中,这里我们会等待 join_handle 对应的 task 完成(此时会输出 "Hello "),然后再输出 "World"。
这里的 join_handle_res 的类型为 Result<T, JoinError>。Task 正常执行完成后我们能得到 Ok(T),其他情况会得到 Err(JoinError)。我们会在后文进一步讨论 JoinError。
和 thread::JoinHandle 一样,我们可以直接 drop tokio::spawn 返回的 JoinHandle,此时 task 不会因此而停止,而是会继续在后台由运行时调度并运行。只是我们失去了等待它结束和获取返回值的能力。
此外,我们还有 task::spawn_local 和 task::spawn_blocking 等多种方式来构建 task,同样地,我们会在后文进一步讨论。
约束:Send 和 'static
在前面我们讨论过,在 tokio 中,task 一般是不会和某一个线程进行绑定(虽然在内部实现中,运行时会尽量让 task 始终由一个线程运行,从而避免跨线程调度导致的开销和缓存丢失带来的性能下降)。这使得 task 可能会 move 到不同的线程并继续运行。这一事实也体现在函数签名中。可以看到,函数 tokio::spawn 对 future 的要求是 Send + 'static,对 future 的返回值的要求也是 Send + 'static。
这里的 Send 意味着 task 在一次 .await 之后,可以被 runtime 迁移到另外一个 worker thread 上继续执行。这要求 跨过 .await 仍然存活的值 也必须是 Send,这样整个 future 才能是 Send 的。不过,如果某个 !Send 的值只在一次 poll 内部短暂存在,没有跨过 .await,那么它仍然可以在 task 中被使用。例如:
tokio::spawn(async {
{
let rc = std::rc::Rc::new(());
use_rc(rc.clone());
}
tokio::task::yield_now().await;
});
但是,如果 Rc 这类 !Send 的值跨过了 .await,那么它就无法被 tokio::spawn 创建的 task 持有。
另外一个,我们需要让 future 和其返回值满足 'static 生命周期约束。这不表示这个 future 会 “永久存活”,它只是表示这个 task 不再借用当前栈帧中的局部变量。因为 task 可能在创建它的函数已经返回之后依然继续执行。如果编译器抱怨了,那么可以使用 async move 将所有权移动到 task 内部,或者使用 Arc 等共享所有权的结构。
使用 spawn_local 在当前线程上运行
Warning
注意,根据 tokio 的 #7558 issue,未来我们会预期弃用
LocalSet。目前LocalRuntime已经稳定,如果真的有在当前线程上运行异步任务的需求,则尽量使用LocalRuntime。
如果某个 task 明确只能在当前线程上运行,例如它持有 Rc、RefCell 或者其他 !Send 的值,那么可以使用 task::spawn_local。可以看到它的签名为:
pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
相比于 spawn 而言,它不再要求 F 实现 Send,也不要求其 F::Output 实现 Send。
这类 task 不会在线程之间迁移。不过它必须运行在 LocalSet 或者 LocalRuntime 上,换句话说,它不能运行在 worker thread 中(这很容易理解,worker thread 上的 task 必须能够能被移动到另外一个 worker thread 上,而这和 spawn_local 生成的 task 天然矛盾),即我们不能这样写:
tokio::spawn(async {
// PANIC! Do not do this!
tokio::spawn_local(async {
// ...
})
})
相反,我们必须只能在 block_on 所在的 future 中使用 spawn_local:
#[tokio::main(flavor = "local")]
async fn main() {
let join = tokio::task::spawn_local(async {
println!("my nonsend data...")
});
join.await.unwrap()
}
阻塞和 Yielding
如前,当我们在一个 spawn 中的 task 中运行会阻塞的函数,会导致整个线程都被阻塞,而 tokio 运行时都没办法拿到这个线程然后让其能够运行其他 task。不过我们提供了 task::spawn_blocking 和 task::block_in_place 这两个 API。比如:
let res = task::spawn_blocking(move || {
// ...
}).await?;
其中 spawn_blocking 会在允许阻塞的线程上运行给定的闭包。Tokio 实际上会维护一个专门的阻塞线程池,并使用该线程池的资源来运行这个任务。
而 block_in_place 则提供了一个机会告诉调度器当前任务 即将 阻塞当前线程,这使得调度器能够将其他任务移动到另外的工作线程上。它会直接阻塞当前线程,在行为上,spawn_blocking 会立即返回 JoinHandle,而 block_in_place 则会阻塞直到闭包执行完毕。这个 API 只适用于多线程运行时。
我们还提供了 task::yield_now 函数。当运行后,当前 task 会让出调度,这使得调度器可以调度运行其他任务,这可以避免其他任务饿死。
对于一些复杂的 task 而言,如果里面存在一个会长时间占用工作线程的循环,而且我们不想要使用 spawn_blocking 的时候,我们可以每迭代 n 次,就调用下 yield_now,从而避免其他任务长时间无法被调度。比如我们处理一批消息的时候,或者遍历一个大型结构的时候。如果一次 poll 会处理 个项目,而每个项目耗时 微秒,那么对其他项目而言最坏的情况延迟就是会增加 微秒。如果这个值大到不可接受的时候,我们可以适当添加 yield_now。
执行完毕
当 task 的 future 完成求值之后(即,对其调用 poll 方法返回的是 Ready(T)),这个 task 就结束了。此时我们会:
- 将 task 的返回值放进对应的
JoinHandle<T>中。如果有人持有JoinHandle,那么他可以使用这个 handle 来获取返回值。 - Task 内部的局部变量会被 drop 掉。
- 如果外部在
.await这个JoinHandle,此时我们会唤醒它,而它会从 handle 中得到Ok(T)。
对 tokio 来说,它有三种终点:
- 大部分情况下,task 自己正常运行直到结束。
- Task 被取消,此时可能执行了一半。
- Task 运行过程中 panic 了。
后两个终点我们在下一节进一步讨论。
取消 Task 或者发生 Panic
一个 task 可以被取消掉。
我们可以通过方法 JoinHandle::abort 或者方法 AbortHandle::abort 来取消 task 的运行(AbortHandle 可以使用 JoinHandle::abort_handle 获取,但是它只能用于取消任务)。
当调用了这两个方法的任意一个后,我们会向对应 task 发送信号。这个取消不会同步完成;通常要等 task 再次把控制权交还给 runtime 后,runtime 才能检查到取消状态并结束它。如果这个 task 本身还没来得及运行,那它会尽快取消运行。当我们尝试退出 tokio 运行时的时候(比如从 #[tokio::main] 返回的时候),tokio 运行时上的所有 tasks 都会被取消。
一旦 task 被取消之后,它会停止运行,所有本地变量都会随着 future 被 drop 而被 drop 掉,完成后,JoinHandle 的 .await 表达式会得到一个 Err(JoinError),它即 “cancelled error”,表示 task 被取消了。
不过,abort 只是发出了取消请求,并不保证我们最终一定观察到 “cancelled error”。如果 task 在收到取消信号之后、下一次 yield 之前就已经自然执行完毕,那么 JoinHandle 仍然会返回 Ok(T)。
对于 spawn_blocking 创建出来的 task 来说,abort 通常不会真正中断一个已经开始执行的阻塞闭包。因为它不是 async task,因而无法在 .await 点被取消;只有在它还没开始运行的时候,取消才有机会阻止它启动。
Task 也可能在运行过程中发生 panic。Tokio 会捕获 task 内部的 panic,并在 JoinHandle 的结果中返回 JoinError;这和取消是两种不同的失败情况,我们在后文 “JoinError” 一节中进一步讨论。
Join 一个 Task
JoinHandle
如前所述,当我们 spawn 一个 task 之后,tokio 会返回一个 JoinHandle<T>。它是一个 “等待这个 task 结束并取回结果” 的句柄。
结构体 JoinHandle<T> 本身就是一个 future,因此可以直接 .await:
let handle = tokio::spawn(async { 1 + 2 });
let res = handle.await;
当对应 task 的执行完毕后,handle.await 不再堵塞,其返回值类型是 Result<T, JoinError>。它在类型层面有两类结果,其中错误结果可以继续区分为取消和 panic:
Ok(T)。此时 task 成功执行完毕。Err(JoinError)。此时 task 可能被取消或者发生 panic 而被捕获。
有意思的是,因为 JoinHandle 本身就是一个 future(即实现了 Future trait),所以我们可以像对待其他普通 futures 一样对待它。例如 tokio 提供了宏 join!,使得我们可以同时等待多个 future 执行,我们可以因此:
let handle_first = tokio::spawn(async { 1 + 2 });
let handle_second = tokio::spawn(async { println!("Hello") });
tokio::join!(handle_first, handle_second);
结构体 JoinHandle 提供了如下方法:
abort取消对应 task。
abort_handle创建一个
AbortHandle,我们可以使用它来取消 task。但是没有办法使用它来接收 task 求值后返回的值。is_finished一种不阻塞的方式来检查对应 task 是否完成。
id获取 task 的 ID。
JoinError
一个 task 可能不总是执行完毕。这个时候我们会得到 Err(JoinError)。它可能对应两种其他情况:
- 被取消。这个时候,我们可以使用
is_cancelled方法来进行检查。 - 内部 panic 且被 tokio 捕获。这个时候,我们可以使用
is_panic方法来进行检查。
用户应该自行处理好这两种情况。
Task 的局部变量
在线程中,我们可以使用 TLS(Thread-local Storage)变量。但是 task 本身就可能会运行在不同的线程上,所以 tokio 支持声明 task-local 的变量。
我们使用 task_local! 宏来声明新的 task-local 的变量:
task_local! {
pub static ONE: u32;
#[allow(unused)]
static TWO: f64;
}
作为一个有趣的比较,读者可以对比看看 std::thread_local! 宏。其中,task_local! 宏声明了 task 局部变量,而 thread_local! 宏则声明了 TLS 变量。
这里的 ONE 的类型即 tokio::task::LocalKey<u32>。我们使用 LocalKey 结构体来访问 task 局部变量。
LocalKey
当我们使用 task_local! 的时候,我们会得到 LocalKey。
我们可以使用 LocalKey::scope 方法来为 future 设置 task 局部变量为给定的值,而使用 LocalKey::get 方法来获取 task 局部变量的值。例如:
tokio::task_local! {
static NUMBER: u32;
}
NUMBER.scope(1, async move {
println!("task local value: {}", NUMBER.get())
}).await;
方法 LocalKey::get 会在未设置的时候 panic,此时也可以使用 LocalKey::try_get 来代替。
例子 —— 在 axum 中使用
以下是 [[Rust_crate_axum|axum]] 官方文档中提供的一个例子,这里我们给出了 USER 这个任务局部变量:
#[derive(Clone)]
struct CurrentUser {
name: String,
}
task_local! {
pub static USER: CurrentUser;
}
我们还给出了一个中间件,用于赋值这个 task-local 变量,如下:
async fn auth(req: Request, next: Next) -> Result<Response, StatusCode> {
let auth_header = req
.headers()
.get(header::AUTHORIZATION)
.and_then(|header| header.to_str().ok())
.ok_or(StatusCode::UNAUTHORIZED)?;
if let Some(current_user) = authorize_current_user(auth_header).await {
// State is setup here in the middleware
Ok(USER.scope(current_user, next.run(req)).await)
} else {
Err(StatusCode::UNAUTHORIZED)
}
}
async fn authorize_current_user(auth_token: &str) -> Option<CurrentUser> {
Some(CurrentUser {
name: auth_token.to_string(),
})
}
之后的 axum handler 即可直接访问 USER 来获取中间件解析出来的 CurrentUser 实例。
管理多个 Futures 或 Tasks
使用 select! 管理多个 Futures
我们可能希望通过类似 Go 的 select 来处理多个异步的源中。例如,我们可能有一个 future 正在运行:
let res = future.await;
但是我们也希望如果某个 channel 传递了一个值的时候,或者定时器超时的时候,我们能不再 await。而是直接处理 channel 或者定时器。
这个时候,我们可以使用 select! 宏来管理多个 future 对象。一个例子如下:
tokio::select! {
_ = do_stuff_async() => {
println!("do_stuff_async() completed first")
}
_ = more_async_work() => {
println!("more_async_work() completed first")
}
}
当一个分支完成的时候,我们会及时返回,然后 drop 掉剩下分支中尚未完成的 future。这里的 “drop” 不一定等价于取消一个已经 spawn 出去的 task:如果分支里等待的是 JoinHandle,drop 掉这个 handle 只会 detach 对应 task,task 本身仍然会在后台运行。分支的语法结构为:
<pattern> = <async-expression> (, if <precondition>)? => <handler>,
我们可以这样来给出多个 async 表达式,表示不同的分支。宏 select! 会在当前 task 中并发地运行所有分支。当一个分支完成且返回值能够和 <pattern> 匹配上的时候,这个时候我们会执行 <handler> 对应的逻辑。
分支可能包含一个可选的 if,当其对应的表达式求值为 false 的时候,这个分支会在本次 select! 调用中被禁用。对应的 async expression 仍然会被求值,但得到的 future 不会被 poll。
宏 select! 还可以接受一个 else 分支:
else => <expression>
select! 默认的行为会随机挑选一个分支然后进行检查。这个行为提供了一定的公平性:在 loop 中使用 select! 可能多个分支都有值,如果不随机的话,我们会永远执行第一个。不过我们可以在 select! 块一开始中使用 biased; 语句来规避这个行为,select! 会从上到下顺序地进行 poll。一些使用 biased; 的原因有:
- 随机数生成器具有 CPU 开销。
- polling 顺序比较重要。
不过如果真的使用了 biased; 的话,编写 select! 的工程师就需要保证分支的公平性。
当 select! 的所有分支都被禁用(或者模式没有匹配上 <async-expression>)的话,并且没有 else 分支,那么这个时候就会导致 panic。
使用 join! 管理多个 Futures
我们希望能够等多个 futures 都完成后再继续执行。
当然,一个简单的方式是,如下:
foo_future.await;
bar_future.await;
但是这个的问题也很明显,我们必须等 foo_future 对应逻辑完成后再开始执行 bar_future 对应的逻辑。我们可以使用 join! 来完成一个简单的优化:
let (foo, bar) = tokio::join!(foo_future, bar_future);
这里,我们会同时等待两个 future 执行完毕。
如果我们传入的是一系列返回 Result 的 futures,且我们希望当存在一个 future 求值出 Err 的时候就提前返回,那么我们可以使用 try_join! 宏。
使用 join_all 来管理多个 Futures
在 futures-rs 中的 futures::future::join_all() 函数支持接受一个 future 对象的可迭代对象,并返回一个 future 对象,并通过 .await 本 future 对象来完成 join! 的功能。
其文档给出的一个例子为:
use futures::future::join_all;
async fn foo(i: u32) -> u32 { i }
let futures = vec![foo(1), foo(2), foo(3)];
assert_eq!(join_all(futures).await, [1, 2, 3]);
使用 FuturesUnordered 等来管理多个 Futures
在 futures-rs 中,我们还提供了结构体 FuturesUnordered 和 FuturesOrdered。它们都实现了 Stream,可以一边持有多个尚未完成的 futures,一边通过 next().await 取出已经完成的结果。
一个 FuturesUnordered 的例子如下:
use futures::stream::{FuturesUnordered, StreamExt};
async fn fetch(id: u64) -> String {
tokio::time::sleep(std::time::Duration::from_millis(100 - id * 20)).await;
format!("item-{id}")
}
#[tokio::main]
async fn main() {
let mut futures = FuturesUnordered::new();
for id in 0..3 {
futures.push(fetch(id));
}
while let Some(value) = futures.next().await {
println!("{value}");
}
}
这里的结果会按照完成顺序返回,而不是按照 push 的顺序返回。比如上面的 id = 2 等待时间最短,所以它通常会先输出。如果需要按照加入顺序返回结果,则可以使用 FuturesOrdered。FuturesOrdered 中后面的 future 即使先完成,也要等前面的 future 完成后才会被 yield 出来。
FuturesUnordered 更适合管理一批数量较多、数量可能动态变化、且结果顺序不重要的 futures。和 join_all 相比,它可以在运行过程中继续 push 新的 future,也不要求最后一次性收集成一个 Vec。它还有一个重要的性能特征:只有当内部某个 future 产生 wake-up 通知时,集合才会 poll 对应的 future,这使得它更适合管理大量并发 future。
不过,FuturesUnordered 管理的是普通 future,而不是 Tokio task。也就是说,这些 future 仍然是在当前 async task 被 poll 的过程中向前推进;如果外层 task 被取消,这个集合里的 future 也会一起被 drop。若希望每个工作项独立成为 Tokio task,应该使用 tokio::spawn 后管理 JoinHandle,或者直接使用后面的 JoinSet。
使用 JoinSet 来管理多个 Tasks
在 tokio::task 模块下,我们有一个 JoinSet。
JoinSet 适合动态地管理一批已经 spawn 出去的 task。和 join! 不同,它不要求我们在编译期就知道 future 的数量;和 Vec<JoinHandle<_>> 相比,它会按照 “哪个 task 先完成,就先返回哪个结果” 的方式来回收结果。不过,JoinSet 中的所有 task 必须有相同的返回类型。另一个重要差异是:drop JoinHandle 会 detach 对应 task,而 drop JoinSet 会立即 abort 其中仍在运行的所有 task。
一个常见的模式如下:
let mut set = tokio::task::JoinSet::new();
for i in 0..10 {
set.spawn(async move { i * 2 });
}
while let Some(res) = set.join_next().await {
let value = res.unwrap();
println!("{value}");
}
这里的 join_next() 会等待任意一个已经完成的 task,并返回它的结果;当集合为空的时候,它会返回 None。
和 tokio::spawn 函数要求的约束一样,JoinSet::spawn() 要求 future 和其输出都满足 Send + 'static。如果我们需要管理一批 !Send 的本地 task,那么也可以使用 JoinSet::spawn_local(),前提是当前运行在 LocalSet 或者 LocalRuntime 上。
使用 TaskTracker 来管理多个 Tasks
在 tokio_util 中,我们提供了结构体 TaskTracker。在一些情况下,我们可以使用 TaskTracker 来代替 JoinSet。其中一种情况就是我们希望任务退出的时候,能够尽快释放其内存。
一个简单的例子如下:
let tracker = TaskTracker::new();
for i in 0..10 {
tracker.spawn(async move {
println!("Task {} is running!", i);
});
}
tracker.close();
tracker.wait().await;
// No `join_next` needed...
链接和引用
- docs.rs / tokio: https://docs.rs/tokio/latest/tokio/index.html
- 《Rust 语言圣经》/ 6.3 创建异步任务: https://course.rs/advance-practice/spawning.html
- Rust 入门秘籍: https://rust-book.junmajinlong.com/about.html
- Cybernetist / Rust tokio task cancellation patterns: https://cybernetist.com/2024/04/19/rust-tokio-task-cancellation-patterns/