• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef};
2 use futures_01::{
3     task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01,
4 };
5 #[cfg(feature = "sink")]
6 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01};
7 use futures_core::{
8     future::TryFuture as TryFuture03,
9     stream::TryStream as TryStream03,
10     task::{RawWaker, RawWakerVTable},
11 };
12 #[cfg(feature = "sink")]
13 use futures_sink::Sink as Sink03;
14 #[cfg(feature = "sink")]
15 use std::marker::PhantomData;
16 use std::{mem, pin::Pin, sync::Arc, task::Context};
17 
/// Wrapper that converts a futures 0.3
/// [`TryFuture`](futures_core::future::TryFuture) or
/// [`TryStream`](futures_core::stream::TryStream) into a futures 0.1
/// [`Future`](futures_01::future::Future) or
/// [`Stream`](futures_01::stream::Stream).
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Clone, Copy, Debug)]
pub struct Compat<T> {
    /// The wrapped futures 0.3 value.
    pub(crate) inner: T,
}
27 
/// Wrapper that converts a futures 0.3 [`Sink`](futures_sink::Sink) into a
/// futures 0.1 [`Sink`](futures_01::sink::Sink).
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug)]
pub struct CompatSink<T, Item> {
    /// The wrapped futures 0.3 sink.
    inner: T,
    /// Records the `Item` type parameter without storing a value of it.
    _phantom: PhantomData<fn(Item)>,
}
38 
39 impl<T> Compat<T> {
40     /// Creates a new [`Compat`].
41     ///
42     /// For types which implement appropriate futures `0.3`
43     /// traits, the result will be a type which implements
44     /// the corresponding futures 0.1 type.
new(inner: T) -> Self45     pub fn new(inner: T) -> Self {
46         Self { inner }
47     }
48 
49     /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
50     /// contained within.
get_ref(&self) -> &T51     pub fn get_ref(&self) -> &T {
52         &self.inner
53     }
54 
55     /// Get a mutable reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
56     /// contained within.
get_mut(&mut self) -> &mut T57     pub fn get_mut(&mut self) -> &mut T {
58         &mut self.inner
59     }
60 
61     /// Returns the inner item.
into_inner(self) -> T62     pub fn into_inner(self) -> T {
63         self.inner
64     }
65 }
66 
#[cfg(feature = "sink")]
impl<T, Item> CompatSink<T, Item> {
    /// Wraps `inner` in a new [`CompatSink`].
    pub fn new(inner: T) -> Self {
        CompatSink { _phantom: PhantomData, inner }
    }

    /// Borrows the wrapped 0.3 Sink.
    pub fn get_ref(&self) -> &T {
        let Self { inner, .. } = self;
        inner
    }

    /// Mutably borrows the wrapped 0.3 Sink.
    pub fn get_mut(&mut self) -> &mut T {
        let Self { inner, .. } = self;
        inner
    }

