Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio使用中的注意事项 #53

Open
BruceChen7 opened this issue Mar 13, 2023 · 0 comments
Open

tokio使用中的注意事项 #53

BruceChen7 opened this issue Mar 13, 2023 · 0 comments

Comments

@BruceChen7
Copy link
Owner

BruceChen7 commented Mar 13, 2023

参考资料

(usage:: 什么是 tokio runtime )

  • https://twitter.com/cf_samson/status/1782357728930697691
  • 包含了如下几个部分
    • 一个 I/O 事件循环,称为驱动程序,它驱动 I/O 资源并将 I/O 事件分派给依赖它们的任务。
    • 执行使用这些 I/O 资源的任务的调度程序
    • 用于安排工作在设定的时间段后运行的定时器
    • tokio::main 来在后台创建运行时,理解运行时是有一整套的异步代码,包括executor, waker这些
  • 运行时上下文中,使用 tokio::spawn 函数产生其他任务。
  • 使用此函数产生的future 将在与 Runtime 使用的相同线程池上执行
  • 要运行异步函数,它们必须传递给 tokio::spawn 或者是用 #\[tokio::main\] 注释的主函数。
    • 这将生成的最外层 future,提交给 Tokio 执行者
    • 执行者 (executor) 负责在最外层 future 调用 Future::poll ,驱动异步计算完成
    • 这意味着在 outer future 上尽量不能 block(不能被运行是管理的 block,是不行的),否则执行者将会被阻塞,不能执行更多的 outer future
    • outer future 是指通过调用 tokio::spawn 和 tokio::main 注解的函数
    • 执行器负责调度不同的任务(顶级 future)。
    • 它通过轮询尝试推进任务。同时,它分发一个 Waker,该 Waker 会传递给 Reactor
    • 如果 Future 返回 ready,则已解决,该任务完成。
    • 如果返回 NotReady,则该未来被放入待处理队列
    • 一个任务被视为一个由 Future 组成的树
    • 由 async/await 创建的 Future 不是叶子 Future。
    • 代表实际上需要等待的操作的 Future 是叶子 Future。
    • 叶子 Future 在尝试从 TcpStream 读取时,会将 Waker 传递给反应器。
    • 如果读取操作返回了 WouldBlock,意味着目前还没有数据,它将把这个结果返回到执行器的 future 链上。
    • 然后执行器会将这个任务放入待处理队列
    • Reactor 等待一个由操作系统支持的事件队列,并在它监控的 TcpStreams 中的一个准备好读取数据时收到通知。
    • 当数据准备好时,它会调用相应 Waker 的唤醒方法。
    • 唤醒者(Waker)是由执行者(Executor)传递出来的,所以当数据准备就绪时,它知道如何将相应的任务从待处理队列移动到就绪队列
    • 然后唤醒执行器。它检查就绪队列并在那里找到执行的任务。
    • 它轮询该任务,然后循环继续,直到该任务中的整个 Future Tree 都得到解决并且该任务完成。

tokio::runtime::Builder

  • pub fn new_current_thread() -> Builder
  • 返回一个新的构建器,其中当前线程调度程序被选中
  • 要在生成的运行时上产生非发送任务,请将其与 LocalSet 相结合。

