1 use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef};
2 use futures_01::{
3 task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01,
4 };
5 #[cfg(feature = "sink")]
6 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01};
7 use futures_core::{
8 future::TryFuture as TryFuture03,
9 stream::TryStream as TryStream03,
10 task::{RawWaker, RawWakerVTable},
11 };
12 #[cfg(feature = "sink")]
13 use futures_sink::Sink as Sink03;
14 #[cfg(feature = "sink")]
15 use std::marker::PhantomData;
16 use std::{mem, pin::Pin, sync::Arc, task::Context};
17
/// Adapter exposing a futures 0.3
/// [`TryFuture`](futures_core::future::TryFuture) or
/// [`TryStream`](futures_core::stream::TryStream) through the futures 0.1
/// [`Future`](futures_01::future::Future) or
/// [`Stream`](futures_01::stream::Stream) trait.
#[derive(Copy, Clone, Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Compat<T> {
    /// The wrapped futures 0.3 value.
    pub(crate) inner: T,
}
27
/// Adapter exposing a futures 0.3 [`Sink`](futures_sink::Sink) through the
/// futures 0.1 [`Sink`](futures_01::sink::Sink) trait.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct CompatSink<T, Item> {
    /// The wrapped futures 0.3 sink.
    inner: T,
    /// Records the `Item` type parameter without storing a value of it.
    _phantom: PhantomData<fn(Item)>,
}
38
39 impl<T> Compat<T> {
40 /// Creates a new [`Compat`].
41 ///
42 /// For types which implement appropriate futures `0.3`
43 /// traits, the result will be a type which implements
44 /// the corresponding futures 0.1 type.
new(inner: T) -> Self45 pub fn new(inner: T) -> Self {
46 Self { inner }
47 }
48
49 /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
50 /// contained within.
get_ref(&self) -> &T51 pub fn get_ref(&self) -> &T {
52 &self.inner
53 }
54
55 /// Get a mutable reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
56 /// contained within.
get_mut(&mut self) -> &mut T57 pub fn get_mut(&mut self) -> &mut T {
58 &mut self.inner
59 }
60
61 /// Returns the inner item.
into_inner(self) -> T62 pub fn into_inner(self) -> T {
63 self.inner
64 }
65 }
66
#[cfg(feature = "sink")]
impl<T, Item> CompatSink<T, Item> {
    /// Wraps `inner` in a new [`CompatSink`].
    pub fn new(inner: T) -> Self {
        CompatSink { inner, _phantom: PhantomData }
    }

    /// Borrows the wrapped 0.3 Sink.
    pub fn get_ref(&self) -> &T {
        let Self { inner, .. } = self;
        inner
    }

    /// Mutably borrows the wrapped 0.3 Sink.
    pub fn get_mut(&mut self) -> &mut T {
        let Self { inner, .. } = self;
        inner
    }

