• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::channel::oneshot;
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future::{self, join, Future, FutureExt, TryFutureExt};
4 use futures::stream::{FuturesOrdered, StreamExt};
5 use futures::task::Poll;
6 use futures_test::task::noop_context;
7 use std::any::Any;
8 
/// Checks that results are yielded in the order the futures were queued,
/// even though the middle future is the first one to complete.
#[test]
fn works_1() {
    let (tx_first, rx_first) = oneshot::channel::<i32>();
    let (tx_second, rx_second) = oneshot::channel::<i32>();
    let (tx_third, rx_third) = oneshot::channel::<i32>();

    let receivers = vec![rx_first, rx_second, rx_third];
    let mut ordered: FuturesOrdered<_> = receivers.into_iter().collect();

    // Only the middle receiver has a value; the head of the queue is still
    // outstanding, so the stream must not yield anything yet.
    tx_second.send(99).unwrap();
    let mut cx = noop_context();
    assert!(ordered.poll_next_unpin(&mut cx).is_pending());

    tx_first.send(33).unwrap();
    tx_third.send(33).unwrap();

    // Drain the stream to its end and compare the whole sequence at once.
    let received: Vec<Result<i32, oneshot::Canceled>> = block_on_stream(ordered).collect();
    assert_eq!(received, vec![Ok(33), Ok(99), Ok(33)]);
}
29 
/// Queues a plain receiver followed by a `join` of two receivers and checks
/// both *when* each result becomes available and *what* value is yielded.
#[test]
fn works_2() {
    let (a_tx, a_rx) = oneshot::channel::<i32>();
    let (b_tx, b_rx) = oneshot::channel::<i32>();
    let (c_tx, c_rx) = oneshot::channel::<i32>();

    let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()]
        .into_iter()
        .collect::<FuturesOrdered<_>>();

    let mut cx = noop_context();
    a_tx.send(33).unwrap();
    b_tx.send(33).unwrap();

    // The first future has its value, so it is yielded immediately.
    assert_eq!(Poll::Ready(Some(Ok(33))), stream.poll_next_unpin(&mut cx));
    // The joined future has only received `b`; it still waits on `c`.
    assert!(stream.poll_next_unpin(&mut cx).is_pending());

    c_tx.send(33).unwrap();

    // Both halves of the join are complete now; the closure adds them up.
    assert_eq!(Poll::Ready(Some(Ok(66))), stream.poll_next_unpin(&mut cx));
    // Every queued future has been consumed, so the stream is finished.
    assert_eq!(Poll::Ready(None), stream.poll_next_unpin(&mut cx));
}
48 
/// Verifies that a future added with `push_front` is yielded ahead of
/// futures that were already waiting in the queue.
#[test]
fn test_push_front() {
    let mut cx = noop_context();
    let mut queue = FuturesOrdered::new();

    let (tx1, rx1) = oneshot::channel::<i32>();
    let (tx2, rx2) = oneshot::channel::<i32>();
    let (tx3, rx3) = oneshot::channel::<i32>();
    let (tx4, rx4) = oneshot::channel::<i32>();

    queue.push_back(rx1);
    queue.push_back(rx2);
    queue.push_back(rx3);

    // Complete the three queued receivers with 1, 2 and 3 respectively.
    for (tx, value) in vec![(tx1, 1), (tx2, 2), (tx3, 3)] {
        tx.send(value).unwrap();
    }

    // The first two results arrive in the order they were queued.
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(1))));
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(2))));

    queue.push_front(rx4);
    tx4.send(4).unwrap();

    // The late arrival was placed at the front, so 4 is yielded before the
    // still-queued 3.
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(4))));
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(3))));
}
80 
/// Verifies that `push_back` appends to the end of the queue, including when
/// it is called after some results have already been taken from the stream.
#[test]
fn test_push_back() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (third_tx, third_rx) = oneshot::channel::<i32>();
    let (fourth_tx, fourth_rx) = oneshot::channel::<i32>();

    let mut cx = noop_context();
    let mut ordered = FuturesOrdered::new();

    ordered.push_back(first_rx);
    ordered.push_back(second_rx);
    ordered.push_back(third_rx);

    first_tx.send(1).unwrap();
    second_tx.send(2).unwrap();
    third_tx.send(3).unwrap();

    // Take the first two results; they come out in queue order.
    assert_eq!(ordered.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(1))));
    assert_eq!(ordered.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(2))));

    // A future appended now must line up behind the one still queued.
    ordered.push_back(fourth_rx);
    fourth_tx.send(4).unwrap();

    assert_eq!(ordered.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(3))));
    assert_eq!(ordered.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(4))));
}
111 
/// Builds a `FuturesOrdered` from an iterator of already-completed futures
/// and checks the reported length and the order of the collected outputs.
#[test]
fn from_iterator() {
    let ready_futures = (1..=3).map(future::ready::<i32>);
    let ordered: FuturesOrdered<_> = ready_futures.collect();
    assert_eq!(3, ordered.len());

    let outputs = block_on(ordered.collect::<Vec<i32>>());
    assert_eq!(vec![1, 2, 3], outputs);
}
120 
/// Ensures the stream keeps reporting `Pending` while the first queued
/// future is unresolved, no matter what happens to the futures behind it.
#[test]
fn queue_never_unblocked() {
    type Payload = Box<dyn Any + Send>;

    // `_tx_head` is never sent on and is not dropped before the end of the
    // test, so nothing ever arrives on `rx_head`.
    let (_tx_head, rx_head) = oneshot::channel::<Payload>();
    let (tx_left, rx_left) = oneshot::channel::<Payload>();
    let (tx_right, rx_right) = oneshot::channel::<Payload>();

    // Second queue entry: resolves as soon as either receiver does, with the
    // success value re-boxed so both entries share one output type.
    let either_side = future::try_select(rx_left, rx_right)
        .map_err(|err| err.factor_first().0)
        .and_then(|won| future::ok(Box::new(won) as Payload));

    let mut ordered: FuturesOrdered<_> = vec![
        Box::new(rx_head) as Box<dyn Future<Output = _> + Unpin>,
        Box::new(either_side) as _,
    ]
    .into_iter()
    .collect();

    let mut cx = noop_context();

    // Repeated polling with nothing sent must never produce a value.
    for _attempt in 0..10 {
        assert!(ordered.poll_next_unpin(&mut cx).is_pending());
    }

    // Completing the later entry does not unblock the stream either.
    tx_left.send(Box::new(())).unwrap();
    assert!(ordered.poll_next_unpin(&mut cx).is_pending());
    tx_right.send(Box::new(())).unwrap();
    assert!(ordered.poll_next_unpin(&mut cx).is_pending());
    assert!(ordered.poll_next_unpin(&mut cx).is_pending());
}
149 
/// Verifies that futures queued exclusively with `push_front` are yielded in
/// the reverse of the order in which they were added.
#[test]
fn test_push_front_negative() {
    let mut cx = noop_context();
    let mut queue = FuturesOrdered::new();

    let (tx_one, rx_one) = oneshot::channel::<i32>();
    let (tx_two, rx_two) = oneshot::channel::<i32>();
    let (tx_three, rx_three) = oneshot::channel::<i32>();

    // Each push goes in front of the previous one.
    queue.push_front(rx_one);
    queue.push_front(rx_two);
    queue.push_front(rx_three);

    tx_one.send(1).unwrap();
    tx_two.send(2).unwrap();
    tx_three.send(3).unwrap();

    // The most recently pushed future comes out first.
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(3))));
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(2))));
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(1))));
}
173