select!
futures::select
宏能同时运行多个 Future ,从而使用户,可以在任何 Future 完成后,立即做出响应。
#![allow(unused_variables)] fn main() { use futures::{ future::FutureExt, // for `.fuse()` pin_mut, select, }; async fn task_one() { /* ... */ } async fn task_two() { /* ... */ } async fn race_tasks() { let t1 = task_one().fuse(); let t2 = task_two().fuse(); pin_mut!(t1, t2); select! { () = t1 => println!("task one completed first"), () = t2 => println!("task two completed first"), } } }
上面的函数将同时运行t1
和t2
同时。当t1
或是t2
完成后,相应的处理程序将调用println!
,而该函数将在不完成剩余任务的情况下,结束。
select
的基本语法是<pattern> = <expression> => <code>,
,重复您想要在select
上使用的,任意数量的 Future 。
default => ...
and complete => ...
select
也支持default
和complete
分支。
如果select
了的 Futures 没有一个是完成的,default
分支将运行。select
带上一个default
分支的组合,始终会立即返回,因为default
在其他 Future 均未准备好,就运行了。
complete
分支可以用来处理,select
ed Future 全部完成,并且将不再前进。对一个select!
循环访问时通常很方便。
#![allow(unused_variables)] fn main() { use futures::{future, select}; async fn count() { let mut a_fut = future::ready(4); let mut b_fut = future::ready(6); let mut total = 0; loop { select! { a = a_fut => total += a, b = b_fut => total += b, complete => break, default => unreachable!(), // never runs (futures are ready, then complete) }; } assert_eq!(total, 10); } }
Interaction with Unpin
and FusedFuture
您可能在上面的第一个示例中,注意到的一件事是,我们必须在两个async fn
返回的 Future 上调用.fuse()
,以及将用pin_mut
固定。这两个调用都是必需的,因为在select
中使用的 futures ,必须同时实现了Unpin
trait 与FusedFuture
trait。
Unpin
是必要的,因为select
不是取值的,而是可变的引用。由于不拥有 Future 的所有权,因此可以在对select
的调用后,未完成的 futures 还可以再次使用。
同样,FusedFuture
trait 是必需的,因为select
在一个 future 完成后,必不得对它再轮询。FusedFuture
是 一个 Future trait,作用是追踪 Future 本身是否完成的。这样使得,select
能在一个循环中使用,只轮询仍未完成的 Futures。可以在上面的示例中看到,其中a_fut
或是b_fut
是在循环第二次时完成。因为 future::ready
返回的 Future 实现了FusedFuture
,它可以告诉select
不要再次轮询。
请注意,streams 具有对应的FusedStream
trait。实现此 trait 或由.fuse()
封装的 Streams,会从他们的 Future .next()
/.try_next()
组合器中, yield 出FusedFuture
futures。
#![allow(unused_variables)] fn main() { use futures::{ stream::{Stream, StreamExt, FusedStream}, select, }; async fn add_two_streams( mut s1: impl Stream<Item = u8> + FusedStream + Unpin, mut s2: impl Stream<Item = u8> + FusedStream + Unpin, ) -> u8 { let mut total = 0; loop { let item = select! { x = s1.next() => x, x = s2.next() => x, complete => break, }; if let Some(next_num) = item { total += next_num; } } total } }
Concurrent tasks in a select
loop with Fuse
and FuturesUnordered
一个有点难以发现但方便的函数是Fuse::terminated()
,它允许构造一个已经终止的,空的 Future,之后可以用需要运行的 Future 填充它。
有个方便的情况就是,有一个任务需要在一个select
循环内运行,但这个循环又是在这个select
循环本身里面创建的。
注意使用.select_next_some()
函数。可以与select
合作,只运行那些由 stream 返回的Some(_)
值,而忽略None
s。
#![allow(unused_variables)] fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) { /* ... */ } async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let run_on_new_num_fut = run_on_new_num(starting_num).fuse(); let get_new_num_fut = Fuse::terminated(); pin_mut!(run_on_new_num_fut, get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`, // dropping the old one. run_on_new_num_fut.set(run_on_new_num(new_num).fuse()); }, // Run the `run_on_new_num_fut` () = run_on_new_num_fut => {}, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } }
如果需要同时运行多个相同 Future 的副本,请使用FuturesUnordered
类型。以下示例与上面的示例相似,但是将运行run_on_new_num_fut
的每个副本,直到完成,而不是在创建新的时,终止它们。还会打印出一个由run_on_new_num_fut
返回的值。
#![allow(unused_variables)] fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 } // Runs `run_on_new_num` with the latest number // retrieved from `get_new_num`. // // `get_new_num` is re-run every time a timer elapses, // immediately cancelling the currently running // `run_on_new_num` and replacing it with the newly // returned value. async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let mut run_on_new_num_futs = FuturesUnordered::new(); run_on_new_num_futs.push(run_on_new_num(starting_num)); let get_new_num_fut = Fuse::terminated(); pin_mut!(get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived-- start a new `run_on_new_num_fut`. run_on_new_num_futs.push(run_on_new_num(new_num)); }, // Run the `run_on_new_num_futs` and check if any have completed res = run_on_new_num_futs.select_next_some() => { println!("run_on_new_num_fut returned {:?}", res); }, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } }