• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
2 
3 use futures::channel::oneshot;
4 use futures::executor::ThreadPool;
5 use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
6 use futures::task::SpawnExt;
7 use std::sync::mpsc;
8 use std::thread;
9 
run<F: Future + Send + 'static>(future: F)10 fn run<F: Future + Send + 'static>(future: F) {
11     let tp = ThreadPool::new().unwrap();
12     tp.spawn(future.map(drop)).unwrap();
13 }
14 
15 #[test]
join1()16 fn join1() {
17     let (tx, rx) = mpsc::channel();
18     run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
19     assert_eq!(rx.recv(), Ok((1, 2)));
20     assert!(rx.recv().is_err());
21 }
22 
23 #[test]
join2()24 fn join2() {
25     let (c1, p1) = oneshot::channel::<i32>();
26     let (c2, p2) = oneshot::channel::<i32>();
27     let (tx, rx) = mpsc::channel();
28     run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
29     assert!(rx.try_recv().is_err());
30     c1.send(1).unwrap();
31     assert!(rx.try_recv().is_err());
32     c2.send(2).unwrap();
33     assert_eq!(rx.recv(), Ok((1, 2)));
34     assert!(rx.recv().is_err());
35 }
36 
37 #[test]
join3()38 fn join3() {
39     let (c1, p1) = oneshot::channel::<i32>();
40     let (c2, p2) = oneshot::channel::<i32>();
41     let (tx, rx) = mpsc::channel();
42     run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap()));
43     assert!(rx.try_recv().is_err());
44     drop(c1);
45     assert_eq!(rx.recv(), Ok(1));
46     assert!(rx.recv().is_err());
47     drop(c2);
48 }
49 
50 #[test]
join4()51 fn join4() {
52     let (c1, p1) = oneshot::channel::<i32>();
53     let (c2, p2) = oneshot::channel::<i32>();
54     let (tx, rx) = mpsc::channel();
55     run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap()));
56     assert!(rx.try_recv().is_err());
57     drop(c1);
58     assert!(rx.recv().is_ok());
59     drop(c2);
60     assert!(rx.recv().is_err());
61 }
62 
63 #[test]
join5()64 fn join5() {
65     let (c1, p1) = oneshot::channel::<i32>();
66     let (c2, p2) = oneshot::channel::<i32>();
67     let (c3, p3) = oneshot::channel::<i32>();
68     let (tx, rx) = mpsc::channel();
69     run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap()));
70     assert!(rx.try_recv().is_err());
71     c1.send(1).unwrap();
72     assert!(rx.try_recv().is_err());
73     c2.send(2).unwrap();
74     assert!(rx.try_recv().is_err());
75     c3.send(3).unwrap();
76     assert_eq!(rx.recv(), Ok(((1, 2), 3)));
77     assert!(rx.recv().is_err());
78 }
79 
80 #[test]
select1()81 fn select1() {
82     let (c1, p1) = oneshot::channel::<i32>();
83     let (c2, p2) = oneshot::channel::<i32>();
84     let (tx, rx) = mpsc::channel();
85     run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
86     assert!(rx.try_recv().is_err());
87     c1.send(1).unwrap();
88     let (v, p2) = rx.recv().unwrap().into_inner();
89     assert_eq!(v, 1);
90     assert!(rx.recv().is_err());
91 
92     let (tx, rx) = mpsc::channel();
93     run(p2.map_ok(move |v| tx.send(v).unwrap()));
94     c2.send(2).unwrap();
95     assert_eq!(rx.recv(), Ok(2));
96     assert!(rx.recv().is_err());
97 }
98 
99 #[test]
select2()100 fn select2() {
101     let (c1, p1) = oneshot::channel::<i32>();
102     let (c2, p2) = oneshot::channel::<i32>();
103     let (tx, rx) = mpsc::channel();
104     run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
105     assert!(rx.try_recv().is_err());
106     drop(c1);
107     let (v, p2) = rx.recv().unwrap();
108     assert_eq!(v, 1);
109     assert!(rx.recv().is_err());
110 
111     let (tx, rx) = mpsc::channel();
112     run(p2.map_ok(move |v| tx.send(v).unwrap()));
113     c2.send(2).unwrap();
114     assert_eq!(rx.recv(), Ok(2));
115     assert!(rx.recv().is_err());
116 }
117 
118 #[test]
select3()119 fn select3() {
120     let (c1, p1) = oneshot::channel::<i32>();
121     let (c2, p2) = oneshot::channel::<i32>();
122     let (tx, rx) = mpsc::channel();
123     run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
124     assert!(rx.try_recv().is_err());
125     drop(c1);
126     let (v, p2) = rx.recv().unwrap();
127     assert_eq!(v, 1);
128     assert!(rx.recv().is_err());
129 
130     let (tx, rx) = mpsc::channel();
131     run(p2.map_err(move |_v| tx.send(2).unwrap()));
132     drop(c2);
133     assert_eq!(rx.recv(), Ok(2));
134     assert!(rx.recv().is_err());
135 }
136 
137 #[test]
select4()138 fn select4() {
139     let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
140 
141     let t = thread::spawn(move || {
142         for c in rx {
143             c.send(1).unwrap();
144         }
145     });
146 
147     let (tx2, rx2) = mpsc::channel();
148     for _ in 0..10000 {
149         let (c1, p1) = oneshot::channel::<i32>();
150         let (c2, p2) = oneshot::channel::<i32>();
151 
152         let tx3 = tx2.clone();
153         run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap()));
154         tx.send(c1).unwrap();
155         rx2.recv().unwrap();
156         drop(c2);
157     }
158     drop(tx);
159 
160     t.join().unwrap();
161 }
162