1 use futures_01::{
2 task as task01, Async as Async01, Future as Future01, Poll as Poll01,
3 Stream as Stream01,
4 };
5 #[cfg(feature = "sink")]
6 use futures_01::{
7 AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01,
8 };
9 use futures_core::{
10 task::{RawWaker, RawWakerVTable},
11 future::TryFuture as TryFuture03,
12 stream::TryStream as TryStream03,
13 };
14 #[cfg(feature = "sink")]
15 use futures_sink::Sink as Sink03;
16 use crate::task::{
17 self as task03,
18 ArcWake as ArcWake03,
19 WakerRef,
20 };
21 #[cfg(feature = "sink")]
22 use std::marker::PhantomData;
23 use std::{
24 mem,
25 pin::Pin,
26 sync::Arc,
27 task::Context,
28 };
29
/// Adapter that exposes a futures 0.3 value through the futures 0.1 traits.
///
/// Wrapping a futures 0.3 [`TryFuture`](futures_core::future::TryFuture) or
/// [`TryStream`](futures_core::stream::TryStream) produces a value that
/// implements the futures 0.1 [`Future`](futures_01::future::Future) or
/// [`Stream`](futures_01::stream::Stream) trait, respectively.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Clone, Copy, Debug)]
pub struct Compat<T> {
    /// The wrapped futures 0.3 object.
    pub(crate) inner: T,
}
39
/// Adapter that exposes a futures 0.3 [`Sink`](futures_sink::Sink) as a
/// futures 0.1 [`Sink`](futures_01::sink::Sink).
///
/// `Item` is the type of value the sink accepts. It is tracked purely at the
/// type level through a `PhantomData` marker; no `Item` is ever stored.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug)]
pub struct CompatSink<T, Item> {
    /// The wrapped futures 0.3 sink.
    inner: T,
    /// Type-level marker tying the adapter to its item type.
    _phantom: PhantomData<fn(Item)>,
}
50
51 impl<T> Compat<T> {
52 /// Creates a new [`Compat`].
53 ///
54 /// For types which implement appropriate futures `0.3`
55 /// traits, the result will be a type which implements
56 /// the corresponding futures 0.1 type.
new(inner: T) -> Self57 pub fn new(inner: T) -> Self {
58 Self { inner }
59 }
60
61 /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
62 /// contained within.
get_ref(&self) -> &T63 pub fn get_ref(&self) -> &T {
64 &self.inner
65 }
66
67 /// Get a mutable reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
68 /// contained within.
get_mut(&mut self) -> &mut T69 pub fn get_mut(&mut self) -> &mut T {
70 &mut self.inner
71 }
72
73 /// Returns the inner item.
into_inner(self) -> T74 pub fn into_inner(self) -> T {
75 self.inner
76 }
77 }
78
#[cfg(feature = "sink")]
impl<T, Item> CompatSink<T, Item> {
    /// Wraps `inner` in a new [`CompatSink`].
    pub fn new(inner: T) -> Self {
        let _phantom = PhantomData;
        CompatSink { inner, _phantom }
    }

    /// Borrows the wrapped 0.3 Sink.
    pub fn get_ref(&self) -> &T {
        let CompatSink { inner, .. } = self;
        inner
    }

    /// Mutably borrows the wrapped 0.3 Sink.
    pub fn get_mut(&mut self) -> &mut T {
        let CompatSink { inner, .. } = self;
        inner
    }

