1//! Streams
2//!
3//! This module contains a number of functions for working with `Stream`s,
4//! including the `StreamExt` trait which adds methods to `Stream` types.
5
6use crate::future::{assert_future, Either};
7use crate::stream::assert_stream;
8#[cfg(feature = "alloc")]
9use alloc::boxed::Box;
10#[cfg(feature = "alloc")]
11use alloc::vec::Vec;
12use core::pin::Pin;
13#[cfg(feature = "sink")]
14use futures_core::stream::TryStream;
15#[cfg(feature = "alloc")]
16use futures_core::stream::{BoxStream, LocalBoxStream};
17use futures_core::{
18 future::Future,
19 stream::{FusedStream, Stream},
20 task::{Context, Poll},
21};
22#[cfg(feature = "sink")]
23use futures_sink::Sink;
24
25use crate::fns::{inspect_fn, InspectFn};
26
27mod chain;
28#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
29pub use self::chain::Chain;
30
31mod collect;
32#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
33pub use self::collect::Collect;
34
35mod unzip;
36#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
37pub use self::unzip::Unzip;
38
39mod concat;
40#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
41pub use self::concat::Concat;
42
43mod count;
44#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
45pub use self::count::Count;
46
47mod cycle;
48#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
49pub use self::cycle::Cycle;
50
51mod enumerate;
52#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
53pub use self::enumerate::Enumerate;
54
55mod filter;
56#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
57pub use self::filter::Filter;
58
59mod filter_map;
60#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
61pub use self::filter_map::FilterMap;
62
63mod flatten;
64
// Declares the `Flatten<St>` type returned by `StreamExt::flatten` in terms of the
// private `flatten::Flatten<St, St::Item>`; the second type argument is fixed to the
// outer stream's item type.
// NOTE(review): `delegate_all!` is defined outside this view; the `+`-separated list
// presumably names the impls/accessors forwarded to the inner type, and `New[...]`
// the constructor expression — confirm against the macro definition.
delegate_all!(
    /// Stream for the [`flatten`](StreamExt::flatten) method.
    Flatten<St>(
        flatten::Flatten<St, St::Item>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
    where St: Stream
);
72
73mod fold;
74#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
75pub use self::fold::Fold;
76
77mod any;
78#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
79pub use self::any::Any;
80
81mod all;
82#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
83pub use self::all::All;
84
85#[cfg(feature = "sink")]
86mod forward;
87
// Declares the `Forward<St, Si>` future in terms of the private
// `forward::Forward<St, Si, St::Ok>`; only compiled when the `sink` feature is enabled
// (the `forward` module itself is gated on the same feature).
// NOTE(review): the meaning of the `Debug + Future + FusedFuture + New[...]` list is
// determined by `delegate_all!`, which is not visible here — presumably the forwarded
// impls plus the constructor; confirm against the macro definition.
#[cfg(feature = "sink")]
delegate_all!(
    /// Future for the [`forward`](super::StreamExt::forward) method.
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    Forward<St, Si>(
        forward::Forward<St, Si, St::Ok>
    ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
    where St: TryStream
);
97
98mod for_each;
99#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
100pub use self::for_each::ForEach;
101
102mod fuse;
103#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
104pub use self::fuse::Fuse;
105
106mod into_future;
107#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
108pub use self::into_future::StreamFuture;
109
// Declares `Inspect<St, F>` as a `map::Map` whose mapping function is the user's
// closure wrapped by `inspect_fn` (type `InspectFn<F>`), so no dedicated `inspect`
// module is needed.
// The `map` module is declared further down in this file; item order inside a module
// does not matter in Rust, so the forward reference is fine.
// NOTE(review): the exact impls generated from the `+`-separated list depend on
// `delegate_all!`, which is defined outside this view.
delegate_all!(
    /// Stream for the [`inspect`](StreamExt::inspect) method.
    Inspect<St, F>(
        map::Map<St, InspectFn<F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
);
116
117mod map;
118#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
119pub use self::map::Map;
120
// Declares `FlatMap<St, U, F>` as the composition `Flatten<Map<St, F>, U>`: the
// constructor first maps the stream with `f` and then flattens the result.
// NOTE(review): `AccessInner[St, (. .)]` has two dots here versus one for the
// single-layer wrappers — presumably one projection step per wrapping layer; confirm
// against the `delegate_all!` definition, which is not visible in this view.
delegate_all!(
    /// Stream for the [`flat_map`](StreamExt::flat_map) method.
    FlatMap<St, U, F>(
        flatten::Flatten<Map<St, F>, U>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
);
127
128mod next;
129#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130pub use self::next::Next;
131
132mod select_next_some;
133#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134pub use self::select_next_some::SelectNextSome;
135
136mod peek;
137#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
138pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};
139
140mod skip;
141#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142pub use self::skip::Skip;
143
144mod skip_while;
145#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
146pub use self::skip_while::SkipWhile;
147
148mod take;
149#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150pub use self::take::Take;
151
152mod take_while;
153#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
154pub use self::take_while::TakeWhile;
155
156mod take_until;
157#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158pub use self::take_until::TakeUntil;
159
160mod then;
161#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
162pub use self::then::Then;
163
164mod zip;
165#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
166pub use self::zip::Zip;
167
168#[cfg(feature = "alloc")]
169mod chunks;
170#[cfg(feature = "alloc")]
171#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
172pub use self::chunks::Chunks;
173
174#[cfg(feature = "alloc")]
175mod ready_chunks;
176#[cfg(feature = "alloc")]
177#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
178pub use self::ready_chunks::ReadyChunks;
179
180mod scan;
181#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
182pub use self::scan::Scan;
183
184#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
185#[cfg(feature = "alloc")]
186mod buffer_unordered;
187#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
188#[cfg(feature = "alloc")]
189#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
190pub use self::buffer_unordered::BufferUnordered;
191
192#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
193#[cfg(feature = "alloc")]
194mod buffered;
195#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
196#[cfg(feature = "alloc")]
197#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
198pub use self::buffered::Buffered;
199
200#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
201#[cfg(feature = "alloc")]
202pub(crate) mod flatten_unordered;
203
204#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
205#[cfg(feature = "alloc")]
206#[allow(unreachable_pub)]
207pub use self::flatten_unordered::FlattenUnordered;
208
// Declares `FlatMapUnordered<St, U, F>` as `FlattenUnordered<Map<St, F>>`: the
// constructor maps the stream with `f` and hands the result, together with the
// optional concurrency `limit`, to `FlattenUnordered::new`.
// Gated like `flatten_unordered` itself: requires the `alloc` feature and, on
// `target_os = "none"`, pointer-sized atomics.
// NOTE(review): the impls generated from the `+`-separated list depend on
// `delegate_all!`, which is defined outside this view.
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
delegate_all!(
    /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
    FlatMapUnordered<St, U, F>(
        FlattenUnordered<Map<St, F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
    where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
);
218
219#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
220#[cfg(feature = "alloc")]
221mod for_each_concurrent;
222#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
223#[cfg(feature = "alloc")]
224#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
225pub use self::for_each_concurrent::ForEachConcurrent;
226
227#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
228#[cfg(feature = "sink")]
229#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
230#[cfg(feature = "alloc")]
231mod split;
232#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
233#[cfg(feature = "sink")]
234#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
235#[cfg(feature = "alloc")]
236#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
237pub use self::split::{ReuniteError, SplitSink, SplitStream};
238
239#[cfg(feature = "std")]
240mod catch_unwind;
241#[cfg(feature = "std")]
242#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
243pub use self::catch_unwind::CatchUnwind;
244
245impl<T: ?Sized> StreamExt for T where T: Stream {}
246
247/// An extension trait for `Stream`s that provides a variety of convenient
248/// combinator functions.
249pub trait StreamExt: Stream {
250 /// Creates a future that resolves to the next item in the stream.
251 ///
252 /// Note that because `next` doesn't take ownership over the stream,
253 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
254 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
255 /// be done by boxing the stream using [`Box::pin`] or
256 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
257 /// crate.
258 ///
259 /// # Examples
260 ///
261 /// ```
262 /// # futures::executor::block_on(async {
263 /// use futures::stream::{self, StreamExt};
264 ///
265 /// let mut stream = stream::iter(1..=3);
266 ///
267 /// assert_eq!(stream.next().await, Some(1));
268 /// assert_eq!(stream.next().await, Some(2));
269 /// assert_eq!(stream.next().await, Some(3));
270 /// assert_eq!(stream.next().await, None);
271 /// # });
272 /// ```
273 fn next(&mut self) -> Next<'_, Self>
274 where
275 Self: Unpin,
276 {
277 assert_future::<Option<Self::Item>, _>(Next::new(self))
278 }
279
280 /// Converts this stream into a future of `(next_item, tail_of_stream)`.
281 /// If the stream terminates, then the next item is [`None`].
282 ///
283 /// The returned future can be used to compose streams and futures together
284 /// by placing everything into the "world of futures".
285 ///
286 /// Note that because `into_future` moves the stream, the [`Stream`] type
287 /// must be [`Unpin`]. If you want to use `into_future` with a
288 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
289 /// be done by boxing the stream using [`Box::pin`] or
290 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
291 /// crate.
292 ///
293 /// # Examples
294 ///
295 /// ```
296 /// # futures::executor::block_on(async {
297 /// use futures::stream::{self, StreamExt};
298 ///
299 /// let stream = stream::iter(1..=3);
300 ///
301 /// let (item, stream) = stream.into_future().await;
302 /// assert_eq!(Some(1), item);
303 ///
304 /// let (item, stream) = stream.into_future().await;
305 /// assert_eq!(Some(2), item);
306 /// # });
307 /// ```
308 fn into_future(self) -> StreamFuture<Self>
309 where
310 Self: Sized + Unpin,
311 {
312 assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self))
313 }
314
315 /// Maps this stream's items to a different type, returning a new stream of
316 /// the resulting type.
317 ///
318 /// The provided closure is executed over all elements of this stream as
319 /// they are made available. It is executed inline with calls to
320 /// [`poll_next`](Stream::poll_next).
321 ///
322 /// Note that this function consumes the stream passed into it and returns a
323 /// wrapped version of it, similar to the existing `map` methods in the
324 /// standard library.
325 ///
326 /// See [`StreamExt::then`](Self::then) if you want to use a closure that
327 /// returns a future instead of a value.
328 ///
329 /// # Examples
330 ///
331 /// ```
332 /// # futures::executor::block_on(async {
333 /// use futures::stream::{self, StreamExt};
334 ///
335 /// let stream = stream::iter(1..=3);
336 /// let stream = stream.map(|x| x + 3);
337 ///
338 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
339 /// # });
340 /// ```
341 fn map<T, F>(self, f: F) -> Map<Self, F>
342 where
343 F: FnMut(Self::Item) -> T,
344 Self: Sized,
345 {
346 assert_stream::<T, _>(Map::new(self, f))
347 }
348
349 /// Creates a stream which gives the current iteration count as well as
350 /// the next value.
351 ///
352 /// The stream returned yields pairs `(i, val)`, where `i` is the
353 /// current index of iteration and `val` is the value returned by the
354 /// stream.
355 ///
356 /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a
357 /// different sized integer, the [`zip`](StreamExt::zip) function provides similar
358 /// functionality.
359 ///
360 /// # Overflow Behavior
361 ///
362 /// The method does no guarding against overflows, so enumerating more than
/// [`usize::MAX`] elements either produces the wrong result or panics. If
/// debug assertions are enabled, a panic is guaranteed.
365 ///
366 /// # Panics
367 ///
368 /// The returned stream might panic if the to-be-returned index would
369 /// overflow a [`usize`].
370 ///
371 /// # Examples
372 ///
373 /// ```
374 /// # futures::executor::block_on(async {
375 /// use futures::stream::{self, StreamExt};
376 ///
377 /// let stream = stream::iter(vec!['a', 'b', 'c']);
378 ///
379 /// let mut stream = stream.enumerate();
380 ///
381 /// assert_eq!(stream.next().await, Some((0, 'a')));
382 /// assert_eq!(stream.next().await, Some((1, 'b')));
383 /// assert_eq!(stream.next().await, Some((2, 'c')));
384 /// assert_eq!(stream.next().await, None);
385 /// # });
386 /// ```
387 fn enumerate(self) -> Enumerate<Self>
388 where
389 Self: Sized,
390 {
391 assert_stream::<(usize, Self::Item), _>(Enumerate::new(self))
392 }
393
394 /// Filters the values produced by this stream according to the provided
395 /// asynchronous predicate.
396 ///
397 /// As values of this stream are made available, the provided predicate `f`
398 /// will be run against them. If the predicate returns a `Future` which
399 /// resolves to `true`, then the stream will yield the value, but if the
400 /// predicate returns a `Future` which resolves to `false`, then the value
401 /// will be discarded and the next value will be produced.
402 ///
403 /// Note that this function consumes the stream passed into it and returns a
404 /// wrapped version of it, similar to the existing `filter` methods in the
405 /// standard library.
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// # futures::executor::block_on(async {
411 /// use futures::future;
412 /// use futures::stream::{self, StreamExt};
413 ///
414 /// let stream = stream::iter(1..=10);
415 /// let events = stream.filter(|x| future::ready(x % 2 == 0));
416 ///
417 /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await);
418 /// # });
419 /// ```
420 fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
421 where
422 F: FnMut(&Self::Item) -> Fut,
423 Fut: Future<Output = bool>,
424 Self: Sized,
425 {
426 assert_stream::<Self::Item, _>(Filter::new(self, f))
427 }
428
429 /// Filters the values produced by this stream while simultaneously mapping
430 /// them to a different type according to the provided asynchronous closure.
431 ///
432 /// As values of this stream are made available, the provided function will
433 /// be run on them. If the future returned by the predicate `f` resolves to
434 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
435 /// it resolves to [`None`] then the next value will be produced.
436 ///
437 /// Note that this function consumes the stream passed into it and returns a
438 /// wrapped version of it, similar to the existing `filter_map` methods in
439 /// the standard library.
440 ///
441 /// # Examples
442 /// ```
443 /// # futures::executor::block_on(async {
444 /// use futures::stream::{self, StreamExt};
445 ///
446 /// let stream = stream::iter(1..=10);
447 /// let events = stream.filter_map(|x| async move {
448 /// if x % 2 == 0 { Some(x + 1) } else { None }
449 /// });
450 ///
451 /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await);
452 /// # });
453 /// ```
454 fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
455 where
456 F: FnMut(Self::Item) -> Fut,
457 Fut: Future<Output = Option<T>>,
458 Self: Sized,
459 {
460 assert_stream::<T, _>(FilterMap::new(self, f))
461 }
462
463 /// Computes from this stream's items new items of a different type using
464 /// an asynchronous closure.
465 ///
466 /// The provided closure `f` will be called with an `Item` once a value is
467 /// ready, it returns a future which will then be run to completion
468 /// to produce the next value on this stream.
469 ///
470 /// Note that this function consumes the stream passed into it and returns a
471 /// wrapped version of it.
472 ///
473 /// See [`StreamExt::map`](Self::map) if you want to use a closure that
474 /// returns a value instead of a future.
475 ///
476 /// # Examples
477 ///
478 /// ```
479 /// # futures::executor::block_on(async {
480 /// use futures::stream::{self, StreamExt};
481 ///
482 /// let stream = stream::iter(1..=3);
483 /// let stream = stream.then(|x| async move { x + 3 });
484 ///
485 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
486 /// # });
487 /// ```
488 fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
489 where
490 F: FnMut(Self::Item) -> Fut,
491 Fut: Future,
492 Self: Sized,
493 {
494 assert_stream::<Fut::Output, _>(Then::new(self, f))
495 }
496
497 /// Transforms a stream into a collection, returning a
498 /// future representing the result of that computation.
499 ///
500 /// The returned future will be resolved when the stream terminates.
501 ///
502 /// # Examples
503 ///
504 /// ```
505 /// # futures::executor::block_on(async {
506 /// use futures::channel::mpsc;
507 /// use futures::stream::StreamExt;
508 /// use std::thread;
509 ///
510 /// let (tx, rx) = mpsc::unbounded();
511 ///
512 /// thread::spawn(move || {
513 /// for i in 1..=5 {
514 /// tx.unbounded_send(i).unwrap();
515 /// }
516 /// });
517 ///
518 /// let output = rx.collect::<Vec<i32>>().await;
519 /// assert_eq!(output, vec![1, 2, 3, 4, 5]);
520 /// # });
521 /// ```
522 fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
523 where
524 Self: Sized,
525 {
526 assert_future::<C, _>(Collect::new(self))
527 }
528
529 /// Converts a stream of pairs into a future, which
/// resolves to a pair of containers.
531 ///
532 /// `unzip()` produces a future, which resolves to two
533 /// collections: one from the left elements of the pairs,
534 /// and one from the right elements.
535 ///
536 /// The returned future will be resolved when the stream terminates.
537 ///
538 /// # Examples
539 ///
540 /// ```
541 /// # futures::executor::block_on(async {
542 /// use futures::channel::mpsc;
543 /// use futures::stream::StreamExt;
544 /// use std::thread;
545 ///
546 /// let (tx, rx) = mpsc::unbounded();
547 ///
548 /// thread::spawn(move || {
549 /// tx.unbounded_send((1, 2)).unwrap();
550 /// tx.unbounded_send((3, 4)).unwrap();
551 /// tx.unbounded_send((5, 6)).unwrap();
552 /// });
553 ///
554 /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await;
555 /// assert_eq!(o1, vec![1, 3, 5]);
556 /// assert_eq!(o2, vec![2, 4, 6]);
557 /// # });
558 /// ```
559 fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
560 where
561 FromA: Default + Extend<A>,
562 FromB: Default + Extend<B>,
563 Self: Sized + Stream<Item = (A, B)>,
564 {
565 assert_future::<(FromA, FromB), _>(Unzip::new(self))
566 }
567
568 /// Concatenate all items of a stream into a single extendable
569 /// destination, returning a future representing the end result.
570 ///
571 /// This combinator will extend the first item with the contents
572 /// of all the subsequent results of the stream. If the stream is
573 /// empty, the default value will be returned.
574 ///
575 /// Works with all collections that implement the
576 /// [`Extend`](std::iter::Extend) trait.
577 ///
578 /// # Examples
579 ///
580 /// ```
581 /// # futures::executor::block_on(async {
582 /// use futures::channel::mpsc;
583 /// use futures::stream::StreamExt;
584 /// use std::thread;
585 ///
586 /// let (tx, rx) = mpsc::unbounded();
587 ///
588 /// thread::spawn(move || {
589 /// for i in (0..3).rev() {
590 /// let n = i * 3;
591 /// tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap();
592 /// }
593 /// });
594 ///
595 /// let result = rx.concat().await;
596 ///
597 /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
598 /// # });
599 /// ```
600 fn concat(self) -> Concat<Self>
601 where
602 Self: Sized,
603 Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
604 {
605 assert_future::<Self::Item, _>(Concat::new(self))
606 }
607
608 /// Drives the stream to completion, counting the number of items.
609 ///
610 /// # Overflow Behavior
611 ///
612 /// The method does no guarding against overflows, so counting elements of a
613 /// stream with more than [`usize::MAX`] elements either produces the wrong
614 /// result or panics. If debug assertions are enabled, a panic is guaranteed.
615 ///
616 /// # Panics
617 ///
/// This function might panic if the stream has more than [`usize::MAX`]
/// elements.
620 ///
621 /// # Examples
622 ///
623 /// ```
624 /// # futures::executor::block_on(async {
625 /// use futures::stream::{self, StreamExt};
626 ///
627 /// let stream = stream::iter(1..=10);
628 /// let count = stream.count().await;
629 ///
630 /// assert_eq!(count, 10);
631 /// # });
632 /// ```
633 fn count(self) -> Count<Self>
634 where
635 Self: Sized,
636 {
637 assert_future::<usize, _>(Count::new(self))
638 }
639
640 /// Repeats a stream endlessly.
641 ///
/// The stream never terminates. Note that you likely want to avoid using
/// `collect` or similar methods on the returned stream, as they will keep
/// accumulating items until all available memory is exhausted.
645 ///
646 /// # Examples
647 ///
648 /// ```
649 /// # futures::executor::block_on(async {
650 /// use futures::stream::{self, StreamExt};
651 /// let a = [1, 2, 3];
652 /// let mut s = stream::iter(a.iter()).cycle();
653 ///
654 /// assert_eq!(s.next().await, Some(&1));
655 /// assert_eq!(s.next().await, Some(&2));
656 /// assert_eq!(s.next().await, Some(&3));
657 /// assert_eq!(s.next().await, Some(&1));
658 /// assert_eq!(s.next().await, Some(&2));
659 /// assert_eq!(s.next().await, Some(&3));
660 /// assert_eq!(s.next().await, Some(&1));
661 /// # });
662 /// ```
663 fn cycle(self) -> Cycle<Self>
664 where
665 Self: Sized + Clone,
666 {
667 assert_stream::<Self::Item, _>(Cycle::new(self))
668 }
669
670 /// Execute an accumulating asynchronous computation over a stream,
671 /// collecting all the values into one final result.
672 ///
673 /// This combinator will accumulate all values returned by this stream
674 /// according to the closure provided. The initial state is also provided to
675 /// this method and then is returned again by each execution of the closure.
676 /// Once the entire stream has been exhausted the returned future will
677 /// resolve to this value.
678 ///
679 /// # Examples
680 ///
681 /// ```
682 /// # futures::executor::block_on(async {
683 /// use futures::stream::{self, StreamExt};
684 ///
685 /// let number_stream = stream::iter(0..6);
686 /// let sum = number_stream.fold(0, |acc, x| async move { acc + x });
687 /// assert_eq!(sum.await, 15);
688 /// # });
689 /// ```
690 fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
691 where
692 F: FnMut(T, Self::Item) -> Fut,
693 Fut: Future<Output = T>,
694 Self: Sized,
695 {
696 assert_future::<T, _>(Fold::new(self, f, init))
697 }
698
/// Executes a predicate over an asynchronous stream, and returns `true` if any element in the stream satisfies the predicate.
700 ///
701 /// # Examples
702 ///
703 /// ```
704 /// # futures::executor::block_on(async {
705 /// use futures::stream::{self, StreamExt};
706 ///
707 /// let number_stream = stream::iter(0..10);
708 /// let contain_three = number_stream.any(|i| async move { i == 3 });
709 /// assert_eq!(contain_three.await, true);
710 /// # });
711 /// ```
712 fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
713 where
714 F: FnMut(Self::Item) -> Fut,
715 Fut: Future<Output = bool>,
716 Self: Sized,
717 {
718 assert_future::<bool, _>(Any::new(self, f))
719 }
720
/// Executes a predicate over an asynchronous stream, and returns `true` if all elements in the stream satisfy the predicate.
722 ///
723 /// # Examples
724 ///
725 /// ```
726 /// # futures::executor::block_on(async {
727 /// use futures::stream::{self, StreamExt};
728 ///
729 /// let number_stream = stream::iter(0..10);
/// let less_than_twenty = number_stream.all(|i| async move { i < 20 });
/// assert_eq!(less_than_twenty.await, true);
732 /// # });
733 /// ```
734 fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
735 where
736 F: FnMut(Self::Item) -> Fut,
737 Fut: Future<Output = bool>,
738 Self: Sized,
739 {
740 assert_future::<bool, _>(All::new(self, f))
741 }
742
743 /// Flattens a stream of streams into just one continuous stream.
744 ///
745 /// # Examples
746 ///
747 /// ```
748 /// # futures::executor::block_on(async {
749 /// use futures::channel::mpsc;
750 /// use futures::stream::StreamExt;
751 /// use std::thread;
752 ///
753 /// let (tx1, rx1) = mpsc::unbounded();
754 /// let (tx2, rx2) = mpsc::unbounded();
755 /// let (tx3, rx3) = mpsc::unbounded();
756 ///
757 /// thread::spawn(move || {
758 /// tx1.unbounded_send(1).unwrap();
759 /// tx1.unbounded_send(2).unwrap();
760 /// });
761 /// thread::spawn(move || {
762 /// tx2.unbounded_send(3).unwrap();
763 /// tx2.unbounded_send(4).unwrap();
764 /// });
765 /// thread::spawn(move || {
766 /// tx3.unbounded_send(rx1).unwrap();
767 /// tx3.unbounded_send(rx2).unwrap();
768 /// });
769 ///
770 /// let output = rx3.flatten().collect::<Vec<i32>>().await;
771 /// assert_eq!(output, vec![1, 2, 3, 4]);
772 /// # });
773 /// ```
774 fn flatten(self) -> Flatten<Self>
775 where
776 Self::Item: Stream,
777 Self: Sized,
778 {
779 assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
780 }
781
782 /// Flattens a stream of streams into just one continuous stream. Polls
783 /// inner streams produced by the base stream concurrently.
784 ///
785 /// The only argument is an optional limit on the number of concurrently
786 /// polled streams. If this limit is not `None`, no more than `limit` streams
787 /// will be polled at the same time. The `limit` argument is of type
788 /// `Into<Option<usize>>`, and so can be provided as either `None`,
789 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
790 /// no limit at all, and will have the same result as passing in `None`.
791 ///
792 /// # Examples
793 ///
794 /// ```
795 /// # futures::executor::block_on(async {
796 /// use futures::channel::mpsc;
797 /// use futures::stream::StreamExt;
798 /// use std::thread;
799 ///
800 /// let (tx1, rx1) = mpsc::unbounded();
801 /// let (tx2, rx2) = mpsc::unbounded();
802 /// let (tx3, rx3) = mpsc::unbounded();
803 ///
804 /// thread::spawn(move || {
805 /// tx1.unbounded_send(1).unwrap();
806 /// tx1.unbounded_send(2).unwrap();
807 /// });
808 /// thread::spawn(move || {
809 /// tx2.unbounded_send(3).unwrap();
810 /// tx2.unbounded_send(4).unwrap();
811 /// });
812 /// thread::spawn(move || {
813 /// tx3.unbounded_send(rx1).unwrap();
814 /// tx3.unbounded_send(rx2).unwrap();
815 /// });
816 ///
817 /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await;
818 /// output.sort();
819 ///
820 /// assert_eq!(output, vec![1, 2, 3, 4]);
821 /// # });
822 /// ```
823 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
824 #[cfg(feature = "alloc")]
825 fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
826 where
827 Self::Item: Stream + Unpin,
828 Self: Sized,
829 {
830 assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into()))
831 }
832
833 /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
834 ///
835 /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
836 /// you would have to chain combinators like `.map(f).flatten()` while this
837 /// combinator provides ability to write `.flat_map(f)` instead of chaining.
838 ///
/// The provided closure, which produces the inner streams, is executed on each
/// element of the stream once the previous inner stream has terminated and the
/// next stream item is available.
841 ///
842 /// Note that this function consumes the stream passed into it and returns a
843 /// wrapped version of it, similar to the existing `flat_map` methods in the
844 /// standard library.
845 ///
846 /// # Examples
847 ///
848 /// ```
849 /// # futures::executor::block_on(async {
850 /// use futures::stream::{self, StreamExt};
851 ///
852 /// let stream = stream::iter(1..=3);
853 /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x]));
854 ///
855 /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await);
856 /// # });
857 /// ```
858 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
859 where
860 F: FnMut(Self::Item) -> U,
861 U: Stream,
862 Self: Sized,
863 {
864 assert_stream::<U::Item, _>(FlatMap::new(self, f))
865 }
866
867 /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s
/// and polls them concurrently, yielding items in any order, as they are made
/// available.
870 ///
871 /// [`StreamExt::map`] is very useful, but if it produces `Stream`s
872 /// instead, and you need to poll all of them concurrently, you would
873 /// have to use something like `for_each_concurrent` and merge values
874 /// by hand. This combinator provides ability to collect all values
875 /// from concurrently polled streams into one stream.
876 ///
877 /// The first argument is an optional limit on the number of concurrently
878 /// polled streams. If this limit is not `None`, no more than `limit` streams
879 /// will be polled at the same time. The `limit` argument is of type
880 /// `Into<Option<usize>>`, and so can be provided as either `None`,
881 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
882 /// no limit at all, and will have the same result as passing in `None`.
883 ///
/// The provided closure, which produces the inner streams, is executed on each
/// element of the stream as soon as the next stream item is available and the
/// limit of concurrently processed streams isn't exceeded.
887 ///
888 /// Note that this function consumes the stream passed into it and
889 /// returns a wrapped version of it.
890 ///
891 /// # Examples
892 ///
893 /// ```
894 /// # futures::executor::block_on(async {
895 /// use futures::stream::{self, StreamExt};
896 ///
897 /// let stream = stream::iter(1..5);
898 /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x]));
899 /// let mut values = stream.collect::<Vec<_>>().await;
900 /// values.sort();
901 ///
902 /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
903 /// # });
904 /// ```
905 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
906 #[cfg(feature = "alloc")]
907 fn flat_map_unordered<U, F>(
908 self,
909 limit: impl Into<Option<usize>>,
910 f: F,
911 ) -> FlatMapUnordered<Self, U, F>
912 where
913 U: Stream + Unpin,
914 F: FnMut(Self::Item) -> U,
915 Self: Sized,
916 {
917 assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f))
918 }
919
920 /// Combinator similar to [`StreamExt::fold`] that holds internal state
921 /// and produces a new stream.
922 ///
923 /// Accepts initial state and closure which will be applied to each element
924 /// of the stream until provided closure returns `None`. Once `None` is
925 /// returned, stream will be terminated.
926 ///
927 /// # Examples
928 ///
929 /// ```
930 /// # futures::executor::block_on(async {
931 /// use futures::future;
932 /// use futures::stream::{self, StreamExt};
933 ///
934 /// let stream = stream::iter(1..=10);
935 ///
936 /// let stream = stream.scan(0, |state, x| {
937 /// *state += x;
938 /// future::ready(if *state < 10 { Some(x) } else { None })
939 /// });
940 ///
941 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
942 /// # });
943 /// ```
944 fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
945 where
946 F: FnMut(&mut S, Self::Item) -> Fut,
947 Fut: Future<Output = Option<B>>,
948 Self: Sized,
949 {
950 assert_stream::<B, _>(Scan::new(self, initial_state, f))
951 }
952
953 /// Skip elements on this stream while the provided asynchronous predicate
954 /// resolves to `true`.
955 ///
956 /// This function, like `Iterator::skip_while`, will skip elements on the
957 /// stream until the predicate `f` resolves to `false`. Once one element
958 /// returns `false`, all future elements will be returned from the underlying
959 /// stream.
960 ///
961 /// # Examples
962 ///
963 /// ```
964 /// # futures::executor::block_on(async {
965 /// use futures::future;
966 /// use futures::stream::{self, StreamExt};
967 ///
968 /// let stream = stream::iter(1..=10);
969 ///
970 /// let stream = stream.skip_while(|x| future::ready(*x <= 5));
971 ///
972 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
973 /// # });
974 /// ```
975 fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
976 where
977 F: FnMut(&Self::Item) -> Fut,
978 Fut: Future<Output = bool>,
979 Self: Sized,
980 {
981 assert_stream::<Self::Item, _>(SkipWhile::new(self, f))
982 }
983
984 /// Take elements from this stream while the provided asynchronous predicate
985 /// resolves to `true`.
986 ///
987 /// This function, like `Iterator::take_while`, will take elements from the
988 /// stream until the predicate `f` resolves to `false`. Once one element
989 /// returns `false`, it will always return that the stream is done.
990 ///
991 /// # Examples
992 ///
993 /// ```
994 /// # futures::executor::block_on(async {
995 /// use futures::future;
996 /// use futures::stream::{self, StreamExt};
997 ///
998 /// let stream = stream::iter(1..=10);
999 ///
1000 /// let stream = stream.take_while(|x| future::ready(*x <= 5));
1001 ///
1002 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
1003 /// # });
1004 /// ```
1005 fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
1006 where
1007 F: FnMut(&Self::Item) -> Fut,
1008 Fut: Future<Output = bool>,
1009 Self: Sized,
1010 {
1011 assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
1012 }
1013
1014 /// Take elements from this stream until the provided future resolves.
1015 ///
1016 /// This function will take elements from the stream until the provided
1017 /// stopping future `fut` resolves. Once the `fut` future becomes ready,
1018 /// this stream combinator will always return that the stream is done.
1019 ///
1020 /// The stopping future may return any type. Once the stream is stopped
1021 /// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
1022 /// The stream may also be resumed with `TakeUntil::take_future()`.
1023 /// See the documentation of [`TakeUntil`] for more information.
1024 ///
1025 /// # Examples
1026 ///
1027 /// ```
1028 /// # futures::executor::block_on(async {
1029 /// use futures::future;
1030 /// use futures::stream::{self, StreamExt};
1031 /// use futures::task::Poll;
1032 ///
1033 /// let stream = stream::iter(1..=10);
1034 ///
1035 /// let mut i = 0;
1036 /// let stop_fut = future::poll_fn(|_cx| {
1037 /// i += 1;
1038 /// if i <= 5 {
1039 /// Poll::Pending
1040 /// } else {
1041 /// Poll::Ready(())
1042 /// }
1043 /// });
1044 ///
1045 /// let stream = stream.take_until(stop_fut);
1046 ///
1047 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
1048 /// # });
1049 /// ```
1050 fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
1051 where
1052 Fut: Future,
1053 Self: Sized,
1054 {
1055 assert_stream::<Self::Item, _>(TakeUntil::new(self, fut))
1056 }
1057
1058 /// Runs this stream to completion, executing the provided asynchronous
1059 /// closure for each element on the stream.
1060 ///
1061 /// The closure provided will be called for each item this stream produces,
1062 /// yielding a future. That future will then be executed to completion
1063 /// before moving on to the next item.
1064 ///
1065 /// The returned value is a `Future` where the `Output` type is `()`; it is
1066 /// executed entirely for its side effects.
1067 ///
1068 /// To process each item in the stream and produce another stream instead
1069 /// of a single future, use `then` instead.
1070 ///
1071 /// # Examples
1072 ///
1073 /// ```
1074 /// # futures::executor::block_on(async {
1075 /// use futures::future;
1076 /// use futures::stream::{self, StreamExt};
1077 ///
1078 /// let mut x = 0;
1079 ///
1080 /// {
1081 /// let fut = stream::repeat(1).take(3).for_each(|item| {
1082 /// x += item;
1083 /// future::ready(())
1084 /// });
1085 /// fut.await;
1086 /// }
1087 ///
1088 /// assert_eq!(x, 3);
1089 /// # });
1090 /// ```
1091 fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
1092 where
1093 F: FnMut(Self::Item) -> Fut,
1094 Fut: Future<Output = ()>,
1095 Self: Sized,
1096 {
1097 assert_future::<(), _>(ForEach::new(self, f))
1098 }
1099
1100 /// Runs this stream to completion, executing the provided asynchronous
1101 /// closure for each element on the stream concurrently as elements become
1102 /// available.
1103 ///
1104 /// This is similar to [`StreamExt::for_each`], but the futures
1105 /// produced by the closure are run concurrently (but not in parallel--
1106 /// this combinator does not introduce any threads).
1107 ///
1108 /// The closure provided will be called for each item this stream produces,
1109 /// yielding a future. That future will then be executed to completion
1110 /// concurrently with the other futures produced by the closure.
1111 ///
1112 /// The first argument is an optional limit on the number of concurrent
1113 /// futures. If this limit is not `None`, no more than `limit` futures
1114 /// will be run concurrently. The `limit` argument is of type
1115 /// `Into<Option<usize>>`, and so can be provided as either `None`,
1116 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
1117 /// no limit at all, and will have the same result as passing in `None`.
1118 ///
1119 /// This method is only available when the `std` or `alloc` feature of this
1120 /// library is activated, and it is activated by default.
1121 ///
1122 /// # Examples
1123 ///
1124 /// ```
1125 /// # futures::executor::block_on(async {
1126 /// use futures::channel::oneshot;
1127 /// use futures::stream::{self, StreamExt};
1128 ///
1129 /// let (tx1, rx1) = oneshot::channel();
1130 /// let (tx2, rx2) = oneshot::channel();
1131 /// let (tx3, rx3) = oneshot::channel();
1132 ///
1133 /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
1134 /// /* limit */ 2,
1135 /// |rx| async move {
1136 /// rx.await.unwrap();
1137 /// }
1138 /// );
1139 /// tx1.send(()).unwrap();
1140 /// tx2.send(()).unwrap();
1141 /// tx3.send(()).unwrap();
1142 /// fut.await;
1143 /// # })
1144 /// ```
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn for_each_concurrent<Fut, F>(
    self,
    limit: impl Into<Option<usize>>,
    f: F,
) -> ForEachConcurrent<Self, Fut, F>
where
    F: FnMut(Self::Item) -> Fut,
    Fut: Future<Output = ()>,
    Self: Sized,
{
    // Resolve the flexible `limit` argument to a plain `Option<usize>` first.
    let max_concurrent: Option<usize> = limit.into();
    let driver: ForEachConcurrent<Self, Fut, F> =
        ForEachConcurrent::new(self, max_concurrent, f);
    // The returned future has output `()`.
    assert_future::<(), _>(driver)
}
1159
1160 /// Creates a new stream of at most `n` items of the underlying stream.
1161 ///
1162 /// Once `n` items have been yielded from this stream then it will always
1163 /// return that the stream is done.
1164 ///
1165 /// # Examples
1166 ///
1167 /// ```
1168 /// # futures::executor::block_on(async {
1169 /// use futures::stream::{self, StreamExt};
1170 ///
1171 /// let stream = stream::iter(1..=10).take(3);
1172 ///
1173 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
1174 /// # });
1175 /// ```
1176 fn take(self, n: usize) -> Take<Self>
1177 where
1178 Self: Sized,
1179 {
1180 assert_stream::<Self::Item, _>(Take::new(self, n))
1181 }
1182
1183 /// Creates a new stream which skips `n` items of the underlying stream.
1184 ///
1185 /// Once `n` items have been skipped from this stream then it will always
1186 /// return the remaining items on this stream.
1187 ///
1188 /// # Examples
1189 ///
1190 /// ```
1191 /// # futures::executor::block_on(async {
1192 /// use futures::stream::{self, StreamExt};
1193 ///
1194 /// let stream = stream::iter(1..=10).skip(5);
1195 ///
1196 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
1197 /// # });
1198 /// ```
1199 fn skip(self, n: usize) -> Skip<Self>
1200 where
1201 Self: Sized,
1202 {
1203 assert_stream::<Self::Item, _>(Skip::new(self, n))
1204 }
1205
1206 /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never
1207 /// again be called once it has finished. This method can be used to turn
1208 /// any `Stream` into a `FusedStream`.
1209 ///
1210 /// Normally, once a stream has returned [`None`] from
1211 /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad
1212 /// behavior such as block forever, panic, never return, etc. If it is known
1213 /// that [`poll_next`](Stream::poll_next) may be called after stream
1214 /// has already finished, then this method can be used to ensure that it has
1215 /// defined semantics.
1216 ///
1217 /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream
1218 /// is guaranteed to return [`None`] after the underlying stream has
1219 /// finished.
1220 ///
1221 /// # Examples
1222 ///
1223 /// ```
1224 /// use futures::executor::block_on_stream;
1225 /// use futures::stream::{self, StreamExt};
1226 /// use futures::task::Poll;
1227 ///
1228 /// let mut x = 0;
1229 /// let stream = stream::poll_fn(|_| {
1230 /// x += 1;
1231 /// match x {
1232 /// 0..=2 => Poll::Ready(Some(x)),
1233 /// 3 => Poll::Ready(None),
1234 /// _ => panic!("should not happen")
1235 /// }
1236 /// }).fuse();
1237 ///
1238 /// let mut iter = block_on_stream(stream);
1239 /// assert_eq!(Some(1), iter.next());
1240 /// assert_eq!(Some(2), iter.next());
1241 /// assert_eq!(None, iter.next());
1242 /// assert_eq!(None, iter.next());
1243 /// // ...
1244 /// ```
1245 fn fuse(self) -> Fuse<Self>
1246 where
1247 Self: Sized,
1248 {
1249 assert_stream::<Self::Item, _>(Fuse::new(self))
1250 }
1251
1252 /// Borrows a stream, rather than consuming it.
1253 ///
1254 /// This is useful to allow applying stream adaptors while still retaining
1255 /// ownership of the original stream.
1256 ///
1257 /// # Examples
1258 ///
1259 /// ```
1260 /// # futures::executor::block_on(async {
1261 /// use futures::stream::{self, StreamExt};
1262 ///
1263 /// let mut stream = stream::iter(1..5);
1264 ///
1265 /// let sum = stream.by_ref()
1266 /// .take(2)
1267 /// .fold(0, |a, b| async move { a + b })
1268 /// .await;
1269 /// assert_eq!(sum, 3);
1270 ///
1271 /// // You can use the stream again
1272 /// let sum = stream.take(2)
1273 /// .fold(0, |a, b| async move { a + b })
1274 /// .await;
1275 /// assert_eq!(sum, 7);
1276 /// # });
1277 /// ```
fn by_ref(&mut self) -> &mut Self {
    // Hand back the same mutable borrow unchanged; the stream itself is not
    // consumed, so the caller keeps ownership of it.
    self
}
1281
1282 /// Catches unwinding panics while polling the stream.
1283 ///
1284 /// Caught panic (if any) will be the last element of the resulting stream.
1285 ///
1286 /// In general, panics within a stream can propagate all the way out to the
1287 /// task level. This combinator makes it possible to halt unwinding within
1288 /// the stream itself. It's most commonly used within task executors. This
1289 /// method should not be used for error handling.
1290 ///
1291 /// Note that this method requires the `UnwindSafe` bound from the standard
1292 /// library. This isn't always applied automatically, and the standard
/// library provides an `AssertUnwindSafe` wrapper type to apply it
/// after the fact. To assist using this method, the [`Stream`] trait is
1295 /// also implemented for `AssertUnwindSafe<St>` where `St` implements
1296 /// [`Stream`].
1297 ///
1298 /// This method is only available when the `std` feature of this
1299 /// library is activated, and it is activated by default.
1300 ///
1301 /// # Examples
1302 ///
1303 /// ```
1304 /// # futures::executor::block_on(async {
1305 /// use futures::stream::{self, StreamExt};
1306 ///
1307 /// let stream = stream::iter(vec![Some(10), None, Some(11)]);
1308 /// // Panic on second element
1309 /// let stream_panicking = stream.map(|o| o.unwrap());
1310 /// // Collect all the results
1311 /// let stream = stream_panicking.catch_unwind();
1312 ///
1313 /// let results: Vec<Result<i32, _>> = stream.collect().await;
1314 /// match results[0] {
1315 /// Ok(10) => {}
1316 /// _ => panic!("unexpected result!"),
1317 /// }
1318 /// assert!(results[1].is_err());
1319 /// assert_eq!(results.len(), 2);
1320 /// # });
1321 /// ```
#[cfg(feature = "std")]
fn catch_unwind(self) -> CatchUnwind<Self>
where
    Self: Sized + std::panic::UnwindSafe,
{
    // Spell out the item type like the sibling combinators do, so the
    // assertion pins it instead of leaving it to inference: each item is
    // either an item of the underlying stream or a caught panic payload.
    assert_stream::<Result<Self::Item, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new(
        self,
    ))
}
1329
1330 /// Wrap the stream in a Box, pinning it.
1331 ///
1332 /// This method is only available when the `std` or `alloc` feature of this
1333 /// library is activated, and it is activated by default.
#[cfg(feature = "alloc")]
fn boxed<'a>(self) -> BoxStream<'a, Self::Item>
where
    Self: Sized + Send + 'a,
{
    // The annotation makes the conversion to the boxed trait object happen here.
    let pinned: BoxStream<'a, Self::Item> = Box::pin(self);
    assert_stream::<Self::Item, _>(pinned)
}
1341
1342 /// Wrap the stream in a Box, pinning it.
1343 ///
1344 /// Similar to `boxed`, but without the `Send` requirement.
1345 ///
1346 /// This method is only available when the `std` or `alloc` feature of this
1347 /// library is activated, and it is activated by default.
#[cfg(feature = "alloc")]
fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item>
where
    Self: Sized + 'a,
{
    // The annotation makes the conversion to the boxed trait object happen here.
    let pinned: LocalBoxStream<'a, Self::Item> = Box::pin(self);
    assert_stream::<Self::Item, _>(pinned)
}
1355
1356 /// An adaptor for creating a buffered list of pending futures.
1357 ///
1358 /// If this stream's item can be converted into a future, then this adaptor
1359 /// will buffer up to at most `n` futures and then return the outputs in the
1360 /// same order as the underlying stream. No more than `n` futures will be
1361 /// buffered at any point in time, and less than `n` may also be buffered
1362 /// depending on the state of each future.
1363 ///
1364 /// The returned stream will be a stream of each future's output.
1365 ///
1366 /// This method is only available when the `std` or `alloc` feature of this
1367 /// library is activated, and it is activated by default.
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn buffered(self, n: usize) -> Buffered<Self>
where
    Self::Item: Future,
    Self: Sized,
{
    let buffering: Buffered<Self> = Buffered::new(self, n);
    // Items of the result are the outputs of the futures this stream yields.
    assert_stream::<<Self::Item as Future>::Output, _>(buffering)
}
1377
1378 /// An adaptor for creating a buffered list of pending futures (unordered).
1379 ///
1380 /// If this stream's item can be converted into a future, then this adaptor
1381 /// will buffer up to `n` futures and then return the outputs in the order
1382 /// in which they complete. No more than `n` futures will be buffered at
1383 /// any point in time, and less than `n` may also be buffered depending on
1384 /// the state of each future.
1385 ///
1386 /// The returned stream will be a stream of each future's output.
1387 ///
1388 /// This method is only available when the `std` or `alloc` feature of this
1389 /// library is activated, and it is activated by default.
1390 ///
1391 /// # Examples
1392 ///
1393 /// ```
1394 /// # futures::executor::block_on(async {
1395 /// use futures::channel::oneshot;
1396 /// use futures::stream::{self, StreamExt};
1397 ///
1398 /// let (send_one, recv_one) = oneshot::channel();
1399 /// let (send_two, recv_two) = oneshot::channel();
1400 ///
1401 /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
1402 /// let mut buffered = stream_of_futures.buffer_unordered(10);
1403 ///
1404 /// send_two.send(2i32)?;
1405 /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1406 ///
1407 /// send_one.send(1i32)?;
1408 /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1409 ///
1410 /// assert_eq!(buffered.next().await, None);
1411 /// # Ok::<(), i32>(()) }).unwrap();
1412 /// ```
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
where
    Self::Item: Future,
    Self: Sized,
{
    let buffering: BufferUnordered<Self> = BufferUnordered::new(self, n);
    // Items of the result are the outputs of the futures this stream yields.
    assert_stream::<<Self::Item as Future>::Output, _>(buffering)
}
1422
1423 /// An adapter for zipping two streams together.
1424 ///
1425 /// The zipped stream waits for both streams to produce an item, and then
1426 /// returns that pair. If either stream ends then the zipped stream will
1427 /// also end.
1428 ///
1429 /// # Examples
1430 ///
1431 /// ```
1432 /// # futures::executor::block_on(async {
1433 /// use futures::stream::{self, StreamExt};
1434 ///
1435 /// let stream1 = stream::iter(1..=3);
1436 /// let stream2 = stream::iter(5..=10);
1437 ///
1438 /// let vec = stream1.zip(stream2)
1439 /// .collect::<Vec<_>>()
1440 /// .await;
1441 /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec);
1442 /// # });
1443 /// ```
1444 ///
1445 fn zip<St>(self, other: St) -> Zip<Self, St>
1446 where
1447 St: Stream,
1448 Self: Sized,
1449 {
1450 assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other))
1451 }
1452
1453 /// Adapter for chaining two streams.
1454 ///
1455 /// The resulting stream emits elements from the first stream, and when
1456 /// first stream reaches the end, emits the elements from the second stream.
1457 ///
1458 /// ```
1459 /// # futures::executor::block_on(async {
1460 /// use futures::stream::{self, StreamExt};
1461 ///
1462 /// let stream1 = stream::iter(vec![Ok(10), Err(false)]);
1463 /// let stream2 = stream::iter(vec![Err(true), Ok(20)]);
1464 ///
1465 /// let stream = stream1.chain(stream2);
1466 ///
1467 /// let result: Vec<_> = stream.collect().await;
1468 /// assert_eq!(result, vec![
1469 /// Ok(10),
1470 /// Err(false),
1471 /// Err(true),
1472 /// Ok(20),
1473 /// ]);
1474 /// # });
1475 /// ```
1476 fn chain<St>(self, other: St) -> Chain<Self, St>
1477 where
1478 St: Stream<Item = Self::Item>,
1479 Self: Sized,
1480 {
1481 assert_stream::<Self::Item, _>(Chain::new(self, other))
1482 }
1483
1484 /// Creates a new stream which exposes a `peek` method.
1485 ///
1486 /// Calling `peek` returns a reference to the next item in the stream.
1487 fn peekable(self) -> Peekable<Self>
1488 where
1489 Self: Sized,
1490 {
1491 assert_stream::<Self::Item, _>(Peekable::new(self))
1492 }
1493
1494 /// An adaptor for chunking up items of the stream inside a vector.
1495 ///
1496 /// This combinator will attempt to pull items from this stream and buffer
1497 /// them into a local vector. At most `capacity` items will get buffered
1498 /// before they're yielded from the returned stream.
1499 ///
/// Note that the vectors returned from this stream may not always have
1501 /// `capacity` elements. If the underlying stream ended and only a partial
1502 /// vector was created, it'll be returned. Additionally if an error happens
1503 /// from the underlying stream then the currently buffered items will be
1504 /// yielded.
1505 ///
1506 /// This method is only available when the `std` or `alloc` feature of this
1507 /// library is activated, and it is activated by default.
1508 ///
1509 /// # Panics
1510 ///
1511 /// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn chunks(self, capacity: usize) -> Chunks<Self>
where
    Self: Sized,
{
    let chunked: Chunks<Self> = Chunks::new(self, capacity);
    // Items of the result are vectors of this stream's items.
    assert_stream::<Vec<Self::Item>, _>(chunked)
}
1519
1520 /// An adaptor for chunking up ready items of the stream inside a vector.
1521 ///
1522 /// This combinator will attempt to pull ready items from this stream and
1523 /// buffer them into a local vector. At most `capacity` items will get
1524 /// buffered before they're yielded from the returned stream. If underlying
1525 /// stream returns `Poll::Pending`, and collected chunk is not empty, it will
1526 /// be immediately returned.
1527 ///
1528 /// If the underlying stream ended and only a partial vector was created,
1529 /// it will be returned.
1530 ///
1531 /// This method is only available when the `std` or `alloc` feature of this
1532 /// library is activated, and it is activated by default.
1533 ///
1534 /// # Panics
1535 ///
1536 /// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
where
    Self: Sized,
{
    let chunked: ReadyChunks<Self> = ReadyChunks::new(self, capacity);
    // Items of the result are vectors of this stream's items.
    assert_stream::<Vec<Self::Item>, _>(chunked)
}
1544
1545 /// A future that completes after the given stream has been fully processed
1546 /// into the sink and the sink has been flushed and closed.
1547 ///
1548 /// This future will drive the stream to keep producing items until it is
1549 /// exhausted, sending each item to the sink. It will complete once the
1550 /// stream is exhausted, the sink has received and flushed all items, and
1551 /// the sink is closed. Note that neither the original stream nor provided
1552 /// sink will be output by this future. Pass the sink by `Pin<&mut S>`
1553 /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
1554 /// order to preserve access to the `Sink`. If the stream produces an error,
1555 /// that error will be returned by this future without flushing/closing the sink.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
fn forward<S>(self, sink: S) -> Forward<Self, S>
where
    // The sink must accept this stream's `Ok` values and share its error type.
    S: Sink<Self::Ok, Error = Self::Error>,
    Self: TryStream + Sized,
    // Stricter bound kept for reference alongside the TODO in the body:
    // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>,
{
    // Unlike the other combinators in this trait, the result is returned
    // without going through `assert_future`; the TODO below records the
    // compiler error that the disabled assertion runs into.
    // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>`
    // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink))
    Forward::new(self, sink)
}
1568
1569 /// Splits this `Stream + Sink` object into separate `Sink` and `Stream`
1570 /// objects.
1571 ///
1572 /// This can be useful when you want to split ownership between tasks, or
1573 /// allow direct interaction between the two objects (e.g. via
1574 /// `Sink::send_all`).
1575 ///
1576 /// This method is only available when the `std` or `alloc` feature of this
1577 /// library is activated, and it is activated by default.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
where
    Self: Sink<Item> + Sized,
{
    let (sink_half, stream_half) = split::split(self);
    // Check each half separately, then return them in (sink, stream) order.
    let sink_half = crate::sink::assert_sink::<Item, Self::Error, _>(sink_half);
    let stream_half = assert_stream::<Self::Item, _>(stream_half);
    (sink_half, stream_half)
}
1592
1593 /// Do something with each item of this stream, afterwards passing it on.
1594 ///
1595 /// This is similar to the `Iterator::inspect` method in the standard
1596 /// library where it allows easily inspecting each value as it passes
1597 /// through the stream, for example to debug what's going on.
1598 fn inspect<F>(self, f: F) -> Inspect<Self, F>
1599 where
1600 F: FnMut(&Self::Item),
1601 Self: Sized,
1602 {
1603 assert_stream::<Self::Item, _>(Inspect::new(self, f))
1604 }
1605
1606 /// Wrap this stream in an `Either` stream, making it the left-hand variant
1607 /// of that `Either`.
1608 ///
1609 /// This can be used in combination with the `right_stream` method to write `if`
1610 /// statements that evaluate to different streams in different branches.
1611 fn left_stream<B>(self) -> Either<Self, B>
1612 where
1613 B: Stream<Item = Self::Item>,
1614 Self: Sized,
1615 {
1616 assert_stream::<Self::Item, _>(Either::Left(self))
1617 }
1618
1619 /// Wrap this stream in an `Either` stream, making it the right-hand variant
1620 /// of that `Either`.
1621 ///
1622 /// This can be used in combination with the `left_stream` method to write `if`
1623 /// statements that evaluate to different streams in different branches.
1624 fn right_stream<B>(self) -> Either<B, Self>
1625 where
1626 B: Stream<Item = Self::Item>,
1627 Self: Sized,
1628 {
1629 assert_stream::<Self::Item, _>(Either::Right(self))
1630 }
1631
1632 /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
1633 /// stream types.
1634 fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
1635 where
1636 Self: Unpin,
1637 {
1638 Pin::new(self).poll_next(cx)
1639 }
1640
1641 /// Returns a [`Future`] that resolves when the next item in this stream is
1642 /// ready.
1643 ///
1644 /// This is similar to the [`next`][StreamExt::next] method, but it won't
1645 /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the
1646 /// returned future type will return `true` from
1647 /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing
1648 /// [`select_next_some`][StreamExt::select_next_some] to be easily used with
1649 /// the [`select!`] macro.
1650 ///
1651 /// If the future is polled after this [`Stream`] is empty it will panic.
1652 /// Using the future with a [`FusedFuture`][]-aware primitive like the
1653 /// [`select!`] macro will prevent this.
1654 ///
1655 /// [`FusedFuture`]: futures_core::future::FusedFuture
1656 /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated
1657 ///
1658 /// # Examples
1659 ///
1660 /// ```
1661 /// # futures::executor::block_on(async {
1662 /// use futures::{future, select};
1663 /// use futures::stream::{StreamExt, FuturesUnordered};
1664 ///
1665 /// let mut fut = future::ready(1);
1666 /// let mut async_tasks = FuturesUnordered::new();
1667 /// let mut total = 0;
1668 /// loop {
1669 /// select! {
1670 /// num = fut => {
1671 /// // First, the `ready` future completes.
1672 /// total += num;
1673 /// // Then we spawn a new task onto `async_tasks`,
1674 /// async_tasks.push(async { 5 });
1675 /// },
1676 /// // On the next iteration of the loop, the task we spawned
1677 /// // completes.
1678 /// num = async_tasks.select_next_some() => {
1679 /// total += num;
1680 /// }
1681 /// // Finally, both the `ready` future and `async_tasks` have
1682 /// // finished, so we enter the `complete` branch.
1683 /// complete => break,
1684 /// }
1685 /// }
1686 /// assert_eq!(total, 6);
1687 /// # });
1688 /// ```
1689 ///
1690 /// [`select!`]: crate::select
1691 fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
1692 where
1693 Self: Unpin + FusedStream,
1694 {
1695 assert_future::<Self::Item, _>(SelectNextSome::new(self))
1696 }
1697}
1698