tokio::task::LocalSet

  • 一些场景下,要运行一个或多个未实现Send的 future,因此无法在线程之间安全地 Send
  • 使用 localSet 来安排一个或多个!Send future 在同一个线程上一起运行。
  • 下面代码不会编译
    use std::rc::Rc;
    #[tokio::main]
    async fn main() {
        // `Rc` does not implement `Send`, and thus may not be sent between
        // threads safely.
        let nonsend_data = Rc::new("my nonsend data...");
    
        let nonsend_data = nonsend_data.clone();
        // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
        // Since `tokio::spawn` requires the spawned future to implement `Send`, this
        // will not compile.
        tokio::spawn(async move {
            println!("{}", nonsend_data);
            // ...
        }).await.unwrap();
    }
  • 使用 LocalSet 来生成!Send future 并安排它们在调用 Runtime::block_on 的线程上。
  • 在本地任务集内运行时,能使用 task::spawn_local 来生成 !Send future。
  • run_until 方法只能在 \#\[tokio::main\] 、 \#\[tokio::test\] 或直接在对 Runtime::block_on 的调用中使用。
  • 不能在使用 tokio::spawn 生成的任务中使用。
  • LocalSet 本身实现了 Future,用于在 LocalSet 上运行多个 futures 并驱动整个集合直到它们完成
    use std::rc::Rc;
    use tokio::task;
    
    #[tokio::main]
    async fn main() {
        let nonsend_data = Rc::new("my nonsend data...");
    
        // Construct a local task set that can run `!Send` futures.
        let local = task::LocalSet::new();
    
        // Run the local task set.
        local.run_until(async move {
            let nonsend_data = nonsend_data.clone();
            // `spawn_local` ensures that the future is spawned on the local
            // task set.
            task::spawn_local(async move {
                println!("{}", nonsend_data);
                // ...
            }).await.unwrap();
        }).await;
    }
    
    // 本身是一个 future
    use tokio::{task, time};
    use std::rc::Rc;
    #[tokio::main]
    async fn main() {
        let nonsend_data = Rc::new("world");
        let local = task::LocalSet::new();
    
        let nonsend_data2 = nonsend_data.clone();
        local.spawn_local(async move {
            // ...
            println!("hello {}", nonsend_data2)
        });
    
        local.spawn_local(async move {
            time::sleep(time::Duration::from_millis(100)).await;
            println!("goodbye {}", nonsend_data)
        });
    
        // ...
    
        local.await;
    }

什么是阻塞

  • 异步是使用一种称为协作调度的机制实现的
  • 异步代码不能在到达.await 的情况前花费很长时间
  • 阻塞线程 (blocking the thread) 意味着阻止运行时切换当前任务
  • 它阻塞了线程。在这种情况下,没有其他任务,所以这不是问题,但在实际程序中会有其他的任务
    use std::time::Duration;
    #[tokio::main]
    async fn main() {
        println!("Hello World!");
        // No .await here!
        std::thread::sleep(Duration::from_secs(5));
        println!("Five seconds later...");
    }
    
    use std::time::Duration;
    
    async fn sleep_then_print(timer: i32) {
        println!("Start timer {}.", timer);
    
        // No .await here!
        std::thread::sleep(Duration::from_secs(1));
    
        println!("Timer {} done.", timer);
    }
    
    #[tokio::main]
    async fn main() {
        // The join! macro lets you run multiple things concurrently.
        tokio::join!(
            sleep_then_print(1),
            sleep_then_print(2),
            sleep_then_print(3),
        );
    }
  • 将花费三秒钟的时间运行,并且计时器将一个接一个地运行,没有任何并发性,原因是这里不是协作式的放弃执行
  • tokio 运行时无法将一个任务交换为另一个任务,因为这样的交换只能发生在 .await 处

如果我想 block,该怎么办

  • 什么样的情况,会出现 block
    • 同步 IO
    • 时间比较长的 CPU 运算
  • 这两种情况,都会导致在较长时间才能到达.await 操作,必须将这种阻塞操作移动到 tokio executor 线程池之外的线程
  • 使用 tokio::task::spawn_blocking 函数
    • 运行时包含一个单独的线程池,专门用于运行阻塞函数,你能使用 spawn_blocking 在其上运行任务。
    • 这个线程池的上限是大约 500 个线程,因此能在这个线程池上进行大量阻塞操作
  • 使用 rayon crate
  • 使用 std::thread::spawn 生成专用线程。

