1 | //! Combinators for the [`Stream`] trait. |
2 | //! |
3 | //! # Examples |
4 | //! |
5 | //! ``` |
6 | //! use futures_lite::stream::{self, StreamExt}; |
7 | //! |
8 | //! # spin_on::spin_on(async { |
9 | //! let mut s = stream::iter(vec![1, 2, 3]); |
10 | //! |
11 | //! assert_eq!(s.next().await, Some(1)); |
12 | //! assert_eq!(s.next().await, Some(2)); |
13 | //! assert_eq!(s.next().await, Some(3)); |
14 | //! assert_eq!(s.next().await, None); |
15 | //! # }); |
16 | //! ``` |
17 | |
18 | #[cfg (feature = "alloc" )] |
19 | extern crate alloc; |
20 | |
21 | #[doc (no_inline)] |
22 | pub use futures_core::stream::Stream; |
23 | |
24 | #[cfg (feature = "alloc" )] |
25 | use alloc::boxed::Box; |
26 | |
27 | use core::fmt; |
28 | use core::future::Future; |
29 | use core::marker::PhantomData; |
30 | use core::mem; |
31 | use core::pin::Pin; |
32 | use core::task::{Context, Poll}; |
33 | |
34 | use pin_project_lite::pin_project; |
35 | |
36 | use crate::ready; |
37 | |
/// Wraps a stream in a blocking [`Iterator`].
///
/// The stream has to be [`Unpin`]; the example below pins it with `pin!` first.
///
/// # Examples
///
/// ```
/// use futures_lite::{pin, stream};
///
/// let stream = stream::once(7);
/// pin!(stream);
///
/// let mut iter = stream::block_on(stream);
/// assert_eq!(iter.next(), Some(7));
/// assert_eq!(iter.next(), None);
/// ```
#[cfg(feature = "std")]
pub fn block_on<S>(stream: S) -> BlockOn<S>
where
    S: Stream + Unpin,
{
    BlockOn(stream)
}
56 | |
/// Iterator for the [`block_on()`] function.
///
/// Every iterator method builds the stream combinator of the same name and runs it to
/// completion with `crate::future::block_on`.
#[derive(Debug)]
pub struct BlockOn<S>(S);

#[cfg(feature = "std")]
impl<S: Stream + Unpin> Iterator for BlockOn<S> {
    type Item = S::Item;

    // Blocks until the stream produces its next item (or ends).
    fn next(&mut self) -> Option<Self::Item> {
        let pending = self.0.next();
        crate::future::block_on(pending)
    }

    // The wrapped stream's hint is forwarded untouched.
    fn size_hint(&self) -> (usize, Option<usize>) {
        Stream::size_hint(&self.0)
    }

    fn count(self) -> usize {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.count())
    }

    fn last(self) -> Option<Self::Item> {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.last())
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let pending = self.0.nth(n);
        crate::future::block_on(pending)
    }

    fn fold<B, F>(self, init: B, f: F) -> B
    where
        F: FnMut(B, Self::Item) -> B,
    {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.fold(init, f))
    }

    fn for_each<F>(self, f: F) -> F::Output
    where
        F: FnMut(Self::Item),
    {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.for_each(f))
    }

    fn all<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.all(f);
        crate::future::block_on(pending)
    }

    fn any<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.any(f);
        crate::future::block_on(pending)
    }

    fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
    where
        P: FnMut(&Self::Item) -> bool,
    {
        let pending = self.0.find(predicate);
        crate::future::block_on(pending)
    }

    fn find_map<B, F>(&mut self, f: F) -> Option<B>
    where
        F: FnMut(Self::Item) -> Option<B>,
    {
        let pending = self.0.find_map(f);
        crate::future::block_on(pending)
    }

    fn position<P>(&mut self, predicate: P) -> Option<usize>
    where
        P: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.position(predicate);
        crate::future::block_on(pending)
    }
}
134 | |
135 | /// Creates an empty stream. |
136 | /// |
137 | /// # Examples |
138 | /// |
139 | /// ``` |
140 | /// use futures_lite::stream::{self, StreamExt}; |
141 | /// |
142 | /// # spin_on::spin_on(async { |
143 | /// let mut s = stream::empty::<i32>(); |
144 | /// assert_eq!(s.next().await, None); |
145 | /// # }) |
146 | /// ``` |
147 | pub fn empty<T>() -> Empty<T> { |
148 | Empty { |
149 | _marker: PhantomData, |
150 | } |
151 | } |
152 | |
153 | /// Stream for the [`empty()`] function. |
154 | #[derive (Clone, Debug)] |
155 | #[must_use = "streams do nothing unless polled" ] |
156 | pub struct Empty<T> { |
157 | _marker: PhantomData<T>, |
158 | } |
159 | |
160 | impl<T> Unpin for Empty<T> {} |
161 | |
162 | impl<T> Stream for Empty<T> { |
163 | type Item = T; |
164 | |
165 | fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
166 | Poll::Ready(None) |
167 | } |
168 | |
169 | fn size_hint(&self) -> (usize, Option<usize>) { |
170 | (0, Some(0)) |
171 | } |
172 | } |
173 | |
174 | /// Creates a stream from an iterator. |
175 | /// |
176 | /// # Examples |
177 | /// |
178 | /// ``` |
179 | /// use futures_lite::stream::{self, StreamExt}; |
180 | /// |
181 | /// # spin_on::spin_on(async { |
182 | /// let mut s = stream::iter(vec![1, 2]); |
183 | /// |
184 | /// assert_eq!(s.next().await, Some(1)); |
185 | /// assert_eq!(s.next().await, Some(2)); |
186 | /// assert_eq!(s.next().await, None); |
187 | /// # }) |
188 | /// ``` |
189 | pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> { |
190 | Iter { |
191 | iter: iter.into_iter(), |
192 | } |
193 | } |
194 | |
195 | /// Stream for the [`iter()`] function. |
196 | #[derive (Clone, Debug)] |
197 | #[must_use = "streams do nothing unless polled" ] |
198 | pub struct Iter<I> { |
199 | iter: I, |
200 | } |
201 | |
202 | impl<I> Unpin for Iter<I> {} |
203 | |
204 | impl<I: Iterator> Stream for Iter<I> { |
205 | type Item = I::Item; |
206 | |
207 | fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
208 | Poll::Ready(self.iter.next()) |
209 | } |
210 | |
211 | fn size_hint(&self) -> (usize, Option<usize>) { |
212 | self.iter.size_hint() |
213 | } |
214 | } |
215 | |
216 | /// Creates a stream that yields a single item. |
217 | /// |
218 | /// # Examples |
219 | /// |
220 | /// ``` |
221 | /// use futures_lite::stream::{self, StreamExt}; |
222 | /// |
223 | /// # spin_on::spin_on(async { |
224 | /// let mut s = stream::once(7); |
225 | /// |
226 | /// assert_eq!(s.next().await, Some(7)); |
227 | /// assert_eq!(s.next().await, None); |
228 | /// # }) |
229 | /// ``` |
230 | pub fn once<T>(t: T) -> Once<T> { |
231 | Once { value: Some(t) } |
232 | } |
233 | |
234 | pin_project! { |
235 | /// Stream for the [`once()`] function. |
236 | #[derive(Clone, Debug)] |
237 | #[must_use = "streams do nothing unless polled" ] |
238 | pub struct Once<T> { |
239 | value: Option<T>, |
240 | } |
241 | } |
242 | |
243 | impl<T> Stream for Once<T> { |
244 | type Item = T; |
245 | |
246 | fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { |
247 | Poll::Ready(self.project().value.take()) |
248 | } |
249 | |
250 | fn size_hint(&self) -> (usize, Option<usize>) { |
251 | if self.value.is_some() { |
252 | (1, Some(1)) |
253 | } else { |
254 | (0, Some(0)) |
255 | } |
256 | } |
257 | } |
258 | |
259 | /// Creates a stream that is always pending. |
260 | /// |
261 | /// # Examples |
262 | /// |
263 | /// ```no_run |
264 | /// use futures_lite::stream::{self, StreamExt}; |
265 | /// |
266 | /// # spin_on::spin_on(async { |
267 | /// let mut s = stream::pending::<i32>(); |
268 | /// s.next().await; |
269 | /// unreachable!(); |
270 | /// # }) |
271 | /// ``` |
272 | pub fn pending<T>() -> Pending<T> { |
273 | Pending { |
274 | _marker: PhantomData, |
275 | } |
276 | } |
277 | |
278 | /// Stream for the [`pending()`] function. |
279 | #[derive (Clone, Debug)] |
280 | #[must_use = "streams do nothing unless polled" ] |
281 | pub struct Pending<T> { |
282 | _marker: PhantomData<T>, |
283 | } |
284 | |
285 | impl<T> Unpin for Pending<T> {} |
286 | |
287 | impl<T> Stream for Pending<T> { |
288 | type Item = T; |
289 | |
290 | fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { |
291 | Poll::Pending |
292 | } |
293 | |
294 | fn size_hint(&self) -> (usize, Option<usize>) { |
295 | (0, Some(0)) |
296 | } |
297 | } |
298 | |
299 | /// Creates a stream from a function returning [`Poll`]. |
300 | /// |
301 | /// # Examples |
302 | /// |
303 | /// ``` |
304 | /// use futures_lite::stream::{self, StreamExt}; |
305 | /// use std::task::{Context, Poll}; |
306 | /// |
307 | /// # spin_on::spin_on(async { |
308 | /// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> { |
309 | /// Poll::Ready(Some(7)) |
310 | /// } |
311 | /// |
312 | /// assert_eq!(stream::poll_fn(f).next().await, Some(7)); |
313 | /// # }) |
314 | /// ``` |
315 | pub fn poll_fn<T, F>(f: F) -> PollFn<F> |
316 | where |
317 | F: FnMut(&mut Context<'_>) -> Poll<Option<T>>, |
318 | { |
319 | PollFn { f } |
320 | } |
321 | |
/// Stream for the [`poll_fn()`] function.
#[derive(Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct PollFn<F> {
    // The polling function invoked on every `poll_next`.
    f: F,
}

impl<F> Unpin for PollFn<F> {}