    /// Consumes the wrapper and returns the wrapped sink.
    pub fn into_inner(self) -> T {
        let Self { inner, .. } = self;
        inner
    }
}
89
poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E>90 fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E> {
91 match x? {
92 task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
93 task03::Poll::Pending => Ok(Async01::NotReady),
94 }
95 }
96
97 impl<Fut> Future01 for Compat<Fut>
98 where
99 Fut: TryFuture03 + Unpin,
100 {
101 type Item = Fut::Ok;
102 type Error = Fut::Error;
103
poll(&mut self) -> Poll01<Self::Item, Self::Error>104 fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
105 with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
106 }
107 }
108
109 impl<St> Stream01 for Compat<St>
110 where
111 St: TryStream03 + Unpin,
112 {
113 type Item = St::Ok;
114 type Error = St::Error;
115
poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error>116 fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
117 with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
118 task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
119 task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
120 task03::Poll::Pending => Ok(Async01::NotReady),
121 })
122 }
123 }
124
#[cfg(feature = "sink")]
impl<T, Item> Sink01 for CompatSink<T, Item>
where
    T: Sink03<Item> + Unpin,
{
    type SinkItem = Item;
    type SinkError = T::Error;

    /// Checks readiness of the wrapped 0.3 sink and, if it is ready, hands
    /// `item` to it.
    ///
    /// When the sink reports `Pending`, the item is handed back to the caller
    /// inside `AsyncSink01::NotReady`.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend01<Self::SinkItem, Self::SinkError> {
        with_sink_context(self, |mut sink, cx| match sink.as_mut().poll_ready(cx) {
            task03::Poll::Ready(Ok(())) => match sink.start_send(item) {
                Ok(()) => Ok(AsyncSink01::Ready),
                Err(error) => Err(error),
            },
            task03::Poll::Ready(Err(error)) => Err(error),
            task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
        })
    }

    /// Forwards to the wrapped sink's `poll_flush`.
    fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let flushed = sink.poll_flush(cx);
            poll_03_to_01(flushed)
        })
    }

    /// Forwards to the wrapped sink's `poll_close`.
    fn close(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let closed = sink.poll_close(cx);
            poll_03_to_01(closed)
        })
    }
}
148
149 #[derive(Clone)]
150 struct Current(task01::Task);
151
152 impl Current {
new() -> Self153 fn new() -> Self {
154 Self(task01::current())
155 }
156
as_waker(&self) -> WakerRef<'_>157 fn as_waker(&self) -> WakerRef<'_> {
158 unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
159 &*(ptr as *const Current)
160 }
161 fn current_to_ptr(current: &Current) -> *const () {
162 current as *const Current as *const ()
163 }
164
165 unsafe fn clone(ptr: *const ()) -> RawWaker {
166 // Lazily create the `Arc` only when the waker is actually cloned.
167 // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion
168 // function is landed in `core`.
169 mem::transmute::<task03::Waker, RawWaker>(task03::waker(Arc::new(
170 ptr_to_current(ptr).clone(),
171 )))
172 }
173 unsafe fn drop(_: *const ()) {}
174 unsafe fn wake(ptr: *const ()) {
175 ptr_to_current(ptr).0.notify()
176 }
177
178 let ptr = current_to_ptr(self);
179 let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
180 WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
181 task03::Waker::from_raw(RawWaker::new(ptr, vtable))
182 }))
183 }
184 }
185
186 impl ArcWake03 for Current {
wake_by_ref(arc_self: &Arc<Self>)187 fn wake_by_ref(arc_self: &Arc<Self>) {
188 arc_self.0.notify();
189 }
190 }
191
with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R where T: Unpin, F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,192 fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
193 where
194 T: Unpin,
195 F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
196 {
197 let current = Current::new();
198 let waker = current.as_waker();
199 let mut cx = Context::from_waker(&waker);
200 f(Pin::new(&mut compat.inner), &mut cx)
201 }
202
/// Runs `f` with the pinned inner sink of `compat` and a futures 0.3
/// `Context` whose waker is built from a freshly captured `Current`.
///
/// Returns whatever `f` returns.
#[cfg(feature = "sink")]
fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
where
    T: Unpin,
    F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
{
    let task = Current::new();
    let waker_ref = task.as_waker();
    let mut context = Context::from_waker(&waker_ref);
    let pinned = Pin::new(&mut compat.inner);
    f(pinned, &mut context)
}
214
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
    use super::*;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Flattens a futures 0.3 I/O poll into a plain `std::io::Result`.
    ///
    /// `Pending` is reported as an error of kind `WouldBlock`; ready results
    /// are passed through unchanged.
    fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error> {
        match x {
            task03::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()),
            task03::Poll::Ready(outcome) => outcome,
        }
    }

    impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
        /// Polls the wrapped reader's `poll_read` once.
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            with_context(self, |reader, cx| poll_03_to_io(reader.poll_read(cx, buf)))
        }
    }

    impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {}

    impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
        /// Polls the wrapped writer's `poll_write` once.
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            with_context(self, |writer, cx| poll_03_to_io(writer.poll_write(cx, buf)))
        }

        /// Polls the wrapped writer's `poll_flush` once.
        fn flush(&mut self) -> std::io::Result<()> {
            with_context(self, |writer, cx| poll_03_to_io(writer.poll_flush(cx)))
        }
    }

    impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
        /// Polls the wrapped writer's `poll_close` once and converts the
        /// outcome with `poll_03_to_01`.
        fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
            with_context(self, |writer, cx| poll_03_to_01(writer.poll_close(cx)))
        }
    }
}
266