• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures_01::{
2     task as task01, Async as Async01, Future as Future01, Poll as Poll01,
3     Stream as Stream01,
4 };
5 #[cfg(feature = "sink")]
6 use futures_01::{
7     AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01,
8 };
9 use futures_core::{
10     task::{RawWaker, RawWakerVTable},
11     future::TryFuture as TryFuture03,
12     stream::TryStream as TryStream03,
13 };
14 #[cfg(feature = "sink")]
15 use futures_sink::Sink as Sink03;
16 use crate::task::{
17     self as task03,
18     ArcWake as ArcWake03,
19     WakerRef,
20 };
21 #[cfg(feature = "sink")]
22 use std::marker::PhantomData;
23 use std::{
24     mem,
25     pin::Pin,
26     sync::Arc,
27     task::Context,
28 };
29 
/// Adapter that converts a futures 0.3
/// [`TryFuture`](futures_core::future::TryFuture) or
/// [`TryStream`](futures_core::stream::TryStream) into a futures 0.1
/// [`Future`](futures_01::future::Future) or
/// [`Stream`](futures_01::stream::Stream).
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug, Clone, Copy)]
pub struct Compat<T> {
    /// The wrapped futures 0.3 value.
    pub(crate) inner: T,
}
39 
/// Adapter that converts a futures 0.3 [`Sink`](futures_sink::Sink) into a
/// futures 0.1 [`Sink`](futures_01::sink::Sink).
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug)]
pub struct CompatSink<T, Item> {
    /// The wrapped futures 0.3 sink.
    inner: T,
    /// Records the item type in the wrapper's type without storing an item.
    _phantom: PhantomData<fn(Item)>,
}
50 
51 impl<T> Compat<T> {
52     /// Creates a new [`Compat`].
53     ///
54     /// For types which implement appropriate futures `0.3`
55     /// traits, the result will be a type which implements
56     /// the corresponding futures 0.1 type.
new(inner: T) -> Self57     pub fn new(inner: T) -> Self {
58         Self { inner }
59     }
60 
61     /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
62     /// contained within.
get_ref(&self) -> &T63     pub fn get_ref(&self) -> &T {
64         &self.inner
65     }
66 
67     /// Get a mutable reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
68     /// contained within.
get_mut(&mut self) -> &mut T69     pub fn get_mut(&mut self) -> &mut T {
70         &mut self.inner
71     }
72 
73     /// Returns the inner item.
into_inner(self) -> T74     pub fn into_inner(self) -> T {
75         self.inner
76     }
77 }
78 
#[cfg(feature = "sink")]
impl<T, Item> CompatSink<T, Item> {
    /// Wraps `inner` in a new [`CompatSink`].
    pub fn new(inner: T) -> Self {
        let _phantom = PhantomData;
        CompatSink { inner, _phantom }
    }

    /// Borrows the wrapped 0.3 Sink.
    pub fn get_ref(&self) -> &T {
        let CompatSink { inner, .. } = self;
        inner
    }