impl<F> fmt::Debug for PollFn<F> {
    /// Prints only the type name; `F` is not required to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PollFn").finish()
    }
}
336 | |
337 | impl<T, F> Stream for PollFn<F> |
338 | where |
339 | F: FnMut(&mut Context<'_>) -> Poll<Option<T>>, |
340 | { |
341 | type Item = T; |
342 | |
343 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
344 | (&mut self.f)(cx) |
345 | } |
346 | } |
347 | |
348 | /// Creates an infinite stream that yields the same item repeatedly. |
349 | /// |
350 | /// # Examples |
351 | /// |
352 | /// ``` |
353 | /// use futures_lite::stream::{self, StreamExt}; |
354 | /// |
355 | /// # spin_on::spin_on(async { |
356 | /// let mut s = stream::repeat(7); |
357 | /// |
358 | /// assert_eq!(s.next().await, Some(7)); |
359 | /// assert_eq!(s.next().await, Some(7)); |
360 | /// # }) |
361 | /// ``` |
362 | pub fn repeat<T: Clone>(item: T) -> Repeat<T> { |
363 | Repeat { item } |
364 | } |
365 | |
366 | /// Stream for the [`repeat()`] function. |
367 | #[derive (Clone, Debug)] |
368 | #[must_use = "streams do nothing unless polled" ] |
369 | pub struct Repeat<T> { |
370 | item: T, |
371 | } |
372 | |
373 | impl<T> Unpin for Repeat<T> {} |
374 | |
375 | impl<T: Clone> Stream for Repeat<T> { |
376 | type Item = T; |
377 | |
378 | fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
379 | Poll::Ready(Some(self.item.clone())) |
380 | } |
381 | |
382 | fn size_hint(&self) -> (usize, Option<usize>) { |
383 | (usize::max_value(), None) |
384 | } |
385 | } |
386 | |
387 | /// Creates an infinite stream from a closure that generates items. |
388 | /// |
389 | /// # Examples |
390 | /// |
391 | /// ``` |
392 | /// use futures_lite::stream::{self, StreamExt}; |
393 | /// |
394 | /// # spin_on::spin_on(async { |
395 | /// let mut s = stream::repeat_with(|| 7); |
396 | /// |
397 | /// assert_eq!(s.next().await, Some(7)); |
398 | /// assert_eq!(s.next().await, Some(7)); |
399 | /// # }) |
400 | /// ``` |
401 | pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F> |
402 | where |
403 | F: FnMut() -> T, |
404 | { |
405 | RepeatWith { f: repeater } |
406 | } |
407 | |
408 | /// Stream for the [`repeat_with()`] function. |
409 | #[derive (Clone, Debug)] |
410 | #[must_use = "streams do nothing unless polled" ] |
411 | pub struct RepeatWith<F> { |
412 | f: F, |
413 | } |
414 | |
415 | impl<F> Unpin for RepeatWith<F> {} |
416 | |
417 | impl<T, F> Stream for RepeatWith<F> |
418 | where |
419 | F: FnMut() -> T, |
420 | { |
421 | type Item = T; |
422 | |
423 | fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
424 | let item: T = (&mut self.f)(); |
425 | Poll::Ready(Some(item)) |
426 | } |
427 | |
428 | fn size_hint(&self) -> (usize, Option<usize>) { |
429 | (usize::max_value(), None) |
430 | } |
431 | } |
432 | |
433 | /// Creates a stream from a seed value and an async closure operating on it. |
434 | /// |
435 | /// # Examples |
436 | /// |
437 | /// ``` |
438 | /// use futures_lite::stream::{self, StreamExt}; |
439 | /// |
440 | /// # spin_on::spin_on(async { |
441 | /// let s = stream::unfold(0, |mut n| async move { |
442 | /// if n < 2 { |
443 | /// let m = n + 1; |
444 | /// Some((n, m)) |
445 | /// } else { |
446 | /// None |
447 | /// } |
448 | /// }); |
449 | /// |
450 | /// let v: Vec<i32> = s.collect().await; |
451 | /// assert_eq!(v, [0, 1]); |
452 | /// # }) |
453 | /// ``` |
454 | pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut> |
455 | where |
456 | F: FnMut(T) -> Fut, |
457 | Fut: Future<Output = Option<(Item, T)>>, |
458 | { |
459 | Unfold { |
460 | f, |
461 | state: Some(seed), |
462 | fut: None, |
463 | } |
464 | } |
465 | |
pin_project! {
    /// Stream for the [`unfold()`] function.
    #[derive(Clone)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Unfold<T, F, Fut> {
        // Closure that turns a state value into the future for the next step.
        f: F,
        // State waiting to be passed to `f`; `None` while a future is in flight
        // or once the stream has ended.
        state: Option<T>,
        // Future for the step currently being polled, if any.
        #[pin]
        fut: Option<Fut>,
    }
}
477 | |
478 | impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut> |
479 | where |
480 | T: fmt::Debug, |
481 | Fut: fmt::Debug, |
482 | { |
483 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
484 | f&mut DebugStruct<'_, '_>.debug_struct("Unfold" ) |
485 | .field("state" , &self.state) |
486 | .field(name:"fut" , &self.fut) |
487 | .finish() |
488 | } |
489 | } |
490 | |
491 | impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut> |
492 | where |
493 | F: FnMut(T) -> Fut, |
494 | Fut: Future<Output = Option<(Item, T)>>, |
495 | { |
496 | type Item = Item; |
497 | |
498 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
499 | let mut this = self.project(); |
500 | |
501 | if let Some(state) = this.state.take() { |
502 | this.fut.set(Some((this.f)(state))); |
503 | } |
504 | |
505 | let step = ready!(this |
506 | .fut |
507 | .as_mut() |
508 | .as_pin_mut() |
509 | .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`" ) |
510 | .poll(cx)); |
511 | this.fut.set(None); |
512 | |
513 | if let Some((item, next_state)) = step { |
514 | *this.state = Some(next_state); |
515 | Poll::Ready(Some(item)) |
516 | } else { |
517 | Poll::Ready(None) |
518 | } |
519 | } |
520 | } |
521 | |
522 | /// Creates a stream from a seed value and a fallible async closure operating on it. |
523 | /// |
524 | /// # Examples |
525 | /// |
526 | /// ``` |
527 | /// use futures_lite::stream::{self, StreamExt}; |
528 | /// |
529 | /// # spin_on::spin_on(async { |
530 | /// let s = stream::try_unfold(0, |mut n| async move { |
531 | /// if n < 2 { |
532 | /// let m = n + 1; |
533 | /// Ok(Some((n, m))) |
534 | /// } else { |
535 | /// std::io::Result::Ok(None) |
536 | /// } |
537 | /// }); |
538 | /// |
539 | /// let v: Vec<i32> = s.try_collect().await?; |
540 | /// assert_eq!(v, [0, 1]); |
541 | /// # std::io::Result::Ok(()) }); |
542 | /// ``` |
543 | pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut> |
544 | where |
545 | F: FnMut(T) -> Fut, |
546 | Fut: Future<Output = Result<Option<(Item, T)>, E>>, |
547 | { |
548 | TryUnfold { |
549 | f, |
550 | state: Some(init), |
551 | fut: None, |
552 | } |
553 | } |
554 | |
pin_project! {
    /// Stream for the [`try_unfold()`] function.
    #[derive(Clone)]
    #[must_use = "streams do nothing unless polled"]
    pub struct TryUnfold<T, F, Fut> {
        // Closure that turns a state value into the future for the next step.
        f: F,
        // State waiting to be passed to `f`; `None` while a future is in flight
        // or once the stream has ended.
        state: Option<T>,
        // Future for the step currently being polled, if any.
        #[pin]
        fut: Option<Fut>,
    }
}
566 | |
567 | impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut> |
568 | where |
569 | T: fmt::Debug, |
570 | Fut: fmt::Debug, |
571 | { |
572 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
573 | f&mut DebugStruct<'_, '_>.debug_struct("TryUnfold" ) |
574 | .field("state" , &self.state) |
575 | .field(name:"fut" , &self.fut) |
576 | .finish() |
577 | } |
578 | } |
579 | |
580 | impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut> |
581 | where |
582 | F: FnMut(T) -> Fut, |
583 | Fut: Future<Output = Result<Option<(Item, T)>, E>>, |
584 | { |
585 | type Item = Result<Item, E>; |
586 | |
587 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
588 | let mut this = self.project(); |
589 | |
590 | if let Some(state) = this.state.take() { |
591 | this.fut.set(Some((this.f)(state))); |
592 | } |
593 | |
594 | match this.fut.as_mut().as_pin_mut() { |
595 | None => { |
596 | // The future previously errored |
597 | Poll::Ready(None) |
598 | } |
599 | Some(future) => { |
600 | let step = ready!(future.poll(cx)); |
601 | this.fut.set(None); |
602 | |
603 | match step { |
604 | Ok(Some((item, next_state))) => { |
605 | *this.state = Some(next_state); |
606 | Poll::Ready(Some(Ok(item))) |
607 | } |
608 | Ok(None) => Poll::Ready(None), |
609 | Err(e) => Poll::Ready(Some(Err(e))), |
610 | } |
611 | } |
612 | } |
613 | } |
614 | } |
615 | |
616 | /// Creates a stream that invokes the given future as its first item, and then |
617 | /// produces no more items. |
618 | /// |
619 | /// # Example |
620 | /// |
621 | /// ``` |
622 | /// use futures_lite::{stream, prelude::*}; |
623 | /// |
624 | /// # spin_on::spin_on(async { |
625 | /// let mut stream = Box::pin(stream::once_future(async { 1 })); |
626 | /// assert_eq!(stream.next().await, Some(1)); |
627 | /// assert_eq!(stream.next().await, None); |
628 | /// # }); |
629 | /// ``` |
630 | pub fn once_future<F: Future>(future: F) -> OnceFuture<F> { |
631 | OnceFuture { |
632 | future: Some(future), |
633 | } |
634 | } |
635 | |
636 | pin_project! { |
637 | /// Stream for the [`once_future()`] method. |
638 | #[derive(Debug)] |
639 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
640 | pub struct OnceFuture<F> { |
641 | #[pin] |
642 | future: Option<F>, |
643 | } |
644 | } |
645 | |
646 | impl<F: Future> Stream for OnceFuture<F> { |
647 | type Item = F::Output; |
648 | |
649 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
650 | let mut this: Projection<'_, F> = self.project(); |
651 | |
652 | match this.future.as_mut().as_pin_mut().map(|f: Pin<&mut F>| f.poll(cx)) { |
653 | Some(Poll::Ready(t: ::Output)) => { |
654 | this.future.set(None); |
655 | Poll::Ready(Some(t)) |
656 | } |
657 | Some(Poll::Pending) => Poll::Pending, |
658 | None => Poll::Ready(None), |
659 | } |
660 | } |
661 | } |
662 | |
663 | /// Extension trait for [`Stream`]. |
664 | pub trait StreamExt: Stream { |
665 | /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types. |
666 | fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> |
667 | where |
668 | Self: Unpin, |
669 | { |
670 | Stream::poll_next(Pin::new(self), cx) |
671 | } |
672 | |
673 | /// Retrieves the next item in the stream. |
674 | /// |
675 | /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to |
676 | /// resume iteration after that. |
677 | /// |
678 | /// # Examples |
679 | /// |
680 | /// ``` |
681 | /// use futures_lite::stream::{self, StreamExt}; |
682 | /// |
683 | /// # spin_on::spin_on(async { |
684 | /// let mut s = stream::iter(1..=3); |
685 | /// |
686 | /// assert_eq!(s.next().await, Some(1)); |
687 | /// assert_eq!(s.next().await, Some(2)); |
688 | /// assert_eq!(s.next().await, Some(3)); |
689 | /// assert_eq!(s.next().await, None); |
690 | /// # }); |
691 | /// ``` |
692 | fn next(&mut self) -> NextFuture<'_, Self> |
693 | where |
694 | Self: Unpin, |
695 | { |
696 | NextFuture { stream: self } |
697 | } |
698 | |
699 | /// Retrieves the next item in the stream. |
700 | /// |
701 | /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns |
702 | /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`. |
703 | /// |
704 | /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`. |
705 | /// |
706 | /// # Examples |
707 | /// |
708 | /// ``` |
709 | /// use futures_lite::stream::{self, StreamExt}; |
710 | /// |
711 | /// # spin_on::spin_on(async { |
712 | /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error" )]); |
713 | /// |
714 | /// assert_eq!(s.try_next().await, Ok(Some(1))); |
715 | /// assert_eq!(s.try_next().await, Ok(Some(2))); |
716 | /// assert_eq!(s.try_next().await, Err("error" )); |
717 | /// assert_eq!(s.try_next().await, Ok(None)); |
718 | /// # }); |
719 | /// ``` |
720 | fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self> |
721 | where |
722 | Self: Stream<Item = Result<T, E>> + Unpin, |
723 | { |
724 | TryNextFuture { stream: self } |
725 | } |
726 | |
727 | /// Counts the number of items in the stream. |
728 | /// |
729 | /// # Examples |
730 | /// |
731 | /// ``` |
732 | /// use futures_lite::stream::{self, StreamExt}; |
733 | /// |
734 | /// # spin_on::spin_on(async { |
735 | /// let s1 = stream::iter(vec![0]); |
736 | /// let s2 = stream::iter(vec![1, 2, 3]); |
737 | /// |
738 | /// assert_eq!(s1.count().await, 1); |
739 | /// assert_eq!(s2.count().await, 3); |
740 | /// # }); |
741 | /// ``` |
742 | fn count(self) -> CountFuture<Self> |
743 | where |
744 | Self: Sized, |
745 | { |
746 | CountFuture { |
747 | stream: self, |
748 | count: 0, |
749 | } |
750 | } |
751 | |
752 | /// Maps items of the stream to new values using a closure. |
753 | /// |
754 | /// # Examples |
755 | /// |
756 | /// ``` |
757 | /// use futures_lite::stream::{self, StreamExt}; |
758 | /// |
759 | /// # spin_on::spin_on(async { |
760 | /// let s = stream::iter(vec![1, 2, 3]); |
761 | /// let mut s = s.map(|x| 2 * x); |
762 | /// |
763 | /// assert_eq!(s.next().await, Some(2)); |
764 | /// assert_eq!(s.next().await, Some(4)); |
765 | /// assert_eq!(s.next().await, Some(6)); |
766 | /// assert_eq!(s.next().await, None); |
767 | /// # }); |
768 | /// ``` |
769 | fn map<T, F>(self, f: F) -> Map<Self, F> |
770 | where |
771 | Self: Sized, |
772 | F: FnMut(Self::Item) -> T, |
773 | { |
774 | Map { stream: self, f } |
775 | } |
776 | |
777 | /// Maps items to streams and then concatenates them. |
778 | /// |
779 | /// # Examples |
780 | /// |
781 | /// ``` |
782 | /// use futures_lite::stream::{self, StreamExt}; |
783 | /// |
784 | /// # spin_on::spin_on(async { |
785 | /// let words = stream::iter(vec!["one" , "two" ]); |
786 | /// |
787 | /// let s: String = words |
788 | /// .flat_map(|s| stream::iter(s.chars())) |
789 | /// .collect() |
790 | /// .await; |
791 | /// |
792 | /// assert_eq!(s, "onetwo" ); |
793 | /// # }); |
794 | /// ``` |
795 | fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> |
796 | where |
797 | Self: Sized, |
798 | U: Stream, |
799 | F: FnMut(Self::Item) -> U, |
800 | { |
801 | FlatMap { |
802 | stream: self.map(f), |
803 | inner_stream: None, |
804 | } |
805 | } |
806 | |
807 | /// Concatenates inner streams. |
808 | /// |
809 | /// # Examples |
810 | /// |
811 | /// ``` |
812 | /// use futures_lite::stream::{self, StreamExt}; |
813 | /// |
814 | /// # spin_on::spin_on(async { |
815 | /// let s1 = stream::iter(vec![1, 2, 3]); |
816 | /// let s2 = stream::iter(vec![4, 5]); |
817 | /// |
818 | /// let s = stream::iter(vec![s1, s2]); |
819 | /// let v: Vec<_> = s.flatten().collect().await; |
820 | /// assert_eq!(v, [1, 2, 3, 4, 5]); |
821 | /// # }); |
822 | /// ``` |
823 | fn flatten(self) -> Flatten<Self> |
824 | where |
825 | Self: Sized, |
826 | Self::Item: Stream, |
827 | { |
828 | Flatten { |
829 | stream: self, |
830 | inner_stream: None, |
831 | } |
832 | } |
833 | |
834 | /// Maps items of the stream to new values using an async closure. |
835 | /// |
836 | /// # Examples |
837 | /// |
838 | /// ``` |
839 | /// use futures_lite::pin; |
840 | /// use futures_lite::stream::{self, StreamExt}; |
841 | /// |
842 | /// # spin_on::spin_on(async { |
843 | /// let s = stream::iter(vec![1, 2, 3]); |
844 | /// let mut s = s.then(|x| async move { 2 * x }); |
845 | /// |
846 | /// pin!(s); |
847 | /// assert_eq!(s.next().await, Some(2)); |
848 | /// assert_eq!(s.next().await, Some(4)); |
849 | /// assert_eq!(s.next().await, Some(6)); |
850 | /// assert_eq!(s.next().await, None); |
851 | /// # }); |
852 | /// ``` |
853 | fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut> |
854 | where |
855 | Self: Sized, |
856 | F: FnMut(Self::Item) -> Fut, |
857 | Fut: Future, |
858 | { |
859 | Then { |
860 | stream: self, |
861 | future: None, |
862 | f, |
863 | } |
864 | } |
865 | |
866 | /// Keeps items of the stream for which `predicate` returns `true`. |
867 | /// |
868 | /// # Examples |
869 | /// |
870 | /// ``` |
871 | /// use futures_lite::stream::{self, StreamExt}; |
872 | /// |
873 | /// # spin_on::spin_on(async { |
874 | /// let s = stream::iter(vec![1, 2, 3, 4]); |
875 | /// let mut s = s.filter(|i| i % 2 == 0); |
876 | /// |
877 | /// assert_eq!(s.next().await, Some(2)); |
878 | /// assert_eq!(s.next().await, Some(4)); |
879 | /// assert_eq!(s.next().await, None); |
880 | /// # }); |
881 | /// ``` |
882 | fn filter<P>(self, predicate: P) -> Filter<Self, P> |
883 | where |
884 | Self: Sized, |
885 | P: FnMut(&Self::Item) -> bool, |
886 | { |
887 | Filter { |
888 | stream: self, |
889 | predicate, |
890 | } |
891 | } |
892 | |
893 | /// Filters and maps items of the stream using a closure. |
894 | /// |
895 | /// # Examples |
896 | /// |
897 | /// ``` |
898 | /// use futures_lite::stream::{self, StreamExt}; |
899 | /// |
900 | /// # spin_on::spin_on(async { |
901 | /// let s = stream::iter(vec!["1" , "lol" , "3" , "NaN" , "5" ]); |
902 | /// let mut s = s.filter_map(|a| a.parse::<u32>().ok()); |
903 | /// |
904 | /// assert_eq!(s.next().await, Some(1)); |
905 | /// assert_eq!(s.next().await, Some(3)); |
906 | /// assert_eq!(s.next().await, Some(5)); |
907 | /// assert_eq!(s.next().await, None); |
908 | /// # }); |
909 | /// ``` |
910 | fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> |
911 | where |
912 | Self: Sized, |
913 | F: FnMut(Self::Item) -> Option<T>, |
914 | { |
915 | FilterMap { stream: self, f } |
916 | } |
917 | |
918 | /// Takes only the first `n` items of the stream. |
919 | /// |
920 | /// # Examples |
921 | /// |
922 | /// ``` |
923 | /// use futures_lite::stream::{self, StreamExt}; |
924 | /// |
925 | /// # spin_on::spin_on(async { |
926 | /// let mut s = stream::repeat(7).take(2); |
927 | /// |
928 | /// assert_eq!(s.next().await, Some(7)); |
929 | /// assert_eq!(s.next().await, Some(7)); |
930 | /// assert_eq!(s.next().await, None); |
931 | /// # }); |
932 | /// ``` |
933 | fn take(self, n: usize) -> Take<Self> |
934 | where |
935 | Self: Sized, |
936 | { |
937 | Take { stream: self, n } |
938 | } |
939 | |
940 | /// Takes items while `predicate` returns `true`. |
941 | /// |
942 | /// # Examples |
943 | /// |
944 | /// ``` |
945 | /// use futures_lite::stream::{self, StreamExt}; |
946 | /// |
947 | /// # spin_on::spin_on(async { |
948 | /// let s = stream::iter(vec![1, 2, 3, 4]); |
949 | /// let mut s = s.take_while(|x| *x < 3); |
950 | /// |
951 | /// assert_eq!(s.next().await, Some(1)); |
952 | /// assert_eq!(s.next().await, Some(2)); |
953 | /// assert_eq!(s.next().await, None); |
954 | /// # }); |
955 | /// ``` |
956 | fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P> |
957 | where |
958 | Self: Sized, |
959 | P: FnMut(&Self::Item) -> bool, |
960 | { |
961 | TakeWhile { |
962 | stream: self, |
963 | predicate, |
964 | } |
965 | } |
966 | |
967 | /// Skips the first `n` items of the stream. |
968 | /// |
969 | /// # Examples |
970 | /// |
971 | /// ``` |
972 | /// use futures_lite::stream::{self, StreamExt}; |
973 | /// |
974 | /// # spin_on::spin_on(async { |
975 | /// let s = stream::iter(vec![1, 2, 3]); |
976 | /// let mut s = s.skip(2); |
977 | /// |
978 | /// assert_eq!(s.next().await, Some(3)); |
979 | /// assert_eq!(s.next().await, None); |
980 | /// # }); |
981 | /// ``` |
982 | fn skip(self, n: usize) -> Skip<Self> |
983 | where |
984 | Self: Sized, |
985 | { |
986 | Skip { stream: self, n } |
987 | } |
988 | |
989 | /// Skips items while `predicate` returns `true`. |
990 | /// |
991 | /// # Examples |
992 | /// |
993 | /// ``` |
994 | /// use futures_lite::stream::{self, StreamExt}; |
995 | /// |
996 | /// # spin_on::spin_on(async { |
997 | /// let s = stream::iter(vec![-1i32, 0, 1]); |
998 | /// let mut s = s.skip_while(|x| x.is_negative()); |
999 | /// |
1000 | /// assert_eq!(s.next().await, Some(0)); |
1001 | /// assert_eq!(s.next().await, Some(1)); |
1002 | /// assert_eq!(s.next().await, None); |
1003 | /// # }); |
1004 | /// ``` |
1005 | fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P> |
1006 | where |
1007 | Self: Sized, |
1008 | P: FnMut(&Self::Item) -> bool, |
1009 | { |
1010 | SkipWhile { |
1011 | stream: self, |
1012 | predicate: Some(predicate), |
1013 | } |
1014 | } |
1015 | |
1016 | /// Yields every `step`th item. |
1017 | /// |
1018 | /// # Panics |
1019 | /// |
1020 | /// This method will panic if the `step` is 0. |
1021 | /// |
1022 | /// # Examples |
1023 | /// |
1024 | /// ``` |
1025 | /// use futures_lite::stream::{self, StreamExt}; |
1026 | /// |
1027 | /// # spin_on::spin_on(async { |
1028 | /// let s = stream::iter(vec![0, 1, 2, 3, 4]); |
1029 | /// let mut s = s.step_by(2); |
1030 | /// |
1031 | /// assert_eq!(s.next().await, Some(0)); |
1032 | /// assert_eq!(s.next().await, Some(2)); |
1033 | /// assert_eq!(s.next().await, Some(4)); |
1034 | /// assert_eq!(s.next().await, None); |
1035 | /// # }); |
1036 | /// ``` |
1037 | fn step_by(self, step: usize) -> StepBy<Self> |
1038 | where |
1039 | Self: Sized, |
1040 | { |
1041 | assert!(step > 0, "`step` must be greater than zero" ); |
1042 | StepBy { |
1043 | stream: self, |
1044 | step, |
1045 | i: 0, |
1046 | } |
1047 | } |
1048 | |
1049 | /// Appends another stream to the end of this one. |
1050 | /// |
1051 | /// # Examples |
1052 | /// |
1053 | /// ``` |
1054 | /// use futures_lite::stream::{self, StreamExt}; |
1055 | /// |
1056 | /// # spin_on::spin_on(async { |
1057 | /// let s1 = stream::iter(vec![1, 2]); |
1058 | /// let s2 = stream::iter(vec![7, 8]); |
1059 | /// let mut s = s1.chain(s2); |
1060 | /// |
1061 | /// assert_eq!(s.next().await, Some(1)); |
1062 | /// assert_eq!(s.next().await, Some(2)); |
1063 | /// assert_eq!(s.next().await, Some(7)); |
1064 | /// assert_eq!(s.next().await, Some(8)); |
1065 | /// assert_eq!(s.next().await, None); |
1066 | /// # }); |
1067 | /// ``` |
1068 | fn chain<U>(self, other: U) -> Chain<Self, U> |
1069 | where |
1070 | Self: Sized, |
1071 | U: Stream<Item = Self::Item> + Sized, |
1072 | { |
1073 | Chain { |
1074 | first: self.fuse(), |
1075 | second: other.fuse(), |
1076 | } |
1077 | } |
1078 | |
1079 | /// Clones all items. |
1080 | /// |
1081 | /// # Examples |
1082 | /// |
1083 | /// ``` |
1084 | /// use futures_lite::stream::{self, StreamExt}; |
1085 | /// |
1086 | /// # spin_on::spin_on(async { |
1087 | /// let s = stream::iter(vec![&1, &2]); |
1088 | /// let mut s = s.cloned(); |
1089 | /// |
1090 | /// assert_eq!(s.next().await, Some(1)); |
1091 | /// assert_eq!(s.next().await, Some(2)); |
1092 | /// assert_eq!(s.next().await, None); |
1093 | /// # }); |
1094 | /// ``` |
1095 | fn cloned<'a, T>(self) -> Cloned<Self> |
1096 | where |
1097 | Self: Stream<Item = &'a T> + Sized, |
1098 | T: Clone + 'a, |
1099 | { |
1100 | Cloned { stream: self } |
1101 | } |
1102 | |
1103 | /// Copies all items. |
1104 | /// |
1105 | /// # Examples |
1106 | /// |
1107 | /// ``` |
1108 | /// use futures_lite::stream::{self, StreamExt}; |
1109 | /// |
1110 | /// # spin_on::spin_on(async { |
1111 | /// let s = stream::iter(vec![&1, &2]); |
1112 | /// let mut s = s.copied(); |
1113 | /// |
1114 | /// assert_eq!(s.next().await, Some(1)); |
1115 | /// assert_eq!(s.next().await, Some(2)); |
1116 | /// assert_eq!(s.next().await, None); |
1117 | /// # }); |
1118 | /// ``` |
1119 | fn copied<'a, T>(self) -> Copied<Self> |
1120 | where |
1121 | Self: Stream<Item = &'a T> + Sized, |
1122 | T: Copy + 'a, |
1123 | { |
1124 | Copied { stream: self } |
1125 | } |
1126 | |
1127 | /// Collects all items in the stream into a collection. |
1128 | /// |
1129 | /// # Examples |
1130 | /// |
1131 | /// ``` |
1132 | /// use futures_lite::stream::{self, StreamExt}; |
1133 | /// |
1134 | /// # spin_on::spin_on(async { |
1135 | /// let mut s = stream::iter(1..=3); |
1136 | /// |
1137 | /// let items: Vec<_> = s.collect().await; |
1138 | /// assert_eq!(items, [1, 2, 3]); |
1139 | /// # }); |
1140 | /// ``` |
1141 | fn collect<C>(self) -> CollectFuture<Self, C> |
1142 | where |
1143 | Self: Sized, |
1144 | C: Default + Extend<Self::Item>, |
1145 | { |
1146 | CollectFuture { |
1147 | stream: self, |
1148 | collection: Default::default(), |
1149 | } |
1150 | } |
1151 | |
1152 | /// Collects all items in the fallible stream into a collection. |
1153 | /// |
1154 | /// ``` |
1155 | /// use futures_lite::stream::{self, StreamExt}; |
1156 | /// |
1157 | /// # spin_on::spin_on(async { |
1158 | /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]); |
1159 | /// let res: Result<Vec<i32>, i32> = s.try_collect().await; |
1160 | /// assert_eq!(res, Err(2)); |
1161 | /// |
1162 | /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]); |
1163 | /// let res: Result<Vec<i32>, i32> = s.try_collect().await; |
1164 | /// assert_eq!(res, Ok(vec![1, 2, 3])); |
1165 | /// # }) |
1166 | /// ``` |
1167 | fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C> |
1168 | where |
1169 | Self: Stream<Item = Result<T, E>> + Sized, |
1170 | C: Default + Extend<T>, |
1171 | { |
1172 | TryCollectFuture { |
1173 | stream: self, |
1174 | items: Default::default(), |
1175 | } |
1176 | } |
1177 | |
1178 | /// Partitions items into those for which `predicate` is `true` and those for which it is |
1179 | /// `false`, and then collects them into two collections. |
1180 | /// |
1181 | /// # Examples |
1182 | /// |
1183 | /// ``` |
1184 | /// use futures_lite::stream::{self, StreamExt}; |
1185 | /// |
1186 | /// # spin_on::spin_on(async { |
1187 | /// let s = stream::iter(vec![1, 2, 3]); |
1188 | /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await; |
1189 | /// |
1190 | /// assert_eq!(even, &[2]); |
1191 | /// assert_eq!(odd, &[1, 3]); |
1192 | /// # }) |
1193 | /// ``` |
1194 | fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B> |
1195 | where |
1196 | Self: Sized, |
1197 | B: Default + Extend<Self::Item>, |
1198 | P: FnMut(&Self::Item) -> bool, |
1199 | { |
1200 | PartitionFuture { |
1201 | stream: self, |
1202 | predicate, |
1203 | res: Some(Default::default()), |
1204 | } |
1205 | } |
1206 | |
1207 | /// Accumulates a computation over the stream. |
1208 | /// |
1209 | /// The computation begins with the accumulator value set to `init`, and then applies `f` to |
1210 | /// the accumulator and each item in the stream. The final accumulator value is returned. |
1211 | /// |
1212 | /// # Examples |
1213 | /// |
1214 | /// ``` |
1215 | /// use futures_lite::stream::{self, StreamExt}; |
1216 | /// |
1217 | /// # spin_on::spin_on(async { |
1218 | /// let s = stream::iter(vec![1, 2, 3]); |
1219 | /// let sum = s.fold(0, |acc, x| acc + x).await; |
1220 | /// |
1221 | /// assert_eq!(sum, 6); |
1222 | /// # }) |
1223 | /// ``` |
1224 | fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T> |
1225 | where |
1226 | Self: Sized, |
1227 | F: FnMut(T, Self::Item) -> T, |
1228 | { |
1229 | FoldFuture { |
1230 | stream: self, |
1231 | f, |
1232 | acc: Some(init), |
1233 | } |
1234 | } |
1235 | |
1236 | /// Accumulates a fallible computation over the stream. |
1237 | /// |
1238 | /// The computation begins with the accumulator value set to `init`, and then applies `f` to |
1239 | /// the accumulator and each item in the stream. The final accumulator value is returned, or an |
1240 | /// error if `f` failed the computation. |
1241 | /// |
1242 | /// # Examples |
1243 | /// |
1244 | /// ``` |
1245 | /// use futures_lite::stream::{self, StreamExt}; |
1246 | /// |
1247 | /// # spin_on::spin_on(async { |
1248 | /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]); |
1249 | /// |
1250 | /// let sum = s.try_fold(0, |acc, v| { |
1251 | /// if (acc + v) % 2 == 1 { |
1252 | /// Ok(acc + v) |
1253 | /// } else { |
1254 | /// Err("fail" ) |
1255 | /// } |
1256 | /// }) |
1257 | /// .await; |
1258 | /// |
1259 | /// assert_eq!(sum, Err("fail" )); |
1260 | /// # }) |
1261 | /// ``` |
1262 | fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B> |
1263 | where |
1264 | Self: Stream<Item = Result<T, E>> + Unpin + Sized, |
1265 | F: FnMut(B, T) -> Result<B, E>, |
1266 | { |
1267 | TryFoldFuture { |
1268 | stream: self, |
1269 | f, |
1270 | acc: Some(init), |
1271 | } |
1272 | } |
1273 | |
1274 | /// Maps items of the stream to new values using a state value and a closure. |
1275 | /// |
1276 | /// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the |
1277 | /// state and each item in the stream. The stream stops when `f` returns `None`. |
1278 | /// |
1279 | /// # Examples |
1280 | /// |
1281 | /// ``` |
1282 | /// use futures_lite::stream::{self, StreamExt}; |
1283 | /// |
1284 | /// # spin_on::spin_on(async { |
1285 | /// let s = stream::iter(vec![1, 2, 3]); |
1286 | /// let mut s = s.scan(1, |state, x| { |
1287 | /// *state = *state * x; |
1288 | /// Some(-*state) |
1289 | /// }); |
1290 | /// |
1291 | /// assert_eq!(s.next().await, Some(-1)); |
1292 | /// assert_eq!(s.next().await, Some(-2)); |
1293 | /// assert_eq!(s.next().await, Some(-6)); |
1294 | /// assert_eq!(s.next().await, None); |
1295 | /// # }) |
1296 | /// ``` |
1297 | fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F> |
1298 | where |
1299 | Self: Sized, |
1300 | F: FnMut(&mut St, Self::Item) -> Option<B>, |
1301 | { |
1302 | Scan { |
1303 | stream: self, |
1304 | state_f: (initial_state, f), |
1305 | } |
1306 | } |
1307 | |
1308 | /// Fuses the stream so that it stops yielding items after the first [`None`]. |
1309 | /// |
1310 | /// # Examples |
1311 | /// |
1312 | /// ``` |
1313 | /// use futures_lite::stream::{self, StreamExt}; |
1314 | /// |
1315 | /// # spin_on::spin_on(async { |
1316 | /// let mut s = stream::once(1).fuse(); |
1317 | /// |
1318 | /// assert_eq!(s.next().await, Some(1)); |
1319 | /// assert_eq!(s.next().await, None); |
1320 | /// assert_eq!(s.next().await, None); |
1321 | /// # }) |
1322 | /// ``` |
1323 | fn fuse(self) -> Fuse<Self> |
1324 | where |
1325 | Self: Sized, |
1326 | { |
1327 | Fuse { |
1328 | stream: self, |
1329 | done: false, |
1330 | } |
1331 | } |
1332 | |
1333 | /// Repeats the stream from beginning to end, forever. |
1334 | /// |
1335 | /// # Examples |
1336 | /// |
1337 | /// ``` |
1338 | /// use futures_lite::stream::{self, StreamExt}; |
1339 | /// |
1340 | /// # spin_on::spin_on(async { |
1341 | /// let mut s = stream::iter(vec![1, 2]).cycle(); |
1342 | /// |
1343 | /// assert_eq!(s.next().await, Some(1)); |
1344 | /// assert_eq!(s.next().await, Some(2)); |
1345 | /// assert_eq!(s.next().await, Some(1)); |
1346 | /// assert_eq!(s.next().await, Some(2)); |
1347 | /// # }); |
1348 | /// ``` |
1349 | fn cycle(self) -> Cycle<Self> |
1350 | where |
1351 | Self: Clone + Sized, |
1352 | { |
1353 | Cycle { |
1354 | orig: self.clone(), |
1355 | stream: self, |
1356 | } |
1357 | } |
1358 | |
1359 | /// Enumerates items, mapping them to `(index, item)`. |
1360 | /// |
1361 | /// # Examples |
1362 | /// |
1363 | /// ``` |
1364 | /// use futures_lite::stream::{self, StreamExt}; |
1365 | /// |
1366 | /// # spin_on::spin_on(async { |
1367 | /// let s = stream::iter(vec!['a' , 'b' , 'c' ]); |
1368 | /// let mut s = s.enumerate(); |
1369 | /// |
1370 | /// assert_eq!(s.next().await, Some((0, 'a' ))); |
1371 | /// assert_eq!(s.next().await, Some((1, 'b' ))); |
1372 | /// assert_eq!(s.next().await, Some((2, 'c' ))); |
1373 | /// assert_eq!(s.next().await, None); |
1374 | /// # }); |
1375 | /// ``` |
1376 | fn enumerate(self) -> Enumerate<Self> |
1377 | where |
1378 | Self: Sized, |
1379 | { |
1380 | Enumerate { stream: self, i: 0 } |
1381 | } |
1382 | |
1383 | /// Calls a closure on each item and passes it on. |
1384 | /// |
1385 | /// # Examples |
1386 | /// |
1387 | /// ``` |
1388 | /// use futures_lite::stream::{self, StreamExt}; |
1389 | /// |
1390 | /// # spin_on::spin_on(async { |
1391 | /// let s = stream::iter(vec![1, 2, 3, 4, 5]); |
1392 | /// |
1393 | /// let sum = s |
1394 | /// .inspect(|x| println!("about to filter {}" , x)) |
1395 | /// .filter(|x| x % 2 == 0) |
1396 | /// .inspect(|x| println!("made it through filter: {}" , x)) |
1397 | /// .fold(0, |sum, i| sum + i) |
1398 | /// .await; |
1399 | /// # }); |
1400 | /// ``` |
1401 | fn inspect<F>(self, f: F) -> Inspect<Self, F> |
1402 | where |
1403 | Self: Sized, |
1404 | F: FnMut(&Self::Item), |
1405 | { |
1406 | Inspect { stream: self, f } |
1407 | } |
1408 | |
1409 | /// Gets the `n`th item of the stream. |
1410 | /// |
1411 | /// In the end, `n+1` items of the stream will be consumed. |
1412 | /// |
1413 | /// # Examples |
1414 | /// |
1415 | /// ``` |
1416 | /// use futures_lite::stream::{self, StreamExt}; |
1417 | /// |
1418 | /// # spin_on::spin_on(async { |
1419 | /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]); |
1420 | /// |
1421 | /// assert_eq!(s.nth(2).await, Some(2)); |
1422 | /// assert_eq!(s.nth(2).await, Some(5)); |
1423 | /// assert_eq!(s.nth(2).await, None); |
1424 | /// # }); |
1425 | /// ``` |
1426 | fn nth(&mut self, n: usize) -> NthFuture<'_, Self> |
1427 | where |
1428 | Self: Unpin, |
1429 | { |
1430 | NthFuture { stream: self, n } |
1431 | } |
1432 | |
1433 | /// Returns the last item in the stream. |
1434 | /// |
1435 | /// # Examples |
1436 | /// |
1437 | /// ``` |
1438 | /// use futures_lite::stream::{self, StreamExt}; |
1439 | /// |
1440 | /// # spin_on::spin_on(async { |
1441 | /// let s = stream::iter(vec![1, 2, 3, 4]); |
1442 | /// assert_eq!(s.last().await, Some(4)); |
1443 | /// |
1444 | /// let s = stream::empty::<i32>(); |
1445 | /// assert_eq!(s.last().await, None); |
1446 | /// # }); |
1447 | /// ``` |
1448 | fn last(self) -> LastFuture<Self> |
1449 | where |
1450 | Self: Sized, |
1451 | { |
1452 | LastFuture { |
1453 | stream: self, |
1454 | last: None, |
1455 | } |
1456 | } |
1457 | |
1458 | /// Finds the first item of the stream for which `predicate` returns `true`. |
1459 | /// |
1460 | /// # Examples |
1461 | /// |
1462 | /// ``` |
1463 | /// use futures_lite::stream::{self, StreamExt}; |
1464 | /// |
1465 | /// # spin_on::spin_on(async { |
1466 | /// let mut s = stream::iter(vec![11, 12, 13, 14]); |
1467 | /// |
1468 | /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12)); |
1469 | /// assert_eq!(s.next().await, Some(13)); |
1470 | /// # }); |
1471 | /// ``` |
1472 | fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P> |
1473 | where |
1474 | Self: Unpin, |
1475 | P: FnMut(&Self::Item) -> bool, |
1476 | { |
1477 | FindFuture { |
1478 | stream: self, |
1479 | predicate, |
1480 | } |
1481 | } |
1482 | |
1483 | /// Applies a closure to items in the stream and returns the first [`Some`] result. |
1484 | /// |
1485 | /// # Examples |
1486 | /// |
1487 | /// ``` |
1488 | /// use futures_lite::stream::{self, StreamExt}; |
1489 | /// |
1490 | /// # spin_on::spin_on(async { |
1491 | /// let mut s = stream::iter(vec!["lol" , "NaN" , "2" , "5" ]); |
1492 | /// let number = s.find_map(|s| s.parse().ok()).await; |
1493 | /// |
1494 | /// assert_eq!(number, Some(2)); |
1495 | /// # }); |
1496 | /// ``` |
1497 | fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F> |
1498 | where |
1499 | Self: Unpin, |
1500 | F: FnMut(Self::Item) -> Option<B>, |
1501 | { |
1502 | FindMapFuture { stream: self, f } |
1503 | } |
1504 | |
1505 | /// Finds the index of the first item of the stream for which `predicate` returns `true`. |
1506 | /// |
1507 | /// # Examples |
1508 | /// |
1509 | /// ``` |
1510 | /// use futures_lite::stream::{self, StreamExt}; |
1511 | /// |
1512 | /// # spin_on::spin_on(async { |
1513 | /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]); |
1514 | /// |
1515 | /// assert_eq!(s.position(|x| x == 2).await, Some(2)); |
1516 | /// assert_eq!(s.position(|x| x == 3).await, Some(0)); |
1517 | /// assert_eq!(s.position(|x| x == 9).await, None); |
1518 | /// # }); |
1519 | /// ``` |
1520 | fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P> |
1521 | where |
1522 | Self: Unpin, |
1523 | P: FnMut(Self::Item) -> bool, |
1524 | { |
1525 | PositionFuture { |
1526 | stream: self, |
1527 | predicate, |
1528 | index: 0, |
1529 | } |
1530 | } |
1531 | |
1532 | /// Tests if `predicate` returns `true` for all items in the stream. |
1533 | /// |
1534 | /// The result is `true` for an empty stream. |
1535 | /// |
1536 | /// # Examples |
1537 | /// |
1538 | /// ``` |
1539 | /// use futures_lite::stream::{self, StreamExt}; |
1540 | /// |
1541 | /// # spin_on::spin_on(async { |
1542 | /// let mut s = stream::iter(vec![1, 2, 3]); |
1543 | /// assert!(!s.all(|x| x % 2 == 0).await); |
1544 | /// |
1545 | /// let mut s = stream::iter(vec![2, 4, 6, 8]); |
1546 | /// assert!(s.all(|x| x % 2 == 0).await); |
1547 | /// |
1548 | /// let mut s = stream::empty::<i32>(); |
1549 | /// assert!(s.all(|x| x % 2 == 0).await); |
1550 | /// # }); |
1551 | /// ``` |
1552 | fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P> |
1553 | where |
1554 | Self: Unpin, |
1555 | P: FnMut(Self::Item) -> bool, |
1556 | { |
1557 | AllFuture { |
1558 | stream: self, |
1559 | predicate, |
1560 | } |
1561 | } |
1562 | |
1563 | /// Tests if `predicate` returns `true` for any item in the stream. |
1564 | /// |
1565 | /// The result is `false` for an empty stream. |
1566 | /// |
1567 | /// # Examples |
1568 | /// |
1569 | /// ``` |
1570 | /// use futures_lite::stream::{self, StreamExt}; |
1571 | /// |
1572 | /// # spin_on::spin_on(async { |
1573 | /// let mut s = stream::iter(vec![1, 3, 5, 7]); |
1574 | /// assert!(!s.any(|x| x % 2 == 0).await); |
1575 | /// |
1576 | /// let mut s = stream::iter(vec![1, 2, 3]); |
1577 | /// assert!(s.any(|x| x % 2 == 0).await); |
1578 | /// |
1579 | /// let mut s = stream::empty::<i32>(); |
1580 | /// assert!(!s.any(|x| x % 2 == 0).await); |
1581 | /// # }); |
1582 | /// ``` |
1583 | fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P> |
1584 | where |
1585 | Self: Unpin, |
1586 | P: FnMut(Self::Item) -> bool, |
1587 | { |
1588 | AnyFuture { |
1589 | stream: self, |
1590 | predicate, |
1591 | } |
1592 | } |
1593 | |
1594 | /// Calls a closure on each item of the stream. |
1595 | /// |
1596 | /// # Examples |
1597 | /// |
1598 | /// ``` |
1599 | /// use futures_lite::stream::{self, StreamExt}; |
1600 | /// |
1601 | /// # spin_on::spin_on(async { |
1602 | /// let mut s = stream::iter(vec![1, 2, 3]); |
1603 | /// s.for_each(|s| println!("{}" , s)).await; |
1604 | /// # }); |
1605 | /// ``` |
1606 | fn for_each<F>(self, f: F) -> ForEachFuture<Self, F> |
1607 | where |
1608 | Self: Sized, |
1609 | F: FnMut(Self::Item), |
1610 | { |
1611 | ForEachFuture { stream: self, f } |
1612 | } |
1613 | |
1614 | /// Calls a fallible closure on each item of the stream, stopping on first error. |
1615 | /// |
1616 | /// # Examples |
1617 | /// |
1618 | /// ``` |
1619 | /// use futures_lite::stream::{self, StreamExt}; |
1620 | /// |
1621 | /// # spin_on::spin_on(async { |
1622 | /// let mut s = stream::iter(vec![0, 1, 2, 3]); |
1623 | /// |
1624 | /// let mut v = vec![]; |
1625 | /// let res = s |
1626 | /// .try_for_each(|n| { |
1627 | /// if n < 2 { |
1628 | /// v.push(n); |
1629 | /// Ok(()) |
1630 | /// } else { |
1631 | /// Err("too big" ) |
1632 | /// } |
1633 | /// }) |
1634 | /// .await; |
1635 | /// |
1636 | /// assert_eq!(v, &[0, 1]); |
1637 | /// assert_eq!(res, Err("too big" )); |
1638 | /// # }); |
1639 | /// ``` |
1640 | fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F> |
1641 | where |
1642 | Self: Unpin, |
1643 | F: FnMut(Self::Item) -> Result<(), E>, |
1644 | { |
1645 | TryForEachFuture { stream: self, f } |
1646 | } |
1647 | |
1648 | /// Zips up two streams into a single stream of pairs. |
1649 | /// |
1650 | /// The stream of pairs stops when either of the original two streams is exhausted. |
1651 | /// |
1652 | /// # Examples |
1653 | /// |
1654 | /// ``` |
1655 | /// use futures_lite::stream::{self, StreamExt}; |
1656 | /// |
1657 | /// # spin_on::spin_on(async { |
1658 | /// let l = stream::iter(vec![1, 2, 3]); |
1659 | /// let r = stream::iter(vec![4, 5, 6, 7]); |
1660 | /// let mut s = l.zip(r); |
1661 | /// |
1662 | /// assert_eq!(s.next().await, Some((1, 4))); |
1663 | /// assert_eq!(s.next().await, Some((2, 5))); |
1664 | /// assert_eq!(s.next().await, Some((3, 6))); |
1665 | /// assert_eq!(s.next().await, None); |
1666 | /// # }); |
1667 | /// ``` |
1668 | fn zip<U>(self, other: U) -> Zip<Self, U> |
1669 | where |
1670 | Self: Sized, |
1671 | U: Stream, |
1672 | { |
1673 | Zip { |
1674 | item_slot: None, |
1675 | first: self, |
1676 | second: other, |
1677 | } |
1678 | } |
1679 | |
1680 | /// Collects a stream of pairs into a pair of collections. |
1681 | /// |
1682 | /// # Examples |
1683 | /// |
1684 | /// ``` |
1685 | /// use futures_lite::stream::{self, StreamExt}; |
1686 | /// |
1687 | /// # spin_on::spin_on(async { |
1688 | /// let s = stream::iter(vec![(1, 2), (3, 4)]); |
1689 | /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await; |
1690 | /// |
1691 | /// assert_eq!(left, [1, 3]); |
1692 | /// assert_eq!(right, [2, 4]); |
1693 | /// # }); |
1694 | /// ``` |
1695 | fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB> |
1696 | where |
1697 | FromA: Default + Extend<A>, |
1698 | FromB: Default + Extend<B>, |
1699 | Self: Stream<Item = (A, B)> + Sized, |
1700 | { |
1701 | UnzipFuture { |
1702 | stream: self, |
1703 | res: Some(Default::default()), |
1704 | } |
1705 | } |
1706 | |
1707 | /// Merges with `other` stream, preferring items from `self` whenever both streams are ready. |
1708 | /// |
1709 | /// # Examples |
1710 | /// |
1711 | /// ``` |
1712 | /// use futures_lite::stream::{self, StreamExt}; |
1713 | /// use futures_lite::stream::{once, pending}; |
1714 | /// |
1715 | /// # spin_on::spin_on(async { |
1716 | /// assert_eq!(once(1).or(pending()).next().await, Some(1)); |
1717 | /// assert_eq!(pending().or(once(2)).next().await, Some(2)); |
1718 | /// |
1719 | /// // The first stream wins. |
1720 | /// assert_eq!(once(1).or(once(2)).next().await, Some(1)); |
1721 | /// # }) |
1722 | /// ``` |
1723 | fn or<S>(self, other: S) -> Or<Self, S> |
1724 | where |
1725 | Self: Sized, |
1726 | S: Stream<Item = Self::Item>, |
1727 | { |
1728 | Or { |
1729 | stream1: self, |
1730 | stream2: other, |
1731 | } |
1732 | } |
1733 | |
1734 | /// Merges with `other` stream, with no preference for either stream when both are ready. |
1735 | /// |
1736 | /// # Examples |
1737 | /// |
1738 | /// ``` |
1739 | /// use futures_lite::stream::{self, StreamExt}; |
1740 | /// use futures_lite::stream::{once, pending}; |
1741 | /// |
1742 | /// # spin_on::spin_on(async { |
1743 | /// assert_eq!(once(1).race(pending()).next().await, Some(1)); |
1744 | /// assert_eq!(pending().race(once(2)).next().await, Some(2)); |
1745 | /// |
1746 | /// // One of the two streams is randomly chosen as the winner. |
1747 | /// let res = once(1).race(once(2)).next().await; |
1748 | /// # }) |
1749 | /// ``` |
1750 | #[cfg (feature = "std" )] |
1751 | fn race<S>(self, other: S) -> Race<Self, S> |
1752 | where |
1753 | Self: Sized, |
1754 | S: Stream<Item = Self::Item>, |
1755 | { |
1756 | Race { |
1757 | stream1: self, |
1758 | stream2: other, |
1759 | } |
1760 | } |
1761 | |
1762 | /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`. |
1763 | /// |
1764 | /// # Examples |
1765 | /// |
1766 | /// ``` |
1767 | /// use futures_lite::stream::{self, StreamExt}; |
1768 | /// |
1769 | /// # spin_on::spin_on(async { |
1770 | /// let a = stream::once(1); |
1771 | /// let b = stream::empty(); |
1772 | /// |
1773 | /// // Streams of different types can be stored in |
1774 | /// // the same collection when they are boxed: |
1775 | /// let streams = vec![a.boxed(), b.boxed()]; |
1776 | /// # }) |
1777 | /// ``` |
1778 | #[cfg (feature = "alloc" )] |
1779 | fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>> |
1780 | where |
1781 | Self: Send + Sized + 'a, |
1782 | { |
1783 | Box::pin(self) |
1784 | } |
1785 | |
1786 | /// Boxes the stream and changes its type to `dyn Stream + 'a`. |
1787 | /// |
1788 | /// # Examples |
1789 | /// |
1790 | /// ``` |
1791 | /// use futures_lite::stream::{self, StreamExt}; |
1792 | /// |
1793 | /// # spin_on::spin_on(async { |
1794 | /// let a = stream::once(1); |
1795 | /// let b = stream::empty(); |
1796 | /// |
1797 | /// // Streams of different types can be stored in |
1798 | /// // the same collection when they are boxed: |
1799 | /// let streams = vec![a.boxed_local(), b.boxed_local()]; |
1800 | /// # }) |
1801 | /// ``` |
1802 | #[cfg (feature = "alloc" )] |
1803 | fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>> |
1804 | where |
1805 | Self: Sized + 'a, |
1806 | { |
1807 | Box::pin(self) |
1808 | } |
1809 | } |
1810 | |
1811 | impl<S: Stream + ?Sized> StreamExt for S {} |
1812 | |
/// A pinned, boxed, type-erased stream that is [`Send`] and `'static`.
///
/// This is shorthand for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
1826 | |
/// A pinned, boxed, type-erased `'static` stream without a [`Send`] requirement.
///
/// This is shorthand for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1840 | |
1841 | /// Future for the [`StreamExt::next()`] method. |
1842 | #[derive (Debug)] |
1843 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
1844 | pub struct NextFuture<'a, S: ?Sized> { |
1845 | stream: &'a mut S, |
1846 | } |
1847 | |
1848 | impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {} |
1849 | |
1850 | impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> { |
1851 | type Output = Option<S::Item>; |
1852 | |
1853 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1854 | self.stream.poll_next(cx) |
1855 | } |
1856 | } |
1857 | |
1858 | /// Future for the [`StreamExt::try_next()`] method. |
1859 | #[derive (Debug)] |
1860 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
1861 | pub struct TryNextFuture<'a, S: ?Sized> { |
1862 | stream: &'a mut S, |
1863 | } |
1864 | |
1865 | impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {} |
1866 | |
1867 | impl<T, E, S> Future for TryNextFuture<'_, S> |
1868 | where |
1869 | S: Stream<Item = Result<T, E>> + Unpin + ?Sized, |
1870 | { |
1871 | type Output = Result<Option<T>, E>; |
1872 | |
1873 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1874 | let res: Option> = ready!(self.stream.poll_next(cx)); |
1875 | Poll::Ready(res.transpose()) |
1876 | } |
1877 | } |
1878 | |
1879 | pin_project! { |
1880 | /// Future for the [`StreamExt::count()`] method. |
1881 | #[derive(Debug)] |
1882 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
1883 | pub struct CountFuture<S: ?Sized> { |
1884 | count: usize, |
1885 | #[pin] |
1886 | stream: S, |
1887 | } |
1888 | } |
1889 | |
1890 | impl<S: Stream + ?Sized> Future for CountFuture<S> { |
1891 | type Output = usize; |
1892 | |
1893 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1894 | loop { |
1895 | match ready!(self.as_mut().project().stream.poll_next(cx)) { |
1896 | None => return Poll::Ready(self.count), |
1897 | Some(_) => *self.as_mut().project().count += 1, |
1898 | } |
1899 | } |
1900 | } |
1901 | } |
1902 | |
1903 | pin_project! { |
1904 | /// Future for the [`StreamExt::collect()`] method. |
1905 | #[derive(Debug)] |
1906 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
1907 | pub struct CollectFuture<S, C> { |
1908 | #[pin] |
1909 | stream: S, |
1910 | collection: C, |
1911 | } |
1912 | } |
1913 | |
1914 | impl<S, C> Future for CollectFuture<S, C> |
1915 | where |
1916 | S: Stream, |
1917 | C: Default + Extend<S::Item>, |
1918 | { |
1919 | type Output = C; |
1920 | |
1921 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> { |
1922 | let mut this: Projection<'_, S, C> = self.as_mut().project(); |
1923 | loop { |
1924 | match ready!(this.stream.as_mut().poll_next(cx)) { |
1925 | Some(e: ::Item) => this.collection.extend(iter:Some(e)), |
1926 | None => return Poll::Ready(mem::take(self.project().collection)), |
1927 | } |
1928 | } |
1929 | } |
1930 | } |
1931 | |
1932 | pin_project! { |
1933 | /// Future for the [`StreamExt::try_collect()`] method. |
1934 | #[derive(Debug)] |
1935 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
1936 | pub struct TryCollectFuture<S, C> { |
1937 | #[pin] |
1938 | stream: S, |
1939 | items: C, |
1940 | } |
1941 | } |
1942 | |
1943 | impl<T, E, S, C> Future for TryCollectFuture<S, C> |
1944 | where |
1945 | S: Stream<Item = Result<T, E>>, |
1946 | C: Default + Extend<T>, |
1947 | { |
1948 | type Output = Result<C, E>; |
1949 | |
1950 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1951 | let mut this: Projection<'_, S, C> = self.project(); |
1952 | Poll::Ready(Ok(loop { |
1953 | match ready!(this.stream.as_mut().poll_next(cx)?) { |
1954 | Some(x: T) => this.items.extend(iter:Some(x)), |
1955 | None => break mem::take(dest:this.items), |
1956 | } |
1957 | })) |
1958 | } |
1959 | } |
1960 | |
1961 | pin_project! { |
1962 | /// Future for the [`StreamExt::partition()`] method. |
1963 | #[derive(Debug)] |
1964 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
1965 | pub struct PartitionFuture<S, P, B> { |
1966 | #[pin] |
1967 | stream: S, |
1968 | predicate: P, |
1969 | res: Option<(B, B)>, |
1970 | } |
1971 | } |
1972 | |
1973 | impl<S, P, B> Future for PartitionFuture<S, P, B> |
1974 | where |
1975 | S: Stream + Sized, |
1976 | P: FnMut(&S::Item) -> bool, |
1977 | B: Default + Extend<S::Item>, |
1978 | { |
1979 | type Output = (B, B); |
1980 | |
1981 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1982 | let mut this: Projection<'_, S, P, B> = self.project(); |
1983 | loop { |
1984 | match ready!(this.stream.as_mut().poll_next(cx)) { |
1985 | Some(v: ::Item) => { |
1986 | let res: &mut (B, B) = this.res.as_mut().unwrap(); |
1987 | if (this.predicate)(&v) { |
1988 | res.0.extend(iter:Some(v)) |
1989 | } else { |
1990 | res.1.extend(iter:Some(v)) |
1991 | } |
1992 | } |
1993 | None => return Poll::Ready(this.res.take().unwrap()), |
1994 | } |
1995 | } |
1996 | } |
1997 | } |
1998 | |
1999 | pin_project! { |
2000 | /// Future for the [`StreamExt::fold()`] method. |
2001 | #[derive(Debug)] |
2002 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2003 | pub struct FoldFuture<S, F, T> { |
2004 | #[pin] |
2005 | stream: S, |
2006 | f: F, |
2007 | acc: Option<T>, |
2008 | } |
2009 | } |
2010 | |
2011 | impl<S, F, T> Future for FoldFuture<S, F, T> |
2012 | where |
2013 | S: Stream, |
2014 | F: FnMut(T, S::Item) -> T, |
2015 | { |
2016 | type Output = T; |
2017 | |
2018 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2019 | let mut this: Projection<'_, S, F, T> = self.project(); |
2020 | loop { |
2021 | match ready!(this.stream.as_mut().poll_next(cx)) { |
2022 | Some(v: ::Item) => { |
2023 | let old: T = this.acc.take().unwrap(); |
2024 | let new: T = (this.f)(old, v); |
2025 | *this.acc = Some(new); |
2026 | } |
2027 | None => return Poll::Ready(this.acc.take().unwrap()), |
2028 | } |
2029 | } |
2030 | } |
2031 | } |
2032 | |
2033 | /// Future for the [`StreamExt::try_fold()`] method. |
2034 | #[derive (Debug)] |
2035 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2036 | pub struct TryFoldFuture<'a, S, F, B> { |
2037 | stream: &'a mut S, |
2038 | f: F, |
2039 | acc: Option<B>, |
2040 | } |
2041 | |
2042 | impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {} |
2043 | |
2044 | impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B> |
2045 | where |
2046 | S: Stream<Item = Result<T, E>> + Unpin, |
2047 | F: FnMut(B, T) -> Result<B, E>, |
2048 | { |
2049 | type Output = Result<B, E>; |
2050 | |
2051 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2052 | loop { |
2053 | match ready!(self.stream.poll_next(cx)) { |
2054 | Some(Err(e: E)) => return Poll::Ready(Err(e)), |
2055 | Some(Ok(t: T)) => { |
2056 | let old: B = self.acc.take().unwrap(); |
2057 | let new: Result = (&mut self.f)(old, t); |
2058 | |
2059 | match new { |
2060 | Ok(t: B) => self.acc = Some(t), |
2061 | Err(e: E) => return Poll::Ready(Err(e)), |
2062 | } |
2063 | } |
2064 | None => return Poll::Ready(Ok(self.acc.take().unwrap())), |
2065 | } |
2066 | } |
2067 | } |
2068 | } |
2069 | |
2070 | pin_project! { |
2071 | /// Stream for the [`StreamExt::scan()`] method. |
2072 | #[derive(Clone, Debug)] |
2073 | #[must_use = "streams do nothing unless polled" ] |
2074 | pub struct Scan<S, St, F> { |
2075 | #[pin] |
2076 | stream: S, |
2077 | state_f: (St, F), |
2078 | } |
2079 | } |
2080 | |
2081 | impl<S, St, F, B> Stream for Scan<S, St, F> |
2082 | where |
2083 | S: Stream, |
2084 | F: FnMut(&mut St, S::Item) -> Option<B>, |
2085 | { |
2086 | type Item = B; |
2087 | |
2088 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> { |
2089 | let mut this: Projection<'_, S, St, F> = self.project(); |
2090 | this.stream.as_mut().poll_next(cx).map(|item: Option<::Item>| { |
2091 | item.and_then(|item: ::Item| { |
2092 | let (state: &mut St, f: &mut F) = this.state_f; |
2093 | f(state, item) |
2094 | }) |
2095 | }) |
2096 | } |
2097 | } |
2098 | |
2099 | pin_project! { |
2100 | /// Stream for the [`StreamExt::fuse()`] method. |
2101 | #[derive(Clone, Debug)] |
2102 | #[must_use = "streams do nothing unless polled" ] |
2103 | pub struct Fuse<S> { |
2104 | #[pin] |
2105 | stream: S, |
2106 | done: bool, |
2107 | } |
2108 | } |
2109 | |
2110 | impl<S: Stream> Stream for Fuse<S> { |
2111 | type Item = S::Item; |
2112 | |
2113 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
2114 | let this: Projection<'_, S> = self.project(); |
2115 | |
2116 | if *this.done { |
2117 | Poll::Ready(None) |
2118 | } else { |
2119 | let next: Option<::Item> = ready!(this.stream.poll_next(cx)); |
2120 | if next.is_none() { |
2121 | *this.done = true; |
2122 | } |
2123 | Poll::Ready(next) |
2124 | } |
2125 | } |
2126 | } |
2127 | |
2128 | pin_project! { |
2129 | /// Stream for the [`StreamExt::map()`] method. |
2130 | #[derive(Clone, Debug)] |
2131 | #[must_use = "streams do nothing unless polled" ] |
2132 | pub struct Map<S, F> { |
2133 | #[pin] |
2134 | stream: S, |
2135 | f: F, |
2136 | } |
2137 | } |
2138 | |
2139 | impl<S, F, T> Stream for Map<S, F> |
2140 | where |
2141 | S: Stream, |
2142 | F: FnMut(S::Item) -> T, |
2143 | { |
2144 | type Item = T; |
2145 | |
2146 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2147 | let this: Projection<'_, S, F> = self.project(); |
2148 | let next: Option<::Item> = ready!(this.stream.poll_next(cx)); |
2149 | Poll::Ready(next.map(this.f)) |
2150 | } |
2151 | |
2152 | fn size_hint(&self) -> (usize, Option<usize>) { |
2153 | self.stream.size_hint() |
2154 | } |
2155 | } |
2156 | |
pin_project! {
    /// Stream for the [`StreamExt::flat_map()`] method.
    ///
    /// Maps every item of the outer stream to an inner stream and yields the items of
    /// those inner streams one inner stream at a time.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct FlatMap<S, U, F> {
        // Outer stream already wrapped in `Map`, so it yields inner streams of type `U`.
        #[pin]
        stream: Map<S, F>,
        // The inner stream currently being drained; `None` between inner streams.
        #[pin]
        inner_stream: Option<U>,
    }
}
2168 | |
2169 | impl<S, U, F> Stream for FlatMap<S, U, F> |
2170 | where |
2171 | S: Stream, |
2172 | U: Stream, |
2173 | F: FnMut(S::Item) -> U, |
2174 | { |
2175 | type Item = U::Item; |
2176 | |
2177 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2178 | let mut this: Projection<'_, S, U, F> = self.project(); |
2179 | loop { |
2180 | if let Some(inner: Pin<&mut U>) = this.inner_stream.as_mut().as_pin_mut() { |
2181 | match ready!(inner.poll_next(cx)) { |
2182 | Some(item: ::Item) => return Poll::Ready(Some(item)), |
2183 | None => this.inner_stream.set(None), |
2184 | } |
2185 | } |
2186 | |
2187 | match ready!(this.stream.as_mut().poll_next(cx)) { |
2188 | Some(stream: U) => this.inner_stream.set(Some(stream)), |
2189 | None => return Poll::Ready(None), |
2190 | } |
2191 | } |
2192 | } |
2193 | } |
2194 | |
pin_project! {
    /// Stream for the [`StreamExt::flatten()`] method.
    ///
    /// Yields the items of each stream produced by the outer stream, one inner stream
    /// at a time.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Flatten<S: Stream> {
        // The outer stream; its items are themselves streams.
        #[pin]
        stream: S,
        // The inner stream currently being drained; `None` between inner streams.
        #[pin]
        inner_stream: Option<S::Item>,
    }
}
2206 | |
2207 | impl<S, U> Stream for Flatten<S> |
2208 | where |
2209 | S: Stream<Item = U>, |
2210 | U: Stream, |
2211 | { |
2212 | type Item = U::Item; |
2213 | |
2214 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2215 | let mut this: Projection<'_, S> = self.project(); |
2216 | loop { |
2217 | if let Some(inner: Pin<&mut U>) = this.inner_stream.as_mut().as_pin_mut() { |
2218 | match ready!(inner.poll_next(cx)) { |
2219 | Some(item: ::Item) => return Poll::Ready(Some(item)), |
2220 | None => this.inner_stream.set(None), |
2221 | } |
2222 | } |
2223 | |
2224 | match ready!(this.stream.as_mut().poll_next(cx)) { |
2225 | Some(inner: U) => this.inner_stream.set(Some(inner)), |
2226 | None => return Poll::Ready(None), |
2227 | } |
2228 | } |
2229 | } |
2230 | } |
2231 | |
pin_project! {
    /// Stream for the [`StreamExt::then()`] method.
    ///
    /// Turns each item of the inner stream into a future via `f` and yields that
    /// future's output.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Then<S, F, Fut> {
        // The stream supplying the inputs for `f`.
        #[pin]
        stream: S,
        // The future created from the most recent item, while it is still running.
        #[pin]
        future: Option<Fut>,
        // Closure that builds a future from an item.
        f: F,
    }
}
2244 | |
2245 | impl<S, F, Fut> Stream for Then<S, F, Fut> |
2246 | where |
2247 | S: Stream, |
2248 | F: FnMut(S::Item) -> Fut, |
2249 | Fut: Future, |
2250 | { |
2251 | type Item = Fut::Output; |
2252 | |
2253 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2254 | let mut this = self.project(); |
2255 | |
2256 | loop { |
2257 | if let Some(fut) = this.future.as_mut().as_pin_mut() { |
2258 | let item = ready!(fut.poll(cx)); |
2259 | this.future.set(None); |
2260 | return Poll::Ready(Some(item)); |
2261 | } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { |
2262 | this.future.set(Some((this.f)(item))); |
2263 | } else { |
2264 | return Poll::Ready(None); |
2265 | } |
2266 | } |
2267 | } |
2268 | |
2269 | fn size_hint(&self) -> (usize, Option<usize>) { |
2270 | let future_len = self.future.is_some() as usize; |
2271 | let (lower, upper) = self.stream.size_hint(); |
2272 | let lower = lower.saturating_add(future_len); |
2273 | let upper = upper.and_then(|u| u.checked_add(future_len)); |
2274 | (lower, upper) |
2275 | } |
2276 | } |
2277 | |
pin_project! {
    /// Stream for the [`StreamExt::filter()`] method.
    ///
    /// Yields only the items of the inner stream for which `predicate` returns `true`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Filter<S, P> {
        // The stream being filtered.
        #[pin]
        stream: S,
        // Decides, by reference, whether an item is kept.
        predicate: P,
    }
}
2288 | |
2289 | impl<S, P> Stream for Filter<S, P> |
2290 | where |
2291 | S: Stream, |
2292 | P: FnMut(&S::Item) -> bool, |
2293 | { |
2294 | type Item = S::Item; |
2295 | |
2296 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2297 | let mut this: Projection<'_, S, P> = self.project(); |
2298 | loop { |
2299 | match ready!(this.stream.as_mut().poll_next(cx)) { |
2300 | None => return Poll::Ready(None), |
2301 | Some(v: ::Item) if (this.predicate)(&v) => return Poll::Ready(Some(v)), |
2302 | Some(_) => {} |
2303 | } |
2304 | } |
2305 | } |
2306 | } |
2307 | |
/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
///
/// The returned [`Or`] polls `stream1` first on every poll and only falls back to
/// `stream2` when `stream1` did not produce an item.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
///
/// // The first stream wins.
/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
/// # })
/// ```
pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Or { stream1, stream2 }
}
2330 | |
pin_project! {
    /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
    ///
    /// Both streams yield the same item type; `stream1` is always polled first.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Or<S1, S2> {
        // The preferred stream.
        #[pin]
        stream1: S1,
        // The fallback stream, polled only when `stream1` produced no item.
        #[pin]
        stream2: S2,
    }
}
2342 | |
2343 | impl<T, S1, S2> Stream for Or<S1, S2> |
2344 | where |
2345 | S1: Stream<Item = T>, |
2346 | S2: Stream<Item = T>, |
2347 | { |
2348 | type Item = T; |
2349 | |
2350 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2351 | let mut this: Projection<'_, S1, S2> = self.project(); |
2352 | |
2353 | if let Poll::Ready(Some(t: T)) = this.stream1.as_mut().poll_next(cx) { |
2354 | return Poll::Ready(Some(t)); |
2355 | } |
2356 | this.stream2.as_mut().poll_next(cx) |
2357 | } |
2358 | } |
2359 | |
/// Merges two streams, with no preference for either stream when both are ready.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
///
/// // One of the two streams is randomly chosen as the winner.
/// let res = stream::race(once(1), once(2)).next().await;
/// assert!(res == Some(1) || res == Some(2));
/// # })
/// ```
#[cfg(feature = "std")]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Race { stream1, stream2 }
}
2382 | |
#[cfg(feature = "std")]
pin_project! {
    /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
    ///
    /// Both streams yield the same item type; which one is polled first is decided
    /// anew on every poll.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Race<S1, S2> {
        // One of the two competing streams.
        #[pin]
        stream1: S1,
        // The other competing stream.
        #[pin]
        stream2: S2,
    }
}
2395 | |
#[cfg(feature = "std")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    /// Polls both streams in a randomly chosen order and returns the first item found.
    ///
    /// Returns `Ready(None)` once both streams report the end in the same poll;
    /// otherwise, when no item is available, returns `Pending`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        // A coin flip decides the order; the second stream is polled only if the
        // first one did not produce an item.
        let (first, second) = if fastrand::bool() {
            let first = this.stream1.poll_next(cx);
            if let Poll::Ready(Some(_)) = first {
                return first;
            }
            (first, this.stream2.poll_next(cx))
        } else {
            let first = this.stream2.poll_next(cx);
            if let Poll::Ready(Some(_)) = first {
                return first;
            }
            (first, this.stream1.poll_next(cx))
        };

        match (first, second) {
            (_, Poll::Ready(Some(item))) => Poll::Ready(Some(item)),
            // Both streams are finished: report the end instead of returning
            // `Pending`, for which neither stream would have registered a waker.
            (Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None),
            // At least one stream is still pending and has registered the waker.
            _ => Poll::Pending,
        }
    }
}
2425 | |
pin_project! {
    /// Stream for the [`StreamExt::filter_map()`] method.
    ///
    /// Passes each item of the inner stream to `f` and yields the values for which
    /// `f` returns `Some`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct FilterMap<S, F> {
        // The stream supplying the inputs for `f`.
        #[pin]
        stream: S,
        // Maps an item to `Some(output)` to keep it or `None` to drop it.
        f: F,
    }
}
2436 | |
2437 | impl<S, F, T> Stream for FilterMap<S, F> |
2438 | where |
2439 | S: Stream, |
2440 | F: FnMut(S::Item) -> Option<T>, |
2441 | { |
2442 | type Item = T; |
2443 | |
2444 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2445 | let mut this: Projection<'_, S, F> = self.project(); |
2446 | loop { |
2447 | match ready!(this.stream.as_mut().poll_next(cx)) { |
2448 | None => return Poll::Ready(None), |
2449 | Some(v: ::Item) => { |
2450 | if let Some(t: T) = (this.f)(v) { |
2451 | return Poll::Ready(Some(t)); |
2452 | } |
2453 | } |
2454 | } |
2455 | } |
2456 | } |
2457 | } |
2458 | |
pin_project! {
    /// Stream for the [`StreamExt::take()`] method.
    ///
    /// Yields items of the inner stream until the remaining count `n` reaches zero.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Take<S> {
        // The stream being truncated.
        #[pin]
        stream: S,
        // Number of items that may still be yielded.
        n: usize,
    }
}
2469 | |
2470 | impl<S: Stream> Stream for Take<S> { |
2471 | type Item = S::Item; |
2472 | |
2473 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
2474 | let this: Projection<'_, S> = self.project(); |
2475 | |
2476 | if *this.n == 0 { |
2477 | Poll::Ready(None) |
2478 | } else { |
2479 | let next: Option<::Item> = ready!(this.stream.poll_next(cx)); |
2480 | match next { |
2481 | Some(_) => *this.n -= 1, |
2482 | None => *this.n = 0, |
2483 | } |
2484 | Poll::Ready(next) |
2485 | } |
2486 | } |
2487 | } |
2488 | |
pin_project! {
    /// Stream for the [`StreamExt::take_while()`] method.
    ///
    /// Yields items of the inner stream as long as `predicate` returns `true` for them.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct TakeWhile<S, P> {
        // The stream being truncated.
        #[pin]
        stream: S,
        // Checked, by reference, against every item before it is yielded.
        predicate: P,
    }
}
2499 | |
2500 | impl<S, P> Stream for TakeWhile<S, P> |
2501 | where |
2502 | S: Stream, |
2503 | P: FnMut(&S::Item) -> bool, |
2504 | { |
2505 | type Item = S::Item; |
2506 | |
2507 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2508 | let this: Projection<'_, S, P> = self.project(); |
2509 | |
2510 | match ready!(this.stream.poll_next(cx)) { |
2511 | Some(v: ::Item) => { |
2512 | if (this.predicate)(&v) { |
2513 | Poll::Ready(Some(v)) |
2514 | } else { |
2515 | Poll::Ready(None) |
2516 | } |
2517 | } |
2518 | None => Poll::Ready(None), |
2519 | } |
2520 | } |
2521 | } |
2522 | |
pin_project! {
    /// Stream for the [`StreamExt::skip()`] method.
    ///
    /// Discards items of the inner stream while `n` is non-zero, then yields the rest.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Skip<S> {
        // The stream whose leading items are discarded.
        #[pin]
        stream: S,
        // Number of items that still have to be discarded.
        n: usize,
    }
}
2533 | |
2534 | impl<S: Stream> Stream for Skip<S> { |
2535 | type Item = S::Item; |
2536 | |
2537 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2538 | let mut this: Projection<'_, S> = self.project(); |
2539 | loop { |
2540 | match ready!(this.stream.as_mut().poll_next(cx)) { |
2541 | Some(v: ::Item) => match *this.n { |
2542 | 0 => return Poll::Ready(Some(v)), |
2543 | _ => *this.n -= 1, |
2544 | }, |
2545 | None => return Poll::Ready(None), |
2546 | } |
2547 | } |
2548 | } |
2549 | } |
2550 | |
pin_project! {
    /// Stream for the [`StreamExt::skip_while()`] method.
    ///
    /// Discards leading items for which the predicate returns `true`, then yields
    /// everything that follows.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct SkipWhile<S, P> {
        // The stream whose leading items are discarded.
        #[pin]
        stream: S,
        // The skip predicate; replaced by `None` once an item has been let through.
        predicate: Option<P>,
    }
}
2561 | |
2562 | impl<S, P> Stream for SkipWhile<S, P> |
2563 | where |
2564 | S: Stream, |
2565 | P: FnMut(&S::Item) -> bool, |
2566 | { |
2567 | type Item = S::Item; |
2568 | |
2569 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2570 | let mut this: Projection<'_, S, P> = self.project(); |
2571 | loop { |
2572 | match ready!(this.stream.as_mut().poll_next(cx)) { |
2573 | Some(v: ::Item) => match this.predicate { |
2574 | Some(p: &mut P) => { |
2575 | if !p(&v) { |
2576 | *this.predicate = None; |
2577 | return Poll::Ready(Some(v)); |
2578 | } |
2579 | } |
2580 | None => return Poll::Ready(Some(v)), |
2581 | }, |
2582 | None => return Poll::Ready(None), |
2583 | } |
2584 | } |
2585 | } |
2586 | } |
2587 | |
pin_project! {
    /// Stream for the [`StreamExt::step_by()`] method.
    ///
    /// Yields an item of the inner stream, then discards `step - 1` items before
    /// yielding the next one.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct StepBy<S> {
        // The stream being sampled.
        #[pin]
        stream: S,
        // The step size.
        step: usize,
        // Number of items to discard before the next item is yielded.
        i: usize,
    }
}
2599 | |
2600 | impl<S: Stream> Stream for StepBy<S> { |
2601 | type Item = S::Item; |
2602 | |
2603 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2604 | let mut this: Projection<'_, S> = self.project(); |
2605 | loop { |
2606 | match ready!(this.stream.as_mut().poll_next(cx)) { |
2607 | Some(v: ::Item) => { |
2608 | if *this.i == 0 { |
2609 | *this.i = *this.step - 1; |
2610 | return Poll::Ready(Some(v)); |
2611 | } else { |
2612 | *this.i -= 1; |
2613 | } |
2614 | } |
2615 | None => return Poll::Ready(None), |
2616 | } |
2617 | } |
2618 | } |
2619 | } |
2620 | |
pin_project! {
    /// Stream for the [`StreamExt::chain()`] method.
    ///
    /// Yields all items of `first`, then all items of `second`. Both halves are
    /// wrapped in [`Fuse`] so that their `done` flags record when each has ended.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Chain<S, U> {
        // The stream that is drained first.
        #[pin]
        first: Fuse<S>,
        // The stream that is drained after `first` has ended.
        #[pin]
        second: Fuse<U>,
    }
}
2632 | |
2633 | impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> { |
2634 | type Item = S::Item; |
2635 | |
2636 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2637 | let mut this = self.project(); |
2638 | |
2639 | if !this.first.done { |
2640 | let next = ready!(this.first.as_mut().poll_next(cx)); |
2641 | if let Some(next) = next { |
2642 | return Poll::Ready(Some(next)); |
2643 | } |
2644 | } |
2645 | |
2646 | if !this.second.done { |
2647 | let next = ready!(this.second.as_mut().poll_next(cx)); |
2648 | if let Some(next) = next { |
2649 | return Poll::Ready(Some(next)); |
2650 | } |
2651 | } |
2652 | |
2653 | if this.first.done && this.second.done { |
2654 | Poll::Ready(None) |
2655 | } else { |
2656 | Poll::Pending |
2657 | } |
2658 | } |
2659 | } |
2660 | |
pin_project! {
    /// Stream for the [`StreamExt::cloned()`] method.
    ///
    /// Turns a stream of references into a stream of owned values by cloning each item.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Cloned<S> {
        // The stream of references.
        #[pin]
        stream: S,
    }
}
2670 | |
2671 | impl<'a, S, T: 'a> Stream for Cloned<S> |
2672 | where |
2673 | S: Stream<Item = &'a T>, |
2674 | T: Clone, |
2675 | { |
2676 | type Item = T; |
2677 | |
2678 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2679 | let this: Projection<'_, S> = self.project(); |
2680 | let next: Option<&T> = ready!(this.stream.poll_next(cx)); |
2681 | Poll::Ready(next.cloned()) |
2682 | } |
2683 | } |
2684 | |
pin_project! {
    /// Stream for the [`StreamExt::copied()`] method.
    ///
    /// Turns a stream of references into a stream of values by copying each item.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Copied<S> {
        // The stream of references.
        #[pin]
        stream: S,
    }
}
2694 | |
2695 | impl<'a, S, T: 'a> Stream for Copied<S> |
2696 | where |
2697 | S: Stream<Item = &'a T>, |
2698 | T: Copy, |
2699 | { |
2700 | type Item = T; |
2701 | |
2702 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2703 | let this: Projection<'_, S> = self.project(); |
2704 | let next: Option<&T> = ready!(this.stream.poll_next(cx)); |
2705 | Poll::Ready(next.copied()) |
2706 | } |
2707 | } |
2708 | |
pin_project! {
    /// Stream for the [`StreamExt::cycle()`] method.
    ///
    /// Repeats a stream by replacing it with a clone of the original every time it ends.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Cycle<S> {
        // Copy of the stream that is only ever cloned from, never polled here.
        orig: S,
        // The copy currently being polled.
        #[pin]
        stream: S,
    }
}
2719 | |
2720 | impl<S> Stream for Cycle<S> |
2721 | where |
2722 | S: Stream + Clone, |
2723 | { |
2724 | type Item = S::Item; |
2725 | |
2726 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2727 | match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) { |
2728 | Some(item: ::Item) => Poll::Ready(Some(item)), |
2729 | None => { |
2730 | let new: S = self.as_mut().orig.clone(); |
2731 | self.as_mut().project().stream.set(new); |
2732 | self.project().stream.poll_next(cx) |
2733 | } |
2734 | } |
2735 | } |
2736 | } |
2737 | |
pin_project! {
    /// Stream for the [`StreamExt::enumerate()`] method.
    ///
    /// Pairs every item of the inner stream with a running index.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Enumerate<S> {
        // The stream being enumerated.
        #[pin]
        stream: S,
        // Index that will be attached to the next item.
        i: usize,
    }
}
2748 | |
2749 | impl<S> Stream for Enumerate<S> |
2750 | where |
2751 | S: Stream, |
2752 | { |
2753 | type Item = (usize, S::Item); |
2754 | |
2755 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2756 | let this: Projection<'_, S> = self.project(); |
2757 | |
2758 | match ready!(this.stream.poll_next(cx)) { |
2759 | Some(v: ::Item) => { |
2760 | let ret: (usize, ::Item) = (*this.i, v); |
2761 | *this.i += 1; |
2762 | Poll::Ready(Some(ret)) |
2763 | } |
2764 | None => Poll::Ready(None), |
2765 | } |
2766 | } |
2767 | } |
2768 | |
pin_project! {
    /// Stream for the [`StreamExt::inspect()`] method.
    ///
    /// Calls `f` with a reference to each item before passing the item on unchanged.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Inspect<S, F> {
        // The stream being observed.
        #[pin]
        stream: S,
        // Callback invoked with a reference to every yielded item.
        f: F,
    }
}
2779 | |
2780 | impl<S, F> Stream for Inspect<S, F> |
2781 | where |
2782 | S: Stream, |
2783 | F: FnMut(&S::Item), |
2784 | { |
2785 | type Item = S::Item; |
2786 | |
2787 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2788 | let mut this: Projection<'_, S, F> = self.project(); |
2789 | let next: Option<::Item> = ready!(this.stream.as_mut().poll_next(cx)); |
2790 | if let Some(x: &::Item) = &next { |
2791 | (this.f)(x); |
2792 | } |
2793 | Poll::Ready(next) |
2794 | } |
2795 | } |
2796 | |
/// Future for the [`StreamExt::nth()`] method.
///
/// Resolves to the item found after discarding `n` items, or to `None` if the stream
/// ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NthFuture<'a, S: ?Sized> {
    /// The borrowed stream being advanced.
    stream: &'a mut S,
    /// Number of items that still have to be discarded.
    n: usize,
}

