1 #![feature(test)]
2
3 extern crate test;
4 use crate::test::Bencher;
5
6 use {
7 futures::{
8 channel::mpsc::{self, Sender, UnboundedSender},
9 ready,
10 stream::{Stream, StreamExt},
11 sink::Sink,
12 task::{Context, Poll},
13 },
14 futures_test::task::noop_context,
15 std::pin::Pin,
16 };
17
18 /// Single producer, single consumer
19 #[bench]
unbounded_1_tx(b: &mut Bencher)20 fn unbounded_1_tx(b: &mut Bencher) {
21 let mut cx = noop_context();
22 b.iter(|| {
23 let (tx, mut rx) = mpsc::unbounded();
24
25 // 1000 iterations to avoid measuring overhead of initialization
26 // Result should be divided by 1000
27 for i in 0..1000 {
28
29 // Poll, not ready, park
30 assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
31
32 UnboundedSender::unbounded_send(&tx, i).unwrap();
33
34 // Now poll ready
35 assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
36 }
37 })
38 }
39
40 /// 100 producers, single consumer
41 #[bench]
unbounded_100_tx(b: &mut Bencher)42 fn unbounded_100_tx(b: &mut Bencher) {
43 let mut cx = noop_context();
44 b.iter(|| {
45 let (tx, mut rx) = mpsc::unbounded();
46
47 let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
48
49 // 1000 send/recv operations total, result should be divided by 1000
50 for _ in 0..10 {
51 for (i, x) in tx.iter().enumerate() {
52 assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
53
54 UnboundedSender::unbounded_send(x, i).unwrap();
55
56 assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
57 }
58 }
59 })
60 }
61
62 #[bench]
unbounded_uncontended(b: &mut Bencher)63 fn unbounded_uncontended(b: &mut Bencher) {
64 let mut cx = noop_context();
65 b.iter(|| {
66 let (tx, mut rx) = mpsc::unbounded();
67
68 for i in 0..1000 {
69 UnboundedSender::unbounded_send(&tx, i).expect("send");
70 // No need to create a task, because poll is not going to park.
71 assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
72 }
73 })
74 }
75
76
77 /// A Stream that continuously sends incrementing number of the queue
78 struct TestSender {
79 tx: Sender<u32>,
80 last: u32, // Last number sent
81 }
82
83 // Could be a Future, it doesn't matter
84 impl Stream for TestSender {
85 type Item = u32;
86
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>87 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
88 -> Poll<Option<Self::Item>>
89 {
90 let this = &mut *self;
91 let mut tx = Pin::new(&mut this.tx);
92
93 ready!(tx.as_mut().poll_ready(cx)).unwrap();
94 tx.as_mut().start_send(this.last + 1).unwrap();
95 this.last += 1;
96 assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
97 Poll::Ready(Some(this.last))
98 }
99 }
100
101 /// Single producers, single consumer
102 #[bench]
bounded_1_tx(b: &mut Bencher)103 fn bounded_1_tx(b: &mut Bencher) {
104 let mut cx = noop_context();
105 b.iter(|| {
106 let (tx, mut rx) = mpsc::channel(0);
107
108 let mut tx = TestSender { tx, last: 0 };
109
110 for i in 0..1000 {
111 assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
112 assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
113 assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
114 }
115 })
116 }
117
118 /// 100 producers, single consumer
119 #[bench]
bounded_100_tx(b: &mut Bencher)120 fn bounded_100_tx(b: &mut Bencher) {
121 let mut cx = noop_context();
122 b.iter(|| {
123 // Each sender can send one item after specified capacity
124 let (tx, mut rx) = mpsc::channel(0);
125
126 let mut tx: Vec<_> = (0..100).map(|_| {
127 TestSender {
128 tx: tx.clone(),
129 last: 0
130 }
131 }).collect();
132
133 for i in 0..10 {
134 for x in &mut tx {
135 // Send an item
136 assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx));
137 // Then block
138 assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx));
139 // Recv the item
140 assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
141 }
142 }
143 })
144 }
145