| 1 | use futures::channel::{mpsc, oneshot}; |
| 2 | use futures::executor::block_on; |
| 3 | use futures::future::{self, poll_fn, FutureExt}; |
| 4 | use futures::sink::SinkExt; |
| 5 | use futures::stream::StreamExt; |
| 6 | use futures::task::{Context, Poll}; |
| 7 | use futures::{ |
| 8 | join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join, |
| 9 | }; |
| 10 | use std::mem; |
| 11 | |
/// `pending!` suspends the enclosing async block once: `poll!` observes
/// `Pending` on the first poll and `Ready` on the second.
#[test]
fn poll_and_pending() {
    let yields_once = async { pending!() };
    block_on(async {
        pin_mut!(yields_once);

        let first_poll = poll!(&mut yields_once);
        assert_eq!(Poll::Pending, first_poll);

        let second_poll = poll!(&mut yields_once);
        assert_eq!(Poll::Ready(()), second_poll);
    });
}
| 21 | |
/// `join!` over two oneshot receivers reports `Pending` until both senders
/// have delivered, then yields both results as a tuple.
#[test]
fn join() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();

    let joined = async {
        let outputs = join!(first_rx, second_rx);
        assert_eq!((Ok(1), Ok(2)), outputs);
    };

    block_on(async {
        pin_mut!(joined);

        // Nothing has been sent yet.
        assert_eq!(Poll::Pending, poll!(&mut joined));

        // Only one of the two values is available.
        first_tx.send(1).unwrap();
        assert_eq!(Poll::Pending, poll!(&mut joined));

        // Both values are available: the joined future finishes.
        second_tx.send(2).unwrap();
        assert_eq!(Poll::Ready(()), poll!(&mut joined));
    });
}
| 41 | |
/// `select!` runs the branch whose receiver already holds a value; the other
/// branch must never execute.
#[test]
fn select() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    // Never sent on; the binding keeps this sender alive for the whole test.
    let (_silent_tx, silent_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    let mut branch_ran = false;
    block_on(async {
        select! {
            received = ready_rx.fuse() => {
                assert_eq!(Ok(1), received);
                branch_ran = true;
            },
            _ = silent_rx.fuse() => unreachable!(),
        }
    });
    assert!(branch_ran);
}
| 59 | |
/// Same scenario as the plain `select!` test, but driven through
/// `select_biased!`: the first branch has a value waiting and must be the one
/// that runs.
#[test]
fn select_biased() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    // Never sent on; the binding keeps this sender alive for the whole test.
    let (_silent_tx, silent_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    let mut branch_ran = false;
    block_on(async {
        select_biased! {
            received = ready_rx.fuse() => {
                assert_eq!(Ok(1), received);
                branch_ran = true;
            },
            _ = silent_rx.fuse() => unreachable!(),
        }
    });
    assert!(branch_ran);
}
| 77 | |
/// Drives `select!` over two fused mpsc receivers, covering the stream
/// branches as well as the `default` and `complete` branches.
///
/// The first `select!` must fall through to `default` (any other branch
/// panics), where one item is sent on each channel. The loop then adds up the
/// received items, checks the total in `default`, drops both senders there,
/// and leaves through `complete`.
#[test]
fn select_streams() {
    let (mut tx_a, rx_a) = mpsc::channel::<i32>(1);
    let (mut tx_b, rx_b) = mpsc::channel::<i32>(1);
    let mut rx_a = rx_a.fuse();
    let mut rx_b = rx_b.fuse();
    let mut default_ran = false;
    let mut sum = 0;
    block_on(async {
        // Assigned only inside the `default` branch below.
        let mut held_tx_a;
        let mut held_tx_b;
        select! {
            _ = rx_a.next() => panic!(),
            _ = rx_b.next() => panic!(),
            default => {
                tx_a.send(2).await.unwrap();
                tx_b.send(3).await.unwrap();
                held_tx_a = Some(tx_a);
                held_tx_b = Some(tx_b);
            }
            complete => panic!(),
        }
        loop {
            select! {
                // runs first and again after default
                item = rx_a.next() => sum += item.unwrap_or(0),
                // runs second and again after default
                item = rx_b.next() => sum += item.unwrap_or(0),
                // runs third
                default => {
                    assert_eq!(sum, 5);
                    default_ran = true;
                    drop(held_tx_a.take().unwrap());
                    drop(held_tx_b.take().unwrap());
                },
                // runs last
                complete => break,
            };
        }
    });
    assert!(default_ran);
}
| 120 | |
/// Inside a `select!` branch, the future that did *not* win can still be
/// moved out and awaited.
#[test]
fn select_can_move_uncompleted_futures() {
    let (one_tx, one_rx) = oneshot::channel::<i32>();
    let (two_tx, two_rx) = oneshot::channel::<i32>();
    let mut one = one_rx.fuse();
    let mut two = two_rx.fuse();

    // Both values are sent before selecting.
    one_tx.send(1).unwrap();
    two_tx.send(2).unwrap();

    let mut branch_ran = false;
    block_on(async {
        select! {
            winner = one => {
                assert_eq!(Ok(1), winner);
                assert_eq!(Ok(2), two.await);
                branch_ran = true;
            },
            winner = two => {
                assert_eq!(Ok(2), winner);
                assert_eq!(Ok(1), one.await);
                branch_ran = true;
            },
        }
    });
    assert!(branch_ran);
}
| 146 | |
/// A `select!` may appear inside the branch body of another `select!`, and
/// the inner branch can use the value bound by the outer one.
#[test]
fn select_nested() {
    let mut outer = future::ready(1);
    let mut inner = future::ready(2);

    let sum = block_on(async {
        select! {
            outer_value = outer => {
                select! {
                    inner_value = inner => outer_value + inner_value,
                }
            }
        }
    });

    assert_eq!(sum, 3);
}
| 162 | |
/// Pins the in-memory size of async blocks that `select!` over one and two
/// ready futures. Ignored unless the target has 64-bit pointers.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_size() {
    let one_branch = async {
        let mut only = future::ready(0i32);
        select! {
            _ = only => {},
        }
    };
    let one_branch_bytes = mem::size_of_val(&one_branch);
    assert_eq!(one_branch_bytes, 24);

    let two_branches = async {
        let mut left = future::ready(0i32);
        let mut right = future::ready(0i32);
        select! {
            _ = left => {},
            _ = right => {},
        }
    };
    let two_branch_bytes = mem::size_of_val(&two_branches);
    assert_eq!(two_branch_bytes, 40);
}
| 184 | |
/// `select!` accepts branch expressions that produce `!Unpin` futures, and a
/// variable declared outside the macro can be initialised from its branches.
#[test]
fn select_on_non_unpin_expressions() {
    // Each call yields a fresh async block, which is !Unpin.
    let non_unpin_five = || async { 5 };

    let outcome = block_on(async {
        let chosen;
        select! {
            first = non_unpin_five().fuse() => chosen = first,
            second = non_unpin_five().fuse() => chosen = second,
        };
        chosen
    });

    assert_eq!(outcome, 5);
}
| 200 | |
/// Like the non-`Unpin` expression test, with an added `default` branch. The
/// result must come from one of the future branches (5), not `default` (7).
#[test]
fn select_on_non_unpin_expressions_with_default() {
    // Each call yields a fresh async block, which is !Unpin.
    let non_unpin_five = || async { 5 };

    let outcome = block_on(async {
        let chosen;
        select! {
            first = non_unpin_five().fuse() => chosen = first,
            second = non_unpin_five().fuse() => chosen = second,
            default => chosen = 7,
        };
        chosen
    });

    assert_eq!(outcome, 5);
}
| 217 | |
/// Pins the in-memory size of an async block that `select!`s over two
/// `!Unpin` futures. Ignored unless the target has 64-bit pointers.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_on_non_unpin_size() {
    // Each call yields a fresh async block, which is !Unpin.
    let non_unpin_five = || async { 5 };

    let selecting = async {
        let chosen;
        select! {
            first = non_unpin_five().fuse() => chosen = first,
            second = non_unpin_five().fuse() => chosen = second,
        };
        chosen
    };

    let selecting_bytes = mem::size_of_val(&selecting);
    assert_eq!(32, selecting_bytes);
}
| 235 | |
/// `select!` is usable in expression position. Both branches are ready, so
/// either branch's value (7, or 3 + 1) is an acceptable result.
#[test]
fn select_can_be_used_as_expression() {
    block_on(async {
        let picked = select! {
            seven = future::ready(7) => seven,
            three = future::ready(3) => three + 1,
        };
        assert!([7, 4].contains(&picked));
    });
}
| 246 | |
/// With every future branch pending, a `select!` in expression position
/// evaluates to the value of its `default` branch.
#[test]
fn select_with_default_can_be_used_as_expression() {
    /// Poll function that reports `Pending` on every call.
    fn never_ready<T>(_cx: &mut Context<'_>) -> Poll<T> {
        Poll::Pending
    }

    block_on(async {
        let picked = select! {
            left = poll_fn(never_ready::<i32>).fuse() => left,
            right = poll_fn(never_ready::<i32>).fuse() => right + 1,
            default => 99,
        };
        assert_eq!(picked, 99);
    });
}
| 262 | |
/// A `select!` in expression position over two `future::pending()` futures
/// evaluates to its `complete` branch (237) rather than `default` (99).
#[test]
fn select_with_complete_can_be_used_as_expression() {
    block_on(async {
        let picked = select! {
            left = future::pending::<i32>() => left,
            right = future::pending::<i32>() => right + 1,
            default => 99,
            complete => 237,
        };
        assert_eq!(picked, 237);
    });
}
| 275 | |
/// One `select!` branch selects on a future that mutably borrows a local
/// while another branch's body mutates that same local.
#[test]
// The write in the second branch is never read back.
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
    async fn borrows_mutably(_: &mut i32) {}
    async fn does_nothing() {}

    block_on(async {
        let mut counter = 234;
        select! {
            _ = borrows_mutably(&mut counter).fuse() => {},
            _ = does_nothing().fuse() => {
                counter += 5;
            },
        }
    });
}
| 292 | |
/// Same as the mutable-borrow test without `default`, with an added `default`
/// branch that also mutates the local borrowed by the first branch's future.
#[test]
// The writes in the branch bodies are never read back.
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
    async fn borrows_mutably(_: &mut i32) {}
    async fn does_nothing() {}

    block_on(async {
        let mut counter = 234;
        select! {
            _ = borrows_mutably(&mut counter).fuse() => {},
            _ = does_nothing().fuse() => {
                counter += 5;
            },
            default => {
                counter += 27;
            },
        }
    });
}
| 312 | |
/// Exercises the `stream_select!` macro in three scenarios:
///
/// 1. an endless stream combined with a never-ready stream keeps yielding the
///    endless stream's items;
/// 2. two single-item streams yield both items and then terminate;
/// 3. three endless streams are sampled 1000 times and each must contribute
///    at least 299 items.
#[test]
// NOTE(review): carried over from the original test; confirm whether it is
// still required now that the hand-rolled sample counter is gone.
#[allow(unused_assignments)]
fn stream_select() {
    block_on(async {
        let endless_ints = |i| stream::iter(vec![i].into_iter().cycle());

        let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
        assert_eq!(endless_ones.next().await, Some(1));
        assert_eq!(endless_ones.next().await, Some(1));

        let mut finite_list =
            stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, None);

        let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
        // Take 1000, and assert a somewhat even distribution of values.
        // The fairness is randomized, but over 1000 samples we should be pretty close to even.
        // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
        //
        // `take(1000)` yields exactly 1000 items; the previous `take_while`
        // counter stopped one short, at 999.
        let results = endless_mixed.take(1000).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 1000);

        let occurrences = |wanted: i32| results.iter().filter(|x| **x == wanted).count();
        assert!(occurrences(1) >= 299);
        assert!(occurrences(2) >= 299);
        assert!(occurrences(3) >= 299);
    });
}
| 348 | |
/// Pins the in-memory size of async blocks that `join!` one and two ready
/// futures. Ignored unless the target has 64-bit pointers.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn join_size() {
    let join_one = async {
        let only = future::ready(0i32);
        join!(only)
    };
    let join_one_bytes = mem::size_of_val(&join_one);
    assert_eq!(join_one_bytes, 24);

    let join_two = async {
        let left = future::ready(0i32);
        let right = future::ready(0i32);
        join!(left, right)
    };
    let join_two_bytes = mem::size_of_val(&join_two);
    assert_eq!(join_two_bytes, 40);
}
| 365 | |
/// Pins the in-memory size of async blocks that `try_join!` one and two ready
/// `Result` futures. Ignored unless the target has 64-bit pointers.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn try_join_size() {
    let try_join_one = async {
        let only = future::ready(Ok::<i32, i32>(0));
        try_join!(only)
    };
    let try_join_one_bytes = mem::size_of_val(&try_join_one);
    assert_eq!(try_join_one_bytes, 24);

    let try_join_two = async {
        let left = future::ready(Ok::<i32, i32>(0));
        let right = future::ready(Ok::<i32, i32>(0));
        try_join!(left, right)
    };
    let try_join_two_bytes = mem::size_of_val(&try_join_two);
    assert_eq!(try_join_two_bytes, 48);
}
| 382 | |
/// Compile-only check: `join!` accepts async blocks directly as operands.
/// The resulting future is built but never polled.
#[allow(clippy::let_underscore_future)]
#[test]
fn join_doesnt_require_unpin() {
    let _ = async {
        join!(async {}, async {})
    };
}
| 388 | |
/// Compile-only check: `try_join!` accepts async blocks directly as operands.
/// The resulting future is built but never polled.
#[allow(clippy::let_underscore_future)]
#[test]
fn try_join_doesnt_require_unpin() {
    let _ = async {
        try_join!(
            async { Ok::<(), ()>(()) },
            async { Ok::<(), ()>(()) },
        )
    };
}
| 394 | |