// The future is `Unpin` whenever the borrowed stream type is.
impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2806 | |
2807 | impl<'a, S> Future for NthFuture<'a, S> |
2808 | where |
2809 | S: Stream + Unpin + ?Sized, |
2810 | { |
2811 | type Output = Option<S::Item>; |
2812 | |
2813 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2814 | loop { |
2815 | match ready!(self.stream.poll_next(cx)) { |
2816 | Some(v: ::Item) => match self.n { |
2817 | 0 => return Poll::Ready(Some(v)), |
2818 | _ => self.n -= 1, |
2819 | }, |
2820 | None => return Poll::Ready(None), |
2821 | } |
2822 | } |
2823 | } |
2824 | } |
2825 | |
pin_project! {
    /// Future for the [`StreamExt::last()`] method.
    ///
    /// Drains the stream and resolves to the final item it yielded, if any.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct LastFuture<S: Stream> {
        // The stream being drained.
        #[pin]
        stream: S,
        // The most recent item seen so far.
        last: Option<S::Item>,
    }
}
2836 | |
2837 | impl<S: Stream> Future for LastFuture<S> { |
2838 | type Output = Option<S::Item>; |
2839 | |
2840 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2841 | let mut this: Projection<'_, S> = self.project(); |
2842 | loop { |
2843 | match ready!(this.stream.as_mut().poll_next(cx)) { |
2844 | Some(new: ::Item) => *this.last = Some(new), |
2845 | None => return Poll::Ready(this.last.take()), |
2846 | } |
2847 | } |
2848 | } |
2849 | } |
2850 | |
/// Future for the [`StreamExt::find()`] method.
///
/// Resolves to the first item for which `predicate` returns `true`, or to `None` if
/// the stream ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FindFuture<'a, S: ?Sized, P> {
    /// The borrowed stream being searched.
    stream: &'a mut S,
    /// Test applied, by reference, to every item.
    predicate: P,
}

