1 | //! An unbounded set of streams |
2 | |
3 | use core::fmt::{self, Debug}; |
4 | use core::iter::FromIterator; |
5 | use core::pin::Pin; |
6 | |
7 | use futures_core::ready; |
8 | use futures_core::stream::{FusedStream, Stream}; |
9 | use futures_core::task::{Context, Poll}; |
10 | |
11 | use super::assert_stream; |
12 | use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture}; |
13 | |
14 | /// An unbounded set of streams |
15 | /// |
16 | /// This "combinator" provides the ability to maintain a set of streams |
17 | /// and drive them all to completion. |
18 | /// |
19 | /// Streams are pushed into this set and their realized values are |
20 | /// yielded as they become ready. Streams will only be polled when they |
21 | /// generate notifications. This allows to coordinate a large number of streams. |
22 | /// |
23 | /// Note that you can create a ready-made `SelectAll` via the |
24 | /// `select_all` function in the `stream` module, or you can start with an |
25 | /// empty set with the `SelectAll::new` constructor. |
26 | #[must_use = "streams do nothing unless polled" ] |
27 | pub struct SelectAll<St> { |
28 | inner: FuturesUnordered<StreamFuture<St>>, |
29 | } |
30 | |
31 | impl<St: Debug> Debug for SelectAll<St> { |
32 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
33 | write!(f, "SelectAll {{ ... }}" ) |
34 | } |
35 | } |
36 | |
37 | impl<St: Stream + Unpin> SelectAll<St> { |
38 | /// Constructs a new, empty `SelectAll` |
39 | /// |
40 | /// The returned `SelectAll` does not contain any streams and, in this |
41 | /// state, `SelectAll::poll` will return `Poll::Ready(None)`. |
42 | pub fn new() -> Self { |
43 | Self { inner: FuturesUnordered::new() } |
44 | } |
45 | |
46 | /// Returns the number of streams contained in the set. |
47 | /// |
48 | /// This represents the total number of in-flight streams. |
49 | pub fn len(&self) -> usize { |
50 | self.inner.len() |
51 | } |
52 | |
53 | /// Returns `true` if the set contains no streams |
54 | pub fn is_empty(&self) -> bool { |
55 | self.inner.is_empty() |
56 | } |
57 | |
58 | /// Push a stream into the set. |
59 | /// |
60 | /// This function submits the given stream to the set for managing. This |
61 | /// function will not call `poll` on the submitted stream. The caller must |
62 | /// ensure that `SelectAll::poll` is called in order to receive task |
63 | /// notifications. |
64 | pub fn push(&mut self, stream: St) { |
65 | self.inner.push(stream.into_future()); |
66 | } |
67 | |
68 | /// Returns an iterator that allows inspecting each stream in the set. |
69 | pub fn iter(&self) -> Iter<'_, St> { |
70 | Iter(self.inner.iter()) |
71 | } |
72 | |
73 | /// Returns an iterator that allows modifying each stream in the set. |
74 | pub fn iter_mut(&mut self) -> IterMut<'_, St> { |
75 | IterMut(self.inner.iter_mut()) |
76 | } |
77 | |
78 | /// Clears the set, removing all streams. |
79 | pub fn clear(&mut self) { |
80 | self.inner.clear() |
81 | } |
82 | } |
83 | |
84 | impl<St: Stream + Unpin> Default for SelectAll<St> { |
85 | fn default() -> Self { |
86 | Self::new() |
87 | } |
88 | } |
89 | |
90 | impl<St: Stream + Unpin> Stream for SelectAll<St> { |
91 | type Item = St::Item; |
92 | |
93 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
94 | loop { |
95 | match ready!(self.inner.poll_next_unpin(cx)) { |
96 | Some((Some(item), remaining)) => { |
97 | self.push(remaining); |
98 | return Poll::Ready(Some(item)); |
99 | } |
100 | Some((None, _)) => { |
101 | // `FuturesUnordered` thinks it isn't terminated |
102 | // because it yielded a Some. |
103 | // We do not return, but poll `FuturesUnordered` |
104 | // in the next loop iteration. |
105 | } |
106 | None => return Poll::Ready(None), |
107 | } |
108 | } |
109 | } |
110 | } |
111 | |
112 | impl<St: Stream + Unpin> FusedStream for SelectAll<St> { |
113 | fn is_terminated(&self) -> bool { |
114 | self.inner.is_terminated() |
115 | } |
116 | } |
117 | |
118 | /// Convert a list of streams into a `Stream` of results from the streams. |
119 | /// |
120 | /// This essentially takes a list of streams (e.g. a vector, an iterator, etc.) |
121 | /// and bundles them together into a single stream. |
122 | /// The stream will yield items as they become available on the underlying |
123 | /// streams internally, in the order they become available. |
124 | /// |
125 | /// Note that the returned set can also be used to dynamically push more |
126 | /// streams into the set as they become available. |
127 | /// |
128 | /// This function is only available when the `std` or `alloc` feature of this |
129 | /// library is activated, and it is activated by default. |
130 | pub fn select_all<I>(streams: I) -> SelectAll<I::Item> |
131 | where |
132 | I: IntoIterator, |
133 | I::Item: Stream + Unpin, |
134 | { |
135 | let mut set = SelectAll::new(); |
136 | |
137 | for stream in streams { |
138 | set.push(stream); |
139 | } |
140 | |
141 | assert_stream::<<I::Item as Stream>::Item, _>(set) |
142 | } |
143 | |
144 | impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> { |
145 | fn from_iter<T: IntoIterator<Item = St>>(iter: T) -> Self { |
146 | select_all(iter) |
147 | } |
148 | } |
149 | |
150 | impl<St: Stream + Unpin> Extend<St> for SelectAll<St> { |
151 | fn extend<T: IntoIterator<Item = St>>(&mut self, iter: T) { |
152 | for st in iter { |
153 | self.push(st) |
154 | } |
155 | } |
156 | } |
157 | |
158 | impl<St: Stream + Unpin> IntoIterator for SelectAll<St> { |
159 | type Item = St; |
160 | type IntoIter = IntoIter<St>; |
161 | |
162 | fn into_iter(self) -> Self::IntoIter { |
163 | IntoIter(self.inner.into_iter()) |
164 | } |
165 | } |
166 | |
167 | impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll<St> { |
168 | type Item = &'a St; |
169 | type IntoIter = Iter<'a, St>; |
170 | |
171 | fn into_iter(self) -> Self::IntoIter { |
172 | self.iter() |
173 | } |
174 | } |
175 | |
176 | impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll<St> { |
177 | type Item = &'a mut St; |
178 | type IntoIter = IterMut<'a, St>; |
179 | |
180 | fn into_iter(self) -> Self::IntoIter { |
181 | self.iter_mut() |
182 | } |
183 | } |
184 | |
185 | /// Immutable iterator over all streams in the unordered set. |
186 | #[derive(Debug)] |
187 | pub struct Iter<'a, St: Unpin>(futures_unordered::Iter<'a, StreamFuture<St>>); |
188 | |
189 | /// Mutable iterator over all streams in the unordered set. |
190 | #[derive(Debug)] |
191 | pub struct IterMut<'a, St: Unpin>(futures_unordered::IterMut<'a, StreamFuture<St>>); |
192 | |
193 | /// Owned iterator over all streams in the unordered set. |
194 | #[derive(Debug)] |
195 | pub struct IntoIter<St: Unpin>(futures_unordered::IntoIter<StreamFuture<St>>); |
196 | |
197 | impl<'a, St: Stream + Unpin> Iterator for Iter<'a, St> { |
198 | type Item = &'a St; |
199 | |
200 | fn next(&mut self) -> Option<Self::Item> { |
201 | let st = self.0.next()?; |
202 | let next = st.get_ref(); |
203 | // This should always be true because FuturesUnordered removes completed futures. |
204 | debug_assert!(next.is_some()); |
205 | next |
206 | } |
207 | |
208 | fn size_hint(&self) -> (usize, Option<usize>) { |
209 | self.0.size_hint() |
210 | } |
211 | } |
212 | |
213 | impl<St: Stream + Unpin> ExactSizeIterator for Iter<'_, St> {} |
214 | |
215 | impl<'a, St: Stream + Unpin> Iterator for IterMut<'a, St> { |
216 | type Item = &'a mut St; |
217 | |
218 | fn next(&mut self) -> Option<Self::Item> { |
219 | let st = self.0.next()?; |
220 | let next = st.get_mut(); |
221 | // This should always be true because FuturesUnordered removes completed futures. |
222 | debug_assert!(next.is_some()); |
223 | next |
224 | } |
225 | |
226 | fn size_hint(&self) -> (usize, Option<usize>) { |
227 | self.0.size_hint() |
228 | } |
229 | } |
230 | |
231 | impl<St: Stream + Unpin> ExactSizeIterator for IterMut<'_, St> {} |
232 | |
233 | impl<St: Stream + Unpin> Iterator for IntoIter<St> { |
234 | type Item = St; |
235 | |
236 | fn next(&mut self) -> Option<Self::Item> { |
237 | let st = self.0.next()?; |
238 | let next = st.into_inner(); |
239 | // This should always be true because FuturesUnordered removes completed futures. |
240 | debug_assert!(next.is_some()); |
241 | next |
242 | } |
243 | |
244 | fn size_hint(&self) -> (usize, Option<usize>) { |
245 | self.0.size_hint() |
246 | } |
247 | } |
248 | |
249 | impl<St: Stream + Unpin> ExactSizeIterator for IntoIter<St> {} |
250 | |