• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![feature(test)]
2 
3 extern crate test;
4 use crate::test::Bencher;
5 
6 use {
7     futures::{
8         channel::mpsc::{self, Sender, UnboundedSender},
9         ready,
10         sink::Sink,
11         stream::{Stream, StreamExt},
12         task::{Context, Poll},
13     },
14     futures_test::task::noop_context,
15     std::pin::Pin,
16 };
17 
18 /// Single producer, single consumer
19 #[bench]
unbounded_1_tx(b: &mut Bencher)20 fn unbounded_1_tx(b: &mut Bencher) {
21     let mut cx = noop_context();
22     b.iter(|| {
23         let (tx, mut rx) = mpsc::unbounded();
24 
25         // 1000 iterations to avoid measuring overhead of initialization
26         // Result should be divided by 1000
27         for i in 0..1000 {
28             // Poll, not ready, park
29             assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
30 
31             UnboundedSender::unbounded_send(&tx, i).unwrap();
32 
33             // Now poll ready
34             assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
35         }
36     })
37 }
38 
39 /// 100 producers, single consumer
40 #[bench]
unbounded_100_tx(b: &mut Bencher)41 fn unbounded_100_tx(b: &mut Bencher) {
42     let mut cx = noop_context();
43     b.iter(|| {
44         let (tx, mut rx) = mpsc::unbounded();
45 
46         let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
47 
48         // 1000 send/recv operations total, result should be divided by 1000
49         for _ in 0..10 {
50             for (i, x) in tx.iter().enumerate() {
51                 assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
52 
53                 UnboundedSender::unbounded_send(x, i).unwrap();
54 
55                 assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
56             }
57         }
58     })
59 }
60 
61 #[bench]
unbounded_uncontended(b: &mut Bencher)62 fn unbounded_uncontended(b: &mut Bencher) {
63     let mut cx = noop_context();
64     b.iter(|| {
65         let (tx, mut rx) = mpsc::unbounded();
66 
67         for i in 0..1000 {
68             UnboundedSender::unbounded_send(&tx, i).expect("send");
69             // No need to create a task, because poll is not going to park.
70             assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
71         }
72     })
73 }
74 
75 /// A Stream that continuously sends incrementing number of the queue
76 struct TestSender {
77     tx: Sender<u32>,
78     last: u32, // Last number sent
79 }
80 
81 // Could be a Future, it doesn't matter
82 impl Stream for TestSender {
83     type Item = u32;
84 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>85     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86         let this = &mut *self;
87         let mut tx = Pin::new(&mut this.tx);
88 
89         ready!(tx.as_mut().poll_ready(cx)).unwrap();
90         tx.as_mut().start_send(this.last + 1).unwrap();
91         this.last += 1;
92         assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
93         Poll::Ready(Some(this.last))
94     }
95 }
96 
97 /// Single producers, single consumer
98 #[bench]
bounded_1_tx(b: &mut Bencher)99 fn bounded_1_tx(b: &mut Bencher) {
100     let mut cx = noop_context();
101     b.iter(|| {
102         let (tx, mut rx) = mpsc::channel(0);
103 
104         let mut tx = TestSender { tx, last: 0 };
105 
106         for i in 0..1000 {
107             assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
108             assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
109             assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
110         }
111     })
112 }
113 
114 /// 100 producers, single consumer
115 #[bench]
bounded_100_tx(b: &mut Bencher)116 fn bounded_100_tx(b: &mut Bencher) {
117     let mut cx = noop_context();
118     b.iter(|| {
119         // Each sender can send one item after specified capacity
120         let (tx, mut rx) = mpsc::channel(0);
121 
122         let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();
123 
124         for i in 0..10 {
125             for x in &mut tx {
126                 // Send an item
127                 assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx));
128                 // Then block
129                 assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx));
130                 // Recv the item
131                 assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
132             }
133         }
134     })
135 }
136