1 use futures::channel::oneshot::{self, Sender};
2 use futures::executor::block_on;
3 use futures::future::{poll_fn, FutureExt};
4 use futures::task::{Context, Poll};
5 use futures_test::task::panic_waker_ref;
6 use std::sync::mpsc;
7 use std::thread;
8
9 #[test]
smoke_poll()10 fn smoke_poll() {
11 let (mut tx, rx) = oneshot::channel::<u32>();
12 let mut rx = Some(rx);
13 let f = poll_fn(|cx| {
14 assert!(tx.poll_canceled(cx).is_pending());
15 assert!(tx.poll_canceled(cx).is_pending());
16 drop(rx.take());
17 assert!(tx.poll_canceled(cx).is_ready());
18 assert!(tx.poll_canceled(cx).is_ready());
19 Poll::Ready(())
20 });
21
22 block_on(f);
23 }
24
25 #[test]
cancel_notifies()26 fn cancel_notifies() {
27 let (mut tx, rx) = oneshot::channel::<u32>();
28
29 let t = thread::spawn(move || {
30 block_on(tx.cancellation());
31 });
32 drop(rx);
33 t.join().unwrap();
34 }
35
36 #[test]
cancel_lots()37 fn cancel_lots() {
38 #[cfg(miri)]
39 const N: usize = 100;
40 #[cfg(not(miri))]
41 const N: usize = 20000;
42
43 let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
44 let t = thread::spawn(move || {
45 for (mut tx, tx2) in rx {
46 block_on(tx.cancellation());
47 tx2.send(()).unwrap();
48 }
49 });
50
51 for _ in 0..N {
52 let (otx, orx) = oneshot::channel::<u32>();
53 let (tx2, rx2) = mpsc::channel();
54 tx.send((otx, tx2)).unwrap();
55 drop(orx);
56 rx2.recv().unwrap();
57 }
58 drop(tx);
59
60 t.join().unwrap();
61 }
62
63 #[test]
cancel_after_sender_drop_doesnt_notify()64 fn cancel_after_sender_drop_doesnt_notify() {
65 let (mut tx, rx) = oneshot::channel::<u32>();
66 let mut cx = Context::from_waker(panic_waker_ref());
67 assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending);
68 drop(tx);
69 drop(rx);
70 }
71
72 #[test]
close()73 fn close() {
74 let (mut tx, mut rx) = oneshot::channel::<u32>();
75 rx.close();
76 block_on(poll_fn(|cx| {
77 match rx.poll_unpin(cx) {
78 Poll::Ready(Err(_)) => {}
79 _ => panic!(),
80 };
81 assert!(tx.poll_canceled(cx).is_ready());
82 Poll::Ready(())
83 }));
84 }
85
86 #[test]
close_wakes()87 fn close_wakes() {
88 let (mut tx, mut rx) = oneshot::channel::<u32>();
89 let (tx2, rx2) = mpsc::channel();
90 let t = thread::spawn(move || {
91 rx.close();
92 rx2.recv().unwrap();
93 });
94 block_on(tx.cancellation());
95 tx2.send(()).unwrap();
96 t.join().unwrap();
97 }
98
99 #[test]
is_canceled()100 fn is_canceled() {
101 let (tx, rx) = oneshot::channel::<u32>();
102 assert!(!tx.is_canceled());
103 drop(rx);
104 assert!(tx.is_canceled());
105 }
106
107 #[test]
cancel_sends()108 fn cancel_sends() {
109 #[cfg(miri)]
110 const N: usize = 100;
111 #[cfg(not(miri))]
112 const N: usize = 20000;
113
114 let (tx, rx) = mpsc::channel::<Sender<_>>();
115 let t = thread::spawn(move || {
116 for otx in rx {
117 let _ = otx.send(42);
118 }
119 });
120
121 for _ in 0..N {
122 let (otx, mut orx) = oneshot::channel::<u32>();
123 tx.send(otx).unwrap();
124
125 orx.close();
126 let _ = block_on(orx);
127 }
128
129 drop(tx);
130 t.join().unwrap();
131 }
132
133 // #[test]
134 // fn spawn_sends_items() {
135 // let core = local_executor::Core::new();
136 // let future = ok::<_, ()>(1);
137 // let rx = spawn(future, &core);
138 // assert_eq!(core.run(rx).unwrap(), 1);
139 // }
140 //
141 // #[test]
142 // fn spawn_kill_dead_stream() {
143 // use std::thread;
144 // use std::time::Duration;
145 // use futures::future::Either;
146 // use futures::sync::oneshot;
147 //
148 // // a future which never returns anything (forever accepting incoming
149 // // connections), but dropping it leads to observable side effects
150 // // (like closing listening sockets, releasing limited resources,
151 // // ...)
152 // #[derive(Debug)]
153 // struct Dead {
154 // // when dropped you should get Err(oneshot::Canceled) on the
155 // // receiving end
156 // done: oneshot::Sender<()>,
157 // }
158 // impl Future for Dead {
159 // type Item = ();
160 // type Error = ();
161 //
162 // fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
163 // Ok(Poll::Pending)
164 // }
165 // }
166 //
167 // // need to implement a timeout for the test, as it would hang
168 // // forever right now
169 // let (timeout_tx, timeout_rx) = oneshot::channel();
170 // thread::spawn(move || {
171 // thread::sleep(Duration::from_millis(1000));
172 // let _ = timeout_tx.send(());
173 // });
174 //
175 // let core = local_executor::Core::new();
176 // let (done_tx, done_rx) = oneshot::channel();
177 // let future = Dead{done: done_tx};
178 // let rx = spawn(future, &core);
179 // let res = core.run(
180 // Ok::<_, ()>(())
181 // .into_future()
182 // .then(move |_| {
183 // // now drop the spawned future: maybe some timeout exceeded,
184 // // or some connection on this end was closed by the remote
185 // // end.
186 // drop(rx);
187 // // and wait for the spawned future to release its resources
188 // done_rx
189 // })
190 // .select2(timeout_rx)
191 // );
192 // match res {
193 // Err(Either::A((oneshot::Canceled, _))) => (),
194 // Ok(Either::B(((), _))) => {
195 // panic!("dead future wasn't canceled (timeout)");
196 // },
197 // _ => {
198 // panic!("dead future wasn't canceled (unexpected result)");
199 // },
200 // }
201 // }
202 //
203 // #[test]
204 // fn spawn_dont_kill_forgot_dead_stream() {
205 // use std::thread;
206 // use std::time::Duration;
207 // use futures::future::Either;
208 // use futures::sync::oneshot;
209 //
210 // // a future which never returns anything (forever accepting incoming
211 // // connections), but dropping it leads to observable side effects
212 // // (like closing listening sockets, releasing limited resources,
213 // // ...)
214 // #[derive(Debug)]
215 // struct Dead {
216 // // when dropped you should get Err(oneshot::Canceled) on the
217 // // receiving end
218 // done: oneshot::Sender<()>,
219 // }
220 // impl Future for Dead {
221 // type Item = ();
222 // type Error = ();
223 //
224 // fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
225 // Ok(Poll::Pending)
226 // }
227 // }
228 //
229 // // need to implement a timeout for the test, as it would hang
230 // // forever right now
231 // let (timeout_tx, timeout_rx) = oneshot::channel();
232 // thread::spawn(move || {
233 // thread::sleep(Duration::from_millis(1000));
234 // let _ = timeout_tx.send(());
235 // });
236 //
237 // let core = local_executor::Core::new();
238 // let (done_tx, done_rx) = oneshot::channel();
239 // let future = Dead{done: done_tx};
240 // let rx = spawn(future, &core);
241 // let res = core.run(
242 // Ok::<_, ()>(())
243 // .into_future()
244 // .then(move |_| {
245 // // forget the spawned future: should keep running, i.e. hit
246 // // the timeout below.
247 // rx.forget();
248 // // and wait for the spawned future to release its resources
249 // done_rx
250 // })
251 // .select2(timeout_rx)
252 // );
253 // match res {
254 // Err(Either::A((oneshot::Canceled, _))) => {
255 // panic!("forgotten dead future was canceled");
256 // },
257 // Ok(Either::B(((), _))) => (), // reached timeout
258 // _ => {
259 // panic!("forgotten dead future was canceled (unexpected result)");
260 // },
261 // }
262 // }
263