1 use crate::sink::{SinkExt, SinkMapErr}; 2 use futures_core::stream::{FusedStream, Stream}; 3 use futures_sink::Sink; 4 use pin_project_lite::pin_project; 5 6 pin_project! { 7 /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method. 8 #[derive(Debug)] 9 #[must_use = "sinks do nothing unless polled"] 10 pub struct SinkErrInto<Si: Sink<Item>, Item, E> { 11 #[pin] 12 sink: SinkMapErr<Si, fn(Si::Error) -> E>, 13 } 14 } 15 16 impl<Si, E, Item> SinkErrInto<Si, Item, E> 17 where 18 Si: Sink<Item>, 19 Si::Error: Into<E>, 20 { new(sink: Si) -> Self21 pub(super) fn new(sink: Si) -> Self { 22 Self { sink: SinkExt::sink_map_err(sink, Into::into) } 23 } 24 25 delegate_access_inner!(sink, Si, (.)); 26 } 27 28 impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E> 29 where 30 Si: Sink<Item>, 31 Si::Error: Into<E>, 32 { 33 type Error = E; 34 35 delegate_sink!(sink, Item); 36 } 37 38 // Forwarding impl of Stream from the underlying sink 39 impl<S, Item, E> Stream for SinkErrInto<S, Item, E> 40 where 41 S: Sink<Item> + Stream, 42 S::Error: Into<E>, 43 { 44 type Item = S::Item; 45 46 delegate_stream!(sink); 47 } 48 49 impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E> 50 where 51 S: Sink<Item> + FusedStream, 52 S::Error: Into<E>, 53 { is_terminated(&self) -> bool54 fn is_terminated(&self) -> bool { 55 self.sink.is_terminated() 56 } 57 } 58