1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::ready;
4 use futures_core::stream::Stream;
5 use futures_core::task::{Context, Poll};
6 use futures_sink::Sink;
7
8 use crate::lock::BiLock;
9
10 /// A `Stream` part of the split pair
11 #[derive(Debug)]
12 #[must_use = "streams do nothing unless polled"]
13 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
14 pub struct SplitStream<S>(BiLock<S>);
15
16 impl<S> Unpin for SplitStream<S> {}
17
18 impl<S> SplitStream<S> {
19 /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool20 pub fn is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool {
21 other.is_pair_of(&self)
22 }
23 }
24
25 impl<S: Unpin> SplitStream<S> {
26 /// Attempts to put the two "halves" of a split `Stream + Sink` back
27 /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
28 /// a matching pair originating from the same call to `StreamExt::split`.
reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>> where S: Sink<Item>,29 pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>>
30 where
31 S: Sink<Item>,
32 {
33 other.reunite(self)
34 }
35 }
36
37 impl<S: Stream> Stream for SplitStream<S> {
38 type Item = S::Item;
39
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>40 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
41 ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx)
42 }
43 }
44
45 #[allow(non_snake_case)]
SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item>46 fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
47 SplitSink { lock, slot: None }
48 }
49
50 /// A `Sink` part of the split pair
51 #[derive(Debug)]
52 #[must_use = "sinks do nothing unless polled"]
53 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
54 pub struct SplitSink<S, Item> {
55 lock: BiLock<S>,
56 slot: Option<Item>,
57 }
58
59 impl<S, Item> Unpin for SplitSink<S, Item> {}
60
61 impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
62 /// Attempts to put the two "halves" of a split `Stream + Sink` back
63 /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
64 /// a matching pair originating from the same call to `StreamExt::split`.
reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>>65 pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> {
66 self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1)))
67 }
68 }
69
70 impl<S, Item> SplitSink<S, Item> {
71 /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
is_pair_of(&self, other: &SplitStream<S>) -> bool72 pub fn is_pair_of(&self, other: &SplitStream<S>) -> bool {
73 self.lock.is_pair_of(&other.0)
74 }
75 }
76
77 impl<S: Sink<Item>, Item> SplitSink<S, Item> {
poll_flush_slot( mut inner: Pin<&mut S>, slot: &mut Option<Item>, cx: &mut Context<'_>, ) -> Poll<Result<(), S::Error>>78 fn poll_flush_slot(
79 mut inner: Pin<&mut S>,
80 slot: &mut Option<Item>,
81 cx: &mut Context<'_>,
82 ) -> Poll<Result<(), S::Error>> {
83 if slot.is_some() {
84 ready!(inner.as_mut().poll_ready(cx))?;
85 Poll::Ready(inner.start_send(slot.take().unwrap()))
86 } else {
87 Poll::Ready(Ok(()))
88 }
89 }
90
poll_lock_and_flush_slot( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), S::Error>>91 fn poll_lock_and_flush_slot(
92 mut self: Pin<&mut Self>,
93 cx: &mut Context<'_>,
94 ) -> Poll<Result<(), S::Error>> {
95 let this = &mut *self;
96 let mut inner = ready!(this.lock.poll_lock(cx));
97 Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx)
98 }
99 }
100
101 impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
102 type Error = S::Error;
103
poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>104 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
105 loop {
106 if self.slot.is_none() {
107 return Poll::Ready(Ok(()));
108 }
109 ready!(self.as_mut().poll_lock_and_flush_slot(cx))?;
110 }
111 }
112
start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error>113 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error> {
114 self.slot = Some(item);
115 Ok(())
116 }
117
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>118 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
119 let this = &mut *self;
120 let mut inner = ready!(this.lock.poll_lock(cx));
121 ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
122 inner.as_pin_mut().poll_flush(cx)
123 }
124
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>125 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
126 let this = &mut *self;
127 let mut inner = ready!(this.lock.poll_lock(cx));
128 ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
129 inner.as_pin_mut().poll_close(cx)
130 }
131 }
132
split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>)133 pub(super) fn split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>) {
134 let (a, b) = BiLock::new(s);
135 let read = SplitStream(a);
136 let write = SplitSink(b);
137 (write, read)
138 }
139
140 /// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
141 /// of a `Stream + Split`, and thus could not be `reunite`d.
142 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
143 pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>);
144
145 impl<T, Item> fmt::Debug for ReuniteError<T, Item> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 f.debug_tuple("ReuniteError").field(&"...").finish()
148 }
149 }
150
151 impl<T, Item> fmt::Display for ReuniteError<T, Item> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result152 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153 write!(f, "tried to reunite a SplitStream and SplitSink that don't form a pair")
154 }
155 }
156
// Only available with the `std` feature, since `std::error::Error` lives in
// `std`. The impl requires `T: Any`.
#[cfg(feature = "std")]
impl<T, Item> std::error::Error for ReuniteError<T, Item> where T: core::any::Any {}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::StreamExt;
    use core::marker::PhantomData;

    /// A `Stream + Sink` stand-in whose methods are never expected to run:
    /// the pairing test only splits it and compares the halves.
    struct NopStream<Item> {
        phantom: PhantomData<Item>,
    }

    impl<Item> Stream for NopStream<Item> {
        type Item = Item;

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            todo!()
        }
    }

    impl<Item> Sink<Item> for NopStream<Item> {
        type Error = ();

        fn poll_ready(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }

        fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
            todo!()
        }

        fn poll_flush(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }

        fn poll_close(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            todo!()
        }
    }

    /// Builds a fresh, independent `NopStream` to split.
    fn nop() -> NopStream<()> {
        NopStream { phantom: PhantomData }
    }

    #[test]
    fn test_pairing() {
        let (sink1, stream1) = nop().split();
        let (sink2, stream2) = nop().split();

        // Halves from the same `split` call recognise each other, in both
        // directions.
        assert!(sink1.is_pair_of(&stream1));
        assert!(stream1.is_pair_of(&sink1));
        assert!(sink2.is_pair_of(&stream2));
        assert!(stream2.is_pair_of(&sink2));

        // Halves from different `split` calls never match, in either
        // direction.
        assert!(!sink1.is_pair_of(&stream2));
        assert!(!stream1.is_pair_of(&sink2));
        assert!(!sink2.is_pair_of(&stream1));
        assert!(!stream2.is_pair_of(&sink1));
    }
}
225