#![feature(test)]

extern crate test;
use crate::test::Bencher;

use {
    futures::{
        channel::mpsc::{self, Sender, UnboundedSender},
        ready,
        sink::Sink,
        stream::{Stream, StreamExt},
        task::{Context, Poll},
    },
    futures_test::task::noop_context,
    std::pin::Pin,
};

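// These benchmarks rely on the unstable `test` crate (`#![feature(test)]`),
// so a nightly toolchain is required; a typical invocation is
// `cargo +nightly bench`.
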
/// Single producer, single consumer
#[bench]
fn unbounded_1_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::unbounded();

        // 1000 iterations to avoid measuring overhead of initialization
        // Result should be divided by 1000
        for i in 0..1000 {
            // Poll, not ready, park
            assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

            UnboundedSender::unbounded_send(&tx, i).unwrap();

            // Now poll ready
            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
        }
    })
}

/// 100 producers, single consumer
#[bench]
fn unbounded_100_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::unbounded();

        let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();

        // 1000 send/recv operations total, result should be divided by 1000
        for _ in 0..10 {
            for (i, x) in tx.iter().enumerate() {
                assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

                UnboundedSender::unbounded_send(x, i).unwrap();

                assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
            }
        }
    })
}

#[bench]
fn unbounded_uncontended(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::unbounded();

        for i in 0..1000 {
            UnboundedSender::unbounded_send(&tx, i).expect("send");
            // No need to create a task, because poll is not going to park.
            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
        }
    })
}

/// A `Stream` that continuously sends incrementing numbers into the queue
struct TestSender {
    tx: Sender<u32>,
    last: u32, // Last number sent
}

// This could just as well be a `Future`; the distinction doesn't matter here.
impl Stream for TestSender {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        let mut tx = Pin::new(&mut this.tx);

        ready!(tx.as_mut().poll_ready(cx)).unwrap();
        tx.as_mut().start_send(this.last + 1).unwrap();
        this.last += 1;
        // The zero-capacity channel used below is full again, so the flush is still pending.
        assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
        Poll::Ready(Some(this.last))
    }
}

/// Single producer, single consumer
#[bench]
fn bounded_1_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::channel(0);

        let mut tx = TestSender { tx, last: 0 };

        for i in 0..1000 {
            assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
            assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
            assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
        }
    })
}

/// 100 producers, single consumer
#[bench]
fn bounded_100_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        // Each sender gets one guaranteed slot on top of the specified capacity
        // (0 here), so every sender can send exactly one item before blocking.
        let (tx, mut rx) = mpsc::channel(0);

        let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();

        for i in 0..10 {
            for x in &mut tx {
                // Send an item
                assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx));
                // Then block
                assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx));
                // Recv the item
                assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
            }
        }
    })
}