1 use crate::codec::framed_impl::{FramedImpl, ReadFrame}; 2 use crate::codec::Decoder; 3 4 use futures_core::Stream; 5 use tokio::io::AsyncRead; 6 7 use bytes::BytesMut; 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 use std::fmt; 11 use std::pin::Pin; 12 use std::task::{Context, Poll}; 13 14 pin_project! { 15 /// A [`Stream`] of messages decoded from an [`AsyncRead`]. 16 /// 17 /// [`Stream`]: futures_core::Stream 18 /// [`AsyncRead`]: tokio::io::AsyncRead 19 pub struct FramedRead<T, D> { 20 #[pin] 21 inner: FramedImpl<T, D, ReadFrame>, 22 } 23 } 24 25 // ===== impl FramedRead ===== 26 27 impl<T, D> FramedRead<T, D> 28 where 29 T: AsyncRead, 30 D: Decoder, 31 { 32 /// Creates a new `FramedRead` with the given `decoder`. new(inner: T, decoder: D) -> FramedRead<T, D>33 pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { 34 FramedRead { 35 inner: FramedImpl { 36 inner, 37 codec: decoder, 38 state: Default::default(), 39 }, 40 } 41 } 42 43 /// Creates a new `FramedRead` with the given `decoder` and a buffer of `capacity` 44 /// initial size. with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D>45 pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> { 46 FramedRead { 47 inner: FramedImpl { 48 inner, 49 codec: decoder, 50 state: ReadFrame { 51 eof: false, 52 is_readable: false, 53 buffer: BytesMut::with_capacity(capacity), 54 has_errored: false, 55 }, 56 }, 57 } 58 } 59 } 60 61 impl<T, D> FramedRead<T, D> { 62 /// Returns a reference to the underlying I/O stream wrapped by 63 /// `FramedRead`. 64 /// 65 /// Note that care should be taken to not tamper with the underlying stream 66 /// of data coming in as it may corrupt the stream of frames otherwise 67 /// being worked with. get_ref(&self) -> &T68 pub fn get_ref(&self) -> &T { 69 &self.inner.inner 70 } 71 72 /// Returns a mutable reference to the underlying I/O stream wrapped by 73 /// `FramedRead`. 74 /// 75 /// Note that care should be taken to not tamper with the underlying stream 76 /// of data coming in as it may corrupt the stream of frames otherwise 77 /// being worked with. get_mut(&mut self) -> &mut T78 pub fn get_mut(&mut self) -> &mut T { 79 &mut self.inner.inner 80 } 81 82 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by 83 /// `FramedRead`. 84 /// 85 /// Note that care should be taken to not tamper with the underlying stream 86 /// of data coming in as it may corrupt the stream of frames otherwise 87 /// being worked with. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>88 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { 89 self.project().inner.project().inner 90 } 91 92 /// Consumes the `FramedRead`, returning its underlying I/O stream. 93 /// 94 /// Note that care should be taken to not tamper with the underlying stream 95 /// of data coming in as it may corrupt the stream of frames otherwise 96 /// being worked with. into_inner(self) -> T97 pub fn into_inner(self) -> T { 98 self.inner.inner 99 } 100 101 /// Returns a reference to the underlying decoder. decoder(&self) -> &D102 pub fn decoder(&self) -> &D { 103 &self.inner.codec 104 } 105 106 /// Returns a mutable reference to the underlying decoder. decoder_mut(&mut self) -> &mut D107 pub fn decoder_mut(&mut self) -> &mut D { 108 &mut self.inner.codec 109 } 110 111 /// Maps the decoder `D` to `C`, preserving the read buffer 112 /// wrapped by `Framed`. map_decoder<C, F>(self, map: F) -> FramedRead<T, C> where F: FnOnce(D) -> C,113 pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C> 114 where 115 F: FnOnce(D) -> C, 116 { 117 // This could be potentially simplified once rust-lang/rust#86555 hits stable 118 let FramedImpl { 119 inner, 120 state, 121 codec, 122 } = self.inner; 123 FramedRead { 124 inner: FramedImpl { 125 inner, 126 state, 127 codec: map(codec), 128 }, 129 } 130 } 131 132 /// Returns a mutable reference to the underlying decoder. decoder_pin_mut(self: Pin<&mut Self>) -> &mut D133 pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D { 134 self.project().inner.project().codec 135 } 136 137 /// Returns a reference to the read buffer. read_buffer(&self) -> &BytesMut138 pub fn read_buffer(&self) -> &BytesMut { 139 &self.inner.state.buffer 140 } 141 142 /// Returns a mutable reference to the read buffer. read_buffer_mut(&mut self) -> &mut BytesMut143 pub fn read_buffer_mut(&mut self) -> &mut BytesMut { 144 &mut self.inner.state.buffer 145 } 146 } 147 148 // This impl just defers to the underlying FramedImpl 149 impl<T, D> Stream for FramedRead<T, D> 150 where 151 T: AsyncRead, 152 D: Decoder, 153 { 154 type Item = Result<D::Item, D::Error>; 155 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>156 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 157 self.project().inner.poll_next(cx) 158 } 159 } 160 161 // This impl just defers to the underlying T: Sink 162 impl<T, I, D> Sink<I> for FramedRead<T, D> 163 where 164 T: Sink<I>, 165 { 166 type Error = T::Error; 167 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>168 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 169 self.project().inner.project().inner.poll_ready(cx) 170 } 171 start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>172 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { 173 self.project().inner.project().inner.start_send(item) 174 } 175 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>176 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 177 self.project().inner.project().inner.poll_flush(cx) 178 } 179 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>180 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 181 self.project().inner.project().inner.poll_close(cx) 182 } 183 } 184 185 impl<T, D> fmt::Debug for FramedRead<T, D> 186 where 187 T: fmt::Debug, 188 D: fmt::Debug, 189 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 191 f.debug_struct("FramedRead") 192 .field("inner", &self.get_ref()) 193 .field("decoder", &self.decoder()) 194 .field("eof", &self.inner.state.eof) 195 .field("is_readable", &self.inner.state.is_readable) 196 .field("buffer", &self.read_buffer()) 197 .finish() 198 } 199 } 200