浅析 Rust 异步运行时 tokio

现在!运行!任务!

2Runtime
4未完待续......

Runtime

Runtime 提供了供异步 task 运行的环境。一般来说包括 I/O 驱动器、时间驱动(timer)、阻塞线程池、任务调度器。

我们有多种类别的运行时,包括当前线程的运行时、多线程的运行时。在后文中我们进一步探讨。

构建 Runtime

默认情况下,我们可以调用 Runtime::new() 来构建一个默认的运行时,它等价于:

Builder::new_multi_thread().enable_all().build()

这里的 new_multi_thread 用于给出一个多线程的 tokio 运行时,而 enable_all 用于支持 I/O 和时间。它得到的是一个带有运行在多线程上的调度器、一个 I/O 驱动和一个时间驱动的异步运行时。它等价于执行了:

  • enable_io
  • enable_time

Runtime 构建器支持非常多的方法。不过避免本节过长,我们在后文的 “内部原理” 一节中讨论更多。

我们可以使用 runtime 构建器来构建一个 runtime。不过,更常见的是使用 #[tokio::main] 宏来构建一个 runtime 并将对应的函数作为入口点进行运行 [1]。例如下面的代码:

#[tokio::main]
async fn main() {
    println!("hello");
}

其等价于:

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

可以看到,默认的运行时为多线程运行时,对于大部分应用场景而言这是最合适的选择。在上面的例子中,假设我们探测到可用的 CPU 核心数为 8,则它会创建 8 个工作线程(worker thread),同时 main 本身也占据一个线程,所以在这个例子中,一共是 9 个线程。

不过,我们还提供了:

  • 局部运行时(Local Runtime)。它也是单线程的,不过它能直接驱动 !Send 的本地 task。
  • 本地线程运行时(Current-thread Runtime)。

我们不展开讨论局部运行时。

对于本地线程运行时而言,我们只使用当前线程来完成 future 的执行。

Runtime 方法

Runtime 提供了很多方法。和构建相关的方法见前文 “构建 Runtime” 一节。

在讨论这些方法之前,需要指出的是,runtime 有 “上下文” 这一个概念,它使得 worker thread 能够知道其关联的 runtime 是谁。它基于 TLS(Thread-Local Storage,线程局部存储)实现。例如,当我们直接使用 tokio::spawn 方法的时候,它会在 TLS 中找到 runtime,并让其创建一个 tokio task 并执行。

如下是一些常见的方法:

handle

返回运行时的一个 handle,使用它能够和运行时交互。见后文 “使用 Handle 和 Runtime 交互” 了解更多。

spawn

将一个 future 包装为 task,并 spawn 运行在这个 tokio 运行时上。运行时会运行这个 task,直到其完成或者被取消。

spawn_blocking

将一个闭包运行在运行时的 blocking 线程上。Runtime 本身有一个 blocking 线程池。当我们调用 spawn_blocking 的时候,我们会在该线程池中选择一个可用的线程并执行对应逻辑。如果没有可用的线程则新建或者等待。

block_on

它会在 Tokio 运行时上运行一个 future 直到它完成。这也是运行时的 入口点。它会在当前线程上执行 future,并以阻塞方式等待其执行完毕并返回最终结果。内部生成的任何任务或者定时器都将在该运行时上执行。注意,这个 future 是非工作型 future(non-worker future),它不会被放在 worker thread 中运行,而是一直待在当前线程上,直到其执行完毕。

如果提供的 future 发生了 panic,或者异步执行上下文中调用了本函数,本函数会 panic。

如下是一些在业务代码中不太常见的方法:

enter

进入运行时上下文。进入后,当前线程就暂时带上了这个 runtime 的上下文信息,使得一些依赖 “当前运行时” 的 API(例如 tokio::spawnHandle::current 等)能够正常工作。

shutdown_timeout

关闭运行时,并最多等待给定的时长。关闭时,runtime 会停止接收和调度新的异步任务,并尝试等待已有任务、阻塞线程池任务和 runtime 资源完成清理。

如果到了超时时间仍然没有完全结束,这个方法就直接返回;未停下来的阻塞任务会继续在后台运行。

shutdown_background

立即开始关闭运行时,但不等待其完成。

它同样不会强行终止已经开始执行的阻塞任务;只是当前调用方不再等待这些工作结束。

metrics

返回这个 runtime 的指标视图。通过它可以读取 worker 数量、全局队列深度等统计信息。

阻塞边界

tokio 的 worker thread 是拿来执行异步 task 的,不适合长时间做会阻塞线程的工作。这和 Go 语言的 goroutine 不同。Goroutine 支持抢占式调度,这使得即使某些 goroutine 长时间运行,也不会造成其他 goroutine 饥饿。但是天下没有免费的午餐,只有各种各样的 trade-off,抢占式调度使得其存在着一定的调度 overhead,而在 tokio 中,我们严格区分一个 task 是否会长时间占用 CPU,例如,使用一些阻塞 API、大量地进行计算,等等。常见的阻塞来源包括:

  • std::thread::sleep 这类同步等待,或者标准库或者第三方库提供的同步文件 I/O、同步网络 I/O 方法。
  • 长时间占用 CPU 的纯计算逻辑。

对应的处理方式通常是:

  • 优先使用 Tokio 提供的异步 I/O API。
  • 必须执行阻塞操作或重 CPU 计算时,则需要使用 spawn_blocking 函数把工作移到专门的阻塞线程池。

