1use core::future::Future;
2use futures_core::Stream;
3
4mod all;
5use all::AllFuture;
6
7mod any;
8use any::AnyFuture;
9
10mod chain;
11use chain::Chain;
12
13pub(crate) mod collect;
14use collect::{Collect, FromStream};
15
16mod filter;
17use filter::Filter;
18
19mod filter_map;
20use filter_map::FilterMap;
21
22mod fold;
23use fold::FoldFuture;
24
25mod fuse;
26use fuse::Fuse;
27
28mod map;
29use map::Map;
30
31mod map_while;
32use map_while::MapWhile;
33
34mod merge;
35use merge::Merge;
36
37mod next;
38use next::Next;
39
40mod skip;
41use skip::Skip;
42
43mod skip_while;
44use skip_while::SkipWhile;
45
46mod take;
47use take::Take;
48
49mod take_while;
50use take_while::TakeWhile;
51
52mod then;
53use then::Then;
54
55mod try_next;
56use try_next::TryNext;
57
58mod peekable;
59use peekable::Peekable;
60
61cfg_time! {
62 pub(crate) mod timeout;
63 pub(crate) mod timeout_repeating;
64 use timeout::Timeout;
65 use timeout_repeating::TimeoutRepeating;
66 use tokio::time::{Duration, Interval};
67 mod throttle;
68 use throttle::{throttle, Throttle};
69 mod chunks_timeout;
70 use chunks_timeout::ChunksTimeout;
71}
72
73/// An extension trait for the [`Stream`] trait that provides a variety of
74/// convenient combinator functions.
75///
/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
/// in the [futures] crate; however, both Tokio and futures provide separate
78/// `StreamExt` utility traits, and some utilities are only available on one of
79/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
80/// trait in the futures crate.
81///
82/// If you need utilities from both `StreamExt` traits, you should prefer to
83/// import one of them, and use the other through the fully qualified call
84/// syntax. For example:
85/// ```
86/// // import one of the traits:
87/// use futures::stream::StreamExt;
88/// # #[tokio::main(flavor = "current_thread")]
89/// # async fn main() {
90///
91/// let a = tokio_stream::iter(vec![1, 3, 5]);
92/// let b = tokio_stream::iter(vec![2, 4, 6]);
93///
94/// // use the fully qualified call syntax for the other trait:
95/// let merged = tokio_stream::StreamExt::merge(a, b);
96///
97/// // use normal call notation for futures::stream::StreamExt::collect
98/// let output: Vec<_> = merged.collect().await;
99/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
100/// # }
101/// ```
102///
103/// [`Stream`]: crate::Stream
104/// [futures]: https://docs.rs/futures
105/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
106pub trait StreamExt: Stream {
107 /// Consumes and returns the next value in the stream or `None` if the
108 /// stream is finished.
109 ///
110 /// Equivalent to:
111 ///
112 /// ```ignore
113 /// async fn next(&mut self) -> Option<Self::Item>;
114 /// ```
115 ///
/// Note that because `next` doesn't take ownership of the stream,
117 /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
118 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
119 /// be done by boxing the stream using [`Box::pin`] or
120 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
121 /// crate.
122 ///
123 /// # Cancel safety
124 ///
125 /// This method is cancel safe. The returned future only
126 /// holds onto a reference to the underlying stream,
127 /// so dropping it will never lose a value.
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// # #[tokio::main]
133 /// # async fn main() {
134 /// use tokio_stream::{self as stream, StreamExt};
135 ///
136 /// let mut stream = stream::iter(1..=3);
137 ///
138 /// assert_eq!(stream.next().await, Some(1));
139 /// assert_eq!(stream.next().await, Some(2));
140 /// assert_eq!(stream.next().await, Some(3));
141 /// assert_eq!(stream.next().await, None);
142 /// # }
143 /// ```
144 fn next(&mut self) -> Next<'_, Self>
145 where
146 Self: Unpin,
147 {
148 Next::new(self)
149 }
150
151 /// Consumes and returns the next item in the stream. If an error is
152 /// encountered before the next item, the error is returned instead.
153 ///
154 /// Equivalent to:
155 ///
156 /// ```ignore
157 /// async fn try_next(&mut self) -> Result<Option<T>, E>;
158 /// ```
159 ///
160 /// This is similar to the [`next`](StreamExt::next) combinator,
161 /// but returns a [`Result<Option<T>, E>`](Result) rather than
162 /// an [`Option<Result<T, E>>`](Option), making for easy use
163 /// with the [`?`](std::ops::Try) operator.
164 ///
165 /// # Cancel safety
166 ///
167 /// This method is cancel safe. The returned future only
168 /// holds onto a reference to the underlying stream,
169 /// so dropping it will never lose a value.
170 ///
171 /// # Examples
172 ///
173 /// ```
174 /// # #[tokio::main]
175 /// # async fn main() {
176 /// use tokio_stream::{self as stream, StreamExt};
177 ///
178 /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
179 ///
180 /// assert_eq!(stream.try_next().await, Ok(Some(1)));
181 /// assert_eq!(stream.try_next().await, Ok(Some(2)));
182 /// assert_eq!(stream.try_next().await, Err("nope"));
183 /// # }
184 /// ```
185 fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
186 where
187 Self: Stream<Item = Result<T, E>> + Unpin,
188 {
189 TryNext::new(self)
190 }
191
192 /// Maps this stream's items to a different type, returning a new stream of
193 /// the resulting type.
194 ///
195 /// The provided closure is executed over all elements of this stream as
196 /// they are made available. It is executed inline with calls to
197 /// [`poll_next`](Stream::poll_next).
198 ///
199 /// Note that this function consumes the stream passed into it and returns a
200 /// wrapped version of it, similar to the existing `map` methods in the
201 /// standard library.
202 ///
203 /// # Examples
204 ///
205 /// ```
206 /// # #[tokio::main]
207 /// # async fn main() {
208 /// use tokio_stream::{self as stream, StreamExt};
209 ///
210 /// let stream = stream::iter(1..=3);
211 /// let mut stream = stream.map(|x| x + 3);
212 ///
213 /// assert_eq!(stream.next().await, Some(4));
214 /// assert_eq!(stream.next().await, Some(5));
215 /// assert_eq!(stream.next().await, Some(6));
216 /// # }
217 /// ```
218 fn map<T, F>(self, f: F) -> Map<Self, F>
219 where
220 F: FnMut(Self::Item) -> T,
221 Self: Sized,
222 {
223 Map::new(self, f)
224 }
225
226 /// Map this stream's items to a different type for as long as determined by
227 /// the provided closure. A stream of the target type will be returned,
228 /// which will yield elements until the closure returns `None`.
229 ///
230 /// The provided closure is executed over all elements of this stream as
231 /// they are made available, until it returns `None`. It is executed inline
232 /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
233 /// the underlying stream will not be polled again.
234 ///
235 /// Note that this function consumes the stream passed into it and returns a
236 /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
237 /// standard library.
238 ///
239 /// # Examples
240 ///
241 /// ```
242 /// # #[tokio::main]
243 /// # async fn main() {
244 /// use tokio_stream::{self as stream, StreamExt};
245 ///
246 /// let stream = stream::iter(1..=10);
247 /// let mut stream = stream.map_while(|x| {
248 /// if x < 4 {
249 /// Some(x + 3)
250 /// } else {
251 /// None
252 /// }
253 /// });
254 /// assert_eq!(stream.next().await, Some(4));
255 /// assert_eq!(stream.next().await, Some(5));
256 /// assert_eq!(stream.next().await, Some(6));
257 /// assert_eq!(stream.next().await, None);
258 /// # }
259 /// ```
260 fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
261 where
262 F: FnMut(Self::Item) -> Option<T>,
263 Self: Sized,
264 {
265 MapWhile::new(self, f)
266 }
267
268 /// Maps this stream's items asynchronously to a different type, returning a
269 /// new stream of the resulting type.
270 ///
271 /// The provided closure is executed over all elements of this stream as
/// they are made available, and the returned future is executed. Only one
/// future is executed at a time.
274 ///
275 /// Note that this function consumes the stream passed into it and returns a
276 /// wrapped version of it, similar to the existing `then` methods in the
277 /// standard library.
278 ///
279 /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
280 /// returned by this method. To handle this, you can use `tokio::pin!` as in
281 /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
282 ///
283 /// # Examples
284 ///
285 /// ```
286 /// # #[tokio::main]
287 /// # async fn main() {
288 /// use tokio_stream::{self as stream, StreamExt};
289 ///
290 /// async fn do_async_work(value: i32) -> i32 {
291 /// value + 3
292 /// }
293 ///
294 /// let stream = stream::iter(1..=3);
295 /// let stream = stream.then(do_async_work);
296 ///
297 /// tokio::pin!(stream);
298 ///
299 /// assert_eq!(stream.next().await, Some(4));
300 /// assert_eq!(stream.next().await, Some(5));
301 /// assert_eq!(stream.next().await, Some(6));
302 /// # }
303 /// ```
304 fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
305 where
306 F: FnMut(Self::Item) -> Fut,
307 Fut: Future,
308 Self: Sized,
309 {
310 Then::new(self, f)
311 }
312
313 /// Combine two streams into one by interleaving the output of both as it
314 /// is produced.
315 ///
316 /// Values are produced from the merged stream in the order they arrive from
317 /// the two source streams. If both source streams provide values
318 /// simultaneously, the merge stream alternates between them. This provides
319 /// some level of fairness. You should not chain calls to `merge`, as this
320 /// will break the fairness of the merging.
321 ///
322 /// The merged stream completes once **both** source streams complete. When
323 /// one source stream completes before the other, the merge stream
324 /// exclusively polls the remaining stream.
325 ///
326 /// For merging multiple streams, consider using [`StreamMap`] instead.
327 ///
328 /// [`StreamMap`]: crate::StreamMap
329 ///
330 /// # Examples
331 ///
332 /// ```
333 /// use tokio_stream::{StreamExt, Stream};
334 /// use tokio::sync::mpsc;
335 /// use tokio::time;
336 ///
337 /// use std::time::Duration;
338 /// use std::pin::Pin;
339 ///
340 /// # /*
341 /// #[tokio::main]
342 /// # */
343 /// # #[tokio::main(flavor = "current_thread")]
344 /// async fn main() {
345 /// # time::pause();
346 /// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
347 /// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
348 ///
349 /// // Convert the channels to a `Stream`.
350 /// let rx1 = Box::pin(async_stream::stream! {
351 /// while let Some(item) = rx1.recv().await {
352 /// yield item;
353 /// }
354 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
355 ///
356 /// let rx2 = Box::pin(async_stream::stream! {
357 /// while let Some(item) = rx2.recv().await {
358 /// yield item;
359 /// }
360 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
361 ///
362 /// let mut rx = rx1.merge(rx2);
363 ///
364 /// tokio::spawn(async move {
365 /// // Send some values immediately
366 /// tx1.send(1).await.unwrap();
367 /// tx1.send(2).await.unwrap();
368 ///
369 /// // Let the other task send values
370 /// time::sleep(Duration::from_millis(20)).await;
371 ///
372 /// tx1.send(4).await.unwrap();
373 /// });
374 ///
375 /// tokio::spawn(async move {
376 /// // Wait for the first task to send values
377 /// time::sleep(Duration::from_millis(5)).await;
378 ///
379 /// tx2.send(3).await.unwrap();
380 ///
381 /// time::sleep(Duration::from_millis(25)).await;
382 ///
383 /// // Send the final value
384 /// tx2.send(5).await.unwrap();
385 /// });
386 ///
387 /// assert_eq!(1, rx.next().await.unwrap());
388 /// assert_eq!(2, rx.next().await.unwrap());
389 /// assert_eq!(3, rx.next().await.unwrap());
390 /// assert_eq!(4, rx.next().await.unwrap());
391 /// assert_eq!(5, rx.next().await.unwrap());
392 ///
393 /// // The merged stream is consumed
394 /// assert!(rx.next().await.is_none());
395 /// }
396 /// ```
397 fn merge<U>(self, other: U) -> Merge<Self, U>
398 where
399 U: Stream<Item = Self::Item>,
400 Self: Sized,
401 {
402 Merge::new(self, other)
403 }
404
405 /// Filters the values produced by this stream according to the provided
406 /// predicate.
407 ///
408 /// As values of this stream are made available, the provided predicate `f`
409 /// will be run against them. If the predicate
410 /// resolves to `true`, then the stream will yield the value, but if the
411 /// predicate resolves to `false`, then the value
412 /// will be discarded and the next value will be produced.
413 ///
414 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter`] method in the
416 /// standard library.
417 ///
418 /// # Examples
419 ///
420 /// ```
421 /// # #[tokio::main]
422 /// # async fn main() {
423 /// use tokio_stream::{self as stream, StreamExt};
424 ///
425 /// let stream = stream::iter(1..=8);
426 /// let mut evens = stream.filter(|x| x % 2 == 0);
427 ///
428 /// assert_eq!(Some(2), evens.next().await);
429 /// assert_eq!(Some(4), evens.next().await);
430 /// assert_eq!(Some(6), evens.next().await);
431 /// assert_eq!(Some(8), evens.next().await);
432 /// assert_eq!(None, evens.next().await);
433 /// # }
434 /// ```
435 fn filter<F>(self, f: F) -> Filter<Self, F>
436 where
437 F: FnMut(&Self::Item) -> bool,
438 Self: Sized,
439 {
440 Filter::new(self, f)
441 }
442
443 /// Filters the values produced by this stream while simultaneously mapping
444 /// them to a different type according to the provided closure.
445 ///
/// As values of this stream are made available, the provided closure `f`
/// will be run on them. If `f` returns [`Some(item)`](Some), then the stream
/// will yield the value `item`, but if it returns [`None`], then the value
/// will be skipped.
450 ///
451 /// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter_map`] method in the
453 /// standard library.
454 ///
/// # Examples
///
456 /// ```
457 /// # #[tokio::main]
458 /// # async fn main() {
459 /// use tokio_stream::{self as stream, StreamExt};
460 ///
461 /// let stream = stream::iter(1..=8);
462 /// let mut evens = stream.filter_map(|x| {
463 /// if x % 2 == 0 { Some(x + 1) } else { None }
464 /// });
465 ///
466 /// assert_eq!(Some(3), evens.next().await);
467 /// assert_eq!(Some(5), evens.next().await);
468 /// assert_eq!(Some(7), evens.next().await);
469 /// assert_eq!(Some(9), evens.next().await);
470 /// assert_eq!(None, evens.next().await);
471 /// # }
472 /// ```
473 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
474 where
475 F: FnMut(Self::Item) -> Option<T>,
476 Self: Sized,
477 {
478 FilterMap::new(self, f)
479 }
480
481 /// Creates a stream which ends after the first `None`.
482 ///
/// After a stream returns `None`, its behavior is unspecified. Future calls to
/// `poll_next` may or may not return `Some(T)` again or they may panic.
485 /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
486 /// return `None` forever.
487 ///
488 /// # Examples
489 ///
490 /// ```
491 /// use tokio_stream::{Stream, StreamExt};
492 ///
493 /// use std::pin::Pin;
494 /// use std::task::{Context, Poll};
495 ///
496 /// // a stream which alternates between Some and None
497 /// struct Alternate {
498 /// state: i32,
499 /// }
500 ///
501 /// impl Stream for Alternate {
502 /// type Item = i32;
503 ///
504 /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
505 /// let val = self.state;
506 /// self.state = self.state + 1;
507 ///
508 /// // if it's even, Some(i32), else None
509 /// if val % 2 == 0 {
510 /// Poll::Ready(Some(val))
511 /// } else {
512 /// Poll::Ready(None)
513 /// }
514 /// }
515 /// }
516 ///
517 /// #[tokio::main]
518 /// async fn main() {
519 /// let mut stream = Alternate { state: 0 };
520 ///
521 /// // the stream goes back and forth
522 /// assert_eq!(stream.next().await, Some(0));
523 /// assert_eq!(stream.next().await, None);
524 /// assert_eq!(stream.next().await, Some(2));
525 /// assert_eq!(stream.next().await, None);
526 ///
527 /// // however, once it is fused
528 /// let mut stream = stream.fuse();
529 ///
530 /// assert_eq!(stream.next().await, Some(4));
531 /// assert_eq!(stream.next().await, None);
532 ///
533 /// // it will always return `None` after the first time.
534 /// assert_eq!(stream.next().await, None);
535 /// assert_eq!(stream.next().await, None);
536 /// assert_eq!(stream.next().await, None);
537 /// }
538 /// ```
539 fn fuse(self) -> Fuse<Self>
540 where
541 Self: Sized,
542 {
543 Fuse::new(self)
544 }
545
546 /// Creates a new stream of at most `n` items of the underlying stream.
547 ///
548 /// Once `n` items have been yielded from this stream then it will always
549 /// return that the stream is done.
550 ///
551 /// # Examples
552 ///
553 /// ```
554 /// # #[tokio::main]
555 /// # async fn main() {
556 /// use tokio_stream::{self as stream, StreamExt};
557 ///
558 /// let mut stream = stream::iter(1..=10).take(3);
559 ///
560 /// assert_eq!(Some(1), stream.next().await);
561 /// assert_eq!(Some(2), stream.next().await);
562 /// assert_eq!(Some(3), stream.next().await);
563 /// assert_eq!(None, stream.next().await);
564 /// # }
565 /// ```
566 fn take(self, n: usize) -> Take<Self>
567 where
568 Self: Sized,
569 {
570 Take::new(self, n)
571 }
572
573 /// Take elements from this stream while the provided predicate
574 /// resolves to `true`.
575 ///
576 /// This function, like `Iterator::take_while`, will take elements from the
577 /// stream until the predicate `f` resolves to `false`. Once one element
578 /// returns false it will always return that the stream is done.
579 ///
580 /// # Examples
581 ///
582 /// ```
583 /// # #[tokio::main]
584 /// # async fn main() {
585 /// use tokio_stream::{self as stream, StreamExt};
586 ///
587 /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
588 ///
589 /// assert_eq!(Some(1), stream.next().await);
590 /// assert_eq!(Some(2), stream.next().await);
591 /// assert_eq!(Some(3), stream.next().await);
592 /// assert_eq!(None, stream.next().await);
593 /// # }
594 /// ```
595 fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
596 where
597 F: FnMut(&Self::Item) -> bool,
598 Self: Sized,
599 {
600 TakeWhile::new(self, f)
601 }
602
603 /// Creates a new stream that will skip the `n` first items of the
604 /// underlying stream.
605 ///
606 /// # Examples
607 ///
608 /// ```
609 /// # #[tokio::main]
610 /// # async fn main() {
611 /// use tokio_stream::{self as stream, StreamExt};
612 ///
613 /// let mut stream = stream::iter(1..=10).skip(7);
614 ///
615 /// assert_eq!(Some(8), stream.next().await);
616 /// assert_eq!(Some(9), stream.next().await);
617 /// assert_eq!(Some(10), stream.next().await);
618 /// assert_eq!(None, stream.next().await);
619 /// # }
620 /// ```
621 fn skip(self, n: usize) -> Skip<Self>
622 where
623 Self: Sized,
624 {
625 Skip::new(self, n)
626 }
627
628 /// Skip elements from the underlying stream while the provided predicate
629 /// resolves to `true`.
630 ///
631 /// This function, like [`Iterator::skip_while`], will ignore elements from the
632 /// stream until the predicate `f` resolves to `false`. Once one element
633 /// returns false, the rest of the elements will be yielded.
634 ///
635 /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
636 ///
637 /// # Examples
638 ///
639 /// ```
640 /// # #[tokio::main]
641 /// # async fn main() {
642 /// use tokio_stream::{self as stream, StreamExt};
643 /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
644 ///
645 /// assert_eq!(Some(3), stream.next().await);
646 /// assert_eq!(Some(4), stream.next().await);
647 /// assert_eq!(Some(1), stream.next().await);
648 /// assert_eq!(None, stream.next().await);
649 /// # }
650 /// ```
651 fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
652 where
653 F: FnMut(&Self::Item) -> bool,
654 Self: Sized,
655 {
656 SkipWhile::new(self, f)
657 }
658
659 /// Tests if every element of the stream matches a predicate.
660 ///
661 /// Equivalent to:
662 ///
663 /// ```ignore
664 /// async fn all<F>(&mut self, f: F) -> bool;
665 /// ```
666 ///
/// `all()` takes a closure that returns `true` or `false`. It applies
/// this closure to each element of the stream, and if they all return
/// `true`, then so does `all()`. If any of them return `false`, it
/// returns `false`.
671 ///
672 /// `all()` is short-circuiting; in other words, it will stop processing
673 /// as soon as it finds a `false`, given that no matter what else happens,
674 /// the result will also be `false`.
675 ///
676 /// An empty stream returns `true`.
677 ///
678 /// # Examples
679 ///
680 /// Basic usage:
681 ///
682 /// ```
683 /// # #[tokio::main]
684 /// # async fn main() {
685 /// use tokio_stream::{self as stream, StreamExt};
686 ///
687 /// let a = [1, 2, 3];
688 ///
689 /// assert!(stream::iter(&a).all(|&x| x > 0).await);
690 ///
691 /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
692 /// # }
693 /// ```
694 ///
695 /// Stopping at the first `false`:
696 ///
697 /// ```
698 /// # #[tokio::main]
699 /// # async fn main() {
700 /// use tokio_stream::{self as stream, StreamExt};
701 ///
702 /// let a = [1, 2, 3];
703 ///
704 /// let mut iter = stream::iter(&a);
705 ///
706 /// assert!(!iter.all(|&x| x != 2).await);
707 ///
708 /// // we can still use `iter`, as there are more elements.
709 /// assert_eq!(iter.next().await, Some(&3));
710 /// # }
711 /// ```
712 fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
713 where
714 Self: Unpin,
715 F: FnMut(Self::Item) -> bool,
716 {
717 AllFuture::new(self, f)
718 }
719
720 /// Tests if any element of the stream matches a predicate.
721 ///
722 /// Equivalent to:
723 ///
724 /// ```ignore
725 /// async fn any<F>(&mut self, f: F) -> bool;
726 /// ```
727 ///
728 /// `any()` takes a closure that returns `true` or `false`. It applies
729 /// this closure to each element of the stream, and if any of them return
730 /// `true`, then so does `any()`. If they all return `false`, it
731 /// returns `false`.
732 ///
733 /// `any()` is short-circuiting; in other words, it will stop processing
734 /// as soon as it finds a `true`, given that no matter what else happens,
735 /// the result will also be `true`.
736 ///
737 /// An empty stream returns `false`.
738 ///
/// # Examples
///
/// Basic usage:
740 ///
741 /// ```
742 /// # #[tokio::main]
743 /// # async fn main() {
744 /// use tokio_stream::{self as stream, StreamExt};
745 ///
746 /// let a = [1, 2, 3];
747 ///
748 /// assert!(stream::iter(&a).any(|&x| x > 0).await);
749 ///
750 /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
751 /// # }
752 /// ```
753 ///
754 /// Stopping at the first `true`:
755 ///
756 /// ```
757 /// # #[tokio::main]
758 /// # async fn main() {
759 /// use tokio_stream::{self as stream, StreamExt};
760 ///
761 /// let a = [1, 2, 3];
762 ///
763 /// let mut iter = stream::iter(&a);
764 ///
765 /// assert!(iter.any(|&x| x != 2).await);
766 ///
767 /// // we can still use `iter`, as there are more elements.
768 /// assert_eq!(iter.next().await, Some(&2));
769 /// # }
770 /// ```
771 fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
772 where
773 Self: Unpin,
774 F: FnMut(Self::Item) -> bool,
775 {
776 AnyFuture::new(self, f)
777 }
778
779 /// Combine two streams into one by first returning all values from the
780 /// first stream then all values from the second stream.
781 ///
782 /// As long as `self` still has values to emit, no values from `other` are
783 /// emitted, even if some are ready.
784 ///
785 /// # Examples
786 ///
787 /// ```
788 /// use tokio_stream::{self as stream, StreamExt};
789 ///
790 /// #[tokio::main]
791 /// async fn main() {
792 /// let one = stream::iter(vec![1, 2, 3]);
793 /// let two = stream::iter(vec![4, 5, 6]);
794 ///
795 /// let mut stream = one.chain(two);
796 ///
797 /// assert_eq!(stream.next().await, Some(1));
798 /// assert_eq!(stream.next().await, Some(2));
799 /// assert_eq!(stream.next().await, Some(3));
800 /// assert_eq!(stream.next().await, Some(4));
801 /// assert_eq!(stream.next().await, Some(5));
802 /// assert_eq!(stream.next().await, Some(6));
803 /// assert_eq!(stream.next().await, None);
804 /// }
805 /// ```
806 fn chain<U>(self, other: U) -> Chain<Self, U>
807 where
808 U: Stream<Item = Self::Item>,
809 Self: Sized,
810 {
811 Chain::new(self, other)
812 }
813
814 /// A combinator that applies a function to every element in a stream
815 /// producing a single, final value.
816 ///
817 /// Equivalent to:
818 ///
819 /// ```ignore
820 /// async fn fold<B, F>(self, init: B, f: F) -> B;
821 /// ```
822 ///
/// # Examples
///
/// Basic usage:
///
/// ```
826 /// # #[tokio::main]
827 /// # async fn main() {
828 /// use tokio_stream::{self as stream, *};
829 ///
830 /// let s = stream::iter(vec![1u8, 2, 3]);
831 /// let sum = s.fold(0, |acc, x| acc + x).await;
832 ///
833 /// assert_eq!(sum, 6);
834 /// # }
835 /// ```
836 fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
837 where
838 Self: Sized,
839 F: FnMut(B, Self::Item) -> B,
840 {
841 FoldFuture::new(self, init, f)
842 }
843
844 /// Drain stream pushing all emitted values into a collection.
845 ///
846 /// Equivalent to:
847 ///
848 /// ```ignore
849 /// async fn collect<T>(self) -> T;
850 /// ```
851 ///
852 /// `collect` streams all values, awaiting as needed. Values are pushed into
853 /// a collection. A number of different target collection types are
854 /// supported, including [`Vec`], [`String`], and [`Bytes`].
855 ///
856 /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
857 ///
858 /// # `Result`
859 ///
860 /// `collect()` can also be used with streams of type `Result<T, E>` where
861 /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
862 /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
863 /// streaming is terminated and `collect()` returns the `Err`.
864 ///
865 /// # Notes
866 ///
867 /// `FromStream` is currently a sealed trait. Stabilization is pending
868 /// enhancements to the Rust language.
869 ///
870 /// # Examples
871 ///
872 /// Basic usage:
873 ///
874 /// ```
875 /// use tokio_stream::{self as stream, StreamExt};
876 ///
877 /// #[tokio::main]
878 /// async fn main() {
879 /// let doubled: Vec<i32> =
880 /// stream::iter(vec![1, 2, 3])
881 /// .map(|x| x * 2)
882 /// .collect()
883 /// .await;
884 ///
885 /// assert_eq!(vec![2, 4, 6], doubled);
886 /// }
887 /// ```
888 ///
889 /// Collecting a stream of `Result` values
890 ///
891 /// ```
892 /// use tokio_stream::{self as stream, StreamExt};
893 ///
894 /// #[tokio::main]
895 /// async fn main() {
896 /// // A stream containing only `Ok` values will be collected
897 /// let values: Result<Vec<i32>, &str> =
898 /// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
899 /// .collect()
900 /// .await;
901 ///
902 /// assert_eq!(Ok(vec![1, 2, 3]), values);
903 ///
904 /// // A stream containing `Err` values will return the first error.
905 /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
906 ///
907 /// let values: Result<Vec<i32>, &str> =
908 /// stream::iter(results)
909 /// .collect()
910 /// .await;
911 ///
912 /// assert_eq!(Err("no"), values);
913 /// }
914 /// ```
915 fn collect<T>(self) -> Collect<Self, T>
916 where
917 T: FromStream<Self::Item>,
918 Self: Sized,
919 {
920 Collect::new(self)
921 }
922
923 /// Applies a per-item timeout to the passed stream.
924 ///
925 /// `timeout()` takes a `Duration` that represents the maximum amount of
926 /// time each element of the stream has to complete before timing out.
927 ///
928 /// If the wrapped stream yields a value before the deadline is reached, the
929 /// value is returned. Otherwise, an error is returned. The caller may decide
930 /// to continue consuming the stream and will eventually get the next source
931 /// stream value once it becomes available. See
932 /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
933 /// where the timeouts will repeat.
934 ///
935 /// # Notes
936 ///
937 /// This function consumes the stream passed into it and returns a
938 /// wrapped version of it.
939 ///
940 /// Polling the returned stream will continue to poll the inner stream even
941 /// if one or more items time out.
942 ///
943 /// # Examples
944 ///
945 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
946 ///
947 /// ```
948 /// # #[tokio::main]
949 /// # async fn main() {
950 /// use tokio_stream::{self as stream, StreamExt};
951 /// use std::time::Duration;
952 /// # let int_stream = stream::iter(1..=3);
953 ///
954 /// let int_stream = int_stream.timeout(Duration::from_secs(1));
955 /// tokio::pin!(int_stream);
956 ///
957 /// // When no items time out, we get the 3 elements in succession:
958 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
959 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
960 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
961 /// assert_eq!(int_stream.try_next().await, Ok(None));
962 ///
963 /// // If the second item times out, we get an error and continue polling the stream:
964 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
965 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
966 /// assert!(int_stream.try_next().await.is_err());
967 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
968 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
969 /// assert_eq!(int_stream.try_next().await, Ok(None));
970 ///
971 /// // If we want to stop consuming the source stream the first time an
972 /// // element times out, we can use the `take_while` operator:
973 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
974 /// let mut int_stream = int_stream.take_while(Result::is_ok);
975 ///
976 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
977 /// assert_eq!(int_stream.try_next().await, Ok(None));
978 /// # }
979 /// ```
980 ///
981 /// Once a timeout error is received, no further events will be received
982 /// unless the wrapped stream yields a value (timeouts do not repeat).
983 ///
984 /// ```
985 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
986 /// # async fn main() {
987 /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
988 /// use std::time::Duration;
989 /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
990 /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
991 /// tokio::pin!(timeout_stream);
992 ///
993 /// // Only one timeout will be received between values in the source stream.
994 /// assert!(timeout_stream.try_next().await.is_ok());
995 /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
996 /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
997 /// # }
998 /// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
    Self: Sized,
{
    // Only compiled with the `time` feature; the stream is moved into the
    // wrapper together with the per-item duration.
    Timeout::<Self>::new(self, duration)
}
1007
1008 /// Applies a per-item timeout to the passed stream.
1009 ///
1010 /// `timeout_repeating()` takes an [`Interval`] that controls the time each
1011 /// element of the stream has to complete before timing out.
1012 ///
1013 /// If the wrapped stream yields a value before the deadline is reached, the
1014 /// value is returned. Otherwise, an error is returned. The caller may decide
1015 /// to continue consuming the stream and will eventually get the next source
1016 /// stream value once it becomes available. Unlike `timeout()`, if no value
1017 /// becomes available before the deadline is reached, additional errors are
1018 /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
1019 /// for an alternative where the timeouts do not repeat.
1020 ///
1021 /// # Notes
1022 ///
1023 /// This function consumes the stream passed into it and returns a
1024 /// wrapped version of it.
1025 ///
1026 /// Polling the returned stream will continue to poll the inner stream even
1027 /// if one or more items time out.
1028 ///
1029 /// # Examples
1030 ///
1031 /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1032 ///
1033 /// ```
1034 /// # #[tokio::main]
1035 /// # async fn main() {
1036 /// use tokio_stream::{self as stream, StreamExt};
1037 /// use std::time::Duration;
1038 /// # let int_stream = stream::iter(1..=3);
1039 ///
1040 /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
1041 /// tokio::pin!(int_stream);
1042 ///
1043 /// // When no items time out, we get the 3 elements in succession:
1044 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1045 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1046 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1047 /// assert_eq!(int_stream.try_next().await, Ok(None));
1048 ///
1049 /// // If the second item times out, we get an error and continue polling the stream:
1050 /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1051 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1052 /// assert!(int_stream.try_next().await.is_err());
1053 /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1054 /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1055 /// assert_eq!(int_stream.try_next().await, Ok(None));
1056 ///
1057 /// // If we want to stop consuming the source stream the first time an
1058 /// // element times out, we can use the `take_while` operator:
1059 /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1060 /// let mut int_stream = int_stream.take_while(Result::is_ok);
1061 ///
1062 /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1063 /// assert_eq!(int_stream.try_next().await, Ok(None));
1064 /// # }
1065 /// ```
1066 ///
1067 /// Timeout errors will be continuously produced at the specified interval
1068 /// until the wrapped stream yields a value.
1069 ///
1070 /// ```
1071 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1072 /// # async fn main() {
1073 /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
1074 /// use std::time::Duration;
1075 /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
1076 /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
1077 /// tokio::pin!(timeout_stream);
1078 ///
1079 /// // Multiple timeouts will be received between values in the source stream.
1080 /// assert!(timeout_stream.try_next().await.is_ok());
1081 /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1082 /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
1083 /// // Will eventually receive another value from the source stream...
1084 /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1085 /// # }
1086 /// ```
1087 #[cfg(feature = "time")]
1088 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1089 fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
1090 where
1091 Self: Sized,
1092 {
1093 TimeoutRepeating::new(self, interval)
1094 }
1095
1096 /// Slows down a stream by enforcing a delay between items.
1097 ///
1098 /// The underlying timer behind this utility has a granularity of one millisecond.
1099 ///
1100 /// # Example
1101 ///
1102 /// Create a throttled stream.
1103 /// ```rust,no_run
1104 /// use std::time::Duration;
1105 /// use tokio_stream::StreamExt;
1106 ///
1107 /// # async fn dox() {
1108 /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
1109 /// tokio::pin!(item_stream);
1110 ///
1111 /// loop {
1112 /// // The string will be produced at most every 2 seconds
1113 /// println!("{:?}", item_stream.next().await);
1114 /// }
1115 /// # }
1116 /// ```
1117 #[cfg(feature = "time")]
1118 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1119 fn throttle(self, duration: Duration) -> Throttle<Self>
1120 where
1121 Self: Sized,
1122 {
1123 throttle(duration, self)
1124 }
1125
1126 /// Batches the items in the given stream using a maximum duration and size for each batch.
1127 ///
1128 /// This stream returns the next batch of items in the following situations:
1129 /// 1. The inner stream has returned at least `max_size` many items since the last batch.
1130 /// 2. The time since the first item of a batch is greater than the given duration.
1131 /// 3. The end of the stream is reached.
1132 ///
    /// The returned vector is never empty and never longer than the maximum size. Empty batches
    /// will not be emitted if no items are received upstream.
1135 ///
1136 /// # Panics
1137 ///
    /// This function panics if `max_size` is zero.
1139 ///
1140 /// # Example
1141 ///
1142 /// ```rust
1143 /// use std::time::Duration;
1144 /// use tokio::time;
1145 /// use tokio_stream::{self as stream, StreamExt};
1146 /// use futures::FutureExt;
1147 ///
1148 /// #[tokio::main]
1149 /// # async fn _unused() {}
1150 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1151 /// async fn main() {
1152 /// let iter = vec![1, 2, 3, 4].into_iter();
1153 /// let stream0 = stream::iter(iter);
1154 ///
1155 /// let iter = vec![5].into_iter();
1156 /// let stream1 = stream::iter(iter)
1157 /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1158 ///
1159 /// let chunk_stream = stream0
1160 /// .chain(stream1)
1161 /// .chunks_timeout(3, Duration::from_secs(2));
1162 /// tokio::pin!(chunk_stream);
1163 ///
1164 /// // a full batch was received
1165 /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1166 /// // deadline was reached before max_size was reached
1167 /// assert_eq!(chunk_stream.next().await, Some(vec![4]));
1168 /// // last element in the stream
1169 /// assert_eq!(chunk_stream.next().await, Some(vec![5]));
1170 /// }
1171 /// ```
1172 #[cfg(feature = "time")]
1173 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1174 #[track_caller]
1175 fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
1176 where
1177 Self: Sized,
1178 {
1179 assert!(max_size > 0, "`max_size` must be non-zero.");
1180 ChunksTimeout::new(self, max_size, duration)
1181 }
1182
    /// Turns the stream into a peekable stream, whose next element can be peeked at without being
    /// consumed.
    ///
    /// # Example
    ///
    /// ```rust
1186 /// use tokio_stream::{self as stream, StreamExt};
1187 ///
1188 /// #[tokio::main]
1189 /// # async fn _unused() {}
1190 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1191 /// async fn main() {
1192 /// let iter = vec![1, 2, 3, 4].into_iter();
1193 /// let mut stream = stream::iter(iter).peekable();
1194 ///
1195 /// assert_eq!(*stream.peek().await.unwrap(), 1);
1196 /// assert_eq!(*stream.peek().await.unwrap(), 1);
1197 /// assert_eq!(stream.next().await.unwrap(), 1);
1198 /// assert_eq!(*stream.peek().await.unwrap(), 2);
1199 /// }
1200 /// ```
1201 fn peekable(self) -> Peekable<Self>
1202 where
1203 Self: Sized,
1204 {
1205 Peekable::new(self)
1206 }
1207}
1208
1209impl<St: ?Sized> StreamExt for St where St: Stream {}
1210
/// Combines the size hints of two streams into a single hint.
///
/// The lower bounds are added together, saturating at `usize::MAX`. The upper
/// bound is the sum of both upper bounds when each side reports one and the
/// sum fits in a `usize`; otherwise it is `None`.
fn merge_size_hints(
    (left_low, left_high): (usize, Option<usize>),
    (right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
    let lower = left_low.saturating_add(right_low);
    // A missing upper bound on either side, or an overflowing sum, leaves the
    // combined upper bound unknown.
    let upper = left_high
        .zip(right_high)
        .and_then(|(left, right)| left.checked_add(right));
    (lower, upper)
}
1223