| 1 | use crate::stream_ext::Fuse; |
| 2 | use crate::Stream; |
| 3 | |
| 4 | use core::pin::Pin; |
| 5 | use core::task::{Context, Poll}; |
| 6 | use pin_project_lite::pin_project; |
| 7 | |
| 8 | pin_project! { |
| 9 | /// Stream returned by the [`merge`](super::StreamExt::merge) method. |
| 10 | pub struct Merge<T, U> { |
| 11 | #[pin] |
| 12 | a: Fuse<T>, |
| 13 | #[pin] |
| 14 | b: Fuse<U>, |
| 15 | // When `true`, poll `a` first, otherwise, `poll` b`. |
| 16 | a_first: bool, |
| 17 | } |
| 18 | } |
| 19 | |
| 20 | impl<T, U> Merge<T, U> { |
| 21 | pub(super) fn new(a: T, b: U) -> Merge<T, U> |
| 22 | where |
| 23 | T: Stream, |
| 24 | U: Stream, |
| 25 | { |
| 26 | Merge { |
| 27 | a: Fuse::new(a), |
| 28 | b: Fuse::new(b), |
| 29 | a_first: true, |
| 30 | } |
| 31 | } |
| 32 | } |
| 33 | |
| 34 | impl<T, U> Stream for Merge<T, U> |
| 35 | where |
| 36 | T: Stream, |
| 37 | U: Stream<Item = T::Item>, |
| 38 | { |
| 39 | type Item = T::Item; |
| 40 | |
| 41 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { |
| 42 | let me = self.project(); |
| 43 | let a_first = *me.a_first; |
| 44 | |
| 45 | // Toggle the flag |
| 46 | *me.a_first = !a_first; |
| 47 | |
| 48 | if a_first { |
| 49 | poll_next(me.a, me.b, cx) |
| 50 | } else { |
| 51 | poll_next(me.b, me.a, cx) |
| 52 | } |
| 53 | } |
| 54 | |
| 55 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 56 | super::merge_size_hints(self.a.size_hint(), self.b.size_hint()) |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | fn poll_next<T, U>( |
| 61 | first: Pin<&mut T>, |
| 62 | second: Pin<&mut U>, |
| 63 | cx: &mut Context<'_>, |
| 64 | ) -> Poll<Option<T::Item>> |
| 65 | where |
| 66 | T: Stream, |
| 67 | U: Stream<Item = T::Item>, |
| 68 | { |
| 69 | let mut done = true; |
| 70 | |
| 71 | match first.poll_next(cx) { |
| 72 | Poll::Ready(Some(val)) => return Poll::Ready(Some(val)), |
| 73 | Poll::Ready(None) => {} |
| 74 | Poll::Pending => done = false, |
| 75 | } |
| 76 | |
| 77 | match second.poll_next(cx) { |
| 78 | Poll::Ready(Some(val)) => return Poll::Ready(Some(val)), |
| 79 | Poll::Ready(None) => {} |
| 80 | Poll::Pending => done = false, |
| 81 | } |
| 82 | |
| 83 | if done { |
| 84 | Poll::Ready(None) |
| 85 | } else { |
| 86 | Poll::Pending |
| 87 | } |
| 88 | } |
| 89 | |