1//! Streams
2//!
3//! This module contains a number of functions for working with `Streams`s
4//! that return `Result`s, allowing for short-circuiting computations.
5
6#[cfg(feature = "compat")]
7use crate::compat::Compat;
8use crate::fns::{
9 inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn,
10 IntoFn, MapErrFn, MapOkFn,
11};
12use crate::future::assert_future;
13use crate::stream::assert_stream;
14use crate::stream::{Inspect, Map};
15#[cfg(feature = "alloc")]
16use alloc::vec::Vec;
17use core::pin::Pin;
18
19use futures_core::{
20 future::{Future, TryFuture},
21 stream::TryStream,
22 task::{Context, Poll},
23};
24
25mod and_then;
26#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
27pub use self::and_then::AndThen;
28
29delegate_all!(
30 /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
31 ErrInto<St, E>(
32 MapErr<St, IntoFn<E>>
33 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
34);
35
36delegate_all!(
37 /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
38 InspectOk<St, F>(
39 Inspect<IntoStream<St>, InspectOkFn<F>>
40 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
41);
42
43delegate_all!(
44 /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
45 InspectErr<St, F>(
46 Inspect<IntoStream<St>, InspectErrFn<F>>
47 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
48);
49
50mod into_stream;
51#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
52pub use self::into_stream::IntoStream;
53
54delegate_all!(
55 /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
56 MapOk<St, F>(
57 Map<IntoStream<St>, MapOkFn<F>>
58 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
59);
60
61delegate_all!(
62 /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
63 MapErr<St, F>(
64 Map<IntoStream<St>, MapErrFn<F>>
65 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
66);
67
68mod or_else;
69#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
70pub use self::or_else::OrElse;
71
72mod try_next;
73#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
74pub use self::try_next::TryNext;
75
76mod try_for_each;
77#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
78pub use self::try_for_each::TryForEach;
79
80mod try_filter;
81#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
82pub use self::try_filter::TryFilter;
83
84mod try_filter_map;
85#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
86pub use self::try_filter_map::TryFilterMap;
87
88mod try_flatten;
89#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
90pub use self::try_flatten::TryFlatten;
91
92#[cfg(not(futures_no_atomic_cas))]
93#[cfg(feature = "alloc")]
94mod try_flatten_unordered;
95#[cfg(not(futures_no_atomic_cas))]
96#[cfg(feature = "alloc")]
97#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
98pub use self::try_flatten_unordered::TryFlattenUnordered;
99
100mod try_collect;
101#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
102pub use self::try_collect::TryCollect;
103
104mod try_concat;
105#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
106pub use self::try_concat::TryConcat;
107
108#[cfg(feature = "alloc")]
109mod try_chunks;
110#[cfg(feature = "alloc")]
111#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
112pub use self::try_chunks::{TryChunks, TryChunksError};
113
114mod try_fold;
115#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
116pub use self::try_fold::TryFold;
117
118mod try_unfold;
119#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
120pub use self::try_unfold::{try_unfold, TryUnfold};
121
122mod try_skip_while;
123#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
124pub use self::try_skip_while::TrySkipWhile;
125
126mod try_take_while;
127#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
128pub use self::try_take_while::TryTakeWhile;
129
130#[cfg(not(futures_no_atomic_cas))]
131#[cfg(feature = "alloc")]
132mod try_buffer_unordered;
133#[cfg(not(futures_no_atomic_cas))]
134#[cfg(feature = "alloc")]
135#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
136pub use self::try_buffer_unordered::TryBufferUnordered;
137
138#[cfg(not(futures_no_atomic_cas))]
139#[cfg(feature = "alloc")]
140mod try_buffered;
141#[cfg(not(futures_no_atomic_cas))]
142#[cfg(feature = "alloc")]
143#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
144pub use self::try_buffered::TryBuffered;
145
146#[cfg(not(futures_no_atomic_cas))]
147#[cfg(feature = "alloc")]
148mod try_for_each_concurrent;
149#[cfg(not(futures_no_atomic_cas))]
150#[cfg(feature = "alloc")]
151#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
152pub use self::try_for_each_concurrent::TryForEachConcurrent;
153
154#[cfg(feature = "io")]
155#[cfg(feature = "std")]
156mod into_async_read;
157#[cfg(feature = "io")]
158#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
159#[cfg(feature = "std")]
160#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
161pub use self::into_async_read::IntoAsyncRead;
162
163impl<S: ?Sized + TryStream> TryStreamExt for S {}
164
165/// Adapters specific to `Result`-returning streams
166pub trait TryStreamExt: TryStream {
167 /// Wraps the current stream in a new stream which converts the error type
168 /// into the one provided.
169 ///
170 /// # Examples
171 ///
172 /// ```
173 /// # futures::executor::block_on(async {
174 /// use futures::stream::{self, TryStreamExt};
175 ///
176 /// let mut stream =
177 /// stream::iter(vec![Ok(()), Err(5i32)])
178 /// .err_into::<i64>();
179 ///
180 /// assert_eq!(stream.try_next().await, Ok(Some(())));
181 /// assert_eq!(stream.try_next().await, Err(5i64));
182 /// # })
183 /// ```
184 fn err_into<E>(self) -> ErrInto<Self, E>
185 where
186 Self: Sized,
187 Self::Error: Into<E>,
188 {
189 assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
190 }
191
192 /// Wraps the current stream in a new stream which maps the success value
193 /// using the provided closure.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// # futures::executor::block_on(async {
199 /// use futures::stream::{self, TryStreamExt};
200 ///
201 /// let mut stream =
202 /// stream::iter(vec![Ok(5), Err(0)])
203 /// .map_ok(|x| x + 2);
204 ///
205 /// assert_eq!(stream.try_next().await, Ok(Some(7)));
206 /// assert_eq!(stream.try_next().await, Err(0));
207 /// # })
208 /// ```
209 fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
210 where
211 Self: Sized,
212 F: FnMut(Self::Ok) -> T,
213 {
214 assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
215 }
216
217 /// Wraps the current stream in a new stream which maps the error value
218 /// using the provided closure.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// # futures::executor::block_on(async {
224 /// use futures::stream::{self, TryStreamExt};
225 ///
226 /// let mut stream =
227 /// stream::iter(vec![Ok(5), Err(0)])
228 /// .map_err(|x| x + 2);
229 ///
230 /// assert_eq!(stream.try_next().await, Ok(Some(5)));
231 /// assert_eq!(stream.try_next().await, Err(2));
232 /// # })
233 /// ```
234 fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
235 where
236 Self: Sized,
237 F: FnMut(Self::Error) -> E,
238 {
239 assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
240 }
241
242 /// Chain on a computation for when a value is ready, passing the successful
243 /// results to the provided closure `f`.
244 ///
245 /// This function can be used to run a unit of work when the next successful
246 /// value on a stream is ready. The closure provided will be yielded a value
247 /// when ready, and the returned future will then be run to completion to
248 /// produce the next value on this stream.
249 ///
250 /// Any errors produced by this stream will not be passed to the closure,
251 /// and will be passed through.
252 ///
253 /// The returned value of the closure must implement the `TryFuture` trait
254 /// and can represent some more work to be done before the composed stream
255 /// is finished.
256 ///
257 /// Note that this function consumes the receiving stream and returns a
258 /// wrapped version of it.
259 ///
260 /// To process the entire stream and return a single future representing
261 /// success or error, use `try_for_each` instead.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use futures::channel::mpsc;
267 /// use futures::future;
268 /// use futures::stream::TryStreamExt;
269 ///
270 /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
271 ///
272 /// let rx = rx.and_then(|result| {
273 /// future::ok(if result % 2 == 0 {
274 /// Some(result)
275 /// } else {
276 /// None
277 /// })
278 /// });
279 /// ```
280 fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
281 where
282 F: FnMut(Self::Ok) -> Fut,
283 Fut: TryFuture<Error = Self::Error>,
284 Self: Sized,
285 {
286 assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
287 }
288
289 /// Chain on a computation for when an error happens, passing the
290 /// erroneous result to the provided closure `f`.
291 ///
292 /// This function can be used to run a unit of work and attempt to recover from
293 /// an error if one happens. The closure provided will be yielded an error
294 /// when one appears, and the returned future will then be run to completion
295 /// to produce the next value on this stream.
296 ///
297 /// Any successful values produced by this stream will not be passed to the
298 /// closure, and will be passed through.
299 ///
300 /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
301 /// and can represent some more work to be done before the composed stream
302 /// is finished.
303 ///
304 /// Note that this function consumes the receiving stream and returns a
305 /// wrapped version of it.
306 fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
307 where
308 F: FnMut(Self::Error) -> Fut,
309 Fut: TryFuture<Ok = Self::Ok>,
310 Self: Sized,
311 {
312 assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
313 }
314
315 /// Do something with the success value of this stream, afterwards passing
316 /// it on.
317 ///
318 /// This is similar to the `StreamExt::inspect` method where it allows
319 /// easily inspecting the success value as it passes through the stream, for
320 /// example to debug what's going on.
321 fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
322 where
323 F: FnMut(&Self::Ok),
324 Self: Sized,
325 {
326 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
327 }
328
329 /// Do something with the error value of this stream, afterwards passing it on.
330 ///
331 /// This is similar to the `StreamExt::inspect` method where it allows
332 /// easily inspecting the error value as it passes through the stream, for
333 /// example to debug what's going on.
334 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
335 where
336 F: FnMut(&Self::Error),
337 Self: Sized,
338 {
339 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
340 }
341
342 /// Wraps a [`TryStream`] into a type that implements
343 /// [`Stream`](futures_core::stream::Stream)
344 ///
345 /// [`TryStream`]s currently do not implement the
346 /// [`Stream`](futures_core::stream::Stream) trait because of limitations
347 /// of the compiler.
348 ///
349 /// # Examples
350 ///
351 /// ```
352 /// use futures::stream::{Stream, TryStream, TryStreamExt};
353 ///
354 /// # type T = i32;
355 /// # type E = ();
356 /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
357 /// # futures::stream::empty()
358 /// # }
359 /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
360 ///
361 /// take_stream(make_try_stream().into_stream());
362 /// ```
363 fn into_stream(self) -> IntoStream<Self>
364 where
365 Self: Sized,
366 {
367 assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
368 }
369
370 /// Creates a future that attempts to resolve the next item in the stream.
371 /// If an error is encountered before the next item, the error is returned
372 /// instead.
373 ///
374 /// This is similar to the `Stream::next` combinator, but returns a
375 /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
376 /// for easy use with the `?` operator.
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// # futures::executor::block_on(async {
382 /// use futures::stream::{self, TryStreamExt};
383 ///
384 /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
385 ///
386 /// assert_eq!(stream.try_next().await, Ok(Some(())));
387 /// assert_eq!(stream.try_next().await, Err(()));
388 /// # })
389 /// ```
390 fn try_next(&mut self) -> TryNext<'_, Self>
391 where
392 Self: Unpin,
393 {
394 assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
395 }
396
397 /// Attempts to run this stream to completion, executing the provided
398 /// asynchronous closure for each element on the stream.
399 ///
400 /// The provided closure will be called for each item this stream produces,
401 /// yielding a future. That future will then be executed to completion
402 /// before moving on to the next item.
403 ///
404 /// The returned value is a [`Future`](futures_core::future::Future) where the
405 /// [`Output`](futures_core::future::Future::Output) type is
406 /// `Result<(), Self::Error>`. If any of the intermediate
407 /// futures or the stream returns an error, this future will return
408 /// immediately with an error.
409 ///
410 /// # Examples
411 ///
412 /// ```
413 /// # futures::executor::block_on(async {
414 /// use futures::future;
415 /// use futures::stream::{self, TryStreamExt};
416 ///
417 /// let mut x = 0i32;
418 ///
419 /// {
420 /// let fut = stream::repeat(Ok(1)).try_for_each(|item| {
421 /// x += item;
422 /// future::ready(if x == 3 { Err(()) } else { Ok(()) })
423 /// });
424 /// assert_eq!(fut.await, Err(()));
425 /// }
426 ///
427 /// assert_eq!(x, 3);
428 /// # })
429 /// ```
430 fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
431 where
432 F: FnMut(Self::Ok) -> Fut,
433 Fut: TryFuture<Ok = (), Error = Self::Error>,
434 Self: Sized,
435 {
436 assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
437 }
438
439 /// Skip elements on this stream while the provided asynchronous predicate
440 /// resolves to `true`.
441 ///
442 /// This function is similar to
443 /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
444 /// early if an error occurs.
445 ///
446 /// # Examples
447 ///
448 /// ```
449 /// # futures::executor::block_on(async {
450 /// use futures::future;
451 /// use futures::stream::{self, TryStreamExt};
452 ///
453 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
454 /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
455 ///
456 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
457 /// assert_eq!(output, Ok(vec![3, 2]));
458 /// # })
459 /// ```
460 fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
461 where
462 F: FnMut(&Self::Ok) -> Fut,
463 Fut: TryFuture<Ok = bool, Error = Self::Error>,
464 Self: Sized,
465 {
466 assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
467 }
468
469 /// Take elements on this stream while the provided asynchronous predicate
470 /// resolves to `true`.
471 ///
472 /// This function is similar to
473 /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
474 /// early if an error occurs.
475 ///
476 /// # Examples
477 ///
478 /// ```
479 /// # futures::executor::block_on(async {
480 /// use futures::future;
481 /// use futures::stream::{self, TryStreamExt};
482 ///
483 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
484 /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
485 ///
486 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
487 /// assert_eq!(output, Ok(vec![1, 2]));
488 /// # })
489 /// ```
490 fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
491 where
492 F: FnMut(&Self::Ok) -> Fut,
493 Fut: TryFuture<Ok = bool, Error = Self::Error>,
494 Self: Sized,
495 {
496 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
497 }
498
499 /// Attempts to run this stream to completion, executing the provided asynchronous
500 /// closure for each element on the stream concurrently as elements become
501 /// available, exiting as soon as an error occurs.
502 ///
503 /// This is similar to
504 /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
505 /// but will resolve to an error immediately if the underlying stream or the provided
506 /// closure return an error.
507 ///
508 /// This method is only available when the `std` or `alloc` feature of this
509 /// library is activated, and it is activated by default.
510 ///
511 /// # Examples
512 ///
513 /// ```
514 /// # futures::executor::block_on(async {
515 /// use futures::channel::oneshot;
516 /// use futures::stream::{self, StreamExt, TryStreamExt};
517 ///
518 /// let (tx1, rx1) = oneshot::channel();
519 /// let (tx2, rx2) = oneshot::channel();
520 /// let (_tx3, rx3) = oneshot::channel();
521 ///
522 /// let stream = stream::iter(vec![rx1, rx2, rx3]);
523 /// let fut = stream.map(Ok).try_for_each_concurrent(
524 /// /* limit */ 2,
525 /// |rx| async move {
526 /// let res: Result<(), oneshot::Canceled> = rx.await;
527 /// res
528 /// }
529 /// );
530 ///
531 /// tx1.send(()).unwrap();
532 /// // Drop the second sender so that `rx2` resolves to `Canceled`.
533 /// drop(tx2);
534 ///
535 /// // The final result is an error because the second future
536 /// // resulted in an error.
537 /// assert_eq!(Err(oneshot::Canceled), fut.await);
538 /// # })
539 /// ```
540 #[cfg(not(futures_no_atomic_cas))]
541 #[cfg(feature = "alloc")]
542 fn try_for_each_concurrent<Fut, F>(
543 self,
544 limit: impl Into<Option<usize>>,
545 f: F,
546 ) -> TryForEachConcurrent<Self, Fut, F>
547 where
548 F: FnMut(Self::Ok) -> Fut,
549 Fut: Future<Output = Result<(), Self::Error>>,
550 Self: Sized,
551 {
552 assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
553 self,
554 limit.into(),
555 f,
556 ))
557 }
558
559 /// Attempt to transform a stream into a collection,
560 /// returning a future representing the result of that computation.
561 ///
562 /// This combinator will collect all successful results of this stream and
563 /// collect them into the specified collection type. If an error happens then all
564 /// collected elements will be dropped and the error will be returned.
565 ///
566 /// The returned future will be resolved when the stream terminates.
567 ///
568 /// # Examples
569 ///
570 /// ```
571 /// # futures::executor::block_on(async {
572 /// use futures::channel::mpsc;
573 /// use futures::stream::TryStreamExt;
574 /// use std::thread;
575 ///
576 /// let (tx, rx) = mpsc::unbounded();
577 ///
578 /// thread::spawn(move || {
579 /// for i in 1..=5 {
580 /// tx.unbounded_send(Ok(i)).unwrap();
581 /// }
582 /// tx.unbounded_send(Err(6)).unwrap();
583 /// });
584 ///
585 /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
586 /// assert_eq!(output, Err(6));
587 /// # })
588 /// ```
589 fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
590 where
591 Self: Sized,
592 {
593 assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
594 }
595
596 /// An adaptor for chunking up successful items of the stream inside a vector.
597 ///
598 /// This combinator will attempt to pull successful items from this stream and buffer
599 /// them into a local vector. At most `capacity` items will get buffered
600 /// before they're yielded from the returned stream.
601 ///
602 /// Note that the vectors returned from this iterator may not always have
603 /// `capacity` elements. If the underlying stream ended and only a partial
604 /// vector was created, it'll be returned. Additionally if an error happens
605 /// from the underlying stream then the currently buffered items will be
606 /// yielded.
607 ///
608 /// This method is only available when the `std` or `alloc` feature of this
609 /// library is activated, and it is activated by default.
610 ///
611 /// This function is similar to
612 /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
613 /// early if an error occurs.
614 ///
615 /// # Examples
616 ///
617 /// ```
618 /// # futures::executor::block_on(async {
619 /// use futures::stream::{self, TryChunksError, TryStreamExt};
620 ///
621 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
622 /// let mut stream = stream.try_chunks(2);
623 ///
624 /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
625 /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
626 /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
627 /// # })
628 /// ```
629 ///
630 /// # Panics
631 ///
632 /// This method will panic if `capacity` is zero.
633 #[cfg(feature = "alloc")]
634 fn try_chunks(self, capacity: usize) -> TryChunks<Self>
635 where
636 Self: Sized,
637 {
638 assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
639 TryChunks::new(self, capacity),
640 )
641 }
642
643 /// Attempt to filter the values produced by this stream according to the
644 /// provided asynchronous closure.
645 ///
646 /// As values of this stream are made available, the provided predicate `f`
647 /// will be run on them. If the predicate returns a `Future` which resolves
648 /// to `true`, then the stream will yield the value, but if the predicate
649 /// return a `Future` which resolves to `false`, then the value will be
650 /// discarded and the next value will be produced.
651 ///
652 /// All errors are passed through without filtering in this combinator.
653 ///
654 /// Note that this function consumes the stream passed into it and returns a
655 /// wrapped version of it, similar to the existing `filter` methods in
656 /// the standard library.
657 ///
658 /// # Examples
659 /// ```
660 /// # futures::executor::block_on(async {
661 /// use futures::future;
662 /// use futures::stream::{self, StreamExt, TryStreamExt};
663 ///
664 /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
665 /// let mut evens = stream.try_filter(|x| {
666 /// future::ready(x % 2 == 0)
667 /// });
668 ///
669 /// assert_eq!(evens.next().await, Some(Ok(2)));
670 /// assert_eq!(evens.next().await, Some(Err("error")));
671 /// # })
672 /// ```
673 fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
674 where
675 Fut: Future<Output = bool>,
676 F: FnMut(&Self::Ok) -> Fut,
677 Self: Sized,
678 {
679 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
680 }
681
682 /// Attempt to filter the values produced by this stream while
683 /// simultaneously mapping them to a different type according to the
684 /// provided asynchronous closure.
685 ///
686 /// As values of this stream are made available, the provided function will
687 /// be run on them. If the future returned by the predicate `f` resolves to
688 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
689 /// it resolves to [`None`] then the next value will be produced.
690 ///
691 /// All errors are passed through without filtering in this combinator.
692 ///
693 /// Note that this function consumes the stream passed into it and returns a
694 /// wrapped version of it, similar to the existing `filter_map` methods in
695 /// the standard library.
696 ///
697 /// # Examples
698 /// ```
699 /// # futures::executor::block_on(async {
700 /// use futures::stream::{self, StreamExt, TryStreamExt};
701 /// use futures::pin_mut;
702 ///
703 /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
704 /// let halves = stream.try_filter_map(|x| async move {
705 /// let ret = if x % 2 == 0 { Some(x / 2) } else { None };
706 /// Ok(ret)
707 /// });
708 ///
709 /// pin_mut!(halves);
710 /// assert_eq!(halves.next().await, Some(Ok(3)));
711 /// assert_eq!(halves.next().await, Some(Err("error")));
712 /// # })
713 /// ```
714 fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
715 where
716 Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
717 F: FnMut(Self::Ok) -> Fut,
718 Self: Sized,
719 {
720 assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
721 }
722
723 /// Flattens a stream of streams into just one continuous stream. Produced streams
724 /// will be polled concurrently and any errors will be passed through without looking at them.
725 /// If the underlying base stream returns an error, it will be **immediately** propagated.
726 ///
727 /// The only argument is an optional limit on the number of concurrently
728 /// polled streams. If this limit is not `None`, no more than `limit` streams
729 /// will be polled at the same time. The `limit` argument is of type
730 /// `Into<Option<usize>>`, and so can be provided as either `None`,
731 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
732 /// no limit at all, and will have the same result as passing in `None`.
733 ///
734 /// # Examples
735 ///
736 /// ```
737 /// # futures::executor::block_on(async {
738 /// use futures::channel::mpsc;
739 /// use futures::stream::{StreamExt, TryStreamExt};
740 /// use std::thread;
741 ///
742 /// let (tx1, rx1) = mpsc::unbounded();
743 /// let (tx2, rx2) = mpsc::unbounded();
744 /// let (tx3, rx3) = mpsc::unbounded();
745 ///
746 /// thread::spawn(move || {
747 /// tx1.unbounded_send(Ok(1)).unwrap();
748 /// });
749 /// thread::spawn(move || {
750 /// tx2.unbounded_send(Ok(2)).unwrap();
751 /// tx2.unbounded_send(Err(3)).unwrap();
752 /// tx2.unbounded_send(Ok(4)).unwrap();
753 /// });
754 /// thread::spawn(move || {
755 /// tx3.unbounded_send(Ok(rx1)).unwrap();
756 /// tx3.unbounded_send(Ok(rx2)).unwrap();
757 /// tx3.unbounded_send(Err(5)).unwrap();
758 /// });
759 ///
760 /// let stream = rx3.try_flatten_unordered(None);
761 /// let mut values: Vec<_> = stream.collect().await;
762 /// values.sort();
763 ///
764 /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]);
765 /// # });
766 /// ```
767 #[cfg(not(futures_no_atomic_cas))]
768 #[cfg(feature = "alloc")]
769 fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
770 where
771 Self::Ok: TryStream + Unpin,
772 <Self::Ok as TryStream>::Error: From<Self::Error>,
773 Self: Sized,
774 {
775 assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
776 TryFlattenUnordered::new(self, limit),
777 )
778 }
779
780 /// Flattens a stream of streams into just one continuous stream.
781 ///
782 /// If this stream's elements are themselves streams then this combinator
783 /// will flatten out the entire stream to one long chain of elements. Any
784 /// errors are passed through without looking at them, but otherwise each
785 /// individual stream will get exhausted before moving on to the next.
786 ///
787 /// # Examples
788 ///
789 /// ```
790 /// # futures::executor::block_on(async {
791 /// use futures::channel::mpsc;
792 /// use futures::stream::{StreamExt, TryStreamExt};
793 /// use std::thread;
794 ///
795 /// let (tx1, rx1) = mpsc::unbounded();
796 /// let (tx2, rx2) = mpsc::unbounded();
797 /// let (tx3, rx3) = mpsc::unbounded();
798 ///
799 /// thread::spawn(move || {
800 /// tx1.unbounded_send(Ok(1)).unwrap();
801 /// });
802 /// thread::spawn(move || {
803 /// tx2.unbounded_send(Ok(2)).unwrap();
804 /// tx2.unbounded_send(Err(3)).unwrap();
805 /// tx2.unbounded_send(Ok(4)).unwrap();
806 /// });
807 /// thread::spawn(move || {
808 /// tx3.unbounded_send(Ok(rx1)).unwrap();
809 /// tx3.unbounded_send(Ok(rx2)).unwrap();
810 /// tx3.unbounded_send(Err(5)).unwrap();
811 /// });
812 ///
813 /// let mut stream = rx3.try_flatten();
814 /// assert_eq!(stream.next().await, Some(Ok(1)));
815 /// assert_eq!(stream.next().await, Some(Ok(2)));
816 /// assert_eq!(stream.next().await, Some(Err(3)));
817 /// assert_eq!(stream.next().await, Some(Ok(4)));
818 /// assert_eq!(stream.next().await, Some(Err(5)));
819 /// assert_eq!(stream.next().await, None);
820 /// # });
821 /// ```
822 fn try_flatten(self) -> TryFlatten<Self>
823 where
824 Self::Ok: TryStream,
825 <Self::Ok as TryStream>::Error: From<Self::Error>,
826 Self: Sized,
827 {
828 assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
829 TryFlatten::new(self),
830 )
831 }
832
833 /// Attempt to execute an accumulating asynchronous computation over a
834 /// stream, collecting all the values into one final result.
835 ///
836 /// This combinator will accumulate all values returned by this stream
837 /// according to the closure provided. The initial state is also provided to
838 /// this method and then is returned again by each execution of the closure.
839 /// Once the entire stream has been exhausted the returned future will
840 /// resolve to this value.
841 ///
842 /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
843 /// exit early if an error is encountered in either the stream or the
844 /// provided closure.
845 ///
846 /// # Examples
847 ///
848 /// ```
849 /// # futures::executor::block_on(async {
850 /// use futures::stream::{self, TryStreamExt};
851 ///
852 /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
853 /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
854 /// assert_eq!(sum.await, Ok(3));
855 ///
856 /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
857 /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
858 /// assert_eq!(sum.await, Err(2));
859 /// # })
860 /// ```
861 fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
862 where
863 F: FnMut(T, Self::Ok) -> Fut,
864 Fut: TryFuture<Ok = T, Error = Self::Error>,
865 Self: Sized,
866 {
867 assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
868 }
869
870 /// Attempt to concatenate all items of a stream into a single
871 /// extendable destination, returning a future representing the end result.
872 ///
873 /// This combinator will extend the first item with the contents of all
874 /// the subsequent successful results of the stream. If the stream is empty,
875 /// the default value will be returned.
876 ///
877 /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
878 ///
879 /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
880 /// exit early if an error is encountered in the stream.
881 ///
882 /// # Examples
883 ///
884 /// ```
885 /// # futures::executor::block_on(async {
886 /// use futures::channel::mpsc;
887 /// use futures::stream::TryStreamExt;
888 /// use std::thread;
889 ///
890 /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
891 ///
892 /// thread::spawn(move || {
893 /// for i in (0..3).rev() {
894 /// let n = i * 3;
895 /// tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
896 /// }
897 /// });
898 ///
899 /// let result = rx.try_concat().await;
900 ///
901 /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
902 /// # });
903 /// ```
904 fn try_concat(self) -> TryConcat<Self>
905 where
906 Self: Sized,
907 Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
908 {
909 assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
910 }
911
912 /// Attempt to execute several futures from a stream concurrently (unordered).
913 ///
914 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
915 /// that matches the stream's `Error` type.
916 ///
917 /// This adaptor will buffer up to `n` futures and then return their
918 /// outputs in the order in which they complete. If the underlying stream
919 /// returns an error, it will be immediately propagated.
920 ///
921 /// The returned stream will be a stream of results, each containing either
922 /// an error or a future's output. An error can be produced either by the
923 /// underlying stream itself or by one of the futures it yielded.
924 ///
925 /// This method is only available when the `std` or `alloc` feature of this
926 /// library is activated, and it is activated by default.
927 ///
928 /// # Examples
929 ///
930 /// Results are returned in the order of completion:
931 /// ```
932 /// # futures::executor::block_on(async {
933 /// use futures::channel::oneshot;
934 /// use futures::stream::{self, StreamExt, TryStreamExt};
935 ///
936 /// let (send_one, recv_one) = oneshot::channel();
937 /// let (send_two, recv_two) = oneshot::channel();
938 ///
939 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
940 ///
941 /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
942 ///
943 /// send_two.send(2i32)?;
944 /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
945 ///
946 /// send_one.send(1i32)?;
947 /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
948 ///
949 /// assert_eq!(buffered.next().await, None);
950 /// # Ok::<(), i32>(()) }).unwrap();
951 /// ```
952 ///
953 /// Errors from the underlying stream itself are propagated:
954 /// ```
955 /// # futures::executor::block_on(async {
956 /// use futures::channel::mpsc;
957 /// use futures::stream::{StreamExt, TryStreamExt};
958 ///
959 /// let (sink, stream_of_futures) = mpsc::unbounded();
960 /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
961 ///
962 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
963 /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
964 ///
965 /// sink.unbounded_send(Err("error in the stream"))?;
966 /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
967 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
968 /// ```
969 #[cfg(not(futures_no_atomic_cas))]
970 #[cfg(feature = "alloc")]
971 fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
972 where
973 Self::Ok: TryFuture<Error = Self::Error>,
974 Self: Sized,
975 {
976 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
977 TryBufferUnordered::new(self, n),
978 )
979 }
980
981 /// Attempt to execute several futures from a stream concurrently.
982 ///
983 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
984 /// that matches the stream's `Error` type.
985 ///
986 /// This adaptor will buffer up to `n` futures and then return their
987 /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
988 /// be immediately propagated.
989 ///
990 /// The returned stream will be a stream of results, each containing either
991 /// an error or a future's output. An error can be produced either by the
992 /// underlying stream itself or by one of the futures it yielded.
993 ///
994 /// This method is only available when the `std` or `alloc` feature of this
995 /// library is activated, and it is activated by default.
996 ///
997 /// # Examples
998 ///
999 /// Results are returned in the order of addition:
1000 /// ```
1001 /// # futures::executor::block_on(async {
1002 /// use futures::channel::oneshot;
1003 /// use futures::future::lazy;
1004 /// use futures::stream::{self, StreamExt, TryStreamExt};
1005 ///
1006 /// let (send_one, recv_one) = oneshot::channel();
1007 /// let (send_two, recv_two) = oneshot::channel();
1008 ///
1009 /// let mut buffered = lazy(move |cx| {
1010 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
1011 ///
1012 /// let mut buffered = stream_of_futures.try_buffered(10);
1013 ///
1014 /// assert!(buffered.try_poll_next_unpin(cx).is_pending());
1015 ///
1016 /// send_two.send(2i32)?;
1017 /// assert!(buffered.try_poll_next_unpin(cx).is_pending());
1018 /// Ok::<_, i32>(buffered)
1019 /// }).await?;
1020 ///
1021 /// send_one.send(1i32)?;
1022 /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1023 /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1024 ///
1025 /// assert_eq!(buffered.next().await, None);
1026 /// # Ok::<(), i32>(()) }).unwrap();
1027 /// ```
1028 ///
1029 /// Errors from the underlying stream itself are propagated:
1030 /// ```
1031 /// # futures::executor::block_on(async {
1032 /// use futures::channel::mpsc;
1033 /// use futures::stream::{StreamExt, TryStreamExt};
1034 ///
1035 /// let (sink, stream_of_futures) = mpsc::unbounded();
1036 /// let mut buffered = stream_of_futures.try_buffered(10);
1037 ///
1038 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
1039 /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
1040 ///
1041 /// sink.unbounded_send(Err("error in the stream"))?;
1042 /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
1043 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
1044 /// ```
1045 #[cfg(not(futures_no_atomic_cas))]
1046 #[cfg(feature = "alloc")]
1047 fn try_buffered(self, n: usize) -> TryBuffered<Self>
1048 where
1049 Self::Ok: TryFuture<Error = Self::Error>,
1050 Self: Sized,
1051 {
1052 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(
1053 self, n,
1054 ))
1055 }
1056
1057 // TODO: false positive warning from rustdoc. Verify once #43466 settles
1058 //
1059 /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
1060 /// stream types.
1061 fn try_poll_next_unpin(
1062 &mut self,
1063 cx: &mut Context<'_>,
1064 ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
1065 where
1066 Self: Unpin,
1067 {
1068 Pin::new(self).try_poll_next(cx)
1069 }
1070
1071 /// Wraps a [`TryStream`] into a stream compatible with libraries using
1072 /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
1073 /// ```
1074 /// # if cfg!(miri) { return; } // Miri does not support epoll
1075 /// use futures::future::{FutureExt, TryFutureExt};
1076 /// # let (tx, rx) = futures::channel::oneshot::channel();
1077 ///
1078 /// let future03 = async {
1079 /// println!("Running on the pool");
1080 /// tx.send(42).unwrap();
1081 /// };
1082 ///
1083 /// let future01 = future03
1084 /// .unit_error() // Make it a TryFuture
1085 /// .boxed() // Make it Unpin
1086 /// .compat();
1087 ///
1088 /// tokio::run(future01);
1089 /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
1090 /// ```
1091 #[cfg(feature = "compat")]
1092 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
1093 fn compat(self) -> Compat<Self>
1094 where
1095 Self: Sized + Unpin,
1096 {
1097 Compat::new(self)
1098 }
1099
1100 /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
1101 ///
1102 /// This method is only available when the `std` feature of this
1103 /// library is activated, and it is activated by default.
1104 ///
1105 /// # Examples
1106 ///
1107 /// ```
1108 /// # futures::executor::block_on(async {
1109 /// use futures::stream::{self, TryStreamExt};
1110 /// use futures::io::AsyncReadExt;
1111 ///
1112 /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
1113 /// let mut reader = stream.into_async_read();
1114 ///
1115 /// let mut buf = Vec::new();
1116 /// reader.read_to_end(&mut buf).await.unwrap();
1117 /// assert_eq!(buf, [1, 2, 3, 4, 5]);
1118 /// # })
1119 /// ```
1120 #[cfg(feature = "io")]
1121 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
1122 #[cfg(feature = "std")]
1123 fn into_async_read(self) -> IntoAsyncRead<Self>
1124 where
1125 Self: Sized + TryStreamExt<Error = std::io::Error>,
1126 Self::Ok: AsRef<[u8]>,
1127 {
1128 crate::io::assert_read(IntoAsyncRead::new(self))
1129 }
1130}
1131