• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![feature(test)]
2 
3 extern crate test;
4 use crate::test::Bencher;
5 
6 use futures::channel::oneshot;
7 use futures::executor::block_on;
8 use futures::future::{self, FutureExt};
9 use futures::stream::{self, StreamExt};
10 use futures::task::Poll;
11 use std::collections::VecDeque;
12 use std::thread;
13 
14 #[bench]
oneshot_streams(b: &mut Bencher)15 fn oneshot_streams(b: &mut Bencher) {
16     const STREAM_COUNT: usize = 10_000;
17     const STREAM_ITEM_COUNT: usize = 1;
18 
19     b.iter(|| {
20         let mut txs = VecDeque::with_capacity(STREAM_COUNT);
21         let mut rxs = Vec::new();
22 
23         for _ in 0..STREAM_COUNT {
24             let (tx, rx) = oneshot::channel();
25             txs.push_back(tx);
26             rxs.push(rx);
27         }
28 
29         thread::spawn(move || {
30             let mut last = 1;
31             while let Some(tx) = txs.pop_front() {
32                 let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
33                 last += STREAM_ITEM_COUNT;
34             }
35         });
36 
37         let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
38             async {
39                 if let Some(next) = vals.next() {
40                     let val = next.await.unwrap();
41                     Some((val, vals))
42                 } else {
43                     None
44                 }
45             }
46             .boxed()
47         })
48         .flatten_unordered(None);
49 
50         block_on(future::poll_fn(move |cx| {
51             let mut count = 0;
52             loop {
53                 match flatten.poll_next_unpin(cx) {
54                     Poll::Ready(None) => break,
55                     Poll::Ready(Some(_)) => {
56                         count += 1;
57                     }
58                     _ => {}
59                 }
60             }
61             assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
62 
63             Poll::Ready(())
64         }))
65     });
66 }
67