1 use crate::codec::encoder::Encoder; 2 use crate::codec::framed_impl::{FramedImpl, WriteFrame}; 3 4 use futures_core::Stream; 5 use tokio::io::AsyncWrite; 6 7 use bytes::BytesMut; 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 use std::fmt; 11 use std::io; 12 use std::pin::Pin; 13 use std::task::{Context, Poll}; 14 15 pin_project! { 16 /// A [`Sink`] of frames encoded to an `AsyncWrite`. 17 /// 18 /// [`Sink`]: futures_sink::Sink 19 pub struct FramedWrite<T, E> { 20 #[pin] 21 inner: FramedImpl<T, E, WriteFrame>, 22 } 23 } 24 25 impl<T, E> FramedWrite<T, E> 26 where 27 T: AsyncWrite, 28 { 29 /// Creates a new `FramedWrite` with the given `encoder`. new(inner: T, encoder: E) -> FramedWrite<T, E>30 pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> { 31 FramedWrite { 32 inner: FramedImpl { 33 inner, 34 codec: encoder, 35 state: WriteFrame::default(), 36 }, 37 } 38 } 39 } 40 41 impl<T, E> FramedWrite<T, E> { 42 /// Returns a reference to the underlying I/O stream wrapped by 43 /// `FramedWrite`. 44 /// 45 /// Note that care should be taken to not tamper with the underlying stream 46 /// of data coming in as it may corrupt the stream of frames otherwise 47 /// being worked with. get_ref(&self) -> &T48 pub fn get_ref(&self) -> &T { 49 &self.inner.inner 50 } 51 52 /// Returns a mutable reference to the underlying I/O stream wrapped by 53 /// `FramedWrite`. 54 /// 55 /// Note that care should be taken to not tamper with the underlying stream 56 /// of data coming in as it may corrupt the stream of frames otherwise 57 /// being worked with. get_mut(&mut self) -> &mut T58 pub fn get_mut(&mut self) -> &mut T { 59 &mut self.inner.inner 60 } 61 62 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by 63 /// `FramedWrite`. 64 /// 65 /// Note that care should be taken to not tamper with the underlying stream 66 /// of data coming in as it may corrupt the stream of frames otherwise 67 /// being worked with. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>68 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { 69 self.project().inner.project().inner 70 } 71 72 /// Consumes the `FramedWrite`, returning its underlying I/O stream. 73 /// 74 /// Note that care should be taken to not tamper with the underlying stream 75 /// of data coming in as it may corrupt the stream of frames otherwise 76 /// being worked with. into_inner(self) -> T77 pub fn into_inner(self) -> T { 78 self.inner.inner 79 } 80 81 /// Returns a reference to the underlying encoder. encoder(&self) -> &E82 pub fn encoder(&self) -> &E { 83 &self.inner.codec 84 } 85 86 /// Returns a mutable reference to the underlying encoder. encoder_mut(&mut self) -> &mut E87 pub fn encoder_mut(&mut self) -> &mut E { 88 &mut self.inner.codec 89 } 90 91 /// Maps the encoder `E` to `C`, preserving the write buffer 92 /// wrapped by `Framed`. map_encoder<C, F>(self, map: F) -> FramedWrite<T, C> where F: FnOnce(E) -> C,93 pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C> 94 where 95 F: FnOnce(E) -> C, 96 { 97 // This could be potentially simplified once rust-lang/rust#86555 hits stable 98 let FramedImpl { 99 inner, 100 state, 101 codec, 102 } = self.inner; 103 FramedWrite { 104 inner: FramedImpl { 105 inner, 106 state, 107 codec: map(codec), 108 }, 109 } 110 } 111 112 /// Returns a mutable reference to the underlying encoder. encoder_pin_mut(self: Pin<&mut Self>) -> &mut E113 pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E { 114 self.project().inner.project().codec 115 } 116 117 /// Returns a reference to the write buffer. write_buffer(&self) -> &BytesMut118 pub fn write_buffer(&self) -> &BytesMut { 119 &self.inner.state.buffer 120 } 121 122 /// Returns a mutable reference to the write buffer. write_buffer_mut(&mut self) -> &mut BytesMut123 pub fn write_buffer_mut(&mut self) -> &mut BytesMut { 124 &mut self.inner.state.buffer 125 } 126 127 /// Returns backpressure boundary backpressure_boundary(&self) -> usize128 pub fn backpressure_boundary(&self) -> usize { 129 self.inner.state.backpressure_boundary 130 } 131 132 /// Updates backpressure boundary set_backpressure_boundary(&mut self, boundary: usize)133 pub fn set_backpressure_boundary(&mut self, boundary: usize) { 134 self.inner.state.backpressure_boundary = boundary; 135 } 136 } 137 138 // This impl just defers to the underlying FramedImpl 139 impl<T, I, E> Sink<I> for FramedWrite<T, E> 140 where 141 T: AsyncWrite, 142 E: Encoder<I>, 143 E::Error: From<io::Error>, 144 { 145 type Error = E::Error; 146 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>147 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 148 self.project().inner.poll_ready(cx) 149 } 150 start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>151 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { 152 self.project().inner.start_send(item) 153 } 154 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>155 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 156 self.project().inner.poll_flush(cx) 157 } 158 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>159 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 160 self.project().inner.poll_close(cx) 161 } 162 } 163 164 // This impl just defers to the underlying T: Stream 165 impl<T, D> Stream for FramedWrite<T, D> 166 where 167 T: Stream, 168 { 169 type Item = T::Item; 170 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>171 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 172 self.project().inner.project().inner.poll_next(cx) 173 } 174 } 175 176 impl<T, U> fmt::Debug for FramedWrite<T, U> 177 where 178 T: fmt::Debug, 179 U: fmt::Debug, 180 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 182 f.debug_struct("FramedWrite") 183 .field("inner", &self.get_ref()) 184 .field("encoder", &self.encoder()) 185 .field("buffer", &self.inner.state.buffer) 186 .finish() 187 } 188 } 189