• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::fmt;
2 use core::marker::PhantomData;
3 use core::pin::Pin;
4 use futures_core::future::Future;
5 use futures_core::ready;
6 use futures_core::stream::Stream;
7 use futures_core::task::{Context, Poll};
8 use futures_sink::Sink;
9 use pin_project_lite::pin_project;
10 
11 pin_project! {
12     /// Sink for the [`with`](super::SinkExt::with) method.
13     #[must_use = "sinks do nothing unless polled"]
14     pub struct With<Si, Item, U, Fut, F> {
15         #[pin]
16         sink: Si,
17         f: F,
18         #[pin]
19         state: Option<Fut>,
20         _phantom: PhantomData<fn(U) -> Item>,
21     }
22 }
23 
24 impl<Si, Item, U, Fut, F> fmt::Debug for With<Si, Item, U, Fut, F>
25 where
26     Si: fmt::Debug,
27     Fut: fmt::Debug,
28 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30         f.debug_struct("With")
31             .field("sink", &self.sink)
32             .field("state", &self.state)
33             .finish()
34     }
35 }
36 
37 impl<Si, Item, U, Fut, F> With<Si, Item, U, Fut, F>
38 where Si: Sink<Item>,
39       F: FnMut(U) -> Fut,
40       Fut: Future,
41 {
new<E>(sink: Si, f: F) -> Self where Fut: Future<Output = Result<Item, E>>, E: From<Si::Error>,42     pub(super) fn new<E>(sink: Si, f: F) -> Self
43         where
44             Fut: Future<Output = Result<Item, E>>,
45             E: From<Si::Error>,
46     {
47         Self {
48             state: None,
49             sink,
50             f,
51             _phantom: PhantomData,
52         }
53     }
54 }
55 
56 impl<Si, Item, U, Fut, F> Clone for With<Si, Item, U, Fut, F>
57 where
58     Si: Clone,
59     F: Clone,
60     Fut: Clone,
61 {
clone(&self) -> Self62     fn clone(&self) -> Self {
63         Self {
64             state: self.state.clone(),
65             sink: self.sink.clone(),
66             f: self.f.clone(),
67             _phantom: PhantomData,
68         }
69     }
70 }
71 
72 // Forwarding impl of Stream from the underlying sink
73 impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F>
74     where S: Stream + Sink<Item>,
75           F: FnMut(U) -> Fut,
76           Fut: Future
77 {
78     type Item = S::Item;
79 
80     delegate_stream!(sink);
81 }
82 
83 impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F>
84     where Si: Sink<Item>,
85           F: FnMut(U) -> Fut,
86           Fut: Future<Output = Result<Item, E>>,
87           E: From<Si::Error>,
88 {
89     delegate_access_inner!(sink, Si, ());
90 
91     /// Completes the processing of previous item if any.
poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), E>>92     fn poll(
93         self: Pin<&mut Self>,
94         cx: &mut Context<'_>,
95     ) -> Poll<Result<(), E>> {
96         let mut this = self.project();
97 
98         let item = match this.state.as_mut().as_pin_mut() {
99             None => return Poll::Ready(Ok(())),
100             Some(fut) => ready!(fut.poll(cx))?,
101         };
102         this.state.set(None);
103         this.sink.start_send(item)?;
104         Poll::Ready(Ok(()))
105     }
106 }
107 
108 impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F>
109     where Si: Sink<Item>,
110           F: FnMut(U) -> Fut,
111           Fut: Future<Output = Result<Item, E>>,
112           E: From<Si::Error>,
113 {
114     type Error = E;
115 
poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>116     fn poll_ready(
117         mut self: Pin<&mut Self>,
118         cx: &mut Context<'_>,
119     ) -> Poll<Result<(), Self::Error>> {
120         ready!(self.as_mut().poll(cx))?;
121         ready!(self.project().sink.poll_ready(cx)?);
122         Poll::Ready(Ok(()))
123     }
124 
start_send( self: Pin<&mut Self>, item: U, ) -> Result<(), Self::Error>125     fn start_send(
126         self: Pin<&mut Self>,
127         item: U,
128     ) -> Result<(), Self::Error> {
129         let mut this = self.project();
130 
131         assert!(this.state.is_none());
132         this.state.set(Some((this.f)(item)));
133         Ok(())
134     }
135 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>136     fn poll_flush(
137         mut self: Pin<&mut Self>,
138         cx: &mut Context<'_>,
139     ) -> Poll<Result<(), Self::Error>> {
140         ready!(self.as_mut().poll(cx))?;
141         ready!(self.project().sink.poll_flush(cx)?);
142         Poll::Ready(Ok(()))
143     }
144 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>145     fn poll_close(
146         mut self: Pin<&mut Self>,
147         cx: &mut Context<'_>,
148     ) -> Poll<Result<(), Self::Error>> {
149         ready!(self.as_mut().poll(cx))?;
150         ready!(self.project().sink.poll_close(cx)?);
151         Poll::Ready(Ok(()))
152     }
153 }
154