(usage:: 在同步代码中使用异步函数)

  • https://greptime.com/blogs/2023-03-09-bridging-async-and-sync-rust
  • 某些场景下,trait 的签名是同步的
  • generate 是一个同步方法,.await 里面不能直接使用
    trait Sequencer {
        fn generate(&self) -> Vec<i32>;
    }
    
    impl PlainSequencer {
        async fn generate_async(&self)->Vec<i32>{
            let mut res = vec![];
            for i in 0..self.bound {
                res.push(i);
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
            res
        }
    }
    
    impl Sequencer for PlainSequencer {
        fn generate(&self) -> Vec<i32> {
            self.generate_async().await
        }
    }
  • 注意这里的 generate 是一个同步的签名,却调用异步的实现

尝试一

impl Sequencer for PlainSequencer {
    fn generate(&self) -> Vec<i32> {
        RUNTIME.block_on(async{
            self.generate_async().await
        })
    }
}

#[cfg(test)]
mod tests{
    #[tokio::test]
    async fn test_sync_method() {
        let sequencer = PlainSequencer {
            bound: 3
        };
        let vec = sequencer.generate();
        println!("vec: {:?}", vec);
    }
}
  • Runtime::block_on 方法,它会阻塞当前线程,直到 future 完成。
  • 该错误说明不允许从*当前正在执行的运行时中启动另一个运行时
    Cannot start a runtime from within a runtime.
    This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
    thread 'tests::test_sync_method' panicked at 'Cannot start a runtime from within a runtime.
    /Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs:39:9

尝试二

impl Sequencer for PlainSequencer {
    fn generate(&self) -> Vec<i32> {
        futures::executor::block_on(async {
            self.generate_async().await
        })
    }
}
  • 编译成功,但是代码只是挂了,运行时没有返回
  • generate_async 方法中只有一个简单的 sleep 调用,但是为什么 future 永远不会完成呢?
  • 对于相同的代码片段,它在 tokio::test 中挂起,而在 tokio::main 中正常完成。
  • async 的基本原理
    // pseudo-rust code
    match ::std::future::IntoFuture::into_future(<expr>) {
        mut __awaitee => loop {
            match unsafe { ::std::future::Future::poll(
                <::std::pin::Pin>::new_unchecked(&mut __awaitee),
                ::std::future::get_context(task_context),
            ) } {
                ::std::task::Poll::Ready(result) => break result,
                ::std::task::Poll::Pending => {}
            }
            task_context = yield ();
        }
    }
  • 调用 generate 函数的一定是Tokio 的 executor
  • 那么谁负责在 block_on 函数内部轮询 future 呢?
  • 一开始是以为是会在内部运行时轮询 generate_async
  • 实际运行在 futures_executor::local_pool::run_executor
    fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
        let _enter = enter().expect(
            "cannot execute `LocalPool` executor from within \
             another executor",
        );
    
        CURRENT_THREAD_NOTIFY.with(|thread_notify| {
            let waker = waker_ref(thread_notify);
            let mut cx = Context::from_waker(&waker);
            loop {
                if let Poll::Ready(t) = f(&mut cx) {
                    return t;
                }
    
                // Wait for a wakeup.
                while !thread_notify.unparked.swap(false, Ordering::Acquire) {
                    // No wakeup occurred. It may occur now, right before parking,
                    // but in that case the token made available by `unpark()`
                    // is guaranteed to still be available and `park()` is a no-op.
                    thread::park();
                }
            }
        })
    }
  • 尽管此方法名为 run_executor,但其中似乎没有任何 spawn 调用
  • 相反,它会在当前线程中不断循环,检查用户提交的 future 是否准备就绪
  • 当 Tokio运行时线程到达这一点时,它会立即 yield 自己,直到用户的 Future 准备就绪并将其取消停放
  • 另一方面,用户的 future 是同一个线程执行的,这个线程还在 parking,那么就造成了死锁
  • 由于不能在当前运行时线程中阻塞,所以使用另一个运行时来阻塞结果

尝试三

impl Sequencer for PlainSequencer {
    fn generate(&self) -> Vec<i32> {
        let bound = self.bound;
        futures::executor::block_on(async move {
            RUNTIME.spawn(async move {
                let mut res = vec![];
                for i in 0..bound {
                    res.push(i);
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
                res
            }).await.unwrap()
        })
    }
}
  • 在 futures::executor::block_on 中,额外的 RUNTIME 用于生成异步代码。
  • 异步任务需要一个 exectutor 来驱动它的状态变化
  • 如果删除 RUNTIME ,而不是为 futures::executor::block_on 生成一个新线程,死锁问题就解决了,
  • 但是 tokio::time::sleep 方法调用将抱怨“没有 reactor 正在运行”,因为 Tokio 功能需要运行时

tokio::main 和 tokio::test 区别

  • tokio::main 没有挂起而 tokio::test 挂起,它们使用不同的运行时。
  • 具体来说, tokio::main 在多线程运行时运行,而 tokio::test 在单线程运行时运行。
  • 在单线程运行时,当前线程被 futures::executor::block_on 阻塞时,用户提交的异步代码无法执行,导致前面提到的死锁。

一些结论