例如下面这段代码就会错误地堵住 worker thread:

tokio::spawn(async {
    std::thread::sleep(std::time::Duration::from_secs(1));
});

这 1 秒中,线程无法执行其他任何 task 了。因此,更合适的写法是:

tokio::spawn(async {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
});

或者使用 spawn_blocking 函数:

let n = tokio::task::spawn_blocking(|| {
    std::thread::sleep(std::time::Duration::from_secs(1));
    42
}).await.unwrap();

这里闭包中的逻辑会运行在 tokio 专门维护的阻塞线程池上,而不是运行异步 task 的 worker thread 上。外层 async task 只是等待这个阻塞任务完成,因此这 1 秒内,worker thread 仍然可以继续 poll 其他 task。

使用 Handle 和 Runtime 交互

Handle 是运行时的一个 “把手”,我们可以使用它来和 Runtime 交互。

比如,当我们执行 tokio::spawn 的时候,其效果和如下代码差不多:

let handle = Handle::current();
handle.spawn(async {
    // To do something...
});

可以看到,我们在这里委托当前 tokio 上下文对应的运行时创建一个新的 task 并执行。这里我们使用 Handle::current 或者 Handle::try_current 获取当前运行时的 handle。如果在非 Tokio 运行时上下文中调用 current 的话,会出现 panic。 如果有这种可能性的话,建议使用 try_current 来避免。

将其拆分成两步使得我们可以在一些没有 tokio 上下文的线程中和 tokio 运行时交互。例如下面的代码中,我们可以先获取 handle,然后使用 Handle::spawn 来让一个 task 运行在 tokio 运行时中:

let handle = Handle::current();

thread::spawn(move || {
    handle.spawn(async {
        // To do something....
    })
})

当然,如果我们能访问到运行时,我们也可以直接使用 Runtimehandle 方法来获取。例如 RT 为我们想要访问的运行时:

let handle = RT.handle();

Handle 提供的常用方法如下:

spawn

派生一个新的任务。和 tokio::spawn 相比,我们可以在非 runtime 的工作线程内运行这个方法。只要手里持有某个运行时的 Handle,就可以把 future 提交到这个运行时上执行。

spawn_blocking

在专门用于阻塞操作的执行器上运行提供的函数。它和 Runtime::spawn_blocking 的作用相同,只是这里通过 Handle 把工作提交到对应的运行时。

block_on

在当前线程上运行一个 future,堵塞直到它完成,并返回解析结果。这个 future 生成的任何其他 task 也都会在对应的运行时上执行。

对于本地线程运行时而言,只有 Runtime::block_on 方法能驱动 I/O 和计时器驱动。而 Handle::block_on 方法无法驱动它们。

这个方法会在以下情况下 panic:

  1. 提供的 future 本身 panic 了。
  2. 如果它在一个异步上下文中被调用。
  3. 一个计时器 future 在已被关闭的运行时上运行。
runtime_flavor

返回运行时类别。返回值类型为 RuntimeFlavor,目前包括 CurrentThreadMultiThread 两种。

id

返回当前运行时的 ID。这个 Id 是一个不透明标识,用来区分当前运行时和其他运行时。

metrics

返回运行时的指标。它返回一个 RuntimeMetrics 视图,用来读取该运行时的观测信息。

dump

返回运行时状态的一个快照捕获。它能够导出 runtime 中各个 task 的状态和最近一次 poll 的调用栈轨迹,适合在排查 “某些 task 卡在哪里” 这类问题时使用。

独立 Runtime

本节给出了一个小技巧。我们在某些时候可能希望拥有一个全局的或者独立的 tokio 异步运行时。

比如,我们的程序的主要逻辑是同步代码,但是我们的确需要运行一些异步代码(即对 Future 求值),不过我们又不希望每次想要在同步函数中对 Future 求值都创建异步运行时(开销很大)来运行它。

这个时候,我们可以选择使用全局变量来维护异步运行时:

pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Runtime::new()
        .expect("failed to build runtime")
});

当我们需要使用的时候,我们可以先获取 Handle 对象然后再调用 spawn 函数。例如:

RT.spawn(async {
    async_something().await;
});

或者:

let _ = RT.block_on(do_some_async_task());

在实践中,我们可以将运行时或者 Handle 作为一个结构体的字段,并在构建的时候,支持上层传入,如果上层选择不传入的时候我们可以回退从而构建一个独立的 runtime。

在 Rust 社区中的 “hdfs-native” 库的实现中,我们就使用了这个技巧。我们也可以使用这个技巧来提供一些同步的接口,虽然底层实现是异步的。

处理 Panic

参考文档 [2]

当 tokio 运行时管理的任务 panic 的时候,tokio 会捕获这个 panic。

如果我们将 panic 视为任务的返回值,那么它的效果和任务正常退出完全相同。这个行为可以通过枚举 tokio::runtime::UnhandledPanic 来变更。可以阅读这个枚举对应的文档了解更多。

当任务 panic 的时候,对应的 task 的 JoinHandle.await 后会返回 Err(JoinError),它会支持这个 task 发生了 panic。更多见后文的 “JoinError” 一节。

链接和引用

  1. Tokio / Tutorial / Hello Tokio: https://tokio.rs/tokio/tutorial/hello-tokio
  2. Rust Forum / "Tokio Runtime - what happens when a thread panics?": https://users.rust-lang.org/t/tokio-runtime-what-happens-when-a-thread-panics/95819