• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::block_on;
3 use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt};
4 use futures::never::Never;
5 use futures::ready;
6 use futures::sink::{self, Sink, SinkErrInto, SinkExt};
7 use futures::stream::{self, Stream, StreamExt};
8 use futures::task::{self, ArcWake, Context, Poll, Waker};
9 use futures_test::task::panic_context;
10 use std::cell::{Cell, RefCell};
11 use std::collections::VecDeque;
12 use std::fmt;
13 use std::mem;
14 use std::pin::Pin;
15 use std::rc::Rc;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::Arc;
18 
sassert_next<S>(s: &mut S, item: S::Item) where S: Stream + Unpin, S::Item: Eq + fmt::Debug,19 fn sassert_next<S>(s: &mut S, item: S::Item)
20 where
21     S: Stream + Unpin,
22     S::Item: Eq + fmt::Debug,
23 {
24     match s.poll_next_unpin(&mut panic_context()) {
25         Poll::Ready(None) => panic!("stream is at its end"),
26         Poll::Ready(Some(e)) => assert_eq!(e, item),
27         Poll::Pending => panic!("stream wasn't ready"),
28     }
29 }
30 
// Extracts the success value from a poll result that must already be
// `Poll::Ready(Ok(_))`.
//
// Panics on `Poll::Pending` and on `Poll::Ready(Err(e))`; in the error case the
// `Debug` rendering of `e` is part of the panic message so a failing test shows
// what actually went wrong.
fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
    match x {
        Poll::Ready(Ok(value)) => value,
        Poll::Ready(Err(e)) => panic!("Poll::Ready(Err({:?}))", e),
        Poll::Pending => panic!("Poll::Pending"),
    }
}
38 
// A boolean flag that tests can set and inspect; `take` reads and clears it
struct Flag(AtomicBool);

impl Flag {
    // Creates a new, unset flag behind an `Arc`
    fn new() -> Arc<Self> {
        let unset = AtomicBool::new(false);
        Arc::new(Self(unset))
    }

    // Returns the current value and resets the flag to `false`
    fn take(&self) -> bool {
        let Self(state) = self;
        state.swap(false, Ordering::SeqCst)
    }

    // Overwrites the flag with `v`
    fn set(&self, v: bool) {
        let Self(state) = self;
        state.store(v, Ordering::SeqCst)
    }
}
55 
56 impl ArcWake for Flag {
wake_by_ref(arc_self: &Arc<Self>)57     fn wake_by_ref(arc_self: &Arc<Self>) {
58         arc_self.set(true)
59     }
60 }
61 
flag_cx<F, R>(f: F) -> R where F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,62 fn flag_cx<F, R>(f: F) -> R
63 where
64     F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
65 {
66     let flag = Flag::new();
67     let waker = task::waker_ref(&flag);
68     let cx = &mut Context::from_waker(&waker);
69     f(flag.clone(), cx)
70 }
71 
72 // Sends a value on an i32 channel sink
73 struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
74 
75 impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
new(sink: S, item: Item) -> Self76     fn new(sink: S, item: Item) -> Self {
77         Self(Some(sink), Some(item))
78     }
79 }
80 
81 impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
82     type Output = Result<S, S::Error>;
83 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>84     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
85         let Self(inner, item) = self.get_mut();
86         {
87             let mut inner = inner.as_mut().unwrap();
88             ready!(Pin::new(&mut inner).poll_ready(cx))?;
89             Pin::new(&mut inner).start_send(item.take().unwrap())?;
90         }
91         Poll::Ready(Ok(inner.take().unwrap()))
92     }
93 }
94 
// A sink that accepts every item right away but only reports itself flushed
// once its buffered data has been drained by hand
struct ManualFlush<T>
where
    T: Unpin,
{
    // Items accepted so far and not yet flushed
    data: Vec<T>,
    // Wakers of tasks that polled for a flush while data was still buffered
    waiting_tasks: Vec<Waker>,
}
101 
102 impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
103     type Error = ();
104 
poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>105     fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106         Poll::Ready(Ok(()))
107     }
108 
start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error>109     fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
110         if let Some(item) = item {
111             self.data.push(item);
112         } else {
113             self.force_flush();
114         }
115         Ok(())
116     }
117 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>118     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119         if self.data.is_empty() {
120             Poll::Ready(Ok(()))
121         } else {
122             self.waiting_tasks.push(cx.waker().clone());
123             Poll::Pending
124         }
125     }
126 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>127     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
128         self.poll_flush(cx)
129     }
130 }
131 
132 impl<T: Unpin> ManualFlush<T> {
new() -> Self133     fn new() -> Self {
134         Self { data: Vec::new(), waiting_tasks: Vec::new() }
135     }
136 
force_flush(&mut self) -> Vec<T>137     fn force_flush(&mut self) -> Vec<T> {
138         for task in self.waiting_tasks.drain(..) {
139             task.wake()
140         }
141         mem::replace(&mut self.data, Vec::new())
142     }
143 }
144 
145 struct ManualAllow<T: Unpin> {
146     data: Vec<T>,
147     allow: Rc<Allow>,
148 }
149 
// A manually opened gate: `check` reports whether it is open and remembers the
// wakers of tasks that found it closed, `start` opens it and wakes them
struct Allow {
    flag: Cell<bool>,
    tasks: RefCell<Vec<Waker>>,
}