  • 异步代码与可能导致阻塞的同步代码结合起来绝非明智之举。
  • 从同步上下文调用异步代码时,请使用 futures::executor::block_on 并将异步代码生成到专用运行时,因为前者会阻塞当前线程
  • 如果必须从异步上下文调用阻塞同步代码,建议使用 tokio::task::spawn_blocking 在处理阻塞操作的专用执行器上执行代码

常见的功能

// single-threaded executor for your async tasks
#[tokio::main(flavor = "current_thread")]
pub async fn run(self) -> Result<()> {
    Builder::new()
        .filter_level(log::LevelFilter::Off)
    ...
}

scoped_thread_local!

use scoped_thread_local::scoped_thread_local;
scoped_thread_local!(static COUNTER: u32);

fn main() {
    let threads = (0..4).map(|_| {
        std::thread::spawn(|| {
            COUNTER.with(|counter| {
                for i in 0..10 {
                    let value = counter.get().unwrap_or(&0) + 1;
                    counter.set(value);
                    println!("Thread {}: counter = {}", std::thread::current().id(), value);
                }
            });
        })
    }).collect::<Vec<_>>();

    for thread in threads {
        thread.join().unwrap();
    }
}
  • 使用 scoped_tls crate 用来定义 thread-local storage
  • 输出
    Thread ThreadId(1): counter = 1
    Thread ThreadId(2): counter = 1
    Thread ThreadId(3): counter = 1
    Thread ThreadId(4): counter = 1
    Thread ThreadId(3): counter = 2
    Thread ThreadId(1): counter = 2
    Thread ThreadId(2): counter = 2
    Thread ThreadId(4): counter = 2
    Thread ThreadId(1): counter = 3
    Thread ThreadId(2): counter = 3
    Thread ThreadId(3): counter = 3
    Thread ThreadId(4): counter = 3
    Thread ThreadId(3): counter = 4
    Thread ThreadId(1): counter = 4
    Thread ThreadId(2): counter = 4
    Thread ThreadId(4): counter = 4
    Thread ThreadId(3): counter = 5
    

并发编程(类似 goroutine)

  • spawn 用来产生一个 green thread,也能拿到该 thread 执行的结果。
    use tokio::net::TcpListener;
    #[tokio::main]
    async fn main() {
        let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    
        loop {
            let (socket, _) = listener.accept().await.unwrap();
            // A new task is spawned for each inbound socket. The socket is
            // moved to the new task and processed there.
            tokio::spawn(async move {
                process(socket).await;
            });
        }
    }
    
    #[tokio::main]
    async fn main() {
        // 能拿到执行结果
        let handle = tokio::spawn(async {
            // Do some async work
            "return value"
        });
    
        // Do some other work
        // 返回一个 Result<T, Error>
        let out = handle.await.unwrap();
        println!("GOT {}", out);
    }

'static 约束

  • 基于 tokio 产生一个 task 时,其类型的 lifetime 必须是**'static 的**,
  • task 中的代码,是不能够引用外部拥有的数据。
  • 注意'static 并不意味着生命周期一直到程序终结,能见rust 中的 lifetime
    use tokio::task;
    #[tokio::main]
    async fn main() {
        let v = vec![1, 2, 3];
    
        task::spawn(async {
            println!("Here's a vec: {:?}", v);
        });
    }
  • 产生的结果是
    error[E0373]: async block may outlive the current function, but
                  it borrows `v`, which is owned by the current function
     --> src/main.rs:7:23
      |
    7 |       task::spawn(async {
      |  _______________________^
    8 | |         println!("Here's a vec: {:?}", v);
      | |                                        - `v` is borrowed here
    9 | |     });
      | |_____^ may outlive borrowed value `v`
      |
    note: function requires argument type to outlive `'static`
     --> src/main.rs:7:17
      |
    7 |       task::spawn(async {
      |  _________________^
    8 | |         println!("Here's a vector: {:?}", v);
    9 | |     });
      | |_____^
    help: to force the async block to take ownership of `v` (and any other
          referenced variables), use the `move` keyword
      |
    7 |     task::spawn(async move {
    8 |         println!("Here's a vec: {:?}", v);
    9 |     });
      |
    
