1 use futures::channel::oneshot::{self, Sender};
2 use futures::executor::block_on;
3 use futures::future::{FutureExt, poll_fn};
4 use futures::task::{Context, Poll};
5 use futures_test::task::panic_waker_ref;
6 use std::sync::mpsc;
7 use std::thread;
8
9 #[test]
smoke_poll()10 fn smoke_poll() {
11 let (mut tx, rx) = oneshot::channel::<u32>();
12 let mut rx = Some(rx);
13 let f = poll_fn(|cx| {
14 assert!(tx.poll_canceled(cx).is_pending());
15 assert!(tx.poll_canceled(cx).is_pending());
16 drop(rx.take());
17 assert!(tx.poll_canceled(cx).is_ready());
18 assert!(tx.poll_canceled(cx).is_ready());
19 Poll::Ready(())
20 });
21
22 block_on(f);
23 }
24
25 #[test]
cancel_notifies()26 fn cancel_notifies() {
27 let (mut tx, rx) = oneshot::channel::<u32>();
28
29 let t = thread::spawn(move || {
30 block_on(tx.cancellation());
31 });
32 drop(rx);
33 t.join().unwrap();
34 }
35
36 #[test]
cancel_lots()37 fn cancel_lots() {
38 let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
39 let t = thread::spawn(move || {
40 for (mut tx, tx2) in rx {
41 block_on(tx.cancellation());
42 tx2.send(()).unwrap();
43 }
44 });
45
46 for _ in 0..20000 {
47 let (otx, orx) = oneshot::channel::<u32>();
48 let (tx2, rx2) = mpsc::channel();
49 tx.send((otx, tx2)).unwrap();
50 drop(orx);
51 rx2.recv().unwrap();
52 }
53 drop(tx);
54
55 t.join().unwrap();
56 }
57
58 #[test]
cancel_after_sender_drop_doesnt_notify()59 fn cancel_after_sender_drop_doesnt_notify() {
60 let (mut tx, rx) = oneshot::channel::<u32>();
61 let mut cx = Context::from_waker(panic_waker_ref());
62 assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending);
63 drop(tx);
64 drop(rx);
65 }
66
67 #[test]
close()68 fn close() {
69 let (mut tx, mut rx) = oneshot::channel::<u32>();
70 rx.close();
71 block_on(poll_fn(|cx| {
72 match rx.poll_unpin(cx) {
73 Poll::Ready(Err(_)) => {},
74 _ => panic!(),
75 };
76 assert!(tx.poll_canceled(cx).is_ready());
77 Poll::Ready(())
78 }));
79 }
80
81 #[test]
close_wakes()82 fn close_wakes() {
83 let (mut tx, mut rx) = oneshot::channel::<u32>();
84 let (tx2, rx2) = mpsc::channel();
85 let t = thread::spawn(move || {
86 rx.close();
87 rx2.recv().unwrap();
88 });
89 block_on(tx.cancellation());
90 tx2.send(()).unwrap();
91 t.join().unwrap();
92 }
93
94 #[test]
is_canceled()95 fn is_canceled() {
96 let (tx, rx) = oneshot::channel::<u32>();
97 assert!(!tx.is_canceled());
98 drop(rx);
99 assert!(tx.is_canceled());
100 }
101
102 #[test]
cancel_sends()103 fn cancel_sends() {
104 let (tx, rx) = mpsc::channel::<Sender<_>>();
105 let t = thread::spawn(move || {
106 for otx in rx {
107 let _ = otx.send(42);
108 }
109 });
110
111 for _ in 0..20000 {
112 let (otx, mut orx) = oneshot::channel::<u32>();
113 tx.send(otx).unwrap();
114
115 orx.close();
116 let _ = block_on(orx);
117 }
118
119 drop(tx);
120 t.join().unwrap();
121 }
122
123 // #[test]
124 // fn spawn_sends_items() {
125 // let core = local_executor::Core::new();
126 // let future = ok::<_, ()>(1);
127 // let rx = spawn(future, &core);
128 // assert_eq!(core.run(rx).unwrap(), 1);
129 // }
130 //
131 // #[test]
132 // fn spawn_kill_dead_stream() {
133 // use std::thread;
134 // use std::time::Duration;
135 // use futures::future::Either;
136 // use futures::sync::oneshot;
137 //
138 // // a future which never returns anything (forever accepting incoming
139 // // connections), but dropping it leads to observable side effects
140 // // (like closing listening sockets, releasing limited resources,
141 // // ...)
142 // #[derive(Debug)]
143 // struct Dead {
144 // // when dropped you should get Err(oneshot::Canceled) on the
145 // // receiving end
146 // done: oneshot::Sender<()>,
147 // }
148 // impl Future for Dead {
149 // type Item = ();
150 // type Error = ();
151 //
152 // fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
153 // Ok(Poll::Pending)
154 // }
155 // }
156 //
157 // // need to implement a timeout for the test, as it would hang
158 // // forever right now
159 // let (timeout_tx, timeout_rx) = oneshot::channel();
160 // thread::spawn(move || {
161 // thread::sleep(Duration::from_millis(1000));
162 // let _ = timeout_tx.send(());
163 // });
164 //
165 // let core = local_executor::Core::new();
166 // let (done_tx, done_rx) = oneshot::channel();
167 // let future = Dead{done: done_tx};
168 // let rx = spawn(future, &core);
169 // let res = core.run(
170 // Ok::<_, ()>(())
171 // .into_future()
172 // .then(move |_| {
173 // // now drop the spawned future: maybe some timeout exceeded,
174 // // or some connection on this end was closed by the remote
175 // // end.
176 // drop(rx);
177 // // and wait for the spawned future to release its resources
178 // done_rx
179 // })
180 // .select2(timeout_rx)
181 // );
182 // match res {
183 // Err(Either::A((oneshot::Canceled, _))) => (),
184 // Ok(Either::B(((), _))) => {
185 // panic!("dead future wasn't canceled (timeout)");
186 // },
187 // _ => {
188 // panic!("dead future wasn't canceled (unexpected result)");
189 // },
190 // }
191 // }
192 //
193 // #[test]
194 // fn spawn_dont_kill_forgot_dead_stream() {
195 // use std::thread;
196 // use std::time::Duration;
197 // use futures::future::Either;
198 // use futures::sync::oneshot;
199 //
200 // // a future which never returns anything (forever accepting incoming
201 // // connections), but dropping it leads to observable side effects
202 // // (like closing listening sockets, releasing limited resources,
203 // // ...)
204 // #[derive(Debug)]
205 // struct Dead {
206 // // when dropped you should get Err(oneshot::Canceled) on the
207 // // receiving end
208 // done: oneshot::Sender<()>,
209 // }
210 // impl Future for Dead {
211 // type Item = ();
212 // type Error = ();
213 //
214 // fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
215 // Ok(Poll::Pending)
216 // }
217 // }
218 //
219 // // need to implement a timeout for the test, as it would hang
220 // // forever right now
221 // let (timeout_tx, timeout_rx) = oneshot::channel();
222 // thread::spawn(move || {
223 // thread::sleep(Duration::from_millis(1000));
224 // let _ = timeout_tx.send(());
225 // });
226 //
227 // let core = local_executor::Core::new();
228 // let (done_tx, done_rx) = oneshot::channel();
229 // let future = Dead{done: done_tx};
230 // let rx = spawn(future, &core);
231 // let res = core.run(
232 // Ok::<_, ()>(())
233 // .into_future()
234 // .then(move |_| {
235 // // forget the spawned future: should keep running, i.e. hit
236 // // the timeout below.
237 // rx.forget();
238 // // and wait for the spawned future to release its resources
239 // done_rx
240 // })
241 // .select2(timeout_rx)
242 // );
243 // match res {
244 // Err(Either::A((oneshot::Canceled, _))) => {
245 // panic!("forgotten dead future was canceled");
246 // },
247 // Ok(Either::B(((), _))) => (), // reached timeout
248 // _ => {
249 // panic!("forgotten dead future was canceled (unexpected result)");
250 // },
251 // }
252 // }
253