1 | use crate::stream_ext::Fuse; |
2 | use crate::Stream; |
3 | |
4 | use core::pin::Pin; |
5 | use core::task::{Context, Poll}; |
6 | use pin_project_lite::pin_project; |
7 | |
8 | pin_project! { |
9 | /// Stream returned by the [`merge`](super::StreamExt::merge) method. |
10 | pub struct Merge<T, U> { |
11 | #[pin] |
12 | a: Fuse<T>, |
13 | #[pin] |
14 | b: Fuse<U>, |
15 | // When `true`, poll `a` first, otherwise, `poll` b`. |
16 | a_first: bool, |
17 | } |
18 | } |
19 | |
20 | impl<T, U> Merge<T, U> { |
21 | pub(super) fn new(a: T, b: U) -> Merge<T, U> |
22 | where |
23 | T: Stream, |
24 | U: Stream, |
25 | { |
26 | Merge { |
27 | a: Fuse::new(a), |
28 | b: Fuse::new(b), |
29 | a_first: true, |
30 | } |
31 | } |
32 | } |
33 | |
34 | impl<T, U> Stream for Merge<T, U> |
35 | where |
36 | T: Stream, |
37 | U: Stream<Item = T::Item>, |
38 | { |
39 | type Item = T::Item; |
40 | |
41 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { |
42 | let me = self.project(); |
43 | let a_first = *me.a_first; |
44 | |
45 | // Toggle the flag |
46 | *me.a_first = !a_first; |
47 | |
48 | if a_first { |
49 | poll_next(me.a, me.b, cx) |
50 | } else { |
51 | poll_next(me.b, me.a, cx) |
52 | } |
53 | } |
54 | |
55 | fn size_hint(&self) -> (usize, Option<usize>) { |
56 | super::merge_size_hints(self.a.size_hint(), self.b.size_hint()) |
57 | } |
58 | } |
59 | |
60 | fn poll_next<T, U>( |
61 | first: Pin<&mut T>, |
62 | second: Pin<&mut U>, |
63 | cx: &mut Context<'_>, |
64 | ) -> Poll<Option<T::Item>> |
65 | where |
66 | T: Stream, |
67 | U: Stream<Item = T::Item>, |
68 | { |
69 | let mut done = true; |
70 | |
71 | match first.poll_next(cx) { |
72 | Poll::Ready(Some(val)) => return Poll::Ready(Some(val)), |
73 | Poll::Ready(None) => {} |
74 | Poll::Pending => done = false, |
75 | } |
76 | |
77 | match second.poll_next(cx) { |
78 | Poll::Ready(Some(val)) => return Poll::Ready(Some(val)), |
79 | Poll::Ready(None) => {} |
80 | Poll::Pending => done = false, |
81 | } |
82 | |
83 | if done { |
84 | Poll::Ready(None) |
85 | } else { |
86 | Poll::Pending |
87 | } |
88 | } |
89 | |