• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use bytes::Bytes;
2 use futures_core::stream::Stream;
3 use futures_sink::Sink;
4 use pin_project_lite::pin_project;
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7 
8 pin_project! {
9     /// A helper that wraps a [`Sink`]`<`[`Bytes`]`>` and converts it into a
10     /// [`Sink`]`<&'a [u8]>` by copying each byte slice into an owned [`Bytes`].
11     ///
12     /// See the documentation for [`SinkWriter`] for an example.
13     ///
14     /// [`Bytes`]: bytes::Bytes
15     /// [`SinkWriter`]: crate::io::SinkWriter
16     /// [`Sink`]: futures_sink::Sink
17     #[derive(Debug)]
18     pub struct CopyToBytes<S> {
19         #[pin]
20         inner: S,
21     }
22 }
23 
24 impl<S> CopyToBytes<S> {
25     /// Creates a new [`CopyToBytes`].
new(inner: S) -> Self26     pub fn new(inner: S) -> Self {
27         Self { inner }
28     }
29 
30     /// Gets a reference to the underlying sink.
get_ref(&self) -> &S31     pub fn get_ref(&self) -> &S {
32         &self.inner
33     }
34 
35     /// Gets a mutable reference to the underlying sink.
get_mut(&mut self) -> &mut S36     pub fn get_mut(&mut self) -> &mut S {
37         &mut self.inner
38     }
39 
40     /// Consumes this [`CopyToBytes`], returning the underlying sink.
into_inner(self) -> S41     pub fn into_inner(self) -> S {
42         self.inner
43     }
44 }
45 
46 impl<'a, S> Sink<&'a [u8]> for CopyToBytes<S>
47 where
48     S: Sink<Bytes>,
49 {
50     type Error = S::Error;
51 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>52     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53         self.project().inner.poll_ready(cx)
54     }
55 
start_send(self: Pin<&mut Self>, item: &'a [u8]) -> Result<(), Self::Error>56     fn start_send(self: Pin<&mut Self>, item: &'a [u8]) -> Result<(), Self::Error> {
57         self.project()
58             .inner
59             .start_send(Bytes::copy_from_slice(item))
60     }
61 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>62     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63         self.project().inner.poll_flush(cx)
64     }
65 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>66     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67         self.project().inner.poll_close(cx)
68     }
69 }
70 
71 impl<S: Stream> Stream for CopyToBytes<S> {
72     type Item = S::Item;
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>73     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74         self.project().inner.poll_next(cx)
75     }
76 }
77