• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::codec::encoder::Encoder;
2 use crate::codec::framed_impl::{FramedImpl, WriteFrame};
3 
4 use futures_core::Stream;
5 use tokio::io::AsyncWrite;
6 
7 use bytes::BytesMut;
8 use futures_sink::Sink;
9 use pin_project_lite::pin_project;
10 use std::fmt;
11 use std::io;
12 use std::pin::Pin;
13 use std::task::{Context, Poll};
14 
15 pin_project! {
16     /// A [`Sink`] of frames encoded to an `AsyncWrite`.
17     ///
18     /// [`Sink`]: futures_sink::Sink
19     pub struct FramedWrite<T, E> {
20         #[pin]
21         inner: FramedImpl<T, E, WriteFrame>,
22     }
23 }
24 
25 impl<T, E> FramedWrite<T, E>
26 where
27     T: AsyncWrite,
28 {
29     /// Creates a new `FramedWrite` with the given `encoder`.
new(inner: T, encoder: E) -> FramedWrite<T, E>30     pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
31         FramedWrite {
32             inner: FramedImpl {
33                 inner,
34                 codec: encoder,
35                 state: WriteFrame::default(),
36             },
37         }
38     }
39 }
40 
41 impl<T, E> FramedWrite<T, E> {
42     /// Returns a reference to the underlying I/O stream wrapped by
43     /// `FramedWrite`.
44     ///
45     /// Note that care should be taken to not tamper with the underlying stream
46     /// of data coming in as it may corrupt the stream of frames otherwise
47     /// being worked with.
get_ref(&self) -> &T48     pub fn get_ref(&self) -> &T {
49         &self.inner.inner
50     }
51 
52     /// Returns a mutable reference to the underlying I/O stream wrapped by
53     /// `FramedWrite`.
54     ///
55     /// Note that care should be taken to not tamper with the underlying stream
56     /// of data coming in as it may corrupt the stream of frames otherwise
57     /// being worked with.
get_mut(&mut self) -> &mut T58     pub fn get_mut(&mut self) -> &mut T {
59         &mut self.inner.inner
60     }
61 
62     /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
63     /// `FramedWrite`.
64     ///
65     /// Note that care should be taken to not tamper with the underlying stream
66     /// of data coming in as it may corrupt the stream of frames otherwise
67     /// being worked with.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>68     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
69         self.project().inner.project().inner
70     }
71 
72     /// Consumes the `FramedWrite`, returning its underlying I/O stream.
73     ///
74     /// Note that care should be taken to not tamper with the underlying stream
75     /// of data coming in as it may corrupt the stream of frames otherwise
76     /// being worked with.
into_inner(self) -> T77     pub fn into_inner(self) -> T {
78         self.inner.inner
79     }
80 
81     /// Returns a reference to the underlying encoder.
encoder(&self) -> &E82     pub fn encoder(&self) -> &E {
83         &self.inner.codec
84     }
85 
86     /// Returns a mutable reference to the underlying encoder.
encoder_mut(&mut self) -> &mut E87     pub fn encoder_mut(&mut self) -> &mut E {
88         &mut self.inner.codec
89     }
90 
91     /// Maps the encoder `E` to `C`, preserving the write buffer
92     /// wrapped by `Framed`.
map_encoder<C, F>(self, map: F) -> FramedWrite<T, C> where F: FnOnce(E) -> C,93     pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C>
94     where
95         F: FnOnce(E) -> C,
96     {
97         // This could be potentially simplified once rust-lang/rust#86555 hits stable
98         let FramedImpl {
99             inner,
100             state,
101             codec,
102         } = self.inner;
103         FramedWrite {
104             inner: FramedImpl {
105                 inner,
106                 state,
107                 codec: map(codec),
108             },
109         }
110     }
111 
112     /// Returns a mutable reference to the underlying encoder.
encoder_pin_mut(self: Pin<&mut Self>) -> &mut E113     pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E {
114         self.project().inner.project().codec
115     }
116 
117     /// Returns a reference to the write buffer.
write_buffer(&self) -> &BytesMut118     pub fn write_buffer(&self) -> &BytesMut {
119         &self.inner.state.buffer
120     }
121 
122     /// Returns a mutable reference to the write buffer.
write_buffer_mut(&mut self) -> &mut BytesMut123     pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
124         &mut self.inner.state.buffer
125     }
126 
127     /// Returns backpressure boundary
backpressure_boundary(&self) -> usize128     pub fn backpressure_boundary(&self) -> usize {
129         self.inner.state.backpressure_boundary
130     }
131 
132     /// Updates backpressure boundary
set_backpressure_boundary(&mut self, boundary: usize)133     pub fn set_backpressure_boundary(&mut self, boundary: usize) {
134         self.inner.state.backpressure_boundary = boundary;
135     }
136 }
137 
138 // This impl just defers to the underlying FramedImpl
139 impl<T, I, E> Sink<I> for FramedWrite<T, E>
140 where
141     T: AsyncWrite,
142     E: Encoder<I>,
143     E::Error: From<io::Error>,
144 {
145     type Error = E::Error;
146 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>147     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148         self.project().inner.poll_ready(cx)
149     }
150 
start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>151     fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
152         self.project().inner.start_send(item)
153     }
154 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>155     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156         self.project().inner.poll_flush(cx)
157     }
158 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>159     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
160         self.project().inner.poll_close(cx)
161     }
162 }
163 
164 // This impl just defers to the underlying T: Stream
165 impl<T, D> Stream for FramedWrite<T, D>
166 where
167     T: Stream,
168 {
169     type Item = T::Item;
170 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>171     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172         self.project().inner.project().inner.poll_next(cx)
173     }
174 }
175 
176 impl<T, U> fmt::Debug for FramedWrite<T, U>
177 where
178     T: fmt::Debug,
179     U: fmt::Debug,
180 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result181     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182         f.debug_struct("FramedWrite")
183             .field("inner", &self.get_ref())
184             .field("encoder", &self.encoder())
185             .field("buffer", &self.inner.state.buffer)
186             .finish()
187     }
188 }
189