1use super::*;
2
3use std::usize;
4
5#[derive(Debug)]
6pub(super) struct Counts {
7 /// Acting as a client or server. This allows us to track which values to
8 /// inc / dec.
9 peer: peer::Dyn,
10
11 /// Maximum number of locally initiated streams
12 max_send_streams: usize,
13
14 /// Current number of remote initiated streams
15 num_send_streams: usize,
16
17 /// Maximum number of remote initiated streams
18 max_recv_streams: usize,
19
20 /// Current number of locally initiated streams
21 num_recv_streams: usize,
22
23 /// Maximum number of pending locally reset streams
24 max_local_reset_streams: usize,
25
26 /// Current number of pending locally reset streams
27 num_local_reset_streams: usize,
28
29 /// Max number of "pending accept" streams that were remotely reset
30 max_remote_reset_streams: usize,
31
32 /// Current number of "pending accept" streams that were remotely reset
33 num_remote_reset_streams: usize,
34}
35
36impl Counts {
37 /// Create a new `Counts` using the provided configuration values.
38 pub fn new(peer: peer::Dyn, config: &Config) -> Self {
39 Counts {
40 peer,
41 max_send_streams: config.initial_max_send_streams,
42 num_send_streams: 0,
43 max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
44 num_recv_streams: 0,
45 max_local_reset_streams: config.local_reset_max,
46 num_local_reset_streams: 0,
47 max_remote_reset_streams: config.remote_reset_max,
48 num_remote_reset_streams: 0,
49 }
50 }
51
52 /// Returns true when the next opened stream will reach capacity of outbound streams
53 ///
54 /// The number of client send streams is incremented in prioritize; send_request has to guess if
55 /// it should wait before allowing another request to be sent.
56 pub fn next_send_stream_will_reach_capacity(&self) -> bool {
57 self.max_send_streams <= (self.num_send_streams + 1)
58 }
59
60 /// Returns the current peer
61 pub fn peer(&self) -> peer::Dyn {
62 self.peer
63 }
64
65 pub fn has_streams(&self) -> bool {
66 self.num_send_streams != 0 || self.num_recv_streams != 0
67 }
68
69 /// Returns true if the receive stream concurrency can be incremented
70 pub fn can_inc_num_recv_streams(&self) -> bool {
71 self.max_recv_streams > self.num_recv_streams
72 }
73
74 /// Increments the number of concurrent receive streams.
75 ///
76 /// # Panics
77 ///
78 /// Panics on failure as this should have been validated before hand.
79 pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
80 assert!(self.can_inc_num_recv_streams());
81 assert!(!stream.is_counted);
82
83 // Increment the number of remote initiated streams
84 self.num_recv_streams += 1;
85 stream.is_counted = true;
86 }
87
88 /// Returns true if the send stream concurrency can be incremented
89 pub fn can_inc_num_send_streams(&self) -> bool {
90 self.max_send_streams > self.num_send_streams
91 }
92
93 /// Increments the number of concurrent send streams.
94 ///
95 /// # Panics
96 ///
97 /// Panics on failure as this should have been validated before hand.
98 pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
99 assert!(self.can_inc_num_send_streams());
100 assert!(!stream.is_counted);
101
102 // Increment the number of remote initiated streams
103 self.num_send_streams += 1;
104 stream.is_counted = true;
105 }
106
107 /// Returns true if the number of pending reset streams can be incremented.
108 pub fn can_inc_num_reset_streams(&self) -> bool {
109 self.max_local_reset_streams > self.num_local_reset_streams
110 }
111
112 /// Increments the number of pending reset streams.
113 ///
114 /// # Panics
115 ///
116 /// Panics on failure as this should have been validated before hand.
117 pub fn inc_num_reset_streams(&mut self) {
118 assert!(self.can_inc_num_reset_streams());
119
120 self.num_local_reset_streams += 1;
121 }
122
123 pub(crate) fn max_remote_reset_streams(&self) -> usize {
124 self.max_remote_reset_streams
125 }
126
127 /// Returns true if the number of pending REMOTE reset streams can be
128 /// incremented.
129 pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
130 self.max_remote_reset_streams > self.num_remote_reset_streams
131 }
132
133 /// Increments the number of pending REMOTE reset streams.
134 ///
135 /// # Panics
136 ///
137 /// Panics on failure as this should have been validated before hand.
138 pub(crate) fn inc_num_remote_reset_streams(&mut self) {
139 assert!(self.can_inc_num_remote_reset_streams());
140
141 self.num_remote_reset_streams += 1;
142 }
143
144 pub(crate) fn dec_num_remote_reset_streams(&mut self) {
145 assert!(self.num_remote_reset_streams > 0);
146
147 self.num_remote_reset_streams -= 1;
148 }
149
150 pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
151 if let Some(val) = settings.max_concurrent_streams() {
152 self.max_send_streams = val as usize;
153 }
154 }
155
156 /// Run a block of code that could potentially transition a stream's state.
157 ///
158 /// If the stream state transitions to closed, this function will perform
159 /// all necessary cleanup.
160 ///
161 /// TODO: Is this function still needed?
162 pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
163 where
164 F: FnOnce(&mut Self, &mut store::Ptr) -> U,
165 {
166 // TODO: Does this need to be computed before performing the action?
167 let is_pending_reset = stream.is_pending_reset_expiration();
168
169 // Run the action
170 let ret = f(self, &mut stream);
171
172 self.transition_after(stream, is_pending_reset);
173
174 ret
175 }
176
177 // TODO: move this to macro?
178 pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
179 tracing::trace!(
180 "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
181 pending_send_empty={:?}; buffered_send_data={}; \
182 num_recv={}; num_send={}",
183 stream.id,
184 stream.state,
185 stream.is_closed(),
186 stream.pending_send.is_empty(),
187 stream.buffered_send_data,
188 self.num_recv_streams,
189 self.num_send_streams
190 );
191
192 if stream.is_closed() {
193 if !stream.is_pending_reset_expiration() {
194 stream.unlink();
195 if is_reset_counted {
196 self.dec_num_reset_streams();
197 }
198 }
199
200 if stream.is_counted {
201 tracing::trace!("dec_num_streams; stream={:?}", stream.id);
202 // Decrement the number of active streams.
203 self.dec_num_streams(&mut stream);
204 }
205 }
206
207 // Release the stream if it requires releasing
208 if stream.is_released() {
209 stream.remove();
210 }
211 }
212
213 /// Returns the maximum number of streams that can be initiated by this
214 /// peer.
215 pub(crate) fn max_send_streams(&self) -> usize {
216 self.max_send_streams
217 }
218
219 /// Returns the maximum number of streams that can be initiated by the
220 /// remote peer.
221 pub(crate) fn max_recv_streams(&self) -> usize {
222 self.max_recv_streams
223 }
224
225 fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
226 assert!(stream.is_counted);
227
228 if self.peer.is_local_init(stream.id) {
229 assert!(self.num_send_streams > 0);
230 self.num_send_streams -= 1;
231 stream.is_counted = false;
232 } else {
233 assert!(self.num_recv_streams > 0);
234 self.num_recv_streams -= 1;
235 stream.is_counted = false;
236 }
237 }
238
239 fn dec_num_reset_streams(&mut self) {
240 assert!(self.num_local_reset_streams > 0);
241 self.num_local_reset_streams -= 1;
242 }
243}
244
245impl Drop for Counts {
246 fn drop(&mut self) {
247 use std::thread;
248
249 if !thread::panicking() {
250 debug_assert!(!self.has_streams());
251 }
252 }
253}
254