// The future is `Unpin` whenever the borrowed stream type is, regardless of `P`.
impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2860 | |
2861 | impl<'a, S, P> Future for FindFuture<'a, S, P> |
2862 | where |
2863 | S: Stream + Unpin + ?Sized, |
2864 | P: FnMut(&S::Item) -> bool, |
2865 | { |
2866 | type Output = Option<S::Item>; |
2867 | |
2868 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2869 | loop { |
2870 | match ready!(self.stream.poll_next(cx)) { |
2871 | Some(v: ::Item) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)), |
2872 | Some(_) => {} |
2873 | None => return Poll::Ready(None), |
2874 | } |
2875 | } |
2876 | } |
2877 | } |
2878 | |
/// Future for the [`StreamExt::find_map()`] method.
///
/// Resolves to the first `Some` value returned by `f`, or to `None` if the stream
/// ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FindMapFuture<'a, S: ?Sized, F> {
    /// The borrowed stream being searched.
    stream: &'a mut S,
    /// Maps an item to `Some(result)` to stop the search or `None` to continue.
    f: F,
}

// The future is `Unpin` whenever the borrowed stream type is, regardless of `F`.
impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2888 | |
2889 | impl<'a, S, B, F> Future for FindMapFuture<'a, S, F> |
2890 | where |
2891 | S: Stream + Unpin + ?Sized, |
2892 | F: FnMut(S::Item) -> Option<B>, |
2893 | { |
2894 | type Output = Option<B>; |
2895 | |
2896 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2897 | loop { |
2898 | match ready!(self.stream.poll_next(cx)) { |
2899 | Some(v: ::Item) => { |
2900 | if let Some(v: B) = (&mut self.f)(v) { |
2901 | return Poll::Ready(Some(v)); |
2902 | } |
2903 | } |
2904 | None => return Poll::Ready(None), |
2905 | } |
2906 | } |
2907 | } |
2908 | } |
2909 | |
/// Future for the [`StreamExt::position()`] method.
///
/// Resolves to the number of items rejected before the first one accepted by
/// `predicate`, or to `None` if the stream ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PositionFuture<'a, S: ?Sized, P> {
    /// The borrowed stream being searched.
    stream: &'a mut S,
    /// Test applied to every item (the item is passed by value).
    predicate: P,
    /// Number of items rejected so far.
    index: usize,
}

