1 use core::fmt; 2 use core::marker::PhantomData; 3 use core::pin::Pin; 4 use futures_core::future::Future; 5 use futures_core::ready; 6 use futures_core::stream::Stream; 7 use futures_core::task::{Context, Poll}; 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Sink for the [`with`](super::SinkExt::with) method. 13 #[must_use = "sinks do nothing unless polled"] 14 pub struct With<Si, Item, U, Fut, F> { 15 #[pin] 16 sink: Si, 17 f: F, 18 #[pin] 19 state: Option<Fut>, 20 _phantom: PhantomData<fn(U) -> Item>, 21 } 22 } 23 24 impl<Si, Item, U, Fut, F> fmt::Debug for With<Si, Item, U, Fut, F> 25 where 26 Si: fmt::Debug, 27 Fut: fmt::Debug, 28 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 f.debug_struct("With") 31 .field("sink", &self.sink) 32 .field("state", &self.state) 33 .finish() 34 } 35 } 36 37 impl<Si, Item, U, Fut, F> With<Si, Item, U, Fut, F> 38 where Si: Sink<Item>, 39 F: FnMut(U) -> Fut, 40 Fut: Future, 41 { new<E>(sink: Si, f: F) -> Self where Fut: Future<Output = Result<Item, E>>, E: From<Si::Error>,42 pub(super) fn new<E>(sink: Si, f: F) -> Self 43 where 44 Fut: Future<Output = Result<Item, E>>, 45 E: From<Si::Error>, 46 { 47 Self { 48 state: None, 49 sink, 50 f, 51 _phantom: PhantomData, 52 } 53 } 54 } 55 56 impl<Si, Item, U, Fut, F> Clone for With<Si, Item, U, Fut, F> 57 where 58 Si: Clone, 59 F: Clone, 60 Fut: Clone, 61 { clone(&self) -> Self62 fn clone(&self) -> Self { 63 Self { 64 state: self.state.clone(), 65 sink: self.sink.clone(), 66 f: self.f.clone(), 67 _phantom: PhantomData, 68 } 69 } 70 } 71 72 // Forwarding impl of Stream from the underlying sink 73 impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F> 74 where S: Stream + Sink<Item>, 75 F: FnMut(U) -> Fut, 76 Fut: Future 77 { 78 type Item = S::Item; 79 80 delegate_stream!(sink); 81 } 82 83 impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F> 84 where Si: Sink<Item>, 85 F: FnMut(U) -> Fut, 86 Fut: Future<Output = Result<Item, E>>, 87 E: From<Si::Error>, 88 { 89 delegate_access_inner!(sink, Si, ()); 90 91 /// Completes the processing of previous item if any. poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), E>>92 fn poll( 93 self: Pin<&mut Self>, 94 cx: &mut Context<'_>, 95 ) -> Poll<Result<(), E>> { 96 let mut this = self.project(); 97 98 let item = match this.state.as_mut().as_pin_mut() { 99 None => return Poll::Ready(Ok(())), 100 Some(fut) => ready!(fut.poll(cx))?, 101 }; 102 this.state.set(None); 103 this.sink.start_send(item)?; 104 Poll::Ready(Ok(())) 105 } 106 } 107 108 impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F> 109 where Si: Sink<Item>, 110 F: FnMut(U) -> Fut, 111 Fut: Future<Output = Result<Item, E>>, 112 E: From<Si::Error>, 113 { 114 type Error = E; 115 poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>116 fn poll_ready( 117 mut self: Pin<&mut Self>, 118 cx: &mut Context<'_>, 119 ) -> Poll<Result<(), Self::Error>> { 120 ready!(self.as_mut().poll(cx))?; 121 ready!(self.project().sink.poll_ready(cx)?); 122 Poll::Ready(Ok(())) 123 } 124 start_send( self: Pin<&mut Self>, item: U, ) -> Result<(), Self::Error>125 fn start_send( 126 self: Pin<&mut Self>, 127 item: U, 128 ) -> Result<(), Self::Error> { 129 let mut this = self.project(); 130 131 assert!(this.state.is_none()); 132 this.state.set(Some((this.f)(item))); 133 Ok(()) 134 } 135 poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>136 fn poll_flush( 137 mut self: Pin<&mut Self>, 138 cx: &mut Context<'_>, 139 ) -> Poll<Result<(), Self::Error>> { 140 ready!(self.as_mut().poll(cx))?; 141 ready!(self.project().sink.poll_flush(cx)?); 142 Poll::Ready(Ok(())) 143 } 144 poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>145 fn poll_close( 146 mut self: Pin<&mut Self>, 147 cx: &mut Context<'_>, 148 ) -> Poll<Result<(), Self::Error>> { 149 ready!(self.as_mut().poll(cx))?; 150 ready!(self.project().sink.poll_close(cx)?); 151 Poll::Ready(Ok(())) 152 } 153 } 154