1 #![feature(test)]
2
3 extern crate test;
4 use crate::test::Bencher;
5
6 use {
7 futures::{
8 channel::mpsc::{self, Sender, UnboundedSender},
9 ready,
10 sink::Sink,
11 stream::{Stream, StreamExt},
12 task::{Context, Poll},
13 },
14 futures_test::task::noop_context,
15 std::pin::Pin,
16 };
17
18 /// Single producer, single consumer
19 #[bench]
unbounded_1_tx(b: &mut Bencher)20 fn unbounded_1_tx(b: &mut Bencher) {
21 let mut cx = noop_context();
22 b.iter(|| {
23 let (tx, mut rx) = mpsc::unbounded();
24
25 // 1000 iterations to avoid measuring overhead of initialization
26 // Result should be divided by 1000
27 for i in 0..1000 {
28 // Poll, not ready, park
29 assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
30
31 UnboundedSender::unbounded_send(&tx, i).unwrap();
32
33 // Now poll ready
34 assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
35 }
36 })
37 }
38
39 /// 100 producers, single consumer
40 #[bench]
unbounded_100_tx(b: &mut Bencher)41 fn unbounded_100_tx(b: &mut Bencher) {
42 let mut cx = noop_context();
43 b.iter(|| {
44 let (tx, mut rx) = mpsc::unbounded();
45
46 let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
47
48 // 1000 send/recv operations total, result should be divided by 1000
49 for _ in 0..10 {
50 for (i, x) in tx.iter().enumerate() {
51 assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
52
53 UnboundedSender::unbounded_send(x, i).unwrap();
54
55 assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
56 }
57 }
58 })
59 }
60
61 #[bench]
unbounded_uncontended(b: &mut Bencher)62 fn unbounded_uncontended(b: &mut Bencher) {
63 let mut cx = noop_context();
64 b.iter(|| {
65 let (tx, mut rx) = mpsc::unbounded();
66
67 for i in 0..1000 {
68 UnboundedSender::unbounded_send(&tx, i).expect("send");
69 // No need to create a task, because poll is not going to park.
70 assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
71 }
72 })
73 }
74
75 /// A Stream that continuously sends incrementing number of the queue
76 struct TestSender {
77 tx: Sender<u32>,
78 last: u32, // Last number sent
79 }
80
81 // Could be a Future, it doesn't matter
82 impl Stream for TestSender {
83 type Item = u32;
84
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>85 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86 let this = &mut *self;
87 let mut tx = Pin::new(&mut this.tx);
88
89 ready!(tx.as_mut().poll_ready(cx)).unwrap();
90 tx.as_mut().start_send(this.last + 1).unwrap();
91 this.last += 1;
92 assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
93 Poll::Ready(Some(this.last))
94 }
95 }
96
97 /// Single producers, single consumer
98 #[bench]
bounded_1_tx(b: &mut Bencher)99 fn bounded_1_tx(b: &mut Bencher) {
100 let mut cx = noop_context();
101 b.iter(|| {
102 let (tx, mut rx) = mpsc::channel(0);
103
104 let mut tx = TestSender { tx, last: 0 };
105
106 for i in 0..1000 {
107 assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
108 assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
109 assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
110 }
111 })
112 }
113
114 /// 100 producers, single consumer
115 #[bench]
bounded_100_tx(b: &mut Bencher)116 fn bounded_100_tx(b: &mut Bencher) {
117 let mut cx = noop_context();
118 b.iter(|| {
119 // Each sender can send one item after specified capacity
120 let (tx, mut rx) = mpsc::channel(0);
121
122 let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();
123
124 for i in 0..10 {
125 for x in &mut tx {
126 // Send an item
127 assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx));
128 // Then block
129 assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx));
130 // Recv the item
131 assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
132 }
133 }
134 })
135 }
136