1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::block_on;
3 use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt};
4 use futures::never::Never;
5 use futures::ready;
6 use futures::sink::{self, Sink, SinkErrInto, SinkExt};
7 use futures::stream::{self, Stream, StreamExt};
8 use futures::task::{self, ArcWake, Context, Poll, Waker};
9 use futures_test::task::panic_context;
10 use std::cell::{Cell, RefCell};
11 use std::collections::VecDeque;
12 use std::fmt;
13 use std::mem;
14 use std::pin::Pin;
15 use std::rc::Rc;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::Arc;
18
sassert_next<S>(s: &mut S, item: S::Item) where S: Stream + Unpin, S::Item: Eq + fmt::Debug,19 fn sassert_next<S>(s: &mut S, item: S::Item)
20 where
21 S: Stream + Unpin,
22 S::Item: Eq + fmt::Debug,
23 {
24 match s.poll_next_unpin(&mut panic_context()) {
25 Poll::Ready(None) => panic!("stream is at its end"),
26 Poll::Ready(Some(e)) => assert_eq!(e, item),
27 Poll::Pending => panic!("stream wasn't ready"),
28 }
29 }
30
unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T31 fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
32 match x {
33 Poll::Ready(Ok(x)) => x,
34 Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
35 Poll::Pending => panic!("Poll::Pending"),
36 }
37 }
38
39 // An Unpark struct that records unpark events for inspection
40 struct Flag(AtomicBool);
41
42 impl Flag {
new() -> Arc<Self>43 fn new() -> Arc<Self> {
44 Arc::new(Self(AtomicBool::new(false)))
45 }
46
take(&self) -> bool47 fn take(&self) -> bool {
48 self.0.swap(false, Ordering::SeqCst)
49 }
50
set(&self, v: bool)51 fn set(&self, v: bool) {
52 self.0.store(v, Ordering::SeqCst)
53 }
54 }
55
56 impl ArcWake for Flag {
wake_by_ref(arc_self: &Arc<Self>)57 fn wake_by_ref(arc_self: &Arc<Self>) {
58 arc_self.set(true)
59 }
60 }
61
flag_cx<F, R>(f: F) -> R where F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,62 fn flag_cx<F, R>(f: F) -> R
63 where
64 F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
65 {
66 let flag = Flag::new();
67 let waker = task::waker_ref(&flag);
68 let cx = &mut Context::from_waker(&waker);
69 f(flag.clone(), cx)
70 }
71
72 // Sends a value on an i32 channel sink
73 struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
74
75 impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
new(sink: S, item: Item) -> Self76 fn new(sink: S, item: Item) -> Self {
77 Self(Some(sink), Some(item))
78 }
79 }
80
81 impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
82 type Output = Result<S, S::Error>;
83
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>84 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
85 let Self(inner, item) = self.get_mut();
86 {
87 let mut inner = inner.as_mut().unwrap();
88 ready!(Pin::new(&mut inner).poll_ready(cx))?;
89 Pin::new(&mut inner).start_send(item.take().unwrap())?;
90 }
91 Poll::Ready(Ok(inner.take().unwrap()))
92 }
93 }
94
95 // Immediately accepts all requests to start pushing, but completion is managed
96 // by manually flushing
97 struct ManualFlush<T: Unpin> {
98 data: Vec<T>,
99 waiting_tasks: Vec<Waker>,
100 }
101
102 impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
103 type Error = ();
104
poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>105 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106 Poll::Ready(Ok(()))
107 }
108
start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error>109 fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
110 if let Some(item) = item {
111 self.data.push(item);
112 } else {
113 self.force_flush();
114 }
115 Ok(())
116 }
117
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>118 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119 if self.data.is_empty() {
120 Poll::Ready(Ok(()))
121 } else {
122 self.waiting_tasks.push(cx.waker().clone());
123 Poll::Pending
124 }
125 }
126
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>127 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
128 self.poll_flush(cx)
129 }
130 }
131
132 impl<T: Unpin> ManualFlush<T> {
new() -> Self133 fn new() -> Self {
134 Self { data: Vec::new(), waiting_tasks: Vec::new() }
135 }
136
force_flush(&mut self) -> Vec<T>137 fn force_flush(&mut self) -> Vec<T> {
138 for task in self.waiting_tasks.drain(..) {
139 task.wake()
140 }
141 mem::replace(&mut self.data, Vec::new())
142 }
143 }
144
145 struct ManualAllow<T: Unpin> {
146 data: Vec<T>,
147 allow: Rc<Allow>,
148 }
149
150 struct Allow {
151 flag: Cell<bool>,
152 tasks: RefCell<Vec<Waker>>,
153 }
154
155 impl Allow {
new() -> Self156 fn new() -> Self {
157 Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) }
158 }
159
check(&self, cx: &mut Context<'_>) -> bool160 fn check(&self, cx: &mut Context<'_>) -> bool {
161 if self.flag.get() {
162 true
163 } else {
164 self.tasks.borrow_mut().push(cx.waker().clone());
165 false
166 }
167 }
168
start(&self)169 fn start(&self) {
170 self.flag.set(true);
171 let mut tasks = self.tasks.borrow_mut();
172 for task in tasks.drain(..) {
173 task.wake();
174 }
175 }
176 }
177
178 impl<T: Unpin> Sink<T> for ManualAllow<T> {
179 type Error = ();
180
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>181 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182 if self.allow.check(cx) {
183 Poll::Ready(Ok(()))
184 } else {
185 Poll::Pending
186 }
187 }
188
start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>189 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
190 self.data.push(item);
191 Ok(())
192 }
193
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>194 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195 Poll::Ready(Ok(()))
196 }
197
poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>198 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199 Poll::Ready(Ok(()))
200 }
201 }
202
manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>)203 fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
204 let allow = Rc::new(Allow::new());
205 let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() };
206 (manual_allow, allow)
207 }
208
209 #[test]
either_sink()210 fn either_sink() {
211 let mut s =
212 if true { Vec::<i32>::new().left_sink() } else { VecDeque::<i32>::new().right_sink() };
213
214 Pin::new(&mut s).start_send(0).unwrap();
215 }
216
217 #[test]
vec_sink()218 fn vec_sink() {
219 let mut v = Vec::new();
220 Pin::new(&mut v).start_send(0).unwrap();
221 Pin::new(&mut v).start_send(1).unwrap();
222 assert_eq!(v, vec![0, 1]);
223 block_on(v.flush()).unwrap();
224 assert_eq!(v, vec![0, 1]);
225 }
226
227 #[test]
vecdeque_sink()228 fn vecdeque_sink() {
229 let mut deque = VecDeque::new();
230 Pin::new(&mut deque).start_send(2).unwrap();
231 Pin::new(&mut deque).start_send(3).unwrap();
232
233 assert_eq!(deque.pop_front(), Some(2));
234 assert_eq!(deque.pop_front(), Some(3));
235 assert_eq!(deque.pop_front(), None);
236 }
237
238 #[test]
send()239 fn send() {
240 let mut v = Vec::new();
241
242 block_on(v.send(0)).unwrap();
243 assert_eq!(v, vec![0]);
244
245 block_on(v.send(1)).unwrap();
246 assert_eq!(v, vec![0, 1]);
247
248 block_on(v.send(2)).unwrap();
249 assert_eq!(v, vec![0, 1, 2]);
250 }
251
252 #[test]
send_all()253 fn send_all() {
254 let mut v = Vec::new();
255
256 block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap();
257 assert_eq!(v, vec![0, 1]);
258
259 block_on(v.send_all(&mut stream::iter(vec![2, 3]).map(Ok))).unwrap();
260 assert_eq!(v, vec![0, 1, 2, 3]);
261
262 block_on(v.send_all(&mut stream::iter(vec![4, 5]).map(Ok))).unwrap();
263 assert_eq!(v, vec![0, 1, 2, 3, 4, 5]);
264 }
265
266 // Test that `start_send` on an `mpsc` channel does indeed block when the
267 // channel is full
268 #[test]
mpsc_blocking_start_send()269 fn mpsc_blocking_start_send() {
270 let (mut tx, mut rx) = mpsc::channel::<i32>(0);
271
272 block_on(future::lazy(|_| {
273 tx.start_send(0).unwrap();
274
275 flag_cx(|flag, cx| {
276 let mut task = StartSendFut::new(tx, 1);
277
278 assert!(task.poll_unpin(cx).is_pending());
279 assert!(!flag.take());
280 sassert_next(&mut rx, 0);
281 assert!(flag.take());
282 unwrap(task.poll_unpin(cx));
283 assert!(!flag.take());
284 sassert_next(&mut rx, 1);
285 })
286 }));
287 }
288
289 // test `flush` by using `with` to make the first insertion into a sink block
290 // until a oneshot is completed
291 #[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
292 #[test]
with_flush()293 fn with_flush() {
294 let (tx, rx) = oneshot::channel();
295 let mut block = rx.boxed();
296 let mut sink = Vec::new().with(|elem| {
297 mem::replace(&mut block, future::ok(()).boxed())
298 .map_ok(move |()| elem + 1)
299 .map_err(|_| -> Never { panic!() })
300 });
301
302 assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(()));
303
304 flag_cx(|flag, cx| {
305 let mut task = sink.flush();
306 assert!(task.poll_unpin(cx).is_pending());
307 tx.send(()).unwrap();
308 assert!(flag.take());
309
310 unwrap(task.poll_unpin(cx));
311
312 block_on(sink.send(1)).unwrap();
313 assert_eq!(sink.get_ref(), &[1, 2]);
314 })
315 }
316
317 // test simple use of with to change data
318 #[test]
with_as_map()319 fn with_as_map() {
320 let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));
321 block_on(sink.send(0)).unwrap();
322 block_on(sink.send(1)).unwrap();
323 block_on(sink.send(2)).unwrap();
324 assert_eq!(sink.get_ref(), &[0, 2, 4]);
325 }
326
327 // test simple use of with_flat_map
328 #[test]
with_flat_map()329 fn with_flat_map() {
330 let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));
331 block_on(sink.send(0)).unwrap();
332 block_on(sink.send(1)).unwrap();
333 block_on(sink.send(2)).unwrap();
334 block_on(sink.send(3)).unwrap();
335 assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]);
336 }
337
338 // Check that `with` propagates `poll_ready` to the inner sink.
339 // Regression test for the issue #1834.
340 #[test]
with_propagates_poll_ready()341 fn with_propagates_poll_ready() {
342 let (tx, mut rx) = mpsc::channel::<i32>(0);
343 let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));
344
345 block_on(future::lazy(|_| {
346 flag_cx(|flag, cx| {
347 let mut tx = Pin::new(&mut tx);
348
349 // Should be ready for the first item.
350 assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
351 assert_eq!(tx.as_mut().start_send(0), Ok(()));
352
353 // Should be ready for the second item only after the first one is received.
354 assert_eq!(tx.as_mut().poll_ready(cx), Poll::Pending);
355 assert!(!flag.take());
356 sassert_next(&mut rx, 10);
357 assert!(flag.take());
358 assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
359 assert_eq!(tx.as_mut().start_send(1), Ok(()));
360 })
361 }));
362 }
363
364 // test that the `with` sink doesn't require the underlying sink to flush,
365 // but doesn't claim to be flushed until the underlying sink is
366 #[test]
with_flush_propagate()367 fn with_flush_propagate() {
368 let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
369 flag_cx(|flag, cx| {
370 unwrap(Pin::new(&mut sink).poll_ready(cx));
371 Pin::new(&mut sink).start_send(Some(0)).unwrap();
372 unwrap(Pin::new(&mut sink).poll_ready(cx));
373 Pin::new(&mut sink).start_send(Some(1)).unwrap();
374
375 {
376 let mut task = sink.flush();
377 assert!(task.poll_unpin(cx).is_pending());
378 assert!(!flag.take());
379 }
380 assert_eq!(sink.get_mut().force_flush(), vec![0, 1]);
381 assert!(flag.take());
382 unwrap(sink.flush().poll_unpin(cx));
383 })
384 }
385
386 // test that `Clone` is implemented on `with` sinks
387 #[test]
with_implements_clone()388 fn with_implements_clone() {
389 let (mut tx, rx) = mpsc::channel(5);
390
391 {
392 let mut is_positive = tx.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0));
393
394 let mut is_long =
395 tx.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));
396
397 block_on(is_positive.clone().send(-1)).unwrap();
398 block_on(is_long.clone().send("123456")).unwrap();
399 block_on(is_long.send("123")).unwrap();
400 block_on(is_positive.send(1)).unwrap();
401 }
402
403 block_on(tx.send(false)).unwrap();
404
405 block_on(tx.close()).unwrap();
406
407 assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![false, true, false, true, false]);
408 }
409
410 // test that a buffer is a no-nop around a sink that always accepts sends
411 #[test]
buffer_noop()412 fn buffer_noop() {
413 let mut sink = Vec::new().buffer(0);
414 block_on(sink.send(0)).unwrap();
415 block_on(sink.send(1)).unwrap();
416 assert_eq!(sink.get_ref(), &[0, 1]);
417
418 let mut sink = Vec::new().buffer(1);
419 block_on(sink.send(0)).unwrap();
420 block_on(sink.send(1)).unwrap();
421 assert_eq!(sink.get_ref(), &[0, 1]);
422 }
423
424 // test basic buffer functionality, including both filling up to capacity,
425 // and writing out when the underlying sink is ready
426 #[test]
buffer()427 fn buffer() {
428 let (sink, allow) = manual_allow::<i32>();
429 let sink = sink.buffer(2);
430
431 let sink = block_on(StartSendFut::new(sink, 0)).unwrap();
432 let mut sink = block_on(StartSendFut::new(sink, 1)).unwrap();
433
434 flag_cx(|flag, cx| {
435 let mut task = sink.send(2);
436 assert!(task.poll_unpin(cx).is_pending());
437 assert!(!flag.take());
438 allow.start();
439 assert!(flag.take());
440 unwrap(task.poll_unpin(cx));
441 assert_eq!(sink.get_ref().data, vec![0, 1, 2]);
442 })
443 }
444
445 #[test]
fanout_smoke()446 fn fanout_smoke() {
447 let sink1 = Vec::new();
448 let sink2 = Vec::new();
449 let mut sink = sink1.fanout(sink2);
450 block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap();
451 let (sink1, sink2) = sink.into_inner();
452 assert_eq!(sink1, vec![1, 2, 3]);
453 assert_eq!(sink2, vec![1, 2, 3]);
454 }
455
456 #[test]
fanout_backpressure()457 fn fanout_backpressure() {
458 let (left_send, mut left_recv) = mpsc::channel(0);
459 let (right_send, mut right_recv) = mpsc::channel(0);
460 let sink = left_send.fanout(right_send);
461
462 let mut sink = block_on(StartSendFut::new(sink, 0)).unwrap();
463
464 flag_cx(|flag, cx| {
465 let mut task = sink.send(2);
466 assert!(!flag.take());
467 assert!(task.poll_unpin(cx).is_pending());
468 assert_eq!(block_on(left_recv.next()), Some(0));
469 assert!(flag.take());
470 assert!(task.poll_unpin(cx).is_pending());
471 assert_eq!(block_on(right_recv.next()), Some(0));
472 assert!(flag.take());
473
474 assert!(task.poll_unpin(cx).is_pending());
475 assert_eq!(block_on(left_recv.next()), Some(2));
476 assert!(flag.take());
477 assert!(task.poll_unpin(cx).is_pending());
478 assert_eq!(block_on(right_recv.next()), Some(2));
479 assert!(flag.take());
480
481 unwrap(task.poll_unpin(cx));
482 // make sure receivers live until end of test to prevent send errors
483 drop(left_recv);
484 drop(right_recv);
485 })
486 }
487
488 #[test]
sink_map_err()489 fn sink_map_err() {
490 {
491 let cx = &mut panic_context();
492 let (tx, _rx) = mpsc::channel(1);
493 let mut tx = tx.sink_map_err(|_| ());
494 assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
495 assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(())));
496 }
497
498 let tx = mpsc::channel(0).0;
499 assert_eq!(Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), Err(()));
500 }
501
502 #[test]
sink_unfold()503 fn sink_unfold() {
504 block_on(poll_fn(|cx| {
505 let (tx, mut rx) = mpsc::channel(1);
506 let unfold = sink::unfold((), |(), i: i32| {
507 let mut tx = tx.clone();
508 async move {
509 tx.send(i).await.unwrap();
510 Ok::<_, String>(())
511 }
512 });
513 futures::pin_mut!(unfold);
514 assert_eq!(unfold.as_mut().start_send(1), Ok(()));
515 assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
516 assert_eq!(rx.try_next().unwrap(), Some(1));
517
518 assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
519 assert_eq!(unfold.as_mut().start_send(2), Ok(()));
520 assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
521 assert_eq!(unfold.as_mut().start_send(3), Ok(()));
522 assert_eq!(rx.try_next().unwrap(), Some(2));
523 assert!(rx.try_next().is_err());
524 assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
525 assert_eq!(unfold.as_mut().start_send(4), Ok(()));
526 assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full
527 assert_eq!(rx.try_next().unwrap(), Some(3));
528 assert_eq!(rx.try_next().unwrap(), Some(4));
529
530 Poll::Ready(())
531 }))
532 }
533
534 #[test]
err_into()535 fn err_into() {
536 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
537 struct ErrIntoTest;
538
539 impl From<mpsc::SendError> for ErrIntoTest {
540 fn from(_: mpsc::SendError) -> Self {
541 Self
542 }
543 }
544
545 {
546 let cx = &mut panic_context();
547 let (tx, _rx) = mpsc::channel(1);
548 let mut tx: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = tx.sink_err_into();
549 assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
550 assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(())));
551 }
552
553 let tx = mpsc::channel(0).0;
554 assert_eq!(Pin::new(&mut tx.sink_err_into()).start_send(()), Err(ErrIntoTest));
555 }
556