// The future is `Unpin` whenever the borrowed stream type is, regardless of `P`.
impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
2920 | |
2921 | impl<'a, S, P> Future for PositionFuture<'a, S, P> |
2922 | where |
2923 | S: Stream + Unpin + ?Sized, |
2924 | P: FnMut(S::Item) -> bool, |
2925 | { |
2926 | type Output = Option<usize>; |
2927 | |
2928 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2929 | loop { |
2930 | match ready!(self.stream.poll_next(cx)) { |
2931 | Some(v: ::Item) => { |
2932 | if (&mut self.predicate)(v) { |
2933 | return Poll::Ready(Some(self.index)); |
2934 | } else { |
2935 | self.index += 1; |
2936 | } |
2937 | } |
2938 | None => return Poll::Ready(None), |
2939 | } |
2940 | } |
2941 | } |
2942 | } |
2943 | |
/// Future for the [`StreamExt::all()`] method.
///
/// Resolves to `false` as soon as `predicate` rejects an item, or to `true` if the
/// stream ends without a rejection.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AllFuture<'a, S: ?Sized, P> {
    /// The borrowed stream being checked.
    stream: &'a mut S,
    /// Test applied to every item (the item is passed by value).
    predicate: P,
}

// The future is `Unpin` whenever the borrowed stream type is, regardless of `P`.
impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
2953 | |
2954 | impl<S, P> Future for AllFuture<'_, S, P> |
2955 | where |
2956 | S: Stream + Unpin + ?Sized, |
2957 | P: FnMut(S::Item) -> bool, |
2958 | { |
2959 | type Output = bool; |
2960 | |
2961 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2962 | loop { |
2963 | match ready!(self.stream.poll_next(cx)) { |
2964 | Some(v: ::Item) => { |
2965 | if !(&mut self.predicate)(v) { |
2966 | return Poll::Ready(false); |
2967 | } |
2968 | } |
2969 | None => return Poll::Ready(true), |
2970 | } |
2971 | } |
2972 | } |
2973 | } |
2974 | |
/// Future for the [`StreamExt::any()`] method.
///
/// Resolves to `true` as soon as `predicate` accepts an item, or to `false` if the
/// stream ends without a match.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AnyFuture<'a, S: ?Sized, P> {
    /// The borrowed stream being checked.
    stream: &'a mut S,
    /// Test applied to every item (the item is passed by value).
    predicate: P,
}

