| 1 | use super::assert_stream; |
| 2 | use crate::stream::{select_with_strategy, PollNext, SelectWithStrategy}; |
| 3 | use core::pin::Pin; |
| 4 | use futures_core::stream::{FusedStream, Stream}; |
| 5 | use futures_core::task::{Context, Poll}; |
| 6 | use pin_project_lite::pin_project; |
| 7 | |
| 8 | pin_project! { |
| 9 | /// Stream for the [`select()`] function. |
| 10 | #[derive (Debug)] |
| 11 | #[must_use = "streams do nothing unless polled" ] |
| 12 | pub struct Select<St1, St2> { |
| 13 | #[pin] |
| 14 | inner: SelectWithStrategy<St1, St2, fn(&mut PollNext)-> PollNext, PollNext>, |
| 15 | } |
| 16 | } |
| 17 | |
| 18 | /// This function will attempt to pull items from both streams. Each |
| 19 | /// stream will be polled in a round-robin fashion, and whenever a stream is |
| 20 | /// ready to yield an item that item is yielded. |
| 21 | /// |
| 22 | /// After one of the two input streams completes, the remaining one will be |
| 23 | /// polled exclusively. The returned stream completes when both input |
| 24 | /// streams have completed. |
| 25 | /// |
| 26 | /// Note that this function consumes both streams and returns a wrapped |
| 27 | /// version of them. |
| 28 | /// |
| 29 | /// ## Examples |
| 30 | /// |
| 31 | /// ```rust |
| 32 | /// # futures::executor::block_on(async { |
| 33 | /// use futures::stream::{ repeat, select, StreamExt }; |
| 34 | /// |
| 35 | /// let left = repeat(1); |
| 36 | /// let right = repeat(2); |
| 37 | /// |
| 38 | /// let mut out = select(left, right); |
| 39 | /// |
| 40 | /// for _ in 0..100 { |
| 41 | /// // We should be alternating. |
| 42 | /// assert_eq!(1, out.select_next_some().await); |
| 43 | /// assert_eq!(2, out.select_next_some().await); |
| 44 | /// } |
| 45 | /// # }); |
| 46 | /// ``` |
| 47 | pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2> |
| 48 | where |
| 49 | St1: Stream, |
| 50 | St2: Stream<Item = St1::Item>, |
| 51 | { |
| 52 | fn round_robin(last: &mut PollNext) -> PollNext { |
| 53 | last.toggle() |
| 54 | } |
| 55 | |
| 56 | assert_stream::<St1::Item, _>(Select { |
| 57 | inner: select_with_strategy(stream1, stream2, which:round_robin), |
| 58 | }) |
| 59 | } |
| 60 | |
| 61 | impl<St1, St2> Select<St1, St2> { |
| 62 | /// Acquires a reference to the underlying streams that this combinator is |
| 63 | /// pulling from. |
| 64 | pub fn get_ref(&self) -> (&St1, &St2) { |
| 65 | self.inner.get_ref() |
| 66 | } |
| 67 | |
| 68 | /// Acquires a mutable reference to the underlying streams that this |
| 69 | /// combinator is pulling from. |
| 70 | /// |
| 71 | /// Note that care must be taken to avoid tampering with the state of the |
| 72 | /// stream which may otherwise confuse this combinator. |
| 73 | pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { |
| 74 | self.inner.get_mut() |
| 75 | } |
| 76 | |
| 77 | /// Acquires a pinned mutable reference to the underlying streams that this |
| 78 | /// combinator is pulling from. |
| 79 | /// |
| 80 | /// Note that care must be taken to avoid tampering with the state of the |
| 81 | /// stream which may otherwise confuse this combinator. |
| 82 | pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { |
| 83 | let this = self.project(); |
| 84 | this.inner.get_pin_mut() |
| 85 | } |
| 86 | |
| 87 | /// Consumes this combinator, returning the underlying streams. |
| 88 | /// |
| 89 | /// Note that this may discard intermediate state of this combinator, so |
| 90 | /// care should be taken to avoid losing resources when this is called. |
| 91 | pub fn into_inner(self) -> (St1, St2) { |
| 92 | self.inner.into_inner() |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | impl<St1, St2> FusedStream for Select<St1, St2> |
| 97 | where |
| 98 | St1: Stream, |
| 99 | St2: Stream<Item = St1::Item>, |
| 100 | { |
| 101 | fn is_terminated(&self) -> bool { |
| 102 | self.inner.is_terminated() |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | impl<St1, St2> Stream for Select<St1, St2> |
| 107 | where |
| 108 | St1: Stream, |
| 109 | St2: Stream<Item = St1::Item>, |
| 110 | { |
| 111 | type Item = St1::Item; |
| 112 | |
| 113 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { |
| 114 | let this: Projection<'_, St1, St2> = self.project(); |
| 115 | this.inner.poll_next(cx) |
| 116 | } |
| 117 | } |
| 118 | |