1use crate::stream_ext::Fuse;
2use crate::Stream;
3
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9 /// Stream returned by the [`merge`](super::StreamExt::merge) method.
10 pub struct Merge<T, U> {
11 #[pin]
12 a: Fuse<T>,
13 #[pin]
14 b: Fuse<U>,
15 // When `true`, poll `a` first, otherwise, `poll` b`.
16 a_first: bool,
17 }
18}
19
20impl<T, U> Merge<T, U> {
21 pub(super) fn new(a: T, b: U) -> Merge<T, U>
22 where
23 T: Stream,
24 U: Stream,
25 {
26 Merge {
27 a: Fuse::new(a),
28 b: Fuse::new(b),
29 a_first: true,
30 }
31 }
32}
33
34impl<T, U> Stream for Merge<T, U>
35where
36 T: Stream,
37 U: Stream<Item = T::Item>,
38{
39 type Item = T::Item;
40
41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
42 let me = self.project();
43 let a_first = *me.a_first;
44
45 // Toggle the flag
46 *me.a_first = !a_first;
47
48 if a_first {
49 poll_next(me.a, me.b, cx)
50 } else {
51 poll_next(me.b, me.a, cx)
52 }
53 }
54
55 fn size_hint(&self) -> (usize, Option<usize>) {
56 super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
57 }
58}
59
60fn poll_next<T, U>(
61 first: Pin<&mut T>,
62 second: Pin<&mut U>,
63 cx: &mut Context<'_>,
64) -> Poll<Option<T::Item>>
65where
66 T: Stream,
67 U: Stream<Item = T::Item>,
68{
69 let mut done = true;
70
71 match first.poll_next(cx) {
72 Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
73 Poll::Ready(None) => {}
74 Poll::Pending => done = false,
75 }
76
77 match second.poll_next(cx) {
78 Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
79 Poll::Ready(None) => {}
80 Poll::Pending => done = false,
81 }
82
83 if done {
84 Poll::Ready(None)
85 } else {
86 Poll::Pending
87 }
88}
89