1//! Streams
2//!
//! This module contains a number of functions for working with `Stream`s
//! that return `Result`s, allowing for short-circuiting computations.
5
6#[cfg(feature = "compat")]
7use crate::compat::Compat;
8use crate::fns::{
9 inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn,
10 IntoFn, MapErrFn, MapOkFn,
11};
12use crate::future::assert_future;
13use crate::stream::assert_stream;
14use crate::stream::{Inspect, Map};
15#[cfg(feature = "alloc")]
16use alloc::vec::Vec;
17use core::pin::Pin;
18
19use futures_core::{
20 future::{Future, TryFuture},
21 stream::TryStream,
22 task::{Context, Poll},
23};
24
25mod and_then;
26#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
27pub use self::and_then::AndThen;
28
29delegate_all!(
30 /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
31 ErrInto<St, E>(
32 MapErr<St, IntoFn<E>>
33 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
34);
35
36delegate_all!(
37 /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
38 InspectOk<St, F>(
39 Inspect<IntoStream<St>, InspectOkFn<F>>
40 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
41);
42
43delegate_all!(
44 /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
45 InspectErr<St, F>(
46 Inspect<IntoStream<St>, InspectErrFn<F>>
47 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
48);
49
50mod into_stream;
51#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
52pub use self::into_stream::IntoStream;
53
54delegate_all!(
55 /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
56 MapOk<St, F>(
57 Map<IntoStream<St>, MapOkFn<F>>
58 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
59);
60
61delegate_all!(
62 /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
63 MapErr<St, F>(
64 Map<IntoStream<St>, MapErrFn<F>>
65 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
66);
67
68mod or_else;
69#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
70pub use self::or_else::OrElse;
71
72mod try_next;
73#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
74pub use self::try_next::TryNext;
75
76mod try_for_each;
77#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
78pub use self::try_for_each::TryForEach;
79
80mod try_filter;
81#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
82pub use self::try_filter::TryFilter;
83
84mod try_filter_map;
85#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
86pub use self::try_filter_map::TryFilterMap;
87
88mod try_flatten;
89#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
90pub use self::try_flatten::TryFlatten;
91
92#[cfg(not(futures_no_atomic_cas))]
93#[cfg(feature = "alloc")]
94mod try_flatten_unordered;
95#[cfg(not(futures_no_atomic_cas))]
96#[cfg(feature = "alloc")]
97#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
98pub use self::try_flatten_unordered::TryFlattenUnordered;
99
100mod try_collect;
101#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
102pub use self::try_collect::TryCollect;
103
104mod try_concat;
105#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
106pub use self::try_concat::TryConcat;
107
108#[cfg(feature = "alloc")]
109mod try_chunks;
110#[cfg(feature = "alloc")]
111#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
112pub use self::try_chunks::{TryChunks, TryChunksError};
113
114#[cfg(feature = "alloc")]
115mod try_ready_chunks;
116#[cfg(feature = "alloc")]
117#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
118pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError};
119
120mod try_fold;
121#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
122pub use self::try_fold::TryFold;
123
124mod try_unfold;
125#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
126pub use self::try_unfold::{try_unfold, TryUnfold};
127
128mod try_skip_while;
129#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130pub use self::try_skip_while::TrySkipWhile;
131
132mod try_take_while;
133#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134pub use self::try_take_while::TryTakeWhile;
135
136#[cfg(not(futures_no_atomic_cas))]
137#[cfg(feature = "alloc")]
138mod try_buffer_unordered;
139#[cfg(not(futures_no_atomic_cas))]
140#[cfg(feature = "alloc")]
141#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142pub use self::try_buffer_unordered::TryBufferUnordered;
143
144#[cfg(not(futures_no_atomic_cas))]
145#[cfg(feature = "alloc")]
146mod try_buffered;
147#[cfg(not(futures_no_atomic_cas))]
148#[cfg(feature = "alloc")]
149#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150pub use self::try_buffered::TryBuffered;
151
152#[cfg(not(futures_no_atomic_cas))]
153#[cfg(feature = "alloc")]
154mod try_for_each_concurrent;
155#[cfg(not(futures_no_atomic_cas))]
156#[cfg(feature = "alloc")]
157#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158pub use self::try_for_each_concurrent::TryForEachConcurrent;
159
160#[cfg(feature = "io")]
161#[cfg(feature = "std")]
162mod into_async_read;
163#[cfg(feature = "io")]
164#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
165#[cfg(feature = "std")]
166#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
167pub use self::into_async_read::IntoAsyncRead;
168
169mod try_all;
170#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
171pub use self::try_all::TryAll;
172
173mod try_any;
174#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
175pub use self::try_any::TryAny;
176
177impl<S: ?Sized + TryStream> TryStreamExt for S {}
178
179/// Adapters specific to `Result`-returning streams
180pub trait TryStreamExt: TryStream {
181 /// Wraps the current stream in a new stream which converts the error type
182 /// into the one provided.
183 ///
184 /// # Examples
185 ///
186 /// ```
187 /// # futures::executor::block_on(async {
188 /// use futures::stream::{self, TryStreamExt};
189 ///
190 /// let mut stream =
191 /// stream::iter(vec![Ok(()), Err(5i32)])
192 /// .err_into::<i64>();
193 ///
194 /// assert_eq!(stream.try_next().await, Ok(Some(())));
195 /// assert_eq!(stream.try_next().await, Err(5i64));
196 /// # })
197 /// ```
198 fn err_into<E>(self) -> ErrInto<Self, E>
199 where
200 Self: Sized,
201 Self::Error: Into<E>,
202 {
203 assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
204 }
205
206 /// Wraps the current stream in a new stream which maps the success value
207 /// using the provided closure.
208 ///
209 /// # Examples
210 ///
211 /// ```
212 /// # futures::executor::block_on(async {
213 /// use futures::stream::{self, TryStreamExt};
214 ///
215 /// let mut stream =
216 /// stream::iter(vec![Ok(5), Err(0)])
217 /// .map_ok(|x| x + 2);
218 ///
219 /// assert_eq!(stream.try_next().await, Ok(Some(7)));
220 /// assert_eq!(stream.try_next().await, Err(0));
221 /// # })
222 /// ```
223 fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
224 where
225 Self: Sized,
226 F: FnMut(Self::Ok) -> T,
227 {
228 assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
229 }
230
231 /// Wraps the current stream in a new stream which maps the error value
232 /// using the provided closure.
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// # futures::executor::block_on(async {
238 /// use futures::stream::{self, TryStreamExt};
239 ///
240 /// let mut stream =
241 /// stream::iter(vec![Ok(5), Err(0)])
242 /// .map_err(|x| x + 2);
243 ///
244 /// assert_eq!(stream.try_next().await, Ok(Some(5)));
245 /// assert_eq!(stream.try_next().await, Err(2));
246 /// # })
247 /// ```
248 fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
249 where
250 Self: Sized,
251 F: FnMut(Self::Error) -> E,
252 {
253 assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
254 }
255
256 /// Chain on a computation for when a value is ready, passing the successful
257 /// results to the provided closure `f`.
258 ///
259 /// This function can be used to run a unit of work when the next successful
260 /// value on a stream is ready. The closure provided will be yielded a value
261 /// when ready, and the returned future will then be run to completion to
262 /// produce the next value on this stream.
263 ///
264 /// Any errors produced by this stream will not be passed to the closure,
265 /// and will be passed through.
266 ///
267 /// The returned value of the closure must implement the `TryFuture` trait
268 /// and can represent some more work to be done before the composed stream
269 /// is finished.
270 ///
271 /// Note that this function consumes the receiving stream and returns a
272 /// wrapped version of it.
273 ///
274 /// To process the entire stream and return a single future representing
275 /// success or error, use `try_for_each` instead.
276 ///
277 /// # Examples
278 ///
279 /// ```
280 /// use futures::channel::mpsc;
281 /// use futures::future;
282 /// use futures::stream::TryStreamExt;
283 ///
284 /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
285 ///
286 /// let rx = rx.and_then(|result| {
287 /// future::ok(if result % 2 == 0 {
288 /// Some(result)
289 /// } else {
290 /// None
291 /// })
292 /// });
293 /// ```
294 fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
295 where
296 F: FnMut(Self::Ok) -> Fut,
297 Fut: TryFuture<Error = Self::Error>,
298 Self: Sized,
299 {
300 assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
301 }
302
303 /// Chain on a computation for when an error happens, passing the
304 /// erroneous result to the provided closure `f`.
305 ///
306 /// This function can be used to run a unit of work and attempt to recover from
307 /// an error if one happens. The closure provided will be yielded an error
308 /// when one appears, and the returned future will then be run to completion
309 /// to produce the next value on this stream.
310 ///
311 /// Any successful values produced by this stream will not be passed to the
312 /// closure, and will be passed through.
313 ///
314 /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
315 /// and can represent some more work to be done before the composed stream
316 /// is finished.
317 ///
318 /// Note that this function consumes the receiving stream and returns a
319 /// wrapped version of it.
320 fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
321 where
322 F: FnMut(Self::Error) -> Fut,
323 Fut: TryFuture<Ok = Self::Ok>,
324 Self: Sized,
325 {
326 assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
327 }
328
329 /// Do something with the success value of this stream, afterwards passing
330 /// it on.
331 ///
332 /// This is similar to the `StreamExt::inspect` method where it allows
333 /// easily inspecting the success value as it passes through the stream, for
334 /// example to debug what's going on.
335 fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
336 where
337 F: FnMut(&Self::Ok),
338 Self: Sized,
339 {
340 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
341 }
342
343 /// Do something with the error value of this stream, afterwards passing it on.
344 ///
345 /// This is similar to the `StreamExt::inspect` method where it allows
346 /// easily inspecting the error value as it passes through the stream, for
347 /// example to debug what's going on.
348 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
349 where
350 F: FnMut(&Self::Error),
351 Self: Sized,
352 {
353 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
354 }
355
356 /// Wraps a [`TryStream`] into a type that implements
357 /// [`Stream`](futures_core::stream::Stream)
358 ///
359 /// [`TryStream`]s currently do not implement the
360 /// [`Stream`](futures_core::stream::Stream) trait because of limitations
361 /// of the compiler.
362 ///
363 /// # Examples
364 ///
365 /// ```
366 /// use futures::stream::{Stream, TryStream, TryStreamExt};
367 ///
368 /// # type T = i32;
369 /// # type E = ();
370 /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
371 /// # futures::stream::empty()
372 /// # }
373 /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
374 ///
375 /// take_stream(make_try_stream().into_stream());
376 /// ```
377 fn into_stream(self) -> IntoStream<Self>
378 where
379 Self: Sized,
380 {
381 assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
382 }
383
384 /// Creates a future that attempts to resolve the next item in the stream.
385 /// If an error is encountered before the next item, the error is returned
386 /// instead.
387 ///
388 /// This is similar to the `Stream::next` combinator, but returns a
389 /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
390 /// for easy use with the `?` operator.
391 ///
392 /// # Examples
393 ///
394 /// ```
395 /// # futures::executor::block_on(async {
396 /// use futures::stream::{self, TryStreamExt};
397 ///
398 /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
399 ///
400 /// assert_eq!(stream.try_next().await, Ok(Some(())));
401 /// assert_eq!(stream.try_next().await, Err(()));
402 /// # })
403 /// ```
404 fn try_next(&mut self) -> TryNext<'_, Self>
405 where
406 Self: Unpin,
407 {
408 assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
409 }
410
411 /// Attempts to run this stream to completion, executing the provided
412 /// asynchronous closure for each element on the stream.
413 ///
414 /// The provided closure will be called for each item this stream produces,
415 /// yielding a future. That future will then be executed to completion
416 /// before moving on to the next item.
417 ///
418 /// The returned value is a [`Future`](futures_core::future::Future) where the
419 /// [`Output`](futures_core::future::Future::Output) type is
420 /// `Result<(), Self::Error>`. If any of the intermediate
421 /// futures or the stream returns an error, this future will return
422 /// immediately with an error.
423 ///
424 /// # Examples
425 ///
426 /// ```
427 /// # futures::executor::block_on(async {
428 /// use futures::future;
429 /// use futures::stream::{self, TryStreamExt};
430 ///
431 /// let mut x = 0i32;
432 ///
433 /// {
434 /// let fut = stream::repeat(Ok(1)).try_for_each(|item| {
435 /// x += item;
436 /// future::ready(if x == 3 { Err(()) } else { Ok(()) })
437 /// });
438 /// assert_eq!(fut.await, Err(()));
439 /// }
440 ///
441 /// assert_eq!(x, 3);
442 /// # })
443 /// ```
444 fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
445 where
446 F: FnMut(Self::Ok) -> Fut,
447 Fut: TryFuture<Ok = (), Error = Self::Error>,
448 Self: Sized,
449 {
450 assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
451 }
452
453 /// Skip elements on this stream while the provided asynchronous predicate
454 /// resolves to `true`.
455 ///
456 /// This function is similar to
457 /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
458 /// early if an error occurs.
459 ///
460 /// # Examples
461 ///
462 /// ```
463 /// # futures::executor::block_on(async {
464 /// use futures::future;
465 /// use futures::stream::{self, TryStreamExt};
466 ///
467 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
468 /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
469 ///
470 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
471 /// assert_eq!(output, Ok(vec![3, 2]));
472 /// # })
473 /// ```
474 fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
475 where
476 F: FnMut(&Self::Ok) -> Fut,
477 Fut: TryFuture<Ok = bool, Error = Self::Error>,
478 Self: Sized,
479 {
480 assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
481 }
482
483 /// Take elements on this stream while the provided asynchronous predicate
484 /// resolves to `true`.
485 ///
486 /// This function is similar to
487 /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
488 /// early if an error occurs.
489 ///
490 /// # Examples
491 ///
492 /// ```
493 /// # futures::executor::block_on(async {
494 /// use futures::future;
495 /// use futures::stream::{self, TryStreamExt};
496 ///
497 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
498 /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
499 ///
500 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
501 /// assert_eq!(output, Ok(vec![1, 2]));
502 /// # })
503 /// ```
504 fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
505 where
506 F: FnMut(&Self::Ok) -> Fut,
507 Fut: TryFuture<Ok = bool, Error = Self::Error>,
508 Self: Sized,
509 {
510 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
511 }
512
/// Tries to drive this stream to completion, running the asynchronous
/// closure `f` for the elements concurrently as they become available, and
/// stopping as soon as an error occurs.
///
/// This is the fallible counterpart of
/// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent):
/// the returned future resolves to an error immediately if either the
/// underlying stream or the provided closure returns one.
///
/// `limit` accepts anything convertible into `Option<usize>`; the example
/// passes a plain `2`.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::oneshot;
/// use futures::stream::{self, StreamExt, TryStreamExt};
///
/// let (tx1, rx1) = oneshot::channel();
/// let (tx2, rx2) = oneshot::channel();
/// let (_tx3, rx3) = oneshot::channel();
///
/// let stream = stream::iter(vec![rx1, rx2, rx3]);
/// let fut = stream.map(Ok).try_for_each_concurrent(
///     /* limit */ 2,
///     |rx| async move {
///         let res: Result<(), oneshot::Canceled> = rx.await;
///         res
///     }
/// );
///
/// tx1.send(()).unwrap();
/// // Drop the second sender so that `rx2` resolves to `Canceled`.
/// drop(tx2);
///
/// // The final result is an error because the second future
/// // resulted in an error.
/// assert_eq!(Err(oneshot::Canceled), fut.await);
/// # })
/// ```
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
fn try_for_each_concurrent<Fut, F>(
    self,
    limit: impl Into<Option<usize>>,
    f: F,
) -> TryForEachConcurrent<Self, Fut, F>
where
    F: FnMut(Self::Ok) -> Fut,
    Fut: Future<Output = Result<(), Self::Error>>,
    Self: Sized,
{
    let max_in_flight: Option<usize> = limit.into();
    let driver = TryForEachConcurrent::new(self, max_in_flight, f);
    assert_future::<Result<(), Self::Error>, _>(driver)
}
572
573 /// Attempt to transform a stream into a collection,
574 /// returning a future representing the result of that computation.
575 ///
576 /// This combinator will collect all successful results of this stream and
577 /// collect them into the specified collection type. If an error happens then all
578 /// collected elements will be dropped and the error will be returned.
579 ///
580 /// The returned future will be resolved when the stream terminates.
581 ///
582 /// # Examples
583 ///
584 /// ```
585 /// # futures::executor::block_on(async {
586 /// use futures::channel::mpsc;
587 /// use futures::stream::TryStreamExt;
588 /// use std::thread;
589 ///
590 /// let (tx, rx) = mpsc::unbounded();
591 ///
592 /// thread::spawn(move || {
593 /// for i in 1..=5 {
594 /// tx.unbounded_send(Ok(i)).unwrap();
595 /// }
596 /// tx.unbounded_send(Err(6)).unwrap();
597 /// });
598 ///
599 /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
600 /// assert_eq!(output, Err(6));
601 /// # })
602 /// ```
603 fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
604 where
605 Self: Sized,
606 {
607 assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
608 }
609
/// Groups the successful items of this stream into vectors.
///
/// Successful items are pulled from the stream and buffered in a local
/// vector. At most `capacity` items are buffered before the vector is
/// yielded from the returned stream.
///
/// The vectors yielded by the returned stream do not always hold
/// `capacity` elements: if the underlying stream ends while only a partial
/// vector has been built, that partial vector is yielded. Likewise, when
/// the underlying stream produces an error, the items buffered so far are
/// yielded; the example shows them arriving inside the `TryChunksError`.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// This works like
/// [`StreamExt::chunks`](crate::stream::StreamExt::chunks), except that it
/// exits early when an error occurs.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, TryChunksError, TryStreamExt};
///
/// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
/// let mut stream = stream.try_chunks(2);
///
/// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
/// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
/// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
/// # })
/// ```
///
/// # Panics
///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn try_chunks(self, capacity: usize) -> TryChunks<Self>
where
    Self: Sized,
{
    let chunked: TryChunks<Self> = TryChunks::new(self, capacity);
    assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(chunked)
}
656
/// Groups the successful items of this stream that are already ready into
/// vectors.
///
/// Successful items are pulled from the stream and buffered in a local
/// vector. At most `capacity` items are buffered before the vector is
/// yielded from the returned stream. If the underlying stream returns
/// `Poll::Pending` while the current chunk is not empty, that chunk is
/// returned immediately.
///
/// The vectors yielded by the returned stream do not always hold
/// `capacity` elements: if the underlying stream ends while only a partial
/// vector has been built, that partial vector is yielded. Likewise, when
/// the underlying stream produces an error, the items buffered so far are
/// yielded; the example shows them arriving inside the
/// `TryReadyChunksError`.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// This works like
/// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks),
/// except that it exits early when an error occurs.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, TryReadyChunksError, TryStreamExt};
///
/// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
/// let mut stream = stream.try_ready_chunks(2);
///
/// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
/// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4)));
/// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
/// # })
/// ```
///
/// # Panics
///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
where
    Self: Sized,
{
    let chunked: TryReadyChunks<Self> = TryReadyChunks::new(self, capacity);
    assert_stream::<Result<Vec<Self::Ok>, TryReadyChunksError<Self::Ok, Self::Error>>, _>(chunked)
}
705
706 /// Attempt to filter the values produced by this stream according to the
707 /// provided asynchronous closure.
708 ///
709 /// As values of this stream are made available, the provided predicate `f`
710 /// will be run on them. If the predicate returns a `Future` which resolves
711 /// to `true`, then the stream will yield the value, but if the predicate
712 /// return a `Future` which resolves to `false`, then the value will be
713 /// discarded and the next value will be produced.
714 ///
715 /// All errors are passed through without filtering in this combinator.
716 ///
717 /// Note that this function consumes the stream passed into it and returns a
718 /// wrapped version of it, similar to the existing `filter` methods in
719 /// the standard library.
720 ///
721 /// # Examples
722 /// ```
723 /// # futures::executor::block_on(async {
724 /// use futures::future;
725 /// use futures::stream::{self, StreamExt, TryStreamExt};
726 ///
727 /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
728 /// let mut evens = stream.try_filter(|x| {
729 /// future::ready(x % 2 == 0)
730 /// });
731 ///
732 /// assert_eq!(evens.next().await, Some(Ok(2)));
733 /// assert_eq!(evens.next().await, Some(Err("error")));
734 /// # })
735 /// ```
736 fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
737 where
738 Fut: Future<Output = bool>,
739 F: FnMut(&Self::Ok) -> Fut,
740 Self: Sized,
741 {
742 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
743 }
744
745 /// Attempt to filter the values produced by this stream while
746 /// simultaneously mapping them to a different type according to the
747 /// provided asynchronous closure.
748 ///
749 /// As values of this stream are made available, the provided function will
750 /// be run on them. If the future returned by the predicate `f` resolves to
751 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
752 /// it resolves to [`None`] then the next value will be produced.
753 ///
754 /// All errors are passed through without filtering in this combinator.
755 ///
756 /// Note that this function consumes the stream passed into it and returns a
757 /// wrapped version of it, similar to the existing `filter_map` methods in
758 /// the standard library.
759 ///
760 /// # Examples
761 /// ```
762 /// # futures::executor::block_on(async {
763 /// use futures::stream::{self, StreamExt, TryStreamExt};
764 /// use futures::pin_mut;
765 ///
766 /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
767 /// let halves = stream.try_filter_map(|x| async move {
768 /// let ret = if x % 2 == 0 { Some(x / 2) } else { None };
769 /// Ok(ret)
770 /// });
771 ///
772 /// pin_mut!(halves);
773 /// assert_eq!(halves.next().await, Some(Ok(3)));
774 /// assert_eq!(halves.next().await, Some(Err("error")));
775 /// # })
776 /// ```
777 fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
778 where
779 Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
780 F: FnMut(Self::Ok) -> Fut,
781 Self: Sized,
782 {
783 assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
784 }
785
/// Flattens a stream of streams into a single continuous stream, polling
/// the produced streams concurrently.
///
/// Errors from the produced streams are passed through without being
/// looked at. An error returned by the underlying base stream is
/// propagated **immediately**.
///
/// The only argument is an optional limit on the number of concurrently
/// polled streams. When the limit is not `None`, no more than `limit`
/// streams are polled at the same time. Since `limit` is of type
/// `Into<Option<usize>>`, it can be given as `None`, `Some(10)`, or simply
/// `10`. Note: a limit of zero is interpreted as no limit at all and
/// behaves the same as passing `None`.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::mpsc;
/// use futures::stream::{StreamExt, TryStreamExt};
/// use std::thread;
///
/// let (tx1, rx1) = mpsc::unbounded();
/// let (tx2, rx2) = mpsc::unbounded();
/// let (tx3, rx3) = mpsc::unbounded();
///
/// thread::spawn(move || {
///     tx1.unbounded_send(Ok(1)).unwrap();
/// });
/// thread::spawn(move || {
///     tx2.unbounded_send(Ok(2)).unwrap();
///     tx2.unbounded_send(Err(3)).unwrap();
///     tx2.unbounded_send(Ok(4)).unwrap();
/// });
/// thread::spawn(move || {
///     tx3.unbounded_send(Ok(rx1)).unwrap();
///     tx3.unbounded_send(Ok(rx2)).unwrap();
///     tx3.unbounded_send(Err(5)).unwrap();
/// });
///
/// let stream = rx3.try_flatten_unordered(None);
/// let mut values: Vec<_> = stream.collect().await;
/// values.sort();
///
/// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]);
/// # });
/// ```
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
where
    Self::Ok: TryStream + Unpin,
    <Self::Ok as TryStream>::Error: From<Self::Error>,
    Self: Sized,
{
    let flattened = TryFlattenUnordered::new(self, limit);
    assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
        flattened,
    )
}
842
843 /// Flattens a stream of streams into just one continuous stream.
844 ///
845 /// If this stream's elements are themselves streams then this combinator
846 /// will flatten out the entire stream to one long chain of elements. Any
847 /// errors are passed through without looking at them, but otherwise each
848 /// individual stream will get exhausted before moving on to the next.
849 ///
850 /// # Examples
851 ///
852 /// ```
853 /// # futures::executor::block_on(async {
854 /// use futures::channel::mpsc;
855 /// use futures::stream::{StreamExt, TryStreamExt};
856 /// use std::thread;
857 ///
858 /// let (tx1, rx1) = mpsc::unbounded();
859 /// let (tx2, rx2) = mpsc::unbounded();
860 /// let (tx3, rx3) = mpsc::unbounded();
861 ///
862 /// thread::spawn(move || {
863 /// tx1.unbounded_send(Ok(1)).unwrap();
864 /// });
865 /// thread::spawn(move || {
866 /// tx2.unbounded_send(Ok(2)).unwrap();
867 /// tx2.unbounded_send(Err(3)).unwrap();
868 /// tx2.unbounded_send(Ok(4)).unwrap();
869 /// });
870 /// thread::spawn(move || {
871 /// tx3.unbounded_send(Ok(rx1)).unwrap();
872 /// tx3.unbounded_send(Ok(rx2)).unwrap();
873 /// tx3.unbounded_send(Err(5)).unwrap();
874 /// });
875 ///
876 /// let mut stream = rx3.try_flatten();
877 /// assert_eq!(stream.next().await, Some(Ok(1)));
878 /// assert_eq!(stream.next().await, Some(Ok(2)));
879 /// assert_eq!(stream.next().await, Some(Err(3)));
880 /// assert_eq!(stream.next().await, Some(Ok(4)));
881 /// assert_eq!(stream.next().await, Some(Err(5)));
882 /// assert_eq!(stream.next().await, None);
883 /// # });
884 /// ```
885 fn try_flatten(self) -> TryFlatten<Self>
886 where
887 Self::Ok: TryStream,
888 <Self::Ok as TryStream>::Error: From<Self::Error>,
889 Self: Sized,
890 {
891 assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
892 TryFlatten::new(self),
893 )
894 }
895
896 /// Attempt to execute an accumulating asynchronous computation over a
897 /// stream, collecting all the values into one final result.
898 ///
899 /// This combinator will accumulate all values returned by this stream
900 /// according to the closure provided. The initial state is also provided to
901 /// this method and then is returned again by each execution of the closure.
902 /// Once the entire stream has been exhausted the returned future will
903 /// resolve to this value.
904 ///
905 /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
906 /// exit early if an error is encountered in either the stream or the
907 /// provided closure.
908 ///
909 /// # Examples
910 ///
911 /// ```
912 /// # futures::executor::block_on(async {
913 /// use futures::stream::{self, TryStreamExt};
914 ///
915 /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
916 /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
917 /// assert_eq!(sum.await, Ok(3));
918 ///
919 /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
920 /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
921 /// assert_eq!(sum.await, Err(2));
922 /// # })
923 /// ```
924 fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
925 where
926 F: FnMut(T, Self::Ok) -> Fut,
927 Fut: TryFuture<Ok = T, Error = Self::Error>,
928 Self: Sized,
929 {
930 assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
931 }
932
933 /// Attempt to concatenate all items of a stream into a single
934 /// extendable destination, returning a future representing the end result.
935 ///
936 /// This combinator will extend the first item with the contents of all
937 /// the subsequent successful results of the stream. If the stream is empty,
938 /// the default value will be returned.
939 ///
940 /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
941 ///
942 /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
943 /// exit early if an error is encountered in the stream.
944 ///
945 /// # Examples
946 ///
947 /// ```
948 /// # futures::executor::block_on(async {
949 /// use futures::channel::mpsc;
950 /// use futures::stream::TryStreamExt;
951 /// use std::thread;
952 ///
953 /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
954 ///
955 /// thread::spawn(move || {
956 /// for i in (0..3).rev() {
957 /// let n = i * 3;
958 /// tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
959 /// }
960 /// });
961 ///
962 /// let result = rx.try_concat().await;
963 ///
964 /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
965 /// # });
966 /// ```
967 fn try_concat(self) -> TryConcat<Self>
968 where
969 Self: Sized,
970 Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
971 {
972 assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
973 }
974
/// Attempt to run several futures produced by a stream concurrently,
/// yielding their outputs as they finish (unordered).
///
/// The `Ok` type of this stream must be a
/// [`TryFuture`](futures_core::future::TryFuture) whose `Error` type is the
/// same as the stream's `Error` type.
///
/// Up to `n` futures are buffered at a time, and their outputs are
/// returned in the order in which they complete. An error coming from the
/// underlying stream is propagated immediately.
///
/// Every item of the returned stream is a result holding either a
/// future's output or an error; the error may originate from the
/// underlying stream itself or from one of the futures it yielded.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Examples
///
/// Results are returned in the order of completion:
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::oneshot;
/// use futures::stream::{self, StreamExt, TryStreamExt};
///
/// let (send_one, recv_one) = oneshot::channel();
/// let (send_two, recv_two) = oneshot::channel();
///
/// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
///
/// let mut buffered = stream_of_futures.try_buffer_unordered(10);
///
/// send_two.send(2i32)?;
/// assert_eq!(buffered.next().await, Some(Ok(2i32)));
///
/// send_one.send(1i32)?;
/// assert_eq!(buffered.next().await, Some(Ok(1i32)));
///
/// assert_eq!(buffered.next().await, None);
/// # Ok::<(), i32>(()) }).unwrap();
/// ```
///
/// Errors from the underlying stream itself are propagated:
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::mpsc;
/// use futures::stream::{StreamExt, TryStreamExt};
///
/// let (sink, stream_of_futures) = mpsc::unbounded();
/// let mut buffered = stream_of_futures.try_buffer_unordered(10);
///
/// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
/// assert_eq!(buffered.next().await, Some(Ok(7i32)));
///
/// sink.unbounded_send(Err("error in the stream"))?;
/// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
where
    Self::Ok: TryFuture<Error = Self::Error>,
    Self: Sized,
{
    let unordered = TryBufferUnordered::new(self, n);
    assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(unordered)
}
1043
/// Attempt to run several futures produced by a stream concurrently,
/// yielding their outputs in the stream's original order.
///
/// The `Ok` type of this stream must be a
/// [`TryFuture`](futures_core::future::TryFuture) whose `Error` type is the
/// same as the stream's `Error` type.
///
/// Up to `n` futures are buffered at a time, and their outputs are
/// returned in the same order as the underlying stream produced the
/// futures. An error coming from the underlying stream is propagated
/// immediately.
///
/// Every item of the returned stream is a result holding either a
/// future's output or an error; the error may originate from the
/// underlying stream itself or from one of the futures it yielded.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Examples
///
/// Results are returned in the order of addition:
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::oneshot;
/// use futures::future::lazy;
/// use futures::stream::{self, StreamExt, TryStreamExt};
///
/// let (send_one, recv_one) = oneshot::channel();
/// let (send_two, recv_two) = oneshot::channel();
///
/// let mut buffered = lazy(move |cx| {
///     let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
///
///     let mut buffered = stream_of_futures.try_buffered(10);
///
///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
///
///     send_two.send(2i32)?;
///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
///     Ok::<_, i32>(buffered)
/// }).await?;
///
/// send_one.send(1i32)?;
/// assert_eq!(buffered.next().await, Some(Ok(1i32)));
/// assert_eq!(buffered.next().await, Some(Ok(2i32)));
///
/// assert_eq!(buffered.next().await, None);
/// # Ok::<(), i32>(()) }).unwrap();
/// ```
///
/// Errors from the underlying stream itself are propagated:
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::mpsc;
/// use futures::stream::{StreamExt, TryStreamExt};
///
/// let (sink, stream_of_futures) = mpsc::unbounded();
/// let mut buffered = stream_of_futures.try_buffered(10);
///
/// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
/// assert_eq!(buffered.next().await, Some(Ok(7i32)));
///
/// sink.unbounded_send(Err("error in the stream"))?;
/// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
fn try_buffered(self, n: usize) -> TryBuffered<Self>
where
    Self::Ok: TryFuture<Error = Self::Error>,
    Self: Sized,
{
    let ordered = TryBuffered::new(self, n);
    assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(ordered)
}
1119
1120 // TODO: false positive warning from rustdoc. Verify once #43466 settles
1121 //
1122 /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
1123 /// stream types.
1124 fn try_poll_next_unpin(
1125 &mut self,
1126 cx: &mut Context<'_>,
1127 ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
1128 where
1129 Self: Unpin,
1130 {
1131 Pin::new(self).try_poll_next(cx)
1132 }
1133
/// Wraps a [`TryStream`] into a stream compatible with libraries using
/// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
///
/// # Examples
///
/// The snippet below demonstrates the analogous `compat` conversion on a
/// future (it imports `FutureExt`/`TryFutureExt` and converts `future03`).
///
/// ```
/// # if cfg!(miri) { return; } // Miri does not support epoll
/// use futures::future::{FutureExt, TryFutureExt};
/// # let (tx, rx) = futures::channel::oneshot::channel();
///
/// let future03 = async {
///     println!("Running on the pool");
///     tx.send(42).unwrap();
/// };
///
/// let future01 = future03
///     .unit_error() // Make it a TryFuture
///     .boxed() // Make it Unpin
///     .compat();
///
/// tokio::run(future01);
/// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
/// ```
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> Compat<Self>
where
    Self: Sized + Unpin,
{
    Compat::<Self>::new(self)
}
1162
/// Adapter that turns this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
///
/// The stream's error type must be [`std::io::Error`] and its successful
/// items must be viewable as byte slices (`AsRef<[u8]>`).
///
/// This method is compiled only when both the `io` and `std` features of
/// this library are enabled; `std` is activated by default.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, TryStreamExt};
/// use futures::io::AsyncReadExt;
///
/// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
/// let mut reader = stream.into_async_read();
///
/// let mut buf = Vec::new();
/// reader.read_to_end(&mut buf).await.unwrap();
/// assert_eq!(buf, [1, 2, 3, 4, 5]);
/// # })
/// ```
#[cfg(feature = "io")]
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
#[cfg(feature = "std")]
fn into_async_read(self) -> IntoAsyncRead<Self>
where
    Self: Sized + TryStreamExt<Error = std::io::Error>,
    Self::Ok: AsRef<[u8]>,
{
    let reader = IntoAsyncRead::new(self);
    crate::io::assert_read(reader)
}
1193
1194 /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items
1195 /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
1196 /// that does not satisfy the predicate.
1197 ///
1198 /// # Examples
1199 ///
1200 /// ```
1201 /// # futures::executor::block_on(async {
1202 /// use futures::stream::{self, StreamExt, TryStreamExt};
1203 /// use std::convert::Infallible;
1204 ///
1205 /// let number_stream = stream::iter(1..10).map(Ok::<_, Infallible>);
1206 /// let positive = number_stream.try_all(|i| async move { i > 0 });
1207 /// assert_eq!(positive.await, Ok(true));
1208 ///
1209 /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
1210 /// let positive = stream_with_errors.try_all(|i| async move { i > 0 });
1211 /// assert_eq!(positive.await, Err("err"));
1212 /// # });
1213 /// ```
1214 fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
1215 where
1216 Self: Sized,
1217 F: FnMut(Self::Ok) -> Fut,
1218 Fut: Future<Output = bool>,
1219 {
1220 assert_future::<Result<bool, Self::Error>, _>(TryAll::new(self, f))
1221 }
1222
1223 /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items
1224 /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
1225 /// that satisfies the predicate.
1226 ///
1227 /// # Examples
1228 ///
1229 /// ```
1230 /// # futures::executor::block_on(async {
1231 /// use futures::stream::{self, StreamExt, TryStreamExt};
1232 /// use std::convert::Infallible;
1233 ///
1234 /// let number_stream = stream::iter(0..10).map(Ok::<_, Infallible>);
1235 /// let contain_three = number_stream.try_any(|i| async move { i == 3 });
1236 /// assert_eq!(contain_three.await, Ok(true));
1237 ///
1238 /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
1239 /// let contain_three = stream_with_errors.try_any(|i| async move { i == 3 });
1240 /// assert_eq!(contain_three.await, Err("err"));
1241 /// # });
1242 /// ```
1243 fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
1244 where
1245 Self: Sized,
1246 F: FnMut(Self::Ok) -> Fut,
1247 Fut: Future<Output = bool>,
1248 {
1249 assert_future::<Result<bool, Self::Error>, _>(TryAny::new(self, f))
1250 }
1251}
1252