1 | use super::*; |
2 | |
3 | use std::usize; |
4 | |
5 | #[derive (Debug)] |
6 | pub(super) struct Counts { |
7 | /// Acting as a client or server. This allows us to track which values to |
8 | /// inc / dec. |
9 | peer: peer::Dyn, |
10 | |
11 | /// Maximum number of locally initiated streams |
12 | max_send_streams: usize, |
13 | |
14 | /// Current number of remote initiated streams |
15 | num_send_streams: usize, |
16 | |
17 | /// Maximum number of remote initiated streams |
18 | max_recv_streams: usize, |
19 | |
20 | /// Current number of locally initiated streams |
21 | num_recv_streams: usize, |
22 | |
23 | /// Maximum number of pending locally reset streams |
24 | max_local_reset_streams: usize, |
25 | |
26 | /// Current number of pending locally reset streams |
27 | num_local_reset_streams: usize, |
28 | |
29 | /// Max number of "pending accept" streams that were remotely reset |
30 | max_remote_reset_streams: usize, |
31 | |
32 | /// Current number of "pending accept" streams that were remotely reset |
33 | num_remote_reset_streams: usize, |
34 | |
35 | /// Maximum number of locally reset streams due to protocol error across |
36 | /// the lifetime of the connection. |
37 | /// |
38 | /// When this gets exceeded, we issue GOAWAYs. |
39 | max_local_error_reset_streams: Option<usize>, |
40 | |
41 | /// Total number of locally reset streams due to protocol error across the |
42 | /// lifetime of the connection. |
43 | num_local_error_reset_streams: usize, |
44 | } |
45 | |
46 | impl Counts { |
47 | /// Create a new `Counts` using the provided configuration values. |
48 | pub fn new(peer: peer::Dyn, config: &Config) -> Self { |
49 | Counts { |
50 | peer, |
51 | max_send_streams: config.initial_max_send_streams, |
52 | num_send_streams: 0, |
53 | max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), |
54 | num_recv_streams: 0, |
55 | max_local_reset_streams: config.local_reset_max, |
56 | num_local_reset_streams: 0, |
57 | max_remote_reset_streams: config.remote_reset_max, |
58 | num_remote_reset_streams: 0, |
59 | max_local_error_reset_streams: config.local_max_error_reset_streams, |
60 | num_local_error_reset_streams: 0, |
61 | } |
62 | } |
63 | |
64 | /// Returns true when the next opened stream will reach capacity of outbound streams |
65 | /// |
66 | /// The number of client send streams is incremented in prioritize; send_request has to guess if |
67 | /// it should wait before allowing another request to be sent. |
68 | pub fn next_send_stream_will_reach_capacity(&self) -> bool { |
69 | self.max_send_streams <= (self.num_send_streams + 1) |
70 | } |
71 | |
72 | /// Returns the current peer |
73 | pub fn peer(&self) -> peer::Dyn { |
74 | self.peer |
75 | } |
76 | |
77 | pub fn has_streams(&self) -> bool { |
78 | self.num_send_streams != 0 || self.num_recv_streams != 0 |
79 | } |
80 | |
81 | /// Returns true if we can issue another local reset due to protocol error. |
82 | pub fn can_inc_num_local_error_resets(&self) -> bool { |
83 | if let Some(max) = self.max_local_error_reset_streams { |
84 | max > self.num_local_error_reset_streams |
85 | } else { |
86 | true |
87 | } |
88 | } |
89 | |
90 | pub fn inc_num_local_error_resets(&mut self) { |
91 | assert!(self.can_inc_num_local_error_resets()); |
92 | |
93 | // Increment the number of remote initiated streams |
94 | self.num_local_error_reset_streams += 1; |
95 | } |
96 | |
97 | pub(crate) fn max_local_error_resets(&self) -> Option<usize> { |
98 | self.max_local_error_reset_streams |
99 | } |
100 | |
101 | /// Returns true if the receive stream concurrency can be incremented |
102 | pub fn can_inc_num_recv_streams(&self) -> bool { |
103 | self.max_recv_streams > self.num_recv_streams |
104 | } |
105 | |
106 | /// Increments the number of concurrent receive streams. |
107 | /// |
108 | /// # Panics |
109 | /// |
110 | /// Panics on failure as this should have been validated before hand. |
111 | pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) { |
112 | assert!(self.can_inc_num_recv_streams()); |
113 | assert!(!stream.is_counted); |
114 | |
115 | // Increment the number of remote initiated streams |
116 | self.num_recv_streams += 1; |
117 | stream.is_counted = true; |
118 | } |
119 | |
120 | /// Returns true if the send stream concurrency can be incremented |
121 | pub fn can_inc_num_send_streams(&self) -> bool { |
122 | self.max_send_streams > self.num_send_streams |
123 | } |
124 | |
125 | /// Increments the number of concurrent send streams. |
126 | /// |
127 | /// # Panics |
128 | /// |
129 | /// Panics on failure as this should have been validated before hand. |
130 | pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) { |
131 | assert!(self.can_inc_num_send_streams()); |
132 | assert!(!stream.is_counted); |
133 | |
134 | // Increment the number of remote initiated streams |
135 | self.num_send_streams += 1; |
136 | stream.is_counted = true; |
137 | } |
138 | |
139 | /// Returns true if the number of pending reset streams can be incremented. |
140 | pub fn can_inc_num_reset_streams(&self) -> bool { |
141 | self.max_local_reset_streams > self.num_local_reset_streams |
142 | } |
143 | |
144 | /// Increments the number of pending reset streams. |
145 | /// |
146 | /// # Panics |
147 | /// |
148 | /// Panics on failure as this should have been validated before hand. |
149 | pub fn inc_num_reset_streams(&mut self) { |
150 | assert!(self.can_inc_num_reset_streams()); |
151 | |
152 | self.num_local_reset_streams += 1; |
153 | } |
154 | |
155 | pub(crate) fn max_remote_reset_streams(&self) -> usize { |
156 | self.max_remote_reset_streams |
157 | } |
158 | |
159 | /// Returns true if the number of pending REMOTE reset streams can be |
160 | /// incremented. |
161 | pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool { |
162 | self.max_remote_reset_streams > self.num_remote_reset_streams |
163 | } |
164 | |
165 | /// Increments the number of pending REMOTE reset streams. |
166 | /// |
167 | /// # Panics |
168 | /// |
169 | /// Panics on failure as this should have been validated before hand. |
170 | pub(crate) fn inc_num_remote_reset_streams(&mut self) { |
171 | assert!(self.can_inc_num_remote_reset_streams()); |
172 | |
173 | self.num_remote_reset_streams += 1; |
174 | } |
175 | |
176 | pub(crate) fn dec_num_remote_reset_streams(&mut self) { |
177 | assert!(self.num_remote_reset_streams > 0); |
178 | |
179 | self.num_remote_reset_streams -= 1; |
180 | } |
181 | |
182 | pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { |
183 | if let Some(val) = settings.max_concurrent_streams() { |
184 | self.max_send_streams = val as usize; |
185 | } |
186 | } |
187 | |
188 | /// Run a block of code that could potentially transition a stream's state. |
189 | /// |
190 | /// If the stream state transitions to closed, this function will perform |
191 | /// all necessary cleanup. |
192 | /// |
193 | /// TODO: Is this function still needed? |
194 | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U |
195 | where |
196 | F: FnOnce(&mut Self, &mut store::Ptr) -> U, |
197 | { |
198 | // TODO: Does this need to be computed before performing the action? |
199 | let is_pending_reset = stream.is_pending_reset_expiration(); |
200 | |
201 | // Run the action |
202 | let ret = f(self, &mut stream); |
203 | |
204 | self.transition_after(stream, is_pending_reset); |
205 | |
206 | ret |
207 | } |
208 | |
209 | // TODO: move this to macro? |
210 | pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) { |
211 | tracing::trace!( |
212 | "transition_after; stream= {:?}; state= {:?}; is_closed= {:?}; \ |
213 | pending_send_empty= {:?}; buffered_send_data= {}; \ |
214 | num_recv= {}; num_send= {}" , |
215 | stream.id, |
216 | stream.state, |
217 | stream.is_closed(), |
218 | stream.pending_send.is_empty(), |
219 | stream.buffered_send_data, |
220 | self.num_recv_streams, |
221 | self.num_send_streams |
222 | ); |
223 | |
224 | if stream.is_closed() { |
225 | if !stream.is_pending_reset_expiration() { |
226 | stream.unlink(); |
227 | if is_reset_counted { |
228 | self.dec_num_reset_streams(); |
229 | } |
230 | } |
231 | |
232 | if stream.is_counted { |
233 | tracing::trace!("dec_num_streams; stream= {:?}" , stream.id); |
234 | // Decrement the number of active streams. |
235 | self.dec_num_streams(&mut stream); |
236 | } |
237 | } |
238 | |
239 | // Release the stream if it requires releasing |
240 | if stream.is_released() { |
241 | stream.remove(); |
242 | } |
243 | } |
244 | |
245 | /// Returns the maximum number of streams that can be initiated by this |
246 | /// peer. |
247 | pub(crate) fn max_send_streams(&self) -> usize { |
248 | self.max_send_streams |
249 | } |
250 | |
251 | /// Returns the maximum number of streams that can be initiated by the |
252 | /// remote peer. |
253 | pub(crate) fn max_recv_streams(&self) -> usize { |
254 | self.max_recv_streams |
255 | } |
256 | |
257 | fn dec_num_streams(&mut self, stream: &mut store::Ptr) { |
258 | assert!(stream.is_counted); |
259 | |
260 | if self.peer.is_local_init(stream.id) { |
261 | assert!(self.num_send_streams > 0); |
262 | self.num_send_streams -= 1; |
263 | stream.is_counted = false; |
264 | } else { |
265 | assert!(self.num_recv_streams > 0); |
266 | self.num_recv_streams -= 1; |
267 | stream.is_counted = false; |
268 | } |
269 | } |
270 | |
271 | fn dec_num_reset_streams(&mut self) { |
272 | assert!(self.num_local_reset_streams > 0); |
273 | self.num_local_reset_streams -= 1; |
274 | } |
275 | } |
276 | |
277 | impl Drop for Counts { |
278 | fn drop(&mut self) { |
279 | use std::thread; |
280 | |
281 | if !thread::panicking() { |
282 | debug_assert!(!self.has_streams()); |
283 | } |
284 | } |
285 | } |
286 | |