    /// Consumes the wrapper and hands back the wrapped sink.
    pub fn into_inner(self) -> T {
        let Self { inner, .. } = self;
        inner
    }
}
89 
poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E>90 fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E> {
91     match x? {
92         task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
93         task03::Poll::Pending => Ok(Async01::NotReady),
94     }
95 }
96 
97 impl<Fut> Future01 for Compat<Fut>
98 where
99     Fut: TryFuture03 + Unpin,
100 {
101     type Item = Fut::Ok;
102     type Error = Fut::Error;
103 
poll(&mut self) -> Poll01<Self::Item, Self::Error>104     fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
105         with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
106     }
107 }
108 
109 impl<St> Stream01 for Compat<St>
110 where
111     St: TryStream03 + Unpin,
112 {
113     type Item = St::Ok;
114     type Error = St::Error;
115 
poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error>116     fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
117         with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
118             task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
119             task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
120             task03::Poll::Pending => Ok(Async01::NotReady),
121         })
122     }
123 }
124 
#[cfg(feature = "sink")]
impl<T, Item> Sink01 for CompatSink<T, Item>
where
    T: Sink03<Item> + Unpin,
{
    type SinkItem = Item;
    type SinkError = T::Error;

    /// Checks readiness of the wrapped 0.3 sink and, if it is ready, starts
    /// sending `item`.
    ///
    /// When the sink is not ready the item is handed back inside
    /// `AsyncSink01::NotReady`; an error from either `poll_ready` or
    /// `start_send` is returned as `Err`.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend01<Self::SinkItem, Self::SinkError> {
        with_sink_context(self, |mut sink, cx| match sink.as_mut().poll_ready(cx) {
            task03::Poll::Ready(Ok(())) => match sink.start_send(item) {
                Ok(()) => Ok(AsyncSink01::Ready),
                Err(error) => Err(error),
            },
            task03::Poll::Ready(Err(error)) => Err(error),
            task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
        })
    }

    /// Drives `poll_flush` on the wrapped sink and converts the outcome
    /// through `poll_03_to_01`.
    fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let flushed = sink.poll_flush(cx);
            poll_03_to_01(flushed)
        })
    }

    /// Drives `poll_close` on the wrapped sink and converts the outcome
    /// through `poll_03_to_01`.
    fn close(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let closed = sink.poll_close(cx);
            poll_03_to_01(closed)
        })
    }
}
148 
149 #[derive(Clone)]
150 struct Current(task01::Task);
151 
152 impl Current {
new() -> Self153     fn new() -> Self {
154         Self(task01::current())
155     }
156 
as_waker(&self) -> WakerRef<'_>157     fn as_waker(&self) -> WakerRef<'_> {
158         unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
159             &*(ptr as *const Current)
160         }
161         fn current_to_ptr(current: &Current) -> *const () {
162             current as *const Current as *const ()
163         }
164 
165         unsafe fn clone(ptr: *const ()) -> RawWaker {
166             // Lazily create the `Arc` only when the waker is actually cloned.
167             // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion
168             // function is landed in `core`.
169             mem::transmute::<task03::Waker, RawWaker>(task03::waker(Arc::new(
170                 ptr_to_current(ptr).clone(),
171             )))
172         }
173         unsafe fn drop(_: *const ()) {}
174         unsafe fn wake(ptr: *const ()) {
175             ptr_to_current(ptr).0.notify()
176         }
177 
178         let ptr = current_to_ptr(self);
179         let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
180         WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
181             task03::Waker::from_raw(RawWaker::new(ptr, vtable))
182         }))
183     }
184 }
185 
186 impl ArcWake03 for Current {
wake_by_ref(arc_self: &Arc<Self>)187     fn wake_by_ref(arc_self: &Arc<Self>) {
188         arc_self.0.notify();
189     }
190 }
191 
with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R where T: Unpin, F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,192 fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
193 where
194     T: Unpin,
195     F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
196 {
197     let current = Current::new();
198     let waker = current.as_waker();
199     let mut cx = Context::from_waker(&waker);
200     f(Pin::new(&mut compat.inner), &mut cx)
201 }
202 
/// Runs `f` with the pinned inner sink of `compat` and a futures 0.3
/// `Context` whose waker is backed by the current futures 0.1 task.
///
/// Returns whatever `f` returns.
#[cfg(feature = "sink")]
fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
where
    T: Unpin,
    F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
{
    let task = Current::new();
    let waker_ref = task.as_waker();
    let mut context = Context::from_waker(&waker_ref);
    let pinned = Pin::new(&mut compat.inner);
    f(pinned, &mut context)
}
214 
/// `std::io` / tokio-io 0.1 adapters for [`Compat`] wrapping futures 0.3
/// `AsyncRead` / `AsyncWrite` values.
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
    use super::*;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Flattens a futures 0.3 I/O poll into a plain `io::Result`.
    ///
    /// A ready result is passed through unchanged; `Pending` is reported as
    /// an error of kind `WouldBlock`.
    fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error> {
        match x {
            task03::Poll::Ready(outcome) => outcome,
            task03::Poll::Pending => Err(std::io::Error::from(std::io::ErrorKind::WouldBlock)),
        }
    }

    impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
        /// Polls `poll_read` on the wrapped reader once; a pending read
        /// surfaces as `WouldBlock`.
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            with_context(self, |reader, cx| poll_03_to_io(reader.poll_read(cx, buf)))
        }
    }

    impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {}

    impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
        /// Polls `poll_write` on the wrapped writer once; a pending write
        /// surfaces as `WouldBlock`.
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            with_context(self, |writer, cx| poll_03_to_io(writer.poll_write(cx, buf)))
        }

        /// Polls `poll_flush` on the wrapped writer once; a pending flush
        /// surfaces as `WouldBlock`.
        fn flush(&mut self) -> std::io::Result<()> {
            with_context(self, |writer, cx| poll_03_to_io(writer.poll_flush(cx)))
        }
    }

    impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
        /// Polls `poll_close` on the wrapped writer and converts the outcome
        /// through `poll_03_to_01`.
        fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
            with_context(self, |writer, cx| poll_03_to_01(writer.poll_close(cx)))
        }
    }
}
266