// The future is `Unpin` whenever the borrowed stream type is, regardless of `P`.
impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
2984 | |
2985 | impl<S, P> Future for AnyFuture<'_, S, P> |
2986 | where |
2987 | S: Stream + Unpin + ?Sized, |
2988 | P: FnMut(S::Item) -> bool, |
2989 | { |
2990 | type Output = bool; |
2991 | |
2992 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2993 | loop { |
2994 | match ready!(self.stream.poll_next(cx)) { |
2995 | Some(v: ::Item) => { |
2996 | if (&mut self.predicate)(v) { |
2997 | return Poll::Ready(true); |
2998 | } |
2999 | } |
3000 | None => return Poll::Ready(false), |
3001 | } |
3002 | } |
3003 | } |
3004 | } |
3005 | |
pin_project! {
    /// Future for the [`StreamExt::for_each()`] method.
    ///
    /// Calls `f` on every item of the stream and resolves once the stream ends.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ForEachFuture<S, F> {
        // The stream being consumed.
        #[pin]
        stream: S,
        // Callback invoked with every item.
        f: F,
    }
}
3016 | |
3017 | impl<S, F> Future for ForEachFuture<S, F> |
3018 | where |
3019 | S: Stream, |
3020 | F: FnMut(S::Item), |
3021 | { |
3022 | type Output = (); |
3023 | |
3024 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
3025 | let mut this: Projection<'_, S, F> = self.project(); |
3026 | loop { |
3027 | match ready!(this.stream.as_mut().poll_next(cx)) { |
3028 | Some(v: ::Item) => (this.f)(v), |
3029 | None => return Poll::Ready(()), |
3030 | } |
3031 | } |
3032 | } |
3033 | } |
3034 | |
/// Future for the [`StreamExt::try_for_each()`] method.
///
/// Calls `f` on every item and resolves to the first error it returns, or to `Ok(())`
/// once the stream ends.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForEachFuture<'a, S: ?Sized, F> {
    /// The borrowed stream being consumed.
    stream: &'a mut S,
    /// Fallible callback invoked with every item.
    f: F,
}

