tokio使用中的注意事项 #53

BruceChen7 opened this issue Mar 13, 2023

BruceChen7 commented Mar 13, 2023


BruceChen7 commented Mar 13, 2023


(usage:: 什么是 tokio runtime )

  • 包含了如下几个部分
    • 一个 I/O 事件循环,称为驱动程序,它驱动 I/O 资源并将 I/O 事件分派给依赖它们的任务。
    • 执行使用这些 I/O 资源的任务的调度程序
    • 用于安排工作在设定的时间段后运行的定时器
    • tokio::main 来在后台创建运行时,理解运行时是有一整套的异步代码,包括executor, waker这些
  • 运行时上下文中,使用 tokio::spawn 函数产生其他任务。
  • 使用此函数产生的future 将在与 Runtime 使用的相同线程池上执行
  • 要运行异步函数,它们必须传递给 tokio::spawn 或者是用 #\[tokio::main\] 注释的主函数。
    • 这将生成的最外层 future,提交给 Tokio 执行者
    • 执行者 (executor) 负责在最外层 future 调用 Future::poll ,驱动异步计算完成
    • 这意味着在 outer future 上尽量不能 block(不能被运行是管理的 block,是不行的),否则执行者将会被阻塞,不能执行更多的 outer future
    • outer future 是指通过调用 tokio::spawn 和 tokio::main 注解的函数
    • 执行器负责调度不同的任务(顶级 future)。
    • 它通过轮询尝试推进任务。同时,它分发一个 Waker,该 Waker 会传递给 Reactor
    • 如果 Future 返回 ready,则已解决,该任务完成。
    • 如果返回 NotReady,则该未来被放入待处理队列
    • 一个任务被视为一个由 Future 组成的树
    • 由 async/await 创建的 Future 不是叶子 Future。
    • 代表实际上需要等待的操作的 Future 是叶子 Future。
    • 叶子 Future 在尝试从 TcpStream 读取时,会将 Waker 传递给反应器。
    • 如果读取操作返回了 WouldBlock,意味着目前还没有数据,它将把这个结果返回到执行器的 future 链上。
    • 然后执行器会将这个任务放入待处理队列
    • Reactor 等待一个由操作系统支持的事件队列,并在它监控的 TcpStreams 中的一个准备好读取数据时收到通知。
    • 当数据准备好时,它会调用相应 Waker 的唤醒方法。
    • 唤醒者(Waker)是由执行者(Executor)传递出来的,所以当数据准备就绪时,它知道如何将相应的任务从待处理队列移动到就绪队列
    • 然后唤醒执行器。它检查就绪队列并在那里找到执行的任务。
    • 它轮询该任务,然后循环继续,直到该任务中的整个 Future Tree 都得到解决并且该任务完成。


  • pub fn new_current_thread() -> Builder
  • 返回一个新的构建器,其中当前线程调度程序被选中
  • 要在生成的运行时上产生非发送任务,请将其与 LocalSet 相结合。


  • 一些场景下,要运行一个或多个未实现Send的 future,因此无法在线程之间安全地 Send
  • 使用 localSet 来安排一个或多个!Send future 在同一个线程上一起运行。
  • 下面代码不会编译
    use std::rc::Rc;
    async fn main() {
        // `Rc` does not implement `Send`, and thus may not be sent between
        // threads safely.
        let nonsend_data = Rc::new("my nonsend data...");
        let nonsend_data = nonsend_data.clone();
        // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
        // Since `tokio::spawn` requires the spawned future to implement `Send`, this
        // will not compile.
        tokio::spawn(async move {
            println!("{}", nonsend_data);
            // ...
  • 使用 LocalSet 来生成!Send future 并安排它们在调用 Runtime::block_on 的线程上。
  • 在本地任务集内运行时,能使用 task::spawn_local 来生成 !Send future。
  • run_until 方法只能在 \#\[tokio::main\] 、 \#\[tokio::test\] 或直接在对 Runtime::block_on 的调用中使用。
  • 不能在使用 tokio::spawn 生成的任务中使用。
  • LocalSet 本身实现了 Future,用于在 LocalSet 上运行多个 futures 并驱动整个集合直到它们完成
    use std::rc::Rc;
    use tokio::task;
    async fn main() {
        let nonsend_data = Rc::new("my nonsend data...");
        // Construct a local task set that can run `!Send` futures.
        let local = task::LocalSet::new();
        // Run the local task set.
        local.run_until(async move {
            let nonsend_data = nonsend_data.clone();
            // `spawn_local` ensures that the future is spawned on the local
            // task set.
            task::spawn_local(async move {
                println!("{}", nonsend_data);
                // ...
    // 本身是一个 future
    use tokio::{task, time};
    use std::rc::Rc;
    async fn main() {
        let nonsend_data = Rc::new("world");
        let local = task::LocalSet::new();
        let nonsend_data2 = nonsend_data.clone();
        local.spawn_local(async move {
            // ...
            println!("hello {}", nonsend_data2)
        local.spawn_local(async move {
            println!("goodbye {}", nonsend_data)
        // ...


  • 异步是使用一种称为协作调度的机制实现的
  • 异步代码不能在到达.await 的情况前花费很长时间
  • 阻塞线程 (blocking the thread) 意味着阻止运行时切换当前任务
  • 它阻塞了线程。在这种情况下,没有其他任务,所以这不是问题,但在实际程序中会有其他的任务
    use std::time::Duration;
    async fn main() {
        println!("Hello World!");
        // No .await here!
        println!("Five seconds later...");
    use std::time::Duration;
    async fn sleep_then_print(timer: i32) {
        println!("Start timer {}.", timer);
        // No .await here!
        println!("Timer {} done.", timer);
    async fn main() {
        // The join! macro lets you run multiple things concurrently.
  • 将花费三秒钟的时间运行,并且计时器将一个接一个地运行,没有任何并发性,原因是这里不是协作式的放弃执行
  • tokio 运行时无法将一个任务交换为另一个任务,因为这样的交换只能发生在 .await 处

如果我想 block,该怎么办

  • 什么样的情况,会出现 block
    • 同步 IO
    • 时间比较长的 CPU 运算
  • 这两种情况,都会导致在较长时间才能到达.await 操作,必须将这种阻塞操作移动到 tokio executor 线程池之外的线程
  • 使用 tokio::task::spawn_blocking 函数
    • 运行时包含一个单独的线程池,专门用于运行阻塞函数,你能使用 spawn_blocking 在其上运行任务。
    • 这个线程池的上限是大约 500 个线程,因此能在这个线程池上进行大量阻塞操作
  • 使用 rayon crate
  • 使用 std::thread::spawn 生成专用线程。

(usage:: 在同步代码中使用异步函数)

  • 某些场景下,trait 的签名是同步的
  • generate 是一个同步方法,.await 里面不能直接使用
    trait Sequencer {
        fn generate(&self) -> Vec<i32>;
    impl PlainSequencer {
        async fn generate_async(&self)->Vec<i32>{
            let mut res = vec![];
            for i in 0..self.bound {
    impl Sequencer for PlainSequencer {
        fn generate(&self) -> Vec<i32> {
  • 注意这里的 generate 是一个同步的签名,却调用异步的实现


impl Sequencer for PlainSequencer {
    fn generate(&self) -> Vec<i32> {

mod tests{
    async fn test_sync_method() {
        let sequencer = PlainSequencer {
            bound: 3
        let vec = sequencer.generate();
        println!("vec: {:?}", vec);
  • Runtime::block_on 方法,它会阻塞当前线程,直到 future 完成。
  • 该错误说明不允许从*当前正在执行的运行时中启动另一个运行时
    Cannot start a runtime from within a runtime.
    This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
    thread 'tests::test_sync_method' panicked at 'Cannot start a runtime from within a runtime.


impl Sequencer for PlainSequencer {
    fn generate(&self) -> Vec<i32> {
        futures::executor::block_on(async {
  • 编译成功,但是代码只是挂了,运行时没有返回
  • generate_async 方法中只有一个简单的 sleep 调用,但是为什么 future 永远不会完成呢?
  • 对于相同的代码片段,它在 tokio::test 中挂起,而在 tokio::main 中正常完成。
  • async 的基本原理
    // pseudo-rust code
    match ::std::future::IntoFuture::into_future(<expr>) {
        mut __awaitee => loop {
            match unsafe { ::std::future::Future::poll(
                <::std::pin::Pin>::new_unchecked(&mut __awaitee),
            ) } {
                ::std::task::Poll::Ready(result) => break result,
                ::std::task::Poll::Pending => {}
            task_context = yield ();
  • 调用 generate 函数的一定是Tokio 的 executor
  • 那么谁负责在 block_on 函数内部轮询 future 呢?
  • 一开始是以为是会在内部运行时轮询 generate_async
  • 实际运行在 futures_executor::local_pool::run_executor
    fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
        let _enter = enter().expect(
            "cannot execute `LocalPool` executor from within \
             another executor",
        CURRENT_THREAD_NOTIFY.with(|thread_notify| {
            let waker = waker_ref(thread_notify);
            let mut cx = Context::from_waker(&waker);
            loop {
                if let Poll::Ready(t) = f(&mut cx) {
                    return t;
                // Wait for a wakeup.
                while !thread_notify.unparked.swap(false, Ordering::Acquire) {
                    // No wakeup occurred. It may occur now, right before parking,
                    // but in that case the token made available by `unpark()`
                    // is guaranteed to still be available and `park()` is a no-op.
  • 尽管此方法名为 run_executor,但其中似乎没有任何 spawn 调用
  • 相反,它会在当前线程中不断循环,检查用户提交的 future 是否准备就绪
  • 当 Tokio运行时线程到达这一点时,它会立即 yield 自己,直到用户的 Future 准备就绪并将其取消停放
  • 另一方面,用户的 future 是同一个线程执行的,这个线程还在 parking,那么就造成了死锁
  • 由于不能在当前运行时线程中阻塞,所以使用另一个运行时来阻塞结果


impl Sequencer for PlainSequencer {
    fn generate(&self) -> Vec<i32> {
        let bound = self.bound;
        futures::executor::block_on(async move {
            RUNTIME.spawn(async move {
                let mut res = vec![];
                for i in 0..bound {
  • 在 futures::executor::block_on 中,额外的 RUNTIME 用于生成异步代码。
  • 异步任务需要一个 exectutor 来驱动它的状态变化
  • 如果删除 RUNTIME ,而不是为 futures::executor::block_on 生成一个新线程,死锁问题就解决了,
  • 但是 tokio::time::sleep 方法调用将抱怨“没有 reactor 正在运行”,因为 Tokio 功能需要运行时

tokio::main 和 tokio::test 区别

  • tokio::main 没有挂起而 tokio::test 挂起,它们使用不同的运行时。
  • 具体来说, tokio::main 在多线程运行时运行,而 tokio::test 在单线程运行时运行。
  • 在单线程运行时,当前线程被 futures::executor::block_on 阻塞时,用户提交的异步代码无法执行,导致前面提到的死锁。


  • 异步代码与可能导致阻塞的同步代码结合起来绝非明智之举。
  • 从同步上下文调用异步代码时,请使用 futures::executor::block_on 并将异步代码生成到专用运行时,因为前者会阻塞当前线程
  • 如果必须从异步上下文调用阻塞同步代码,建议使用 tokio::task::spawn_blocking 在处理阻塞操作的专用执行器上执行代码


// single-threaded executor for your async tasks
#[tokio::main(flavor = "current_thread")]
pub async fn run(self) -> Result<()> {


use scoped_thread_local::scoped_thread_local;
scoped_thread_local!(static COUNTER: u32);

fn main() {
    let threads = (0..4).map(|_| {
        std::thread::spawn(|| {
            COUNTER.with(|counter| {
                for i in 0..10 {
                    let value = counter.get().unwrap_or(&0) + 1;
                    println!("Thread {}: counter = {}", std::thread::current().id(), value);

    for thread in threads {
  • 使用 scoped_tls crate 用来定义 thread-local storage
  • 输出
    Thread ThreadId(1): counter = 1
    Thread ThreadId(2): counter = 1
    Thread ThreadId(3): counter = 1
    Thread ThreadId(4): counter = 1
    Thread ThreadId(3): counter = 2
    Thread ThreadId(1): counter = 2
    Thread ThreadId(2): counter = 2
    Thread ThreadId(4): counter = 2
    Thread ThreadId(1): counter = 3
    Thread ThreadId(2): counter = 3
    Thread ThreadId(3): counter = 3
    Thread ThreadId(4): counter = 3
    Thread ThreadId(3): counter = 4
    Thread ThreadId(1): counter = 4
    Thread ThreadId(2): counter = 4
    Thread ThreadId(4): counter = 4
    Thread ThreadId(3): counter = 5

并发编程(类似 goroutine)

  • spawn 用来产生一个 green thread,也能拿到该 thread 执行的结果。
    use tokio::net::TcpListener;
    async fn main() {
        let listener = TcpListener::bind("").await.unwrap();
        loop {
            let (socket, _) = listener.accept().await.unwrap();
            // A new task is spawned for each inbound socket. The socket is
            // moved to the new task and processed there.
            tokio::spawn(async move {
    async fn main() {
        // 能拿到执行结果
        let handle = tokio::spawn(async {
            // Do some async work
            "return value"
        // Do some other work
        // 返回一个 Result<T, Error>
        let out = handle.await.unwrap();
        println!("GOT {}", out);

'static 约束

  • 基于 tokio 产生一个 task 时,其类型的 lifetime 必须是**'static 的**,
  • task 中的代码,是不能够引用外部拥有的数据。
  • 注意'static 并不意味着生命周期一直到程序终结,能见rust 中的 lifetime
    use tokio::task;
    async fn main() {
        let v = vec![1, 2, 3];
        task::spawn(async {
            println!("Here's a vec: {:?}", v);
  • 产生的结果是
    error[E0373]: async block may outlive the current function, but
                  it borrows `v`, which is owned by the current function
     --> src/
    7 |       task::spawn(async {
      |  _______________________^
    8 | |         println!("Here's a vec: {:?}", v);
      | |                                        - `v` is borrowed here
    9 | |     });
      | |_____^ may outlive borrowed value `v`
    note: function requires argument type to outlive `'static`
     --> src/
    7 |       task::spawn(async {
      |  _________________^
    8 | |         println!("Here's a vector: {:?}", v);
    9 | |     });
      | |_____^
    help: to force the async block to take ownership of `v` (and any other
          referenced variables), use the `move` keyword
    7 |     task::spawn(async move {
    8 |         println!("Here's a vec: {:?}", v);
    9 |     });
  • v 仍然被 task 之外拥有,println! 只是借用。能通过 async mov 的方式来使所有权变更
    task::spawn(async move {
        println!("Here's a vec: {:?}", v);


  • tokio::spawn 产生的任务一定要实现 Send
    • 这样 task 才能够在多个线程调度。需要task 所持有的的数据是 owned
    • 当所有在.await 调用中持有的数据被 Send,任务就能被发送
    • 当.await 被调用时,任务就回到了调度器中。下一次任务被执行时,它将从最后的上次 yield 点恢复。
    • 为了使其正常工作,所有在.await 之后使用的状态都必须由任务保存
    • 如果这个状态是 Send,即能跨线程移动,那么任务本身也能跨线程移动。反之,如果状态不是 Send,那么任务也不是。


use tokio::task::yield_now;
use std::rc::Rc;
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
            let rc = Rc::new("hello");
            println!("{}", rc);

        // `rc` is no longer used. It is **not** persisted when
        // the task yields to the scheduler
  • 下面则不行
    use tokio::task::yield_now;
    use std::rc::Rc;
    async fn main() {
        tokio::spawn(async {
            let rc = Rc::new("hello");
            // `rc` is used after `.await`. It must be persisted to
            // the task's state.
            println!("{}", rc);

  • 不要在锁住的时候,调用 await
    use std::sync::{Mutex, MutexGuard};
    async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here
  • 这是因为 the std::sync::MutexGuard type is not Send,这就意味着 you can't send a mutex lock to another thread。你能这样
    // This works!
    async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
            let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
            *lock += 1;
        } // lock goes out of scope here
    // 注意现在 rust 编译器还不是识别这种,仍然不能
    async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;


为啥需要 channel

use mini_redis::client;
async fn main() {
    // 创建到服务器的连接
    let mut client = client::connect("").await.unwrap();

    // 生成两个任务,一个用于获取 key, 一个用于设置 key
    let t1 = tokio::spawn(async {
        let res = client.get("hello").await;

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;

  • 因为两个任务都需要去访问 client,但是 client 并没有实现 Copy 特征,
  • 方法 setget 都使用了 client 的可变引用 &mut self,由此还会造成同时借用两个可变引用的错误。
  • 尝试有如下几个解决办法:
    • std::sync::Mutex 无法被使用,同步锁无法在 .await 调用过程中使用
    • 那么你可能会想,是不是能使用 tokio::sync:Mutex ,答案是能用,但是同时就只能运行一个请求

channel 的种类

  • mpsc: 多生产者,单消费者。多个值都能发送,只能有一个 consumer。
  • oneshot: 单生产者,一个消费者。一旦发送,channel 就会关闭
  • broadcast: 多生产者,多消费值。每个 consumer 都能收到发送的每一个值,类似于 close 的信号,一个使用的场景是用来进行优雅的关闭。
  • watch: 单生产者,多消费者。Many values can be sent, but no history is kept. Receivers only see the most recent value.
  • async_channel:多生产者和多消费者

mpsc 的使用

use tokio::sync::mpsc;
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await;

    tokio::spawn(async move {
        tx2.send("sending from second handle").await;

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);


<pattern> = <async expression> => <handler>,
  • 当 select! 宏展开时,所有的<async expression>都会被汇总并同时执行
  • 当其中一个表达式完成时,结果就会被匹配到<pattern>
  • 如果结果与 pattern 匹配时,那么将删除所有剩余的异步表达式并执行<handler>。
  • <handler>表达式能访问被建立的任何绑定值。
  • 基本上<pattern>就是变量名,异步表达式的结果能绑定到这个变量名上且<handler>能访问这个变量。
  • 如果<pattern>与异步计算的结果不匹配,则其余的异步表达式将继续并发执行直到下一个完成为止
  • 这时,将相同的逻辑用于该结果。
    use tokio::net::TcpStream;
    use tokio::sync::oneshot;
    async fn main() {
        let (tx, rx) = oneshot::channel();
        // 产生一个任务来发送消息到 oneshot 中
        tokio::spawn(async move {
        tokio::select! {
            socket = TcpStream::connect("localhost:3465") => {
                println!("Socket connected {:?}", socket);
            msg = rx => {
                println!("received message first {:?}", msg);


  • Futures 或者其它类型能通过实现Drop去清理后台资源
  • Tokio 的 oneshot::Receiver 通过向 Sender 方发送一个关闭的通知来实现 Drop 功能。
  • Sender 方能接收到这个通知并通过丢弃正在进行的操作来中止它。
    use tokio::sync::oneshot;
    async fn some_operation() -> String {
        // 这里计算值
    async fn main() {
        let (mut tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        tokio::spawn(async {
            // select 操作和 oneshot 的 `close()` 通知。
            tokio::select! {
                val = some_operation() => {
                    let _ = tx1.send(val);
                _ = tx1.closed() => {
                    // `some_operation()` 被调用,
                    // 任务完成且 `tx1` 被丢弃
        tokio::spawn(async {
            let _ = tx2.send("two");
        tokio::select! {
            val = rx1 => {
                println!("rx1 completed first with {:?}", val);
            val = rx2 => {
                println!("rx2 completed first with {:?}", val);


  • 使用?号操作符从表达式传播错误。
  • 它如何工作是取决于是否?号从异步表达式或处理程序中使用
  • 使用?在异步表达式中能将错误传播到异步表达式之外
  • 这就使异步表达式的输出成一个Result了。
  • 从一个处理程序使用?号能立即传播错误到select!表达式之外。
    use tokio::net::TcpListener;
    use tokio::sync::oneshot;
    use std::io;
    async fn main() -> io::Result<()> {
        // [设置 `rx` oneshot channel]
        let listener = TcpListener::bind("localhost:3465").await?;
        tokio::select! {
            res = async {
      ;         // 帮助 Rust 类型推导
                Ok::<_, io::Error>(())
            } => {
            _ = rx => {
                println!("terminating accept loop");
  • 注意 listener.accept().await?。
  • ?号操作符传播错误到表达式之外且和 res 绑定
  • 如果是一个错误,res 将被设置为 Err(_)。
  • 当然在 handler 内部 ?能再次使用。res? 声明将传播一个错误到 main 函数之外。


async fn computation1() -> String {
    // 计算 1

async fn computation2() -> String {
    // 计算 2

async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,

    println!("Got = {}", out);
  • 它需要表达式每个分支返回的值类型相同.
  • 如果 select! 表达式的输出不是必须的,推荐将表达式的返回值类型为 ()

模式匹配 (Pattern matching)

  • 前面仅仅对<pattern>使用了变量绑定。
  • 这里能使用任何 Rust 模式。
  • 假设从多个 MPSC 通道接收,可能会执行以下操作:
    use tokio::sync::mpsc;
    async fn main() {
        let (mut tx1, mut rx1) = mpsc::channel(128);
        let (mut tx2, mut rx2) = mpsc::channel(128);
        tokio::spawn(async move {
            // Do something w/ `tx1` and `tx2`
        tokio::select! {
            Some(v) = rx1.recv() => {
                println!("Got {:?} from rx1", v);
            Some(v) = rx2.recv() => {
                println!("Got {:?} from rx2", v);
            else => {
                println!("Both channels closed");
  • select! 表达式等待从 rx1 和 rx2 接收值。
  • 如果一个 channel 关闭了,recv() 返回了 None。
  • 这与模式不匹配且分支会被禁用。select! 表达将继续在其它分支上等待。
  • 注意 select! 表达式包含了一个 else 分支。
  • select! 表达式必须返回一个值
    • 在使用模式匹配时,可能所有的分支都不能匹配上关联的模式。
    • 如果这种情况发生了,那么 else 分支将会被返回。


  • 当产生一个任务时,生成的异步表达式必须要有其所有的数据
  • select!没有这样的限制
    • 每一个分支的数据都能借用数据并同时进行操作。
    • 根据 Rust 的借用规则来看多个异步表达式能,不可变的借用单个数据
    • 或者单个异步表达式能可变的借用数据
      use tokio::io::AsyncWriteExt;
      use tokio::net::TcpStream;
      use std::io;
      use std::net::SocketAddr;
      async fn race(
          data: &[u8],
          addr1: SocketAddr,
          addr2: SocketAddr
      ) -> io::Result<()> {
          tokio::select! {
              Ok(_) = async {
                  let mut socket = TcpStream::connect(addr1).await?;
                  Ok::<_, io::Error>(())
              } => {}
              Ok(_) = async {
                  let mut socket = TcpStream::connect(addr2).await?;
                  Ok::<_, io::Error>(())
              } => {}
              else => {}
  • 这两个异步表达式中都是不可变的借用了data变量。
  • 当其中一个操作成功完成后,另外一个将被丢弃
  • 因为在Ok()上进行了模式匹配,如果一个表达式失败,另外一个将继续执行。
  • 当涉及到每个分支的<handler>时,select!保证只有一个<handler>运行。
  • 根据这一点,每一个<handler>可变的借用同一个数据。
    use tokio::sync::oneshot;
    async fn main() {
        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        let mut out = String::new();
        tokio::spawn(async move {
            // 在 tx1 和 tx2 上发送值
        tokio::select! {
            _ = rx1 => {
                out.push_str("rx1 completed");
            _ = rx2 => {
                out.push_str("rx2 completed");
        println!("{}", out);


  • 使用 multiple channels:
    use tokio::sync::mpsc;
    async fn main() {
        let (tx1, mut rx1) = mpsc::channel(128);
        let (tx2, mut rx2) = mpsc::channel(128);
        let (tx3, mut rx3) = mpsc::channel(128);
        loop {
            let msg = tokio::select! {
                Some(msg) = rx1.recv() => msg,
                Some(msg) = rx2.recv() => msg,
                Some(msg) = rx3.recv() => msg,
                else => { break }
            println!("Got {}", msg);
        println!("All channels have been closed.");
  • select!宏会随机的选择分支来检查就绪情况
  • 多个通道都有待定的值时,将从其中随机选择一个来接收
  • 为了处理接收循环处理消息的速度慢于将消息推送到通道中的情况,这意味着通道填充数据。
  • 如果select!没有随机的选择首先要检查的分支,
  • 在每次循环迭代中,将首先检查rx1. 如果rx1 始终都有新消息,则永远不会再检查其余的通道了

恢复异步操作 (Resuming an async operation)

async fn action() {
    // 一些异步逻辑

async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

    let operation = action();

    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
  • 注意不是在select!宏中调用action(),它在循环外被调用。
  • action()的返回分配给operation,而不需要调用.await。然后在operation上调用tokio::pin!.
  • select!循里面,不是传递operation而是传递&mut operation. operation变量正在跟踪异步操作。
  • 循环中的每一次迭代都使用相同的操作,而不是发出对action()的一次新的调用。
  • 其它的select!分支从通道中接收消息。如果消息是偶数,则循环完成。否则再次开始 select!.
  • 需要注意的是,为了.await一个引用,必须固定引用的值或者实现Unpin.


  • tokio::spawnselect! 都能运行并发异步操作。
  • 但是用于运行并发操作的策略有所不同。
  • tokio::spawn
    • tokio::spawn函数传入一个异步操作并产生一个新的任务去运行它。
    • 任务是一个 tokio 运行时调度的对象。Tokio 独立调度两个不同的任务
    • 它们能在不同的操作系统线程上同时运行。
    • 因此产生的任务与产生的线程都有相同的限制:不可借用
  • select!宏能在同一个任务上同时运行所有分支
    • 因为select!宏上的所有分支被同一个任务执行,它们永远不会同时运行
    • select!宏的多路复用 步操作也在单个任务上运行。