    /// Mutably borrows the wrapped 0.3 Sink.
    pub fn get_mut(&mut self) -> &mut T {
        let CompatSink { inner, .. } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped sink.
    pub fn into_inner(self) -> T {
        let CompatSink { inner, .. } = self;
        inner
    }
}
104 
poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E>105 fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>)
106     -> Result<Async01<T>, E>
107 {
108     match x? {
109         task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
110         task03::Poll::Pending => Ok(Async01::NotReady),
111     }
112 }
113 
114 impl<Fut> Future01 for Compat<Fut>
115 where
116     Fut: TryFuture03 + Unpin,
117 {
118     type Item = Fut::Ok;
119     type Error = Fut::Error;
120 
poll(&mut self) -> Poll01<Self::Item, Self::Error>121     fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
122         with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
123     }
124 }
125 
126 impl<St> Stream01 for Compat<St>
127 where
128     St: TryStream03 + Unpin,
129 {
130     type Item = St::Ok;
131     type Error = St::Error;
132 
poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error>133     fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
134         with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
135             task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
136             task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
137             task03::Poll::Pending => Ok(Async01::NotReady),
138         })
139     }
140 }
141 
/// Lets a wrapped futures 0.3 `Sink` be driven through the futures 0.1
/// `Sink` interface.
#[cfg(feature = "sink")]
impl<T, Item> Sink01 for CompatSink<T, Item>
where
    T: Sink03<Item> + Unpin,
{
    type SinkItem = Item;
    type SinkError = T::Error;

    /// Checks readiness of the wrapped sink first; the item is only handed
    /// over when the sink reports ready, otherwise it is returned to the
    /// caller inside `NotReady`.
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> StartSend01<Self::SinkItem, Self::SinkError> {
        with_sink_context(self, |mut sink, cx| {
            match sink.as_mut().poll_ready(cx)? {
                task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
                task03::Poll::Ready(()) => {
                    sink.start_send(item)?;
                    Ok(AsyncSink01::Ready)
                }
            }
        })
    }

    /// Maps to `poll_flush` on the wrapped sink.
    fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let flushed = sink.poll_flush(cx);
            poll_03_to_01(flushed)
        })
    }

    /// Maps to `poll_close` on the wrapped sink.
    fn close(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let closed = sink.poll_close(cx);
            poll_03_to_01(closed)
        })
    }
}
172 
173 #[derive(Clone)]
174 struct Current(task01::Task);
175 
176 impl Current {
new() -> Self177     fn new() -> Self {
178         Self(task01::current())
179     }
180 
as_waker(&self) -> WakerRef<'_>181     fn as_waker(&self) -> WakerRef<'_> {
182         unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
183             &*(ptr as *const Current)
184         }
185         fn current_to_ptr(current: &Current) -> *const () {
186             current as *const Current as *const ()
187         }
188 
189         unsafe fn clone(ptr: *const ()) -> RawWaker {
190             // Lazily create the `Arc` only when the waker is actually cloned.
191             // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion
192             // function is landed in `core`.
193             mem::transmute::<task03::Waker, RawWaker>(
194                 task03::waker(Arc::new(ptr_to_current(ptr).clone()))
195             )
196         }
197         unsafe fn drop(_: *const ()) {}
198         unsafe fn wake(ptr: *const ()) {
199             ptr_to_current(ptr).0.notify()
200         }
201 
202         let ptr = current_to_ptr(self);
203         let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
204         WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
205             task03::Waker::from_raw(RawWaker::new(ptr, vtable))
206         }))
207     }
208 }
209 
210 impl ArcWake03 for Current {
wake_by_ref(arc_self: &Arc<Self>)211     fn wake_by_ref(arc_self: &Arc<Self>) {
212         arc_self.0.notify();
213     }
214 }
215 
with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R where T: Unpin, F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,216 fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
217 where
218     T: Unpin,
219     F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
220 {
221     let current = Current::new();
222     let waker = current.as_waker();
223     let mut cx = Context::from_waker(&waker);
224     f(Pin::new(&mut compat.inner), &mut cx)
225 }
226 
/// Runs `f` with the pinned inner sink of `compat` and a futures 0.3
/// `Context` whose waker notifies the current futures 0.1 task.
///
/// Returns whatever `f` returns.
#[cfg(feature = "sink")]
fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
where
    T: Unpin,
    F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
{
    let task = Current::new();
    let waker_ref = task.as_waker();
    let mut context = Context::from_waker(&waker_ref);
    let pinned = Pin::new(&mut compat.inner);
    f(pinned, &mut context)
}
238 
/// I/O trait implementations for [`Compat`] wrapping futures 0.3 readers and
/// writers.
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
    use super::*;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Flattens a futures 0.3 I/O poll into a plain `io::Result`: a pending
    /// poll is reported as an error of kind `WouldBlock`.
    fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>)
        -> Result<T, std::io::Error>
    {
        match x {
            task03::Poll::Ready(outcome) => outcome,
            task03::Poll::Pending => {
                Err(std::io::Error::from(std::io::ErrorKind::WouldBlock))
            }
        }
    }

    impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
        /// Performs one `poll_read` on the wrapped reader.
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            with_context(self, |reader, cx| poll_03_to_io(reader.poll_read(cx, buf)))
        }
    }

    impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {
        /// Applies the wrapped reader's initializer to `buf` when it asks for
        /// initialization, and reports whether it did so.
        #[cfg(feature = "read-initializer")]
        unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
            let initializer = self.inner.initializer();
            if !initializer.should_initialize() {
                return false;
            }
            initializer.initialize(buf);
            true
        }
    }

    impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
        /// Performs one `poll_write` on the wrapped writer.
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            with_context(self, |writer, cx| poll_03_to_io(writer.poll_write(cx, buf)))
        }

        /// Performs one `poll_flush` on the wrapped writer.
        fn flush(&mut self) -> std::io::Result<()> {
            with_context(self, |writer, cx| poll_03_to_io(writer.poll_flush(cx)))
        }
    }

    impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
        /// Maps to `poll_close` on the wrapped writer.
        fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
            with_context(self, |writer, cx| poll_03_to_01(writer.poll_close(cx)))
        }
    }
}
302