1 | #![feature (test)] |
2 | |
3 | extern crate test; |
4 | use crate::test::Bencher; |
5 | |
6 | use futures::channel::oneshot; |
7 | use futures::executor::block_on; |
8 | use futures::future; |
9 | use futures::stream::{self, StreamExt}; |
10 | use futures::task::Poll; |
11 | use futures_util::FutureExt; |
12 | use std::collections::VecDeque; |
13 | use std::thread; |
14 | |
15 | #[bench] |
16 | fn oneshot_streams(b: &mut Bencher) { |
17 | const STREAM_COUNT: usize = 10_000; |
18 | const STREAM_ITEM_COUNT: usize = 1; |
19 | |
20 | b.iter(|| { |
21 | let mut txs = VecDeque::with_capacity(STREAM_COUNT); |
22 | let mut rxs = Vec::new(); |
23 | |
24 | for _ in 0..STREAM_COUNT { |
25 | let (tx, rx) = oneshot::channel(); |
26 | txs.push_back(tx); |
27 | rxs.push(rx); |
28 | } |
29 | |
30 | thread::spawn(move || { |
31 | let mut last = 1; |
32 | while let Some(tx) = txs.pop_front() { |
33 | let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT)); |
34 | last += STREAM_ITEM_COUNT; |
35 | } |
36 | }); |
37 | |
38 | let mut flatten = stream::iter(rxs) |
39 | .map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten()) |
40 | .flatten_unordered(None); |
41 | |
42 | block_on(future::poll_fn(move |cx| { |
43 | let mut count = 0; |
44 | loop { |
45 | match flatten.poll_next_unpin(cx) { |
46 | Poll::Ready(None) => break, |
47 | Poll::Ready(Some(_)) => { |
48 | count += 1; |
49 | } |
50 | _ => {} |
51 | } |
52 | } |
53 | assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT); |
54 | |
55 | Poll::Ready(()) |
56 | })) |
57 | }); |
58 | } |
59 | |