1 use futures::channel::oneshot;
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future::{self, join, Future, FutureExt, TryFutureExt};
4 use futures::stream::{FuturesOrdered, StreamExt};
5 use futures::task::Poll;
6 use futures_test::task::noop_context;
7 use std::any::Any;
8
9 #[test]
works_1()10 fn works_1() {
11 let (a_tx, a_rx) = oneshot::channel::<i32>();
12 let (b_tx, b_rx) = oneshot::channel::<i32>();
13 let (c_tx, c_rx) = oneshot::channel::<i32>();
14
15 let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>();
16
17 b_tx.send(99).unwrap();
18 assert!(stream.poll_next_unpin(&mut noop_context()).is_pending());
19
20 a_tx.send(33).unwrap();
21 c_tx.send(33).unwrap();
22
23 let mut iter = block_on_stream(stream);
24 assert_eq!(Some(Ok(33)), iter.next());
25 assert_eq!(Some(Ok(99)), iter.next());
26 assert_eq!(Some(Ok(33)), iter.next());
27 assert_eq!(None, iter.next());
28 }
29
30 #[test]
works_2()31 fn works_2() {
32 let (a_tx, a_rx) = oneshot::channel::<i32>();
33 let (b_tx, b_rx) = oneshot::channel::<i32>();
34 let (c_tx, c_rx) = oneshot::channel::<i32>();
35
36 let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()]
37 .into_iter()
38 .collect::<FuturesOrdered<_>>();
39
40 let mut cx = noop_context();
41 a_tx.send(33).unwrap();
42 b_tx.send(33).unwrap();
43 assert!(stream.poll_next_unpin(&mut cx).is_ready());
44 assert!(stream.poll_next_unpin(&mut cx).is_pending());
45 c_tx.send(33).unwrap();
46 assert!(stream.poll_next_unpin(&mut cx).is_ready());
47 }
48
49 #[test]
test_push_front()50 fn test_push_front() {
51 let (a_tx, a_rx) = oneshot::channel::<i32>();
52 let (b_tx, b_rx) = oneshot::channel::<i32>();
53 let (c_tx, c_rx) = oneshot::channel::<i32>();
54 let (d_tx, d_rx) = oneshot::channel::<i32>();
55
56 let mut stream = FuturesOrdered::new();
57
58 let mut cx = noop_context();
59
60 stream.push_back(a_rx);
61 stream.push_back(b_rx);
62 stream.push_back(c_rx);
63
64 a_tx.send(1).unwrap();
65 b_tx.send(2).unwrap();
66 c_tx.send(3).unwrap();
67
68 // 1 and 2 should be received in order
69 assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
70 assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
71
72 stream.push_front(d_rx);
73 d_tx.send(4).unwrap();
74
75 // we pushed `d_rx` to the front and sent 4, so we should receive 4 next
76 // and then 3 after it
77 assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
78 assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
79 }
80
81 #[test]
test_push_back()82 fn test_push_back() {
83 let (a_tx, a_rx) = oneshot::channel::<i32>();
84 let (b_tx, b_rx) = oneshot::channel::<i32>();
85 let (c_tx, c_rx) = oneshot::channel::<i32>();
86 let (d_tx, d_rx) = oneshot::channel::<i32>();
87
88 let mut stream = FuturesOrdered::new();
89
90 let mut cx = noop_context();
91
92 stream.push_back(a_rx);
93 stream.push_back(b_rx);
94 stream.push_back(c_rx);
95
96 a_tx.send(1).unwrap();
97 b_tx.send(2).unwrap();
98 c_tx.send(3).unwrap();
99
100 // All results should be received in order
101
102 assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
103 assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
104
105 stream.push_back(d_rx);
106 d_tx.send(4).unwrap();
107
108 assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
109 assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
110 }
111
112 #[test]
from_iterator()113 fn from_iterator() {
114 let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
115 .into_iter()
116 .collect::<FuturesOrdered<_>>();
117 assert_eq!(stream.len(), 3);
118 assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
119 }
120
121 #[test]
queue_never_unblocked()122 fn queue_never_unblocked() {
123 let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
124 let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
125 let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();
126
127 let mut stream = vec![
128 Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>,
129 Box::new(
130 future::try_select(b_rx, c_rx)
131 .map_err(|e| e.factor_first().0)
132 .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)),
133 ) as _,
134 ]
135 .into_iter()
136 .collect::<FuturesOrdered<_>>();
137
138 let cx = &mut noop_context();
139 for _ in 0..10 {
140 assert!(stream.poll_next_unpin(cx).is_pending());
141 }
142
143 b_tx.send(Box::new(())).unwrap();
144 assert!(stream.poll_next_unpin(cx).is_pending());
145 c_tx.send(Box::new(())).unwrap();
146 assert!(stream.poll_next_unpin(cx).is_pending());
147 assert!(stream.poll_next_unpin(cx).is_pending());
148 }
149
150 #[test]
test_push_front_negative()151 fn test_push_front_negative() {
152 let (a_tx, a_rx) = oneshot::channel::<i32>();
153 let (b_tx, b_rx) = oneshot::channel::<i32>();
154 let (c_tx, c_rx) = oneshot::channel::<i32>();
155
156 let mut stream = FuturesOrdered::new();
157
158 let mut cx = noop_context();
159
160 stream.push_front(a_rx);
161 stream.push_front(b_rx);
162 stream.push_front(c_rx);
163
164 a_tx.send(1).unwrap();
165 b_tx.send(2).unwrap();
166 c_tx.send(3).unwrap();
167
168 // These should all be received in reverse order
169 assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
170 assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
171 assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
172 }
173