| 1 | use crate::stream::{Fuse, StreamExt}; |
| 2 | use core::cmp; |
| 3 | use core::pin::Pin; |
| 4 | use futures_core::stream::{FusedStream, Stream}; |
| 5 | use futures_core::task::{Context, Poll}; |
| 6 | use pin_project_lite::pin_project; |
| 7 | |
| 8 | pin_project! { |
| 9 | /// Stream for the [`zip`](super::StreamExt::zip) method. |
| 10 | #[derive (Debug)] |
| 11 | #[must_use = "streams do nothing unless polled" ] |
| 12 | pub struct Zip<St1: Stream, St2: Stream> { |
| 13 | #[pin] |
| 14 | stream1: Fuse<St1>, |
| 15 | #[pin] |
| 16 | stream2: Fuse<St2>, |
| 17 | queued1: Option<St1::Item>, |
| 18 | queued2: Option<St2::Item>, |
| 19 | } |
| 20 | } |
| 21 | |
| 22 | impl<St1: Stream, St2: Stream> Zip<St1, St2> { |
| 23 | pub(super) fn new(stream1: St1, stream2: St2) -> Self { |
| 24 | Self { stream1: stream1.fuse(), stream2: stream2.fuse(), queued1: None, queued2: None } |
| 25 | } |
| 26 | |
| 27 | /// Acquires a reference to the underlying streams that this combinator is |
| 28 | /// pulling from. |
| 29 | pub fn get_ref(&self) -> (&St1, &St2) { |
| 30 | (self.stream1.get_ref(), self.stream2.get_ref()) |
| 31 | } |
| 32 | |
| 33 | /// Acquires a mutable reference to the underlying streams that this |
| 34 | /// combinator is pulling from. |
| 35 | /// |
| 36 | /// Note that care must be taken to avoid tampering with the state of the |
| 37 | /// stream which may otherwise confuse this combinator. |
| 38 | pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { |
| 39 | (self.stream1.get_mut(), self.stream2.get_mut()) |
| 40 | } |
| 41 | |
| 42 | /// Acquires a pinned mutable reference to the underlying streams that this |
| 43 | /// combinator is pulling from. |
| 44 | /// |
| 45 | /// Note that care must be taken to avoid tampering with the state of the |
| 46 | /// stream which may otherwise confuse this combinator. |
| 47 | pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { |
| 48 | let this = self.project(); |
| 49 | (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) |
| 50 | } |
| 51 | |
| 52 | /// Consumes this combinator, returning the underlying streams. |
| 53 | /// |
| 54 | /// Note that this may discard intermediate state of this combinator, so |
| 55 | /// care should be taken to avoid losing resources when this is called. |
| 56 | pub fn into_inner(self) -> (St1, St2) { |
| 57 | (self.stream1.into_inner(), self.stream2.into_inner()) |
| 58 | } |
| 59 | } |
| 60 | |
| 61 | impl<St1, St2> FusedStream for Zip<St1, St2> |
| 62 | where |
| 63 | St1: Stream, |
| 64 | St2: Stream, |
| 65 | { |
| 66 | fn is_terminated(&self) -> bool { |
| 67 | self.stream1.is_terminated() && self.stream2.is_terminated() |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | impl<St1, St2> Stream for Zip<St1, St2> |
| 72 | where |
| 73 | St1: Stream, |
| 74 | St2: Stream, |
| 75 | { |
| 76 | type Item = (St1::Item, St2::Item); |
| 77 | |
| 78 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 79 | let mut this = self.project(); |
| 80 | |
| 81 | if this.queued1.is_none() { |
| 82 | match this.stream1.as_mut().poll_next(cx) { |
| 83 | Poll::Ready(Some(item1)) => *this.queued1 = Some(item1), |
| 84 | Poll::Ready(None) | Poll::Pending => {} |
| 85 | } |
| 86 | } |
| 87 | if this.queued2.is_none() { |
| 88 | match this.stream2.as_mut().poll_next(cx) { |
| 89 | Poll::Ready(Some(item2)) => *this.queued2 = Some(item2), |
| 90 | Poll::Ready(None) | Poll::Pending => {} |
| 91 | } |
| 92 | } |
| 93 | |
| 94 | if this.queued1.is_some() && this.queued2.is_some() { |
| 95 | let pair = (this.queued1.take().unwrap(), this.queued2.take().unwrap()); |
| 96 | Poll::Ready(Some(pair)) |
| 97 | } else if this.stream1.is_done() || this.stream2.is_done() { |
| 98 | Poll::Ready(None) |
| 99 | } else { |
| 100 | Poll::Pending |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 105 | let queued1_len = usize::from(self.queued1.is_some()); |
| 106 | let queued2_len = usize::from(self.queued2.is_some()); |
| 107 | let (stream1_lower, stream1_upper) = self.stream1.size_hint(); |
| 108 | let (stream2_lower, stream2_upper) = self.stream2.size_hint(); |
| 109 | |
| 110 | let stream1_lower = stream1_lower.saturating_add(queued1_len); |
| 111 | let stream2_lower = stream2_lower.saturating_add(queued2_len); |
| 112 | |
| 113 | let lower = cmp::min(stream1_lower, stream2_lower); |
| 114 | |
| 115 | let upper = match (stream1_upper, stream2_upper) { |
| 116 | (Some(x), Some(y)) => { |
| 117 | let x = x.saturating_add(queued1_len); |
| 118 | let y = y.saturating_add(queued2_len); |
| 119 | Some(cmp::min(x, y)) |
| 120 | } |
| 121 | (Some(x), None) => x.checked_add(queued1_len), |
| 122 | (None, Some(y)) => y.checked_add(queued2_len), |
| 123 | (None, None) => None, |
| 124 | }; |
| 125 | |
| 126 | (lower, upper) |
| 127 | } |
| 128 | } |
| 129 | |