1 use futures::channel::oneshot::{self, Sender};
2 use futures::executor::block_on;
3 use futures::future::{poll_fn, FutureExt};
4 use futures::task::{Context, Poll};
5 use futures_test::task::panic_waker_ref;
6 use std::sync::mpsc;
7 use std::thread;
8
/// Checks `Sender::poll_canceled` from inside a single poll: it must report
/// pending (twice in a row) while the receiver exists, and ready (twice in a
/// row) once the receiver has been dropped.
#[test]
fn smoke_poll() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    // Kept in an `Option` so the closure below can drop the receiver
    // part-way through its body via `take()`.
    let mut receiver = Some(receiver);

    block_on(poll_fn(|cx| {
        // Receiver still alive: repeated polls stay pending.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_pending());
        }

        drop(receiver.take());

        // Receiver gone: repeated polls report ready.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_ready());
        }

        Poll::Ready(())
    }));
}
24
/// A thread blocked on `Sender::cancellation()` must finish once the
/// receiver is dropped on the main thread; otherwise `join` never returns.
#[test]
fn cancel_notifies() {
    let (mut sender, receiver) = oneshot::channel::<u32>();

    let waiter = thread::spawn(move || {
        let canceled = sender.cancellation();
        block_on(canceled);
    });

    // Dropping the receiver is the event the waiter thread is blocked on.
    drop(receiver);
    waiter.join().unwrap();
}
35
/// Repeats the "drop the receiver while another thread waits on
/// `cancellation()`" scenario many times against a single worker thread.
///
/// Each round hands the worker a oneshot sender plus an acknowledgement
/// channel; the worker acknowledges only after the cancellation future has
/// completed, and the main thread waits for that acknowledgement before
/// starting the next round.
#[test]
fn cancel_lots() {
    // Far fewer rounds when built with `cfg(miri)`.
    const ROUNDS: usize = if cfg!(miri) { 100 } else { 20000 };

    let (jobs_tx, jobs_rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
    let worker = thread::spawn(move || {
        // `recv` fails once `jobs_tx` is dropped, which ends the loop.
        while let Ok((mut pending, ack)) = jobs_rx.recv() {
            block_on(pending.cancellation());
            ack.send(()).unwrap();
        }
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, oneshot_rx) = oneshot::channel::<u32>();
        let (ack_tx, ack_rx) = mpsc::channel();
        jobs_tx.send((oneshot_tx, ack_tx)).unwrap();
        drop(oneshot_rx);
        ack_rx.recv().unwrap();
    }
    // Close the job queue so the worker loop terminates.
    drop(jobs_tx);

    worker.join().unwrap();
}
59
/// Polls `poll_canceled` once with the waker from `panic_waker_ref()`, then
/// drops the sender before the receiver.
///
/// NOTE(review): per the futures-test documentation, this waker panics when
/// woken, so the test would fail if dropping the receiver still notified the
/// already-dropped sender's task; confirm against that crate's docs.
#[test]
fn cancel_after_sender_drop_doesnt_notify() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    let mut cx = Context::from_waker(panic_waker_ref());

    let first_poll = sender.poll_canceled(&mut cx);
    assert_eq!(first_poll, Poll::Pending);

    // Order matters: the sender goes away first, then the receiver.
    drop(sender);
    drop(receiver);
}
68
/// After `Receiver::close`, polling the receiver must yield `Ready(Err(_))`
/// and the sender's `poll_canceled` must be ready.
#[test]
fn close() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    receiver.close();

    let check = poll_fn(|cx| {
        // Anything other than an immediate error from the closed receiver
        // is a failure.
        if !matches!(receiver.poll_unpin(cx), Poll::Ready(Err(_))) {
            panic!();
        }
        assert!(sender.poll_canceled(cx).is_ready());
        Poll::Ready(())
    });
    block_on(check);
}
82
/// `Receiver::close` called on another thread must complete a
/// `cancellation()` future the main thread is blocked on.
///
/// The helper thread keeps the receiver alive (it is only dropped when the
/// closure returns) until the main thread signals over the mpsc channel, so
/// the cancellation is observed while the receiver has been closed but not
/// yet dropped.
#[test]
fn close_wakes() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    let (release_tx, release_rx) = mpsc::channel();

    let closer = thread::spawn(move || {
        receiver.close();
        // Park here until the main thread has seen the cancellation.
        release_rx.recv().unwrap();
    });

    let canceled = sender.cancellation();
    block_on(canceled);

    release_tx.send(()).unwrap();
    closer.join().unwrap();
}
95
/// `Sender::is_canceled` must be `false` while the receiver exists and
/// `true` after the receiver has been dropped.
#[test]
fn is_canceled() {
    let (sender, receiver) = oneshot::channel::<u32>();

    let canceled_while_open = sender.is_canceled();
    assert!(!canceled_while_open);

    drop(receiver);

    let canceled_after_drop = sender.is_canceled();
    assert!(canceled_after_drop);
}
103
/// Runs `Sender::send` on a worker thread concurrently with the main thread
/// closing and then awaiting the matching receiver, many times over.
///
/// The outcome of both the send and the receive is deliberately discarded;
/// the test only requires that every round completes and the worker joins.
#[test]
fn cancel_sends() {
    // Far fewer rounds when built with `cfg(miri)`.
    const ROUNDS: usize = if cfg!(miri) { 100 } else { 20000 };

    let (handoff_tx, handoff_rx) = mpsc::channel::<Sender<_>>();
    let worker = thread::spawn(move || {
        // `recv` fails once `handoff_tx` is dropped, which ends the loop.
        while let Ok(oneshot_tx) = handoff_rx.recv() {
            let _ = oneshot_tx.send(42);
        }
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, mut oneshot_rx) = oneshot::channel::<u32>();
        handoff_tx.send(oneshot_tx).unwrap();

        // Close first, then drive the receiver to completion.
        oneshot_rx.close();
        let _ = block_on(oneshot_rx);
    }

    // Close the hand-off queue so the worker loop terminates.
    drop(handoff_tx);
    worker.join().unwrap();
}
126
127 // #[test]
128 // fn spawn_sends_items() {
129 // let core = local_executor::Core::new();
130 // let future = ok::<_, ()>(1);
131 // let rx = spawn(future, &core);
132 // assert_eq!(core.run(rx).unwrap(), 1);
133 // }
134 //
135 // #[test]
136 // fn spawn_kill_dead_stream() {
137 // use std::thread;
138 // use std::time::Duration;
139 // use futures::future::Either;
140 // use futures::sync::oneshot;
141 //
142 // // a future which never returns anything (forever accepting incoming
143 // // connections), but dropping it leads to observable side effects
144 // // (like closing listening sockets, releasing limited resources,
145 // // ...)
146 // #[derive(Debug)]
147 // struct Dead {
148 // // when dropped you should get Err(oneshot::Canceled) on the
149 // // receiving end
150 // done: oneshot::Sender<()>,
151 // }
152 // impl Future for Dead {
153 // type Item = ();
154 // type Error = ();
155 //
156 // fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
157 // Ok(Poll::Pending)
158 // }
159 // }
160 //
161 // // need to implement a timeout for the test, as it would hang
162 // // forever right now
163 // let (timeout_tx, timeout_rx) = oneshot::channel();
164 // thread::spawn(move || {
165 // thread::sleep(Duration::from_millis(1000));
166 // let _ = timeout_tx.send(());
167 // });
168 //
169 // let core = local_executor::Core::new();
170 // let (done_tx, done_rx) = oneshot::channel();
171 // let future = Dead{done: done_tx};
172 // let rx = spawn(future, &core);
173 // let res = core.run(
174 // Ok::<_, ()>(())
175 // .into_future()
176 // .then(move |_| {
177 // // now drop the spawned future: maybe some timeout exceeded,
178 // // or some connection on this end was closed by the remote
179 // // end.
180 // drop(rx);
181 // // and wait for the spawned future to release its resources
182 // done_rx
183 // })
184 // .select2(timeout_rx)
185 // );
186 // match res {
187 // Err(Either::A((oneshot::Canceled, _))) => (),
188 // Ok(Either::B(((), _))) => {
189 // panic!("dead future wasn't canceled (timeout)");
190 // },
191 // _ => {
192 // panic!("dead future wasn't canceled (unexpected result)");
193 // },
194 // }
195 // }
196 //
197 // #[test]
198 // fn spawn_dont_kill_forgot_dead_stream() {
199 // use std::thread;
200 // use std::time::Duration;
201 // use futures::future::Either;
202 // use futures::sync::oneshot;
203 //
204 // // a future which never returns anything (forever accepting incoming
205 // // connections), but dropping it leads to observable side effects
206 // // (like closing listening sockets, releasing limited resources,
207 // // ...)
208 // #[derive(Debug)]
209 // struct Dead {
210 // // when dropped you should get Err(oneshot::Canceled) on the
211 // // receiving end
212 // done: oneshot::Sender<()>,
213 // }
214 // impl Future for Dead {
215 // type Item = ();
216 // type Error = ();
217 //
218 // fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
219 // Ok(Poll::Pending)
220 // }
221 // }
222 //
223 // // need to implement a timeout for the test, as it would hang
224 // // forever right now
225 // let (timeout_tx, timeout_rx) = oneshot::channel();
226 // thread::spawn(move || {
227 // thread::sleep(Duration::from_millis(1000));
228 // let _ = timeout_tx.send(());
229 // });
230 //
231 // let core = local_executor::Core::new();
232 // let (done_tx, done_rx) = oneshot::channel();
233 // let future = Dead{done: done_tx};
234 // let rx = spawn(future, &core);
235 // let res = core.run(
236 // Ok::<_, ()>(())
237 // .into_future()
238 // .then(move |_| {
239 // // forget the spawned future: should keep running, i.e. hit
240 // // the timeout below.
241 // rx.forget();
242 // // and wait for the spawned future to release its resources
243 // done_rx
244 // })
245 // .select2(timeout_rx)
246 // );
247 // match res {
248 // Err(Either::A((oneshot::Canceled, _))) => {
249 // panic!("forgotten dead future was canceled");
250 // },
251 // Ok(Either::B(((), _))) => (), // reached timeout
252 // _ => {
253 // panic!("forgotten dead future was canceled (unexpected result)");
254 // },
255 // }
256 // }
257