1 use crate::stream_ext::Fuse;
2 use crate::Stream;
3
4 use core::pin::Pin;
5 use core::task::{Context, Poll};
6 use pin_project_lite::pin_project;
7
8 pin_project! {
9 /// Stream returned by the [`merge`](super::StreamExt::merge) method.
10 pub struct Merge<T, U> {
11 #[pin]
12 a: Fuse<T>,
13 #[pin]
14 b: Fuse<U>,
15 // When `true`, poll `a` first, otherwise, `poll` b`.
16 a_first: bool,
17 }
18 }
19
20 impl<T, U> Merge<T, U> {
new(a: T, b: U) -> Merge<T, U> where T: Stream, U: Stream,21 pub(super) fn new(a: T, b: U) -> Merge<T, U>
22 where
23 T: Stream,
24 U: Stream,
25 {
26 Merge {
27 a: Fuse::new(a),
28 b: Fuse::new(b),
29 a_first: true,
30 }
31 }
32 }
33
34 impl<T, U> Stream for Merge<T, U>
35 where
36 T: Stream,
37 U: Stream<Item = T::Item>,
38 {
39 type Item = T::Item;
40
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>>41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
42 let me = self.project();
43 let a_first = *me.a_first;
44
45 // Toggle the flag
46 *me.a_first = !a_first;
47
48 if a_first {
49 poll_next(me.a, me.b, cx)
50 } else {
51 poll_next(me.b, me.a, cx)
52 }
53 }
54
size_hint(&self) -> (usize, Option<usize>)55 fn size_hint(&self) -> (usize, Option<usize>) {
56 super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
57 }
58 }
59
poll_next<T, U>( first: Pin<&mut T>, second: Pin<&mut U>, cx: &mut Context<'_>, ) -> Poll<Option<T::Item>> where T: Stream, U: Stream<Item = T::Item>,60 fn poll_next<T, U>(
61 first: Pin<&mut T>,
62 second: Pin<&mut U>,
63 cx: &mut Context<'_>,
64 ) -> Poll<Option<T::Item>>
65 where
66 T: Stream,
67 U: Stream<Item = T::Item>,
68 {
69 use Poll::*;
70
71 let mut done = true;
72
73 match first.poll_next(cx) {
74 Ready(Some(val)) => return Ready(Some(val)),
75 Ready(None) => {}
76 Pending => done = false,
77 }
78
79 match second.poll_next(cx) {
80 Ready(Some(val)) => return Ready(Some(val)),
81 Ready(None) => {}
82 Pending => done = false,
83 }
84
85 if done {
86 Ready(None)
87 } else {
88 Pending
89 }
90 }
91