    /// Consumes the wrapper and hands back the wrapped sink.
    pub fn into_inner(self) -> T {
        let CompatSink { inner, .. } = self;
        inner
    }
}
104
poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E>105 fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>)
106 -> Result<Async01<T>, E>
107 {
108 match x? {
109 task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
110 task03::Poll::Pending => Ok(Async01::NotReady),
111 }
112 }
113
114 impl<Fut> Future01 for Compat<Fut>
115 where
116 Fut: TryFuture03 + Unpin,
117 {
118 type Item = Fut::Ok;
119 type Error = Fut::Error;
120
poll(&mut self) -> Poll01<Self::Item, Self::Error>121 fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
122 with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
123 }
124 }
125
126 impl<St> Stream01 for Compat<St>
127 where
128 St: TryStream03 + Unpin,
129 {
130 type Item = St::Ok;
131 type Error = St::Error;
132
poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error>133 fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
134 with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
135 task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
136 task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
137 task03::Poll::Pending => Ok(Async01::NotReady),
138 })
139 }
140 }
141
/// Lets a wrapped futures 0.3 `Sink` be driven through the futures 0.1
/// `Sink` interface.
#[cfg(feature = "sink")]
impl<T, Item> Sink01 for CompatSink<T, Item>
where
    T: Sink03<Item> + Unpin,
{
    type SinkItem = Item;
    type SinkError = T::Error;

    /// Attempts to hand `item` to the wrapped sink.
    ///
    /// The 0.3 sink is first asked whether it is ready. If it is not, the
    /// item is handed back to the caller inside `AsyncSink01::NotReady`;
    /// otherwise the item is sent and `AsyncSink01::Ready` is returned.
    /// Errors from either step are propagated.
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> StartSend01<Self::SinkItem, Self::SinkError> {
        with_sink_context(self, |mut sink, cx| {
            // Not ready yet: return ownership of the item to the caller.
            if sink.as_mut().poll_ready(cx)?.is_pending() {
                return Ok(AsyncSink01::NotReady(item));
            }
            sink.start_send(item)?;
            Ok(AsyncSink01::Ready)
        })
    }

    /// Flushes the wrapped sink via its `poll_flush`.
    fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let flushed = sink.poll_flush(cx);
            poll_03_to_01(flushed)
        })
    }

    /// Closes the wrapped sink via its `poll_close`.
    fn close(&mut self) -> Poll01<(), Self::SinkError> {
        with_sink_context(self, |sink, cx| {
            let closed = sink.poll_close(cx);
            poll_03_to_01(closed)
        })
    }
}
172
173 #[derive(Clone)]
174 struct Current(task01::Task);
175
176 impl Current {
new() -> Self177 fn new() -> Self {
178 Self(task01::current())
179 }
180
as_waker(&self) -> WakerRef<'_>181 fn as_waker(&self) -> WakerRef<'_> {
182 unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
183 &*(ptr as *const Current)
184 }
185 fn current_to_ptr(current: &Current) -> *const () {
186 current as *const Current as *const ()
187 }
188
189 unsafe fn clone(ptr: *const ()) -> RawWaker {
190 // Lazily create the `Arc` only when the waker is actually cloned.
191 // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion
192 // function is landed in `core`.
193 mem::transmute::<task03::Waker, RawWaker>(
194 task03::waker(Arc::new(ptr_to_current(ptr).clone()))
195 )
196 }
197 unsafe fn drop(_: *const ()) {}
198 unsafe fn wake(ptr: *const ()) {
199 ptr_to_current(ptr).0.notify()
200 }
201
202 let ptr = current_to_ptr(self);
203 let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
204 WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
205 task03::Waker::from_raw(RawWaker::new(ptr, vtable))
206 }))
207 }
208 }
209
210 impl ArcWake03 for Current {
wake_by_ref(arc_self: &Arc<Self>)211 fn wake_by_ref(arc_self: &Arc<Self>) {
212 arc_self.0.notify();
213 }
214 }
215
with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R where T: Unpin, F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,216 fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
217 where
218 T: Unpin,
219 F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
220 {
221 let current = Current::new();
222 let waker = current.as_waker();
223 let mut cx = Context::from_waker(&waker);
224 f(Pin::new(&mut compat.inner), &mut cx)
225 }
226
/// Runs `f` with the pinned inner sink of `compat` and a futures 0.3
/// `Context` whose waker notifies the current futures 0.1 task.
///
/// Returns whatever `f` returns.
#[cfg(feature = "sink")]
fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
where
    T: Unpin,
    F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
{
    let task = Current::new();
    let waker_ref = task.as_waker();
    let pinned = Pin::new(&mut compat.inner);
    f(pinned, &mut Context::from_waker(&waker_ref))
}
238
/// Blocking-style `std::io` and tokio-io 0.1 trait implementations for
/// [`Compat`] wrappers around futures 0.3 `AsyncRead` / `AsyncWrite` objects.
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
    use super::*;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Flattens a futures 0.3 I/O poll result into a plain `io::Result`.
    ///
    /// `Pending` is reported as an `ErrorKind::WouldBlock` error; a ready
    /// result (success or error) is passed through unchanged.
    fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error> {
        match x {
            task03::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()),
            task03::Poll::Ready(outcome) => outcome,
        }
    }

    impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
        /// Polls the wrapped reader once; `Pending` surfaces as `WouldBlock`.
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let task = Current::new();
            let waker_ref = task.as_waker();
            let reader = Pin::new(&mut self.inner);
            poll_03_to_io(reader.poll_read(&mut Context::from_waker(&waker_ref), buf))
        }
    }

    impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {
        /// Initializes `buf` using the wrapped reader's initializer when it
        /// asks for initialization, and reports whether it did so.
        #[cfg(feature = "read-initializer")]
        unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
            let init = self.inner.initializer();
            if !init.should_initialize() {
                return false;
            }
            init.initialize(buf);
            true
        }
    }

    impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
        /// Polls the wrapped writer once; `Pending` surfaces as `WouldBlock`.
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let task = Current::new();
            let waker_ref = task.as_waker();
            let writer = Pin::new(&mut self.inner);
            poll_03_to_io(writer.poll_write(&mut Context::from_waker(&waker_ref), buf))
        }

        /// Polls the wrapped writer's flush once; `Pending` surfaces as
        /// `WouldBlock`.
        fn flush(&mut self) -> std::io::Result<()> {
            let task = Current::new();
            let waker_ref = task.as_waker();
            let writer = Pin::new(&mut self.inner);
            poll_03_to_io(writer.poll_flush(&mut Context::from_waker(&waker_ref)))
        }
    }

    impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
        /// Drives the wrapped writer's `poll_close` and reports the outcome
        /// using the futures 0.1 `Async` type.
        fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
            let task = Current::new();
            let waker_ref = task.as_waker();
            let writer = Pin::new(&mut self.inner);
            poll_03_to_01(writer.poll_close(&mut Context::from_waker(&waker_ref)))
        }
    }
}
302