• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::channel::oneshot::{self, Sender};
2 use futures::executor::block_on;
3 use futures::future::{poll_fn, FutureExt};
4 use futures::task::{Context, Poll};
5 use futures_test::task::panic_waker_ref;
6 use std::sync::mpsc;
7 use std::thread;
8 
/// Checks, within a single poll of a `poll_fn` future, that
/// `Sender::poll_canceled` is pending while the receiver exists and ready
/// once the receiver has been dropped.
#[test]
fn smoke_poll() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    // Kept in an `Option` so the closure below can drop the receiver mid-poll.
    let mut receiver_slot = Some(receiver);

    block_on(poll_fn(|cx| {
        // Receiver still alive: two consecutive polls must both be pending.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_pending());
        }

        drop(receiver_slot.take());

        // Receiver gone: two consecutive polls must both be ready.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_ready());
        }

        Poll::Ready(())
    }));
}
24 
/// A thread blocked on `Sender::cancellation` must finish once the receiver
/// is dropped from the main thread.
#[test]
fn cancel_notifies() {
    let (mut sender, receiver) = oneshot::channel::<u32>();

    // The waiter only returns after the cancellation future resolves.
    let waiter = thread::spawn(move || block_on(sender.cancellation()));

    drop(receiver);
    waiter.join().unwrap();
}
35 
/// Repeats the "drop the receiver, sender observes cancellation" handshake
/// many times against a single long-lived worker thread.
#[test]
fn cancel_lots() {
    // Reduced iteration count when built for Miri.
    const ROUNDS: usize = if cfg!(miri) { 100 } else { 20000 };

    let (job_tx, job_rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
    let worker = thread::spawn(move || {
        // `recv` starts failing once `job_tx` is dropped, which ends the loop.
        while let Ok((mut oneshot_tx, ack_tx)) = job_rx.recv() {
            block_on(oneshot_tx.cancellation());
            ack_tx.send(()).unwrap();
        }
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, oneshot_rx) = oneshot::channel::<u32>();
        let (ack_tx, ack_rx) = mpsc::channel();

        job_tx.send((oneshot_tx, ack_tx)).unwrap();
        drop(oneshot_rx);

        // Wait for the worker to acknowledge that it saw the cancellation.
        ack_rx.recv().unwrap();
    }

    // Closing the job channel lets the worker thread exit.
    drop(job_tx);
    worker.join().unwrap();
}
62 
/// Registers a waker via `poll_canceled`, then drops the sender before the
/// receiver. The waker comes from `panic_waker_ref`, so (assuming it panics
/// when woken, as its name suggests) the test fails if dropping the receiver
/// still notifies the already-dropped sender.
#[test]
fn cancel_after_sender_drop_doesnt_notify() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    let mut cx = Context::from_waker(panic_waker_ref());

    let first_poll = sender.poll_canceled(&mut cx);
    assert_eq!(first_poll, Poll::Pending);

    // Order matters: the sender goes away first, then the receiver.
    drop(sender);
    drop(receiver);
}
71 
/// After `Receiver::close`, polling the receiver must yield `Ready(Err(_))`
/// and the sender must see the channel as canceled.
#[test]
fn close() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    receiver.close();

    let check = poll_fn(|cx| {
        // Anything other than an immediate error is a failure.
        if !matches!(receiver.poll_unpin(cx), Poll::Ready(Err(_))) {
            panic!();
        }
        assert!(sender.poll_canceled(cx).is_ready());
        Poll::Ready(())
    });

    block_on(check);
}
85 
/// `Receiver::close` on another thread must wake a sender that is blocked on
/// `cancellation`, even though the receiver itself has not been dropped yet.
#[test]
fn close_wakes() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    let (release_tx, release_rx) = mpsc::channel();

    let closer = thread::spawn(move || {
        receiver.close();
        // Keep `receiver` alive until the main thread has observed the
        // cancellation, so the wake-up comes from `close`, not from drop.
        release_rx.recv().unwrap();
    });

    block_on(sender.cancellation());
    release_tx.send(()).unwrap();
    closer.join().unwrap();
}
98 
/// `Sender::is_canceled` is false while the receiver exists and true after
/// the receiver has been dropped.
#[test]
fn is_canceled() {
    let (sender, receiver) = oneshot::channel::<u32>();

    // Receiver alive: not canceled.
    assert!(!sender.is_canceled());

    drop(receiver);

    // Receiver dropped: canceled.
    assert!(sender.is_canceled());
}
106 
/// Races `Sender::send` on a worker thread against `Receiver::close` on the
/// main thread, many times over. Both outcomes are discarded; the test only
/// requires that neither side panics or hangs.
#[test]
fn cancel_sends() {
    // Reduced iteration count when built for Miri.
    const ROUNDS: usize = if cfg!(miri) { 100 } else { 20000 };

    let (handoff_tx, handoff_rx) = mpsc::channel::<Sender<_>>();
    let worker = thread::spawn(move || {
        // Iteration ends when `handoff_tx` is dropped.
        handoff_rx.into_iter().for_each(|oneshot_tx| {
            // The send result is deliberately ignored.
            let _ = oneshot_tx.send(42);
        });
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, mut oneshot_rx) = oneshot::channel::<u32>();
        handoff_tx.send(oneshot_tx).unwrap();

        oneshot_rx.close();
        // The received value or cancellation error is deliberately ignored.
        let _ = block_on(oneshot_rx);
    }

    drop(handoff_tx);
    worker.join().unwrap();
}
132 
133 // #[test]
134 // fn spawn_sends_items() {
135 //     let core = local_executor::Core::new();
136 //     let future = ok::<_, ()>(1);
137 //     let rx = spawn(future, &core);
138 //     assert_eq!(core.run(rx).unwrap(), 1);
139 // }
140 //
141 // #[test]
142 // fn spawn_kill_dead_stream() {
143 //     use std::thread;
144 //     use std::time::Duration;
145 //     use futures::future::Either;
146 //     use futures::sync::oneshot;
147 //
148 //     // a future which never returns anything (forever accepting incoming
149 //     // connections), but dropping it leads to observable side effects
150 //     // (like closing listening sockets, releasing limited resources,
151 //     // ...)
152 //     #[derive(Debug)]
153 //     struct Dead {
154 //         // when dropped you should get Err(oneshot::Canceled) on the
155 //         // receiving end
156 //         done: oneshot::Sender<()>,
157 //     }
158 //     impl Future for Dead {
159 //         type Item = ();
160 //         type Error = ();
161 //
162 //         fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
163 //             Ok(Poll::Pending)
164 //         }
165 //     }
166 //
167 //     // need to implement a timeout for the test, as it would hang
168 //     // forever right now
169 //     let (timeout_tx, timeout_rx) = oneshot::channel();
170 //     thread::spawn(move || {
171 //         thread::sleep(Duration::from_millis(1000));
172 //         let _ = timeout_tx.send(());
173 //     });
174 //
175 //     let core = local_executor::Core::new();
176 //     let (done_tx, done_rx) = oneshot::channel();
177 //     let future = Dead{done: done_tx};
178 //     let rx = spawn(future, &core);
179 //     let res = core.run(
180 //         Ok::<_, ()>(())
181 //         .into_future()
182 //         .then(move |_| {
183 //             // now drop the spawned future: maybe some timeout exceeded,
184 //             // or some connection on this end was closed by the remote
185 //             // end.
186 //             drop(rx);
187 //             // and wait for the spawned future to release its resources
188 //             done_rx
189 //         })
190 //         .select2(timeout_rx)
191 //     );
192 //     match res {
193 //         Err(Either::A((oneshot::Canceled, _))) => (),
194 //         Ok(Either::B(((), _))) => {
195 //             panic!("dead future wasn't canceled (timeout)");
196 //         },
197 //         _ => {
198 //             panic!("dead future wasn't canceled (unexpected result)");
199 //         },
200 //     }
201 // }
202 //
203 // #[test]
204 // fn spawn_dont_kill_forgot_dead_stream() {
205 //     use std::thread;
206 //     use std::time::Duration;
207 //     use futures::future::Either;
208 //     use futures::sync::oneshot;
209 //
210 //     // a future which never returns anything (forever accepting incoming
211 //     // connections), but dropping it leads to observable side effects
212 //     // (like closing listening sockets, releasing limited resources,
213 //     // ...)
214 //     #[derive(Debug)]
215 //     struct Dead {
216 //         // when dropped you should get Err(oneshot::Canceled) on the
217 //         // receiving end
218 //         done: oneshot::Sender<()>,
219 //     }
220 //     impl Future for Dead {
221 //         type Item = ();
222 //         type Error = ();
223 //
224 //         fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
225 //             Ok(Poll::Pending)
226 //         }
227 //     }
228 //
229 //     // need to implement a timeout for the test, as it would hang
230 //     // forever right now
231 //     let (timeout_tx, timeout_rx) = oneshot::channel();
232 //     thread::spawn(move || {
233 //         thread::sleep(Duration::from_millis(1000));
234 //         let _ = timeout_tx.send(());
235 //     });
236 //
237 //     let core = local_executor::Core::new();
238 //     let (done_tx, done_rx) = oneshot::channel();
239 //     let future = Dead{done: done_tx};
240 //     let rx = spawn(future, &core);
241 //     let res = core.run(
242 //         Ok::<_, ()>(())
243 //         .into_future()
244 //         .then(move |_| {
245 //             // forget the spawned future: should keep running, i.e. hit
246 //             // the timeout below.
247 //             rx.forget();
248 //             // and wait for the spawned future to release its resources
249 //             done_rx
250 //         })
251 //         .select2(timeout_rx)
252 //     );
253 //     match res {
254 //         Err(Either::A((oneshot::Canceled, _))) => {
255 //             panic!("forgotten dead future was canceled");
256 //         },
257 //         Ok(Either::B(((), _))) => (), // reached timeout
258 //         _ => {
259 //             panic!("forgotten dead future was canceled (unexpected result)");
260 //         },
261 //     }
262 // }
263