• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::stream_ext::Fuse;
2 use crate::Stream;
3 
4 use core::pin::Pin;
5 use core::task::{Context, Poll};
6 use pin_project_lite::pin_project;
7 
8 pin_project! {
9     /// Stream returned by the [`merge`](super::StreamExt::merge) method.
10     pub struct Merge<T, U> {
11         #[pin]
12         a: Fuse<T>,
13         #[pin]
14         b: Fuse<U>,
15         // When `true`, poll `a` first, otherwise, `poll` b`.
16         a_first: bool,
17     }
18 }
19 
20 impl<T, U> Merge<T, U> {
new(a: T, b: U) -> Merge<T, U> where T: Stream, U: Stream,21     pub(super) fn new(a: T, b: U) -> Merge<T, U>
22     where
23         T: Stream,
24         U: Stream,
25     {
26         Merge {
27             a: Fuse::new(a),
28             b: Fuse::new(b),
29             a_first: true,
30         }
31     }
32 }
33 
34 impl<T, U> Stream for Merge<T, U>
35 where
36     T: Stream,
37     U: Stream<Item = T::Item>,
38 {
39     type Item = T::Item;
40 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>>41     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
42         let me = self.project();
43         let a_first = *me.a_first;
44 
45         // Toggle the flag
46         *me.a_first = !a_first;
47 
48         if a_first {
49             poll_next(me.a, me.b, cx)
50         } else {
51             poll_next(me.b, me.a, cx)
52         }
53     }
54 
size_hint(&self) -> (usize, Option<usize>)55     fn size_hint(&self) -> (usize, Option<usize>) {
56         super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
57     }
58 }
59 
poll_next<T, U>( first: Pin<&mut T>, second: Pin<&mut U>, cx: &mut Context<'_>, ) -> Poll<Option<T::Item>> where T: Stream, U: Stream<Item = T::Item>,60 fn poll_next<T, U>(
61     first: Pin<&mut T>,
62     second: Pin<&mut U>,
63     cx: &mut Context<'_>,
64 ) -> Poll<Option<T::Item>>
65 where
66     T: Stream,
67     U: Stream<Item = T::Item>,
68 {
69     use Poll::*;
70 
71     let mut done = true;
72 
73     match first.poll_next(cx) {
74         Ready(Some(val)) => return Ready(Some(val)),
75         Ready(None) => {}
76         Pending => done = false,
77     }
78 
79     match second.poll_next(cx) {
80         Ready(Some(val)) => return Ready(Some(val)),
81         Ready(None) => {}
82         Pending => done = false,
83     }
84 
85     if done {
86         Ready(None)
87     } else {
88         Pending
89     }
90 }
91