• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::ready;
4 use futures_core::stream::Stream;
5 use futures_core::task::{Context, Poll};
6 use futures_sink::Sink;
7 
8 use crate::lock::BiLock;
9 
10 /// A `Stream` part of the split pair
11 #[derive(Debug)]
12 #[must_use = "streams do nothing unless polled"]
13 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
14 pub struct SplitStream<S>(BiLock<S>);
15 
16 impl<S> Unpin for SplitStream<S> {}
17 
18 impl<S> SplitStream<S> {
19     /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool20     pub fn is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool {
21         other.is_pair_of(&self)
22     }
23 }
24 
25 impl<S: Unpin> SplitStream<S> {
26     /// Attempts to put the two "halves" of a split `Stream + Sink` back
27     /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
28     /// a matching pair originating from the same call to `StreamExt::split`.
reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>> where S: Sink<Item>,29     pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>>
30     where
31         S: Sink<Item>,
32     {
33         other.reunite(self)
34     }
35 }
36 
37 impl<S: Stream> Stream for SplitStream<S> {
38     type Item = S::Item;
39 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>40     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
41         ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx)
42     }
43 }
44 
45 #[allow(non_snake_case)]
SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item>46 fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
47     SplitSink { lock, slot: None }
48 }
49 
50 /// A `Sink` part of the split pair
51 #[derive(Debug)]
52 #[must_use = "sinks do nothing unless polled"]
53 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
54 pub struct SplitSink<S, Item> {
55     lock: BiLock<S>,
56     slot: Option<Item>,
57 }
58 
59 impl<S, Item> Unpin for SplitSink<S, Item> {}
60 
61 impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
62     /// Attempts to put the two "halves" of a split `Stream + Sink` back
63     /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
64     /// a matching pair originating from the same call to `StreamExt::split`.
reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>>65     pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> {
66         self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1)))
67     }
68 }
69 
70 impl<S, Item> SplitSink<S, Item> {
71     /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
is_pair_of(&self, other: &SplitStream<S>) -> bool72     pub fn is_pair_of(&self, other: &SplitStream<S>) -> bool {
73         self.lock.is_pair_of(&other.0)
74     }
75 }
76 
77 impl<S: Sink<Item>, Item> SplitSink<S, Item> {
poll_flush_slot( mut inner: Pin<&mut S>, slot: &mut Option<Item>, cx: &mut Context<'_>, ) -> Poll<Result<(), S::Error>>78     fn poll_flush_slot(
79         mut inner: Pin<&mut S>,
80         slot: &mut Option<Item>,
81         cx: &mut Context<'_>,
82     ) -> Poll<Result<(), S::Error>> {
83         if slot.is_some() {
84             ready!(inner.as_mut().poll_ready(cx))?;
85             Poll::Ready(inner.start_send(slot.take().unwrap()))
86         } else {
87             Poll::Ready(Ok(()))
88         }
89     }
90 
poll_lock_and_flush_slot( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), S::Error>>91     fn poll_lock_and_flush_slot(
92         mut self: Pin<&mut Self>,
93         cx: &mut Context<'_>,
94     ) -> Poll<Result<(), S::Error>> {
95         let this = &mut *self;
96         let mut inner = ready!(this.lock.poll_lock(cx));
97         Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx)
98     }
99 }
100 
101 impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
102     type Error = S::Error;
103 
poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>104     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
105         loop {
106             if self.slot.is_none() {
107                 return Poll::Ready(Ok(()));
108             }
109             ready!(self.as_mut().poll_lock_and_flush_slot(cx))?;
110         }
111     }
112 
start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error>113     fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error> {
114         self.slot = Some(item);
115         Ok(())
116     }
117 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>118     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
119         let this = &mut *self;
120         let mut inner = ready!(this.lock.poll_lock(cx));
121         ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
122         inner.as_pin_mut().poll_flush(cx)
123     }
124 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>125     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
126         let this = &mut *self;
127         let mut inner = ready!(this.lock.poll_lock(cx));
128         ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
129         inner.as_pin_mut().poll_close(cx)
130     }
131 }
132 
split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>)133 pub(super) fn split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>) {
134     let (a, b) = BiLock::new(s);
135     let read = SplitStream(a);
136     let write = SplitSink(b);
137     (write, read)
138 }
139 
140 /// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
141 /// of a `Stream + Split`, and thus could not be `reunite`d.
142 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
143 pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>);
144 
145 impl<T, Item> fmt::Debug for ReuniteError<T, Item> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result146     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147         f.debug_tuple("ReuniteError").field(&"...").finish()
148     }
149 }
150 
151 impl<T, Item> fmt::Display for ReuniteError<T, Item> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result152     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153         write!(f, "tried to reunite a SplitStream and SplitSink that don't form a pair")
154     }
155 }
156 
// Only available with `std`, since `std::error::Error` lives there.
#[cfg(feature = "std")]
impl<T, Item> std::error::Error for ReuniteError<T, Item> where T: core::any::Any {}
159 
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::StreamExt;
    use core::marker::PhantomData;

    /// Placeholder `Stream + Sink` used only to obtain split halves; none of
    /// its trait methods are implemented.
    struct NopStream<Item> {
        phantom: PhantomData<Item>,
    }

    impl<Item> NopStream<Item> {
        /// Builds a fresh placeholder value.
        fn new() -> Self {
            Self { phantom: PhantomData }
        }
    }

    impl<Item> Stream for NopStream<Item> {
        type Item = Item;

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            todo!()
        }
    }

    impl<Item> Sink<Item> for NopStream<Item> {
        type Error = ();

        fn poll_ready(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }

        fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
            todo!()
        }

        fn poll_flush(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }

        fn poll_close(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }
    }

    /// Halves from the same `split` call pair up; halves from different
    /// calls do not, whichever side is asked.
    #[test]
    fn test_pairing() {
        let (sink1, stream1) = NopStream::<()>::new().split();
        let (sink2, stream2) = NopStream::<()>::new().split();

        // Halves of the same split recognise each other.
        assert!(sink1.is_pair_of(&stream1));
        assert!(stream1.is_pair_of(&sink1));
        assert!(sink2.is_pair_of(&stream2));
        assert!(stream2.is_pair_of(&sink2));

        // Halves of different splits never match.
        assert!(!sink1.is_pair_of(&stream2));
        assert!(!stream1.is_pair_of(&sink2));
        assert!(!sink2.is_pair_of(&stream1));
        assert!(!stream2.is_pair_of(&sink1));
    }
}
225