// The future is `Unpin` whenever the borrowed stream type is, regardless of `F`.
impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
3044 | |
3045 | impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F> |
3046 | where |
3047 | S: Stream + Unpin + ?Sized, |
3048 | F: FnMut(S::Item) -> Result<(), E>, |
3049 | { |
3050 | type Output = Result<(), E>; |
3051 | |
3052 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
3053 | loop { |
3054 | match ready!(self.stream.poll_next(cx)) { |
3055 | None => return Poll::Ready(Ok(())), |
3056 | Some(v: ::Item) => (&mut self.f)(v)?, |
3057 | } |
3058 | } |
3059 | } |
3060 | } |
3061 | |
pin_project! {
    /// Stream for the [`StreamExt::zip()`] method.
    ///
    /// Yields pairs made of one item from each of the two streams.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Zip<A: Stream, B> {
        // Item already taken from `first` while waiting for its partner from `second`.
        item_slot: Option<A::Item>,
        // Supplies the left element of every pair.
        #[pin]
        first: A,
        // Supplies the right element of every pair.
        #[pin]
        second: B,
    }
}
3074 | |
3075 | impl<A: Stream, B: Stream> Stream for Zip<A, B> { |
3076 | type Item = (A::Item, B::Item); |
3077 | |
3078 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
3079 | let this: Projection<'_, A, B> = self.project(); |
3080 | |
3081 | if this.item_slot.is_none() { |
3082 | match this.first.poll_next(cx) { |
3083 | Poll::Pending => return Poll::Pending, |
3084 | Poll::Ready(None) => return Poll::Ready(None), |
3085 | Poll::Ready(Some(item: ::Item)) => *this.item_slot = Some(item), |
3086 | } |
3087 | } |
3088 | |
3089 | let second_item: Option<::Item> = ready!(this.second.poll_next(cx)); |
3090 | let first_item: ::Item = this.item_slot.take().unwrap(); |
3091 | Poll::Ready(second_item.map(|second_item: ::Item| (first_item, second_item))) |
3092 | } |
3093 | } |
3094 | |
pin_project! {
    /// Future for the [`StreamExt::unzip()`] method.
    ///
    /// Splits a stream of pairs into two collections.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct UnzipFuture<S, FromA, FromB> {
        // The stream of pairs being consumed.
        #[pin]
        stream: S,
        // The two collections being filled; taken out when the future completes.
        // Presumably initialised to `Some(..)` by the constructor — confirm there.
        res: Option<(FromA, FromB)>,
    }
}
3105 | |
3106 | impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB> |
3107 | where |
3108 | S: Stream<Item = (A, B)>, |
3109 | FromA: Default + Extend<A>, |
3110 | FromB: Default + Extend<B>, |
3111 | { |
3112 | type Output = (FromA, FromB); |
3113 | |
3114 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
3115 | let mut this: Projection<'_, S, FromA, …> = self.project(); |
3116 | |
3117 | loop { |
3118 | match ready!(this.stream.as_mut().poll_next(cx)) { |
3119 | Some((a: A, b: B)) => { |
3120 | let res: &mut (FromA, FromB) = this.res.as_mut().unwrap(); |
3121 | res.0.extend(iter:Some(a)); |
3122 | res.1.extend(iter:Some(b)); |
3123 | } |
3124 | None => return Poll::Ready(this.res.take().unwrap()), |
3125 | } |
3126 | } |
3127 | } |
3128 | } |
3129 | |