1 use core::fmt; 2 use core::marker::PhantomData; 3 use core::pin::Pin; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream}; 6 use futures_core::task::{Context, Poll}; 7 use futures_sink::Sink; 8 use pin_project_lite::pin_project; 9 10 pin_project! { 11 /// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method. 12 #[must_use = "sinks do nothing unless polled"] 13 pub struct WithFlatMap<Si, Item, U, St, F> { 14 #[pin] 15 sink: Si, 16 f: F, 17 #[pin] 18 stream: Option<St>, 19 buffer: Option<Item>, 20 _marker: PhantomData<fn(U)>, 21 } 22 } 23 24 impl<Si, Item, U, St, F> fmt::Debug for WithFlatMap<Si, Item, U, St, F> 25 where 26 Si: fmt::Debug, 27 St: fmt::Debug, 28 Item: fmt::Debug, 29 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 31 f.debug_struct("WithFlatMap") 32 .field("sink", &self.sink) 33 .field("stream", &self.stream) 34 .field("buffer", &self.buffer) 35 .finish() 36 } 37 } 38 39 impl<Si, Item, U, St, F> WithFlatMap<Si, Item, U, St, F> 40 where 41 Si: Sink<Item>, 42 F: FnMut(U) -> St, 43 St: Stream<Item = Result<Item, Si::Error>>, 44 { new(sink: Si, f: F) -> Self45 pub(super) fn new(sink: Si, f: F) -> Self { 46 Self { sink, f, stream: None, buffer: None, _marker: PhantomData } 47 } 48 49 delegate_access_inner!(sink, Si, ()); 50 try_empty_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>51 fn try_empty_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> { 52 let mut this = self.project(); 53 54 if this.buffer.is_some() { 55 ready!(this.sink.as_mut().poll_ready(cx))?; 56 let item = this.buffer.take().unwrap(); 57 this.sink.as_mut().start_send(item)?; 58 } 59 if let Some(mut some_stream) = this.stream.as_mut().as_pin_mut() { 60 while let Some(item) = ready!(some_stream.as_mut().poll_next(cx)?) { 61 match this.sink.as_mut().poll_ready(cx)? { 62 Poll::Ready(()) => this.sink.as_mut().start_send(item)?, 63 Poll::Pending => { 64 *this.buffer = Some(item); 65 return Poll::Pending; 66 } 67 }; 68 } 69 } 70 this.stream.set(None); 71 Poll::Ready(Ok(())) 72 } 73 } 74 75 // Forwarding impl of Stream from the underlying sink 76 impl<S, Item, U, St, F> Stream for WithFlatMap<S, Item, U, St, F> 77 where 78 S: Stream + Sink<Item>, 79 F: FnMut(U) -> St, 80 St: Stream<Item = Result<Item, S::Error>>, 81 { 82 type Item = S::Item; 83 84 delegate_stream!(sink); 85 } 86 87 impl<S, Item, U, St, F> FusedStream for WithFlatMap<S, Item, U, St, F> 88 where 89 S: FusedStream + Sink<Item>, 90 F: FnMut(U) -> St, 91 St: Stream<Item = Result<Item, S::Error>>, 92 { is_terminated(&self) -> bool93 fn is_terminated(&self) -> bool { 94 self.sink.is_terminated() 95 } 96 } 97 98 impl<Si, Item, U, St, F> Sink<U> for WithFlatMap<Si, Item, U, St, F> 99 where 100 Si: Sink<Item>, 101 F: FnMut(U) -> St, 102 St: Stream<Item = Result<Item, Si::Error>>, 103 { 104 type Error = Si::Error; 105 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>106 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 107 self.try_empty_stream(cx) 108 } 109 start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error>110 fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> { 111 let mut this = self.project(); 112 113 assert!(this.stream.is_none()); 114 this.stream.set(Some((this.f)(item))); 115 Ok(()) 116 } 117 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>118 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 119 ready!(self.as_mut().try_empty_stream(cx)?); 120 self.project().sink.poll_flush(cx) 121 } 122 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>123 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 124 ready!(self.as_mut().try_empty_stream(cx)?); 125 self.project().sink.poll_close(cx) 126 } 127 } 128