| 1 | use core::pin::Pin; |
| 2 | use std::convert::Infallible; |
| 3 | |
| 4 | use futures::{ |
| 5 | stream::{self, repeat, Repeat, StreamExt, TryStreamExt}, |
| 6 | task::Poll, |
| 7 | Stream, |
| 8 | }; |
| 9 | use futures_executor::block_on; |
| 10 | use futures_task::Context; |
| 11 | use futures_test::task::noop_context; |
| 12 | |
/// Every item makes the `try_filter_map` future resolve to `Err`. With the
/// errors filtered back out, the first poll of the combined stream must be
/// `Poll::Ready(None)`.
#[test]
fn try_filter_map_after_err() {
    let mut cx = noop_context();

    // The closure's future always fails, echoing the item as the error value.
    let always_err = stream::iter(1..=3)
        .map(Ok)
        .try_filter_map(|item| async move { Err::<Option<()>, _>(item) });
    // Dropping the `Err` items forces the outer adaptor to keep polling past the error.
    let mut oks_only = always_err.filter_map(|res| async move { res.ok() }).boxed();

    let first_poll = oks_only.poll_next_unpin(&mut cx);
    assert_eq!(Poll::Ready(None), first_poll);
}
| 23 | |
/// The `try_skip_while` predicate future always resolves to `Err(())`. With
/// the errors filtered back out, the first poll of the combined stream must
/// be `Poll::Ready(None)`.
#[test]
fn try_skip_while_after_err() {
    let mut cx = noop_context();

    let always_err = stream::iter(1..=3)
        .map(Ok)
        .try_skip_while(|_| async move { Err::<_, ()>(()) });
    // Dropping the `Err` items forces the outer adaptor to keep polling past the error.
    let mut oks_only = always_err.filter_map(|res| async move { res.ok() }).boxed();

    let first_poll = oks_only.poll_next_unpin(&mut cx);
    assert_eq!(Poll::Ready(None), first_poll);
}
| 34 | |
/// The `try_take_while` predicate future always resolves to `Err(())`. With
/// the errors filtered back out, the first poll of the combined stream must
/// be `Poll::Ready(None)`.
#[test]
fn try_take_while_after_err() {
    let mut cx = noop_context();

    let always_err = stream::iter(1..=3)
        .map(Ok)
        .try_take_while(|_| async move { Err::<_, ()>(()) });
    // Dropping the `Err` items forces the outer adaptor to keep polling past the error.
    let mut oks_only = always_err.filter_map(|res| async move { res.ok() }).boxed();

    let first_poll = oks_only.poll_next_unpin(&mut cx);
    assert_eq!(Poll::Ready(None), first_poll);
}
| 45 | |
/// Exercises `try_flatten_unordered` twice: on a stream mixing outer and inner
/// errors, and on a base stream that panics if it is polled again after it
/// has produced its error.
#[test]
fn try_flatten_unordered() {
    // Odd seeds fail at the outer level. Even seeds expand into their first
    // three powers, and any power divisible by 16 becomes an inner error.
    let flattened = stream::iter(1..7)
        .map(|seed: u32| {
            if seed % 2 != 0 {
                Err(seed)
            } else {
                let powers = stream::unfold((seed, 1), |(base, exp)| async move {
                    Some((base.pow(exp), (base, exp + 1)))
                });
                Ok(powers
                    .take(3)
                    .map(move |power| if power % 16 == 0 { Err(power) } else { Ok(power) }))
            }
        })
        .map_ok(Box::pin)
        .try_flatten_unordered(None);

    // Items in the exact order the flattened stream is expected to yield them.
    let expected: Vec<Result<u32, u32>> = vec![
        Err(1),
        Err(3),
        Err(5),
        Ok(2),
        Ok(4),
        Ok(6),
        Ok(4),
        Err(16),
        Ok(36),
        Ok(8),
        Err(64),
        Ok(216),
    ];
    let collected = block_on(flattened.collect::<Vec<_>>());
    assert_eq!(expected, collected);

    /// Base stream that yields `error_after` never-ending inner streams and
    /// then one `Err(())`; polling it again after that error panics.
    #[derive(Clone, Debug)]
    struct ErrorStream {
        error_after: usize,
        polled: usize,
    }

    impl Stream for ErrorStream {
        type Item = Result<Repeat<Result<(), ()>>, ()>;

        fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            if self.polled > self.error_after {
                panic!("Polled after error");
            }

            // Here `polled <= error_after`, so "not below" means "exactly at" the error slot.
            let item = if self.polled < self.error_after { Ok(repeat(Ok(()))) } else { Err(()) };
            self.polled += 1;
            Poll::Ready(Some(item))
        }
    }

    block_on(async move {
        // The very first `try_next` must already be the error, so nothing is counted.
        let mut failing = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None);
        let mut oks_seen = 0;
        loop {
            if failing.try_next().await.is_err() {
                break;
            }
            oks_seen += 1;
        }
        assert_eq!(oks_seen, 0);

        // With unlimited concurrency no `Ok` item may be observed before the error.
        let collected = ErrorStream { error_after: 10, polled: 0 }
            .try_flatten_unordered(None)
            .inspect_ok(|_| panic!("Unexpected `Ok`"))
            .try_collect::<Vec<_>>()
            .await;
        assert_eq!(collected, Err(()));

        // With a limit of one inner stream, every item is counted before the fold fails.
        let mut taken = 0;
        let folded = ErrorStream { error_after: 10, polled: 0 }
            .map_ok(|inner| inner.take(3))
            .try_flatten_unordered(1)
            .inspect(|_| taken += 1)
            .try_fold((), |(), res| async move { Ok(res) })
            .await;
        assert_eq!(folded, Err(()));
        // Presumably 10 inner streams x 3 items each, plus the final error.
        assert_eq!(taken, 31);
    });
}
| 136 | |
/// Async predicate used by the `try_all` / `try_any` tests: resolves to `true`
/// when `number` is divisible by two.
async fn is_even(number: u8) -> bool {
    // The lowest bit of an unsigned integer is clear exactly for even values.
    number & 1 == 0
}
| 140 | |
/// Checks `try_all` with `is_even` over an empty stream, an all-even stream,
/// a stream containing an odd value, and a stream that yields an error.
#[test]
fn try_all() {
    block_on(async {
        // An empty stream is expected to satisfy the predicate.
        let no_items: [Result<u8, Infallible>; 0] = [];
        assert_eq!(Ok(true), stream::iter(no_items).try_all(is_even).await);

        let only_evens = stream::iter([Ok::<_, Infallible>(2), Ok(4), Ok(6), Ok(8)]);
        assert_eq!(Ok(true), only_evens.try_all(is_even).await);

        let with_odd = stream::iter([Ok::<_, Infallible>(2), Ok(3), Ok(4)]);
        assert_eq!(Ok(false), with_odd.try_all(is_even).await);

        // Every item before the error is even, so the error itself is the result.
        let with_error = stream::iter([Ok(2), Ok(4), Err("err"), Ok(8)]);
        assert_eq!(Err("err"), with_error.try_all(is_even).await);
    });
}
| 162 | |
/// Checks `try_any` with `is_even` over an empty stream, a stream containing
/// an even value, an all-odd stream, and a stream that yields an error.
#[test]
fn try_any() {
    block_on(async {
        // An empty stream is expected to contain no match.
        let no_items: [Result<u8, Infallible>; 0] = [];
        assert_eq!(Ok(false), stream::iter(no_items).try_any(is_even).await);

        let with_even = stream::iter([Ok::<_, Infallible>(1), Ok(2), Ok(3)]);
        assert_eq!(Ok(true), with_even.try_any(is_even).await);

        let only_odds = stream::iter([Ok::<_, Infallible>(1), Ok(3), Ok(5)]);
        assert_eq!(Ok(false), only_odds.try_any(is_even).await);

        // Every item before the error is odd, so the error itself is the result.
        let with_error = stream::iter([Ok(1), Ok(3), Err("err"), Ok(8)]);
        assert_eq!(Err("err"), with_error.try_any(is_even).await);
    });
}
| 184 | |