• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![feature(test)]
2 
3 extern crate test;
4 use crate::test::Bencher;
5 
6 use {
7     futures::{
8         channel::mpsc::{self, Sender, UnboundedSender},
9         ready,
10         stream::{Stream, StreamExt},
11         sink::Sink,
12         task::{Context, Poll},
13     },
14     futures_test::task::noop_context,
15     std::pin::Pin,
16 };
17 
18 /// Single producer, single consumer
19 #[bench]
unbounded_1_tx(b: &mut Bencher)20 fn unbounded_1_tx(b: &mut Bencher) {
21     let mut cx = noop_context();
22     b.iter(|| {
23         let (tx, mut rx) = mpsc::unbounded();
24 
25         // 1000 iterations to avoid measuring overhead of initialization
26         // Result should be divided by 1000
27         for i in 0..1000 {
28 
29             // Poll, not ready, park
30             assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
31 
32             UnboundedSender::unbounded_send(&tx, i).unwrap();
33 
34             // Now poll ready
35             assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
36         }
37     })
38 }
39 
40 /// 100 producers, single consumer
41 #[bench]
unbounded_100_tx(b: &mut Bencher)42 fn unbounded_100_tx(b: &mut Bencher) {
43     let mut cx = noop_context();
44     b.iter(|| {
45         let (tx, mut rx) = mpsc::unbounded();
46 
47         let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
48 
49         // 1000 send/recv operations total, result should be divided by 1000
50         for _ in 0..10 {
51             for (i, x) in tx.iter().enumerate() {
52                 assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
53 
54                 UnboundedSender::unbounded_send(x, i).unwrap();
55 
56                 assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
57             }
58         }
59     })
60 }
61 
62 #[bench]
unbounded_uncontended(b: &mut Bencher)63 fn unbounded_uncontended(b: &mut Bencher) {
64     let mut cx = noop_context();
65     b.iter(|| {
66         let (tx, mut rx) = mpsc::unbounded();
67 
68         for i in 0..1000 {
69             UnboundedSender::unbounded_send(&tx, i).expect("send");
70             // No need to create a task, because poll is not going to park.
71             assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
72         }
73     })
74 }
75 
76 
77 /// A Stream that continuously sends incrementing number of the queue
78 struct TestSender {
79     tx: Sender<u32>,
80     last: u32, // Last number sent
81 }
82 
83 // Could be a Future, it doesn't matter
84 impl Stream for TestSender {
85     type Item = u32;
86 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>87     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
88         -> Poll<Option<Self::Item>>
89     {
90         let this = &mut *self;
91         let mut tx = Pin::new(&mut this.tx);
92 
93         ready!(tx.as_mut().poll_ready(cx)).unwrap();
94         tx.as_mut().start_send(this.last + 1).unwrap();
95         this.last += 1;
96         assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
97         Poll::Ready(Some(this.last))
98     }
99 }
100 
101 /// Single producers, single consumer
102 #[bench]
bounded_1_tx(b: &mut Bencher)103 fn bounded_1_tx(b: &mut Bencher) {
104     let mut cx = noop_context();
105     b.iter(|| {
106         let (tx, mut rx) = mpsc::channel(0);
107 
108         let mut tx = TestSender { tx, last: 0 };
109 
110         for i in 0..1000 {
111             assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
112             assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
113             assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
114         }
115     })
116 }
117 
118 /// 100 producers, single consumer
119 #[bench]
bounded_100_tx(b: &mut Bencher)120 fn bounded_100_tx(b: &mut Bencher) {
121     let mut cx = noop_context();
122     b.iter(|| {
123         // Each sender can send one item after specified capacity
124         let (tx, mut rx) = mpsc::channel(0);
125 
126         let mut tx: Vec<_> = (0..100).map(|_| {
127             TestSender {
128                 tx: tx.clone(),
129                 last: 0
130             }
131         }).collect();
132 
133         for i in 0..10 {
134             for x in &mut tx {
135                 // Send an item
136                 assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx));
137                 // Then block
138                 assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx));
139                 // Recv the item
140                 assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
141             }
142         }
143     })
144 }
145