1use core::pin::Pin;
2use std::convert::Infallible;
3
4use futures::{
5 stream::{self, repeat, Repeat, StreamExt, TryStreamExt},
6 task::Poll,
7 Stream,
8};
9use futures_executor::block_on;
10use futures_task::Context;
11use futures_test::task::noop_context;
12
/// `try_filter_map` whose closure fails for every element must stay pollable
/// after it has produced an error.
///
/// The trailing `filter_map` discards every `Err`, so a single poll is expected
/// to run the stream to completion without any `Ok` item surfacing.
#[test]
fn try_filter_map_after_err() {
    let mut cx = noop_context();

    // Every element is rejected with an error carrying the element itself.
    let always_failing = stream::iter(1..=3)
        .map(Ok)
        .try_filter_map(|item| async move { Err::<Option<()>, _>(item) });

    // Keep only the `Ok` items, of which there should be none.
    let mut oks_only = always_failing.filter_map(|res| async move { res.ok() }).boxed();

    let first_poll = oks_only.poll_next_unpin(&mut cx);
    assert_eq!(Poll::Ready(None), first_poll);
}
23
/// `try_skip_while` whose predicate fails for every element must stay pollable
/// after it has produced an error.
///
/// The trailing `filter_map` discards every `Err`, so a single poll is expected
/// to run the stream to completion without any `Ok` item surfacing.
#[test]
fn try_skip_while_after_err() {
    let mut cx = noop_context();

    // The predicate never answers; it always reports an error.
    let always_failing = stream::iter(1..=3)
        .map(Ok)
        .try_skip_while(|_| async move { Err::<_, ()>(()) });

    // Keep only the `Ok` items, of which there should be none.
    let mut oks_only = always_failing.filter_map(|res| async move { res.ok() }).boxed();

    let first_poll = oks_only.poll_next_unpin(&mut cx);
    assert_eq!(Poll::Ready(None), first_poll);
}
34
/// `try_take_while` whose predicate fails for every element must stay pollable
/// after it has produced an error.
///
/// The trailing `filter_map` discards every `Err`, so a single poll is expected
/// to run the stream to completion without any `Ok` item surfacing.
#[test]
fn try_take_while_after_err() {
    let mut cx = noop_context();

    // The predicate never answers; it always reports an error.
    let always_failing = stream::iter(1..=3)
        .map(Ok)
        .try_take_while(|_| async move { Err::<_, ()>(()) });

    // Keep only the `Ok` items, of which there should be none.
    let mut oks_only = always_failing.filter_map(|res| async move { res.ok() }).boxed();

    let first_poll = oks_only.poll_next_unpin(&mut cx);
    assert_eq!(Poll::Ready(None), first_poll);
}
45
/// Exercises `try_flatten_unordered` in two scenarios:
///
/// 1. a mix of outer-level errors and inner streams that themselves yield both
///    `Ok` and `Err` items, checking the exact sequence that comes out;
/// 2. an outer stream (`ErrorStream`) that panics if it is polled again after it
///    has returned its error, checking that the error is surfaced and that the
///    outer stream is not polled past it.
#[test]
fn try_flatten_unordered() {
    // Odd inputs fail at the outer level. Each even input `n` becomes an inner
    // stream yielding `n^1`, `n^2`, `n^3`, where any power divisible by 16 is
    // reported as an error instead of a value.
    let powers = stream::iter(1..7)
        .map(|val: u32| {
            if val % 2 != 0 {
                Err(val)
            } else {
                Ok(stream::unfold((val, 1), |(base, exp)| async move {
                    Some((base.pow(exp), (base, exp + 1)))
                })
                .take(3)
                .map(move |power| if power % 16 == 0 { Err(power) } else { Ok(power) }))
            }
        })
        // `unfold` over an `async` block is not `Unpin`; pin it on the heap.
        .map_ok(Box::pin)
        .try_flatten_unordered(None);

    block_on(async move {
        assert_eq!(
            // Every odd input and every power divisible by 16 must be an `Err`.
            // Each even input must contribute its powers 1 through 3.
            vec![
                Err(1),
                Err(3),
                Err(5),
                Ok(2),
                Ok(4),
                Ok(6),
                Ok(4),
                Err(16),
                Ok(36),
                Ok(8),
                Err(64),
                Ok(216)
            ],
            powers.collect::<Vec<_>>().await
        )
    });

    /// Outer stream that yields endless inner streams of `Ok(())` until it has
    /// been polled `error_after` times, then yields a single `Err(())`.
    /// Any poll after that error panics.
    #[derive(Clone, Debug)]
    struct ErrorStream {
        /// Zero-based poll index at which the error is produced.
        error_after: usize,
        /// Number of times `poll_next` has been called so far.
        polled: usize,
    }

    impl Stream for ErrorStream {
        type Item = Result<Repeat<Result<(), ()>>, ()>;

        fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // The adapter under test must stop polling once it has seen the error.
            assert!(self.polled <= self.error_after, "Polled after error");

            let is_error_poll = self.polled == self.error_after;
            self.polled += 1;

            let item = if is_error_poll { Err(()) } else { Ok(repeat(Ok(()))) };
            Poll::Ready(Some(item))
        }
    }

    block_on(async move {
        // With no concurrency limit, the error must come out before any inner item.
        let mut flattened = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None);
        let mut oks_before_error = 0;
        loop {
            if flattened.try_next().await.is_err() {
                break;
            }
            oks_before_error += 1;
        }
        assert_eq!(oks_before_error, 0);

        // Same property through `try_collect`: no `Ok` may be observed at all.
        let collected = ErrorStream { error_after: 10, polled: 0 }
            .try_flatten_unordered(None)
            .inspect_ok(|_| panic!("Unexpected `Ok`"))
            .try_collect::<Vec<_>>()
            .await;
        assert_eq!(collected, Err(()));

        // With a limit of one inner stream at a time, each of the ten inner
        // streams (capped at three items) is drained before the error shows up.
        let mut taken = 0;
        let folded = ErrorStream { error_after: 10, polled: 0 }
            .map_ok(|inner| inner.take(3))
            .try_flatten_unordered(1)
            .inspect(|_| taken += 1)
            .try_fold((), |(), item| async move { Ok(item) })
            .await;
        assert_eq!(folded, Err(()));
        // 10 inner streams * 3 items, plus the final error item.
        assert_eq!(taken, 31);
    })
}
136
/// Async predicate used by the `try_all` / `try_any` tests: resolves to `true`
/// when `number` is even.
async fn is_even(number: u8) -> bool {
    // An even number has its lowest bit cleared.
    (number & 1) == 0
}
140
/// `try_all` over streams of `Result`s: empty input, all items matching,
/// one item not matching, and an error in the stream.
#[test]
fn try_all() {
    block_on(async {
        // No items at all: the result is `Ok(true)`.
        let no_items: [Result<u8, Infallible>; 0] = [];
        let on_empty = stream::iter(no_items).try_all(is_even).await;
        assert_eq!(Ok(true), on_empty);

        // Every item satisfies the predicate.
        let on_all_even =
            stream::iter([Ok::<_, Infallible>(2), Ok(4), Ok(6), Ok(8)]).try_all(is_even).await;
        assert_eq!(Ok(true), on_all_even);

        // One odd item makes the answer `false`.
        let on_one_odd =
            stream::iter([Ok::<_, Infallible>(2), Ok(3), Ok(4)]).try_all(is_even).await;
        assert_eq!(Ok(false), on_one_odd);

        // An error in the stream is returned as the result.
        let on_error = stream::iter([Ok(2), Ok(4), Err("err"), Ok(8)]).try_all(is_even).await;
        assert_eq!(Err("err"), on_error);
    });
}
162
/// `try_any` over streams of `Result`s: empty input, one item matching,
/// no item matching, and an error in the stream.
#[test]
fn try_any() {
    block_on(async {
        // No items at all: the result is `Ok(false)`.
        let no_items: [Result<u8, Infallible>; 0] = [];
        let on_empty = stream::iter(no_items).try_any(is_even).await;
        assert_eq!(Ok(false), on_empty);

        // A single even item is enough for `true`.
        let on_one_even =
            stream::iter([Ok::<_, Infallible>(1), Ok(2), Ok(3)]).try_any(is_even).await;
        assert_eq!(Ok(true), on_one_even);

        // Only odd items: nothing satisfies the predicate.
        let on_all_odd =
            stream::iter([Ok::<_, Infallible>(1), Ok(3), Ok(5)]).try_any(is_even).await;
        assert_eq!(Ok(false), on_all_odd);

        // An error reached before any match is returned as the result.
        let on_error = stream::iter([Ok(1), Ok(3), Err("err"), Ok(8)]).try_any(is_even).await;
        assert_eq!(Err("err"), on_error);
    });
}
184