1#![feature(test)]
2
3extern crate test;
4use crate::test::Bencher;
5
6use futures::channel::oneshot;
7use futures::executor::block_on;
8use futures::future;
9use futures::stream::{self, StreamExt};
10use futures::task::Poll;
11use futures_util::FutureExt;
12use std::collections::VecDeque;
13use std::thread;
14
15#[bench]
16fn oneshot_streams(b: &mut Bencher) {
17 const STREAM_COUNT: usize = 10_000;
18 const STREAM_ITEM_COUNT: usize = 1;
19
20 b.iter(|| {
21 let mut txs = VecDeque::with_capacity(STREAM_COUNT);
22 let mut rxs = Vec::new();
23
24 for _ in 0..STREAM_COUNT {
25 let (tx, rx) = oneshot::channel();
26 txs.push_back(tx);
27 rxs.push(rx);
28 }
29
30 thread::spawn(move || {
31 let mut last = 1;
32 while let Some(tx) = txs.pop_front() {
33 let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
34 last += STREAM_ITEM_COUNT;
35 }
36 });
37
38 let mut flatten = stream::iter(rxs)
39 .map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten())
40 .flatten_unordered(None);
41
42 block_on(future::poll_fn(move |cx| {
43 let mut count = 0;
44 loop {
45 match flatten.poll_next_unpin(cx) {
46 Poll::Ready(None) => break,
47 Poll::Ready(Some(_)) => {
48 count += 1;
49 }
50 _ => {}
51 }
52 }
53 assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
54
55 Poll::Ready(())
56 }))
57 });
58}
59