1 #![feature(test)] 2 3 extern crate test; 4 use crate::test::Bencher; 5 6 use futures::channel::oneshot; 7 use futures::executor::block_on; 8 use futures::future::{self, FutureExt}; 9 use futures::stream::{self, StreamExt}; 10 use futures::task::Poll; 11 use std::collections::VecDeque; 12 use std::thread; 13 14 #[bench] oneshot_streams(b: &mut Bencher)15fn oneshot_streams(b: &mut Bencher) { 16 const STREAM_COUNT: usize = 10_000; 17 const STREAM_ITEM_COUNT: usize = 1; 18 19 b.iter(|| { 20 let mut txs = VecDeque::with_capacity(STREAM_COUNT); 21 let mut rxs = Vec::new(); 22 23 for _ in 0..STREAM_COUNT { 24 let (tx, rx) = oneshot::channel(); 25 txs.push_back(tx); 26 rxs.push(rx); 27 } 28 29 thread::spawn(move || { 30 let mut last = 1; 31 while let Some(tx) = txs.pop_front() { 32 let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT)); 33 last += STREAM_ITEM_COUNT; 34 } 35 }); 36 37 let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| { 38 async { 39 if let Some(next) = vals.next() { 40 let val = next.await.unwrap(); 41 Some((val, vals)) 42 } else { 43 None 44 } 45 } 46 .boxed() 47 }) 48 .flatten_unordered(None); 49 50 block_on(future::poll_fn(move |cx| { 51 let mut count = 0; 52 loop { 53 match flatten.poll_next_unpin(cx) { 54 Poll::Ready(None) => break, 55 Poll::Ready(Some(_)) => { 56 count += 1; 57 } 58 _ => {} 59 } 60 } 61 assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT); 62 63 Poll::Ready(()) 64 })) 65 }); 66 } 67