impl Allow {
    // Starts closed with no remembered wakers
    fn new() -> Self {
        Allow { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) }
    }

    // Returns `true` if the gate is open; otherwise stores the waker from `cx`
    // and returns `false`
    fn check(&self, cx: &mut Context<'_>) -> bool {
        let open = self.flag.get();
        if !open {
            let waker = cx.waker().clone();
            self.tasks.borrow_mut().push(waker);
        }
        open
    }

    // Opens the gate and wakes every waker stored by `check`
    fn start(&self) {
        self.flag.set(true);
        for parked in self.tasks.borrow_mut().drain(..) {
            parked.wake();
        }
    }
}
177 
178 impl<T: Unpin> Sink<T> for ManualAllow<T> {
179     type Error = ();
180 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>181     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182         if self.allow.check(cx) {
183             Poll::Ready(Ok(()))
184         } else {
185             Poll::Pending
186         }
187     }
188 
start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>189     fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
190         self.data.push(item);
191         Ok(())
192     }
193 
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>194     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195         Poll::Ready(Ok(()))
196     }
197 
poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>198     fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199         Poll::Ready(Ok(()))
200     }
201 }
202 
manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>)203 fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
204     let allow = Rc::new(Allow::new());
205     let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() };
206     (manual_allow, allow)
207 }
208 
// `left_sink` / `right_sink` let two different sink types share one variable
#[test]
fn either_sink() {
    let use_left = true;
    let mut either = if use_left {
        Vec::<i32>::new().left_sink()
    } else {
        VecDeque::<i32>::new().right_sink()
    };

    let pinned = Pin::new(&mut either);
    pinned.start_send(0).unwrap();
}
216 
// A `Vec` used as a sink stores items in order, and flushing leaves them intact
#[test]
fn vec_sink() {
    let mut sink = Vec::new();
    for item in 0..2 {
        Pin::new(&mut sink).start_send(item).unwrap();
    }
    assert_eq!(sink, vec![0, 1]);

    let flushed = block_on(sink.flush());
    flushed.unwrap();
    assert_eq!(sink, vec![0, 1]);
}
226 
// A `VecDeque` used as a sink hands items back in the order they were sent
#[test]
fn vecdeque_sink() {
    let mut queue = VecDeque::new();
    for item in 2..4 {
        Pin::new(&mut queue).start_send(item).unwrap();
    }

    for expected in [Some(2), Some(3), None].iter() {
        assert_eq!(queue.pop_front(), *expected);
    }
}
237 
// Each completed `send` appends exactly one item to the underlying `Vec`
#[test]
fn send() {
    let mut sink = Vec::new();
    let mut expected = Vec::new();

    for item in 0..3 {
        block_on(sink.send(item)).unwrap();
        expected.push(item);
        assert_eq!(sink, expected);
    }
}
251 
// Each `send_all` appends the whole stream to what the sink already holds
#[test]
fn send_all() {
    let mut sink = Vec::new();

    let mut first = stream::iter(vec![0, 1]).map(Ok);
    block_on(sink.send_all(&mut first)).unwrap();
    assert_eq!(sink, vec![0, 1]);

    let mut second = stream::iter(vec![2, 3]).map(Ok);
    block_on(sink.send_all(&mut second)).unwrap();
    assert_eq!(sink, vec![0, 1, 2, 3]);

    let mut third = stream::iter(vec![4, 5]).map(Ok);
    block_on(sink.send_all(&mut third)).unwrap();
    assert_eq!(sink, vec![0, 1, 2, 3, 4, 5]);
}
265 
// Checks that a second send on a zero-buffer `mpsc` channel stays pending until
// the receiver has taken the first item, and that the sender is woken then
#[test]
fn mpsc_blocking_start_send() {
    let (mut tx, mut rx) = mpsc::channel::<i32>(0);

    block_on(future::lazy(|_| {
        // The first item goes in directly.
        tx.start_send(0).unwrap();

        flag_cx(|flag, cx| {
            let mut second_send = StartSendFut::new(tx, 1);

            // Not ready yet, and nobody has woken us.
            assert!(second_send.poll_unpin(cx).is_pending());
            assert!(!flag.take());

            // Receiving the first item wakes the sender...
            sassert_next(&mut rx, 0);
            assert!(flag.take());

            // ...which can now complete without a further wake-up.
            unwrap(second_send.poll_unpin(cx));
            assert!(!flag.take());
            sassert_next(&mut rx, 1);
        })
    }));
}
288 
// Exercises `flush` through `with`: the mapping future for the first item waits
// on a oneshot, so the flush stays pending until that oneshot is completed
#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn with_flush() {
    let (tx, rx) = oneshot::channel();
    let mut block = rx.boxed();
    let mut sink = Vec::new().with(|elem| {
        // Only the first call sees the oneshot; later calls get a ready future.
        let gate = mem::replace(&mut block, future::ok(()).boxed());
        gate.map_ok(move |()| elem + 1).map_err(|_| -> Never { panic!() })
    });

    let started = Pin::new(&mut sink).start_send(0);
    assert_eq!(started.ok(), Some(()));

    flag_cx(|flag, cx| {
        let mut pending_flush = sink.flush();
        assert!(pending_flush.poll_unpin(cx).is_pending());

        // Completing the oneshot wakes the flush, which can then finish.
        tx.send(()).unwrap();
        assert!(flag.take());

        unwrap(pending_flush.poll_unpin(cx));

        block_on(sink.send(1)).unwrap();
        assert_eq!(sink.get_ref(), &[1, 2]);
    })
}
316 
// `with` used as a plain mapping step: every item is doubled before it is stored
#[test]
fn with_as_map() {
    let mut doubling = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));

    for item in 0..3 {
        block_on(doubling.send(item)).unwrap();
    }

    assert_eq!(doubling.get_ref(), &[0, 2, 4]);
}
326 
// `with_flat_map` expands each item `n` into `n` copies of itself
#[test]
fn with_flat_map() {
    let mut repeating = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));

    for item in 0..4 {
        block_on(repeating.send(item)).unwrap();
    }

    assert_eq!(repeating.get_ref(), &[1, 2, 2, 3, 3, 3]);
}
337 
// `with` must forward `poll_ready` to the sink it wraps.
// Regression test for issue #1834.
#[test]
fn with_propagates_poll_ready() {
    let (tx, mut rx) = mpsc::channel::<i32>(0);
    let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));

    block_on(future::lazy(|_| {
        flag_cx(|flag, cx| {
            let mut pinned = Pin::new(&mut tx);

            // The first item is accepted immediately.
            assert_eq!(pinned.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
            assert_eq!(pinned.as_mut().start_send(0), Ok(()));

            // The second item has to wait until the first has been received.
            assert_eq!(pinned.as_mut().poll_ready(cx), Poll::Pending);
            assert!(!flag.take());
            sassert_next(&mut rx, 10);
            assert!(flag.take());
            assert_eq!(pinned.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
            assert_eq!(pinned.as_mut().start_send(1), Ok(()));
        })
    }));
}
363 
// A `with` sink does not need the wrapped sink to flush in order to accept
// items, but it must not report itself flushed before the wrapped sink is
#[test]
fn with_flush_propagate() {
    let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
    flag_cx(|flag, cx| {
        // Two items go in without any flush.
        for item in 0..2 {
            unwrap(Pin::new(&mut sink).poll_ready(cx));
            Pin::new(&mut sink).start_send(Some(item)).unwrap();
        }

        // While the inner sink still buffers data, flushing stays pending.
        let mut pending_flush = sink.flush();
        assert!(pending_flush.poll_unpin(cx).is_pending());
        assert!(!flag.take());
        drop(pending_flush);

        // Draining the inner sink by hand wakes the task and lets a flush finish.
        assert_eq!(sink.get_mut().force_flush(), vec![0, 1]);
        assert!(flag.take());
        unwrap(sink.flush().poll_unpin(cx));
    })
}
385 
// `with` sinks can be cloned, and every clone feeds the same channel
#[test]
fn with_implements_clone() {
    let (mut tx, rx) = mpsc::channel(5);

    // The adapters live in their own scope, so they are dropped before the
    // channel is closed (presumably so that collecting the receiver can finish).
    {
        let mut is_positive = tx.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0));

        let mut is_long =
            tx.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));

        block_on(is_positive.clone().send(-1)).unwrap();
        block_on(is_long.clone().send("123456")).unwrap();
        block_on(is_long.send("123")).unwrap();
        block_on(is_positive.send(1)).unwrap();
    }

    block_on(tx.send(false)).unwrap();

    block_on(tx.close()).unwrap();

    let received = block_on(rx.collect::<Vec<_>>());
    assert_eq!(received, vec![false, true, false, true, false]);
}
409 
// test that a buffer is a no-op around a sink that always accepts sends,
// for both a capacity of 0 and a capacity of 1
#[test]
fn buffer_noop() {
    for capacity in 0..=1 {
        let mut sink = Vec::new().buffer(capacity);
        block_on(sink.send(0)).unwrap();
        block_on(sink.send(1)).unwrap();
        assert_eq!(sink.get_ref(), &[0, 1]);
    }
}
423 
// Basic buffer behaviour: items are accepted up to the buffer's capacity while
// the inner sink is not ready, and written out once the inner sink allows it
#[test]
fn buffer() {
    let (inner, allow) = manual_allow::<i32>();
    let buffered = inner.buffer(2);

    // Two items fit into the buffer although the inner sink is not ready.
    let buffered = block_on(StartSendFut::new(buffered, 0)).unwrap();
    let mut buffered = block_on(StartSendFut::new(buffered, 1)).unwrap();

    flag_cx(|flag, cx| {
        // A third item has to wait...
        let mut third_send = buffered.send(2);
        assert!(third_send.poll_unpin(cx).is_pending());
        assert!(!flag.take());

        // ...until the gate is opened, which wakes the task.
        allow.start();
        assert!(flag.take());
        unwrap(third_send.poll_unpin(cx));
        assert_eq!(buffered.get_ref().data, vec![0, 1, 2]);
    })
}
444 
// `fanout` delivers every item to both underlying sinks
#[test]
fn fanout_smoke() {
    let mut fanned = Vec::new().fanout(Vec::new());

    let mut items = stream::iter(vec![1, 2, 3]).map(Ok);
    block_on(fanned.send_all(&mut items)).unwrap();

    let (left, right) = fanned.into_inner();
    assert_eq!(left, vec![1, 2, 3]);
    assert_eq!(right, vec![1, 2, 3]);
}
455 
// A `fanout` send only completes after both receivers have taken the item;
// each receive wakes the sending task
#[test]
fn fanout_backpressure() {
    let (left_send, mut left_recv) = mpsc::channel(0);
    let (right_send, mut right_recv) = mpsc::channel(0);
    let fanned = left_send.fanout(right_send);

    let mut fanned = block_on(StartSendFut::new(fanned, 0)).unwrap();

    flag_cx(|flag, cx| {
        let mut send_two = fanned.send(2);
        assert!(!flag.take());

        // The item `0` still has to be taken by both receivers.
        assert!(send_two.poll_unpin(cx).is_pending());
        assert_eq!(block_on(left_recv.next()), Some(0));
        assert!(flag.take());
        assert!(send_two.poll_unpin(cx).is_pending());
        assert_eq!(block_on(right_recv.next()), Some(0));
        assert!(flag.take());

        // The same happens for the item `2`.
        assert!(send_two.poll_unpin(cx).is_pending());
        assert_eq!(block_on(left_recv.next()), Some(2));
        assert!(flag.take());
        assert!(send_two.poll_unpin(cx).is_pending());
        assert_eq!(block_on(right_recv.next()), Some(2));
        assert!(flag.take());

        unwrap(send_two.poll_unpin(cx));
        // make sure receivers live until end of test to prevent send errors
        drop(left_recv);
        drop(right_recv);
    })
}
487 
// `sink_map_err` passes successes through and converts errors with the closure
#[test]
fn sink_map_err() {
    // Receiver alive: sending and flushing succeed.
    {
        let mut cx = panic_context();
        let (tx, _rx) = mpsc::channel(1);
        let mut mapped = tx.sink_map_err(|_| ());
        assert_eq!(Pin::new(&mut mapped).start_send(()), Ok(()));
        assert_eq!(Pin::new(&mut mapped).poll_flush(&mut cx), Poll::Ready(Ok(())));
    }

    // Receiver already dropped: the send error is mapped to `()`.
    let (tx, rx) = mpsc::channel(0);
    drop(rx);
    let mut mapped = tx.sink_map_err(|_| ());
    assert_eq!(Pin::new(&mut mapped).start_send(()), Err(()));
}
501 
// Drives a `sink::unfold` sink whose closure forwards each item into an mpsc
// channel, and checks readiness and flushing against what the channel received
#[test]
fn sink_unfold() {
    block_on(poll_fn(|cx| {
        let (tx, mut rx) = mpsc::channel(1);
        let unfolded = sink::unfold((), |(), i: i32| {
            let mut tx = tx.clone();
            async move {
                tx.send(i).await.unwrap();
                Ok::<_, String>(())
            }
        });
        futures::pin_mut!(unfolded);

        // A single item followed by a flush reaches the channel.
        assert_eq!(unfolded.as_mut().start_send(1), Ok(()));
        assert_eq!(unfolded.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
        assert_eq!(rx.try_next().unwrap(), Some(1));

        // After two further sends only the first of them has arrived.
        assert_eq!(unfolded.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
        assert_eq!(unfolded.as_mut().start_send(2), Ok(()));
        assert_eq!(unfolded.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
        assert_eq!(unfolded.as_mut().start_send(3), Ok(()));
        assert_eq!(rx.try_next().unwrap(), Some(2));
        assert!(rx.try_next().is_err());

        assert_eq!(unfolded.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
        assert_eq!(unfolded.as_mut().start_send(4), Ok(()));
        assert_eq!(unfolded.as_mut().poll_flush(cx), Poll::Pending); // Channel full
        assert_eq!(rx.try_next().unwrap(), Some(3));
        assert_eq!(rx.try_next().unwrap(), Some(4));

        Poll::Ready(())
    }))
}
533 
// `sink_err_into` converts the sink's error type through `From`
#[test]
fn err_into() {
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    struct ErrIntoTest;

    impl From<mpsc::SendError> for ErrIntoTest {
        fn from(_: mpsc::SendError) -> Self {
            ErrIntoTest
        }
    }

    // Receiver alive: sending and flushing succeed.
    {
        let mut cx = panic_context();
        let (tx, _rx) = mpsc::channel(1);
        let mut converted: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = tx.sink_err_into();
        assert_eq!(Pin::new(&mut converted).start_send(()), Ok(()));
        assert_eq!(Pin::new(&mut converted).poll_flush(&mut cx), Poll::Ready(Ok(())));
    }

    // Receiver already dropped: the send error arrives as `ErrIntoTest`.
    let (tx, rx) = mpsc::channel(0);
    drop(rx);
    let mut converted: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = tx.sink_err_into();
    assert_eq!(Pin::new(&mut converted).start_send(()), Err(ErrIntoTest));
}
556