1 | use super::*; |
2 | |
3 | #[derive (Debug)] |
4 | pub(super) struct Counts { |
5 | /// Acting as a client or server. This allows us to track which values to |
6 | /// inc / dec. |
7 | peer: peer::Dyn, |
8 | |
9 | /// Maximum number of locally initiated streams |
10 | max_send_streams: usize, |
11 | |
12 | /// Current number of remote initiated streams |
13 | num_send_streams: usize, |
14 | |
15 | /// Maximum number of remote initiated streams |
16 | max_recv_streams: usize, |
17 | |
18 | /// Current number of locally initiated streams |
19 | num_recv_streams: usize, |
20 | |
21 | /// Maximum number of pending locally reset streams |
22 | max_local_reset_streams: usize, |
23 | |
24 | /// Current number of pending locally reset streams |
25 | num_local_reset_streams: usize, |
26 | |
27 | /// Max number of "pending accept" streams that were remotely reset |
28 | max_remote_reset_streams: usize, |
29 | |
30 | /// Current number of "pending accept" streams that were remotely reset |
31 | num_remote_reset_streams: usize, |
32 | |
33 | /// Maximum number of locally reset streams due to protocol error across |
34 | /// the lifetime of the connection. |
35 | /// |
36 | /// When this gets exceeded, we issue GOAWAYs. |
37 | max_local_error_reset_streams: Option<usize>, |
38 | |
39 | /// Total number of locally reset streams due to protocol error across the |
40 | /// lifetime of the connection. |
41 | num_local_error_reset_streams: usize, |
42 | } |
43 | |
44 | impl Counts { |
45 | /// Create a new `Counts` using the provided configuration values. |
46 | pub fn new(peer: peer::Dyn, config: &Config) -> Self { |
47 | Counts { |
48 | peer, |
49 | max_send_streams: config.initial_max_send_streams, |
50 | num_send_streams: 0, |
51 | max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), |
52 | num_recv_streams: 0, |
53 | max_local_reset_streams: config.local_reset_max, |
54 | num_local_reset_streams: 0, |
55 | max_remote_reset_streams: config.remote_reset_max, |
56 | num_remote_reset_streams: 0, |
57 | max_local_error_reset_streams: config.local_max_error_reset_streams, |
58 | num_local_error_reset_streams: 0, |
59 | } |
60 | } |
61 | |
62 | /// Returns true when the next opened stream will reach capacity of outbound streams |
63 | /// |
64 | /// The number of client send streams is incremented in prioritize; send_request has to guess if |
65 | /// it should wait before allowing another request to be sent. |
66 | pub fn next_send_stream_will_reach_capacity(&self) -> bool { |
67 | self.max_send_streams <= (self.num_send_streams + 1) |
68 | } |
69 | |
70 | /// Returns the current peer |
71 | pub fn peer(&self) -> peer::Dyn { |
72 | self.peer |
73 | } |
74 | |
75 | pub fn has_streams(&self) -> bool { |
76 | self.num_send_streams != 0 || self.num_recv_streams != 0 |
77 | } |
78 | |
79 | /// Returns true if we can issue another local reset due to protocol error. |
80 | pub fn can_inc_num_local_error_resets(&self) -> bool { |
81 | if let Some(max) = self.max_local_error_reset_streams { |
82 | max > self.num_local_error_reset_streams |
83 | } else { |
84 | true |
85 | } |
86 | } |
87 | |
88 | pub fn inc_num_local_error_resets(&mut self) { |
89 | assert!(self.can_inc_num_local_error_resets()); |
90 | |
91 | // Increment the number of remote initiated streams |
92 | self.num_local_error_reset_streams += 1; |
93 | } |
94 | |
95 | pub(crate) fn max_local_error_resets(&self) -> Option<usize> { |
96 | self.max_local_error_reset_streams |
97 | } |
98 | |
99 | /// Returns true if the receive stream concurrency can be incremented |
100 | pub fn can_inc_num_recv_streams(&self) -> bool { |
101 | self.max_recv_streams > self.num_recv_streams |
102 | } |
103 | |
104 | /// Increments the number of concurrent receive streams. |
105 | /// |
106 | /// # Panics |
107 | /// |
108 | /// Panics on failure as this should have been validated before hand. |
109 | pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) { |
110 | assert!(self.can_inc_num_recv_streams()); |
111 | assert!(!stream.is_counted); |
112 | |
113 | // Increment the number of remote initiated streams |
114 | self.num_recv_streams += 1; |
115 | stream.is_counted = true; |
116 | } |
117 | |
118 | /// Returns true if the send stream concurrency can be incremented |
119 | pub fn can_inc_num_send_streams(&self) -> bool { |
120 | self.max_send_streams > self.num_send_streams |
121 | } |
122 | |
123 | /// Increments the number of concurrent send streams. |
124 | /// |
125 | /// # Panics |
126 | /// |
127 | /// Panics on failure as this should have been validated before hand. |
128 | pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) { |
129 | assert!(self.can_inc_num_send_streams()); |
130 | assert!(!stream.is_counted); |
131 | |
132 | // Increment the number of remote initiated streams |
133 | self.num_send_streams += 1; |
134 | stream.is_counted = true; |
135 | } |
136 | |
137 | /// Returns true if the number of pending reset streams can be incremented. |
138 | pub fn can_inc_num_reset_streams(&self) -> bool { |
139 | self.max_local_reset_streams > self.num_local_reset_streams |
140 | } |
141 | |
142 | /// Increments the number of pending reset streams. |
143 | /// |
144 | /// # Panics |
145 | /// |
146 | /// Panics on failure as this should have been validated before hand. |
147 | pub fn inc_num_reset_streams(&mut self) { |
148 | assert!(self.can_inc_num_reset_streams()); |
149 | |
150 | self.num_local_reset_streams += 1; |
151 | } |
152 | |
153 | pub(crate) fn max_remote_reset_streams(&self) -> usize { |
154 | self.max_remote_reset_streams |
155 | } |
156 | |
157 | /// Returns true if the number of pending REMOTE reset streams can be |
158 | /// incremented. |
159 | pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool { |
160 | self.max_remote_reset_streams > self.num_remote_reset_streams |
161 | } |
162 | |
163 | /// Increments the number of pending REMOTE reset streams. |
164 | /// |
165 | /// # Panics |
166 | /// |
167 | /// Panics on failure as this should have been validated before hand. |
168 | pub(crate) fn inc_num_remote_reset_streams(&mut self) { |
169 | assert!(self.can_inc_num_remote_reset_streams()); |
170 | |
171 | self.num_remote_reset_streams += 1; |
172 | } |
173 | |
174 | pub(crate) fn dec_num_remote_reset_streams(&mut self) { |
175 | assert!(self.num_remote_reset_streams > 0); |
176 | |
177 | self.num_remote_reset_streams -= 1; |
178 | } |
179 | |
180 | pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) { |
181 | match settings.max_concurrent_streams() { |
182 | Some(val) => self.max_send_streams = val as usize, |
183 | None if is_initial => self.max_send_streams = usize::MAX, |
184 | None => {} |
185 | } |
186 | } |
187 | |
188 | /// Run a block of code that could potentially transition a stream's state. |
189 | /// |
190 | /// If the stream state transitions to closed, this function will perform |
191 | /// all necessary cleanup. |
192 | /// |
193 | /// TODO: Is this function still needed? |
194 | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U |
195 | where |
196 | F: FnOnce(&mut Self, &mut store::Ptr) -> U, |
197 | { |
198 | // TODO: Does this need to be computed before performing the action? |
199 | let is_pending_reset = stream.is_pending_reset_expiration(); |
200 | |
201 | // Run the action |
202 | let ret = f(self, &mut stream); |
203 | |
204 | self.transition_after(stream, is_pending_reset); |
205 | |
206 | ret |
207 | } |
208 | |
209 | // TODO: move this to macro? |
210 | pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) { |
211 | tracing::trace!( |
212 | "transition_after; stream= {:?}; state= {:?}; is_closed= {:?}; \ |
213 | pending_send_empty= {:?}; buffered_send_data= {}; \ |
214 | num_recv= {}; num_send= {}" , |
215 | stream.id, |
216 | stream.state, |
217 | stream.is_closed(), |
218 | stream.pending_send.is_empty(), |
219 | stream.buffered_send_data, |
220 | self.num_recv_streams, |
221 | self.num_send_streams |
222 | ); |
223 | |
224 | if stream.is_closed() { |
225 | if !stream.is_pending_reset_expiration() { |
226 | stream.unlink(); |
227 | if is_reset_counted { |
228 | self.dec_num_reset_streams(); |
229 | } |
230 | } |
231 | |
232 | if !stream.state.is_scheduled_reset() && stream.is_counted { |
233 | tracing::trace!("dec_num_streams; stream= {:?}" , stream.id); |
234 | // Decrement the number of active streams. |
235 | self.dec_num_streams(&mut stream); |
236 | } |
237 | } |
238 | |
239 | // Release the stream if it requires releasing |
240 | if stream.is_released() { |
241 | stream.remove(); |
242 | } |
243 | } |
244 | |
245 | /// Returns the maximum number of streams that can be initiated by this |
246 | /// peer. |
247 | pub(crate) fn max_send_streams(&self) -> usize { |
248 | self.max_send_streams |
249 | } |
250 | |
251 | /// Returns the maximum number of streams that can be initiated by the |
252 | /// remote peer. |
253 | pub(crate) fn max_recv_streams(&self) -> usize { |
254 | self.max_recv_streams |
255 | } |
256 | |
257 | fn dec_num_streams(&mut self, stream: &mut store::Ptr) { |
258 | assert!(stream.is_counted); |
259 | |
260 | if self.peer.is_local_init(stream.id) { |
261 | assert!(self.num_send_streams > 0); |
262 | self.num_send_streams -= 1; |
263 | stream.is_counted = false; |
264 | } else { |
265 | assert!(self.num_recv_streams > 0); |
266 | self.num_recv_streams -= 1; |
267 | stream.is_counted = false; |
268 | } |
269 | } |
270 | |
271 | fn dec_num_reset_streams(&mut self) { |
272 | assert!(self.num_local_reset_streams > 0); |
273 | self.num_local_reset_streams -= 1; |
274 | } |
275 | } |
276 | |
277 | impl Drop for Counts { |
278 | fn drop(&mut self) { |
279 | use std::thread; |
280 | |
281 | if !thread::panicking() { |
282 | debug_assert!(!self.has_streams()); |
283 | } |
284 | } |
285 | } |
286 | |