1 #![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
2
3 use futures::channel::oneshot;
4 use futures::executor::ThreadPool;
5 use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
6 use futures::task::SpawnExt;
7 use std::sync::mpsc;
8 use std::thread;
9
run<F: Future + Send + 'static>(future: F)10 fn run<F: Future + Send + 'static>(future: F) {
11 let tp = ThreadPool::new().unwrap();
12 tp.spawn(future.map(drop)).unwrap();
13 }
14
/// Joining two already-resolved futures delivers both values as a pair.
#[test]
fn join1() {
    let (result_tx, result_rx) = mpsc::channel();
    let both = future::try_join(ok::<i32, i32>(1), ok(2));
    run(both.map_ok(move |pair| result_tx.send(pair).unwrap()));

    assert_eq!(result_rx.recv(), Ok((1, 2)));
    // The sender was moved into the closure, so once the task is gone the
    // channel reports disconnection instead of another value.
    assert!(result_rx.recv().is_err());
}
22
/// A join over two oneshot receivers produces nothing until both senders fire.
#[test]
fn join2() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (result_tx, result_rx) = mpsc::channel();

    let both = future::try_join(first_rx, second_rx);
    run(both.map_ok(move |pair| result_tx.send(pair).unwrap()));

    // Neither input has a value yet.
    assert!(result_rx.try_recv().is_err());

    first_tx.send(1).unwrap();
    // Only one input is resolved, so nothing may have been reported.
    assert!(result_rx.try_recv().is_err());

    second_tx.send(2).unwrap();
    assert_eq!(result_rx.recv(), Ok((1, 2)));
    // No further values: the sender lived inside the finished task.
    assert!(result_rx.recv().is_err());
}
36
/// Dropping the first sender fails the join even though the second input is
/// still pending; the error handler reports a fixed marker value.
#[test]
fn join3() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (marker_tx, marker_rx) = mpsc::channel();

    let both = future::try_join(first_rx, second_rx);
    run(both.map_err(move |_cancelled| marker_tx.send(1).unwrap()));

    assert!(marker_rx.try_recv().is_err());

    // Cancelling one side is enough to trigger the error path.
    drop(first_tx);
    assert_eq!(marker_rx.recv(), Ok(1));
    assert!(marker_rx.recv().is_err());

    // The second sender is deliberately kept alive until here.
    drop(second_tx);
}
49
/// Like `join3`, but the error handler forwards the error value itself and
/// the second sender is dropped before the final disconnection check.
#[test]
fn join4() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (err_tx, err_rx) = mpsc::channel();

    let both = future::try_join(first_rx, second_rx);
    run(both.map_err(move |err| err_tx.send(err).unwrap()));

    assert!(err_rx.try_recv().is_err());

    // Cancelling the first input must surface an error through the channel.
    drop(first_tx);
    assert!(err_rx.recv().is_ok());

    drop(second_tx);
    // Exactly one error was forwarded; afterwards the channel is closed.
    assert!(err_rx.recv().is_err());
}
62
/// A nested join (`(a, b)` joined with `c`) waits for all three inputs and
/// yields the correspondingly nested tuple.
#[test]
fn join5() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (third_tx, third_rx) = oneshot::channel::<i32>();
    let (result_tx, result_rx) = mpsc::channel();

    let inner = future::try_join(first_rx, second_rx);
    let outer = future::try_join(inner, third_rx);
    run(outer.map_ok(move |nested| result_tx.send(nested).unwrap()));

    assert!(result_rx.try_recv().is_err());

    first_tx.send(1).unwrap();
    assert!(result_rx.try_recv().is_err());

    second_tx.send(2).unwrap();
    // The inner join is complete, but the outer one still lacks its last input.
    assert!(result_rx.try_recv().is_err());

    third_tx.send(3).unwrap();
    assert_eq!(result_rx.recv(), Ok(((1, 2), 3)));
    assert!(result_rx.recv().is_err());
}
79
/// `try_select` resolves with the first value and hands back the unfinished
/// receiver, which can then be driven to completion on its own.
#[test]
fn select1() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();

    // Phase 1: race both receivers; only the first sender fires.
    let (winner_tx, winner_rx) = mpsc::channel();
    let race = future::try_select(first_rx, second_rx);
    run(race.map_ok(move |either| winner_tx.send(either).unwrap()));
    assert!(winner_rx.try_recv().is_err());

    first_tx.send(1).unwrap();
    // Both arms carry the same type, so `into_inner` collapses the `Either`.
    let (value, leftover) = winner_rx.recv().unwrap().into_inner();
    assert_eq!(value, 1);
    assert!(winner_rx.recv().is_err());

    // Phase 2: the returned receiver must still be usable.
    let (late_tx, late_rx) = mpsc::channel();
    run(leftover.map_ok(move |value| late_tx.send(value).unwrap()));
    second_tx.send(2).unwrap();
    assert_eq!(late_rx.recv(), Ok(2));
    assert!(late_rx.recv().is_err());
}
98
/// When the first sender is dropped, `try_select` fails and its error carries
/// the other receiver, which can still receive a value afterwards.
#[test]
fn select2() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();

    // Phase 1: cancel the first input and capture the surviving receiver.
    let (failed_tx, failed_rx) = mpsc::channel();
    let race = future::try_select(first_rx, second_rx);
    run(race.map_err(move |either| {
        let (_cancelled, other) = either.into_inner();
        failed_tx.send((1, other)).unwrap()
    }));
    assert!(failed_rx.try_recv().is_err());

    drop(first_tx);
    let (marker, leftover) = failed_rx.recv().unwrap();
    assert_eq!(marker, 1);
    assert!(failed_rx.recv().is_err());

    // Phase 2: the surviving receiver completes normally.
    let (late_tx, late_rx) = mpsc::channel();
    run(leftover.map_ok(move |value| late_tx.send(value).unwrap()));
    second_tx.send(2).unwrap();
    assert_eq!(late_rx.recv(), Ok(2));
    assert!(late_rx.recv().is_err());
}
117
/// Both inputs of a `try_select` are cancelled in turn: the first failure
/// returns the other receiver, and dropping its sender fails that one too.
#[test]
fn select3() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();

    // Phase 1: cancel the first input and capture the surviving receiver.
    let (failed_tx, failed_rx) = mpsc::channel();
    let race = future::try_select(first_rx, second_rx);
    run(race.map_err(move |either| {
        let (_cancelled, other) = either.into_inner();
        failed_tx.send((1, other)).unwrap()
    }));
    assert!(failed_rx.try_recv().is_err());

    drop(first_tx);
    let (marker, leftover) = failed_rx.recv().unwrap();
    assert_eq!(marker, 1);
    assert!(failed_rx.recv().is_err());

    // Phase 2: cancel the surviving receiver as well and expect its error path.
    let (late_tx, late_rx) = mpsc::channel();
    run(leftover.map_err(move |_cancelled| late_tx.send(2).unwrap()));
    drop(second_tx);
    assert_eq!(late_rx.recv(), Ok(2));
    assert!(late_rx.recv().is_err());
}
136
/// Repeatedly races two oneshot receivers while a separate thread completes
/// the first one, checking that every race reports success.
#[test]
fn select4() {
    let (sender_tx, sender_rx) = mpsc::channel::<oneshot::Sender<i32>>();

    // Worker thread: fire every oneshot sender it is handed, until the
    // channel feeding it is closed.
    let worker = thread::spawn(move || {
        sender_rx.into_iter().for_each(|sender| sender.send(1).unwrap());
    });

    let (done_tx, done_rx) = mpsc::channel();
    for _ in 0..10000 {
        let (first_tx, first_rx) = oneshot::channel::<i32>();
        let (second_tx, second_rx) = oneshot::channel::<i32>();

        let notify = done_tx.clone();
        let race = future::try_select(first_rx, second_rx);
        run(race.map_ok(move |_| notify.send(()).unwrap()));

        sender_tx.send(first_tx).unwrap();
        // Block until this round's race has reported success.
        done_rx.recv().unwrap();
        // The second sender is only released after the race is over.
        drop(second_tx);
    }

    // Closing the channel lets the worker's loop terminate.
    drop(sender_tx);

    worker.join().unwrap();
}
162