• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::channel::oneshot::{self, Sender};
2 use futures::executor::block_on;
3 use futures::future::{poll_fn, FutureExt};
4 use futures::task::{Context, Poll};
5 use futures_test::task::panic_waker_ref;
6 use std::sync::mpsc;
7 use std::thread;
8 
// Checks `poll_canceled` on both sides of dropping the receiver: it must be
// pending (twice in a row) while the receiver exists and ready (twice in a
// row) once the receiver has been dropped.
#[test]
fn smoke_poll() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    // Kept in an `Option` so the polling closure can move the receiver out
    // and drop it part-way through.
    let mut receiver_slot = Some(receiver);

    block_on(poll_fn(|cx| {
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_pending());
        }

        drop(receiver_slot.take());

        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_ready());
        }

        Poll::Ready(())
    }));
}
24 
// A background thread blocks on the sender's `cancellation()` future while the
// main thread drops the receiver. The test only completes if that future
// resolves, because the main thread joins the background thread at the end.
#[test]
fn cancel_notifies() {
    let (mut sender, receiver) = oneshot::channel::<u32>();

    let waiter = thread::spawn(move || block_on(sender.cancellation()));

    drop(receiver);
    waiter.join().unwrap();
}
35 
// Repeats the "drop the receiver, sender observes cancellation" handshake many
// times against a single worker thread. Each round ships a fresh oneshot
// sender plus an acknowledgement channel to the worker, drops the oneshot
// receiver, and then waits for the worker's acknowledgement.
#[test]
fn cancel_lots() {
    // Fewer rounds under Miri.
    const ROUNDS: usize = if cfg!(miri) { 100 } else { 20_000 };

    let (jobs_tx, jobs_rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();

    let worker = thread::spawn(move || {
        // Ends when `recv` fails, i.e. after the job sender has been dropped.
        while let Ok((mut oneshot_tx, ack_tx)) = jobs_rx.recv() {
            block_on(oneshot_tx.cancellation());
            ack_tx.send(()).unwrap();
        }
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, oneshot_rx) = oneshot::channel::<u32>();
        let (ack_tx, ack_rx) = mpsc::channel();

        jobs_tx.send((oneshot_tx, ack_tx)).unwrap();
        drop(oneshot_rx);
        ack_rx.recv().unwrap();
    }

    // Closing the job channel lets the worker loop finish so the join returns.
    drop(jobs_tx);
    worker.join().unwrap();
}
59 
// Polls the sender once with the waker from `panic_waker_ref()` (presumably a
// waker that panics when woken -- see `futures_test`), then drops the sender
// first and the receiver second.
#[test]
fn cancel_after_sender_drop_doesnt_notify() {
    let (mut sender, receiver) = oneshot::channel::<u32>();

    let waker = panic_waker_ref();
    let mut cx = Context::from_waker(waker);

    let first_poll = sender.poll_canceled(&mut cx);
    assert_eq!(first_poll, Poll::Pending);

    // Drop order is the point of the test: sender before receiver.
    drop(sender);
    drop(receiver);
}
68 
// After `close()` on the receiver, polling the receiver must yield
// `Ready(Err(_))` and the sender's `poll_canceled` must be ready.
#[test]
fn close() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    receiver.close();

    let check = poll_fn(|cx| {
        // Any outcome other than a ready error fails the test.
        if !matches!(receiver.poll_unpin(cx), Poll::Ready(Err(_))) {
            panic!();
        }
        assert!(sender.poll_canceled(cx).is_ready());
        Poll::Ready(())
    });

    block_on(check);
}
82 
// A background thread calls `close()` on the receiver and then parks on an
// mpsc channel, so the receiver is still owned by that thread while the main
// thread waits on `cancellation()`. Only after that future resolves does the
// main thread release the background thread and join it.
#[test]
fn close_wakes() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    let (release_tx, release_rx) = mpsc::channel::<()>();

    let closer = thread::spawn(move || {
        receiver.close();
        release_rx.recv().unwrap();
    });

    block_on(sender.cancellation());

    release_tx.send(()).unwrap();
    closer.join().unwrap();
}
95 
// `is_canceled` must report `false` while the receiver exists and `true` after
// the receiver has been dropped.
#[test]
fn is_canceled() {
    let (sender, receiver) = oneshot::channel::<u32>();

    let canceled_before_drop = sender.is_canceled();
    assert!(!canceled_before_drop);

    drop(receiver);

    let canceled_after_drop = sender.is_canceled();
    assert!(canceled_after_drop);
}
103 
// Runs `send` on a worker thread concurrently with `close()` + `block_on` of
// the receiver on the main thread, many times over. The outcome of both the
// send and the receive is deliberately ignored; the test only requires that
// every round completes.
#[test]
fn cancel_sends() {
    // Fewer rounds under Miri.
    const ROUNDS: usize = if cfg!(miri) { 100 } else { 20_000 };

    let (handoff_tx, handoff_rx) = mpsc::channel::<Sender<_>>();

    let worker = thread::spawn(move || {
        // Iteration ends once the hand-off sender has been dropped.
        handoff_rx.into_iter().for_each(|oneshot_tx| {
            let _ = oneshot_tx.send(42);
        });
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, mut oneshot_rx) = oneshot::channel::<u32>();
        handoff_tx.send(oneshot_tx).unwrap();

        oneshot_rx.close();
        let _ = block_on(oneshot_rx);
    }

    drop(handoff_tx);
    worker.join().unwrap();
}
126 
127 // #[test]
128 // fn spawn_sends_items() {
129 //     let core = local_executor::Core::new();
130 //     let future = ok::<_, ()>(1);
131 //     let rx = spawn(future, &core);
132 //     assert_eq!(core.run(rx).unwrap(), 1);
133 // }
134 //
135 // #[test]
136 // fn spawn_kill_dead_stream() {
137 //     use std::thread;
138 //     use std::time::Duration;
139 //     use futures::future::Either;
140 //     use futures::sync::oneshot;
141 //
142 //     // a future which never returns anything (forever accepting incoming
143 //     // connections), but dropping it leads to observable side effects
144 //     // (like closing listening sockets, releasing limited resources,
145 //     // ...)
146 //     #[derive(Debug)]
147 //     struct Dead {
148 //         // when dropped you should get Err(oneshot::Canceled) on the
149 //         // receiving end
150 //         done: oneshot::Sender<()>,
151 //     }
152 //     impl Future for Dead {
153 //         type Item = ();
154 //         type Error = ();
155 //
156 //         fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
157 //             Ok(Poll::Pending)
158 //         }
159 //     }
160 //
161 //     // need to implement a timeout for the test, as it would hang
162 //     // forever right now
163 //     let (timeout_tx, timeout_rx) = oneshot::channel();
164 //     thread::spawn(move || {
165 //         thread::sleep(Duration::from_millis(1000));
166 //         let _ = timeout_tx.send(());
167 //     });
168 //
169 //     let core = local_executor::Core::new();
170 //     let (done_tx, done_rx) = oneshot::channel();
171 //     let future = Dead{done: done_tx};
172 //     let rx = spawn(future, &core);
173 //     let res = core.run(
174 //         Ok::<_, ()>(())
175 //         .into_future()
176 //         .then(move |_| {
177 //             // now drop the spawned future: maybe some timeout exceeded,
178 //             // or some connection on this end was closed by the remote
179 //             // end.
180 //             drop(rx);
181 //             // and wait for the spawned future to release its resources
182 //             done_rx
183 //         })
184 //         .select2(timeout_rx)
185 //     );
186 //     match res {
187 //         Err(Either::A((oneshot::Canceled, _))) => (),
188 //         Ok(Either::B(((), _))) => {
189 //             panic!("dead future wasn't canceled (timeout)");
190 //         },
191 //         _ => {
192 //             panic!("dead future wasn't canceled (unexpected result)");
193 //         },
194 //     }
195 // }
196 //
197 // #[test]
198 // fn spawn_dont_kill_forgot_dead_stream() {
199 //     use std::thread;
200 //     use std::time::Duration;
201 //     use futures::future::Either;
202 //     use futures::sync::oneshot;
203 //
204 //     // a future which never returns anything (forever accepting incoming
205 //     // connections), but dropping it leads to observable side effects
206 //     // (like closing listening sockets, releasing limited resources,
207 //     // ...)
208 //     #[derive(Debug)]
209 //     struct Dead {
210 //         // when dropped you should get Err(oneshot::Canceled) on the
211 //         // receiving end
212 //         done: oneshot::Sender<()>,
213 //     }
214 //     impl Future for Dead {
215 //         type Item = ();
216 //         type Error = ();
217 //
218 //         fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
219 //             Ok(Poll::Pending)
220 //         }
221 //     }
222 //
223 //     // need to implement a timeout for the test, as it would hang
224 //     // forever right now
225 //     let (timeout_tx, timeout_rx) = oneshot::channel();
226 //     thread::spawn(move || {
227 //         thread::sleep(Duration::from_millis(1000));
228 //         let _ = timeout_tx.send(());
229 //     });
230 //
231 //     let core = local_executor::Core::new();
232 //     let (done_tx, done_rx) = oneshot::channel();
233 //     let future = Dead{done: done_tx};
234 //     let rx = spawn(future, &core);
235 //     let res = core.run(
236 //         Ok::<_, ()>(())
237 //         .into_future()
238 //         .then(move |_| {
239 //             // forget the spawned future: should keep running, i.e. hit
240 //             // the timeout below.
241 //             rx.forget();
242 //             // and wait for the spawned future to release its resources
243 //             done_rx
244 //         })
245 //         .select2(timeout_rx)
246 //     );
247 //     match res {
248 //         Err(Either::A((oneshot::Canceled, _))) => {
249 //             panic!("forgotten dead future was canceled");
250 //         },
251 //         Ok(Either::B(((), _))) => (), // reached timeout
252 //         _ => {
253 //             panic!("forgotten dead future was canceled (unexpected result)");
254 //         },
255 //     }
256 // }
257