1 | //! Streams |
2 | //! |
3 | //! This module contains a number of functions for working with `Streams`s |
4 | //! that return `Result`s, allowing for short-circuiting computations. |
5 | |
6 | #[cfg (feature = "compat" )] |
7 | use crate::compat::Compat; |
8 | use crate::fns::{ |
9 | inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn, |
10 | IntoFn, MapErrFn, MapOkFn, |
11 | }; |
12 | use crate::future::assert_future; |
13 | use crate::stream::assert_stream; |
14 | use crate::stream::{Inspect, Map}; |
15 | #[cfg (feature = "alloc" )] |
16 | use alloc::vec::Vec; |
17 | use core::pin::Pin; |
18 | |
19 | use futures_core::{ |
20 | future::{Future, TryFuture}, |
21 | stream::TryStream, |
22 | task::{Context, Poll}, |
23 | }; |
24 | |
25 | mod and_then; |
26 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
27 | pub use self::and_then::AndThen; |
28 | |
29 | delegate_all!( |
30 | /// Stream for the [`err_into`](super::TryStreamExt::err_into) method. |
31 | ErrInto<St, E>( |
32 | MapErr<St, IntoFn<E>> |
33 | ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())] |
34 | ); |
35 | |
36 | delegate_all!( |
37 | /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. |
38 | InspectOk<St, F>( |
39 | Inspect<IntoStream<St>, InspectOkFn<F>> |
40 | ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))] |
41 | ); |
42 | |
43 | delegate_all!( |
44 | /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. |
45 | InspectErr<St, F>( |
46 | Inspect<IntoStream<St>, InspectErrFn<F>> |
47 | ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))] |
48 | ); |
49 | |
50 | mod into_stream; |
51 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
52 | pub use self::into_stream::IntoStream; |
53 | |
54 | delegate_all!( |
55 | /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. |
56 | MapOk<St, F>( |
57 | Map<IntoStream<St>, MapOkFn<F>> |
58 | ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))] |
59 | ); |
60 | |
61 | delegate_all!( |
62 | /// Stream for the [`map_err`](super::TryStreamExt::map_err) method. |
63 | MapErr<St, F>( |
64 | Map<IntoStream<St>, MapErrFn<F>> |
65 | ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))] |
66 | ); |
67 | |
68 | mod or_else; |
69 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
70 | pub use self::or_else::OrElse; |
71 | |
72 | mod try_next; |
73 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
74 | pub use self::try_next::TryNext; |
75 | |
76 | mod try_for_each; |
77 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
78 | pub use self::try_for_each::TryForEach; |
79 | |
80 | mod try_filter; |
81 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
82 | pub use self::try_filter::TryFilter; |
83 | |
84 | mod try_filter_map; |
85 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
86 | pub use self::try_filter_map::TryFilterMap; |
87 | |
88 | mod try_flatten; |
89 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
90 | pub use self::try_flatten::TryFlatten; |
91 | |
92 | #[cfg (not(futures_no_atomic_cas))] |
93 | #[cfg (feature = "alloc" )] |
94 | mod try_flatten_unordered; |
95 | #[cfg (not(futures_no_atomic_cas))] |
96 | #[cfg (feature = "alloc" )] |
97 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
98 | pub use self::try_flatten_unordered::TryFlattenUnordered; |
99 | |
100 | mod try_collect; |
101 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
102 | pub use self::try_collect::TryCollect; |
103 | |
104 | mod try_concat; |
105 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
106 | pub use self::try_concat::TryConcat; |
107 | |
108 | #[cfg (feature = "alloc" )] |
109 | mod try_chunks; |
110 | #[cfg (feature = "alloc" )] |
111 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
112 | pub use self::try_chunks::{TryChunks, TryChunksError}; |
113 | |
114 | mod try_fold; |
115 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
116 | pub use self::try_fold::TryFold; |
117 | |
118 | mod try_unfold; |
119 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
120 | pub use self::try_unfold::{try_unfold, TryUnfold}; |
121 | |
122 | mod try_skip_while; |
123 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
124 | pub use self::try_skip_while::TrySkipWhile; |
125 | |
126 | mod try_take_while; |
127 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
128 | pub use self::try_take_while::TryTakeWhile; |
129 | |
130 | #[cfg (not(futures_no_atomic_cas))] |
131 | #[cfg (feature = "alloc" )] |
132 | mod try_buffer_unordered; |
133 | #[cfg (not(futures_no_atomic_cas))] |
134 | #[cfg (feature = "alloc" )] |
135 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
136 | pub use self::try_buffer_unordered::TryBufferUnordered; |
137 | |
138 | #[cfg (not(futures_no_atomic_cas))] |
139 | #[cfg (feature = "alloc" )] |
140 | mod try_buffered; |
141 | #[cfg (not(futures_no_atomic_cas))] |
142 | #[cfg (feature = "alloc" )] |
143 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
144 | pub use self::try_buffered::TryBuffered; |
145 | |
146 | #[cfg (not(futures_no_atomic_cas))] |
147 | #[cfg (feature = "alloc" )] |
148 | mod try_for_each_concurrent; |
149 | #[cfg (not(futures_no_atomic_cas))] |
150 | #[cfg (feature = "alloc" )] |
151 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
152 | pub use self::try_for_each_concurrent::TryForEachConcurrent; |
153 | |
154 | #[cfg (feature = "io" )] |
155 | #[cfg (feature = "std" )] |
156 | mod into_async_read; |
157 | #[cfg (feature = "io" )] |
158 | #[cfg_attr (docsrs, doc(cfg(feature = "io" )))] |
159 | #[cfg (feature = "std" )] |
160 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
161 | pub use self::into_async_read::IntoAsyncRead; |
162 | |
163 | impl<S: ?Sized + TryStream> TryStreamExt for S {} |
164 | |
165 | /// Adapters specific to `Result`-returning streams |
166 | pub trait TryStreamExt: TryStream { |
167 | /// Wraps the current stream in a new stream which converts the error type |
168 | /// into the one provided. |
169 | /// |
170 | /// # Examples |
171 | /// |
172 | /// ``` |
173 | /// # futures::executor::block_on(async { |
174 | /// use futures::stream::{self, TryStreamExt}; |
175 | /// |
176 | /// let mut stream = |
177 | /// stream::iter(vec![Ok(()), Err(5i32)]) |
178 | /// .err_into::<i64>(); |
179 | /// |
180 | /// assert_eq!(stream.try_next().await, Ok(Some(()))); |
181 | /// assert_eq!(stream.try_next().await, Err(5i64)); |
182 | /// # }) |
183 | /// ``` |
184 | fn err_into<E>(self) -> ErrInto<Self, E> |
185 | where |
186 | Self: Sized, |
187 | Self::Error: Into<E>, |
188 | { |
189 | assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self)) |
190 | } |
191 | |
192 | /// Wraps the current stream in a new stream which maps the success value |
193 | /// using the provided closure. |
194 | /// |
195 | /// # Examples |
196 | /// |
197 | /// ``` |
198 | /// # futures::executor::block_on(async { |
199 | /// use futures::stream::{self, TryStreamExt}; |
200 | /// |
201 | /// let mut stream = |
202 | /// stream::iter(vec![Ok(5), Err(0)]) |
203 | /// .map_ok(|x| x + 2); |
204 | /// |
205 | /// assert_eq!(stream.try_next().await, Ok(Some(7))); |
206 | /// assert_eq!(stream.try_next().await, Err(0)); |
207 | /// # }) |
208 | /// ``` |
209 | fn map_ok<T, F>(self, f: F) -> MapOk<Self, F> |
210 | where |
211 | Self: Sized, |
212 | F: FnMut(Self::Ok) -> T, |
213 | { |
214 | assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f)) |
215 | } |
216 | |
217 | /// Wraps the current stream in a new stream which maps the error value |
218 | /// using the provided closure. |
219 | /// |
220 | /// # Examples |
221 | /// |
222 | /// ``` |
223 | /// # futures::executor::block_on(async { |
224 | /// use futures::stream::{self, TryStreamExt}; |
225 | /// |
226 | /// let mut stream = |
227 | /// stream::iter(vec![Ok(5), Err(0)]) |
228 | /// .map_err(|x| x + 2); |
229 | /// |
230 | /// assert_eq!(stream.try_next().await, Ok(Some(5))); |
231 | /// assert_eq!(stream.try_next().await, Err(2)); |
232 | /// # }) |
233 | /// ``` |
234 | fn map_err<E, F>(self, f: F) -> MapErr<Self, F> |
235 | where |
236 | Self: Sized, |
237 | F: FnMut(Self::Error) -> E, |
238 | { |
239 | assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f)) |
240 | } |
241 | |
242 | /// Chain on a computation for when a value is ready, passing the successful |
243 | /// results to the provided closure `f`. |
244 | /// |
245 | /// This function can be used to run a unit of work when the next successful |
246 | /// value on a stream is ready. The closure provided will be yielded a value |
247 | /// when ready, and the returned future will then be run to completion to |
248 | /// produce the next value on this stream. |
249 | /// |
250 | /// Any errors produced by this stream will not be passed to the closure, |
251 | /// and will be passed through. |
252 | /// |
253 | /// The returned value of the closure must implement the `TryFuture` trait |
254 | /// and can represent some more work to be done before the composed stream |
255 | /// is finished. |
256 | /// |
257 | /// Note that this function consumes the receiving stream and returns a |
258 | /// wrapped version of it. |
259 | /// |
260 | /// To process the entire stream and return a single future representing |
261 | /// success or error, use `try_for_each` instead. |
262 | /// |
263 | /// # Examples |
264 | /// |
265 | /// ``` |
266 | /// use futures::channel::mpsc; |
267 | /// use futures::future; |
268 | /// use futures::stream::TryStreamExt; |
269 | /// |
270 | /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1); |
271 | /// |
272 | /// let rx = rx.and_then(|result| { |
273 | /// future::ok(if result % 2 == 0 { |
274 | /// Some(result) |
275 | /// } else { |
276 | /// None |
277 | /// }) |
278 | /// }); |
279 | /// ``` |
280 | fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> |
281 | where |
282 | F: FnMut(Self::Ok) -> Fut, |
283 | Fut: TryFuture<Error = Self::Error>, |
284 | Self: Sized, |
285 | { |
286 | assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f)) |
287 | } |
288 | |
289 | /// Chain on a computation for when an error happens, passing the |
290 | /// erroneous result to the provided closure `f`. |
291 | /// |
292 | /// This function can be used to run a unit of work and attempt to recover from |
293 | /// an error if one happens. The closure provided will be yielded an error |
294 | /// when one appears, and the returned future will then be run to completion |
295 | /// to produce the next value on this stream. |
296 | /// |
297 | /// Any successful values produced by this stream will not be passed to the |
298 | /// closure, and will be passed through. |
299 | /// |
300 | /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait |
301 | /// and can represent some more work to be done before the composed stream |
302 | /// is finished. |
303 | /// |
304 | /// Note that this function consumes the receiving stream and returns a |
305 | /// wrapped version of it. |
306 | fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> |
307 | where |
308 | F: FnMut(Self::Error) -> Fut, |
309 | Fut: TryFuture<Ok = Self::Ok>, |
310 | Self: Sized, |
311 | { |
312 | assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f)) |
313 | } |
314 | |
315 | /// Do something with the success value of this stream, afterwards passing |
316 | /// it on. |
317 | /// |
318 | /// This is similar to the `StreamExt::inspect` method where it allows |
319 | /// easily inspecting the success value as it passes through the stream, for |
320 | /// example to debug what's going on. |
321 | fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F> |
322 | where |
323 | F: FnMut(&Self::Ok), |
324 | Self: Sized, |
325 | { |
326 | assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f)) |
327 | } |
328 | |
329 | /// Do something with the error value of this stream, afterwards passing it on. |
330 | /// |
331 | /// This is similar to the `StreamExt::inspect` method where it allows |
332 | /// easily inspecting the error value as it passes through the stream, for |
333 | /// example to debug what's going on. |
334 | fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> |
335 | where |
336 | F: FnMut(&Self::Error), |
337 | Self: Sized, |
338 | { |
339 | assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f)) |
340 | } |
341 | |
342 | /// Wraps a [`TryStream`] into a type that implements |
343 | /// [`Stream`](futures_core::stream::Stream) |
344 | /// |
345 | /// [`TryStream`]s currently do not implement the |
346 | /// [`Stream`](futures_core::stream::Stream) trait because of limitations |
347 | /// of the compiler. |
348 | /// |
349 | /// # Examples |
350 | /// |
351 | /// ``` |
352 | /// use futures::stream::{Stream, TryStream, TryStreamExt}; |
353 | /// |
354 | /// # type T = i32; |
355 | /// # type E = (); |
356 | /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... } |
357 | /// # futures::stream::empty() |
358 | /// # } |
359 | /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ } |
360 | /// |
361 | /// take_stream(make_try_stream().into_stream()); |
362 | /// ``` |
363 | fn into_stream(self) -> IntoStream<Self> |
364 | where |
365 | Self: Sized, |
366 | { |
367 | assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self)) |
368 | } |
369 | |
370 | /// Creates a future that attempts to resolve the next item in the stream. |
371 | /// If an error is encountered before the next item, the error is returned |
372 | /// instead. |
373 | /// |
374 | /// This is similar to the `Stream::next` combinator, but returns a |
375 | /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making |
376 | /// for easy use with the `?` operator. |
377 | /// |
378 | /// # Examples |
379 | /// |
380 | /// ``` |
381 | /// # futures::executor::block_on(async { |
382 | /// use futures::stream::{self, TryStreamExt}; |
383 | /// |
384 | /// let mut stream = stream::iter(vec![Ok(()), Err(())]); |
385 | /// |
386 | /// assert_eq!(stream.try_next().await, Ok(Some(()))); |
387 | /// assert_eq!(stream.try_next().await, Err(())); |
388 | /// # }) |
389 | /// ``` |
390 | fn try_next(&mut self) -> TryNext<'_, Self> |
391 | where |
392 | Self: Unpin, |
393 | { |
394 | assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self)) |
395 | } |
396 | |
397 | /// Attempts to run this stream to completion, executing the provided |
398 | /// asynchronous closure for each element on the stream. |
399 | /// |
400 | /// The provided closure will be called for each item this stream produces, |
401 | /// yielding a future. That future will then be executed to completion |
402 | /// before moving on to the next item. |
403 | /// |
404 | /// The returned value is a [`Future`](futures_core::future::Future) where the |
405 | /// [`Output`](futures_core::future::Future::Output) type is |
406 | /// `Result<(), Self::Error>`. If any of the intermediate |
407 | /// futures or the stream returns an error, this future will return |
408 | /// immediately with an error. |
409 | /// |
410 | /// # Examples |
411 | /// |
412 | /// ``` |
413 | /// # futures::executor::block_on(async { |
414 | /// use futures::future; |
415 | /// use futures::stream::{self, TryStreamExt}; |
416 | /// |
417 | /// let mut x = 0i32; |
418 | /// |
419 | /// { |
420 | /// let fut = stream::repeat(Ok(1)).try_for_each(|item| { |
421 | /// x += item; |
422 | /// future::ready(if x == 3 { Err(()) } else { Ok(()) }) |
423 | /// }); |
424 | /// assert_eq!(fut.await, Err(())); |
425 | /// } |
426 | /// |
427 | /// assert_eq!(x, 3); |
428 | /// # }) |
429 | /// ``` |
430 | fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> |
431 | where |
432 | F: FnMut(Self::Ok) -> Fut, |
433 | Fut: TryFuture<Ok = (), Error = Self::Error>, |
434 | Self: Sized, |
435 | { |
436 | assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f)) |
437 | } |
438 | |
439 | /// Skip elements on this stream while the provided asynchronous predicate |
440 | /// resolves to `true`. |
441 | /// |
442 | /// This function is similar to |
443 | /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits |
444 | /// early if an error occurs. |
445 | /// |
446 | /// # Examples |
447 | /// |
448 | /// ``` |
449 | /// # futures::executor::block_on(async { |
450 | /// use futures::future; |
451 | /// use futures::stream::{self, TryStreamExt}; |
452 | /// |
453 | /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]); |
454 | /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3))); |
455 | /// |
456 | /// let output: Result<Vec<i32>, i32> = stream.try_collect().await; |
457 | /// assert_eq!(output, Ok(vec![3, 2])); |
458 | /// # }) |
459 | /// ``` |
460 | fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> |
461 | where |
462 | F: FnMut(&Self::Ok) -> Fut, |
463 | Fut: TryFuture<Ok = bool, Error = Self::Error>, |
464 | Self: Sized, |
465 | { |
466 | assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f)) |
467 | } |
468 | |
469 | /// Take elements on this stream while the provided asynchronous predicate |
470 | /// resolves to `true`. |
471 | /// |
472 | /// This function is similar to |
473 | /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits |
474 | /// early if an error occurs. |
475 | /// |
476 | /// # Examples |
477 | /// |
478 | /// ``` |
479 | /// # futures::executor::block_on(async { |
480 | /// use futures::future; |
481 | /// use futures::stream::{self, TryStreamExt}; |
482 | /// |
483 | /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]); |
484 | /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3))); |
485 | /// |
486 | /// let output: Result<Vec<i32>, i32> = stream.try_collect().await; |
487 | /// assert_eq!(output, Ok(vec![1, 2])); |
488 | /// # }) |
489 | /// ``` |
490 | fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> |
491 | where |
492 | F: FnMut(&Self::Ok) -> Fut, |
493 | Fut: TryFuture<Ok = bool, Error = Self::Error>, |
494 | Self: Sized, |
495 | { |
496 | assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f)) |
497 | } |
498 | |
499 | /// Attempts to run this stream to completion, executing the provided asynchronous |
500 | /// closure for each element on the stream concurrently as elements become |
501 | /// available, exiting as soon as an error occurs. |
502 | /// |
503 | /// This is similar to |
504 | /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent), |
505 | /// but will resolve to an error immediately if the underlying stream or the provided |
506 | /// closure return an error. |
507 | /// |
508 | /// This method is only available when the `std` or `alloc` feature of this |
509 | /// library is activated, and it is activated by default. |
510 | /// |
511 | /// # Examples |
512 | /// |
513 | /// ``` |
514 | /// # futures::executor::block_on(async { |
515 | /// use futures::channel::oneshot; |
516 | /// use futures::stream::{self, StreamExt, TryStreamExt}; |
517 | /// |
518 | /// let (tx1, rx1) = oneshot::channel(); |
519 | /// let (tx2, rx2) = oneshot::channel(); |
520 | /// let (_tx3, rx3) = oneshot::channel(); |
521 | /// |
522 | /// let stream = stream::iter(vec![rx1, rx2, rx3]); |
523 | /// let fut = stream.map(Ok).try_for_each_concurrent( |
524 | /// /* limit */ 2, |
525 | /// |rx| async move { |
526 | /// let res: Result<(), oneshot::Canceled> = rx.await; |
527 | /// res |
528 | /// } |
529 | /// ); |
530 | /// |
531 | /// tx1.send(()).unwrap(); |
532 | /// // Drop the second sender so that `rx2` resolves to `Canceled`. |
533 | /// drop(tx2); |
534 | /// |
535 | /// // The final result is an error because the second future |
536 | /// // resulted in an error. |
537 | /// assert_eq!(Err(oneshot::Canceled), fut.await); |
538 | /// # }) |
539 | /// ``` |
540 | #[cfg (not(futures_no_atomic_cas))] |
541 | #[cfg (feature = "alloc" )] |
542 | fn try_for_each_concurrent<Fut, F>( |
543 | self, |
544 | limit: impl Into<Option<usize>>, |
545 | f: F, |
546 | ) -> TryForEachConcurrent<Self, Fut, F> |
547 | where |
548 | F: FnMut(Self::Ok) -> Fut, |
549 | Fut: Future<Output = Result<(), Self::Error>>, |
550 | Self: Sized, |
551 | { |
552 | assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new( |
553 | self, |
554 | limit.into(), |
555 | f, |
556 | )) |
557 | } |
558 | |
559 | /// Attempt to transform a stream into a collection, |
560 | /// returning a future representing the result of that computation. |
561 | /// |
562 | /// This combinator will collect all successful results of this stream and |
563 | /// collect them into the specified collection type. If an error happens then all |
564 | /// collected elements will be dropped and the error will be returned. |
565 | /// |
566 | /// The returned future will be resolved when the stream terminates. |
567 | /// |
568 | /// # Examples |
569 | /// |
570 | /// ``` |
571 | /// # futures::executor::block_on(async { |
572 | /// use futures::channel::mpsc; |
573 | /// use futures::stream::TryStreamExt; |
574 | /// use std::thread; |
575 | /// |
576 | /// let (tx, rx) = mpsc::unbounded(); |
577 | /// |
578 | /// thread::spawn(move || { |
579 | /// for i in 1..=5 { |
580 | /// tx.unbounded_send(Ok(i)).unwrap(); |
581 | /// } |
582 | /// tx.unbounded_send(Err(6)).unwrap(); |
583 | /// }); |
584 | /// |
585 | /// let output: Result<Vec<i32>, i32> = rx.try_collect().await; |
586 | /// assert_eq!(output, Err(6)); |
587 | /// # }) |
588 | /// ``` |
589 | fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> |
590 | where |
591 | Self: Sized, |
592 | { |
593 | assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self)) |
594 | } |
595 | |
596 | /// An adaptor for chunking up successful items of the stream inside a vector. |
597 | /// |
598 | /// This combinator will attempt to pull successful items from this stream and buffer |
599 | /// them into a local vector. At most `capacity` items will get buffered |
600 | /// before they're yielded from the returned stream. |
601 | /// |
602 | /// Note that the vectors returned from this iterator may not always have |
603 | /// `capacity` elements. If the underlying stream ended and only a partial |
604 | /// vector was created, it'll be returned. Additionally if an error happens |
605 | /// from the underlying stream then the currently buffered items will be |
606 | /// yielded. |
607 | /// |
608 | /// This method is only available when the `std` or `alloc` feature of this |
609 | /// library is activated, and it is activated by default. |
610 | /// |
611 | /// This function is similar to |
612 | /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits |
613 | /// early if an error occurs. |
614 | /// |
615 | /// # Examples |
616 | /// |
617 | /// ``` |
618 | /// # futures::executor::block_on(async { |
619 | /// use futures::stream::{self, TryChunksError, TryStreamExt}; |
620 | /// |
621 | /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); |
622 | /// let mut stream = stream.try_chunks(2); |
623 | /// |
624 | /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); |
625 | /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4))); |
626 | /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); |
627 | /// # }) |
628 | /// ``` |
629 | /// |
630 | /// # Panics |
631 | /// |
632 | /// This method will panic if `capacity` is zero. |
633 | #[cfg (feature = "alloc" )] |
634 | fn try_chunks(self, capacity: usize) -> TryChunks<Self> |
635 | where |
636 | Self: Sized, |
637 | { |
638 | assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>( |
639 | TryChunks::new(self, capacity), |
640 | ) |
641 | } |
642 | |
643 | /// Attempt to filter the values produced by this stream according to the |
644 | /// provided asynchronous closure. |
645 | /// |
646 | /// As values of this stream are made available, the provided predicate `f` |
647 | /// will be run on them. If the predicate returns a `Future` which resolves |
648 | /// to `true`, then the stream will yield the value, but if the predicate |
649 | /// return a `Future` which resolves to `false`, then the value will be |
650 | /// discarded and the next value will be produced. |
651 | /// |
652 | /// All errors are passed through without filtering in this combinator. |
653 | /// |
654 | /// Note that this function consumes the stream passed into it and returns a |
655 | /// wrapped version of it, similar to the existing `filter` methods in |
656 | /// the standard library. |
657 | /// |
658 | /// # Examples |
659 | /// ``` |
660 | /// # futures::executor::block_on(async { |
661 | /// use futures::future; |
662 | /// use futures::stream::{self, StreamExt, TryStreamExt}; |
663 | /// |
664 | /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error" )]); |
665 | /// let mut evens = stream.try_filter(|x| { |
666 | /// future::ready(x % 2 == 0) |
667 | /// }); |
668 | /// |
669 | /// assert_eq!(evens.next().await, Some(Ok(2))); |
670 | /// assert_eq!(evens.next().await, Some(Err("error" ))); |
671 | /// # }) |
672 | /// ``` |
673 | fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> |
674 | where |
675 | Fut: Future<Output = bool>, |
676 | F: FnMut(&Self::Ok) -> Fut, |
677 | Self: Sized, |
678 | { |
679 | assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f)) |
680 | } |
681 | |
682 | /// Attempt to filter the values produced by this stream while |
683 | /// simultaneously mapping them to a different type according to the |
684 | /// provided asynchronous closure. |
685 | /// |
686 | /// As values of this stream are made available, the provided function will |
687 | /// be run on them. If the future returned by the predicate `f` resolves to |
688 | /// [`Some(item)`](Some) then the stream will yield the value `item`, but if |
689 | /// it resolves to [`None`] then the next value will be produced. |
690 | /// |
691 | /// All errors are passed through without filtering in this combinator. |
692 | /// |
693 | /// Note that this function consumes the stream passed into it and returns a |
694 | /// wrapped version of it, similar to the existing `filter_map` methods in |
695 | /// the standard library. |
696 | /// |
697 | /// # Examples |
698 | /// ``` |
699 | /// # futures::executor::block_on(async { |
700 | /// use futures::stream::{self, StreamExt, TryStreamExt}; |
701 | /// use futures::pin_mut; |
702 | /// |
703 | /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error" )]); |
704 | /// let halves = stream.try_filter_map(|x| async move { |
705 | /// let ret = if x % 2 == 0 { Some(x / 2) } else { None }; |
706 | /// Ok(ret) |
707 | /// }); |
708 | /// |
709 | /// pin_mut!(halves); |
710 | /// assert_eq!(halves.next().await, Some(Ok(3))); |
711 | /// assert_eq!(halves.next().await, Some(Err("error" ))); |
712 | /// # }) |
713 | /// ``` |
714 | fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> |
715 | where |
716 | Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, |
717 | F: FnMut(Self::Ok) -> Fut, |
718 | Self: Sized, |
719 | { |
720 | assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f)) |
721 | } |
722 | |
723 | /// Flattens a stream of streams into just one continuous stream. Produced streams |
724 | /// will be polled concurrently and any errors will be passed through without looking at them. |
725 | /// If the underlying base stream returns an error, it will be **immediately** propagated. |
726 | /// |
727 | /// The only argument is an optional limit on the number of concurrently |
728 | /// polled streams. If this limit is not `None`, no more than `limit` streams |
729 | /// will be polled at the same time. The `limit` argument is of type |
730 | /// `Into<Option<usize>>`, and so can be provided as either `None`, |
731 | /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as |
732 | /// no limit at all, and will have the same result as passing in `None`. |
733 | /// |
734 | /// # Examples |
735 | /// |
736 | /// ``` |
737 | /// # futures::executor::block_on(async { |
738 | /// use futures::channel::mpsc; |
739 | /// use futures::stream::{StreamExt, TryStreamExt}; |
740 | /// use std::thread; |
741 | /// |
742 | /// let (tx1, rx1) = mpsc::unbounded(); |
743 | /// let (tx2, rx2) = mpsc::unbounded(); |
744 | /// let (tx3, rx3) = mpsc::unbounded(); |
745 | /// |
746 | /// thread::spawn(move || { |
747 | /// tx1.unbounded_send(Ok(1)).unwrap(); |
748 | /// }); |
749 | /// thread::spawn(move || { |
750 | /// tx2.unbounded_send(Ok(2)).unwrap(); |
751 | /// tx2.unbounded_send(Err(3)).unwrap(); |
752 | /// tx2.unbounded_send(Ok(4)).unwrap(); |
753 | /// }); |
754 | /// thread::spawn(move || { |
755 | /// tx3.unbounded_send(Ok(rx1)).unwrap(); |
756 | /// tx3.unbounded_send(Ok(rx2)).unwrap(); |
757 | /// tx3.unbounded_send(Err(5)).unwrap(); |
758 | /// }); |
759 | /// |
760 | /// let stream = rx3.try_flatten_unordered(None); |
761 | /// let mut values: Vec<_> = stream.collect().await; |
762 | /// values.sort(); |
763 | /// |
764 | /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); |
765 | /// # }); |
766 | /// ``` |
767 | #[cfg (not(futures_no_atomic_cas))] |
768 | #[cfg (feature = "alloc" )] |
769 | fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self> |
770 | where |
771 | Self::Ok: TryStream + Unpin, |
772 | <Self::Ok as TryStream>::Error: From<Self::Error>, |
773 | Self: Sized, |
774 | { |
775 | assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( |
776 | TryFlattenUnordered::new(self, limit), |
777 | ) |
778 | } |
779 | |
780 | /// Flattens a stream of streams into just one continuous stream. |
781 | /// |
782 | /// If this stream's elements are themselves streams then this combinator |
783 | /// will flatten out the entire stream to one long chain of elements. Any |
784 | /// errors are passed through without looking at them, but otherwise each |
785 | /// individual stream will get exhausted before moving on to the next. |
786 | /// |
787 | /// # Examples |
788 | /// |
789 | /// ``` |
790 | /// # futures::executor::block_on(async { |
791 | /// use futures::channel::mpsc; |
792 | /// use futures::stream::{StreamExt, TryStreamExt}; |
793 | /// use std::thread; |
794 | /// |
795 | /// let (tx1, rx1) = mpsc::unbounded(); |
796 | /// let (tx2, rx2) = mpsc::unbounded(); |
797 | /// let (tx3, rx3) = mpsc::unbounded(); |
798 | /// |
799 | /// thread::spawn(move || { |
800 | /// tx1.unbounded_send(Ok(1)).unwrap(); |
801 | /// }); |
802 | /// thread::spawn(move || { |
803 | /// tx2.unbounded_send(Ok(2)).unwrap(); |
804 | /// tx2.unbounded_send(Err(3)).unwrap(); |
805 | /// tx2.unbounded_send(Ok(4)).unwrap(); |
806 | /// }); |
807 | /// thread::spawn(move || { |
808 | /// tx3.unbounded_send(Ok(rx1)).unwrap(); |
809 | /// tx3.unbounded_send(Ok(rx2)).unwrap(); |
810 | /// tx3.unbounded_send(Err(5)).unwrap(); |
811 | /// }); |
812 | /// |
813 | /// let mut stream = rx3.try_flatten(); |
814 | /// assert_eq!(stream.next().await, Some(Ok(1))); |
815 | /// assert_eq!(stream.next().await, Some(Ok(2))); |
816 | /// assert_eq!(stream.next().await, Some(Err(3))); |
817 | /// assert_eq!(stream.next().await, Some(Ok(4))); |
818 | /// assert_eq!(stream.next().await, Some(Err(5))); |
819 | /// assert_eq!(stream.next().await, None); |
820 | /// # }); |
821 | /// ``` |
822 | fn try_flatten(self) -> TryFlatten<Self> |
823 | where |
824 | Self::Ok: TryStream, |
825 | <Self::Ok as TryStream>::Error: From<Self::Error>, |
826 | Self: Sized, |
827 | { |
828 | assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( |
829 | TryFlatten::new(self), |
830 | ) |
831 | } |
832 | |
833 | /// Attempt to execute an accumulating asynchronous computation over a |
834 | /// stream, collecting all the values into one final result. |
835 | /// |
836 | /// This combinator will accumulate all values returned by this stream |
837 | /// according to the closure provided. The initial state is also provided to |
838 | /// this method and then is returned again by each execution of the closure. |
839 | /// Once the entire stream has been exhausted the returned future will |
840 | /// resolve to this value. |
841 | /// |
842 | /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will |
843 | /// exit early if an error is encountered in either the stream or the |
844 | /// provided closure. |
845 | /// |
846 | /// # Examples |
847 | /// |
848 | /// ``` |
849 | /// # futures::executor::block_on(async { |
850 | /// use futures::stream::{self, TryStreamExt}; |
851 | /// |
852 | /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]); |
853 | /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) }); |
854 | /// assert_eq!(sum.await, Ok(3)); |
855 | /// |
856 | /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]); |
857 | /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) }); |
858 | /// assert_eq!(sum.await, Err(2)); |
859 | /// # }) |
860 | /// ``` |
861 | fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> |
862 | where |
863 | F: FnMut(T, Self::Ok) -> Fut, |
864 | Fut: TryFuture<Ok = T, Error = Self::Error>, |
865 | Self: Sized, |
866 | { |
867 | assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init)) |
868 | } |
869 | |
870 | /// Attempt to concatenate all items of a stream into a single |
871 | /// extendable destination, returning a future representing the end result. |
872 | /// |
873 | /// This combinator will extend the first item with the contents of all |
874 | /// the subsequent successful results of the stream. If the stream is empty, |
875 | /// the default value will be returned. |
876 | /// |
877 | /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait. |
878 | /// |
879 | /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will |
880 | /// exit early if an error is encountered in the stream. |
881 | /// |
882 | /// # Examples |
883 | /// |
884 | /// ``` |
885 | /// # futures::executor::block_on(async { |
886 | /// use futures::channel::mpsc; |
887 | /// use futures::stream::TryStreamExt; |
888 | /// use std::thread; |
889 | /// |
890 | /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>(); |
891 | /// |
892 | /// thread::spawn(move || { |
893 | /// for i in (0..3).rev() { |
894 | /// let n = i * 3; |
895 | /// tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap(); |
896 | /// } |
897 | /// }); |
898 | /// |
899 | /// let result = rx.try_concat().await; |
900 | /// |
901 | /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3])); |
902 | /// # }); |
903 | /// ``` |
904 | fn try_concat(self) -> TryConcat<Self> |
905 | where |
906 | Self: Sized, |
907 | Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default, |
908 | { |
909 | assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self)) |
910 | } |
911 | |
912 | /// Attempt to execute several futures from a stream concurrently (unordered). |
913 | /// |
914 | /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type |
915 | /// that matches the stream's `Error` type. |
916 | /// |
917 | /// This adaptor will buffer up to `n` futures and then return their |
918 | /// outputs in the order in which they complete. If the underlying stream |
919 | /// returns an error, it will be immediately propagated. |
920 | /// |
921 | /// The returned stream will be a stream of results, each containing either |
922 | /// an error or a future's output. An error can be produced either by the |
923 | /// underlying stream itself or by one of the futures it yielded. |
924 | /// |
925 | /// This method is only available when the `std` or `alloc` feature of this |
926 | /// library is activated, and it is activated by default. |
927 | /// |
928 | /// # Examples |
929 | /// |
930 | /// Results are returned in the order of completion: |
931 | /// ``` |
932 | /// # futures::executor::block_on(async { |
933 | /// use futures::channel::oneshot; |
934 | /// use futures::stream::{self, StreamExt, TryStreamExt}; |
935 | /// |
936 | /// let (send_one, recv_one) = oneshot::channel(); |
937 | /// let (send_two, recv_two) = oneshot::channel(); |
938 | /// |
939 | /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); |
940 | /// |
941 | /// let mut buffered = stream_of_futures.try_buffer_unordered(10); |
942 | /// |
943 | /// send_two.send(2i32)?; |
944 | /// assert_eq!(buffered.next().await, Some(Ok(2i32))); |
945 | /// |
946 | /// send_one.send(1i32)?; |
947 | /// assert_eq!(buffered.next().await, Some(Ok(1i32))); |
948 | /// |
949 | /// assert_eq!(buffered.next().await, None); |
950 | /// # Ok::<(), i32>(()) }).unwrap(); |
951 | /// ``` |
952 | /// |
953 | /// Errors from the underlying stream itself are propagated: |
954 | /// ``` |
955 | /// # futures::executor::block_on(async { |
956 | /// use futures::channel::mpsc; |
957 | /// use futures::stream::{StreamExt, TryStreamExt}; |
958 | /// |
959 | /// let (sink, stream_of_futures) = mpsc::unbounded(); |
960 | /// let mut buffered = stream_of_futures.try_buffer_unordered(10); |
961 | /// |
962 | /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; |
963 | /// assert_eq!(buffered.next().await, Some(Ok(7i32))); |
964 | /// |
965 | /// sink.unbounded_send(Err("error in the stream" ))?; |
966 | /// assert_eq!(buffered.next().await, Some(Err("error in the stream" ))); |
967 | /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); |
968 | /// ``` |
969 | #[cfg (not(futures_no_atomic_cas))] |
970 | #[cfg (feature = "alloc" )] |
971 | fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> |
972 | where |
973 | Self::Ok: TryFuture<Error = Self::Error>, |
974 | Self: Sized, |
975 | { |
976 | assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>( |
977 | TryBufferUnordered::new(self, n), |
978 | ) |
979 | } |
980 | |
981 | /// Attempt to execute several futures from a stream concurrently. |
982 | /// |
983 | /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type |
984 | /// that matches the stream's `Error` type. |
985 | /// |
986 | /// This adaptor will buffer up to `n` futures and then return their |
987 | /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will |
988 | /// be immediately propagated. |
989 | /// |
990 | /// The returned stream will be a stream of results, each containing either |
991 | /// an error or a future's output. An error can be produced either by the |
992 | /// underlying stream itself or by one of the futures it yielded. |
993 | /// |
994 | /// This method is only available when the `std` or `alloc` feature of this |
995 | /// library is activated, and it is activated by default. |
996 | /// |
997 | /// # Examples |
998 | /// |
999 | /// Results are returned in the order of addition: |
1000 | /// ``` |
1001 | /// # futures::executor::block_on(async { |
1002 | /// use futures::channel::oneshot; |
1003 | /// use futures::future::lazy; |
1004 | /// use futures::stream::{self, StreamExt, TryStreamExt}; |
1005 | /// |
1006 | /// let (send_one, recv_one) = oneshot::channel(); |
1007 | /// let (send_two, recv_two) = oneshot::channel(); |
1008 | /// |
1009 | /// let mut buffered = lazy(move |cx| { |
1010 | /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); |
1011 | /// |
1012 | /// let mut buffered = stream_of_futures.try_buffered(10); |
1013 | /// |
1014 | /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); |
1015 | /// |
1016 | /// send_two.send(2i32)?; |
1017 | /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); |
1018 | /// Ok::<_, i32>(buffered) |
1019 | /// }).await?; |
1020 | /// |
1021 | /// send_one.send(1i32)?; |
1022 | /// assert_eq!(buffered.next().await, Some(Ok(1i32))); |
1023 | /// assert_eq!(buffered.next().await, Some(Ok(2i32))); |
1024 | /// |
1025 | /// assert_eq!(buffered.next().await, None); |
1026 | /// # Ok::<(), i32>(()) }).unwrap(); |
1027 | /// ``` |
1028 | /// |
1029 | /// Errors from the underlying stream itself are propagated: |
1030 | /// ``` |
1031 | /// # futures::executor::block_on(async { |
1032 | /// use futures::channel::mpsc; |
1033 | /// use futures::stream::{StreamExt, TryStreamExt}; |
1034 | /// |
1035 | /// let (sink, stream_of_futures) = mpsc::unbounded(); |
1036 | /// let mut buffered = stream_of_futures.try_buffered(10); |
1037 | /// |
1038 | /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; |
1039 | /// assert_eq!(buffered.next().await, Some(Ok(7i32))); |
1040 | /// |
1041 | /// sink.unbounded_send(Err("error in the stream" ))?; |
1042 | /// assert_eq!(buffered.next().await, Some(Err("error in the stream" ))); |
1043 | /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); |
1044 | /// ``` |
1045 | #[cfg (not(futures_no_atomic_cas))] |
1046 | #[cfg (feature = "alloc" )] |
1047 | fn try_buffered(self, n: usize) -> TryBuffered<Self> |
1048 | where |
1049 | Self::Ok: TryFuture<Error = Self::Error>, |
1050 | Self: Sized, |
1051 | { |
1052 | assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new( |
1053 | self, n, |
1054 | )) |
1055 | } |
1056 | |
1057 | // TODO: false positive warning from rustdoc. Verify once #43466 settles |
1058 | // |
1059 | /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`] |
1060 | /// stream types. |
1061 | fn try_poll_next_unpin( |
1062 | &mut self, |
1063 | cx: &mut Context<'_>, |
1064 | ) -> Poll<Option<Result<Self::Ok, Self::Error>>> |
1065 | where |
1066 | Self: Unpin, |
1067 | { |
1068 | Pin::new(self).try_poll_next(cx) |
1069 | } |
1070 | |
1071 | /// Wraps a [`TryStream`] into a stream compatible with libraries using |
1072 | /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled. |
1073 | /// ``` |
1074 | /// # if cfg!(miri) { return; } // Miri does not support epoll |
1075 | /// use futures::future::{FutureExt, TryFutureExt}; |
1076 | /// # let (tx, rx) = futures::channel::oneshot::channel(); |
1077 | /// |
1078 | /// let future03 = async { |
1079 | /// println!("Running on the pool"); |
1080 | /// tx.send(42).unwrap(); |
1081 | /// }; |
1082 | /// |
1083 | /// let future01 = future03 |
1084 | /// .unit_error() // Make it a TryFuture |
1085 | /// .boxed() // Make it Unpin |
1086 | /// .compat(); |
1087 | /// |
1088 | /// tokio::run(future01); |
1089 | /// # assert_eq!(42, futures::executor::block_on(rx).unwrap()); |
1090 | /// ``` |
1091 | #[cfg (feature = "compat" )] |
1092 | #[cfg_attr (docsrs, doc(cfg(feature = "compat" )))] |
1093 | fn compat(self) -> Compat<Self> |
1094 | where |
1095 | Self: Sized + Unpin, |
1096 | { |
1097 | Compat::new(self) |
1098 | } |
1099 | |
1100 | /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). |
1101 | /// |
1102 | /// This method is only available when the `std` feature of this |
1103 | /// library is activated, and it is activated by default. |
1104 | /// |
1105 | /// # Examples |
1106 | /// |
1107 | /// ``` |
1108 | /// # futures::executor::block_on(async { |
1109 | /// use futures::stream::{self, TryStreamExt}; |
1110 | /// use futures::io::AsyncReadExt; |
1111 | /// |
1112 | /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); |
1113 | /// let mut reader = stream.into_async_read(); |
1114 | /// |
1115 | /// let mut buf = Vec::new(); |
1116 | /// reader.read_to_end(&mut buf).await.unwrap(); |
1117 | /// assert_eq!(buf, [1, 2, 3, 4, 5]); |
1118 | /// # }) |
1119 | /// ``` |
1120 | #[cfg (feature = "io" )] |
1121 | #[cfg_attr (docsrs, doc(cfg(feature = "io" )))] |
1122 | #[cfg (feature = "std" )] |
1123 | fn into_async_read(self) -> IntoAsyncRead<Self> |
1124 | where |
1125 | Self: Sized + TryStreamExt<Error = std::io::Error>, |
1126 | Self::Ok: AsRef<[u8]>, |
1127 | { |
1128 | crate::io::assert_read(IntoAsyncRead::new(self)) |
1129 | } |
1130 | } |
1131 | |