1use super::*;
2
3use std::usize;
4
/// Bookkeeping for the stream-related counters of a connection, each paired
/// with the limit it is checked against.
#[derive(Debug)]
pub(super) struct Counts {
    /// Acting as a client or server. This allows us to track which values to
    /// inc / dec.
    peer: peer::Dyn,

    /// Maximum number of locally initiated streams
    max_send_streams: usize,

    /// Current number of locally initiated streams
    num_send_streams: usize,

    /// Maximum number of remote initiated streams
    max_recv_streams: usize,

    /// Current number of remote initiated streams
    num_recv_streams: usize,

    /// Maximum number of pending locally reset streams
    max_local_reset_streams: usize,

    /// Current number of pending locally reset streams
    num_local_reset_streams: usize,

    /// Max number of "pending accept" streams that were remotely reset
    max_remote_reset_streams: usize,

    /// Current number of "pending accept" streams that were remotely reset
    num_remote_reset_streams: usize,

    /// Maximum number of locally reset streams due to protocol error across
    /// the lifetime of the connection. `None` means no limit is enforced.
    ///
    /// When this gets exceeded, we issue GOAWAYs.
    max_local_error_reset_streams: Option<usize>,

    /// Total number of locally reset streams due to protocol error across the
    /// lifetime of the connection.
    num_local_error_reset_streams: usize,
}
45
46impl Counts {
47 /// Create a new `Counts` using the provided configuration values.
48 pub fn new(peer: peer::Dyn, config: &Config) -> Self {
49 Counts {
50 peer,
51 max_send_streams: config.initial_max_send_streams,
52 num_send_streams: 0,
53 max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
54 num_recv_streams: 0,
55 max_local_reset_streams: config.local_reset_max,
56 num_local_reset_streams: 0,
57 max_remote_reset_streams: config.remote_reset_max,
58 num_remote_reset_streams: 0,
59 max_local_error_reset_streams: config.local_max_error_reset_streams,
60 num_local_error_reset_streams: 0,
61 }
62 }
63
64 /// Returns true when the next opened stream will reach capacity of outbound streams
65 ///
66 /// The number of client send streams is incremented in prioritize; send_request has to guess if
67 /// it should wait before allowing another request to be sent.
68 pub fn next_send_stream_will_reach_capacity(&self) -> bool {
69 self.max_send_streams <= (self.num_send_streams + 1)
70 }
71
72 /// Returns the current peer
73 pub fn peer(&self) -> peer::Dyn {
74 self.peer
75 }
76
77 pub fn has_streams(&self) -> bool {
78 self.num_send_streams != 0 || self.num_recv_streams != 0
79 }
80
81 /// Returns true if we can issue another local reset due to protocol error.
82 pub fn can_inc_num_local_error_resets(&self) -> bool {
83 if let Some(max) = self.max_local_error_reset_streams {
84 max > self.num_local_error_reset_streams
85 } else {
86 true
87 }
88 }
89
90 pub fn inc_num_local_error_resets(&mut self) {
91 assert!(self.can_inc_num_local_error_resets());
92
93 // Increment the number of remote initiated streams
94 self.num_local_error_reset_streams += 1;
95 }
96
97 pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
98 self.max_local_error_reset_streams
99 }
100
101 /// Returns true if the receive stream concurrency can be incremented
102 pub fn can_inc_num_recv_streams(&self) -> bool {
103 self.max_recv_streams > self.num_recv_streams
104 }
105
106 /// Increments the number of concurrent receive streams.
107 ///
108 /// # Panics
109 ///
110 /// Panics on failure as this should have been validated before hand.
111 pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
112 assert!(self.can_inc_num_recv_streams());
113 assert!(!stream.is_counted);
114
115 // Increment the number of remote initiated streams
116 self.num_recv_streams += 1;
117 stream.is_counted = true;
118 }
119
120 /// Returns true if the send stream concurrency can be incremented
121 pub fn can_inc_num_send_streams(&self) -> bool {
122 self.max_send_streams > self.num_send_streams
123 }
124
125 /// Increments the number of concurrent send streams.
126 ///
127 /// # Panics
128 ///
129 /// Panics on failure as this should have been validated before hand.
130 pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
131 assert!(self.can_inc_num_send_streams());
132 assert!(!stream.is_counted);
133
134 // Increment the number of remote initiated streams
135 self.num_send_streams += 1;
136 stream.is_counted = true;
137 }
138
139 /// Returns true if the number of pending reset streams can be incremented.
140 pub fn can_inc_num_reset_streams(&self) -> bool {
141 self.max_local_reset_streams > self.num_local_reset_streams
142 }
143
144 /// Increments the number of pending reset streams.
145 ///
146 /// # Panics
147 ///
148 /// Panics on failure as this should have been validated before hand.
149 pub fn inc_num_reset_streams(&mut self) {
150 assert!(self.can_inc_num_reset_streams());
151
152 self.num_local_reset_streams += 1;
153 }
154
155 pub(crate) fn max_remote_reset_streams(&self) -> usize {
156 self.max_remote_reset_streams
157 }
158
159 /// Returns true if the number of pending REMOTE reset streams can be
160 /// incremented.
161 pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
162 self.max_remote_reset_streams > self.num_remote_reset_streams
163 }
164
165 /// Increments the number of pending REMOTE reset streams.
166 ///
167 /// # Panics
168 ///
169 /// Panics on failure as this should have been validated before hand.
170 pub(crate) fn inc_num_remote_reset_streams(&mut self) {
171 assert!(self.can_inc_num_remote_reset_streams());
172
173 self.num_remote_reset_streams += 1;
174 }
175
176 pub(crate) fn dec_num_remote_reset_streams(&mut self) {
177 assert!(self.num_remote_reset_streams > 0);
178
179 self.num_remote_reset_streams -= 1;
180 }
181
182 pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
183 if let Some(val) = settings.max_concurrent_streams() {
184 self.max_send_streams = val as usize;
185 }
186 }
187
188 /// Run a block of code that could potentially transition a stream's state.
189 ///
190 /// If the stream state transitions to closed, this function will perform
191 /// all necessary cleanup.
192 ///
193 /// TODO: Is this function still needed?
194 pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
195 where
196 F: FnOnce(&mut Self, &mut store::Ptr) -> U,
197 {
198 // TODO: Does this need to be computed before performing the action?
199 let is_pending_reset = stream.is_pending_reset_expiration();
200
201 // Run the action
202 let ret = f(self, &mut stream);
203
204 self.transition_after(stream, is_pending_reset);
205
206 ret
207 }
208
209 // TODO: move this to macro?
210 pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
211 tracing::trace!(
212 "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
213 pending_send_empty={:?}; buffered_send_data={}; \
214 num_recv={}; num_send={}",
215 stream.id,
216 stream.state,
217 stream.is_closed(),
218 stream.pending_send.is_empty(),
219 stream.buffered_send_data,
220 self.num_recv_streams,
221 self.num_send_streams
222 );
223
224 if stream.is_closed() {
225 if !stream.is_pending_reset_expiration() {
226 stream.unlink();
227 if is_reset_counted {
228 self.dec_num_reset_streams();
229 }
230 }
231
232 if stream.is_counted {
233 tracing::trace!("dec_num_streams; stream={:?}", stream.id);
234 // Decrement the number of active streams.
235 self.dec_num_streams(&mut stream);
236 }
237 }
238
239 // Release the stream if it requires releasing
240 if stream.is_released() {
241 stream.remove();
242 }
243 }
244
245 /// Returns the maximum number of streams that can be initiated by this
246 /// peer.
247 pub(crate) fn max_send_streams(&self) -> usize {
248 self.max_send_streams
249 }
250
251 /// Returns the maximum number of streams that can be initiated by the
252 /// remote peer.
253 pub(crate) fn max_recv_streams(&self) -> usize {
254 self.max_recv_streams
255 }
256
257 fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
258 assert!(stream.is_counted);
259
260 if self.peer.is_local_init(stream.id) {
261 assert!(self.num_send_streams > 0);
262 self.num_send_streams -= 1;
263 stream.is_counted = false;
264 } else {
265 assert!(self.num_recv_streams > 0);
266 self.num_recv_streams -= 1;
267 stream.is_counted = false;
268 }
269 }
270
271 fn dec_num_reset_streams(&mut self) {
272 assert!(self.num_local_reset_streams > 0);
273 self.num_local_reset_streams -= 1;
274 }
275}
276
277impl Drop for Counts {
278 fn drop(&mut self) {
279 use std::thread;
280
281 if !thread::panicking() {
282 debug_assert!(!self.has_streams());
283 }
284 }
285}
286