//! An unbounded set of streams.
| 2 | |
| 3 | use core::fmt::{self, Debug}; |
| 4 | use core::iter::FromIterator; |
| 5 | use core::pin::Pin; |
| 6 | |
| 7 | use futures_core::ready; |
| 8 | use futures_core::stream::{FusedStream, Stream}; |
| 9 | use futures_core::task::{Context, Poll}; |
| 10 | |
| 11 | use super::assert_stream; |
| 12 | use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture}; |
| 13 | |
| 14 | /// An unbounded set of streams |
| 15 | /// |
| 16 | /// This "combinator" provides the ability to maintain a set of streams |
| 17 | /// and drive them all to completion. |
| 18 | /// |
| 19 | /// Streams are pushed into this set and their realized values are |
| 20 | /// yielded as they become ready. Streams will only be polled when they |
| 21 | /// generate notifications. This allows to coordinate a large number of streams. |
| 22 | /// |
| 23 | /// Note that you can create a ready-made `SelectAll` via the |
| 24 | /// `select_all` function in the `stream` module, or you can start with an |
| 25 | /// empty set with the `SelectAll::new` constructor. |
| 26 | #[must_use = "streams do nothing unless polled" ] |
| 27 | pub struct SelectAll<St> { |
| 28 | inner: FuturesUnordered<StreamFuture<St>>, |
| 29 | } |
| 30 | |
| 31 | impl<St: Debug> Debug for SelectAll<St> { |
| 32 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 33 | write!(f, "SelectAll {{ ... }}" ) |
| 34 | } |
| 35 | } |
| 36 | |
| 37 | impl<St: Stream + Unpin> SelectAll<St> { |
| 38 | /// Constructs a new, empty `SelectAll` |
| 39 | /// |
| 40 | /// The returned `SelectAll` does not contain any streams and, in this |
| 41 | /// state, `SelectAll::poll` will return `Poll::Ready(None)`. |
| 42 | pub fn new() -> Self { |
| 43 | Self { inner: FuturesUnordered::new() } |
| 44 | } |
| 45 | |
| 46 | /// Returns the number of streams contained in the set. |
| 47 | /// |
| 48 | /// This represents the total number of in-flight streams. |
| 49 | pub fn len(&self) -> usize { |
| 50 | self.inner.len() |
| 51 | } |
| 52 | |
| 53 | /// Returns `true` if the set contains no streams |
| 54 | pub fn is_empty(&self) -> bool { |
| 55 | self.inner.is_empty() |
| 56 | } |
| 57 | |
| 58 | /// Push a stream into the set. |
| 59 | /// |
| 60 | /// This function submits the given stream to the set for managing. This |
| 61 | /// function will not call `poll` on the submitted stream. The caller must |
| 62 | /// ensure that `SelectAll::poll` is called in order to receive task |
| 63 | /// notifications. |
| 64 | pub fn push(&mut self, stream: St) { |
| 65 | self.inner.push(stream.into_future()); |
| 66 | } |
| 67 | |
| 68 | /// Returns an iterator that allows inspecting each stream in the set. |
| 69 | pub fn iter(&self) -> Iter<'_, St> { |
| 70 | Iter(self.inner.iter()) |
| 71 | } |
| 72 | |
| 73 | /// Returns an iterator that allows modifying each stream in the set. |
| 74 | pub fn iter_mut(&mut self) -> IterMut<'_, St> { |
| 75 | IterMut(self.inner.iter_mut()) |
| 76 | } |
| 77 | |
| 78 | /// Clears the set, removing all streams. |
| 79 | pub fn clear(&mut self) { |
| 80 | self.inner.clear() |
| 81 | } |
| 82 | } |
| 83 | |
| 84 | impl<St: Stream + Unpin> Default for SelectAll<St> { |
| 85 | fn default() -> Self { |
| 86 | Self::new() |
| 87 | } |
| 88 | } |
| 89 | |
| 90 | impl<St: Stream + Unpin> Stream for SelectAll<St> { |
| 91 | type Item = St::Item; |
| 92 | |
| 93 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 94 | loop { |
| 95 | match ready!(self.inner.poll_next_unpin(cx)) { |
| 96 | Some((Some(item: ::Item), remaining: St)) => { |
| 97 | self.push(stream:remaining); |
| 98 | return Poll::Ready(Some(item)); |
| 99 | } |
| 100 | Some((None, _)) => { |
| 101 | // `FuturesUnordered` thinks it isn't terminated |
| 102 | // because it yielded a Some. |
| 103 | // We do not return, but poll `FuturesUnordered` |
| 104 | // in the next loop iteration. |
| 105 | } |
| 106 | None => return Poll::Ready(None), |
| 107 | } |
| 108 | } |
| 109 | } |
| 110 | } |
| 111 | |
| 112 | impl<St: Stream + Unpin> FusedStream for SelectAll<St> { |
| 113 | fn is_terminated(&self) -> bool { |
| 114 | self.inner.is_terminated() |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | /// Convert a list of streams into a `Stream` of results from the streams. |
| 119 | /// |
| 120 | /// This essentially takes a list of streams (e.g. a vector, an iterator, etc.) |
| 121 | /// and bundles them together into a single stream. |
| 122 | /// The stream will yield items as they become available on the underlying |
| 123 | /// streams internally, in the order they become available. |
| 124 | /// |
| 125 | /// Note that the returned set can also be used to dynamically push more |
| 126 | /// streams into the set as they become available. |
| 127 | /// |
| 128 | /// This function is only available when the `std` or `alloc` feature of this |
| 129 | /// library is activated, and it is activated by default. |
| 130 | pub fn select_all<I>(streams: I) -> SelectAll<I::Item> |
| 131 | where |
| 132 | I: IntoIterator, |
| 133 | I::Item: Stream + Unpin, |
| 134 | { |
| 135 | let mut set: SelectAll = SelectAll::new(); |
| 136 | |
| 137 | for stream: impl Stream + Unpin in streams { |
| 138 | set.push(stream); |
| 139 | } |
| 140 | |
| 141 | assert_stream::<<I::Item as Stream>::Item, _>(set) |
| 142 | } |
| 143 | |
| 144 | impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> { |
| 145 | fn from_iter<T: IntoIterator<Item = St>>(iter: T) -> Self { |
| 146 | select_all(streams:iter) |
| 147 | } |
| 148 | } |
| 149 | |
| 150 | impl<St: Stream + Unpin> Extend<St> for SelectAll<St> { |
| 151 | fn extend<T: IntoIterator<Item = St>>(&mut self, iter: T) { |
| 152 | for st: St in iter { |
| 153 | self.push(stream:st) |
| 154 | } |
| 155 | } |
| 156 | } |
| 157 | |
| 158 | impl<St: Stream + Unpin> IntoIterator for SelectAll<St> { |
| 159 | type Item = St; |
| 160 | type IntoIter = IntoIter<St>; |
| 161 | |
| 162 | fn into_iter(self) -> Self::IntoIter { |
| 163 | IntoIter(self.inner.into_iter()) |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll<St> { |
| 168 | type Item = &'a St; |
| 169 | type IntoIter = Iter<'a, St>; |
| 170 | |
| 171 | fn into_iter(self) -> Self::IntoIter { |
| 172 | self.iter() |
| 173 | } |
| 174 | } |
| 175 | |
| 176 | impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll<St> { |
| 177 | type Item = &'a mut St; |
| 178 | type IntoIter = IterMut<'a, St>; |
| 179 | |
| 180 | fn into_iter(self) -> Self::IntoIter { |
| 181 | self.iter_mut() |
| 182 | } |
| 183 | } |
| 184 | |
| 185 | /// Immutable iterator over all streams in the unordered set. |
| 186 | #[derive (Debug)] |
| 187 | pub struct Iter<'a, St: Unpin>(futures_unordered::Iter<'a, StreamFuture<St>>); |
| 188 | |
| 189 | /// Mutable iterator over all streams in the unordered set. |
| 190 | #[derive (Debug)] |
| 191 | pub struct IterMut<'a, St: Unpin>(futures_unordered::IterMut<'a, StreamFuture<St>>); |
| 192 | |
| 193 | /// Owned iterator over all streams in the unordered set. |
| 194 | #[derive (Debug)] |
| 195 | pub struct IntoIter<St: Unpin>(futures_unordered::IntoIter<StreamFuture<St>>); |
| 196 | |
| 197 | impl<'a, St: Stream + Unpin> Iterator for Iter<'a, St> { |
| 198 | type Item = &'a St; |
| 199 | |
| 200 | fn next(&mut self) -> Option<Self::Item> { |
| 201 | let st: &'a StreamFuture = self.0.next()?; |
| 202 | let next: Option<&St> = st.get_ref(); |
| 203 | // This should always be true because FuturesUnordered removes completed futures. |
| 204 | debug_assert!(next.is_some()); |
| 205 | next |
| 206 | } |
| 207 | |
| 208 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 209 | self.0.size_hint() |
| 210 | } |
| 211 | } |
| 212 | |
| 213 | impl<St: Stream + Unpin> ExactSizeIterator for Iter<'_, St> {} |
| 214 | |
| 215 | impl<'a, St: Stream + Unpin> Iterator for IterMut<'a, St> { |
| 216 | type Item = &'a mut St; |
| 217 | |
| 218 | fn next(&mut self) -> Option<Self::Item> { |
| 219 | let st: &'a mut StreamFuture = self.0.next()?; |
| 220 | let next: Option<&mut St> = st.get_mut(); |
| 221 | // This should always be true because FuturesUnordered removes completed futures. |
| 222 | debug_assert!(next.is_some()); |
| 223 | next |
| 224 | } |
| 225 | |
| 226 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 227 | self.0.size_hint() |
| 228 | } |
| 229 | } |
| 230 | |
| 231 | impl<St: Stream + Unpin> ExactSizeIterator for IterMut<'_, St> {} |
| 232 | |
| 233 | impl<St: Stream + Unpin> Iterator for IntoIter<St> { |
| 234 | type Item = St; |
| 235 | |
| 236 | fn next(&mut self) -> Option<Self::Item> { |
| 237 | let st: StreamFuture = self.0.next()?; |
| 238 | let next: Option = st.into_inner(); |
| 239 | // This should always be true because FuturesUnordered removes completed futures. |
| 240 | debug_assert!(next.is_some()); |
| 241 | next |
| 242 | } |
| 243 | |
| 244 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 245 | self.0.size_hint() |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | impl<St: Stream + Unpin> ExactSizeIterator for IntoIter<St> {} |
| 250 | |