  • v 仍然被 task 之外拥有,println! 只是借用。能通过 async mov 的方式来使所有权变更
    task::spawn(async move {
        println!("Here's a vec: {:?}", v);
    });

send

  • tokio::spawn 产生的任务一定要实现 Send
    • 这样 task 才能够在多个线程调度。需要task 所持有的的数据是 owned
    • 当所有在.await 调用中持有的数据被 Send,任务就能被发送
    • 当.await 被调用时,任务就回到了调度器中。下一次任务被执行时,它将从最后的上次 yield 点恢复。
    • 为了使其正常工作,所有在.await 之后使用的状态都必须由任务保存
    • 如果这个状态是 Send,即能跨线程移动,那么任务本身也能跨线程移动。反之,如果状态不是 Send,那么任务也不是。

概念解释

use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` is no longer used. It is **not** persisted when
        // the task yields to the scheduler
        yield_now().await;
    });
}
  • 下面则不行
    use tokio::task::yield_now;
    use std::rc::Rc;
    #[tokio::main]
    async fn main() {
        tokio::spawn(async {
            let rc = Rc::new("hello");
    
            // `rc` is used after `.await`. It must be persisted to
            // the task's state.
            yield_now().await;
    
            println!("{}", rc);
        });
    }

  • 不要在锁住的时候,调用 await
    use std::sync::{Mutex, MutexGuard};
    
    async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    
        do_something_async().await;
    } // lock goes out of scope here
  • 这是因为 the std::sync::MutexGuard type is not Send,这就意味着 you can't send a mutex lock to another thread。你能这样
    // This works!
    async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
        {
            let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
            *lock += 1;
        } // lock goes out of scope here
    
        do_something_async().await;
    }
    
    // 注意现在 rust 编译器还不是识别这种,仍然不能
    async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
        drop(lock);
    
        do_something_async().await;
    }

channel

为啥需要 channel

use mini_redis::client;
#[tokio::main]
async fn main() {
    // 创建到服务器的连接
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // 生成两个任务,一个用于获取 key, 一个用于设置 key
    let t1 = tokio::spawn(async {
        let res = client.get("hello").await;
    });

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });

    t1.await.unwrap();
    t2.await.unwrap();
}
  • 因为两个任务都需要去访问 client,但是 client 并没有实现 Copy 特征,
  • 方法 setget 都使用了 client 的可变引用 &mut self,由此还会造成同时借用两个可变引用的错误。
  • 尝试有如下几个解决办法:
    • std::sync::Mutex 无法被使用,同步锁无法在 .await 调用过程中使用
    • 那么你可能会想,是不是能使用 tokio::sync:Mutex ,答案是能用,但是同时就只能运行一个请求

channel 的种类

  • mpsc: 多生产者,单消费者。多个值都能发送,只能有一个 consumer。
  • oneshot: 单生产者,一个消费者。一旦发送,channel 就会关闭
  • broadcast: 多生产者,多消费值。每个 consumer 都能收到发送的每一个值,类似于 close 的信号,一个使用的场景是用来进行优雅的关闭。
  • watch: 单生产者,多消费者。Many values can be sent, but no history is kept. Receivers only see the most recent value.
  • async_channel:多生产者和多消费者

mpsc 的使用

use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await;
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await;
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

select

<pattern> = <async expression> => <handler>,
  • 当 select! 宏展开时,所有的<async expression>都会被汇总并同时执行
  • 当其中一个表达式完成时,结果就会被匹配到<pattern>
  • 如果结果与 pattern 匹配时,那么将删除所有剩余的异步表达式并执行<handler>。
  • <handler>表达式能访问被建立的任何绑定值。
  • 基本上<pattern>就是变量名,异步表达式的结果能绑定到这个变量名上且<handler>能访问这个变量。
  • 如果<pattern>与异步计算的结果不匹配,则其余的异步表达式将继续并发执行直到下一个完成为止
  • 这时,将相同的逻辑用于该结果。
    use tokio::net::TcpStream;
    use tokio::sync::oneshot;
    #[tokio::main]
    async fn main() {
        let (tx, rx) = oneshot::channel();
    
        // 产生一个任务来发送消息到 oneshot 中
        tokio::spawn(async move {
            tx.send("done").unwrap();
        });
    
        tokio::select! {
            socket = TcpStream::connect("localhost:3465") => {
                println!("Socket connected {:?}", socket);
            }
            msg = rx => {
                println!("received message first {:?}", msg);
            }
        }
    }

取消

  • Futures 或者其它类型能通过实现Drop去清理后台资源
  • Tokio 的 oneshot::Receiver 通过向 Sender 方发送一个关闭的通知来实现 Drop 功能。
  • Sender 方能接收到这个通知并通过丢弃正在进行的操作来中止它。
    use tokio::sync::oneshot;
    
    async fn some_operation() -> String {
        // 这里计算值
    }
    #[tokio::main]
    async fn main() {
        let (mut tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
    
        tokio::spawn(async {
            // select 操作和 oneshot 的 `close()` 通知。
            tokio::select! {
                val = some_operation() => {
                    let _ = tx1.send(val);
                }
                _ = tx1.closed() => {
                    // `some_operation()` 被调用,
                    // 任务完成且 `tx1` 被丢弃
                }
            }
        });
    
        tokio::spawn(async {
            let _ = tx2.send("two");
        });
    
        tokio::select! {
            val = rx1 => {
                println!("rx1 completed first with {:?}", val);
            }
            val = rx2 => {
                println!("rx2 completed first with {:?}", val);
            }
        }
    }

错误

  • 使用?号操作符从表达式传播错误。
  • 它如何工作是取决于是否?号从异步表达式或处理程序中使用
  • 使用?在异步表达式中能将错误传播到异步表达式之外
  • 这就使异步表达式的输出成一个Result了。
  • 从一个处理程序使用?号能立即传播错误到select!表达式之外。
    use tokio::net::TcpListener;
    use tokio::sync::oneshot;
    use std::io;
    #[tokio::main]
    async fn main() -> io::Result<()> {
        // [设置 `rx` oneshot channel]
        let listener = TcpListener::bind("localhost:3465").await?;
    
        tokio::select! {
            res = async {
      ;         // 帮助 Rust 类型推导
                Ok::<_, io::Error>(())
            } => {
                res?;
            }
            _ = rx => {
                println!("terminating accept loop");
            }
        }
    
        Ok(())
    }
  • 注意 listener.accept().await?。
  • ?号操作符传播错误到表达式之外且和 res 绑定
  • 如果是一个错误,res 将被设置为 Err(_)。
  • 当然在 handler 内部 ?能再次使用。res? 声明将传播一个错误到 main 函数之外。

返回值

async fn computation1() -> String {
    // 计算 1
}

async fn computation2() -> String {
    // 计算 2
}

#[tokio::main]
async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("Got = {}", out);
}
  • 它需要表达式每个分支返回的值类型相同.
  • 如果 select! 表达式的输出不是必须的,推荐将表达式的返回值类型为 ()

模式匹配 (Pattern matching)

  • 前面仅仅对<pattern>使用了变量绑定。
  • 这里能使用任何 Rust 模式。
  • 假设从多个 MPSC 通道接收,可能会执行以下操作:
    use tokio::sync::mpsc;
    #[tokio::main]
    async fn main() {
        let (mut tx1, mut rx1) = mpsc::channel(128);
        let (mut tx2, mut rx2) = mpsc::channel(128);
    
        tokio::spawn(async move {
            // Do something w/ `tx1` and `tx2`
        });
    
        tokio::select! {
            Some(v) = rx1.recv() => {
                println!("Got {:?} from rx1", v);
            }
            Some(v) = rx2.recv() => {
                println!("Got {:?} from rx2", v);
            }
            else => {
                println!("Both channels closed");
            }
        }
    }
  • select! 表达式等待从 rx1 和 rx2 接收值。
  • 如果一个 channel 关闭了,recv() 返回了 None。
  • 这与模式不匹配且分支会被禁用。select! 表达将继续在其它分支上等待。
  • 注意 select! 表达式包含了一个 else 分支。
  • select! 表达式必须返回一个值
    • 在使用模式匹配时,可能所有的分支都不能匹配上关联的模式。
    • 如果这种情况发生了,那么 else 分支将会被返回。

借用

  • 当产生一个任务时,生成的异步表达式必须要有其所有的数据
  • select!没有这样的限制
    • 每一个分支的数据都能借用数据并同时进行操作。
    • 根据 Rust 的借用规则来看多个异步表达式能,不可变的借用单个数据
    • 或者单个异步表达式能可变的借用数据
      use tokio::io::AsyncWriteExt;
      use tokio::net::TcpStream;
      use std::io;
      use std::net::SocketAddr;
      
      async fn race(
          data: &[u8],
          addr1: SocketAddr,
          addr2: SocketAddr
      ) -> io::Result<()> {
          tokio::select! {
              Ok(_) = async {
                  let mut socket = TcpStream::connect(addr1).await?;
                  socket.write_all(data).await?;
                  Ok::<_, io::Error>(())
              } => {}
              Ok(_) = async {
                  let mut socket = TcpStream::connect(addr2).await?;
                  socket.write_all(data).await?;
                  Ok::<_, io::Error>(())
              } => {}
              else => {}
          };
      
          Ok(())
      }
  • 这两个异步表达式中都是不可变的借用了data变量。
  • 当其中一个操作成功完成后,另外一个将被丢弃
  • 因为在Ok()上进行了模式匹配,如果一个表达式失败,另外一个将继续执行。
  • 当涉及到每个分支的<handler>时,select!保证只有一个<handler>运行。
  • 根据这一点,每一个<handler>可变的借用同一个数据。
    use tokio::sync::oneshot;
    #[tokio::main]
    async fn main() {
        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
    
        let mut out = String::new();
    
        tokio::spawn(async move {
            // 在 tx1 和 tx2 上发送值
        });
    
        tokio::select! {
            _ = rx1 => {
                out.push_str("rx1 completed");
            }
            _ = rx2 => {
                out.push_str("rx2 completed");
            }
        }
    
        println!("{}", out);
    }

循环

  • 使用 multiple channels:
    use tokio::sync::mpsc;
    #[tokio::main]
    async fn main() {
        let (tx1, mut rx1) = mpsc::channel(128);
        let (tx2, mut rx2) = mpsc::channel(128);
        let (tx3, mut rx3) = mpsc::channel(128);
    
        loop {
            let msg = tokio::select! {
                Some(msg) = rx1.recv() => msg,
                Some(msg) = rx2.recv() => msg,
                Some(msg) = rx3.recv() => msg,
                else => { break }
            };
    
            println!("Got {}", msg);
        }
        println!("All channels have been closed.");
    }
  • select!宏会随机的选择分支来检查就绪情况
  • 多个通道都有待定的值时,将从其中随机选择一个来接收
  • 为了处理接收循环处理消息的速度慢于将消息推送到通道中的情况,这意味着通道填充数据。
  • 如果select!没有随机的选择首先要检查的分支,
  • 在每次循环迭代中,将首先检查rx1. 如果rx1 始终都有新消息,则永远不会再检查其余的通道了

恢复异步操作 (Resuming an async operation)

async fn action() {
    // 一些异步逻辑
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

    let operation = action();
    tokio::pin!(operation);

    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}
  • 注意不是在select!宏中调用action(),它在循环外被调用。
  • action()的返回分配给operation,而不需要调用.await。然后在operation上调用tokio::pin!.
  • select!循里面,不是传递operation而是传递&mut operation. operation变量正在跟踪异步操作。
  • 循环中的每一次迭代都使用相同的操作,而不是发出对action()的一次新的调用。
  • 其它的select!分支从通道中接收消息。如果消息是偶数,则循环完成。否则再次开始 select!.
  • 需要注意的是,为了.await一个引用,必须固定引用的值或者实现Unpin.

每个任务的并发

  • tokio::spawnselect! 都能运行并发异步操作。
  • 但是用于运行并发操作的策略有所不同。
  • tokio::spawn
    • tokio::spawn函数传入一个异步操作并产生一个新的任务去运行它。
    • 任务是一个 tokio 运行时调度的对象。Tokio 独立调度两个不同的任务
    • 它们能在不同的操作系统线程上同时运行。
    • 因此产生的任务与产生的线程都有相同的限制:不可借用
  • select!宏能在同一个任务上同时运行所有分支
    • 因为select!宏上的所有分支被同一个任务执行,它们永远不会同时运行
    • select!宏的多路复用 步操作也在单个任务上运行。

#type/rust #type/networking #public

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant