1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::{FusedStream, Stream};
4use futures_core::task::{Context, Poll};
5use pin_project_lite::pin_project;
6
7pin_project! {
8 /// Stream for the [`chain`](super::StreamExt::chain) method.
9 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Chain<St1, St2> {
12 #[pin]
13 first: Option<St1>,
14 #[pin]
15 second: St2,
16 }
17}
18
19// All interactions with `Pin<&mut Chain<..>>` happen through these methods
20impl<St1, St2> Chain<St1, St2>
21where
22 St1: Stream,
23 St2: Stream<Item = St1::Item>,
24{
25 pub(super) fn new(stream1: St1, stream2: St2) -> Self {
26 Self { first: Some(stream1), second: stream2 }
27 }
28}
29
30impl<St1, St2> FusedStream for Chain<St1, St2>
31where
32 St1: Stream,
33 St2: FusedStream<Item = St1::Item>,
34{
35 fn is_terminated(&self) -> bool {
36 self.first.is_none() && self.second.is_terminated()
37 }
38}
39
40impl<St1, St2> Stream for Chain<St1, St2>
41where
42 St1: Stream,
43 St2: Stream<Item = St1::Item>,
44{
45 type Item = St1::Item;
46
47 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48 let mut this = self.project();
49 if let Some(first) = this.first.as_mut().as_pin_mut() {
50 if let Some(item) = ready!(first.poll_next(cx)) {
51 return Poll::Ready(Some(item));
52 }
53
54 this.first.set(None);
55 }
56 this.second.poll_next(cx)
57 }
58
59 fn size_hint(&self) -> (usize, Option<usize>) {
60 if let Some(first) = &self.first {
61 let (first_lower, first_upper) = first.size_hint();
62 let (second_lower, second_upper) = self.second.size_hint();
63
64 let lower = first_lower.saturating_add(second_lower);
65
66 let upper = match (first_upper, second_upper) {
67 (Some(x), Some(y)) => x.checked_add(y),
68 _ => None,
69 };
70
71 (lower, upper)
72 } else {
73 self.second.size_hint()
74 }
75 }
76}
77