1 | use crate::stream::{Fuse, StreamExt}; |
2 | use core::cmp; |
3 | use core::pin::Pin; |
4 | use futures_core::stream::{FusedStream, Stream}; |
5 | use futures_core::task::{Context, Poll}; |
6 | use pin_project_lite::pin_project; |
7 | |
pin_project! {
    /// Stream for the [`zip`](super::StreamExt::zip) method.
    ///
    /// Pairs up items from two underlying streams. Each side has a one-item
    /// holding slot so that an item already produced by one stream is kept
    /// while the other stream has not yet produced its counterpart.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled" ]
    pub struct Zip<St1: Stream, St2: Stream> {
        // First input stream, wrapped in `Fuse` so `poll_next` can ask
        // `is_done()` to learn whether it has finished.
        #[pin]
        stream1: Fuse<St1>,
        // Second input stream, wrapped in `Fuse` for the same reason.
        #[pin]
        stream2: Fuse<St2>,
        // Item taken from `stream1` that is waiting for a partner from
        // `stream2`; `None` when nothing is buffered.
        queued1: Option<St1::Item>,
        // Item taken from `stream2` that is waiting for a partner from
        // `stream1`; `None` when nothing is buffered.
        queued2: Option<St2::Item>,
    }
}
21 | |
22 | impl<St1: Stream, St2: Stream> Zip<St1, St2> { |
23 | pub(super) fn new(stream1: St1, stream2: St2) -> Self { |
24 | Self { stream1: stream1.fuse(), stream2: stream2.fuse(), queued1: None, queued2: None } |
25 | } |
26 | |
27 | /// Acquires a reference to the underlying streams that this combinator is |
28 | /// pulling from. |
29 | pub fn get_ref(&self) -> (&St1, &St2) { |
30 | (self.stream1.get_ref(), self.stream2.get_ref()) |
31 | } |
32 | |
33 | /// Acquires a mutable reference to the underlying streams that this |
34 | /// combinator is pulling from. |
35 | /// |
36 | /// Note that care must be taken to avoid tampering with the state of the |
37 | /// stream which may otherwise confuse this combinator. |
38 | pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { |
39 | (self.stream1.get_mut(), self.stream2.get_mut()) |
40 | } |
41 | |
42 | /// Acquires a pinned mutable reference to the underlying streams that this |
43 | /// combinator is pulling from. |
44 | /// |
45 | /// Note that care must be taken to avoid tampering with the state of the |
46 | /// stream which may otherwise confuse this combinator. |
47 | pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { |
48 | let this = self.project(); |
49 | (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) |
50 | } |
51 | |
52 | /// Consumes this combinator, returning the underlying streams. |
53 | /// |
54 | /// Note that this may discard intermediate state of this combinator, so |
55 | /// care should be taken to avoid losing resources when this is called. |
56 | pub fn into_inner(self) -> (St1, St2) { |
57 | (self.stream1.into_inner(), self.stream2.into_inner()) |
58 | } |
59 | } |
60 | |
61 | impl<St1, St2> FusedStream for Zip<St1, St2> |
62 | where |
63 | St1: Stream, |
64 | St2: Stream, |
65 | { |
66 | fn is_terminated(&self) -> bool { |
67 | self.stream1.is_terminated() && self.stream2.is_terminated() |
68 | } |
69 | } |
70 | |
71 | impl<St1, St2> Stream for Zip<St1, St2> |
72 | where |
73 | St1: Stream, |
74 | St2: Stream, |
75 | { |
76 | type Item = (St1::Item, St2::Item); |
77 | |
78 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
79 | let mut this = self.project(); |
80 | |
81 | if this.queued1.is_none() { |
82 | match this.stream1.as_mut().poll_next(cx) { |
83 | Poll::Ready(Some(item1)) => *this.queued1 = Some(item1), |
84 | Poll::Ready(None) | Poll::Pending => {} |
85 | } |
86 | } |
87 | if this.queued2.is_none() { |
88 | match this.stream2.as_mut().poll_next(cx) { |
89 | Poll::Ready(Some(item2)) => *this.queued2 = Some(item2), |
90 | Poll::Ready(None) | Poll::Pending => {} |
91 | } |
92 | } |
93 | |
94 | if this.queued1.is_some() && this.queued2.is_some() { |
95 | let pair = (this.queued1.take().unwrap(), this.queued2.take().unwrap()); |
96 | Poll::Ready(Some(pair)) |
97 | } else if this.stream1.is_done() || this.stream2.is_done() { |
98 | Poll::Ready(None) |
99 | } else { |
100 | Poll::Pending |
101 | } |
102 | } |
103 | |
104 | fn size_hint(&self) -> (usize, Option<usize>) { |
105 | let queued1_len = usize::from(self.queued1.is_some()); |
106 | let queued2_len = usize::from(self.queued2.is_some()); |
107 | let (stream1_lower, stream1_upper) = self.stream1.size_hint(); |
108 | let (stream2_lower, stream2_upper) = self.stream2.size_hint(); |
109 | |
110 | let stream1_lower = stream1_lower.saturating_add(queued1_len); |
111 | let stream2_lower = stream2_lower.saturating_add(queued2_len); |
112 | |
113 | let lower = cmp::min(stream1_lower, stream2_lower); |
114 | |
115 | let upper = match (stream1_upper, stream2_upper) { |
116 | (Some(x), Some(y)) => { |
117 | let x = x.saturating_add(queued1_len); |
118 | let y = y.saturating_add(queued2_len); |
119 | Some(cmp::min(x, y)) |
120 | } |
121 | (Some(x), None) => x.checked_add(queued1_len), |
122 | (None, Some(y)) => y.checked_add(queued2_len), |
123 | (None, None) => None, |
124 | }; |
125 | |
126 | (lower, upper) |
127 | } |
128 | } |
129 | |