浅析 Rust 异步运行时 tokio
现在!运行!任务!
Runtime
Runtime 提供了供异步 task 运行的环境。一般来说包括 I/O 驱动器、时间驱动(timer)、阻塞线程池、任务调度器。
我们有多种类别的运行时,包括当前线程的运行时、多线程的运行时。在后文中我们进一步探讨。
构建 Runtime
默认情况下,我们可以调用 Runtime::new() 来构建一个默认的运行时,它等价于:
Builder::new_multi_thread().enable_all().build()
这里的 new_multi_thread 用于给出一个多线程的 tokio 运行时,而 enable_all 用于支持 I/O 和时间。它得到的是一个带有运行在多线程上的调度器、一个 I/O 驱动和一个时间驱动的异步运行时。它等价于执行了:
enable_io。enable_time。
Runtime 构建器支持非常多的方法。不过避免本节过长,我们在后文的 “内部原理” 一节中讨论更多。
我们可以使用 runtime 构建器来构建一个 runtime。不过,更常见的是使用 #[tokio::main] 宏来构建一个 runtime 并将对应的函数作为入口点进行运行 [1]。例如下面的代码:
#[tokio::main]
async fn main() {
println!("hello");
}
其等价于:
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
可以看到,默认的运行时为多线程运行时,对于大部分应用场景而言这是最合适的选择。在上面的例子中,假设我们探测到可用的 CPU 核心数为 8,则它会创建 8 个工作线程(worker thread),同时 main 本身也占据一个线程,所以在这个例子中,一共是 9 个线程。
不过,我们还提供了:
- 局部运行时(Local Runtime)。它也是单线程的,不过它能直接驱动
!Send的本地 task。 - 本地线程运行时(Current-thread Runtime)。
我们不展开讨论局部运行时。
对于本地线程运行时而言,我们只使用当前线程来完成 future 的执行。
Runtime 方法
Runtime 提供了很多方法。和构建相关的方法见前文 “构建 Runtime” 一节。
在讨论这些方法之前,需要指出的是,runtime 有 “上下文” 这一个概念,它使得 worker thread 能够知道其关联的 runtime 是谁。它基于 TLS(Thread-Local Storage,线程局部存储)实现。例如,当我们直接使用 tokio::spawn 方法的时候,它会在 TLS 中找到 runtime,并让其创建一个 tokio task 并执行。
如下是一些常见的方法:
handle返回运行时的一个 handle,使用它能够和运行时交互。见后文 “使用 Handle 和 Runtime 交互” 了解更多。
spawn将一个 future 包装为 task,并
spawn运行在这个 tokio 运行时上。运行时会运行这个 task,直到其完成或者被取消。spawn_blocking将一个闭包运行在运行时的 blocking 线程上。Runtime 本身有一个 blocking 线程池。当我们调用
spawn_blocking的时候,我们会在该线程池中选择一个可用的线程并执行对应逻辑。如果没有可用的线程则新建或者等待。block_on它会在 Tokio 运行时上运行一个 future 直到它完成。这也是运行时的 入口点。它会在当前线程上执行 future,并以阻塞方式等待其执行完毕并返回最终结果。内部生成的任何任务或者定时器都将在该运行时上执行。注意,这个 future 是非工作型 future(non-worker future),它不会被放在 worker thread 中运行,而是一直待在当前线程上,直到其执行完毕。
如果提供的 future 发生了 panic,或者异步执行上下文中调用了本函数,本函数会 panic。
如下是一些在业务代码中不太常见的方法:
enter进入运行时上下文。进入后,当前线程就暂时带上了这个 runtime 的上下文信息,使得一些依赖 “当前运行时” 的 API(例如
tokio::spawn、Handle::current等)能够正常工作。shutdown_timeout关闭运行时,并最多等待给定的时长。关闭时,runtime 会停止接收和调度新的异步任务,并尝试等待已有任务、阻塞线程池任务和 runtime 资源完成清理。
如果到了超时时间仍然没有完全结束,这个方法就直接返回;未停下来的阻塞任务会继续在后台运行。
shutdown_background立即开始关闭运行时,但不等待其完成。
它同样不会强行终止已经开始执行的阻塞任务;只是当前调用方不再等待这些工作结束。
metrics返回这个 runtime 的指标视图。通过它可以读取 worker 数量、全局队列深度等统计信息。
阻塞边界
tokio 的 worker thread 是拿来执行异步 task 的,不适合长时间做会阻塞线程的工作。这和 Go 语言的 goroutine 不同。Goroutine 支持抢占式调度,这使得即使某些 goroutine 长时间运行,也不会造成其他 goroutine 饥饿。但是天下没有免费的午餐,只有各种各样的 trade-off,抢占式调度使得其存在着一定的调度 overhead,而在 tokio 中,我们严格区分一个 task 是否会长时间占用 CPU,例如,使用一些阻塞 API、大量地进行计算,等等。常见的阻塞来源包括:
std::thread::sleep这类同步等待,或者标准库或者第三方库提供的同步文件 I/O、同步网络 I/O 方法。- 长时间占用 CPU 的纯计算逻辑。
对应的处理方式通常是:
- 优先使用 Tokio 提供的异步 I/O API。
- 必须执行阻塞操作或重 CPU 计算时,则需要使用
spawn_blocking函数把工作移到专门的阻塞线程池。
例如下面这段代码就会错误地堵住 worker thread:
tokio::spawn(async {
std::thread::sleep(std::time::Duration::from_secs(1));
});
这 1 秒中,线程无法执行其他任何 task 了。因此,更合适的写法是:
tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
});
或者使用 spawn_blocking 函数:
let n = tokio::task::spawn_blocking(|| {
std::thread::sleep(std::time::Duration::from_secs(1));
42
}).await.unwrap();
这里闭包中的逻辑会运行在 tokio 专门维护的阻塞线程池上,而不是运行异步 task 的 worker thread 上。外层 async task 只是等待这个阻塞任务完成,因此这 1 秒内,worker thread 仍然可以继续 poll 其他 task。
使用 Handle 和 Runtime 交互
Handle 是运行时的一个 “把手”,我们可以使用它来和 Runtime 交互。
比如,当我们执行 tokio::spawn 的时候,其效果和如下代码差不多:
let handle = Handle::current();
handle.spawn(async {
// To do something...
});
可以看到,我们在这里委托当前 tokio 上下文对应的运行时创建一个新的 task 并执行。这里我们使用 Handle::current 或者 Handle::try_current 获取当前运行时的 handle。如果在非 Tokio 运行时上下文中调用 current 的话,会出现 panic。 如果有这种可能性的话,建议使用 try_current 来避免。
将其拆分成两步使得我们可以在一些没有 tokio 上下文的线程中和 tokio 运行时交互。例如下面的代码中,我们可以先获取 handle,然后使用 Handle::spawn 来让一个 task 运行在 tokio 运行时中:
let handle = Handle::current();
thread::spawn(move || {
handle.spawn(async {
// To do something....
})
})
当然,如果我们能访问到运行时,我们也可以直接使用 Runtime 的 handle 方法来获取。例如 RT 为我们想要访问的运行时:
let handle = RT.handle();
Handle 提供的常用方法如下:
spawn派生一个新的任务。和
tokio::spawn相比,我们可以在非 runtime 的工作线程内运行这个方法。只要手里持有某个运行时的Handle,就可以把 future 提交到这个运行时上执行。spawn_blocking在专门用于阻塞操作的执行器上运行提供的函数。它和
Runtime::spawn_blocking的作用相同,只是这里通过Handle把工作提交到对应的运行时。block_on在当前线程上运行一个 future,堵塞直到它完成,并返回解析结果。这个 future 生成的任何其他 task 也都会在对应的运行时上执行。
对于本地线程运行时而言,只有
Runtime::block_on方法能驱动 I/O 和计时器驱动。而Handle::block_on方法无法驱动它们。这个方法会在以下情况下 panic:
- 提供的 future 本身 panic 了。
- 如果它在一个异步上下文中被调用。
- 一个计时器 future 在已被关闭的运行时上运行。
runtime_flavor返回运行时类别。返回值类型为
RuntimeFlavor,目前包括CurrentThread和MultiThread两种。id返回当前运行时的 ID。这个
Id是一个不透明标识,用来区分当前运行时和其他运行时。metrics返回运行时的指标。它返回一个
RuntimeMetrics视图,用来读取该运行时的观测信息。dump返回运行时状态的一个快照捕获。它能够导出 runtime 中各个 task 的状态和最近一次 poll 的调用栈轨迹,适合在排查 “某些 task 卡在哪里” 这类问题时使用。
独立 Runtime
本节给出了一个小技巧。我们在某些时候可能希望拥有一个全局的或者独立的 tokio 异步运行时。
比如,我们的程序的主要逻辑是同步代码,但是我们的确需要运行一些异步代码(即对 Future 求值),不过我们又不希望每次想要在同步函数中对 Future 求值都创建异步运行时(开销很大)来运行它。
这个时候,我们可以选择使用全局变量来维护异步运行时:
pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
tokio::runtime::Runtime::new()
.expect("failed to build runtime")
});
当我们需要使用的时候,我们可以先获取 Handle 对象然后再调用 spawn 函数。例如:
RT.spawn(async {
async_something().await;
});
或者:
let _ = RT.block_on(do_some_async_task());
在实践中,我们可以将运行时或者 Handle 作为一个结构体的字段,并在构建的时候,支持上层传入,如果上层选择不传入的时候我们可以回退从而构建一个独立的 runtime。
在 Rust 社区中的 “hdfs-native” 库的实现中,我们就使用了这个技巧。我们也可以使用这个技巧来提供一些同步的接口,虽然底层实现是异步的。
处理 Panic
参考文档 [2]。
当 tokio 运行时管理的任务 panic 的时候,tokio 会捕获这个 panic。
如果我们将 panic 视为任务的返回值,那么它的效果和任务正常退出完全相同。这个行为可以通过枚举 tokio::runtime::UnhandledPanic 来变更。可以阅读这个枚举对应的文档了解更多。
当任务 panic 的时候,对应的 task 的 JoinHandle 在 .await 后会返回 Err(JoinError),它会支持这个 task 发生了 panic。更多见后文的 “JoinError” 一节。
链接和引用
- Tokio / Tutorial / Hello Tokio: https://tokio.rs/tokio/tutorial/hello-tokio
- Rust Forum / "Tokio Runtime - what happens when a thread panics?": https://users.rust-lang.org/t/tokio-runtime-what-happens-when-a-thread-panics/95819