1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[cfg(feature = "alloc")]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(feature = "alloc")]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34use pin_project_lite::pin_project;
35
36use crate::ready;
37
38/// Converts a stream into a blocking iterator.
39///
40/// # Examples
41///
42/// ```
43/// use futures_lite::{pin, stream};
44///
45/// let stream = stream::once(7);
46/// pin!(stream);
47///
48/// let mut iter = stream::block_on(stream);
49/// assert_eq!(iter.next(), Some(7));
50/// assert_eq!(iter.next(), None);
51/// ```
52#[cfg(feature = "std")]
53pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
54 BlockOn(stream)
55}
56
57/// Iterator for the [`block_on()`] function.
58#[derive(Debug)]
59pub struct BlockOn<S>(S);
60
61#[cfg(feature = "std")]
62impl<S: Stream + Unpin> Iterator for BlockOn<S> {
63 type Item = S::Item;
64
65 fn next(&mut self) -> Option<Self::Item> {
66 crate::future::block_on(self.0.next())
67 }
68
69 fn size_hint(&self) -> (usize, Option<usize>) {
70 self.0.size_hint()
71 }
72
73 fn count(self) -> usize {
74 crate::future::block_on(self.0.count())
75 }
76
77 fn last(self) -> Option<Self::Item> {
78 crate::future::block_on(self.0.last())
79 }
80
81 fn nth(&mut self, n: usize) -> Option<Self::Item> {
82 crate::future::block_on(self.0.nth(n))
83 }
84
85 fn fold<B, F>(self, init: B, f: F) -> B
86 where
87 F: FnMut(B, Self::Item) -> B,
88 {
89 crate::future::block_on(self.0.fold(init, f))
90 }
91
92 fn for_each<F>(self, f: F) -> F::Output
93 where
94 F: FnMut(Self::Item),
95 {
96 crate::future::block_on(self.0.for_each(f))
97 }
98
99 fn all<F>(&mut self, f: F) -> bool
100 where
101 F: FnMut(Self::Item) -> bool,
102 {
103 crate::future::block_on(self.0.all(f))
104 }
105
106 fn any<F>(&mut self, f: F) -> bool
107 where
108 F: FnMut(Self::Item) -> bool,
109 {
110 crate::future::block_on(self.0.any(f))
111 }
112
113 fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
114 where
115 P: FnMut(&Self::Item) -> bool,
116 {
117 crate::future::block_on(self.0.find(predicate))
118 }
119
120 fn find_map<B, F>(&mut self, f: F) -> Option<B>
121 where
122 F: FnMut(Self::Item) -> Option<B>,
123 {
124 crate::future::block_on(self.0.find_map(f))
125 }
126
127 fn position<P>(&mut self, predicate: P) -> Option<usize>
128 where
129 P: FnMut(Self::Item) -> bool,
130 {
131 crate::future::block_on(self.0.position(predicate))
132 }
133}
134
135/// Creates an empty stream.
136///
137/// # Examples
138///
139/// ```
140/// use futures_lite::stream::{self, StreamExt};
141///
142/// # spin_on::spin_on(async {
143/// let mut s = stream::empty::<i32>();
144/// assert_eq!(s.next().await, None);
145/// # })
146/// ```
147pub fn empty<T>() -> Empty<T> {
148 Empty {
149 _marker: PhantomData,
150 }
151}
152
153/// Stream for the [`empty()`] function.
154#[derive(Clone, Debug)]
155#[must_use = "streams do nothing unless polled"]
156pub struct Empty<T> {
157 _marker: PhantomData<T>,
158}
159
160impl<T> Unpin for Empty<T> {}
161
162impl<T> Stream for Empty<T> {
163 type Item = T;
164
165 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
166 Poll::Ready(None)
167 }
168
169 fn size_hint(&self) -> (usize, Option<usize>) {
170 (0, Some(0))
171 }
172}
173
174/// Creates a stream from an iterator.
175///
176/// # Examples
177///
178/// ```
179/// use futures_lite::stream::{self, StreamExt};
180///
181/// # spin_on::spin_on(async {
182/// let mut s = stream::iter(vec![1, 2]);
183///
184/// assert_eq!(s.next().await, Some(1));
185/// assert_eq!(s.next().await, Some(2));
186/// assert_eq!(s.next().await, None);
187/// # })
188/// ```
189pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
190 Iter {
191 iter: iter.into_iter(),
192 }
193}
194
195/// Stream for the [`iter()`] function.
196#[derive(Clone, Debug)]
197#[must_use = "streams do nothing unless polled"]
198pub struct Iter<I> {
199 iter: I,
200}
201
202impl<I> Unpin for Iter<I> {}
203
204impl<I: Iterator> Stream for Iter<I> {
205 type Item = I::Item;
206
207 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
208 Poll::Ready(self.iter.next())
209 }
210
211 fn size_hint(&self) -> (usize, Option<usize>) {
212 self.iter.size_hint()
213 }
214}
215
216/// Creates a stream that yields a single item.
217///
218/// # Examples
219///
220/// ```
221/// use futures_lite::stream::{self, StreamExt};
222///
223/// # spin_on::spin_on(async {
224/// let mut s = stream::once(7);
225///
226/// assert_eq!(s.next().await, Some(7));
227/// assert_eq!(s.next().await, None);
228/// # })
229/// ```
230pub fn once<T>(t: T) -> Once<T> {
231 Once { value: Some(t) }
232}
233
234pin_project! {
235 /// Stream for the [`once()`] function.
236 #[derive(Clone, Debug)]
237 #[must_use = "streams do nothing unless polled"]
238 pub struct Once<T> {
239 value: Option<T>,
240 }
241}
242
243impl<T> Stream for Once<T> {
244 type Item = T;
245
246 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
247 Poll::Ready(self.project().value.take())
248 }
249
250 fn size_hint(&self) -> (usize, Option<usize>) {
251 if self.value.is_some() {
252 (1, Some(1))
253 } else {
254 (0, Some(0))
255 }
256 }
257}
258
259/// Creates a stream that is always pending.
260///
261/// # Examples
262///
263/// ```no_run
264/// use futures_lite::stream::{self, StreamExt};
265///
266/// # spin_on::spin_on(async {
267/// let mut s = stream::pending::<i32>();
268/// s.next().await;
269/// unreachable!();
270/// # })
271/// ```
272pub fn pending<T>() -> Pending<T> {
273 Pending {
274 _marker: PhantomData,
275 }
276}
277
278/// Stream for the [`pending()`] function.
279#[derive(Clone, Debug)]
280#[must_use = "streams do nothing unless polled"]
281pub struct Pending<T> {
282 _marker: PhantomData<T>,
283}
284
285impl<T> Unpin for Pending<T> {}
286
287impl<T> Stream for Pending<T> {
288 type Item = T;
289
290 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
291 Poll::Pending
292 }
293
294 fn size_hint(&self) -> (usize, Option<usize>) {
295 (0, Some(0))
296 }
297}
298
299/// Creates a stream from a function returning [`Poll`].
300///
301/// # Examples
302///
303/// ```
304/// use futures_lite::stream::{self, StreamExt};
305/// use std::task::{Context, Poll};
306///
307/// # spin_on::spin_on(async {
308/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
309/// Poll::Ready(Some(7))
310/// }
311///
312/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
313/// # })
314/// ```
315pub fn poll_fn<T, F>(f: F) -> PollFn<F>
316where
317 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
318{
319 PollFn { f }
320}
321
322/// Stream for the [`poll_fn()`] function.
323#[derive(Clone)]
324#[must_use = "streams do nothing unless polled"]
325pub struct PollFn<F> {
326 f: F,
327}
328
329impl<F> Unpin for PollFn<F> {}
330
331impl<F> fmt::Debug for PollFn<F> {
332 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333 f.debug_struct(name:"PollFn").finish()
334 }
335}
336
337impl<T, F> Stream for PollFn<F>
338where
339 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
340{
341 type Item = T;
342
343 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
344 (&mut self.f)(cx)
345 }
346}
347
348/// Creates an infinite stream that yields the same item repeatedly.
349///
350/// # Examples
351///
352/// ```
353/// use futures_lite::stream::{self, StreamExt};
354///
355/// # spin_on::spin_on(async {
356/// let mut s = stream::repeat(7);
357///
358/// assert_eq!(s.next().await, Some(7));
359/// assert_eq!(s.next().await, Some(7));
360/// # })
361/// ```
362pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
363 Repeat { item }
364}
365
366/// Stream for the [`repeat()`] function.
367#[derive(Clone, Debug)]
368#[must_use = "streams do nothing unless polled"]
369pub struct Repeat<T> {
370 item: T,
371}
372
373impl<T> Unpin for Repeat<T> {}
374
375impl<T: Clone> Stream for Repeat<T> {
376 type Item = T;
377
378 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379 Poll::Ready(Some(self.item.clone()))
380 }
381
382 fn size_hint(&self) -> (usize, Option<usize>) {
383 (usize::max_value(), None)
384 }
385}
386
387/// Creates an infinite stream from a closure that generates items.
388///
389/// # Examples
390///
391/// ```
392/// use futures_lite::stream::{self, StreamExt};
393///
394/// # spin_on::spin_on(async {
395/// let mut s = stream::repeat_with(|| 7);
396///
397/// assert_eq!(s.next().await, Some(7));
398/// assert_eq!(s.next().await, Some(7));
399/// # })
400/// ```
401pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
402where
403 F: FnMut() -> T,
404{
405 RepeatWith { f: repeater }
406}
407
408/// Stream for the [`repeat_with()`] function.
409#[derive(Clone, Debug)]
410#[must_use = "streams do nothing unless polled"]
411pub struct RepeatWith<F> {
412 f: F,
413}
414
415impl<F> Unpin for RepeatWith<F> {}
416
417impl<T, F> Stream for RepeatWith<F>
418where
419 F: FnMut() -> T,
420{
421 type Item = T;
422
423 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
424 let item: T = (&mut self.f)();
425 Poll::Ready(Some(item))
426 }
427
428 fn size_hint(&self) -> (usize, Option<usize>) {
429 (usize::max_value(), None)
430 }
431}
432
433/// Creates a stream from a seed value and an async closure operating on it.
434///
435/// # Examples
436///
437/// ```
438/// use futures_lite::stream::{self, StreamExt};
439///
440/// # spin_on::spin_on(async {
441/// let s = stream::unfold(0, |mut n| async move {
442/// if n < 2 {
443/// let m = n + 1;
444/// Some((n, m))
445/// } else {
446/// None
447/// }
448/// });
449///
450/// let v: Vec<i32> = s.collect().await;
451/// assert_eq!(v, [0, 1]);
452/// # })
453/// ```
454pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
455where
456 F: FnMut(T) -> Fut,
457 Fut: Future<Output = Option<(Item, T)>>,
458{
459 Unfold {
460 f,
461 state: Some(seed),
462 fut: None,
463 }
464}
465
466pin_project! {
467 /// Stream for the [`unfold()`] function.
468 #[derive(Clone)]
469 #[must_use = "streams do nothing unless polled"]
470 pub struct Unfold<T, F, Fut> {
471 f: F,
472 state: Option<T>,
473 #[pin]
474 fut: Option<Fut>,
475 }
476}
477
478impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
479where
480 T: fmt::Debug,
481 Fut: fmt::Debug,
482{
483 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
484 f&mut DebugStruct<'_, '_>.debug_struct("Unfold")
485 .field("state", &self.state)
486 .field(name:"fut", &self.fut)
487 .finish()
488 }
489}
490
491impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
492where
493 F: FnMut(T) -> Fut,
494 Fut: Future<Output = Option<(Item, T)>>,
495{
496 type Item = Item;
497
498 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
499 let mut this = self.project();
500
501 if let Some(state) = this.state.take() {
502 this.fut.set(Some((this.f)(state)));
503 }
504
505 let step = ready!(this
506 .fut
507 .as_mut()
508 .as_pin_mut()
509 .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
510 .poll(cx));
511 this.fut.set(None);
512
513 if let Some((item, next_state)) = step {
514 *this.state = Some(next_state);
515 Poll::Ready(Some(item))
516 } else {
517 Poll::Ready(None)
518 }
519 }
520}
521
522/// Creates a stream from a seed value and a fallible async closure operating on it.
523///
524/// # Examples
525///
526/// ```
527/// use futures_lite::stream::{self, StreamExt};
528///
529/// # spin_on::spin_on(async {
530/// let s = stream::try_unfold(0, |mut n| async move {
531/// if n < 2 {
532/// let m = n + 1;
533/// Ok(Some((n, m)))
534/// } else {
535/// std::io::Result::Ok(None)
536/// }
537/// });
538///
539/// let v: Vec<i32> = s.try_collect().await?;
540/// assert_eq!(v, [0, 1]);
541/// # std::io::Result::Ok(()) });
542/// ```
543pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
544where
545 F: FnMut(T) -> Fut,
546 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
547{
548 TryUnfold {
549 f,
550 state: Some(init),
551 fut: None,
552 }
553}
554
555pin_project! {
556 /// Stream for the [`try_unfold()`] function.
557 #[derive(Clone)]
558 #[must_use = "streams do nothing unless polled"]
559 pub struct TryUnfold<T, F, Fut> {
560 f: F,
561 state: Option<T>,
562 #[pin]
563 fut: Option<Fut>,
564 }
565}
566
567impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
568where
569 T: fmt::Debug,
570 Fut: fmt::Debug,
571{
572 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
573 f&mut DebugStruct<'_, '_>.debug_struct("TryUnfold")
574 .field("state", &self.state)
575 .field(name:"fut", &self.fut)
576 .finish()
577 }
578}
579
580impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
581where
582 F: FnMut(T) -> Fut,
583 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
584{
585 type Item = Result<Item, E>;
586
587 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
588 let mut this = self.project();
589
590 if let Some(state) = this.state.take() {
591 this.fut.set(Some((this.f)(state)));
592 }
593
594 match this.fut.as_mut().as_pin_mut() {
595 None => {
596 // The future previously errored
597 Poll::Ready(None)
598 }
599 Some(future) => {
600 let step = ready!(future.poll(cx));
601 this.fut.set(None);
602
603 match step {
604 Ok(Some((item, next_state))) => {
605 *this.state = Some(next_state);
606 Poll::Ready(Some(Ok(item)))
607 }
608 Ok(None) => Poll::Ready(None),
609 Err(e) => Poll::Ready(Some(Err(e))),
610 }
611 }
612 }
613 }
614}
615
616/// Creates a stream that invokes the given future as its first item, and then
617/// produces no more items.
618///
619/// # Example
620///
621/// ```
622/// use futures_lite::{stream, prelude::*};
623///
624/// # spin_on::spin_on(async {
625/// let mut stream = Box::pin(stream::once_future(async { 1 }));
626/// assert_eq!(stream.next().await, Some(1));
627/// assert_eq!(stream.next().await, None);
628/// # });
629/// ```
630pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
631 OnceFuture {
632 future: Some(future),
633 }
634}
635
636pin_project! {
637 /// Stream for the [`once_future()`] method.
638 #[derive(Debug)]
639 #[must_use = "futures do nothing unless you `.await` or poll them"]
640 pub struct OnceFuture<F> {
641 #[pin]
642 future: Option<F>,
643 }
644}
645
646impl<F: Future> Stream for OnceFuture<F> {
647 type Item = F::Output;
648
649 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
650 let mut this: Projection<'_, F> = self.project();
651
652 match this.future.as_mut().as_pin_mut().map(|f: Pin<&mut F>| f.poll(cx)) {
653 Some(Poll::Ready(t: ::Output)) => {
654 this.future.set(None);
655 Poll::Ready(Some(t))
656 }
657 Some(Poll::Pending) => Poll::Pending,
658 None => Poll::Ready(None),
659 }
660 }
661}
662
663/// Extension trait for [`Stream`].
664pub trait StreamExt: Stream {
665 /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
666 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
667 where
668 Self: Unpin,
669 {
670 Stream::poll_next(Pin::new(self), cx)
671 }
672
673 /// Retrieves the next item in the stream.
674 ///
675 /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
676 /// resume iteration after that.
677 ///
678 /// # Examples
679 ///
680 /// ```
681 /// use futures_lite::stream::{self, StreamExt};
682 ///
683 /// # spin_on::spin_on(async {
684 /// let mut s = stream::iter(1..=3);
685 ///
686 /// assert_eq!(s.next().await, Some(1));
687 /// assert_eq!(s.next().await, Some(2));
688 /// assert_eq!(s.next().await, Some(3));
689 /// assert_eq!(s.next().await, None);
690 /// # });
691 /// ```
692 fn next(&mut self) -> NextFuture<'_, Self>
693 where
694 Self: Unpin,
695 {
696 NextFuture { stream: self }
697 }
698
699 /// Retrieves the next item in the stream.
700 ///
701 /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
702 /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
703 ///
704 /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
705 ///
706 /// # Examples
707 ///
708 /// ```
709 /// use futures_lite::stream::{self, StreamExt};
710 ///
711 /// # spin_on::spin_on(async {
712 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
713 ///
714 /// assert_eq!(s.try_next().await, Ok(Some(1)));
715 /// assert_eq!(s.try_next().await, Ok(Some(2)));
716 /// assert_eq!(s.try_next().await, Err("error"));
717 /// assert_eq!(s.try_next().await, Ok(None));
718 /// # });
719 /// ```
720 fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
721 where
722 Self: Stream<Item = Result<T, E>> + Unpin,
723 {
724 TryNextFuture { stream: self }
725 }
726
727 /// Counts the number of items in the stream.
728 ///
729 /// # Examples
730 ///
731 /// ```
732 /// use futures_lite::stream::{self, StreamExt};
733 ///
734 /// # spin_on::spin_on(async {
735 /// let s1 = stream::iter(vec![0]);
736 /// let s2 = stream::iter(vec![1, 2, 3]);
737 ///
738 /// assert_eq!(s1.count().await, 1);
739 /// assert_eq!(s2.count().await, 3);
740 /// # });
741 /// ```
742 fn count(self) -> CountFuture<Self>
743 where
744 Self: Sized,
745 {
746 CountFuture {
747 stream: self,
748 count: 0,
749 }
750 }
751
752 /// Maps items of the stream to new values using a closure.
753 ///
754 /// # Examples
755 ///
756 /// ```
757 /// use futures_lite::stream::{self, StreamExt};
758 ///
759 /// # spin_on::spin_on(async {
760 /// let s = stream::iter(vec![1, 2, 3]);
761 /// let mut s = s.map(|x| 2 * x);
762 ///
763 /// assert_eq!(s.next().await, Some(2));
764 /// assert_eq!(s.next().await, Some(4));
765 /// assert_eq!(s.next().await, Some(6));
766 /// assert_eq!(s.next().await, None);
767 /// # });
768 /// ```
769 fn map<T, F>(self, f: F) -> Map<Self, F>
770 where
771 Self: Sized,
772 F: FnMut(Self::Item) -> T,
773 {
774 Map { stream: self, f }
775 }
776
777 /// Maps items to streams and then concatenates them.
778 ///
779 /// # Examples
780 ///
781 /// ```
782 /// use futures_lite::stream::{self, StreamExt};
783 ///
784 /// # spin_on::spin_on(async {
785 /// let words = stream::iter(vec!["one", "two"]);
786 ///
787 /// let s: String = words
788 /// .flat_map(|s| stream::iter(s.chars()))
789 /// .collect()
790 /// .await;
791 ///
792 /// assert_eq!(s, "onetwo");
793 /// # });
794 /// ```
795 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
796 where
797 Self: Sized,
798 U: Stream,
799 F: FnMut(Self::Item) -> U,
800 {
801 FlatMap {
802 stream: self.map(f),
803 inner_stream: None,
804 }
805 }
806
807 /// Concatenates inner streams.
808 ///
809 /// # Examples
810 ///
811 /// ```
812 /// use futures_lite::stream::{self, StreamExt};
813 ///
814 /// # spin_on::spin_on(async {
815 /// let s1 = stream::iter(vec![1, 2, 3]);
816 /// let s2 = stream::iter(vec![4, 5]);
817 ///
818 /// let s = stream::iter(vec![s1, s2]);
819 /// let v: Vec<_> = s.flatten().collect().await;
820 /// assert_eq!(v, [1, 2, 3, 4, 5]);
821 /// # });
822 /// ```
823 fn flatten(self) -> Flatten<Self>
824 where
825 Self: Sized,
826 Self::Item: Stream,
827 {
828 Flatten {
829 stream: self,
830 inner_stream: None,
831 }
832 }
833
834 /// Maps items of the stream to new values using an async closure.
835 ///
836 /// # Examples
837 ///
838 /// ```
839 /// use futures_lite::pin;
840 /// use futures_lite::stream::{self, StreamExt};
841 ///
842 /// # spin_on::spin_on(async {
843 /// let s = stream::iter(vec![1, 2, 3]);
844 /// let mut s = s.then(|x| async move { 2 * x });
845 ///
846 /// pin!(s);
847 /// assert_eq!(s.next().await, Some(2));
848 /// assert_eq!(s.next().await, Some(4));
849 /// assert_eq!(s.next().await, Some(6));
850 /// assert_eq!(s.next().await, None);
851 /// # });
852 /// ```
853 fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
854 where
855 Self: Sized,
856 F: FnMut(Self::Item) -> Fut,
857 Fut: Future,
858 {
859 Then {
860 stream: self,
861 future: None,
862 f,
863 }
864 }
865
866 /// Keeps items of the stream for which `predicate` returns `true`.
867 ///
868 /// # Examples
869 ///
870 /// ```
871 /// use futures_lite::stream::{self, StreamExt};
872 ///
873 /// # spin_on::spin_on(async {
874 /// let s = stream::iter(vec![1, 2, 3, 4]);
875 /// let mut s = s.filter(|i| i % 2 == 0);
876 ///
877 /// assert_eq!(s.next().await, Some(2));
878 /// assert_eq!(s.next().await, Some(4));
879 /// assert_eq!(s.next().await, None);
880 /// # });
881 /// ```
882 fn filter<P>(self, predicate: P) -> Filter<Self, P>
883 where
884 Self: Sized,
885 P: FnMut(&Self::Item) -> bool,
886 {
887 Filter {
888 stream: self,
889 predicate,
890 }
891 }
892
893 /// Filters and maps items of the stream using a closure.
894 ///
895 /// # Examples
896 ///
897 /// ```
898 /// use futures_lite::stream::{self, StreamExt};
899 ///
900 /// # spin_on::spin_on(async {
901 /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
902 /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
903 ///
904 /// assert_eq!(s.next().await, Some(1));
905 /// assert_eq!(s.next().await, Some(3));
906 /// assert_eq!(s.next().await, Some(5));
907 /// assert_eq!(s.next().await, None);
908 /// # });
909 /// ```
910 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
911 where
912 Self: Sized,
913 F: FnMut(Self::Item) -> Option<T>,
914 {
915 FilterMap { stream: self, f }
916 }
917
918 /// Takes only the first `n` items of the stream.
919 ///
920 /// # Examples
921 ///
922 /// ```
923 /// use futures_lite::stream::{self, StreamExt};
924 ///
925 /// # spin_on::spin_on(async {
926 /// let mut s = stream::repeat(7).take(2);
927 ///
928 /// assert_eq!(s.next().await, Some(7));
929 /// assert_eq!(s.next().await, Some(7));
930 /// assert_eq!(s.next().await, None);
931 /// # });
932 /// ```
933 fn take(self, n: usize) -> Take<Self>
934 where
935 Self: Sized,
936 {
937 Take { stream: self, n }
938 }
939
940 /// Takes items while `predicate` returns `true`.
941 ///
942 /// # Examples
943 ///
944 /// ```
945 /// use futures_lite::stream::{self, StreamExt};
946 ///
947 /// # spin_on::spin_on(async {
948 /// let s = stream::iter(vec![1, 2, 3, 4]);
949 /// let mut s = s.take_while(|x| *x < 3);
950 ///
951 /// assert_eq!(s.next().await, Some(1));
952 /// assert_eq!(s.next().await, Some(2));
953 /// assert_eq!(s.next().await, None);
954 /// # });
955 /// ```
956 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
957 where
958 Self: Sized,
959 P: FnMut(&Self::Item) -> bool,
960 {
961 TakeWhile {
962 stream: self,
963 predicate,
964 }
965 }
966
967 /// Skips the first `n` items of the stream.
968 ///
969 /// # Examples
970 ///
971 /// ```
972 /// use futures_lite::stream::{self, StreamExt};
973 ///
974 /// # spin_on::spin_on(async {
975 /// let s = stream::iter(vec![1, 2, 3]);
976 /// let mut s = s.skip(2);
977 ///
978 /// assert_eq!(s.next().await, Some(3));
979 /// assert_eq!(s.next().await, None);
980 /// # });
981 /// ```
982 fn skip(self, n: usize) -> Skip<Self>
983 where
984 Self: Sized,
985 {
986 Skip { stream: self, n }
987 }
988
989 /// Skips items while `predicate` returns `true`.
990 ///
991 /// # Examples
992 ///
993 /// ```
994 /// use futures_lite::stream::{self, StreamExt};
995 ///
996 /// # spin_on::spin_on(async {
997 /// let s = stream::iter(vec![-1i32, 0, 1]);
998 /// let mut s = s.skip_while(|x| x.is_negative());
999 ///
1000 /// assert_eq!(s.next().await, Some(0));
1001 /// assert_eq!(s.next().await, Some(1));
1002 /// assert_eq!(s.next().await, None);
1003 /// # });
1004 /// ```
1005 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1006 where
1007 Self: Sized,
1008 P: FnMut(&Self::Item) -> bool,
1009 {
1010 SkipWhile {
1011 stream: self,
1012 predicate: Some(predicate),
1013 }
1014 }
1015
1016 /// Yields every `step`th item.
1017 ///
1018 /// # Panics
1019 ///
1020 /// This method will panic if the `step` is 0.
1021 ///
1022 /// # Examples
1023 ///
1024 /// ```
1025 /// use futures_lite::stream::{self, StreamExt};
1026 ///
1027 /// # spin_on::spin_on(async {
1028 /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
1029 /// let mut s = s.step_by(2);
1030 ///
1031 /// assert_eq!(s.next().await, Some(0));
1032 /// assert_eq!(s.next().await, Some(2));
1033 /// assert_eq!(s.next().await, Some(4));
1034 /// assert_eq!(s.next().await, None);
1035 /// # });
1036 /// ```
1037 fn step_by(self, step: usize) -> StepBy<Self>
1038 where
1039 Self: Sized,
1040 {
1041 assert!(step > 0, "`step` must be greater than zero");
1042 StepBy {
1043 stream: self,
1044 step,
1045 i: 0,
1046 }
1047 }
1048
1049 /// Appends another stream to the end of this one.
1050 ///
1051 /// # Examples
1052 ///
1053 /// ```
1054 /// use futures_lite::stream::{self, StreamExt};
1055 ///
1056 /// # spin_on::spin_on(async {
1057 /// let s1 = stream::iter(vec![1, 2]);
1058 /// let s2 = stream::iter(vec![7, 8]);
1059 /// let mut s = s1.chain(s2);
1060 ///
1061 /// assert_eq!(s.next().await, Some(1));
1062 /// assert_eq!(s.next().await, Some(2));
1063 /// assert_eq!(s.next().await, Some(7));
1064 /// assert_eq!(s.next().await, Some(8));
1065 /// assert_eq!(s.next().await, None);
1066 /// # });
1067 /// ```
1068 fn chain<U>(self, other: U) -> Chain<Self, U>
1069 where
1070 Self: Sized,
1071 U: Stream<Item = Self::Item> + Sized,
1072 {
1073 Chain {
1074 first: self.fuse(),
1075 second: other.fuse(),
1076 }
1077 }
1078
1079 /// Clones all items.
1080 ///
1081 /// # Examples
1082 ///
1083 /// ```
1084 /// use futures_lite::stream::{self, StreamExt};
1085 ///
1086 /// # spin_on::spin_on(async {
1087 /// let s = stream::iter(vec![&1, &2]);
1088 /// let mut s = s.cloned();
1089 ///
1090 /// assert_eq!(s.next().await, Some(1));
1091 /// assert_eq!(s.next().await, Some(2));
1092 /// assert_eq!(s.next().await, None);
1093 /// # });
1094 /// ```
1095 fn cloned<'a, T>(self) -> Cloned<Self>
1096 where
1097 Self: Stream<Item = &'a T> + Sized,
1098 T: Clone + 'a,
1099 {
1100 Cloned { stream: self }
1101 }
1102
1103 /// Copies all items.
1104 ///
1105 /// # Examples
1106 ///
1107 /// ```
1108 /// use futures_lite::stream::{self, StreamExt};
1109 ///
1110 /// # spin_on::spin_on(async {
1111 /// let s = stream::iter(vec![&1, &2]);
1112 /// let mut s = s.copied();
1113 ///
1114 /// assert_eq!(s.next().await, Some(1));
1115 /// assert_eq!(s.next().await, Some(2));
1116 /// assert_eq!(s.next().await, None);
1117 /// # });
1118 /// ```
1119 fn copied<'a, T>(self) -> Copied<Self>
1120 where
1121 Self: Stream<Item = &'a T> + Sized,
1122 T: Copy + 'a,
1123 {
1124 Copied { stream: self }
1125 }
1126
1127 /// Collects all items in the stream into a collection.
1128 ///
1129 /// # Examples
1130 ///
1131 /// ```
1132 /// use futures_lite::stream::{self, StreamExt};
1133 ///
1134 /// # spin_on::spin_on(async {
1135 /// let mut s = stream::iter(1..=3);
1136 ///
1137 /// let items: Vec<_> = s.collect().await;
1138 /// assert_eq!(items, [1, 2, 3]);
1139 /// # });
1140 /// ```
1141 fn collect<C>(self) -> CollectFuture<Self, C>
1142 where
1143 Self: Sized,
1144 C: Default + Extend<Self::Item>,
1145 {
1146 CollectFuture {
1147 stream: self,
1148 collection: Default::default(),
1149 }
1150 }
1151
1152 /// Collects all items in the fallible stream into a collection.
1153 ///
1154 /// ```
1155 /// use futures_lite::stream::{self, StreamExt};
1156 ///
1157 /// # spin_on::spin_on(async {
1158 /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1159 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1160 /// assert_eq!(res, Err(2));
1161 ///
1162 /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1163 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1164 /// assert_eq!(res, Ok(vec![1, 2, 3]));
1165 /// # })
1166 /// ```
1167 fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1168 where
1169 Self: Stream<Item = Result<T, E>> + Sized,
1170 C: Default + Extend<T>,
1171 {
1172 TryCollectFuture {
1173 stream: self,
1174 items: Default::default(),
1175 }
1176 }
1177
1178 /// Partitions items into those for which `predicate` is `true` and those for which it is
1179 /// `false`, and then collects them into two collections.
1180 ///
1181 /// # Examples
1182 ///
1183 /// ```
1184 /// use futures_lite::stream::{self, StreamExt};
1185 ///
1186 /// # spin_on::spin_on(async {
1187 /// let s = stream::iter(vec![1, 2, 3]);
1188 /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1189 ///
1190 /// assert_eq!(even, &[2]);
1191 /// assert_eq!(odd, &[1, 3]);
1192 /// # })
1193 /// ```
1194 fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1195 where
1196 Self: Sized,
1197 B: Default + Extend<Self::Item>,
1198 P: FnMut(&Self::Item) -> bool,
1199 {
1200 PartitionFuture {
1201 stream: self,
1202 predicate,
1203 res: Some(Default::default()),
1204 }
1205 }
1206
1207 /// Accumulates a computation over the stream.
1208 ///
1209 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1210 /// the accumulator and each item in the stream. The final accumulator value is returned.
1211 ///
1212 /// # Examples
1213 ///
1214 /// ```
1215 /// use futures_lite::stream::{self, StreamExt};
1216 ///
1217 /// # spin_on::spin_on(async {
1218 /// let s = stream::iter(vec![1, 2, 3]);
1219 /// let sum = s.fold(0, |acc, x| acc + x).await;
1220 ///
1221 /// assert_eq!(sum, 6);
1222 /// # })
1223 /// ```
1224 fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1225 where
1226 Self: Sized,
1227 F: FnMut(T, Self::Item) -> T,
1228 {
1229 FoldFuture {
1230 stream: self,
1231 f,
1232 acc: Some(init),
1233 }
1234 }
1235
1236 /// Accumulates a fallible computation over the stream.
1237 ///
1238 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1239 /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1240 /// error if `f` failed the computation.
1241 ///
1242 /// # Examples
1243 ///
1244 /// ```
1245 /// use futures_lite::stream::{self, StreamExt};
1246 ///
1247 /// # spin_on::spin_on(async {
1248 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1249 ///
1250 /// let sum = s.try_fold(0, |acc, v| {
1251 /// if (acc + v) % 2 == 1 {
1252 /// Ok(acc + v)
1253 /// } else {
1254 /// Err("fail")
1255 /// }
1256 /// })
1257 /// .await;
1258 ///
1259 /// assert_eq!(sum, Err("fail"));
1260 /// # })
1261 /// ```
1262 fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1263 where
1264 Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1265 F: FnMut(B, T) -> Result<B, E>,
1266 {
1267 TryFoldFuture {
1268 stream: self,
1269 f,
1270 acc: Some(init),
1271 }
1272 }
1273
1274 /// Maps items of the stream to new values using a state value and a closure.
1275 ///
1276 /// Scanning begins with the inital state set to `initial_state`, and then applies `f` to the
1277 /// state and each item in the stream. The stream stops when `f` returns `None`.
1278 ///
1279 /// # Examples
1280 ///
1281 /// ```
1282 /// use futures_lite::stream::{self, StreamExt};
1283 ///
1284 /// # spin_on::spin_on(async {
1285 /// let s = stream::iter(vec![1, 2, 3]);
1286 /// let mut s = s.scan(1, |state, x| {
1287 /// *state = *state * x;
1288 /// Some(-*state)
1289 /// });
1290 ///
1291 /// assert_eq!(s.next().await, Some(-1));
1292 /// assert_eq!(s.next().await, Some(-2));
1293 /// assert_eq!(s.next().await, Some(-6));
1294 /// assert_eq!(s.next().await, None);
1295 /// # })
1296 /// ```
1297 fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1298 where
1299 Self: Sized,
1300 F: FnMut(&mut St, Self::Item) -> Option<B>,
1301 {
1302 Scan {
1303 stream: self,
1304 state_f: (initial_state, f),
1305 }
1306 }
1307
1308 /// Fuses the stream so that it stops yielding items after the first [`None`].
1309 ///
1310 /// # Examples
1311 ///
1312 /// ```
1313 /// use futures_lite::stream::{self, StreamExt};
1314 ///
1315 /// # spin_on::spin_on(async {
1316 /// let mut s = stream::once(1).fuse();
1317 ///
1318 /// assert_eq!(s.next().await, Some(1));
1319 /// assert_eq!(s.next().await, None);
1320 /// assert_eq!(s.next().await, None);
1321 /// # })
1322 /// ```
1323 fn fuse(self) -> Fuse<Self>
1324 where
1325 Self: Sized,
1326 {
1327 Fuse {
1328 stream: self,
1329 done: false,
1330 }
1331 }
1332
1333 /// Repeats the stream from beginning to end, forever.
1334 ///
1335 /// # Examples
1336 ///
1337 /// ```
1338 /// use futures_lite::stream::{self, StreamExt};
1339 ///
1340 /// # spin_on::spin_on(async {
1341 /// let mut s = stream::iter(vec![1, 2]).cycle();
1342 ///
1343 /// assert_eq!(s.next().await, Some(1));
1344 /// assert_eq!(s.next().await, Some(2));
1345 /// assert_eq!(s.next().await, Some(1));
1346 /// assert_eq!(s.next().await, Some(2));
1347 /// # });
1348 /// ```
1349 fn cycle(self) -> Cycle<Self>
1350 where
1351 Self: Clone + Sized,
1352 {
1353 Cycle {
1354 orig: self.clone(),
1355 stream: self,
1356 }
1357 }
1358
1359 /// Enumerates items, mapping them to `(index, item)`.
1360 ///
1361 /// # Examples
1362 ///
1363 /// ```
1364 /// use futures_lite::stream::{self, StreamExt};
1365 ///
1366 /// # spin_on::spin_on(async {
1367 /// let s = stream::iter(vec!['a', 'b', 'c']);
1368 /// let mut s = s.enumerate();
1369 ///
1370 /// assert_eq!(s.next().await, Some((0, 'a')));
1371 /// assert_eq!(s.next().await, Some((1, 'b')));
1372 /// assert_eq!(s.next().await, Some((2, 'c')));
1373 /// assert_eq!(s.next().await, None);
1374 /// # });
1375 /// ```
1376 fn enumerate(self) -> Enumerate<Self>
1377 where
1378 Self: Sized,
1379 {
1380 Enumerate { stream: self, i: 0 }
1381 }
1382
1383 /// Calls a closure on each item and passes it on.
1384 ///
1385 /// # Examples
1386 ///
1387 /// ```
1388 /// use futures_lite::stream::{self, StreamExt};
1389 ///
1390 /// # spin_on::spin_on(async {
1391 /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1392 ///
1393 /// let sum = s
1394 /// .inspect(|x| println!("about to filter {}", x))
1395 /// .filter(|x| x % 2 == 0)
1396 /// .inspect(|x| println!("made it through filter: {}", x))
1397 /// .fold(0, |sum, i| sum + i)
1398 /// .await;
1399 /// # });
1400 /// ```
1401 fn inspect<F>(self, f: F) -> Inspect<Self, F>
1402 where
1403 Self: Sized,
1404 F: FnMut(&Self::Item),
1405 {
1406 Inspect { stream: self, f }
1407 }
1408
1409 /// Gets the `n`th item of the stream.
1410 ///
1411 /// In the end, `n+1` items of the stream will be consumed.
1412 ///
1413 /// # Examples
1414 ///
1415 /// ```
1416 /// use futures_lite::stream::{self, StreamExt};
1417 ///
1418 /// # spin_on::spin_on(async {
1419 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1420 ///
1421 /// assert_eq!(s.nth(2).await, Some(2));
1422 /// assert_eq!(s.nth(2).await, Some(5));
1423 /// assert_eq!(s.nth(2).await, None);
1424 /// # });
1425 /// ```
1426 fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1427 where
1428 Self: Unpin,
1429 {
1430 NthFuture { stream: self, n }
1431 }
1432
1433 /// Returns the last item in the stream.
1434 ///
1435 /// # Examples
1436 ///
1437 /// ```
1438 /// use futures_lite::stream::{self, StreamExt};
1439 ///
1440 /// # spin_on::spin_on(async {
1441 /// let s = stream::iter(vec![1, 2, 3, 4]);
1442 /// assert_eq!(s.last().await, Some(4));
1443 ///
1444 /// let s = stream::empty::<i32>();
1445 /// assert_eq!(s.last().await, None);
1446 /// # });
1447 /// ```
1448 fn last(self) -> LastFuture<Self>
1449 where
1450 Self: Sized,
1451 {
1452 LastFuture {
1453 stream: self,
1454 last: None,
1455 }
1456 }
1457
1458 /// Finds the first item of the stream for which `predicate` returns `true`.
1459 ///
1460 /// # Examples
1461 ///
1462 /// ```
1463 /// use futures_lite::stream::{self, StreamExt};
1464 ///
1465 /// # spin_on::spin_on(async {
1466 /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1467 ///
1468 /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1469 /// assert_eq!(s.next().await, Some(13));
1470 /// # });
1471 /// ```
1472 fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1473 where
1474 Self: Unpin,
1475 P: FnMut(&Self::Item) -> bool,
1476 {
1477 FindFuture {
1478 stream: self,
1479 predicate,
1480 }
1481 }
1482
1483 /// Applies a closure to items in the stream and returns the first [`Some`] result.
1484 ///
1485 /// # Examples
1486 ///
1487 /// ```
1488 /// use futures_lite::stream::{self, StreamExt};
1489 ///
1490 /// # spin_on::spin_on(async {
1491 /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1492 /// let number = s.find_map(|s| s.parse().ok()).await;
1493 ///
1494 /// assert_eq!(number, Some(2));
1495 /// # });
1496 /// ```
1497 fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1498 where
1499 Self: Unpin,
1500 F: FnMut(Self::Item) -> Option<B>,
1501 {
1502 FindMapFuture { stream: self, f }
1503 }
1504
1505 /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1506 ///
1507 /// # Examples
1508 ///
1509 /// ```
1510 /// use futures_lite::stream::{self, StreamExt};
1511 ///
1512 /// # spin_on::spin_on(async {
1513 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1514 ///
1515 /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1516 /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1517 /// assert_eq!(s.position(|x| x == 9).await, None);
1518 /// # });
1519 /// ```
1520 fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1521 where
1522 Self: Unpin,
1523 P: FnMut(Self::Item) -> bool,
1524 {
1525 PositionFuture {
1526 stream: self,
1527 predicate,
1528 index: 0,
1529 }
1530 }
1531
1532 /// Tests if `predicate` returns `true` for all items in the stream.
1533 ///
1534 /// The result is `true` for an empty stream.
1535 ///
1536 /// # Examples
1537 ///
1538 /// ```
1539 /// use futures_lite::stream::{self, StreamExt};
1540 ///
1541 /// # spin_on::spin_on(async {
1542 /// let mut s = stream::iter(vec![1, 2, 3]);
1543 /// assert!(!s.all(|x| x % 2 == 0).await);
1544 ///
1545 /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1546 /// assert!(s.all(|x| x % 2 == 0).await);
1547 ///
1548 /// let mut s = stream::empty::<i32>();
1549 /// assert!(s.all(|x| x % 2 == 0).await);
1550 /// # });
1551 /// ```
1552 fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1553 where
1554 Self: Unpin,
1555 P: FnMut(Self::Item) -> bool,
1556 {
1557 AllFuture {
1558 stream: self,
1559 predicate,
1560 }
1561 }
1562
1563 /// Tests if `predicate` returns `true` for any item in the stream.
1564 ///
1565 /// The result is `false` for an empty stream.
1566 ///
1567 /// # Examples
1568 ///
1569 /// ```
1570 /// use futures_lite::stream::{self, StreamExt};
1571 ///
1572 /// # spin_on::spin_on(async {
1573 /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1574 /// assert!(!s.any(|x| x % 2 == 0).await);
1575 ///
1576 /// let mut s = stream::iter(vec![1, 2, 3]);
1577 /// assert!(s.any(|x| x % 2 == 0).await);
1578 ///
1579 /// let mut s = stream::empty::<i32>();
1580 /// assert!(!s.any(|x| x % 2 == 0).await);
1581 /// # });
1582 /// ```
1583 fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1584 where
1585 Self: Unpin,
1586 P: FnMut(Self::Item) -> bool,
1587 {
1588 AnyFuture {
1589 stream: self,
1590 predicate,
1591 }
1592 }
1593
1594 /// Calls a closure on each item of the stream.
1595 ///
1596 /// # Examples
1597 ///
1598 /// ```
1599 /// use futures_lite::stream::{self, StreamExt};
1600 ///
1601 /// # spin_on::spin_on(async {
1602 /// let mut s = stream::iter(vec![1, 2, 3]);
1603 /// s.for_each(|s| println!("{}", s)).await;
1604 /// # });
1605 /// ```
1606 fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1607 where
1608 Self: Sized,
1609 F: FnMut(Self::Item),
1610 {
1611 ForEachFuture { stream: self, f }
1612 }
1613
1614 /// Calls a fallible closure on each item of the stream, stopping on first error.
1615 ///
1616 /// # Examples
1617 ///
1618 /// ```
1619 /// use futures_lite::stream::{self, StreamExt};
1620 ///
1621 /// # spin_on::spin_on(async {
1622 /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1623 ///
1624 /// let mut v = vec![];
1625 /// let res = s
1626 /// .try_for_each(|n| {
1627 /// if n < 2 {
1628 /// v.push(n);
1629 /// Ok(())
1630 /// } else {
1631 /// Err("too big")
1632 /// }
1633 /// })
1634 /// .await;
1635 ///
1636 /// assert_eq!(v, &[0, 1]);
1637 /// assert_eq!(res, Err("too big"));
1638 /// # });
1639 /// ```
1640 fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1641 where
1642 Self: Unpin,
1643 F: FnMut(Self::Item) -> Result<(), E>,
1644 {
1645 TryForEachFuture { stream: self, f }
1646 }
1647
1648 /// Zips up two streams into a single stream of pairs.
1649 ///
1650 /// The stream of pairs stops when either of the original two streams is exhausted.
1651 ///
1652 /// # Examples
1653 ///
1654 /// ```
1655 /// use futures_lite::stream::{self, StreamExt};
1656 ///
1657 /// # spin_on::spin_on(async {
1658 /// let l = stream::iter(vec![1, 2, 3]);
1659 /// let r = stream::iter(vec![4, 5, 6, 7]);
1660 /// let mut s = l.zip(r);
1661 ///
1662 /// assert_eq!(s.next().await, Some((1, 4)));
1663 /// assert_eq!(s.next().await, Some((2, 5)));
1664 /// assert_eq!(s.next().await, Some((3, 6)));
1665 /// assert_eq!(s.next().await, None);
1666 /// # });
1667 /// ```
1668 fn zip<U>(self, other: U) -> Zip<Self, U>
1669 where
1670 Self: Sized,
1671 U: Stream,
1672 {
1673 Zip {
1674 item_slot: None,
1675 first: self,
1676 second: other,
1677 }
1678 }
1679
1680 /// Collects a stream of pairs into a pair of collections.
1681 ///
1682 /// # Examples
1683 ///
1684 /// ```
1685 /// use futures_lite::stream::{self, StreamExt};
1686 ///
1687 /// # spin_on::spin_on(async {
1688 /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1689 /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1690 ///
1691 /// assert_eq!(left, [1, 3]);
1692 /// assert_eq!(right, [2, 4]);
1693 /// # });
1694 /// ```
1695 fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1696 where
1697 FromA: Default + Extend<A>,
1698 FromB: Default + Extend<B>,
1699 Self: Stream<Item = (A, B)> + Sized,
1700 {
1701 UnzipFuture {
1702 stream: self,
1703 res: Some(Default::default()),
1704 }
1705 }
1706
1707 /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1708 ///
1709 /// # Examples
1710 ///
1711 /// ```
1712 /// use futures_lite::stream::{self, StreamExt};
1713 /// use futures_lite::stream::{once, pending};
1714 ///
1715 /// # spin_on::spin_on(async {
1716 /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1717 /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1718 ///
1719 /// // The first future wins.
1720 /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1721 /// # })
1722 /// ```
1723 fn or<S>(self, other: S) -> Or<Self, S>
1724 where
1725 Self: Sized,
1726 S: Stream<Item = Self::Item>,
1727 {
1728 Or {
1729 stream1: self,
1730 stream2: other,
1731 }
1732 }
1733
1734 /// Merges with `other` stream, with no preference for either stream when both are ready.
1735 ///
1736 /// # Examples
1737 ///
1738 /// ```
1739 /// use futures_lite::stream::{self, StreamExt};
1740 /// use futures_lite::stream::{once, pending};
1741 ///
1742 /// # spin_on::spin_on(async {
1743 /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1744 /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1745 ///
1746 /// // One of the two stream is randomly chosen as the winner.
1747 /// let res = once(1).race(once(2)).next().await;
1748 /// # })
1749 /// ```
1750 #[cfg(feature = "std")]
1751 fn race<S>(self, other: S) -> Race<Self, S>
1752 where
1753 Self: Sized,
1754 S: Stream<Item = Self::Item>,
1755 {
1756 Race {
1757 stream1: self,
1758 stream2: other,
1759 }
1760 }
1761
1762 /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
1763 ///
1764 /// # Examples
1765 ///
1766 /// ```
1767 /// use futures_lite::stream::{self, StreamExt};
1768 ///
1769 /// # spin_on::spin_on(async {
1770 /// let a = stream::once(1);
1771 /// let b = stream::empty();
1772 ///
1773 /// // Streams of different types can be stored in
1774 /// // the same collection when they are boxed:
1775 /// let streams = vec![a.boxed(), b.boxed()];
1776 /// # })
1777 /// ```
1778 #[cfg(feature = "alloc")]
1779 fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
1780 where
1781 Self: Send + Sized + 'a,
1782 {
1783 Box::pin(self)
1784 }
1785
1786 /// Boxes the stream and changes its type to `dyn Stream + 'a`.
1787 ///
1788 /// # Examples
1789 ///
1790 /// ```
1791 /// use futures_lite::stream::{self, StreamExt};
1792 ///
1793 /// # spin_on::spin_on(async {
1794 /// let a = stream::once(1);
1795 /// let b = stream::empty();
1796 ///
1797 /// // Streams of different types can be stored in
1798 /// // the same collection when they are boxed:
1799 /// let streams = vec![a.boxed_local(), b.boxed_local()];
1800 /// # })
1801 /// ```
1802 #[cfg(feature = "alloc")]
1803 fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
1804 where
1805 Self: Sized + 'a,
1806 {
1807 Box::pin(self)
1808 }
1809}
1810
1811impl<S: Stream + ?Sized> StreamExt for S {}
1812
1813/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
1814///
1815/// # Examples
1816///
1817/// ```
1818/// use futures_lite::stream::{self, StreamExt};
1819///
1820/// // These two lines are equivalent:
1821/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
1822/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
1823/// ```
1824#[cfg(feature = "alloc")]
1825pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
1826
1827/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
1828///
1829/// # Examples
1830///
1831/// ```
1832/// use futures_lite::stream::{self, StreamExt};
1833///
1834/// // These two lines are equivalent:
1835/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
1836/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
1837/// ```
1838#[cfg(feature = "alloc")]
1839pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1840
1841/// Future for the [`StreamExt::next()`] method.
1842#[derive(Debug)]
1843#[must_use = "futures do nothing unless you `.await` or poll them"]
1844pub struct NextFuture<'a, S: ?Sized> {
1845 stream: &'a mut S,
1846}
1847
1848impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
1849
1850impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
1851 type Output = Option<S::Item>;
1852
1853 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1854 self.stream.poll_next(cx)
1855 }
1856}
1857
1858/// Future for the [`StreamExt::try_next()`] method.
1859#[derive(Debug)]
1860#[must_use = "futures do nothing unless you `.await` or poll them"]
1861pub struct TryNextFuture<'a, S: ?Sized> {
1862 stream: &'a mut S,
1863}
1864
1865impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
1866
1867impl<T, E, S> Future for TryNextFuture<'_, S>
1868where
1869 S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
1870{
1871 type Output = Result<Option<T>, E>;
1872
1873 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1874 let res: Option> = ready!(self.stream.poll_next(cx));
1875 Poll::Ready(res.transpose())
1876 }
1877}
1878
1879pin_project! {
1880 /// Future for the [`StreamExt::count()`] method.
1881 #[derive(Debug)]
1882 #[must_use = "futures do nothing unless you `.await` or poll them"]
1883 pub struct CountFuture<S: ?Sized> {
1884 count: usize,
1885 #[pin]
1886 stream: S,
1887 }
1888}
1889
1890impl<S: Stream + ?Sized> Future for CountFuture<S> {
1891 type Output = usize;
1892
1893 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1894 loop {
1895 match ready!(self.as_mut().project().stream.poll_next(cx)) {
1896 None => return Poll::Ready(self.count),
1897 Some(_) => *self.as_mut().project().count += 1,
1898 }
1899 }
1900 }
1901}
1902
1903pin_project! {
1904 /// Future for the [`StreamExt::collect()`] method.
1905 #[derive(Debug)]
1906 #[must_use = "futures do nothing unless you `.await` or poll them"]
1907 pub struct CollectFuture<S, C> {
1908 #[pin]
1909 stream: S,
1910 collection: C,
1911 }
1912}
1913
1914impl<S, C> Future for CollectFuture<S, C>
1915where
1916 S: Stream,
1917 C: Default + Extend<S::Item>,
1918{
1919 type Output = C;
1920
1921 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
1922 let mut this: Projection<'_, S, C> = self.as_mut().project();
1923 loop {
1924 match ready!(this.stream.as_mut().poll_next(cx)) {
1925 Some(e: ::Item) => this.collection.extend(iter:Some(e)),
1926 None => return Poll::Ready(mem::take(self.project().collection)),
1927 }
1928 }
1929 }
1930}
1931
1932pin_project! {
1933 /// Future for the [`StreamExt::try_collect()`] method.
1934 #[derive(Debug)]
1935 #[must_use = "futures do nothing unless you `.await` or poll them"]
1936 pub struct TryCollectFuture<S, C> {
1937 #[pin]
1938 stream: S,
1939 items: C,
1940 }
1941}
1942
1943impl<T, E, S, C> Future for TryCollectFuture<S, C>
1944where
1945 S: Stream<Item = Result<T, E>>,
1946 C: Default + Extend<T>,
1947{
1948 type Output = Result<C, E>;
1949
1950 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1951 let mut this: Projection<'_, S, C> = self.project();
1952 Poll::Ready(Ok(loop {
1953 match ready!(this.stream.as_mut().poll_next(cx)?) {
1954 Some(x: T) => this.items.extend(iter:Some(x)),
1955 None => break mem::take(dest:this.items),
1956 }
1957 }))
1958 }
1959}
1960
1961pin_project! {
1962 /// Future for the [`StreamExt::partition()`] method.
1963 #[derive(Debug)]
1964 #[must_use = "futures do nothing unless you `.await` or poll them"]
1965 pub struct PartitionFuture<S, P, B> {
1966 #[pin]
1967 stream: S,
1968 predicate: P,
1969 res: Option<(B, B)>,
1970 }
1971}
1972
1973impl<S, P, B> Future for PartitionFuture<S, P, B>
1974where
1975 S: Stream + Sized,
1976 P: FnMut(&S::Item) -> bool,
1977 B: Default + Extend<S::Item>,
1978{
1979 type Output = (B, B);
1980
1981 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1982 let mut this: Projection<'_, S, P, B> = self.project();
1983 loop {
1984 match ready!(this.stream.as_mut().poll_next(cx)) {
1985 Some(v: ::Item) => {
1986 let res: &mut (B, B) = this.res.as_mut().unwrap();
1987 if (this.predicate)(&v) {
1988 res.0.extend(iter:Some(v))
1989 } else {
1990 res.1.extend(iter:Some(v))
1991 }
1992 }
1993 None => return Poll::Ready(this.res.take().unwrap()),
1994 }
1995 }
1996 }
1997}
1998
1999pin_project! {
2000 /// Future for the [`StreamExt::fold()`] method.
2001 #[derive(Debug)]
2002 #[must_use = "futures do nothing unless you `.await` or poll them"]
2003 pub struct FoldFuture<S, F, T> {
2004 #[pin]
2005 stream: S,
2006 f: F,
2007 acc: Option<T>,
2008 }
2009}
2010
2011impl<S, F, T> Future for FoldFuture<S, F, T>
2012where
2013 S: Stream,
2014 F: FnMut(T, S::Item) -> T,
2015{
2016 type Output = T;
2017
2018 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2019 let mut this: Projection<'_, S, F, T> = self.project();
2020 loop {
2021 match ready!(this.stream.as_mut().poll_next(cx)) {
2022 Some(v: ::Item) => {
2023 let old: T = this.acc.take().unwrap();
2024 let new: T = (this.f)(old, v);
2025 *this.acc = Some(new);
2026 }
2027 None => return Poll::Ready(this.acc.take().unwrap()),
2028 }
2029 }
2030 }
2031}
2032
2033/// Future for the [`StreamExt::try_fold()`] method.
2034#[derive(Debug)]
2035#[must_use = "futures do nothing unless you `.await` or poll them"]
2036pub struct TryFoldFuture<'a, S, F, B> {
2037 stream: &'a mut S,
2038 f: F,
2039 acc: Option<B>,
2040}
2041
2042impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
2043
2044impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
2045where
2046 S: Stream<Item = Result<T, E>> + Unpin,
2047 F: FnMut(B, T) -> Result<B, E>,
2048{
2049 type Output = Result<B, E>;
2050
2051 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2052 loop {
2053 match ready!(self.stream.poll_next(cx)) {
2054 Some(Err(e: E)) => return Poll::Ready(Err(e)),
2055 Some(Ok(t: T)) => {
2056 let old: B = self.acc.take().unwrap();
2057 let new: Result = (&mut self.f)(old, t);
2058
2059 match new {
2060 Ok(t: B) => self.acc = Some(t),
2061 Err(e: E) => return Poll::Ready(Err(e)),
2062 }
2063 }
2064 None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2065 }
2066 }
2067 }
2068}
2069
2070pin_project! {
2071 /// Stream for the [`StreamExt::scan()`] method.
2072 #[derive(Clone, Debug)]
2073 #[must_use = "streams do nothing unless polled"]
2074 pub struct Scan<S, St, F> {
2075 #[pin]
2076 stream: S,
2077 state_f: (St, F),
2078 }
2079}
2080
2081impl<S, St, F, B> Stream for Scan<S, St, F>
2082where
2083 S: Stream,
2084 F: FnMut(&mut St, S::Item) -> Option<B>,
2085{
2086 type Item = B;
2087
2088 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2089 let mut this: Projection<'_, S, St, F> = self.project();
2090 this.stream.as_mut().poll_next(cx).map(|item: Option<::Item>| {
2091 item.and_then(|item: ::Item| {
2092 let (state: &mut St, f: &mut F) = this.state_f;
2093 f(state, item)
2094 })
2095 })
2096 }
2097}
2098
2099pin_project! {
2100 /// Stream for the [`StreamExt::fuse()`] method.
2101 #[derive(Clone, Debug)]
2102 #[must_use = "streams do nothing unless polled"]
2103 pub struct Fuse<S> {
2104 #[pin]
2105 stream: S,
2106 done: bool,
2107 }
2108}
2109
2110impl<S: Stream> Stream for Fuse<S> {
2111 type Item = S::Item;
2112
2113 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2114 let this: Projection<'_, S> = self.project();
2115
2116 if *this.done {
2117 Poll::Ready(None)
2118 } else {
2119 let next: Option<::Item> = ready!(this.stream.poll_next(cx));
2120 if next.is_none() {
2121 *this.done = true;
2122 }
2123 Poll::Ready(next)
2124 }
2125 }
2126}
2127
2128pin_project! {
2129 /// Stream for the [`StreamExt::map()`] method.
2130 #[derive(Clone, Debug)]
2131 #[must_use = "streams do nothing unless polled"]
2132 pub struct Map<S, F> {
2133 #[pin]
2134 stream: S,
2135 f: F,
2136 }
2137}
2138
2139impl<S, F, T> Stream for Map<S, F>
2140where
2141 S: Stream,
2142 F: FnMut(S::Item) -> T,
2143{
2144 type Item = T;
2145
2146 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2147 let this: Projection<'_, S, F> = self.project();
2148 let next: Option<::Item> = ready!(this.stream.poll_next(cx));
2149 Poll::Ready(next.map(this.f))
2150 }
2151
2152 fn size_hint(&self) -> (usize, Option<usize>) {
2153 self.stream.size_hint()
2154 }
2155}
2156
2157pin_project! {
2158 /// Stream for the [`StreamExt::flat_map()`] method.
2159 #[derive(Clone, Debug)]
2160 #[must_use = "streams do nothing unless polled"]
2161 pub struct FlatMap<S, U, F> {
2162 #[pin]
2163 stream: Map<S, F>,
2164 #[pin]
2165 inner_stream: Option<U>,
2166 }
2167}
2168
2169impl<S, U, F> Stream for FlatMap<S, U, F>
2170where
2171 S: Stream,
2172 U: Stream,
2173 F: FnMut(S::Item) -> U,
2174{
2175 type Item = U::Item;
2176
2177 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2178 let mut this: Projection<'_, S, U, F> = self.project();
2179 loop {
2180 if let Some(inner: Pin<&mut U>) = this.inner_stream.as_mut().as_pin_mut() {
2181 match ready!(inner.poll_next(cx)) {
2182 Some(item: ::Item) => return Poll::Ready(Some(item)),
2183 None => this.inner_stream.set(None),
2184 }
2185 }
2186
2187 match ready!(this.stream.as_mut().poll_next(cx)) {
2188 Some(stream: U) => this.inner_stream.set(Some(stream)),
2189 None => return Poll::Ready(None),
2190 }
2191 }
2192 }
2193}
2194
2195pin_project! {
2196 /// Stream for the [`StreamExt::flatten()`] method.
2197 #[derive(Clone, Debug)]
2198 #[must_use = "streams do nothing unless polled"]
2199 pub struct Flatten<S: Stream> {
2200 #[pin]
2201 stream: S,
2202 #[pin]
2203 inner_stream: Option<S::Item>,
2204 }
2205}
2206
2207impl<S, U> Stream for Flatten<S>
2208where
2209 S: Stream<Item = U>,
2210 U: Stream,
2211{
2212 type Item = U::Item;
2213
2214 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2215 let mut this: Projection<'_, S> = self.project();
2216 loop {
2217 if let Some(inner: Pin<&mut U>) = this.inner_stream.as_mut().as_pin_mut() {
2218 match ready!(inner.poll_next(cx)) {
2219 Some(item: ::Item) => return Poll::Ready(Some(item)),
2220 None => this.inner_stream.set(None),
2221 }
2222 }
2223
2224 match ready!(this.stream.as_mut().poll_next(cx)) {
2225 Some(inner: U) => this.inner_stream.set(Some(inner)),
2226 None => return Poll::Ready(None),
2227 }
2228 }
2229 }
2230}
2231
2232pin_project! {
2233 /// Stream for the [`StreamExt::then()`] method.
2234 #[derive(Clone, Debug)]
2235 #[must_use = "streams do nothing unless polled"]
2236 pub struct Then<S, F, Fut> {
2237 #[pin]
2238 stream: S,
2239 #[pin]
2240 future: Option<Fut>,
2241 f: F,
2242 }
2243}
2244
2245impl<S, F, Fut> Stream for Then<S, F, Fut>
2246where
2247 S: Stream,
2248 F: FnMut(S::Item) -> Fut,
2249 Fut: Future,
2250{
2251 type Item = Fut::Output;
2252
2253 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2254 let mut this = self.project();
2255
2256 loop {
2257 if let Some(fut) = this.future.as_mut().as_pin_mut() {
2258 let item = ready!(fut.poll(cx));
2259 this.future.set(None);
2260 return Poll::Ready(Some(item));
2261 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2262 this.future.set(Some((this.f)(item)));
2263 } else {
2264 return Poll::Ready(None);
2265 }
2266 }
2267 }
2268
2269 fn size_hint(&self) -> (usize, Option<usize>) {
2270 let future_len = self.future.is_some() as usize;
2271 let (lower, upper) = self.stream.size_hint();
2272 let lower = lower.saturating_add(future_len);
2273 let upper = upper.and_then(|u| u.checked_add(future_len));
2274 (lower, upper)
2275 }
2276}
2277
2278pin_project! {
2279 /// Stream for the [`StreamExt::filter()`] method.
2280 #[derive(Clone, Debug)]
2281 #[must_use = "streams do nothing unless polled"]
2282 pub struct Filter<S, P> {
2283 #[pin]
2284 stream: S,
2285 predicate: P,
2286 }
2287}
2288
2289impl<S, P> Stream for Filter<S, P>
2290where
2291 S: Stream,
2292 P: FnMut(&S::Item) -> bool,
2293{
2294 type Item = S::Item;
2295
2296 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2297 let mut this: Projection<'_, S, P> = self.project();
2298 loop {
2299 match ready!(this.stream.as_mut().poll_next(cx)) {
2300 None => return Poll::Ready(None),
2301 Some(v: ::Item) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2302 Some(_) => {}
2303 }
2304 }
2305 }
2306}
2307
2308/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
2309///
2310/// # Examples
2311///
2312/// ```
2313/// use futures_lite::stream::{self, once, pending, StreamExt};
2314///
2315/// # spin_on::spin_on(async {
2316/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
2317/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
2318///
2319/// // The first stream wins.
2320/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
2321/// # })
2322/// ```
2323pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2324where
2325 S1: Stream<Item = T>,
2326 S2: Stream<Item = T>,
2327{
2328 Or { stream1, stream2 }
2329}
2330
2331pin_project! {
2332 /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2333 #[derive(Clone, Debug)]
2334 #[must_use = "streams do nothing unless polled"]
2335 pub struct Or<S1, S2> {
2336 #[pin]
2337 stream1: S1,
2338 #[pin]
2339 stream2: S2,
2340 }
2341}
2342
2343impl<T, S1, S2> Stream for Or<S1, S2>
2344where
2345 S1: Stream<Item = T>,
2346 S2: Stream<Item = T>,
2347{
2348 type Item = T;
2349
2350 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2351 let mut this: Projection<'_, S1, S2> = self.project();
2352
2353 if let Poll::Ready(Some(t: T)) = this.stream1.as_mut().poll_next(cx) {
2354 return Poll::Ready(Some(t));
2355 }
2356 this.stream2.as_mut().poll_next(cx)
2357 }
2358}
2359
2360/// Merges two streams, with no preference for either stream when both are ready.
2361///
2362/// # Examples
2363///
2364/// ```
2365/// use futures_lite::stream::{self, once, pending, StreamExt};
2366///
2367/// # spin_on::spin_on(async {
2368/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
2369/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
2370///
2371/// // One of the two stream is randomly chosen as the winner.
2372/// let res = stream::race(once(1), once(2)).next().await;
2373/// # })
2374#[cfg(feature = "std")]
2375pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
2376where
2377 S1: Stream<Item = T>,
2378 S2: Stream<Item = T>,
2379{
2380 Race { stream1, stream2 }
2381}
2382
2383#[cfg(feature = "std")]
2384pin_project! {
2385 /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
2386 #[derive(Clone, Debug)]
2387 #[must_use = "streams do nothing unless polled"]
2388 pub struct Race<S1, S2> {
2389 #[pin]
2390 stream1: S1,
2391 #[pin]
2392 stream2: S2,
2393 }
2394}
2395
2396#[cfg(feature = "std")]
2397impl<T, S1, S2> Stream for Race<S1, S2>
2398where
2399 S1: Stream<Item = T>,
2400 S2: Stream<Item = T>,
2401{
2402 type Item = T;
2403
2404 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2405 let mut this: Projection<'_, S1, S2> = self.project();
2406
2407 if fastrand::bool() {
2408 if let Poll::Ready(Some(t: T)) = this.stream1.as_mut().poll_next(cx) {
2409 return Poll::Ready(Some(t));
2410 }
2411 if let Poll::Ready(Some(t: T)) = this.stream2.as_mut().poll_next(cx) {
2412 return Poll::Ready(Some(t));
2413 }
2414 } else {
2415 if let Poll::Ready(Some(t: T)) = this.stream2.as_mut().poll_next(cx) {
2416 return Poll::Ready(Some(t));
2417 }
2418 if let Poll::Ready(Some(t: T)) = this.stream1.as_mut().poll_next(cx) {
2419 return Poll::Ready(Some(t));
2420 }
2421 }
2422 Poll::Pending
2423 }
2424}
2425
2426pin_project! {
2427 /// Stream for the [`StreamExt::filter_map()`] method.
2428 #[derive(Clone, Debug)]
2429 #[must_use = "streams do nothing unless polled"]
2430 pub struct FilterMap<S, F> {
2431 #[pin]
2432 stream: S,
2433 f: F,
2434 }
2435}
2436
2437impl<S, F, T> Stream for FilterMap<S, F>
2438where
2439 S: Stream,
2440 F: FnMut(S::Item) -> Option<T>,
2441{
2442 type Item = T;
2443
2444 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2445 let mut this: Projection<'_, S, F> = self.project();
2446 loop {
2447 match ready!(this.stream.as_mut().poll_next(cx)) {
2448 None => return Poll::Ready(None),
2449 Some(v: ::Item) => {
2450 if let Some(t: T) = (this.f)(v) {
2451 return Poll::Ready(Some(t));
2452 }
2453 }
2454 }
2455 }
2456 }
2457}
2458
2459pin_project! {
2460 /// Stream for the [`StreamExt::take()`] method.
2461 #[derive(Clone, Debug)]
2462 #[must_use = "streams do nothing unless polled"]
2463 pub struct Take<S> {
2464 #[pin]
2465 stream: S,
2466 n: usize,
2467 }
2468}
2469
2470impl<S: Stream> Stream for Take<S> {
2471 type Item = S::Item;
2472
2473 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2474 let this: Projection<'_, S> = self.project();
2475
2476 if *this.n == 0 {
2477 Poll::Ready(None)
2478 } else {
2479 let next: Option<::Item> = ready!(this.stream.poll_next(cx));
2480 match next {
2481 Some(_) => *this.n -= 1,
2482 None => *this.n = 0,
2483 }
2484 Poll::Ready(next)
2485 }
2486 }
2487}
2488
2489pin_project! {
2490 /// Stream for the [`StreamExt::take_while()`] method.
2491 #[derive(Clone, Debug)]
2492 #[must_use = "streams do nothing unless polled"]
2493 pub struct TakeWhile<S, P> {
2494 #[pin]
2495 stream: S,
2496 predicate: P,
2497 }
2498}
2499
2500impl<S, P> Stream for TakeWhile<S, P>
2501where
2502 S: Stream,
2503 P: FnMut(&S::Item) -> bool,
2504{
2505 type Item = S::Item;
2506
2507 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2508 let this: Projection<'_, S, P> = self.project();
2509
2510 match ready!(this.stream.poll_next(cx)) {
2511 Some(v: ::Item) => {
2512 if (this.predicate)(&v) {
2513 Poll::Ready(Some(v))
2514 } else {
2515 Poll::Ready(None)
2516 }
2517 }
2518 None => Poll::Ready(None),
2519 }
2520 }
2521}
2522
2523pin_project! {
2524 /// Stream for the [`StreamExt::skip()`] method.
2525 #[derive(Clone, Debug)]
2526 #[must_use = "streams do nothing unless polled"]
2527 pub struct Skip<S> {
2528 #[pin]
2529 stream: S,
2530 n: usize,
2531 }
2532}
2533
2534impl<S: Stream> Stream for Skip<S> {
2535 type Item = S::Item;
2536
2537 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2538 let mut this: Projection<'_, S> = self.project();
2539 loop {
2540 match ready!(this.stream.as_mut().poll_next(cx)) {
2541 Some(v: ::Item) => match *this.n {
2542 0 => return Poll::Ready(Some(v)),
2543 _ => *this.n -= 1,
2544 },
2545 None => return Poll::Ready(None),
2546 }
2547 }
2548 }
2549}
2550
2551pin_project! {
2552 /// Stream for the [`StreamExt::skip_while()`] method.
2553 #[derive(Clone, Debug)]
2554 #[must_use = "streams do nothing unless polled"]
2555 pub struct SkipWhile<S, P> {
2556 #[pin]
2557 stream: S,
2558 predicate: Option<P>,
2559 }
2560}
2561
2562impl<S, P> Stream for SkipWhile<S, P>
2563where
2564 S: Stream,
2565 P: FnMut(&S::Item) -> bool,
2566{
2567 type Item = S::Item;
2568
2569 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2570 let mut this: Projection<'_, S, P> = self.project();
2571 loop {
2572 match ready!(this.stream.as_mut().poll_next(cx)) {
2573 Some(v: ::Item) => match this.predicate {
2574 Some(p: &mut P) => {
2575 if !p(&v) {
2576 *this.predicate = None;
2577 return Poll::Ready(Some(v));
2578 }
2579 }
2580 None => return Poll::Ready(Some(v)),
2581 },
2582 None => return Poll::Ready(None),
2583 }
2584 }
2585 }
2586}
2587
2588pin_project! {
2589 /// Stream for the [`StreamExt::step_by()`] method.
2590 #[derive(Clone, Debug)]
2591 #[must_use = "streams do nothing unless polled"]
2592 pub struct StepBy<S> {
2593 #[pin]
2594 stream: S,
2595 step: usize,
2596 i: usize,
2597 }
2598}
2599
2600impl<S: Stream> Stream for StepBy<S> {
2601 type Item = S::Item;
2602
2603 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2604 let mut this: Projection<'_, S> = self.project();
2605 loop {
2606 match ready!(this.stream.as_mut().poll_next(cx)) {
2607 Some(v: ::Item) => {
2608 if *this.i == 0 {
2609 *this.i = *this.step - 1;
2610 return Poll::Ready(Some(v));
2611 } else {
2612 *this.i -= 1;
2613 }
2614 }
2615 None => return Poll::Ready(None),
2616 }
2617 }
2618 }
2619}
2620
2621pin_project! {
2622 /// Stream for the [`StreamExt::chain()`] method.
2623 #[derive(Clone, Debug)]
2624 #[must_use = "streams do nothing unless polled"]
2625 pub struct Chain<S, U> {
2626 #[pin]
2627 first: Fuse<S>,
2628 #[pin]
2629 second: Fuse<U>,
2630 }
2631}
2632
2633impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2634 type Item = S::Item;
2635
2636 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2637 let mut this = self.project();
2638
2639 if !this.first.done {
2640 let next = ready!(this.first.as_mut().poll_next(cx));
2641 if let Some(next) = next {
2642 return Poll::Ready(Some(next));
2643 }
2644 }
2645
2646 if !this.second.done {
2647 let next = ready!(this.second.as_mut().poll_next(cx));
2648 if let Some(next) = next {
2649 return Poll::Ready(Some(next));
2650 }
2651 }
2652
2653 if this.first.done && this.second.done {
2654 Poll::Ready(None)
2655 } else {
2656 Poll::Pending
2657 }
2658 }
2659}
2660
2661pin_project! {
2662 /// Stream for the [`StreamExt::cloned()`] method.
2663 #[derive(Clone, Debug)]
2664 #[must_use = "streams do nothing unless polled"]
2665 pub struct Cloned<S> {
2666 #[pin]
2667 stream: S,
2668 }
2669}
2670
2671impl<'a, S, T: 'a> Stream for Cloned<S>
2672where
2673 S: Stream<Item = &'a T>,
2674 T: Clone,
2675{
2676 type Item = T;
2677
2678 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2679 let this: Projection<'_, S> = self.project();
2680 let next: Option<&T> = ready!(this.stream.poll_next(cx));
2681 Poll::Ready(next.cloned())
2682 }
2683}
2684
2685pin_project! {
2686 /// Stream for the [`StreamExt::copied()`] method.
2687 #[derive(Clone, Debug)]
2688 #[must_use = "streams do nothing unless polled"]
2689 pub struct Copied<S> {
2690 #[pin]
2691 stream: S,
2692 }
2693}
2694
2695impl<'a, S, T: 'a> Stream for Copied<S>
2696where
2697 S: Stream<Item = &'a T>,
2698 T: Copy,
2699{
2700 type Item = T;
2701
2702 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2703 let this: Projection<'_, S> = self.project();
2704 let next: Option<&T> = ready!(this.stream.poll_next(cx));
2705 Poll::Ready(next.copied())
2706 }
2707}
2708
2709pin_project! {
2710 /// Stream for the [`StreamExt::cycle()`] method.
2711 #[derive(Clone, Debug)]
2712 #[must_use = "streams do nothing unless polled"]
2713 pub struct Cycle<S> {
2714 orig: S,
2715 #[pin]
2716 stream: S,
2717 }
2718}
2719
2720impl<S> Stream for Cycle<S>
2721where
2722 S: Stream + Clone,
2723{
2724 type Item = S::Item;
2725
2726 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2727 match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2728 Some(item: ::Item) => Poll::Ready(Some(item)),
2729 None => {
2730 let new: S = self.as_mut().orig.clone();
2731 self.as_mut().project().stream.set(new);
2732 self.project().stream.poll_next(cx)
2733 }
2734 }
2735 }
2736}
2737
2738pin_project! {
2739 /// Stream for the [`StreamExt::enumerate()`] method.
2740 #[derive(Clone, Debug)]
2741 #[must_use = "streams do nothing unless polled"]
2742 pub struct Enumerate<S> {
2743 #[pin]
2744 stream: S,
2745 i: usize,
2746 }
2747}
2748
2749impl<S> Stream for Enumerate<S>
2750where
2751 S: Stream,
2752{
2753 type Item = (usize, S::Item);
2754
2755 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2756 let this: Projection<'_, S> = self.project();
2757
2758 match ready!(this.stream.poll_next(cx)) {
2759 Some(v: ::Item) => {
2760 let ret: (usize, ::Item) = (*this.i, v);
2761 *this.i += 1;
2762 Poll::Ready(Some(ret))
2763 }
2764 None => Poll::Ready(None),
2765 }
2766 }
2767}
2768
2769pin_project! {
2770 /// Stream for the [`StreamExt::inspect()`] method.
2771 #[derive(Clone, Debug)]
2772 #[must_use = "streams do nothing unless polled"]
2773 pub struct Inspect<S, F> {
2774 #[pin]
2775 stream: S,
2776 f: F,
2777 }
2778}
2779
2780impl<S, F> Stream for Inspect<S, F>
2781where
2782 S: Stream,
2783 F: FnMut(&S::Item),
2784{
2785 type Item = S::Item;
2786
2787 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2788 let mut this: Projection<'_, S, F> = self.project();
2789 let next: Option<::Item> = ready!(this.stream.as_mut().poll_next(cx));
2790 if let Some(x: &::Item) = &next {
2791 (this.f)(x);
2792 }
2793 Poll::Ready(next)
2794 }
2795}
2796
2797/// Future for the [`StreamExt::nth()`] method.
2798#[derive(Debug)]
2799#[must_use = "futures do nothing unless you `.await` or poll them"]
2800pub struct NthFuture<'a, S: ?Sized> {
2801 stream: &'a mut S,
2802 n: usize,
2803}
2804
2805impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2806
2807impl<'a, S> Future for NthFuture<'a, S>
2808where
2809 S: Stream + Unpin + ?Sized,
2810{
2811 type Output = Option<S::Item>;
2812
2813 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2814 loop {
2815 match ready!(self.stream.poll_next(cx)) {
2816 Some(v: ::Item) => match self.n {
2817 0 => return Poll::Ready(Some(v)),
2818 _ => self.n -= 1,
2819 },
2820 None => return Poll::Ready(None),
2821 }
2822 }
2823 }
2824}
2825
2826pin_project! {
2827 /// Future for the [`StreamExt::last()`] method.
2828 #[derive(Debug)]
2829 #[must_use = "futures do nothing unless you `.await` or poll them"]
2830 pub struct LastFuture<S: Stream> {
2831 #[pin]
2832 stream: S,
2833 last: Option<S::Item>,
2834 }
2835}
2836
2837impl<S: Stream> Future for LastFuture<S> {
2838 type Output = Option<S::Item>;
2839
2840 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2841 let mut this: Projection<'_, S> = self.project();
2842 loop {
2843 match ready!(this.stream.as_mut().poll_next(cx)) {
2844 Some(new: ::Item) => *this.last = Some(new),
2845 None => return Poll::Ready(this.last.take()),
2846 }
2847 }
2848 }
2849}
2850
2851/// Future for the [`StreamExt::find()`] method.
2852#[derive(Debug)]
2853#[must_use = "futures do nothing unless you `.await` or poll them"]
2854pub struct FindFuture<'a, S: ?Sized, P> {
2855 stream: &'a mut S,
2856 predicate: P,
2857}
2858
2859impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2860
2861impl<'a, S, P> Future for FindFuture<'a, S, P>
2862where
2863 S: Stream + Unpin + ?Sized,
2864 P: FnMut(&S::Item) -> bool,
2865{
2866 type Output = Option<S::Item>;
2867
2868 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2869 loop {
2870 match ready!(self.stream.poll_next(cx)) {
2871 Some(v: ::Item) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
2872 Some(_) => {}
2873 None => return Poll::Ready(None),
2874 }
2875 }
2876 }
2877}
2878
2879/// Future for the [`StreamExt::find_map()`] method.
2880#[derive(Debug)]
2881#[must_use = "futures do nothing unless you `.await` or poll them"]
2882pub struct FindMapFuture<'a, S: ?Sized, F> {
2883 stream: &'a mut S,
2884 f: F,
2885}
2886
2887impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2888
2889impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
2890where
2891 S: Stream + Unpin + ?Sized,
2892 F: FnMut(S::Item) -> Option<B>,
2893{
2894 type Output = Option<B>;
2895
2896 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2897 loop {
2898 match ready!(self.stream.poll_next(cx)) {
2899 Some(v: ::Item) => {
2900 if let Some(v: B) = (&mut self.f)(v) {
2901 return Poll::Ready(Some(v));
2902 }
2903 }
2904 None => return Poll::Ready(None),
2905 }
2906 }
2907 }
2908}
2909
2910/// Future for the [`StreamExt::position()`] method.
2911#[derive(Debug)]
2912#[must_use = "futures do nothing unless you `.await` or poll them"]
2913pub struct PositionFuture<'a, S: ?Sized, P> {
2914 stream: &'a mut S,
2915 predicate: P,
2916 index: usize,
2917}
2918
2919impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
2920
2921impl<'a, S, P> Future for PositionFuture<'a, S, P>
2922where
2923 S: Stream + Unpin + ?Sized,
2924 P: FnMut(S::Item) -> bool,
2925{
2926 type Output = Option<usize>;
2927
2928 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2929 loop {
2930 match ready!(self.stream.poll_next(cx)) {
2931 Some(v: ::Item) => {
2932 if (&mut self.predicate)(v) {
2933 return Poll::Ready(Some(self.index));
2934 } else {
2935 self.index += 1;
2936 }
2937 }
2938 None => return Poll::Ready(None),
2939 }
2940 }
2941 }
2942}
2943
2944/// Future for the [`StreamExt::all()`] method.
2945#[derive(Debug)]
2946#[must_use = "futures do nothing unless you `.await` or poll them"]
2947pub struct AllFuture<'a, S: ?Sized, P> {
2948 stream: &'a mut S,
2949 predicate: P,
2950}
2951
2952impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
2953
2954impl<S, P> Future for AllFuture<'_, S, P>
2955where
2956 S: Stream + Unpin + ?Sized,
2957 P: FnMut(S::Item) -> bool,
2958{
2959 type Output = bool;
2960
2961 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2962 loop {
2963 match ready!(self.stream.poll_next(cx)) {
2964 Some(v: ::Item) => {
2965 if !(&mut self.predicate)(v) {
2966 return Poll::Ready(false);
2967 }
2968 }
2969 None => return Poll::Ready(true),
2970 }
2971 }
2972 }
2973}
2974
2975/// Future for the [`StreamExt::any()`] method.
2976#[derive(Debug)]
2977#[must_use = "futures do nothing unless you `.await` or poll them"]
2978pub struct AnyFuture<'a, S: ?Sized, P> {
2979 stream: &'a mut S,
2980 predicate: P,
2981}
2982
2983impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
2984
2985impl<S, P> Future for AnyFuture<'_, S, P>
2986where
2987 S: Stream + Unpin + ?Sized,
2988 P: FnMut(S::Item) -> bool,
2989{
2990 type Output = bool;
2991
2992 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2993 loop {
2994 match ready!(self.stream.poll_next(cx)) {
2995 Some(v: ::Item) => {
2996 if (&mut self.predicate)(v) {
2997 return Poll::Ready(true);
2998 }
2999 }
3000 None => return Poll::Ready(false),
3001 }
3002 }
3003 }
3004}
3005
3006pin_project! {
3007 /// Future for the [`StreamExt::for_each()`] method.
3008 #[derive(Debug)]
3009 #[must_use = "futures do nothing unless you `.await` or poll them"]
3010 pub struct ForEachFuture<S, F> {
3011 #[pin]
3012 stream: S,
3013 f: F,
3014 }
3015}
3016
3017impl<S, F> Future for ForEachFuture<S, F>
3018where
3019 S: Stream,
3020 F: FnMut(S::Item),
3021{
3022 type Output = ();
3023
3024 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3025 let mut this: Projection<'_, S, F> = self.project();
3026 loop {
3027 match ready!(this.stream.as_mut().poll_next(cx)) {
3028 Some(v: ::Item) => (this.f)(v),
3029 None => return Poll::Ready(()),
3030 }
3031 }
3032 }
3033}
3034
3035/// Future for the [`StreamExt::try_for_each()`] method.
3036#[derive(Debug)]
3037#[must_use = "futures do nothing unless you `.await` or poll them"]
3038pub struct TryForEachFuture<'a, S: ?Sized, F> {
3039 stream: &'a mut S,
3040 f: F,
3041}
3042
3043impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
3044
3045impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
3046where
3047 S: Stream + Unpin + ?Sized,
3048 F: FnMut(S::Item) -> Result<(), E>,
3049{
3050 type Output = Result<(), E>;
3051
3052 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3053 loop {
3054 match ready!(self.stream.poll_next(cx)) {
3055 None => return Poll::Ready(Ok(())),
3056 Some(v: ::Item) => (&mut self.f)(v)?,
3057 }
3058 }
3059 }
3060}
3061
3062pin_project! {
3063 /// Stream for the [`StreamExt::zip()`] method.
3064 #[derive(Clone, Debug)]
3065 #[must_use = "streams do nothing unless polled"]
3066 pub struct Zip<A: Stream, B> {
3067 item_slot: Option<A::Item>,
3068 #[pin]
3069 first: A,
3070 #[pin]
3071 second: B,
3072 }
3073}
3074
3075impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3076 type Item = (A::Item, B::Item);
3077
3078 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3079 let this: Projection<'_, A, B> = self.project();
3080
3081 if this.item_slot.is_none() {
3082 match this.first.poll_next(cx) {
3083 Poll::Pending => return Poll::Pending,
3084 Poll::Ready(None) => return Poll::Ready(None),
3085 Poll::Ready(Some(item: ::Item)) => *this.item_slot = Some(item),
3086 }
3087 }
3088
3089 let second_item: Option<::Item> = ready!(this.second.poll_next(cx));
3090 let first_item: ::Item = this.item_slot.take().unwrap();
3091 Poll::Ready(second_item.map(|second_item: ::Item| (first_item, second_item)))
3092 }
3093}
3094
3095pin_project! {
3096 /// Future for the [`StreamExt::unzip()`] method.
3097 #[derive(Debug)]
3098 #[must_use = "futures do nothing unless you `.await` or poll them"]
3099 pub struct UnzipFuture<S, FromA, FromB> {
3100 #[pin]
3101 stream: S,
3102 res: Option<(FromA, FromB)>,
3103 }
3104}
3105
3106impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3107where
3108 S: Stream<Item = (A, B)>,
3109 FromA: Default + Extend<A>,
3110 FromB: Default + Extend<B>,
3111{
3112 type Output = (FromA, FromB);
3113
3114 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3115 let mut this: Projection<'_, S, FromA, …> = self.project();
3116
3117 loop {
3118 match ready!(this.stream.as_mut().poll_next(cx)) {
3119 Some((a: A, b: B)) => {
3120 let res: &mut (FromA, FromB) = this.res.as_mut().unwrap();
3121 res.0.extend(iter:Some(a));
3122 res.1.extend(iter:Some(b));
3123 }
3124 None => return Poll::Ready(this.res.take().unwrap()),
3125 }
3126 }
3127 }
3128}
3129