1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(all(not(feature = "std"), feature = "alloc"))]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34#[cfg(feature = "race")]
35use fastrand::Rng;
36
37use pin_project_lite::pin_project;
38
39use crate::ready;
40
41/// Converts a stream into a blocking iterator.
42///
43/// # Examples
44///
45/// ```
46/// use futures_lite::{pin, stream};
47///
48/// let stream = stream::once(7);
49/// pin!(stream);
50///
51/// let mut iter = stream::block_on(stream);
52/// assert_eq!(iter.next(), Some(7));
53/// assert_eq!(iter.next(), None);
54/// ```
55#[cfg(feature = "std")]
56pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
57 BlockOn(stream)
58}
59
60/// Iterator for the [`block_on()`] function.
61#[derive(Debug)]
62pub struct BlockOn<S>(S);
63
64#[cfg(feature = "std")]
65impl<S: Stream + Unpin> Iterator for BlockOn<S> {
66 type Item = S::Item;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 crate::future::block_on(self.0.next())
70 }
71
72 fn size_hint(&self) -> (usize, Option<usize>) {
73 self.0.size_hint()
74 }
75
76 fn count(self) -> usize {
77 crate::future::block_on(self.0.count())
78 }
79
80 fn last(self) -> Option<Self::Item> {
81 crate::future::block_on(self.0.last())
82 }
83
84 fn nth(&mut self, n: usize) -> Option<Self::Item> {
85 crate::future::block_on(self.0.nth(n))
86 }
87
88 fn fold<B, F>(self, init: B, f: F) -> B
89 where
90 F: FnMut(B, Self::Item) -> B,
91 {
92 crate::future::block_on(self.0.fold(init, f))
93 }
94
95 fn for_each<F>(self, f: F) -> F::Output
96 where
97 F: FnMut(Self::Item),
98 {
99 crate::future::block_on(self.0.for_each(f))
100 }
101
102 fn all<F>(&mut self, f: F) -> bool
103 where
104 F: FnMut(Self::Item) -> bool,
105 {
106 crate::future::block_on(self.0.all(f))
107 }
108
109 fn any<F>(&mut self, f: F) -> bool
110 where
111 F: FnMut(Self::Item) -> bool,
112 {
113 crate::future::block_on(self.0.any(f))
114 }
115
116 fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
117 where
118 P: FnMut(&Self::Item) -> bool,
119 {
120 crate::future::block_on(self.0.find(predicate))
121 }
122
123 fn find_map<B, F>(&mut self, f: F) -> Option<B>
124 where
125 F: FnMut(Self::Item) -> Option<B>,
126 {
127 crate::future::block_on(self.0.find_map(f))
128 }
129
130 fn position<P>(&mut self, predicate: P) -> Option<usize>
131 where
132 P: FnMut(Self::Item) -> bool,
133 {
134 crate::future::block_on(self.0.position(predicate))
135 }
136}
137
138/// Creates an empty stream.
139///
140/// # Examples
141///
142/// ```
143/// use futures_lite::stream::{self, StreamExt};
144///
145/// # spin_on::spin_on(async {
146/// let mut s = stream::empty::<i32>();
147/// assert_eq!(s.next().await, None);
148/// # })
149/// ```
150pub fn empty<T>() -> Empty<T> {
151 Empty {
152 _marker: PhantomData,
153 }
154}
155
156/// Stream for the [`empty()`] function.
157#[derive(Clone, Debug)]
158#[must_use = "streams do nothing unless polled"]
159pub struct Empty<T> {
160 _marker: PhantomData<T>,
161}
162
163impl<T> Unpin for Empty<T> {}
164
165impl<T> Stream for Empty<T> {
166 type Item = T;
167
168 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
169 Poll::Ready(None)
170 }
171
172 fn size_hint(&self) -> (usize, Option<usize>) {
173 (0, Some(0))
174 }
175}
176
177/// Creates a stream from an iterator.
178///
179/// # Examples
180///
181/// ```
182/// use futures_lite::stream::{self, StreamExt};
183///
184/// # spin_on::spin_on(async {
185/// let mut s = stream::iter(vec![1, 2]);
186///
187/// assert_eq!(s.next().await, Some(1));
188/// assert_eq!(s.next().await, Some(2));
189/// assert_eq!(s.next().await, None);
190/// # })
191/// ```
192pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
193 Iter {
194 iter: iter.into_iter(),
195 }
196}
197
198/// Stream for the [`iter()`] function.
199#[derive(Clone, Debug)]
200#[must_use = "streams do nothing unless polled"]
201pub struct Iter<I> {
202 iter: I,
203}
204
205impl<I> Unpin for Iter<I> {}
206
207impl<I: Iterator> Stream for Iter<I> {
208 type Item = I::Item;
209
210 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211 Poll::Ready(self.iter.next())
212 }
213
214 fn size_hint(&self) -> (usize, Option<usize>) {
215 self.iter.size_hint()
216 }
217}
218
219/// Creates a stream that yields a single item.
220///
221/// # Examples
222///
223/// ```
224/// use futures_lite::stream::{self, StreamExt};
225///
226/// # spin_on::spin_on(async {
227/// let mut s = stream::once(7);
228///
229/// assert_eq!(s.next().await, Some(7));
230/// assert_eq!(s.next().await, None);
231/// # })
232/// ```
233pub fn once<T>(t: T) -> Once<T> {
234 Once { value: Some(t) }
235}
236
237pin_project! {
238 /// Stream for the [`once()`] function.
239 #[derive(Clone, Debug)]
240 #[must_use = "streams do nothing unless polled"]
241 pub struct Once<T> {
242 value: Option<T>,
243 }
244}
245
246impl<T> Stream for Once<T> {
247 type Item = T;
248
249 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
250 Poll::Ready(self.project().value.take())
251 }
252
253 fn size_hint(&self) -> (usize, Option<usize>) {
254 if self.value.is_some() {
255 (1, Some(1))
256 } else {
257 (0, Some(0))
258 }
259 }
260}
261
262/// Creates a stream that is always pending.
263///
264/// # Examples
265///
266/// ```no_run
267/// use futures_lite::stream::{self, StreamExt};
268///
269/// # spin_on::spin_on(async {
270/// let mut s = stream::pending::<i32>();
271/// s.next().await;
272/// unreachable!();
273/// # })
274/// ```
275pub fn pending<T>() -> Pending<T> {
276 Pending {
277 _marker: PhantomData,
278 }
279}
280
281/// Stream for the [`pending()`] function.
282#[derive(Clone, Debug)]
283#[must_use = "streams do nothing unless polled"]
284pub struct Pending<T> {
285 _marker: PhantomData<T>,
286}
287
288impl<T> Unpin for Pending<T> {}
289
290impl<T> Stream for Pending<T> {
291 type Item = T;
292
293 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
294 Poll::Pending
295 }
296
297 fn size_hint(&self) -> (usize, Option<usize>) {
298 (0, Some(0))
299 }
300}
301
302/// Creates a stream from a function returning [`Poll`].
303///
304/// # Examples
305///
306/// ```
307/// use futures_lite::stream::{self, StreamExt};
308/// use std::task::{Context, Poll};
309///
310/// # spin_on::spin_on(async {
311/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
312/// Poll::Ready(Some(7))
313/// }
314///
315/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
316/// # })
317/// ```
318pub fn poll_fn<T, F>(f: F) -> PollFn<F>
319where
320 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
321{
322 PollFn { f }
323}
324
325/// Stream for the [`poll_fn()`] function.
326#[derive(Clone)]
327#[must_use = "streams do nothing unless polled"]
328pub struct PollFn<F> {
329 f: F,
330}
331
332impl<F> Unpin for PollFn<F> {}
333
334impl<F> fmt::Debug for PollFn<F> {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 f.debug_struct(name:"PollFn").finish()
337 }
338}
339
340impl<T, F> Stream for PollFn<F>
341where
342 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
343{
344 type Item = T;
345
346 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
347 (&mut self.f)(cx)
348 }
349}
350
351/// Creates an infinite stream that yields the same item repeatedly.
352///
353/// # Examples
354///
355/// ```
356/// use futures_lite::stream::{self, StreamExt};
357///
358/// # spin_on::spin_on(async {
359/// let mut s = stream::repeat(7);
360///
361/// assert_eq!(s.next().await, Some(7));
362/// assert_eq!(s.next().await, Some(7));
363/// # })
364/// ```
365pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
366 Repeat { item }
367}
368
369/// Stream for the [`repeat()`] function.
370#[derive(Clone, Debug)]
371#[must_use = "streams do nothing unless polled"]
372pub struct Repeat<T> {
373 item: T,
374}
375
376impl<T> Unpin for Repeat<T> {}
377
378impl<T: Clone> Stream for Repeat<T> {
379 type Item = T;
380
381 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
382 Poll::Ready(Some(self.item.clone()))
383 }
384
385 fn size_hint(&self) -> (usize, Option<usize>) {
386 (usize::max_value(), None)
387 }
388}
389
390/// Creates an infinite stream from a closure that generates items.
391///
392/// # Examples
393///
394/// ```
395/// use futures_lite::stream::{self, StreamExt};
396///
397/// # spin_on::spin_on(async {
398/// let mut s = stream::repeat_with(|| 7);
399///
400/// assert_eq!(s.next().await, Some(7));
401/// assert_eq!(s.next().await, Some(7));
402/// # })
403/// ```
404pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
405where
406 F: FnMut() -> T,
407{
408 RepeatWith { f: repeater }
409}
410
411/// Stream for the [`repeat_with()`] function.
412#[derive(Clone, Debug)]
413#[must_use = "streams do nothing unless polled"]
414pub struct RepeatWith<F> {
415 f: F,
416}
417
418impl<F> Unpin for RepeatWith<F> {}
419
420impl<T, F> Stream for RepeatWith<F>
421where
422 F: FnMut() -> T,
423{
424 type Item = T;
425
426 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
427 let item: T = (&mut self.f)();
428 Poll::Ready(Some(item))
429 }
430
431 fn size_hint(&self) -> (usize, Option<usize>) {
432 (usize::max_value(), None)
433 }
434}
435
436/// Creates a stream from a seed value and an async closure operating on it.
437///
438/// # Examples
439///
440/// ```
441/// use futures_lite::stream::{self, StreamExt};
442///
443/// # spin_on::spin_on(async {
444/// let s = stream::unfold(0, |mut n| async move {
445/// if n < 2 {
446/// let m = n + 1;
447/// Some((n, m))
448/// } else {
449/// None
450/// }
451/// });
452///
453/// let v: Vec<i32> = s.collect().await;
454/// assert_eq!(v, [0, 1]);
455/// # })
456/// ```
457pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
458where
459 F: FnMut(T) -> Fut,
460 Fut: Future<Output = Option<(Item, T)>>,
461{
462 Unfold {
463 f,
464 state: Some(seed),
465 fut: None,
466 }
467}
468
469pin_project! {
470 /// Stream for the [`unfold()`] function.
471 #[derive(Clone)]
472 #[must_use = "streams do nothing unless polled"]
473 pub struct Unfold<T, F, Fut> {
474 f: F,
475 state: Option<T>,
476 #[pin]
477 fut: Option<Fut>,
478 }
479}
480
481impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
482where
483 T: fmt::Debug,
484 Fut: fmt::Debug,
485{
486 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487 f&mut DebugStruct<'_, '_>.debug_struct("Unfold")
488 .field("state", &self.state)
489 .field(name:"fut", &self.fut)
490 .finish()
491 }
492}
493
494impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
495where
496 F: FnMut(T) -> Fut,
497 Fut: Future<Output = Option<(Item, T)>>,
498{
499 type Item = Item;
500
501 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
502 let mut this = self.project();
503
504 if let Some(state) = this.state.take() {
505 this.fut.set(Some((this.f)(state)));
506 }
507
508 let step = ready!(this
509 .fut
510 .as_mut()
511 .as_pin_mut()
512 .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
513 .poll(cx));
514 this.fut.set(None);
515
516 if let Some((item, next_state)) = step {
517 *this.state = Some(next_state);
518 Poll::Ready(Some(item))
519 } else {
520 Poll::Ready(None)
521 }
522 }
523}
524
525/// Creates a stream from a seed value and a fallible async closure operating on it.
526///
527/// # Examples
528///
529/// ```
530/// use futures_lite::stream::{self, StreamExt};
531///
532/// # spin_on::spin_on(async {
533/// let s = stream::try_unfold(0, |mut n| async move {
534/// if n < 2 {
535/// let m = n + 1;
536/// Ok(Some((n, m)))
537/// } else {
538/// std::io::Result::Ok(None)
539/// }
540/// });
541///
542/// let v: Vec<i32> = s.try_collect().await?;
543/// assert_eq!(v, [0, 1]);
544/// # std::io::Result::Ok(()) });
545/// ```
546pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
547where
548 F: FnMut(T) -> Fut,
549 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
550{
551 TryUnfold {
552 f,
553 state: Some(init),
554 fut: None,
555 }
556}
557
558pin_project! {
559 /// Stream for the [`try_unfold()`] function.
560 #[derive(Clone)]
561 #[must_use = "streams do nothing unless polled"]
562 pub struct TryUnfold<T, F, Fut> {
563 f: F,
564 state: Option<T>,
565 #[pin]
566 fut: Option<Fut>,
567 }
568}
569
570impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
571where
572 T: fmt::Debug,
573 Fut: fmt::Debug,
574{
575 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576 f&mut DebugStruct<'_, '_>.debug_struct("TryUnfold")
577 .field("state", &self.state)
578 .field(name:"fut", &self.fut)
579 .finish()
580 }
581}
582
583impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
584where
585 F: FnMut(T) -> Fut,
586 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
587{
588 type Item = Result<Item, E>;
589
590 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
591 let mut this = self.project();
592
593 if let Some(state) = this.state.take() {
594 this.fut.set(Some((this.f)(state)));
595 }
596
597 match this.fut.as_mut().as_pin_mut() {
598 None => {
599 // The future previously errored
600 Poll::Ready(None)
601 }
602 Some(future) => {
603 let step = ready!(future.poll(cx));
604 this.fut.set(None);
605
606 match step {
607 Ok(Some((item, next_state))) => {
608 *this.state = Some(next_state);
609 Poll::Ready(Some(Ok(item)))
610 }
611 Ok(None) => Poll::Ready(None),
612 Err(e) => Poll::Ready(Some(Err(e))),
613 }
614 }
615 }
616 }
617}
618
619/// Creates a stream that invokes the given future as its first item, and then
620/// produces no more items.
621///
622/// # Example
623///
624/// ```
625/// use futures_lite::{stream, prelude::*};
626///
627/// # spin_on::spin_on(async {
628/// let mut stream = Box::pin(stream::once_future(async { 1 }));
629/// assert_eq!(stream.next().await, Some(1));
630/// assert_eq!(stream.next().await, None);
631/// # });
632/// ```
633pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
634 OnceFuture {
635 future: Some(future),
636 }
637}
638
639pin_project! {
640 /// Stream for the [`once_future()`] method.
641 #[derive(Debug)]
642 #[must_use = "futures do nothing unless you `.await` or poll them"]
643 pub struct OnceFuture<F> {
644 #[pin]
645 future: Option<F>,
646 }
647}
648
649impl<F: Future> Stream for OnceFuture<F> {
650 type Item = F::Output;
651
652 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
653 let mut this: Projection<'_, F> = self.project();
654
655 match this.future.as_mut().as_pin_mut().map(|f: Pin<&mut F>| f.poll(cx)) {
656 Some(Poll::Ready(t: ::Output)) => {
657 this.future.set(None);
658 Poll::Ready(Some(t))
659 }
660 Some(Poll::Pending) => Poll::Pending,
661 None => Poll::Ready(None),
662 }
663 }
664}
665
666/// Extension trait for [`Stream`].
667pub trait StreamExt: Stream {
668 /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
669 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
670 where
671 Self: Unpin,
672 {
673 Stream::poll_next(Pin::new(self), cx)
674 }
675
676 /// Retrieves the next item in the stream.
677 ///
678 /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
679 /// resume iteration after that.
680 ///
681 /// # Examples
682 ///
683 /// ```
684 /// use futures_lite::stream::{self, StreamExt};
685 ///
686 /// # spin_on::spin_on(async {
687 /// let mut s = stream::iter(1..=3);
688 ///
689 /// assert_eq!(s.next().await, Some(1));
690 /// assert_eq!(s.next().await, Some(2));
691 /// assert_eq!(s.next().await, Some(3));
692 /// assert_eq!(s.next().await, None);
693 /// # });
694 /// ```
695 fn next(&mut self) -> NextFuture<'_, Self>
696 where
697 Self: Unpin,
698 {
699 NextFuture { stream: self }
700 }
701
702 /// Retrieves the next item in the stream.
703 ///
704 /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
705 /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
706 ///
707 /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
708 ///
709 /// # Examples
710 ///
711 /// ```
712 /// use futures_lite::stream::{self, StreamExt};
713 ///
714 /// # spin_on::spin_on(async {
715 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
716 ///
717 /// assert_eq!(s.try_next().await, Ok(Some(1)));
718 /// assert_eq!(s.try_next().await, Ok(Some(2)));
719 /// assert_eq!(s.try_next().await, Err("error"));
720 /// assert_eq!(s.try_next().await, Ok(None));
721 /// # });
722 /// ```
723 fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
724 where
725 Self: Stream<Item = Result<T, E>> + Unpin,
726 {
727 TryNextFuture { stream: self }
728 }
729
730 /// Counts the number of items in the stream.
731 ///
732 /// # Examples
733 ///
734 /// ```
735 /// use futures_lite::stream::{self, StreamExt};
736 ///
737 /// # spin_on::spin_on(async {
738 /// let s1 = stream::iter(vec![0]);
739 /// let s2 = stream::iter(vec![1, 2, 3]);
740 ///
741 /// assert_eq!(s1.count().await, 1);
742 /// assert_eq!(s2.count().await, 3);
743 /// # });
744 /// ```
745 fn count(self) -> CountFuture<Self>
746 where
747 Self: Sized,
748 {
749 CountFuture {
750 stream: self,
751 count: 0,
752 }
753 }
754
755 /// Maps items of the stream to new values using a closure.
756 ///
757 /// # Examples
758 ///
759 /// ```
760 /// use futures_lite::stream::{self, StreamExt};
761 ///
762 /// # spin_on::spin_on(async {
763 /// let s = stream::iter(vec![1, 2, 3]);
764 /// let mut s = s.map(|x| 2 * x);
765 ///
766 /// assert_eq!(s.next().await, Some(2));
767 /// assert_eq!(s.next().await, Some(4));
768 /// assert_eq!(s.next().await, Some(6));
769 /// assert_eq!(s.next().await, None);
770 /// # });
771 /// ```
772 fn map<T, F>(self, f: F) -> Map<Self, F>
773 where
774 Self: Sized,
775 F: FnMut(Self::Item) -> T,
776 {
777 Map { stream: self, f }
778 }
779
780 /// Maps items to streams and then concatenates them.
781 ///
782 /// # Examples
783 ///
784 /// ```
785 /// use futures_lite::stream::{self, StreamExt};
786 ///
787 /// # spin_on::spin_on(async {
788 /// let words = stream::iter(vec!["one", "two"]);
789 ///
790 /// let s: String = words
791 /// .flat_map(|s| stream::iter(s.chars()))
792 /// .collect()
793 /// .await;
794 ///
795 /// assert_eq!(s, "onetwo");
796 /// # });
797 /// ```
798 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
799 where
800 Self: Sized,
801 U: Stream,
802 F: FnMut(Self::Item) -> U,
803 {
804 FlatMap {
805 stream: self.map(f),
806 inner_stream: None,
807 }
808 }
809
810 /// Concatenates inner streams.
811 ///
812 /// # Examples
813 ///
814 /// ```
815 /// use futures_lite::stream::{self, StreamExt};
816 ///
817 /// # spin_on::spin_on(async {
818 /// let s1 = stream::iter(vec![1, 2, 3]);
819 /// let s2 = stream::iter(vec![4, 5]);
820 ///
821 /// let s = stream::iter(vec![s1, s2]);
822 /// let v: Vec<_> = s.flatten().collect().await;
823 /// assert_eq!(v, [1, 2, 3, 4, 5]);
824 /// # });
825 /// ```
826 fn flatten(self) -> Flatten<Self>
827 where
828 Self: Sized,
829 Self::Item: Stream,
830 {
831 Flatten {
832 stream: self,
833 inner_stream: None,
834 }
835 }
836
837 /// Maps items of the stream to new values using an async closure.
838 ///
839 /// # Examples
840 ///
841 /// ```
842 /// use futures_lite::pin;
843 /// use futures_lite::stream::{self, StreamExt};
844 ///
845 /// # spin_on::spin_on(async {
846 /// let s = stream::iter(vec![1, 2, 3]);
847 /// let mut s = s.then(|x| async move { 2 * x });
848 ///
849 /// pin!(s);
850 /// assert_eq!(s.next().await, Some(2));
851 /// assert_eq!(s.next().await, Some(4));
852 /// assert_eq!(s.next().await, Some(6));
853 /// assert_eq!(s.next().await, None);
854 /// # });
855 /// ```
856 fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
857 where
858 Self: Sized,
859 F: FnMut(Self::Item) -> Fut,
860 Fut: Future,
861 {
862 Then {
863 stream: self,
864 future: None,
865 f,
866 }
867 }
868
869 /// Keeps items of the stream for which `predicate` returns `true`.
870 ///
871 /// # Examples
872 ///
873 /// ```
874 /// use futures_lite::stream::{self, StreamExt};
875 ///
876 /// # spin_on::spin_on(async {
877 /// let s = stream::iter(vec![1, 2, 3, 4]);
878 /// let mut s = s.filter(|i| i % 2 == 0);
879 ///
880 /// assert_eq!(s.next().await, Some(2));
881 /// assert_eq!(s.next().await, Some(4));
882 /// assert_eq!(s.next().await, None);
883 /// # });
884 /// ```
885 fn filter<P>(self, predicate: P) -> Filter<Self, P>
886 where
887 Self: Sized,
888 P: FnMut(&Self::Item) -> bool,
889 {
890 Filter {
891 stream: self,
892 predicate,
893 }
894 }
895
896 /// Filters and maps items of the stream using a closure.
897 ///
898 /// # Examples
899 ///
900 /// ```
901 /// use futures_lite::stream::{self, StreamExt};
902 ///
903 /// # spin_on::spin_on(async {
904 /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
905 /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
906 ///
907 /// assert_eq!(s.next().await, Some(1));
908 /// assert_eq!(s.next().await, Some(3));
909 /// assert_eq!(s.next().await, Some(5));
910 /// assert_eq!(s.next().await, None);
911 /// # });
912 /// ```
913 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
914 where
915 Self: Sized,
916 F: FnMut(Self::Item) -> Option<T>,
917 {
918 FilterMap { stream: self, f }
919 }
920
921 /// Takes only the first `n` items of the stream.
922 ///
923 /// # Examples
924 ///
925 /// ```
926 /// use futures_lite::stream::{self, StreamExt};
927 ///
928 /// # spin_on::spin_on(async {
929 /// let mut s = stream::repeat(7).take(2);
930 ///
931 /// assert_eq!(s.next().await, Some(7));
932 /// assert_eq!(s.next().await, Some(7));
933 /// assert_eq!(s.next().await, None);
934 /// # });
935 /// ```
936 fn take(self, n: usize) -> Take<Self>
937 where
938 Self: Sized,
939 {
940 Take { stream: self, n }
941 }
942
943 /// Takes items while `predicate` returns `true`.
944 ///
945 /// # Examples
946 ///
947 /// ```
948 /// use futures_lite::stream::{self, StreamExt};
949 ///
950 /// # spin_on::spin_on(async {
951 /// let s = stream::iter(vec![1, 2, 3, 4]);
952 /// let mut s = s.take_while(|x| *x < 3);
953 ///
954 /// assert_eq!(s.next().await, Some(1));
955 /// assert_eq!(s.next().await, Some(2));
956 /// assert_eq!(s.next().await, None);
957 /// # });
958 /// ```
959 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
960 where
961 Self: Sized,
962 P: FnMut(&Self::Item) -> bool,
963 {
964 TakeWhile {
965 stream: self,
966 predicate,
967 }
968 }
969
970 /// Skips the first `n` items of the stream.
971 ///
972 /// # Examples
973 ///
974 /// ```
975 /// use futures_lite::stream::{self, StreamExt};
976 ///
977 /// # spin_on::spin_on(async {
978 /// let s = stream::iter(vec![1, 2, 3]);
979 /// let mut s = s.skip(2);
980 ///
981 /// assert_eq!(s.next().await, Some(3));
982 /// assert_eq!(s.next().await, None);
983 /// # });
984 /// ```
985 fn skip(self, n: usize) -> Skip<Self>
986 where
987 Self: Sized,
988 {
989 Skip { stream: self, n }
990 }
991
992 /// Skips items while `predicate` returns `true`.
993 ///
994 /// # Examples
995 ///
996 /// ```
997 /// use futures_lite::stream::{self, StreamExt};
998 ///
999 /// # spin_on::spin_on(async {
1000 /// let s = stream::iter(vec![-1i32, 0, 1]);
1001 /// let mut s = s.skip_while(|x| x.is_negative());
1002 ///
1003 /// assert_eq!(s.next().await, Some(0));
1004 /// assert_eq!(s.next().await, Some(1));
1005 /// assert_eq!(s.next().await, None);
1006 /// # });
1007 /// ```
1008 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1009 where
1010 Self: Sized,
1011 P: FnMut(&Self::Item) -> bool,
1012 {
1013 SkipWhile {
1014 stream: self,
1015 predicate: Some(predicate),
1016 }
1017 }
1018
1019 /// Yields every `step`th item.
1020 ///
1021 /// # Panics
1022 ///
1023 /// This method will panic if the `step` is 0.
1024 ///
1025 /// # Examples
1026 ///
1027 /// ```
1028 /// use futures_lite::stream::{self, StreamExt};
1029 ///
1030 /// # spin_on::spin_on(async {
1031 /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
1032 /// let mut s = s.step_by(2);
1033 ///
1034 /// assert_eq!(s.next().await, Some(0));
1035 /// assert_eq!(s.next().await, Some(2));
1036 /// assert_eq!(s.next().await, Some(4));
1037 /// assert_eq!(s.next().await, None);
1038 /// # });
1039 /// ```
1040 fn step_by(self, step: usize) -> StepBy<Self>
1041 where
1042 Self: Sized,
1043 {
1044 assert!(step > 0, "`step` must be greater than zero");
1045 StepBy {
1046 stream: self,
1047 step,
1048 i: 0,
1049 }
1050 }
1051
1052 /// Appends another stream to the end of this one.
1053 ///
1054 /// # Examples
1055 ///
1056 /// ```
1057 /// use futures_lite::stream::{self, StreamExt};
1058 ///
1059 /// # spin_on::spin_on(async {
1060 /// let s1 = stream::iter(vec![1, 2]);
1061 /// let s2 = stream::iter(vec![7, 8]);
1062 /// let mut s = s1.chain(s2);
1063 ///
1064 /// assert_eq!(s.next().await, Some(1));
1065 /// assert_eq!(s.next().await, Some(2));
1066 /// assert_eq!(s.next().await, Some(7));
1067 /// assert_eq!(s.next().await, Some(8));
1068 /// assert_eq!(s.next().await, None);
1069 /// # });
1070 /// ```
1071 fn chain<U>(self, other: U) -> Chain<Self, U>
1072 where
1073 Self: Sized,
1074 U: Stream<Item = Self::Item> + Sized,
1075 {
1076 Chain {
1077 first: self.fuse(),
1078 second: other.fuse(),
1079 }
1080 }
1081
1082 /// Clones all items.
1083 ///
1084 /// # Examples
1085 ///
1086 /// ```
1087 /// use futures_lite::stream::{self, StreamExt};
1088 ///
1089 /// # spin_on::spin_on(async {
1090 /// let s = stream::iter(vec![&1, &2]);
1091 /// let mut s = s.cloned();
1092 ///
1093 /// assert_eq!(s.next().await, Some(1));
1094 /// assert_eq!(s.next().await, Some(2));
1095 /// assert_eq!(s.next().await, None);
1096 /// # });
1097 /// ```
1098 fn cloned<'a, T>(self) -> Cloned<Self>
1099 where
1100 Self: Stream<Item = &'a T> + Sized,
1101 T: Clone + 'a,
1102 {
1103 Cloned { stream: self }
1104 }
1105
1106 /// Copies all items.
1107 ///
1108 /// # Examples
1109 ///
1110 /// ```
1111 /// use futures_lite::stream::{self, StreamExt};
1112 ///
1113 /// # spin_on::spin_on(async {
1114 /// let s = stream::iter(vec![&1, &2]);
1115 /// let mut s = s.copied();
1116 ///
1117 /// assert_eq!(s.next().await, Some(1));
1118 /// assert_eq!(s.next().await, Some(2));
1119 /// assert_eq!(s.next().await, None);
1120 /// # });
1121 /// ```
1122 fn copied<'a, T>(self) -> Copied<Self>
1123 where
1124 Self: Stream<Item = &'a T> + Sized,
1125 T: Copy + 'a,
1126 {
1127 Copied { stream: self }
1128 }
1129
1130 /// Collects all items in the stream into a collection.
1131 ///
1132 /// # Examples
1133 ///
1134 /// ```
1135 /// use futures_lite::stream::{self, StreamExt};
1136 ///
1137 /// # spin_on::spin_on(async {
1138 /// let mut s = stream::iter(1..=3);
1139 ///
1140 /// let items: Vec<_> = s.collect().await;
1141 /// assert_eq!(items, [1, 2, 3]);
1142 /// # });
1143 /// ```
1144 fn collect<C>(self) -> CollectFuture<Self, C>
1145 where
1146 Self: Sized,
1147 C: Default + Extend<Self::Item>,
1148 {
1149 CollectFuture {
1150 stream: self,
1151 collection: Default::default(),
1152 }
1153 }
1154
1155 /// Collects all items in the fallible stream into a collection.
1156 ///
1157 /// ```
1158 /// use futures_lite::stream::{self, StreamExt};
1159 ///
1160 /// # spin_on::spin_on(async {
1161 /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1162 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1163 /// assert_eq!(res, Err(2));
1164 ///
1165 /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1166 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1167 /// assert_eq!(res, Ok(vec![1, 2, 3]));
1168 /// # })
1169 /// ```
1170 fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1171 where
1172 Self: Stream<Item = Result<T, E>> + Sized,
1173 C: Default + Extend<T>,
1174 {
1175 TryCollectFuture {
1176 stream: self,
1177 items: Default::default(),
1178 }
1179 }
1180
1181 /// Partitions items into those for which `predicate` is `true` and those for which it is
1182 /// `false`, and then collects them into two collections.
1183 ///
1184 /// # Examples
1185 ///
1186 /// ```
1187 /// use futures_lite::stream::{self, StreamExt};
1188 ///
1189 /// # spin_on::spin_on(async {
1190 /// let s = stream::iter(vec![1, 2, 3]);
1191 /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1192 ///
1193 /// assert_eq!(even, &[2]);
1194 /// assert_eq!(odd, &[1, 3]);
1195 /// # })
1196 /// ```
1197 fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1198 where
1199 Self: Sized,
1200 B: Default + Extend<Self::Item>,
1201 P: FnMut(&Self::Item) -> bool,
1202 {
1203 PartitionFuture {
1204 stream: self,
1205 predicate,
1206 res: Some(Default::default()),
1207 }
1208 }
1209
1210 /// Accumulates a computation over the stream.
1211 ///
1212 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1213 /// the accumulator and each item in the stream. The final accumulator value is returned.
1214 ///
1215 /// # Examples
1216 ///
1217 /// ```
1218 /// use futures_lite::stream::{self, StreamExt};
1219 ///
1220 /// # spin_on::spin_on(async {
1221 /// let s = stream::iter(vec![1, 2, 3]);
1222 /// let sum = s.fold(0, |acc, x| acc + x).await;
1223 ///
1224 /// assert_eq!(sum, 6);
1225 /// # })
1226 /// ```
1227 fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1228 where
1229 Self: Sized,
1230 F: FnMut(T, Self::Item) -> T,
1231 {
1232 FoldFuture {
1233 stream: self,
1234 f,
1235 acc: Some(init),
1236 }
1237 }
1238
1239 /// Accumulates a fallible computation over the stream.
1240 ///
1241 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1242 /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1243 /// error if `f` failed the computation.
1244 ///
1245 /// # Examples
1246 ///
1247 /// ```
1248 /// use futures_lite::stream::{self, StreamExt};
1249 ///
1250 /// # spin_on::spin_on(async {
1251 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1252 ///
1253 /// let sum = s.try_fold(0, |acc, v| {
1254 /// if (acc + v) % 2 == 1 {
1255 /// Ok(acc + v)
1256 /// } else {
1257 /// Err("fail")
1258 /// }
1259 /// })
1260 /// .await;
1261 ///
1262 /// assert_eq!(sum, Err("fail"));
1263 /// # })
1264 /// ```
1265 fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1266 where
1267 Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1268 F: FnMut(B, T) -> Result<B, E>,
1269 {
1270 TryFoldFuture {
1271 stream: self,
1272 f,
1273 acc: Some(init),
1274 }
1275 }
1276
1277 /// Maps items of the stream to new values using a state value and a closure.
1278 ///
1279 /// Scanning begins with the inital state set to `initial_state`, and then applies `f` to the
1280 /// state and each item in the stream. The stream stops when `f` returns `None`.
1281 ///
1282 /// # Examples
1283 ///
1284 /// ```
1285 /// use futures_lite::stream::{self, StreamExt};
1286 ///
1287 /// # spin_on::spin_on(async {
1288 /// let s = stream::iter(vec![1, 2, 3]);
1289 /// let mut s = s.scan(1, |state, x| {
1290 /// *state = *state * x;
1291 /// Some(-*state)
1292 /// });
1293 ///
1294 /// assert_eq!(s.next().await, Some(-1));
1295 /// assert_eq!(s.next().await, Some(-2));
1296 /// assert_eq!(s.next().await, Some(-6));
1297 /// assert_eq!(s.next().await, None);
1298 /// # })
1299 /// ```
1300 fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1301 where
1302 Self: Sized,
1303 F: FnMut(&mut St, Self::Item) -> Option<B>,
1304 {
1305 Scan {
1306 stream: self,
1307 state_f: (initial_state, f),
1308 }
1309 }
1310
1311 /// Fuses the stream so that it stops yielding items after the first [`None`].
1312 ///
1313 /// # Examples
1314 ///
1315 /// ```
1316 /// use futures_lite::stream::{self, StreamExt};
1317 ///
1318 /// # spin_on::spin_on(async {
1319 /// let mut s = stream::once(1).fuse();
1320 ///
1321 /// assert_eq!(s.next().await, Some(1));
1322 /// assert_eq!(s.next().await, None);
1323 /// assert_eq!(s.next().await, None);
1324 /// # })
1325 /// ```
1326 fn fuse(self) -> Fuse<Self>
1327 where
1328 Self: Sized,
1329 {
1330 Fuse {
1331 stream: self,
1332 done: false,
1333 }
1334 }
1335
1336 /// Repeats the stream from beginning to end, forever.
1337 ///
1338 /// # Examples
1339 ///
1340 /// ```
1341 /// use futures_lite::stream::{self, StreamExt};
1342 ///
1343 /// # spin_on::spin_on(async {
1344 /// let mut s = stream::iter(vec![1, 2]).cycle();
1345 ///
1346 /// assert_eq!(s.next().await, Some(1));
1347 /// assert_eq!(s.next().await, Some(2));
1348 /// assert_eq!(s.next().await, Some(1));
1349 /// assert_eq!(s.next().await, Some(2));
1350 /// # });
1351 /// ```
1352 fn cycle(self) -> Cycle<Self>
1353 where
1354 Self: Clone + Sized,
1355 {
1356 Cycle {
1357 orig: self.clone(),
1358 stream: self,
1359 }
1360 }
1361
1362 /// Enumerates items, mapping them to `(index, item)`.
1363 ///
1364 /// # Examples
1365 ///
1366 /// ```
1367 /// use futures_lite::stream::{self, StreamExt};
1368 ///
1369 /// # spin_on::spin_on(async {
1370 /// let s = stream::iter(vec!['a', 'b', 'c']);
1371 /// let mut s = s.enumerate();
1372 ///
1373 /// assert_eq!(s.next().await, Some((0, 'a')));
1374 /// assert_eq!(s.next().await, Some((1, 'b')));
1375 /// assert_eq!(s.next().await, Some((2, 'c')));
1376 /// assert_eq!(s.next().await, None);
1377 /// # });
1378 /// ```
1379 fn enumerate(self) -> Enumerate<Self>
1380 where
1381 Self: Sized,
1382 {
1383 Enumerate { stream: self, i: 0 }
1384 }
1385
1386 /// Calls a closure on each item and passes it on.
1387 ///
1388 /// # Examples
1389 ///
1390 /// ```
1391 /// use futures_lite::stream::{self, StreamExt};
1392 ///
1393 /// # spin_on::spin_on(async {
1394 /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1395 ///
1396 /// let sum = s
1397 /// .inspect(|x| println!("about to filter {}", x))
1398 /// .filter(|x| x % 2 == 0)
1399 /// .inspect(|x| println!("made it through filter: {}", x))
1400 /// .fold(0, |sum, i| sum + i)
1401 /// .await;
1402 /// # });
1403 /// ```
1404 fn inspect<F>(self, f: F) -> Inspect<Self, F>
1405 where
1406 Self: Sized,
1407 F: FnMut(&Self::Item),
1408 {
1409 Inspect { stream: self, f }
1410 }
1411
1412 /// Gets the `n`th item of the stream.
1413 ///
1414 /// In the end, `n+1` items of the stream will be consumed.
1415 ///
1416 /// # Examples
1417 ///
1418 /// ```
1419 /// use futures_lite::stream::{self, StreamExt};
1420 ///
1421 /// # spin_on::spin_on(async {
1422 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1423 ///
1424 /// assert_eq!(s.nth(2).await, Some(2));
1425 /// assert_eq!(s.nth(2).await, Some(5));
1426 /// assert_eq!(s.nth(2).await, None);
1427 /// # });
1428 /// ```
1429 fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1430 where
1431 Self: Unpin,
1432 {
1433 NthFuture { stream: self, n }
1434 }
1435
1436 /// Returns the last item in the stream.
1437 ///
1438 /// # Examples
1439 ///
1440 /// ```
1441 /// use futures_lite::stream::{self, StreamExt};
1442 ///
1443 /// # spin_on::spin_on(async {
1444 /// let s = stream::iter(vec![1, 2, 3, 4]);
1445 /// assert_eq!(s.last().await, Some(4));
1446 ///
1447 /// let s = stream::empty::<i32>();
1448 /// assert_eq!(s.last().await, None);
1449 /// # });
1450 /// ```
1451 fn last(self) -> LastFuture<Self>
1452 where
1453 Self: Sized,
1454 {
1455 LastFuture {
1456 stream: self,
1457 last: None,
1458 }
1459 }
1460
1461 /// Finds the first item of the stream for which `predicate` returns `true`.
1462 ///
1463 /// # Examples
1464 ///
1465 /// ```
1466 /// use futures_lite::stream::{self, StreamExt};
1467 ///
1468 /// # spin_on::spin_on(async {
1469 /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1470 ///
1471 /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1472 /// assert_eq!(s.next().await, Some(13));
1473 /// # });
1474 /// ```
1475 fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1476 where
1477 Self: Unpin,
1478 P: FnMut(&Self::Item) -> bool,
1479 {
1480 FindFuture {
1481 stream: self,
1482 predicate,
1483 }
1484 }
1485
1486 /// Applies a closure to items in the stream and returns the first [`Some`] result.
1487 ///
1488 /// # Examples
1489 ///
1490 /// ```
1491 /// use futures_lite::stream::{self, StreamExt};
1492 ///
1493 /// # spin_on::spin_on(async {
1494 /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1495 /// let number = s.find_map(|s| s.parse().ok()).await;
1496 ///
1497 /// assert_eq!(number, Some(2));
1498 /// # });
1499 /// ```
1500 fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1501 where
1502 Self: Unpin,
1503 F: FnMut(Self::Item) -> Option<B>,
1504 {
1505 FindMapFuture { stream: self, f }
1506 }
1507
1508 /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1509 ///
1510 /// # Examples
1511 ///
1512 /// ```
1513 /// use futures_lite::stream::{self, StreamExt};
1514 ///
1515 /// # spin_on::spin_on(async {
1516 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1517 ///
1518 /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1519 /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1520 /// assert_eq!(s.position(|x| x == 9).await, None);
1521 /// # });
1522 /// ```
1523 fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1524 where
1525 Self: Unpin,
1526 P: FnMut(Self::Item) -> bool,
1527 {
1528 PositionFuture {
1529 stream: self,
1530 predicate,
1531 index: 0,
1532 }
1533 }
1534
1535 /// Tests if `predicate` returns `true` for all items in the stream.
1536 ///
1537 /// The result is `true` for an empty stream.
1538 ///
1539 /// # Examples
1540 ///
1541 /// ```
1542 /// use futures_lite::stream::{self, StreamExt};
1543 ///
1544 /// # spin_on::spin_on(async {
1545 /// let mut s = stream::iter(vec![1, 2, 3]);
1546 /// assert!(!s.all(|x| x % 2 == 0).await);
1547 ///
1548 /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1549 /// assert!(s.all(|x| x % 2 == 0).await);
1550 ///
1551 /// let mut s = stream::empty::<i32>();
1552 /// assert!(s.all(|x| x % 2 == 0).await);
1553 /// # });
1554 /// ```
1555 fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1556 where
1557 Self: Unpin,
1558 P: FnMut(Self::Item) -> bool,
1559 {
1560 AllFuture {
1561 stream: self,
1562 predicate,
1563 }
1564 }
1565
1566 /// Tests if `predicate` returns `true` for any item in the stream.
1567 ///
1568 /// The result is `false` for an empty stream.
1569 ///
1570 /// # Examples
1571 ///
1572 /// ```
1573 /// use futures_lite::stream::{self, StreamExt};
1574 ///
1575 /// # spin_on::spin_on(async {
1576 /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1577 /// assert!(!s.any(|x| x % 2 == 0).await);
1578 ///
1579 /// let mut s = stream::iter(vec![1, 2, 3]);
1580 /// assert!(s.any(|x| x % 2 == 0).await);
1581 ///
1582 /// let mut s = stream::empty::<i32>();
1583 /// assert!(!s.any(|x| x % 2 == 0).await);
1584 /// # });
1585 /// ```
1586 fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1587 where
1588 Self: Unpin,
1589 P: FnMut(Self::Item) -> bool,
1590 {
1591 AnyFuture {
1592 stream: self,
1593 predicate,
1594 }
1595 }
1596
1597 /// Calls a closure on each item of the stream.
1598 ///
1599 /// # Examples
1600 ///
1601 /// ```
1602 /// use futures_lite::stream::{self, StreamExt};
1603 ///
1604 /// # spin_on::spin_on(async {
1605 /// let mut s = stream::iter(vec![1, 2, 3]);
1606 /// s.for_each(|s| println!("{}", s)).await;
1607 /// # });
1608 /// ```
1609 fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1610 where
1611 Self: Sized,
1612 F: FnMut(Self::Item),
1613 {
1614 ForEachFuture { stream: self, f }
1615 }
1616
1617 /// Calls a fallible closure on each item of the stream, stopping on first error.
1618 ///
1619 /// # Examples
1620 ///
1621 /// ```
1622 /// use futures_lite::stream::{self, StreamExt};
1623 ///
1624 /// # spin_on::spin_on(async {
1625 /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1626 ///
1627 /// let mut v = vec![];
1628 /// let res = s
1629 /// .try_for_each(|n| {
1630 /// if n < 2 {
1631 /// v.push(n);
1632 /// Ok(())
1633 /// } else {
1634 /// Err("too big")
1635 /// }
1636 /// })
1637 /// .await;
1638 ///
1639 /// assert_eq!(v, &[0, 1]);
1640 /// assert_eq!(res, Err("too big"));
1641 /// # });
1642 /// ```
1643 fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1644 where
1645 Self: Unpin,
1646 F: FnMut(Self::Item) -> Result<(), E>,
1647 {
1648 TryForEachFuture { stream: self, f }
1649 }
1650
1651 /// Zips up two streams into a single stream of pairs.
1652 ///
1653 /// The stream of pairs stops when either of the original two streams is exhausted.
1654 ///
1655 /// # Examples
1656 ///
1657 /// ```
1658 /// use futures_lite::stream::{self, StreamExt};
1659 ///
1660 /// # spin_on::spin_on(async {
1661 /// let l = stream::iter(vec![1, 2, 3]);
1662 /// let r = stream::iter(vec![4, 5, 6, 7]);
1663 /// let mut s = l.zip(r);
1664 ///
1665 /// assert_eq!(s.next().await, Some((1, 4)));
1666 /// assert_eq!(s.next().await, Some((2, 5)));
1667 /// assert_eq!(s.next().await, Some((3, 6)));
1668 /// assert_eq!(s.next().await, None);
1669 /// # });
1670 /// ```
1671 fn zip<U>(self, other: U) -> Zip<Self, U>
1672 where
1673 Self: Sized,
1674 U: Stream,
1675 {
1676 Zip {
1677 item_slot: None,
1678 first: self,
1679 second: other,
1680 }
1681 }
1682
1683 /// Collects a stream of pairs into a pair of collections.
1684 ///
1685 /// # Examples
1686 ///
1687 /// ```
1688 /// use futures_lite::stream::{self, StreamExt};
1689 ///
1690 /// # spin_on::spin_on(async {
1691 /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1692 /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1693 ///
1694 /// assert_eq!(left, [1, 3]);
1695 /// assert_eq!(right, [2, 4]);
1696 /// # });
1697 /// ```
1698 fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1699 where
1700 FromA: Default + Extend<A>,
1701 FromB: Default + Extend<B>,
1702 Self: Stream<Item = (A, B)> + Sized,
1703 {
1704 UnzipFuture {
1705 stream: self,
1706 res: Some(Default::default()),
1707 }
1708 }
1709
1710 /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1711 ///
1712 /// # Examples
1713 ///
1714 /// ```
1715 /// use futures_lite::stream::{self, StreamExt};
1716 /// use futures_lite::stream::{once, pending};
1717 ///
1718 /// # spin_on::spin_on(async {
1719 /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1720 /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1721 ///
1722 /// // The first future wins.
1723 /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1724 /// # })
1725 /// ```
1726 fn or<S>(self, other: S) -> Or<Self, S>
1727 where
1728 Self: Sized,
1729 S: Stream<Item = Self::Item>,
1730 {
1731 Or {
1732 stream1: self,
1733 stream2: other,
1734 }
1735 }
1736
1737 /// Merges with `other` stream, with no preference for either stream when both are ready.
1738 ///
1739 /// # Examples
1740 ///
1741 /// ```
1742 /// use futures_lite::stream::{self, StreamExt};
1743 /// use futures_lite::stream::{once, pending};
1744 ///
1745 /// # spin_on::spin_on(async {
1746 /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1747 /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1748 ///
1749 /// // One of the two stream is randomly chosen as the winner.
1750 /// let res = once(1).race(once(2)).next().await;
1751 /// # })
1752 /// ```
1753 #[cfg(all(feature = "std", feature = "race"))]
1754 fn race<S>(self, other: S) -> Race<Self, S>
1755 where
1756 Self: Sized,
1757 S: Stream<Item = Self::Item>,
1758 {
1759 Race {
1760 stream1: self,
1761 stream2: other,
1762 rng: Rng::new(),
1763 }
1764 }
1765
1766 /// Yields all immediately available values from a stream.
1767 ///
1768 /// This is intended to be used as a way of polling a stream without waiting, similar to the
1769 /// [`try_iter`] function on [`std::sync::mpsc::Receiver`]. For instance, running this stream
1770 /// on an [`async_channel::Receiver`] will return all messages that are currently in the
1771 /// channel, but will not wait for new messages.
1772 ///
1773 /// This returns a [`Stream`] instead of an [`Iterator`] because it still needs access to the
1774 /// polling context in order to poll the underlying stream. Since this stream will never return
1775 /// `Poll::Pending`, wrapping it in [`block_on`] will allow it to be effectively used as an
1776 /// [`Iterator`].
1777 ///
1778 /// This stream is not necessarily fused. After it returns `None`, it can return `Some(x)` in
1779 /// the future if it is polled again.
1780 ///
1781 /// [`try_iter`]: std::sync::mpsc::Receiver::try_iter
1782 /// [`async_channel::Receiver`]: https://docs.rs/async-channel/latest/async_channel/struct.Receiver.html
1783 /// [`Stream`]: crate::stream::Stream
1784 /// [`Iterator`]: std::iter::Iterator
1785 ///
1786 /// # Examples
1787 ///
1788 /// ```
1789 /// use futures_lite::{future, pin};
1790 /// use futures_lite::stream::{self, StreamExt};
1791 ///
1792 /// # #[cfg(feature = "std")] {
1793 /// // A stream that yields two values, returns `Pending`, and then yields one more value.
1794 /// let pend_once = stream::once_future(async {
1795 /// future::yield_now().await;
1796 /// 3
1797 /// });
1798 /// let s = stream::iter(vec![1, 2]).chain(pend_once);
1799 /// pin!(s);
1800 ///
1801 /// // This will return the first two values, and then `None` because the stream returns
1802 /// // `Pending` after that.
1803 /// let mut iter = stream::block_on(s.drain());
1804 /// assert_eq!(iter.next(), Some(1));
1805 /// assert_eq!(iter.next(), Some(2));
1806 /// assert_eq!(iter.next(), None);
1807 ///
1808 /// // This will return the last value, because the stream returns `Ready` when polled.
1809 /// assert_eq!(iter.next(), Some(3));
1810 /// assert_eq!(iter.next(), None);
1811 /// # }
1812 /// ```
1813 fn drain(&mut self) -> Drain<'_, Self> {
1814 Drain { stream: self }
1815 }
1816
1817 /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
1818 ///
1819 /// # Examples
1820 ///
1821 /// ```
1822 /// use futures_lite::stream::{self, StreamExt};
1823 ///
1824 /// # spin_on::spin_on(async {
1825 /// let a = stream::once(1);
1826 /// let b = stream::empty();
1827 ///
1828 /// // Streams of different types can be stored in
1829 /// // the same collection when they are boxed:
1830 /// let streams = vec![a.boxed(), b.boxed()];
1831 /// # })
1832 /// ```
1833 #[cfg(feature = "alloc")]
1834 fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
1835 where
1836 Self: Send + Sized + 'a,
1837 {
1838 Box::pin(self)
1839 }
1840
1841 /// Boxes the stream and changes its type to `dyn Stream + 'a`.
1842 ///
1843 /// # Examples
1844 ///
1845 /// ```
1846 /// use futures_lite::stream::{self, StreamExt};
1847 ///
1848 /// # spin_on::spin_on(async {
1849 /// let a = stream::once(1);
1850 /// let b = stream::empty();
1851 ///
1852 /// // Streams of different types can be stored in
1853 /// // the same collection when they are boxed:
1854 /// let streams = vec![a.boxed_local(), b.boxed_local()];
1855 /// # })
1856 /// ```
1857 #[cfg(feature = "alloc")]
1858 fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
1859 where
1860 Self: Sized + 'a,
1861 {
1862 Box::pin(self)
1863 }
1864}
1865
1866impl<S: Stream + ?Sized> StreamExt for S {}
1867
1868/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
1869///
1870/// # Examples
1871///
1872/// ```
1873/// use futures_lite::stream::{self, StreamExt};
1874///
1875/// // These two lines are equivalent:
1876/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
1877/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
1878/// ```
1879#[cfg(feature = "alloc")]
1880pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
1881
1882/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
1883///
1884/// # Examples
1885///
1886/// ```
1887/// use futures_lite::stream::{self, StreamExt};
1888///
1889/// // These two lines are equivalent:
1890/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
1891/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
1892/// ```
1893#[cfg(feature = "alloc")]
1894pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1895
1896/// Future for the [`StreamExt::next()`] method.
1897#[derive(Debug)]
1898#[must_use = "futures do nothing unless you `.await` or poll them"]
1899pub struct NextFuture<'a, S: ?Sized> {
1900 stream: &'a mut S,
1901}
1902
1903impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
1904
1905impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
1906 type Output = Option<S::Item>;
1907
1908 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1909 self.stream.poll_next(cx)
1910 }
1911}
1912
1913/// Future for the [`StreamExt::try_next()`] method.
1914#[derive(Debug)]
1915#[must_use = "futures do nothing unless you `.await` or poll them"]
1916pub struct TryNextFuture<'a, S: ?Sized> {
1917 stream: &'a mut S,
1918}
1919
1920impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
1921
1922impl<T, E, S> Future for TryNextFuture<'_, S>
1923where
1924 S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
1925{
1926 type Output = Result<Option<T>, E>;
1927
1928 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1929 let res: Option> = ready!(self.stream.poll_next(cx));
1930 Poll::Ready(res.transpose())
1931 }
1932}
1933
1934pin_project! {
1935 /// Future for the [`StreamExt::count()`] method.
1936 #[derive(Debug)]
1937 #[must_use = "futures do nothing unless you `.await` or poll them"]
1938 pub struct CountFuture<S: ?Sized> {
1939 count: usize,
1940 #[pin]
1941 stream: S,
1942 }
1943}
1944
1945impl<S: Stream + ?Sized> Future for CountFuture<S> {
1946 type Output = usize;
1947
1948 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1949 loop {
1950 match ready!(self.as_mut().project().stream.poll_next(cx)) {
1951 None => return Poll::Ready(self.count),
1952 Some(_) => *self.as_mut().project().count += 1,
1953 }
1954 }
1955 }
1956}
1957
1958pin_project! {
1959 /// Future for the [`StreamExt::collect()`] method.
1960 #[derive(Debug)]
1961 #[must_use = "futures do nothing unless you `.await` or poll them"]
1962 pub struct CollectFuture<S, C> {
1963 #[pin]
1964 stream: S,
1965 collection: C,
1966 }
1967}
1968
1969impl<S, C> Future for CollectFuture<S, C>
1970where
1971 S: Stream,
1972 C: Default + Extend<S::Item>,
1973{
1974 type Output = C;
1975
1976 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
1977 let mut this: Projection<'_, S, C> = self.as_mut().project();
1978 loop {
1979 match ready!(this.stream.as_mut().poll_next(cx)) {
1980 Some(e: ::Item) => this.collection.extend(iter:Some(e)),
1981 None => return Poll::Ready(mem::take(self.project().collection)),
1982 }
1983 }
1984 }
1985}
1986
1987pin_project! {
1988 /// Future for the [`StreamExt::try_collect()`] method.
1989 #[derive(Debug)]
1990 #[must_use = "futures do nothing unless you `.await` or poll them"]
1991 pub struct TryCollectFuture<S, C> {
1992 #[pin]
1993 stream: S,
1994 items: C,
1995 }
1996}
1997
1998impl<T, E, S, C> Future for TryCollectFuture<S, C>
1999where
2000 S: Stream<Item = Result<T, E>>,
2001 C: Default + Extend<T>,
2002{
2003 type Output = Result<C, E>;
2004
2005 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2006 let mut this: Projection<'_, S, C> = self.project();
2007 Poll::Ready(Ok(loop {
2008 match ready!(this.stream.as_mut().poll_next(cx)?) {
2009 Some(x: T) => this.items.extend(iter:Some(x)),
2010 None => break mem::take(dest:this.items),
2011 }
2012 }))
2013 }
2014}
2015
2016pin_project! {
2017 /// Future for the [`StreamExt::partition()`] method.
2018 #[derive(Debug)]
2019 #[must_use = "futures do nothing unless you `.await` or poll them"]
2020 pub struct PartitionFuture<S, P, B> {
2021 #[pin]
2022 stream: S,
2023 predicate: P,
2024 res: Option<(B, B)>,
2025 }
2026}
2027
2028impl<S, P, B> Future for PartitionFuture<S, P, B>
2029where
2030 S: Stream + Sized,
2031 P: FnMut(&S::Item) -> bool,
2032 B: Default + Extend<S::Item>,
2033{
2034 type Output = (B, B);
2035
2036 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2037 let mut this: Projection<'_, S, P, B> = self.project();
2038 loop {
2039 match ready!(this.stream.as_mut().poll_next(cx)) {
2040 Some(v: ::Item) => {
2041 let res: &mut (B, B) = this.res.as_mut().unwrap();
2042 if (this.predicate)(&v) {
2043 res.0.extend(iter:Some(v))
2044 } else {
2045 res.1.extend(iter:Some(v))
2046 }
2047 }
2048 None => return Poll::Ready(this.res.take().unwrap()),
2049 }
2050 }
2051 }
2052}
2053
2054pin_project! {
2055 /// Future for the [`StreamExt::fold()`] method.
2056 #[derive(Debug)]
2057 #[must_use = "futures do nothing unless you `.await` or poll them"]
2058 pub struct FoldFuture<S, F, T> {
2059 #[pin]
2060 stream: S,
2061 f: F,
2062 acc: Option<T>,
2063 }
2064}
2065
2066impl<S, F, T> Future for FoldFuture<S, F, T>
2067where
2068 S: Stream,
2069 F: FnMut(T, S::Item) -> T,
2070{
2071 type Output = T;
2072
2073 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2074 let mut this: Projection<'_, S, F, T> = self.project();
2075 loop {
2076 match ready!(this.stream.as_mut().poll_next(cx)) {
2077 Some(v: ::Item) => {
2078 let old: T = this.acc.take().unwrap();
2079 let new: T = (this.f)(old, v);
2080 *this.acc = Some(new);
2081 }
2082 None => return Poll::Ready(this.acc.take().unwrap()),
2083 }
2084 }
2085 }
2086}
2087
2088/// Future for the [`StreamExt::try_fold()`] method.
2089#[derive(Debug)]
2090#[must_use = "futures do nothing unless you `.await` or poll them"]
2091pub struct TryFoldFuture<'a, S, F, B> {
2092 stream: &'a mut S,
2093 f: F,
2094 acc: Option<B>,
2095}
2096
2097impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
2098
2099impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
2100where
2101 S: Stream<Item = Result<T, E>> + Unpin,
2102 F: FnMut(B, T) -> Result<B, E>,
2103{
2104 type Output = Result<B, E>;
2105
2106 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2107 loop {
2108 match ready!(self.stream.poll_next(cx)) {
2109 Some(Err(e: E)) => return Poll::Ready(Err(e)),
2110 Some(Ok(t: T)) => {
2111 let old: B = self.acc.take().unwrap();
2112 let new: Result = (&mut self.f)(old, t);
2113
2114 match new {
2115 Ok(t: B) => self.acc = Some(t),
2116 Err(e: E) => return Poll::Ready(Err(e)),
2117 }
2118 }
2119 None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2120 }
2121 }
2122 }
2123}
2124
2125pin_project! {
2126 /// Stream for the [`StreamExt::scan()`] method.
2127 #[derive(Clone, Debug)]
2128 #[must_use = "streams do nothing unless polled"]
2129 pub struct Scan<S, St, F> {
2130 #[pin]
2131 stream: S,
2132 state_f: (St, F),
2133 }
2134}
2135
2136impl<S, St, F, B> Stream for Scan<S, St, F>
2137where
2138 S: Stream,
2139 F: FnMut(&mut St, S::Item) -> Option<B>,
2140{
2141 type Item = B;
2142
2143 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2144 let mut this: Projection<'_, S, St, F> = self.project();
2145 this.stream.as_mut().poll_next(cx).map(|item: Option<::Item>| {
2146 item.and_then(|item: ::Item| {
2147 let (state: &mut St, f: &mut F) = this.state_f;
2148 f(state, item)
2149 })
2150 })
2151 }
2152}
2153
2154pin_project! {
2155 /// Stream for the [`StreamExt::fuse()`] method.
2156 #[derive(Clone, Debug)]
2157 #[must_use = "streams do nothing unless polled"]
2158 pub struct Fuse<S> {
2159 #[pin]
2160 stream: S,
2161 done: bool,
2162 }
2163}
2164
2165impl<S: Stream> Stream for Fuse<S> {
2166 type Item = S::Item;
2167
2168 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2169 let this: Projection<'_, S> = self.project();
2170
2171 if *this.done {
2172 Poll::Ready(None)
2173 } else {
2174 let next: Option<::Item> = ready!(this.stream.poll_next(cx));
2175 if next.is_none() {
2176 *this.done = true;
2177 }
2178 Poll::Ready(next)
2179 }
2180 }
2181}
2182
2183pin_project! {
2184 /// Stream for the [`StreamExt::map()`] method.
2185 #[derive(Clone, Debug)]
2186 #[must_use = "streams do nothing unless polled"]
2187 pub struct Map<S, F> {
2188 #[pin]
2189 stream: S,
2190 f: F,
2191 }
2192}
2193
2194impl<S, F, T> Stream for Map<S, F>
2195where
2196 S: Stream,
2197 F: FnMut(S::Item) -> T,
2198{
2199 type Item = T;
2200
2201 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2202 let this: Projection<'_, S, F> = self.project();
2203 let next: Option<::Item> = ready!(this.stream.poll_next(cx));
2204 Poll::Ready(next.map(this.f))
2205 }
2206
2207 fn size_hint(&self) -> (usize, Option<usize>) {
2208 self.stream.size_hint()
2209 }
2210}
2211
2212pin_project! {
2213 /// Stream for the [`StreamExt::flat_map()`] method.
2214 #[derive(Clone, Debug)]
2215 #[must_use = "streams do nothing unless polled"]
2216 pub struct FlatMap<S, U, F> {
2217 #[pin]
2218 stream: Map<S, F>,
2219 #[pin]
2220 inner_stream: Option<U>,
2221 }
2222}
2223
2224impl<S, U, F> Stream for FlatMap<S, U, F>
2225where
2226 S: Stream,
2227 U: Stream,
2228 F: FnMut(S::Item) -> U,
2229{
2230 type Item = U::Item;
2231
2232 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2233 let mut this: Projection<'_, S, U, F> = self.project();
2234 loop {
2235 if let Some(inner: Pin<&mut U>) = this.inner_stream.as_mut().as_pin_mut() {
2236 match ready!(inner.poll_next(cx)) {
2237 Some(item: ::Item) => return Poll::Ready(Some(item)),
2238 None => this.inner_stream.set(None),
2239 }
2240 }
2241
2242 match ready!(this.stream.as_mut().poll_next(cx)) {
2243 Some(stream: U) => this.inner_stream.set(Some(stream)),
2244 None => return Poll::Ready(None),
2245 }
2246 }
2247 }
2248}
2249
2250pin_project! {
2251 /// Stream for the [`StreamExt::flatten()`] method.
2252 #[derive(Clone, Debug)]
2253 #[must_use = "streams do nothing unless polled"]
2254 pub struct Flatten<S: Stream> {
2255 #[pin]
2256 stream: S,
2257 #[pin]
2258 inner_stream: Option<S::Item>,
2259 }
2260}
2261
2262impl<S, U> Stream for Flatten<S>
2263where
2264 S: Stream<Item = U>,
2265 U: Stream,
2266{
2267 type Item = U::Item;
2268
2269 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2270 let mut this: Projection<'_, S> = self.project();
2271 loop {
2272 if let Some(inner: Pin<&mut U>) = this.inner_stream.as_mut().as_pin_mut() {
2273 match ready!(inner.poll_next(cx)) {
2274 Some(item: ::Item) => return Poll::Ready(Some(item)),
2275 None => this.inner_stream.set(None),
2276 }
2277 }
2278
2279 match ready!(this.stream.as_mut().poll_next(cx)) {
2280 Some(inner: U) => this.inner_stream.set(Some(inner)),
2281 None => return Poll::Ready(None),
2282 }
2283 }
2284 }
2285}
2286
2287pin_project! {
2288 /// Stream for the [`StreamExt::then()`] method.
2289 #[derive(Clone, Debug)]
2290 #[must_use = "streams do nothing unless polled"]
2291 pub struct Then<S, F, Fut> {
2292 #[pin]
2293 stream: S,
2294 #[pin]
2295 future: Option<Fut>,
2296 f: F,
2297 }
2298}
2299
2300impl<S, F, Fut> Stream for Then<S, F, Fut>
2301where
2302 S: Stream,
2303 F: FnMut(S::Item) -> Fut,
2304 Fut: Future,
2305{
2306 type Item = Fut::Output;
2307
2308 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2309 let mut this = self.project();
2310
2311 loop {
2312 if let Some(fut) = this.future.as_mut().as_pin_mut() {
2313 let item = ready!(fut.poll(cx));
2314 this.future.set(None);
2315 return Poll::Ready(Some(item));
2316 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2317 this.future.set(Some((this.f)(item)));
2318 } else {
2319 return Poll::Ready(None);
2320 }
2321 }
2322 }
2323
2324 fn size_hint(&self) -> (usize, Option<usize>) {
2325 let future_len = self.future.is_some() as usize;
2326 let (lower, upper) = self.stream.size_hint();
2327 let lower = lower.saturating_add(future_len);
2328 let upper = upper.and_then(|u| u.checked_add(future_len));
2329 (lower, upper)
2330 }
2331}
2332
2333pin_project! {
2334 /// Stream for the [`StreamExt::filter()`] method.
2335 #[derive(Clone, Debug)]
2336 #[must_use = "streams do nothing unless polled"]
2337 pub struct Filter<S, P> {
2338 #[pin]
2339 stream: S,
2340 predicate: P,
2341 }
2342}
2343
2344impl<S, P> Stream for Filter<S, P>
2345where
2346 S: Stream,
2347 P: FnMut(&S::Item) -> bool,
2348{
2349 type Item = S::Item;
2350
2351 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2352 let mut this: Projection<'_, S, P> = self.project();
2353 loop {
2354 match ready!(this.stream.as_mut().poll_next(cx)) {
2355 None => return Poll::Ready(None),
2356 Some(v: ::Item) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2357 Some(_) => {}
2358 }
2359 }
2360 }
2361
2362 fn size_hint(&self) -> (usize, Option<usize>) {
2363 let (_, hi: Option) = self.stream.size_hint();
2364
2365 // If the filter matches all of the elements, it will match the stream's upper bound.
2366 // If the filter matches none of the elements, there will be zero returned values.
2367 (0, hi)
2368 }
2369}
2370
2371/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
2372///
2373/// # Examples
2374///
2375/// ```
2376/// use futures_lite::stream::{self, once, pending, StreamExt};
2377///
2378/// # spin_on::spin_on(async {
2379/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
2380/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
2381///
2382/// // The first stream wins.
2383/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
2384/// # })
2385/// ```
2386pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2387where
2388 S1: Stream<Item = T>,
2389 S2: Stream<Item = T>,
2390{
2391 Or { stream1, stream2 }
2392}
2393
2394pin_project! {
2395 /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2396 #[derive(Clone, Debug)]
2397 #[must_use = "streams do nothing unless polled"]
2398 pub struct Or<S1, S2> {
2399 #[pin]
2400 stream1: S1,
2401 #[pin]
2402 stream2: S2,
2403 }
2404}
2405
2406impl<T, S1, S2> Stream for Or<S1, S2>
2407where
2408 S1: Stream<Item = T>,
2409 S2: Stream<Item = T>,
2410{
2411 type Item = T;
2412
2413 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2414 let mut this: Projection<'_, S1, S2> = self.project();
2415
2416 if let Poll::Ready(Some(t: T)) = this.stream1.as_mut().poll_next(cx) {
2417 return Poll::Ready(Some(t));
2418 }
2419 this.stream2.as_mut().poll_next(cx)
2420 }
2421}
2422
2423/// Merges two streams, with no preference for either stream when both are ready.
2424///
2425/// # Examples
2426///
2427/// ```
2428/// use futures_lite::stream::{self, once, pending, StreamExt};
2429///
2430/// # spin_on::spin_on(async {
2431/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
2432/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
2433///
2434/// // One of the two stream is randomly chosen as the winner.
2435/// let res = stream::race(once(1), once(2)).next().await;
2436/// # })
2437/// ```
2438#[cfg(all(feature = "std", feature = "race"))]
2439pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
2440where
2441 S1: Stream<Item = T>,
2442 S2: Stream<Item = T>,
2443{
2444 Race {
2445 stream1,
2446 stream2,
2447 rng: Rng::new(),
2448 }
2449}
2450
2451/// Races two streams, but with a user-provided seed for randomness.
2452///
2453/// # Examples
2454///
2455/// ```
2456/// use futures_lite::stream::{self, once, pending, StreamExt};
2457///
2458/// // A fixed seed is used for reproducibility.
2459/// const SEED: u64 = 123;
2460///
2461/// # spin_on::spin_on(async {
2462/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1));
2463/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2));
2464///
2465/// // One of the two stream is randomly chosen as the winner.
2466/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await;
2467/// # })
2468/// ```
2469#[cfg(feature = "race")]
2470pub fn race_with_seed<T, S1, S2>(stream1: S1, stream2: S2, seed: u64) -> Race<S1, S2>
2471where
2472 S1: Stream<Item = T>,
2473 S2: Stream<Item = T>,
2474{
2475 Race {
2476 stream1,
2477 stream2,
2478 rng: Rng::with_seed(seed),
2479 }
2480}
2481
2482#[cfg(feature = "race")]
2483pin_project! {
2484 /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
2485 #[derive(Clone, Debug)]
2486 #[must_use = "streams do nothing unless polled"]
2487 pub struct Race<S1, S2> {
2488 #[pin]
2489 stream1: S1,
2490 #[pin]
2491 stream2: S2,
2492 rng: Rng,
2493 }
2494}
2495
2496#[cfg(feature = "race")]
2497impl<T, S1, S2> Stream for Race<S1, S2>
2498where
2499 S1: Stream<Item = T>,
2500 S2: Stream<Item = T>,
2501{
2502 type Item = T;
2503
2504 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2505 let mut this: Projection<'_, S1, S2> = self.project();
2506
2507 if this.rng.bool() {
2508 if let Poll::Ready(Some(t: T)) = this.stream1.as_mut().poll_next(cx) {
2509 return Poll::Ready(Some(t));
2510 }
2511 if let Poll::Ready(Some(t: T)) = this.stream2.as_mut().poll_next(cx) {
2512 return Poll::Ready(Some(t));
2513 }
2514 } else {
2515 if let Poll::Ready(Some(t: T)) = this.stream2.as_mut().poll_next(cx) {
2516 return Poll::Ready(Some(t));
2517 }
2518 if let Poll::Ready(Some(t: T)) = this.stream1.as_mut().poll_next(cx) {
2519 return Poll::Ready(Some(t));
2520 }
2521 }
2522 Poll::Pending
2523 }
2524}
2525
2526pin_project! {
2527 /// Stream for the [`StreamExt::filter_map()`] method.
2528 #[derive(Clone, Debug)]
2529 #[must_use = "streams do nothing unless polled"]
2530 pub struct FilterMap<S, F> {
2531 #[pin]
2532 stream: S,
2533 f: F,
2534 }
2535}
2536
2537impl<S, F, T> Stream for FilterMap<S, F>
2538where
2539 S: Stream,
2540 F: FnMut(S::Item) -> Option<T>,
2541{
2542 type Item = T;
2543
2544 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2545 let mut this: Projection<'_, S, F> = self.project();
2546 loop {
2547 match ready!(this.stream.as_mut().poll_next(cx)) {
2548 None => return Poll::Ready(None),
2549 Some(v: ::Item) => {
2550 if let Some(t: T) = (this.f)(v) {
2551 return Poll::Ready(Some(t));
2552 }
2553 }
2554 }
2555 }
2556 }
2557}
2558
2559pin_project! {
2560 /// Stream for the [`StreamExt::take()`] method.
2561 #[derive(Clone, Debug)]
2562 #[must_use = "streams do nothing unless polled"]
2563 pub struct Take<S> {
2564 #[pin]
2565 stream: S,
2566 n: usize,
2567 }
2568}
2569
2570impl<S: Stream> Stream for Take<S> {
2571 type Item = S::Item;
2572
2573 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2574 let this: Projection<'_, S> = self.project();
2575
2576 if *this.n == 0 {
2577 Poll::Ready(None)
2578 } else {
2579 let next: Option<::Item> = ready!(this.stream.poll_next(cx));
2580 match next {
2581 Some(_) => *this.n -= 1,
2582 None => *this.n = 0,
2583 }
2584 Poll::Ready(next)
2585 }
2586 }
2587}
2588
2589pin_project! {
2590 /// Stream for the [`StreamExt::take_while()`] method.
2591 #[derive(Clone, Debug)]
2592 #[must_use = "streams do nothing unless polled"]
2593 pub struct TakeWhile<S, P> {
2594 #[pin]
2595 stream: S,
2596 predicate: P,
2597 }
2598}
2599
2600impl<S, P> Stream for TakeWhile<S, P>
2601where
2602 S: Stream,
2603 P: FnMut(&S::Item) -> bool,
2604{
2605 type Item = S::Item;
2606
2607 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2608 let this: Projection<'_, S, P> = self.project();
2609
2610 match ready!(this.stream.poll_next(cx)) {
2611 Some(v: ::Item) => {
2612 if (this.predicate)(&v) {
2613 Poll::Ready(Some(v))
2614 } else {
2615 Poll::Ready(None)
2616 }
2617 }
2618 None => Poll::Ready(None),
2619 }
2620 }
2621}
2622
2623pin_project! {
2624 /// Stream for the [`StreamExt::skip()`] method.
2625 #[derive(Clone, Debug)]
2626 #[must_use = "streams do nothing unless polled"]
2627 pub struct Skip<S> {
2628 #[pin]
2629 stream: S,
2630 n: usize,
2631 }
2632}
2633
2634impl<S: Stream> Stream for Skip<S> {
2635 type Item = S::Item;
2636
2637 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2638 let mut this: Projection<'_, S> = self.project();
2639 loop {
2640 match ready!(this.stream.as_mut().poll_next(cx)) {
2641 Some(v: ::Item) => match *this.n {
2642 0 => return Poll::Ready(Some(v)),
2643 _ => *this.n -= 1,
2644 },
2645 None => return Poll::Ready(None),
2646 }
2647 }
2648 }
2649}
2650
2651pin_project! {
2652 /// Stream for the [`StreamExt::skip_while()`] method.
2653 #[derive(Clone, Debug)]
2654 #[must_use = "streams do nothing unless polled"]
2655 pub struct SkipWhile<S, P> {
2656 #[pin]
2657 stream: S,
2658 predicate: Option<P>,
2659 }
2660}
2661
2662impl<S, P> Stream for SkipWhile<S, P>
2663where
2664 S: Stream,
2665 P: FnMut(&S::Item) -> bool,
2666{
2667 type Item = S::Item;
2668
2669 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2670 let mut this: Projection<'_, S, P> = self.project();
2671 loop {
2672 match ready!(this.stream.as_mut().poll_next(cx)) {
2673 Some(v: ::Item) => match this.predicate {
2674 Some(p: &mut P) => {
2675 if !p(&v) {
2676 *this.predicate = None;
2677 return Poll::Ready(Some(v));
2678 }
2679 }
2680 None => return Poll::Ready(Some(v)),
2681 },
2682 None => return Poll::Ready(None),
2683 }
2684 }
2685 }
2686}
2687
2688pin_project! {
2689 /// Stream for the [`StreamExt::step_by()`] method.
2690 #[derive(Clone, Debug)]
2691 #[must_use = "streams do nothing unless polled"]
2692 pub struct StepBy<S> {
2693 #[pin]
2694 stream: S,
2695 step: usize,
2696 i: usize,
2697 }
2698}
2699
2700impl<S: Stream> Stream for StepBy<S> {
2701 type Item = S::Item;
2702
2703 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2704 let mut this: Projection<'_, S> = self.project();
2705 loop {
2706 match ready!(this.stream.as_mut().poll_next(cx)) {
2707 Some(v: ::Item) => {
2708 if *this.i == 0 {
2709 *this.i = *this.step - 1;
2710 return Poll::Ready(Some(v));
2711 } else {
2712 *this.i -= 1;
2713 }
2714 }
2715 None => return Poll::Ready(None),
2716 }
2717 }
2718 }
2719}
2720
2721pin_project! {
2722 /// Stream for the [`StreamExt::chain()`] method.
2723 #[derive(Clone, Debug)]
2724 #[must_use = "streams do nothing unless polled"]
2725 pub struct Chain<S, U> {
2726 #[pin]
2727 first: Fuse<S>,
2728 #[pin]
2729 second: Fuse<U>,
2730 }
2731}
2732
2733impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2734 type Item = S::Item;
2735
2736 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2737 let mut this = self.project();
2738
2739 if !this.first.done {
2740 let next = ready!(this.first.as_mut().poll_next(cx));
2741 if let Some(next) = next {
2742 return Poll::Ready(Some(next));
2743 }
2744 }
2745
2746 if !this.second.done {
2747 let next = ready!(this.second.as_mut().poll_next(cx));
2748 if let Some(next) = next {
2749 return Poll::Ready(Some(next));
2750 }
2751 }
2752
2753 if this.first.done && this.second.done {
2754 Poll::Ready(None)
2755 } else {
2756 Poll::Pending
2757 }
2758 }
2759}
2760
2761pin_project! {
2762 /// Stream for the [`StreamExt::cloned()`] method.
2763 #[derive(Clone, Debug)]
2764 #[must_use = "streams do nothing unless polled"]
2765 pub struct Cloned<S> {
2766 #[pin]
2767 stream: S,
2768 }
2769}
2770
2771impl<'a, S, T: 'a> Stream for Cloned<S>
2772where
2773 S: Stream<Item = &'a T>,
2774 T: Clone,
2775{
2776 type Item = T;
2777
2778 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2779 let this: Projection<'_, S> = self.project();
2780 let next: Option<&T> = ready!(this.stream.poll_next(cx));
2781 Poll::Ready(next.cloned())
2782 }
2783}
2784
2785pin_project! {
2786 /// Stream for the [`StreamExt::copied()`] method.
2787 #[derive(Clone, Debug)]
2788 #[must_use = "streams do nothing unless polled"]
2789 pub struct Copied<S> {
2790 #[pin]
2791 stream: S,
2792 }
2793}
2794
2795impl<'a, S, T: 'a> Stream for Copied<S>
2796where
2797 S: Stream<Item = &'a T>,
2798 T: Copy,
2799{
2800 type Item = T;
2801
2802 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2803 let this: Projection<'_, S> = self.project();
2804 let next: Option<&T> = ready!(this.stream.poll_next(cx));
2805 Poll::Ready(next.copied())
2806 }
2807}
2808
2809pin_project! {
2810 /// Stream for the [`StreamExt::cycle()`] method.
2811 #[derive(Clone, Debug)]
2812 #[must_use = "streams do nothing unless polled"]
2813 pub struct Cycle<S> {
2814 orig: S,
2815 #[pin]
2816 stream: S,
2817 }
2818}
2819
2820impl<S> Stream for Cycle<S>
2821where
2822 S: Stream + Clone,
2823{
2824 type Item = S::Item;
2825
2826 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2827 match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2828 Some(item: ::Item) => Poll::Ready(Some(item)),
2829 None => {
2830 let new: S = self.as_mut().orig.clone();
2831 self.as_mut().project().stream.set(new);
2832 self.project().stream.poll_next(cx)
2833 }
2834 }
2835 }
2836}
2837
2838pin_project! {
2839 /// Stream for the [`StreamExt::enumerate()`] method.
2840 #[derive(Clone, Debug)]
2841 #[must_use = "streams do nothing unless polled"]
2842 pub struct Enumerate<S> {
2843 #[pin]
2844 stream: S,
2845 i: usize,
2846 }
2847}
2848
2849impl<S> Stream for Enumerate<S>
2850where
2851 S: Stream,
2852{
2853 type Item = (usize, S::Item);
2854
2855 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2856 let this: Projection<'_, S> = self.project();
2857
2858 match ready!(this.stream.poll_next(cx)) {
2859 Some(v: ::Item) => {
2860 let ret: (usize, ::Item) = (*this.i, v);
2861 *this.i += 1;
2862 Poll::Ready(Some(ret))
2863 }
2864 None => Poll::Ready(None),
2865 }
2866 }
2867}
2868
2869pin_project! {
2870 /// Stream for the [`StreamExt::inspect()`] method.
2871 #[derive(Clone, Debug)]
2872 #[must_use = "streams do nothing unless polled"]
2873 pub struct Inspect<S, F> {
2874 #[pin]
2875 stream: S,
2876 f: F,
2877 }
2878}
2879
2880impl<S, F> Stream for Inspect<S, F>
2881where
2882 S: Stream,
2883 F: FnMut(&S::Item),
2884{
2885 type Item = S::Item;
2886
2887 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2888 let mut this: Projection<'_, S, F> = self.project();
2889 let next: Option<::Item> = ready!(this.stream.as_mut().poll_next(cx));
2890 if let Some(x: &::Item) = &next {
2891 (this.f)(x);
2892 }
2893 Poll::Ready(next)
2894 }
2895}
2896
2897/// Future for the [`StreamExt::nth()`] method.
2898#[derive(Debug)]
2899#[must_use = "futures do nothing unless you `.await` or poll them"]
2900pub struct NthFuture<'a, S: ?Sized> {
2901 stream: &'a mut S,
2902 n: usize,
2903}
2904
2905impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2906
2907impl<'a, S> Future for NthFuture<'a, S>
2908where
2909 S: Stream + Unpin + ?Sized,
2910{
2911 type Output = Option<S::Item>;
2912
2913 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2914 loop {
2915 match ready!(self.stream.poll_next(cx)) {
2916 Some(v: ::Item) => match self.n {
2917 0 => return Poll::Ready(Some(v)),
2918 _ => self.n -= 1,
2919 },
2920 None => return Poll::Ready(None),
2921 }
2922 }
2923 }
2924}
2925
2926pin_project! {
2927 /// Future for the [`StreamExt::last()`] method.
2928 #[derive(Debug)]
2929 #[must_use = "futures do nothing unless you `.await` or poll them"]
2930 pub struct LastFuture<S: Stream> {
2931 #[pin]
2932 stream: S,
2933 last: Option<S::Item>,
2934 }
2935}
2936
2937impl<S: Stream> Future for LastFuture<S> {
2938 type Output = Option<S::Item>;
2939
2940 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2941 let mut this: Projection<'_, S> = self.project();
2942 loop {
2943 match ready!(this.stream.as_mut().poll_next(cx)) {
2944 Some(new: ::Item) => *this.last = Some(new),
2945 None => return Poll::Ready(this.last.take()),
2946 }
2947 }
2948 }
2949}
2950
2951/// Future for the [`StreamExt::find()`] method.
2952#[derive(Debug)]
2953#[must_use = "futures do nothing unless you `.await` or poll them"]
2954pub struct FindFuture<'a, S: ?Sized, P> {
2955 stream: &'a mut S,
2956 predicate: P,
2957}
2958
2959impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2960
2961impl<'a, S, P> Future for FindFuture<'a, S, P>
2962where
2963 S: Stream + Unpin + ?Sized,
2964 P: FnMut(&S::Item) -> bool,
2965{
2966 type Output = Option<S::Item>;
2967
2968 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2969 loop {
2970 match ready!(self.stream.poll_next(cx)) {
2971 Some(v: ::Item) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
2972 Some(_) => {}
2973 None => return Poll::Ready(None),
2974 }
2975 }
2976 }
2977}
2978
2979/// Future for the [`StreamExt::find_map()`] method.
2980#[derive(Debug)]
2981#[must_use = "futures do nothing unless you `.await` or poll them"]
2982pub struct FindMapFuture<'a, S: ?Sized, F> {
2983 stream: &'a mut S,
2984 f: F,
2985}
2986
2987impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2988
2989impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
2990where
2991 S: Stream + Unpin + ?Sized,
2992 F: FnMut(S::Item) -> Option<B>,
2993{
2994 type Output = Option<B>;
2995
2996 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2997 loop {
2998 match ready!(self.stream.poll_next(cx)) {
2999 Some(v: ::Item) => {
3000 if let Some(v: B) = (&mut self.f)(v) {
3001 return Poll::Ready(Some(v));
3002 }
3003 }
3004 None => return Poll::Ready(None),
3005 }
3006 }
3007 }
3008}
3009
3010/// Future for the [`StreamExt::position()`] method.
3011#[derive(Debug)]
3012#[must_use = "futures do nothing unless you `.await` or poll them"]
3013pub struct PositionFuture<'a, S: ?Sized, P> {
3014 stream: &'a mut S,
3015 predicate: P,
3016 index: usize,
3017}
3018
3019impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
3020
3021impl<'a, S, P> Future for PositionFuture<'a, S, P>
3022where
3023 S: Stream + Unpin + ?Sized,
3024 P: FnMut(S::Item) -> bool,
3025{
3026 type Output = Option<usize>;
3027
3028 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3029 loop {
3030 match ready!(self.stream.poll_next(cx)) {
3031 Some(v: ::Item) => {
3032 if (&mut self.predicate)(v) {
3033 return Poll::Ready(Some(self.index));
3034 } else {
3035 self.index += 1;
3036 }
3037 }
3038 None => return Poll::Ready(None),
3039 }
3040 }
3041 }
3042}
3043
3044/// Future for the [`StreamExt::all()`] method.
3045#[derive(Debug)]
3046#[must_use = "futures do nothing unless you `.await` or poll them"]
3047pub struct AllFuture<'a, S: ?Sized, P> {
3048 stream: &'a mut S,
3049 predicate: P,
3050}
3051
3052impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
3053
3054impl<S, P> Future for AllFuture<'_, S, P>
3055where
3056 S: Stream + Unpin + ?Sized,
3057 P: FnMut(S::Item) -> bool,
3058{
3059 type Output = bool;
3060
3061 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3062 loop {
3063 match ready!(self.stream.poll_next(cx)) {
3064 Some(v: ::Item) => {
3065 if !(&mut self.predicate)(v) {
3066 return Poll::Ready(false);
3067 }
3068 }
3069 None => return Poll::Ready(true),
3070 }
3071 }
3072 }
3073}
3074
3075/// Future for the [`StreamExt::any()`] method.
3076#[derive(Debug)]
3077#[must_use = "futures do nothing unless you `.await` or poll them"]
3078pub struct AnyFuture<'a, S: ?Sized, P> {
3079 stream: &'a mut S,
3080 predicate: P,
3081}
3082
3083impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
3084
3085impl<S, P> Future for AnyFuture<'_, S, P>
3086where
3087 S: Stream + Unpin + ?Sized,
3088 P: FnMut(S::Item) -> bool,
3089{
3090 type Output = bool;
3091
3092 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3093 loop {
3094 match ready!(self.stream.poll_next(cx)) {
3095 Some(v: ::Item) => {
3096 if (&mut self.predicate)(v) {
3097 return Poll::Ready(true);
3098 }
3099 }
3100 None => return Poll::Ready(false),
3101 }
3102 }
3103 }
3104}
3105
3106pin_project! {
3107 /// Future for the [`StreamExt::for_each()`] method.
3108 #[derive(Debug)]
3109 #[must_use = "futures do nothing unless you `.await` or poll them"]
3110 pub struct ForEachFuture<S, F> {
3111 #[pin]
3112 stream: S,
3113 f: F,
3114 }
3115}
3116
3117impl<S, F> Future for ForEachFuture<S, F>
3118where
3119 S: Stream,
3120 F: FnMut(S::Item),
3121{
3122 type Output = ();
3123
3124 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3125 let mut this: Projection<'_, S, F> = self.project();
3126 loop {
3127 match ready!(this.stream.as_mut().poll_next(cx)) {
3128 Some(v: ::Item) => (this.f)(v),
3129 None => return Poll::Ready(()),
3130 }
3131 }
3132 }
3133}
3134
3135/// Future for the [`StreamExt::try_for_each()`] method.
3136#[derive(Debug)]
3137#[must_use = "futures do nothing unless you `.await` or poll them"]
3138pub struct TryForEachFuture<'a, S: ?Sized, F> {
3139 stream: &'a mut S,
3140 f: F,
3141}
3142
3143impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
3144
3145impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
3146where
3147 S: Stream + Unpin + ?Sized,
3148 F: FnMut(S::Item) -> Result<(), E>,
3149{
3150 type Output = Result<(), E>;
3151
3152 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3153 loop {
3154 match ready!(self.stream.poll_next(cx)) {
3155 None => return Poll::Ready(Ok(())),
3156 Some(v: ::Item) => (&mut self.f)(v)?,
3157 }
3158 }
3159 }
3160}
3161
3162pin_project! {
3163 /// Stream for the [`StreamExt::zip()`] method.
3164 #[derive(Clone, Debug)]
3165 #[must_use = "streams do nothing unless polled"]
3166 pub struct Zip<A: Stream, B> {
3167 item_slot: Option<A::Item>,
3168 #[pin]
3169 first: A,
3170 #[pin]
3171 second: B,
3172 }
3173}
3174
3175impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3176 type Item = (A::Item, B::Item);
3177
3178 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3179 let this: Projection<'_, A, B> = self.project();
3180
3181 if this.item_slot.is_none() {
3182 match this.first.poll_next(cx) {
3183 Poll::Pending => return Poll::Pending,
3184 Poll::Ready(None) => return Poll::Ready(None),
3185 Poll::Ready(Some(item: ::Item)) => *this.item_slot = Some(item),
3186 }
3187 }
3188
3189 let second_item: Option<::Item> = ready!(this.second.poll_next(cx));
3190 let first_item: ::Item = this.item_slot.take().unwrap();
3191 Poll::Ready(second_item.map(|second_item: ::Item| (first_item, second_item)))
3192 }
3193}
3194
3195pin_project! {
3196 /// Future for the [`StreamExt::unzip()`] method.
3197 #[derive(Debug)]
3198 #[must_use = "futures do nothing unless you `.await` or poll them"]
3199 pub struct UnzipFuture<S, FromA, FromB> {
3200 #[pin]
3201 stream: S,
3202 res: Option<(FromA, FromB)>,
3203 }
3204}
3205
3206impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3207where
3208 S: Stream<Item = (A, B)>,
3209 FromA: Default + Extend<A>,
3210 FromB: Default + Extend<B>,
3211{
3212 type Output = (FromA, FromB);
3213
3214 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3215 let mut this: Projection<'_, S, FromA, …> = self.project();
3216
3217 loop {
3218 match ready!(this.stream.as_mut().poll_next(cx)) {
3219 Some((a: A, b: B)) => {
3220 let res: &mut (FromA, FromB) = this.res.as_mut().unwrap();
3221 res.0.extend(iter:Some(a));
3222 res.1.extend(iter:Some(b));
3223 }
3224 None => return Poll::Ready(this.res.take().unwrap()),
3225 }
3226 }
3227 }
3228}
3229
3230/// Stream for the [`StreamExt::drain()`] method.
3231#[derive(Debug)]
3232#[must_use = "streams do nothing unless polled"]
3233pub struct Drain<'a, S: ?Sized> {
3234 stream: &'a mut S,
3235}
3236
3237impl<'a, S: Unpin + ?Sized> Unpin for Drain<'a, S> {}
3238
3239impl<'a, S: Unpin + ?Sized> Drain<'a, S> {
3240 /// Get a reference to the underlying stream.
3241 ///
3242 /// ## Examples
3243 ///
3244 /// ```
3245 /// use futures_lite::{prelude::*, stream};
3246 ///
3247 /// # spin_on::spin_on(async {
3248 /// let mut s = stream::iter(vec![1, 2, 3]);
3249 /// let s2 = s.drain();
3250 ///
3251 /// let inner = s2.get_ref();
3252 /// // s and inner are the same.
3253 /// # });
3254 /// ```
3255 pub fn get_ref(&self) -> &S {
3256 &self.stream
3257 }
3258
3259 /// Get a mutable reference to the underlying stream.
3260 ///
3261 /// ## Examples
3262 ///
3263 /// ```
3264 /// use futures_lite::{prelude::*, stream};
3265 ///
3266 /// # spin_on::spin_on(async {
3267 /// let mut s = stream::iter(vec![1, 2, 3]);
3268 /// let mut s2 = s.drain();
3269 ///
3270 /// let inner = s2.get_mut();
3271 /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3272 /// # });
3273 /// ```
3274 pub fn get_mut(&mut self) -> &mut S {
3275 &mut self.stream
3276 }
3277
3278 /// Consume this stream and get the underlying stream.
3279 ///
3280 /// ## Examples
3281 ///
3282 /// ```
3283 /// use futures_lite::{prelude::*, stream};
3284 ///
3285 /// # spin_on::spin_on(async {
3286 /// let mut s = stream::iter(vec![1, 2, 3]);
3287 /// let mut s2 = s.drain();
3288 ///
3289 /// let inner = s2.into_inner();
3290 /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3291 /// # });
3292 /// ```
3293 pub fn into_inner(self) -> &'a mut S {
3294 self.stream
3295 }
3296}
3297
3298impl<'a, S: Stream + Unpin + ?Sized> Stream for Drain<'a, S> {
3299 type Item = S::Item;
3300
3301 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3302 match self.stream.poll_next(cx) {
3303 Poll::Ready(x: Option<::Item>) => Poll::Ready(x),
3304 Poll::Pending => Poll::Ready(None),
3305 }
3306 }
3307
3308 fn size_hint(&self) -> (usize, Option<usize>) {
3309 let (_, hi: Option) = self.stream.size_hint();
3310 (0, hi)
3311 }
3312}
3313