• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures_01::executor::{
2     spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, Spawn as Spawn01,
3     UnsafeNotify as UnsafeNotify01,
4 };
5 use futures_01::{Async as Async01, Future as Future01, Stream as Stream01};
6 #[cfg(feature = "sink")]
7 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
8 use futures_core::{future::Future as Future03, stream::Stream as Stream03, task as task03};
9 #[cfg(feature = "sink")]
10 use futures_sink::Sink as Sink03;
11 use std::pin::Pin;
12 use std::task::Context;
13 
14 #[cfg(feature = "io-compat")]
15 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
16 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
17 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
18 
19 /// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
20 /// object to a futures 0.3-compatible version,
21 #[derive(Debug)]
22 #[must_use = "futures do nothing unless you `.await` or poll them"]
23 pub struct Compat01As03<T> {
24     pub(crate) inner: Spawn01<T>,
25 }
26 
27 impl<T> Unpin for Compat01As03<T> {}
28 
29 impl<T> Compat01As03<T> {
30     /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
31     /// object in a futures 0.3-compatible wrapper.
new(object: T) -> Self32     pub fn new(object: T) -> Self {
33         Self { inner: spawn01(object) }
34     }
35 
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R36     fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
37         let notify = &WakerToHandle(cx.waker());
38         self.inner.poll_fn_notify(notify, 0, f)
39     }
40 
41     /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
get_ref(&self) -> &T42     pub fn get_ref(&self) -> &T {
43         self.inner.get_ref()
44     }
45 
46     /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
47     /// within.
get_mut(&mut self) -> &mut T48     pub fn get_mut(&mut self) -> &mut T {
49         self.inner.get_mut()
50     }
51 
52     /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
53     /// AsyncWrite object.
into_inner(self) -> T54     pub fn into_inner(self) -> T {
55         self.inner.into_inner()
56     }
57 }
58 
59 /// Extension trait for futures 0.1 [`Future`](futures_01::future::Future)
60 pub trait Future01CompatExt: Future01 {
61     /// Converts a futures 0.1
62     /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
63     /// into a futures 0.3
64     /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
65     ///
66     /// ```
67     /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
68     /// # futures::executor::block_on(async {
69     /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
70     /// # // feature issues
71     /// use futures_util::compat::Future01CompatExt;
72     ///
73     /// let future = futures_01::future::ok::<u32, ()>(1);
74     /// assert_eq!(future.compat().await, Ok(1));
75     /// # });
76     /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,77     fn compat(self) -> Compat01As03<Self>
78     where
79         Self: Sized,
80     {
81         Compat01As03::new(self)
82     }
83 }
84 impl<Fut: Future01> Future01CompatExt for Fut {}
85 
86 /// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream)
87 pub trait Stream01CompatExt: Stream01 {
88     /// Converts a futures 0.1
89     /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
90     /// into a futures 0.3
91     /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
92     ///
93     /// ```
94     /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
95     /// # futures::executor::block_on(async {
96     /// use futures::stream::StreamExt;
97     /// use futures_util::compat::Stream01CompatExt;
98     ///
99     /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
100     /// let mut stream = stream.compat();
101     /// assert_eq!(stream.next().await, Some(Ok(1)));
102     /// assert_eq!(stream.next().await, None);
103     /// # });
104     /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,105     fn compat(self) -> Compat01As03<Self>
106     where
107         Self: Sized,
108     {
109         Compat01As03::new(self)
110     }
111 }
112 impl<St: Stream01> Stream01CompatExt for St {}
113 
114 /// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink)
115 #[cfg(feature = "sink")]
116 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
117 pub trait Sink01CompatExt: Sink01 {
118     /// Converts a futures 0.1
119     /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
120     /// into a futures 0.3
121     /// [`Sink<T, Error = E>`](futures_sink::Sink).
122     ///
123     /// ```
124     /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
125     /// # futures::executor::block_on(async {
126     /// use futures::{sink::SinkExt, stream::StreamExt};
127     /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
128     ///
129     /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
130     /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
131     ///
132     /// tx.send(1).await.unwrap();
133     /// drop(tx);
134     /// assert_eq!(rx.next().await, Some(Ok(1)));
135     /// assert_eq!(rx.next().await, None);
136     /// # });
137     /// ```
sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem> where Self: Sized,138     fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
139     where
140         Self: Sized,
141     {
142         Compat01As03Sink::new(self)
143     }
144 }
145 #[cfg(feature = "sink")]
146 impl<Si: Sink01> Sink01CompatExt for Si {}
147 
poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>>148 fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
149     match x? {
150         Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
151         Async01::NotReady => task03::Poll::Pending,
152     }
153 }
154 
155 impl<Fut: Future01> Future03 for Compat01As03<Fut> {
156     type Output = Result<Fut::Item, Fut::Error>;
157 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output>158     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output> {
159         poll_01_to_03(self.in_notify(cx, Future01::poll))
160     }
161 }
162 
163 impl<St: Stream01> Stream03 for Compat01As03<St> {
164     type Item = Result<St::Item, St::Error>;
165 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>166     fn poll_next(
167         mut self: Pin<&mut Self>,
168         cx: &mut Context<'_>,
169     ) -> task03::Poll<Option<Self::Item>> {
170         match self.in_notify(cx, Stream01::poll)? {
171             Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
172             Async01::Ready(None) => task03::Poll::Ready(None),
173             Async01::NotReady => task03::Poll::Pending,
174         }
175     }
176 }
177 
178 /// Converts a futures 0.1 Sink object to a futures 0.3-compatible version
179 #[cfg(feature = "sink")]
180 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
181 #[derive(Debug)]
182 #[must_use = "sinks do nothing unless polled"]
183 pub struct Compat01As03Sink<S, SinkItem> {
184     pub(crate) inner: Spawn01<S>,
185     pub(crate) buffer: Option<SinkItem>,
186     pub(crate) close_started: bool,
187 }
188 
189 #[cfg(feature = "sink")]
190 impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
191 
192 #[cfg(feature = "sink")]
193 impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
194     /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper.
new(inner: S) -> Self195     pub fn new(inner: S) -> Self {
196         Self { inner: spawn01(inner), buffer: None, close_started: false }
197     }
198 
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R199     fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R {
200         let notify = &WakerToHandle(cx.waker());
201         self.inner.poll_fn_notify(notify, 0, f)
202     }
203 
204     /// Get a reference to 0.1 Sink object contained within.
get_ref(&self) -> &S205     pub fn get_ref(&self) -> &S {
206         self.inner.get_ref()
207     }
208 
209     /// Get a mutable reference to 0.1 Sink contained within.
get_mut(&mut self) -> &mut S210     pub fn get_mut(&mut self) -> &mut S {
211         self.inner.get_mut()
212     }
213 
214     /// Consume this wrapper to return the underlying 0.1 Sink.
into_inner(self) -> S215     pub fn into_inner(self) -> S {
216         self.inner.into_inner()
217     }
218 }
219 
220 #[cfg(feature = "sink")]
221 impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
222 where
223     S: Stream01,
224 {
225     type Item = Result<S::Item, S::Error>;
226 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>227     fn poll_next(
228         mut self: Pin<&mut Self>,
229         cx: &mut Context<'_>,
230     ) -> task03::Poll<Option<Self::Item>> {
231         match self.in_notify(cx, Stream01::poll)? {
232             Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
233             Async01::Ready(None) => task03::Poll::Ready(None),
234             Async01::NotReady => task03::Poll::Pending,
235         }
236     }
237 }
238 
239 #[cfg(feature = "sink")]
240 impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
241 where
242     S: Sink01<SinkItem = SinkItem>,
243 {
244     type Error = S::SinkError;
245 
start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error>246     fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
247         debug_assert!(self.buffer.is_none());
248         self.buffer = Some(item);
249         Ok(())
250     }
251 
poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Self::Error>>252     fn poll_ready(
253         mut self: Pin<&mut Self>,
254         cx: &mut Context<'_>,
255     ) -> task03::Poll<Result<(), Self::Error>> {
256         match self.buffer.take() {
257             Some(item) => match self.in_notify(cx, |f| f.start_send(item))? {
258                 AsyncSink01::Ready => task03::Poll::Ready(Ok(())),
259                 AsyncSink01::NotReady(i) => {
260                     self.buffer = Some(i);
261                     task03::Poll::Pending
262                 }
263             },
264             None => task03::Poll::Ready(Ok(())),
265         }
266     }
267 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Self::Error>>268     fn poll_flush(
269         mut self: Pin<&mut Self>,
270         cx: &mut Context<'_>,
271     ) -> task03::Poll<Result<(), Self::Error>> {
272         let item = self.buffer.take();
273         match self.in_notify(cx, |f| match item {
274             Some(i) => match f.start_send(i)? {
275                 AsyncSink01::Ready => f.poll_complete().map(|i| (i, None)),
276                 AsyncSink01::NotReady(t) => Ok((Async01::NotReady, Some(t))),
277             },
278             None => f.poll_complete().map(|i| (i, None)),
279         })? {
280             (Async01::Ready(_), _) => task03::Poll::Ready(Ok(())),
281             (Async01::NotReady, item) => {
282                 self.buffer = item;
283                 task03::Poll::Pending
284             }
285         }
286     }
287 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Self::Error>>288     fn poll_close(
289         mut self: Pin<&mut Self>,
290         cx: &mut Context<'_>,
291     ) -> task03::Poll<Result<(), Self::Error>> {
292         let item = self.buffer.take();
293         let close_started = self.close_started;
294 
295         let result = self.in_notify(cx, |f| {
296             if !close_started {
297                 if let Some(item) = item {
298                     if let AsyncSink01::NotReady(item) = f.start_send(item)? {
299                         return Ok((Async01::NotReady, Some(item), false));
300                     }
301                 }
302 
303                 if let Async01::NotReady = f.poll_complete()? {
304                     return Ok((Async01::NotReady, None, false));
305                 }
306             }
307 
308             Ok((<S as Sink01>::close(f)?, None, true))
309         });
310 
311         match result? {
312             (Async01::Ready(_), _, _) => task03::Poll::Ready(Ok(())),
313             (Async01::NotReady, item, close_started) => {
314                 self.buffer = item;
315                 self.close_started = close_started;
316                 task03::Poll::Pending
317             }
318         }
319     }
320 }
321 
322 struct NotifyWaker(task03::Waker);
323 
324 #[allow(missing_debug_implementations)] // false positive: this is private type
325 #[derive(Clone)]
326 struct WakerToHandle<'a>(&'a task03::Waker);
327 
328 impl From<WakerToHandle<'_>> for NotifyHandle01 {
from(handle: WakerToHandle<'_>) -> Self329     fn from(handle: WakerToHandle<'_>) -> Self {
330         let ptr = Box::new(NotifyWaker(handle.0.clone()));
331 
332         unsafe { Self::new(Box::into_raw(ptr)) }
333     }
334 }
335 
336 impl Notify01 for NotifyWaker {
notify(&self, _: usize)337     fn notify(&self, _: usize) {
338         self.0.wake_by_ref();
339     }
340 }
341 
342 unsafe impl UnsafeNotify01 for NotifyWaker {
clone_raw(&self) -> NotifyHandle01343     unsafe fn clone_raw(&self) -> NotifyHandle01 {
344         WakerToHandle(&self.0).into()
345     }
346 
drop_raw(&self)347     unsafe fn drop_raw(&self) {
348         let ptr: *const dyn UnsafeNotify01 = self;
349         drop(Box::from_raw(ptr as *mut dyn UnsafeNotify01));
350     }
351 }
352 
353 #[cfg(feature = "io-compat")]
354 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
355 mod io {
356     use super::*;
357     use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
358     use std::io::Error;
359     use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
360 
361     /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
362     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
363     pub trait AsyncRead01CompatExt: AsyncRead01 {
364         /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3
365         /// [`AsyncRead`](futures_io::AsyncRead).
366         ///
367         /// ```
368         /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
369         /// # futures::executor::block_on(async {
370         /// use futures::io::AsyncReadExt;
371         /// use futures_util::compat::AsyncRead01CompatExt;
372         ///
373         /// let input = b"Hello World!";
374         /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input);
375         /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat();
376         ///
377         /// let mut output = Vec::with_capacity(12);
378         /// reader.read_to_end(&mut output).await.unwrap();
379         /// assert_eq!(output, input);
380         /// # });
381         /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,382         fn compat(self) -> Compat01As03<Self>
383         where
384             Self: Sized,
385         {
386             Compat01As03::new(self)
387         }
388     }
389     impl<R: AsyncRead01> AsyncRead01CompatExt for R {}
390 
391     /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
392     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
393     pub trait AsyncWrite01CompatExt: AsyncWrite01 {
394         /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3
395         /// [`AsyncWrite`](futures_io::AsyncWrite).
396         ///
397         /// ```
398         /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
399         /// # futures::executor::block_on(async {
400         /// use futures::io::AsyncWriteExt;
401         /// use futures_util::compat::AsyncWrite01CompatExt;
402         ///
403         /// let input = b"Hello World!";
404         /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
405         ///
406         /// let mut writer = (&mut cursor).compat();
407         /// writer.write_all(input).await.unwrap();
408         ///
409         /// assert_eq!(cursor.into_inner(), input);
410         /// # });
411         /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,412         fn compat(self) -> Compat01As03<Self>
413         where
414             Self: Sized,
415         {
416             Compat01As03::new(self)
417         }
418     }
419     impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}
420 
421     impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> task03::Poll<Result<usize, Error>>422         fn poll_read(
423             mut self: Pin<&mut Self>,
424             cx: &mut Context<'_>,
425             buf: &mut [u8],
426         ) -> task03::Poll<Result<usize, Error>> {
427             poll_01_to_03(self.in_notify(cx, |x| x.poll_read(buf)))
428         }
429     }
430 
431     impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> task03::Poll<Result<usize, Error>>432         fn poll_write(
433             mut self: Pin<&mut Self>,
434             cx: &mut Context<'_>,
435             buf: &[u8],
436         ) -> task03::Poll<Result<usize, Error>> {
437             poll_01_to_03(self.in_notify(cx, |x| x.poll_write(buf)))
438         }
439 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Error>>440         fn poll_flush(
441             mut self: Pin<&mut Self>,
442             cx: &mut Context<'_>,
443         ) -> task03::Poll<Result<(), Error>> {
444             poll_01_to_03(self.in_notify(cx, AsyncWrite01::poll_flush))
445         }
446 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Error>>447         fn poll_close(
448             mut self: Pin<&mut Self>,
449             cx: &mut Context<'_>,
450         ) -> task03::Poll<Result<(), Error>> {
451             poll_01_to_03(self.in_notify(cx, AsyncWrite01::shutdown))
452         }
453     }
454 }
455