You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
use std::rc::Rc;#[tokio::main]asyncfnmain(){// `Rc` does not implement `Send`, and thus may not be sent between// threads safely.let nonsend_data = Rc::new("my nonsend data...");let nonsend_data = nonsend_data.clone();// Because the `async` block here moves `nonsend_data`, the future is `!Send`.// Since `tokio::spawn` requires the spawned future to implement `Send`, this// will not compile.
tokio::spawn(asyncmove{println!("{}", nonsend_data);// ...}).await.unwrap();}
use std::rc::Rc;use tokio::task;#[tokio::main]asyncfnmain(){let nonsend_data = Rc::new("my nonsend data...");// Construct a local task set that can run `!Send` futures.let local = task::LocalSet::new();// Run the local task set.
local.run_until(asyncmove{let nonsend_data = nonsend_data.clone();// `spawn_local` ensures that the future is spawned on the local// task set.
task::spawn_local(asyncmove{println!("{}", nonsend_data);// ...}).await.unwrap();}).await;}// 本身是一个 futureuse tokio::{task, time};use std::rc::Rc;#[tokio::main]asyncfnmain(){let nonsend_data = Rc::new("world");let local = task::LocalSet::new();let nonsend_data2 = nonsend_data.clone();
local.spawn_local(asyncmove{// ...println!("hello {}", nonsend_data2)});
local.spawn_local(asyncmove{
time::sleep(time::Duration::from_millis(100)).await;println!("goodbye {}", nonsend_data)});// ...
local.await;}
什么是阻塞
异步是使用一种称为协作调度的机制实现的
异步代码不能在到达.await 的情况前花费很长时间
阻塞线程 (blocking the thread) 意味着阻止运行时切换当前任务
它阻塞了线程。在这种情况下,没有其他任务,所以这不是问题,但在实际程序中会有其他的任务
use std::time::Duration;#[tokio::main]asyncfnmain(){println!("Hello World!");// No .await here!
std::thread::sleep(Duration::from_secs(5));println!("Five seconds later...");}use std::time::Duration;asyncfnsleep_then_print(timer:i32){println!("Start timer {}.", timer);// No .await here!
std::thread::sleep(Duration::from_secs(1));println!("Timer {} done.", timer);}#[tokio::main]asyncfnmain(){// The join! macro lets you run multiple things concurrently.
tokio::join!(
sleep_then_print(1),
sleep_then_print(2),
sleep_then_print(3),);}
traitSequencer{fngenerate(&self) -> Vec<i32>;}implPlainSequencer{asyncfngenerate_async(&self)->Vec<i32>{letmut res = vec![];for i in0..self.bound{
res.push(i);
tokio::time::sleep(Duration::from_millis(100)).await;}
res
}}implSequencerforPlainSequencer{fngenerate(&self) -> Vec<i32>{self.generate_async().await}}
Cannot start a runtime from within a runtime.This happens because a function(like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.thread'tests::test_sync_method' panicked at 'Cannot start a runtime from within a runtime.
/Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs:39:9
fnrun_executor<T,F:FnMut(&mutContext<'_>) -> Poll<T>>(mutf:F) -> T{let _enter = enter().expect("cannot execute `LocalPool` executor from within \ another executor",);CURRENT_THREAD_NOTIFY.with(|thread_notify| {let waker = waker_ref(thread_notify);letmut cx = Context::from_waker(&waker);loop{ifletPoll::Ready(t) = f(&mut cx){return t;}// Wait for a wakeup.while !thread_notify.unparked.swap(false,Ordering::Acquire){// No wakeup occurred. It may occur now, right before parking,// but in that case the token made available by `unpark()`// is guaranteed to still be available and `park()` is a no-op.
thread::park();}}})}
use tokio::net::TcpListener;#[tokio::main]asyncfnmain(){let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();loop{let(socket, _) = listener.accept().await.unwrap();// A new task is spawned for each inbound socket. The socket is// moved to the new task and processed there.
tokio::spawn(asyncmove{process(socket).await;});}}#[tokio::main]asyncfnmain(){// 能拿到执行结果let handle = tokio::spawn(async{// Do some async work"return value"});// Do some other work// 返回一个 Result<T, Error>let out = handle.await.unwrap();println!("GOT {}", out);}
'static 约束
基于 tokio 产生一个 task 时,其类型的 lifetime 必须是**'static 的**,
use tokio::task;#[tokio::main]asyncfnmain(){let v = vec![1,2,3];
task::spawn(async{println!("Here's a vec: {:?}", v);});}
产生的结果是
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
v 仍然被 task 之外拥有,println! 只是借用。能通过 async mov 的方式来使所有权变更
task::spawn(asyncmove{println!("Here's a vec: {:?}", v);});
use tokio::task::yield_now;use std::rc::Rc;#[tokio::main]asyncfnmain(){
tokio::spawn(async{// The scope forces `rc` to drop before `.await`.{let rc = Rc::new("hello");println!("{}", rc);}// `rc` is no longer used. It is **not** persisted when// the task yields to the scheduleryield_now().await;});}
下面则不行
use tokio::task::yield_now;use std::rc::Rc;#[tokio::main]asyncfnmain(){
tokio::spawn(async{let rc = Rc::new("hello");// `rc` is used after `.await`. It must be persisted to// the task's state.yield_now().await;println!("{}", rc);});}
锁
不要在锁住的时候,调用 await
use std::sync::{Mutex,MutexGuard};asyncfnincrement_and_do_stuff(mutex:&Mutex<i32>){letmut lock:MutexGuard<i32> = mutex.lock().unwrap();*lock += 1;do_something_async().await;}// lock goes out of scope here
这是因为 the std::sync::MutexGuard type is not Send,这就意味着 you can't send a mutex lock to another thread。你能这样
// This works!asyncfnincrement_and_do_stuff(mutex:&Mutex<i32>){{letmut lock:MutexGuard<i32> = mutex.lock().unwrap();*lock += 1;}// lock goes out of scope heredo_something_async().await;}// 注意现在 rust 编译器还不是识别这种,仍然不能asyncfnincrement_and_do_stuff(mutex:&Mutex<i32>){letmut lock:MutexGuard<i32> = mutex.lock().unwrap();*lock += 1;drop(lock);do_something_async().await;}
broadcast: 多生产者,多消费值。每个 consumer 都能收到发送的每一个值,类似于 close 的信号,一个使用的场景是用来进行优雅的关闭。
watch: 单生产者,多消费者。Many values can be sent, but no history is kept. Receivers only see the most recent value.
async_channel:多生产者和多消费者
mpsc 的使用
use tokio::sync::mpsc;#[tokio::main]asyncfnmain(){let(tx,mut rx) = mpsc::channel(32);let tx2 = tx.clone();
tokio::spawn(asyncmove{
tx.send("sending from first handle").await;});
tokio::spawn(asyncmove{
tx2.send("sending from second handle").await;});whileletSome(message) = rx.recv().await{println!("GOT = {}", message);}}
参考资料
(usage:: 什么是 tokio runtime )
tokio::main
来在后台创建运行时
,理解运行时是有一整套的异步代码,包括executor
,waker
这些运行时上下文中
,使用 tokio::spawn 函数产生其他任务。tokio::spawn
或者是用#\[tokio::main\]
注释的主函数。Future::poll
,驱动异步计算完成待处理队列
。从待处理队列移动到就绪队列
tokio::runtime::Builder
tokio::task::LocalSet
Send
的 future,因此无法在线程之间安全地 Send。!Send
future 并安排它们在调用 Runtime::block_on 的线程上。run_until
方法只能在\#\[tokio::main\]
、\#\[tokio::test\]
或直接在对Runtime::block_on
的调用中使用。tokio::spawn
生成的任务中使用。LocalSet
本身实现了Future
,用于在LocalSet
上运行多个 futures 并驱动整个集合直到它们完成什么是阻塞
.await
处如果我想 block,该怎么办
tokio::task::spawn_blocking
函数rayon
cratestd::thread::spawn
生成专用线程。(usage:: 在同步代码中使用异步函数)
尝试一
Runtime::block_on
方法,它会阻塞当前线程,直到 future 完成。尝试二
tokio::test
中挂起,而在tokio::main
中正常完成。generate
函数的一定是Tokio 的 executor。尝试三
futures::executor::block_on
中,额外的RUNTIME
用于生成异步代码。RUNTIME
,而不是为futures::executor::block_on
生成一个新线程,死锁问题就解决了,tokio::time::sleep
方法调用将抱怨“没有 reactor 正在运行”,因为 Tokio 功能需要运行时tokio::main 和 tokio::test 区别
tokio::main
没有挂起而tokio::test
挂起,它们使用不同的运行时。tokio::main
在多线程运行时运行,而tokio::test
在单线程运行时运行。futures::executor::block_on
阻塞时,用户提交的异步代码无法执行,导致前面提到的死锁。一些结论
常见的功能
scoped_thread_local!
并发编程(类似 goroutine)
'static 约束
send
概念解释
锁
channel
为啥需要 channel
client
,但是client
并没有实现Copy
特征,set
和get
都使用了client
的可变引用&mut self
,由此还会造成同时借用两个可变引用的错误。std::sync::Mutex
无法被使用,同步锁无法在.await
调用过程中使用tokio::sync:Mutex
,答案是能用,但是同时就只能运行一个请求channel 的种类
mpsc 的使用
select
<async expression>
都会被汇总并同时执行。<pattern>
。<pattern>
就是变量名,异步表达式的结果能绑定到这个变量名上且<handler>能访问这个变量。<pattern>
与异步计算的结果不匹配,则其余的异步表达式将继续并发执行直到下一个完成为止。取消
Drop去清理后台资源
。错误
?
号操作符从表达式传播错误。?
号从异步表达式或处理程序中使用。?
在异步表达式中能将错误传播到异步表达式之外。Result
了。?
号能立即传播错误到select!
表达式之外。返回值
模式匹配 (Pattern matching)
<pattern>
使用了变量绑定。借用
select!
宏没有这样的限制。data
变量。Ok()
上进行了模式匹配,如果一个表达式失败,另外一个将继续执行。<handler>
时,select!
保证只有一个<handler>
运行。<handler>
能可变的借用同一个数据。循环
select!
宏会随机的选择分支来检查就绪情况。select!
没有随机的选择首先要检查的分支,rx1
. 如果rx1
始终都有新消息,则永远不会再检查其余的通道了恢复异步操作 (Resuming an async operation)
select!
宏中调用action(),它在循环外被调用。action()
的返回分配给operation,而不需要调用.await
。然后在operation
上调用tokio::pin!
.select!
循里面,不是传递operation而是传递&mut operation
.operation
变量正在跟踪异步操作。action()
的一次新的调用。select!
分支从通道中接收消息。如果消息是偶数,则循环完成。否则再次开始select!
..await
一个引用,必须固定引用的值或者实现Unpin
.每个任务的并发
tokio::spawn
与select!
都能运行并发异步操作。tokio::spawn
函数传入一个异步操作并产生一个新的任务去运行它。select!
宏能在同一个任务上同时运行所有分支
。select!
宏上的所有分支被同一个任务执行,它们永远不会同时运行。select!
宏的多路复用 步操作也在单个任务上运行。#type/rust #type/networking #public
The text was updated successfully, but these errors were encountered: