1 | #![feature (test)] |
2 | |
3 | extern crate test; |
4 | use crate::test::Bencher; |
5 | |
6 | use { |
7 | futures::{ |
8 | channel::mpsc::{self, Sender, UnboundedSender}, |
9 | ready, |
10 | sink::Sink, |
11 | stream::{Stream, StreamExt}, |
12 | task::{Context, Poll}, |
13 | }, |
14 | futures_test::task::noop_context, |
15 | std::pin::Pin, |
16 | }; |
17 | |
18 | /// Single producer, single consumer |
19 | #[bench] |
20 | fn unbounded_1_tx(b: &mut Bencher) { |
21 | let mut cx = noop_context(); |
22 | b.iter(|| { |
23 | let (tx, mut rx) = mpsc::unbounded(); |
24 | |
25 | // 1000 iterations to avoid measuring overhead of initialization |
26 | // Result should be divided by 1000 |
27 | for i in 0..1000 { |
28 | // Poll, not ready, park |
29 | assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); |
30 | |
31 | UnboundedSender::unbounded_send(&tx, i).unwrap(); |
32 | |
33 | // Now poll ready |
34 | assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); |
35 | } |
36 | }) |
37 | } |
38 | |
39 | /// 100 producers, single consumer |
40 | #[bench] |
41 | fn unbounded_100_tx(b: &mut Bencher) { |
42 | let mut cx = noop_context(); |
43 | b.iter(|| { |
44 | let (tx, mut rx) = mpsc::unbounded(); |
45 | |
46 | let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect(); |
47 | |
48 | // 1000 send/recv operations total, result should be divided by 1000 |
49 | for _ in 0..10 { |
50 | for (i, x) in tx.iter().enumerate() { |
51 | assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx)); |
52 | |
53 | UnboundedSender::unbounded_send(x, i).unwrap(); |
54 | |
55 | assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); |
56 | } |
57 | } |
58 | }) |
59 | } |
60 | |
61 | #[bench] |
62 | fn unbounded_uncontended(b: &mut Bencher) { |
63 | let mut cx = noop_context(); |
64 | b.iter(|| { |
65 | let (tx, mut rx) = mpsc::unbounded(); |
66 | |
67 | for i in 0..1000 { |
68 | UnboundedSender::unbounded_send(&tx, i).expect("send" ); |
69 | // No need to create a task, because poll is not going to park. |
70 | assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx)); |
71 | } |
72 | }) |
73 | } |
74 | |
75 | /// A Stream that continuously sends incrementing number of the queue |
76 | struct TestSender { |
77 | tx: Sender<u32>, |
78 | last: u32, // Last number sent |
79 | } |
80 | |
81 | // Could be a Future, it doesn't matter |
82 | impl Stream for TestSender { |
83 | type Item = u32; |
84 | |
85 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
86 | let this = &mut *self; |
87 | let mut tx = Pin::new(&mut this.tx); |
88 | |
89 | ready!(tx.as_mut().poll_ready(cx)).unwrap(); |
90 | tx.as_mut().start_send(this.last + 1).unwrap(); |
91 | this.last += 1; |
92 | assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx)); |
93 | Poll::Ready(Some(this.last)) |
94 | } |
95 | } |
96 | |
97 | /// Single producers, single consumer |
98 | #[bench] |
99 | fn bounded_1_tx(b: &mut Bencher) { |
100 | let mut cx = noop_context(); |
101 | b.iter(|| { |
102 | let (tx, mut rx) = mpsc::channel(0); |
103 | |
104 | let mut tx = TestSender { tx, last: 0 }; |
105 | |
106 | for i in 0..1000 { |
107 | assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx)); |
108 | assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx)); |
109 | assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); |
110 | } |
111 | }) |
112 | } |
113 | |
114 | /// 100 producers, single consumer |
115 | #[bench] |
116 | fn bounded_100_tx(b: &mut Bencher) { |
117 | let mut cx = noop_context(); |
118 | b.iter(|| { |
119 | // Each sender can send one item after specified capacity |
120 | let (tx, mut rx) = mpsc::channel(0); |
121 | |
122 | let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect(); |
123 | |
124 | for i in 0..10 { |
125 | for x in &mut tx { |
126 | // Send an item |
127 | assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx)); |
128 | // Then block |
129 | assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx)); |
130 | // Recv the item |
131 | assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx)); |
132 | } |
133 | } |
134 | }) |
135 | } |
136 | |