Iteration and Concurrency
类似于同步 方式的Iterator
,这里有很多不同的方法可以迭代和处理一个Stream
中的值。有组合器样式的方法,例如map
,filter
和fold
和他们的有错误就早退的表弟try_map
,try_filter
和try_fold
。
不幸,for
循环不适用于Stream
s,但对于命令式代码,while let
和next
/try_next
函数可以这样用:
#![allow(unused_variables)] fn main() { async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 { use futures::stream::StreamExt; // for `next` let mut sum = 0; while let Some(item) = stream.next().await { sum += item; } sum } async fn sum_with_try_next( mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>, ) -> Result<i32, io::Error> { use futures::stream::TryStreamExt; // for `try_next` let mut sum = 0; while let Some(item) = stream.try_next().await? { sum += item; } Ok(sum) } }
但是,如果我们一次只处理一个元素,则可能会失去了并发的机会,这毕竟这是我们要编写异步代码的首要原因。要同时处理一个 stream 中的多个 items,请使用for_each_concurrent
和try_for_each_concurrent
方法:
#![allow(unused_variables)] fn main() { async fn jump_around( mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>, ) -> Result<(), io::Error> { use futures::stream::TryStreamExt; // for `try_for_each_concurrent` const MAX_CONCURRENT_JUMPERS: usize = 100; stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } }