1 | use core::pin::Pin; |
2 | use std::convert::Infallible; |
3 | |
4 | use futures::{ |
5 | stream::{self, repeat, Repeat, StreamExt, TryStreamExt}, |
6 | task::Poll, |
7 | Stream, |
8 | }; |
9 | use futures_executor::block_on; |
10 | use futures_task::Context; |
11 | use futures_test::task::noop_context; |
12 | |
/// Every item makes the `try_filter_map` closure return `Err`, and the
/// downstream `filter_map` keeps only `Ok` values, so the very first poll is
/// expected to report the end of the stream.
#[test]
fn try_filter_map_after_err() {
    let mut noop_cx = noop_context();
    let mut survivors = stream::iter(1..=3)
        .map(Ok)
        .try_filter_map(|item| async move { Err::<Option<()>, _>(item) })
        .filter_map(|outcome| async move { outcome.ok() })
        .boxed();

    let first_poll = survivors.poll_next_unpin(&mut noop_cx);
    assert_eq!(Poll::Ready(None), first_poll);
}
23 | |
/// The `try_skip_while` predicate always fails and the downstream
/// `filter_map` drops every `Err`, so the very first poll is expected to
/// report the end of the stream.
#[test]
fn try_skip_while_after_err() {
    let mut noop_cx = noop_context();
    let mut survivors = stream::iter(1..=3)
        .map(Ok)
        .try_skip_while(|_| async move { Err::<_, ()>(()) })
        .filter_map(|outcome| async move { outcome.ok() })
        .boxed();

    let first_poll = survivors.poll_next_unpin(&mut noop_cx);
    assert_eq!(Poll::Ready(None), first_poll);
}
34 | |
/// The `try_take_while` predicate always fails and the downstream
/// `filter_map` drops every `Err`, so the very first poll is expected to
/// report the end of the stream.
#[test]
fn try_take_while_after_err() {
    let mut noop_cx = noop_context();
    let mut survivors = stream::iter(1..=3)
        .map(Ok)
        .try_take_while(|_| async move { Err::<_, ()>(()) })
        .filter_map(|outcome| async move { outcome.ok() })
        .boxed();

    let first_poll = survivors.poll_next_unpin(&mut noop_cx);
    assert_eq!(Poll::Ready(None), first_poll);
}
45 | |
/// Exercises `try_flatten_unordered` in two ways: the exact sequence produced
/// when outer and inner streams mix `Ok` and `Err` items, and how the adapter
/// behaves when the outer stream eventually yields an error.
#[test]
fn try_flatten_unordered() {
    /// Outer stream that yields `Ok(repeat(Ok(())))` for the first
    /// `error_after` polls, then a single `Err(())`, and panics if it is
    /// polled again after that.
    #[derive(Clone, Debug)]
    struct ErrorStream {
        error_after: usize,
        polled: usize,
    }

    impl Stream for ErrorStream {
        type Item = Result<Repeat<Result<(), ()>>, ()>;

        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Guard: the error has already been handed out on an earlier poll.
            if self.polled > self.error_after {
                panic!("Polled after error");
            }

            let at_error = self.polled == self.error_after;
            self.polled += 1;

            let item = if at_error { Err(()) } else { Ok(repeat(Ok(()))) };
            Poll::Ready(Some(item))
        }
    }

    let powers_of_evens = stream::iter(1..7)
        .map(|base: u32| {
            // Odd inputs fail at the outer level.
            if base % 2 != 0 {
                return Err(base);
            }

            // Even inputs become a stream of their first three powers; any
            // power divisible by 16 is reported as an inner error.
            let powers = stream::unfold((base, 1), |(base, exp)| async move {
                Some((base.pow(exp), (base, exp + 1)))
            })
            .take(3)
            .map(|power| if power % 16 == 0 { Err(power) } else { Ok(power) });

            Ok(powers)
        })
        .map_ok(Box::pin)
        .try_flatten_unordered(None);

    // Odd inputs and every power divisible by 16 must show up as `Err`;
    // each even input must contribute its powers from 1 to 3.
    let expected: Vec<Result<u32, u32>> = vec![
        Err(1),
        Err(3),
        Err(5),
        Ok(2),
        Ok(4),
        Ok(6),
        Ok(4),
        Err(16),
        Ok(36),
        Ok(8),
        Err(64),
        Ok(216),
    ];
    let flattened: Vec<Result<u32, u32>> = block_on(powers_of_evens.collect());
    assert_eq!(expected, flattened);

    block_on(async move {
        // The first item obtained through `try_next` is expected to be the
        // outer error, so the loop body never runs.
        let mut flat = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None);
        let mut oks_seen = 0;
        while flat.try_next().await.is_ok() {
            oks_seen += 1;
        }
        assert_eq!(oks_seen, 0);

        // With no concurrency limit, no `Ok` item may be observed before the
        // outer error is reported.
        let collected = ErrorStream { error_after: 10, polled: 0 }
            .try_flatten_unordered(None)
            .inspect_ok(|_| panic!("Unexpected `Ok`"))
            .try_collect::<Vec<_>>()
            .await;
        assert_eq!(collected, Err(()));

        // With a limit of one, each of the 10 inner streams (3 items apiece)
        // is expected to pass through before the error: 10 * 3 + 1 = 31.
        let mut taken = 0;
        let folded = ErrorStream { error_after: 10, polled: 0 }
            .map_ok(|inner| inner.take(3))
            .try_flatten_unordered(1)
            .inspect(|_| taken += 1)
            .try_fold((), |(), unit| async move { Ok(unit) })
            .await;
        assert_eq!(folded, Err(()));
        assert_eq!(taken, 31);
    });
}
136 | |
/// Asynchronously reports whether `number` is divisible by two.
async fn is_even(number: u8) -> bool {
    // The lowest bit is clear exactly for even values.
    (number & 1) == 0
}
140 | |
/// Checks `try_all` with the `is_even` predicate on an empty stream, an
/// all-matching stream, a stream with a non-matching item, and a stream that
/// contains an error.
#[test]
fn try_all() {
    block_on(async {
        // An empty stream is expected to yield `Ok(true)`.
        let no_items: [Result<u8, Infallible>; 0] = [];
        assert_eq!(Ok(true), stream::iter(no_items).try_all(is_even).await);

        // Every item is even.
        let evens = [Ok::<_, Infallible>(2), Ok(4), Ok(6), Ok(8)];
        assert_eq!(Ok(true), stream::iter(evens).try_all(is_even).await);

        // One odd item in the middle.
        let mixed = [Ok::<_, Infallible>(2), Ok(3), Ok(4)];
        assert_eq!(Ok(false), stream::iter(mixed).try_all(is_even).await);

        // An error preceded only by even items is expected to be returned as-is.
        let failing = [Ok(2), Ok(4), Err("err"), Ok(8)];
        assert_eq!(Err("err"), stream::iter(failing).try_all(is_even).await);
    });
}
162 | |
/// Checks `try_any` with the `is_even` predicate on an empty stream, a stream
/// with a matching item, a stream without one, and a stream that contains an
/// error.
#[test]
fn try_any() {
    block_on(async {
        // An empty stream is expected to yield `Ok(false)`.
        let no_items: [Result<u8, Infallible>; 0] = [];
        assert_eq!(Ok(false), stream::iter(no_items).try_any(is_even).await);

        // One even item among odd ones.
        let mixed = [Ok::<_, Infallible>(1), Ok(2), Ok(3)];
        assert_eq!(Ok(true), stream::iter(mixed).try_any(is_even).await);

        // No even item at all.
        let odds = [Ok::<_, Infallible>(1), Ok(3), Ok(5)];
        assert_eq!(Ok(false), stream::iter(odds).try_any(is_even).await);

        // An error preceded only by odd items is expected to be returned as-is.
        let failing = [Ok(1), Ok(3), Err("err"), Ok(8)];
        assert_eq!(Err("err"), stream::iter(failing).try_any(is_even).await);
    });
}
184 | |