| 1 | use super::*; |
| 2 | |
| 3 | #[derive (Debug)] |
| 4 | pub(super) struct Counts { |
| 5 | /// Acting as a client or server. This allows us to track which values to |
| 6 | /// inc / dec. |
| 7 | peer: peer::Dyn, |
| 8 | |
| 9 | /// Maximum number of locally initiated streams |
| 10 | max_send_streams: usize, |
| 11 | |
| 12 | /// Current number of remote initiated streams |
| 13 | num_send_streams: usize, |
| 14 | |
| 15 | /// Maximum number of remote initiated streams |
| 16 | max_recv_streams: usize, |
| 17 | |
| 18 | /// Current number of locally initiated streams |
| 19 | num_recv_streams: usize, |
| 20 | |
| 21 | /// Maximum number of pending locally reset streams |
| 22 | max_local_reset_streams: usize, |
| 23 | |
| 24 | /// Current number of pending locally reset streams |
| 25 | num_local_reset_streams: usize, |
| 26 | |
| 27 | /// Max number of "pending accept" streams that were remotely reset |
| 28 | max_remote_reset_streams: usize, |
| 29 | |
| 30 | /// Current number of "pending accept" streams that were remotely reset |
| 31 | num_remote_reset_streams: usize, |
| 32 | |
| 33 | /// Maximum number of locally reset streams due to protocol error across |
| 34 | /// the lifetime of the connection. |
| 35 | /// |
| 36 | /// When this gets exceeded, we issue GOAWAYs. |
| 37 | max_local_error_reset_streams: Option<usize>, |
| 38 | |
| 39 | /// Total number of locally reset streams due to protocol error across the |
| 40 | /// lifetime of the connection. |
| 41 | num_local_error_reset_streams: usize, |
| 42 | } |
| 43 | |
| 44 | impl Counts { |
| 45 | /// Create a new `Counts` using the provided configuration values. |
| 46 | pub fn new(peer: peer::Dyn, config: &Config) -> Self { |
| 47 | Counts { |
| 48 | peer, |
| 49 | max_send_streams: config.initial_max_send_streams, |
| 50 | num_send_streams: 0, |
| 51 | max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), |
| 52 | num_recv_streams: 0, |
| 53 | max_local_reset_streams: config.local_reset_max, |
| 54 | num_local_reset_streams: 0, |
| 55 | max_remote_reset_streams: config.remote_reset_max, |
| 56 | num_remote_reset_streams: 0, |
| 57 | max_local_error_reset_streams: config.local_max_error_reset_streams, |
| 58 | num_local_error_reset_streams: 0, |
| 59 | } |
| 60 | } |
| 61 | |
| 62 | /// Returns true when the next opened stream will reach capacity of outbound streams |
| 63 | /// |
| 64 | /// The number of client send streams is incremented in prioritize; send_request has to guess if |
| 65 | /// it should wait before allowing another request to be sent. |
| 66 | pub fn next_send_stream_will_reach_capacity(&self) -> bool { |
| 67 | self.max_send_streams <= (self.num_send_streams + 1) |
| 68 | } |
| 69 | |
| 70 | /// Returns the current peer |
| 71 | pub fn peer(&self) -> peer::Dyn { |
| 72 | self.peer |
| 73 | } |
| 74 | |
| 75 | pub fn has_streams(&self) -> bool { |
| 76 | self.num_send_streams != 0 || self.num_recv_streams != 0 |
| 77 | } |
| 78 | |
| 79 | /// Returns true if we can issue another local reset due to protocol error. |
| 80 | pub fn can_inc_num_local_error_resets(&self) -> bool { |
| 81 | if let Some(max) = self.max_local_error_reset_streams { |
| 82 | max > self.num_local_error_reset_streams |
| 83 | } else { |
| 84 | true |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | pub fn inc_num_local_error_resets(&mut self) { |
| 89 | assert!(self.can_inc_num_local_error_resets()); |
| 90 | |
| 91 | // Increment the number of remote initiated streams |
| 92 | self.num_local_error_reset_streams += 1; |
| 93 | } |
| 94 | |
| 95 | pub(crate) fn max_local_error_resets(&self) -> Option<usize> { |
| 96 | self.max_local_error_reset_streams |
| 97 | } |
| 98 | |
| 99 | /// Returns true if the receive stream concurrency can be incremented |
| 100 | pub fn can_inc_num_recv_streams(&self) -> bool { |
| 101 | self.max_recv_streams > self.num_recv_streams |
| 102 | } |
| 103 | |
| 104 | /// Increments the number of concurrent receive streams. |
| 105 | /// |
| 106 | /// # Panics |
| 107 | /// |
| 108 | /// Panics on failure as this should have been validated before hand. |
| 109 | pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) { |
| 110 | assert!(self.can_inc_num_recv_streams()); |
| 111 | assert!(!stream.is_counted); |
| 112 | |
| 113 | // Increment the number of remote initiated streams |
| 114 | self.num_recv_streams += 1; |
| 115 | stream.is_counted = true; |
| 116 | } |
| 117 | |
| 118 | /// Returns true if the send stream concurrency can be incremented |
| 119 | pub fn can_inc_num_send_streams(&self) -> bool { |
| 120 | self.max_send_streams > self.num_send_streams |
| 121 | } |
| 122 | |
| 123 | /// Increments the number of concurrent send streams. |
| 124 | /// |
| 125 | /// # Panics |
| 126 | /// |
| 127 | /// Panics on failure as this should have been validated before hand. |
| 128 | pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) { |
| 129 | assert!(self.can_inc_num_send_streams()); |
| 130 | assert!(!stream.is_counted); |
| 131 | |
| 132 | // Increment the number of remote initiated streams |
| 133 | self.num_send_streams += 1; |
| 134 | stream.is_counted = true; |
| 135 | } |
| 136 | |
| 137 | /// Returns true if the number of pending reset streams can be incremented. |
| 138 | pub fn can_inc_num_reset_streams(&self) -> bool { |
| 139 | self.max_local_reset_streams > self.num_local_reset_streams |
| 140 | } |
| 141 | |
| 142 | /// Increments the number of pending reset streams. |
| 143 | /// |
| 144 | /// # Panics |
| 145 | /// |
| 146 | /// Panics on failure as this should have been validated before hand. |
| 147 | pub fn inc_num_reset_streams(&mut self) { |
| 148 | assert!(self.can_inc_num_reset_streams()); |
| 149 | |
| 150 | self.num_local_reset_streams += 1; |
| 151 | } |
| 152 | |
| 153 | pub(crate) fn max_remote_reset_streams(&self) -> usize { |
| 154 | self.max_remote_reset_streams |
| 155 | } |
| 156 | |
| 157 | /// Returns true if the number of pending REMOTE reset streams can be |
| 158 | /// incremented. |
| 159 | pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool { |
| 160 | self.max_remote_reset_streams > self.num_remote_reset_streams |
| 161 | } |
| 162 | |
| 163 | /// Increments the number of pending REMOTE reset streams. |
| 164 | /// |
| 165 | /// # Panics |
| 166 | /// |
| 167 | /// Panics on failure as this should have been validated before hand. |
| 168 | pub(crate) fn inc_num_remote_reset_streams(&mut self) { |
| 169 | assert!(self.can_inc_num_remote_reset_streams()); |
| 170 | |
| 171 | self.num_remote_reset_streams += 1; |
| 172 | } |
| 173 | |
| 174 | pub(crate) fn dec_num_remote_reset_streams(&mut self) { |
| 175 | assert!(self.num_remote_reset_streams > 0); |
| 176 | |
| 177 | self.num_remote_reset_streams -= 1; |
| 178 | } |
| 179 | |
| 180 | pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) { |
| 181 | match settings.max_concurrent_streams() { |
| 182 | Some(val) => self.max_send_streams = val as usize, |
| 183 | None if is_initial => self.max_send_streams = usize::MAX, |
| 184 | None => {} |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | /// Run a block of code that could potentially transition a stream's state. |
| 189 | /// |
| 190 | /// If the stream state transitions to closed, this function will perform |
| 191 | /// all necessary cleanup. |
| 192 | /// |
| 193 | /// TODO: Is this function still needed? |
| 194 | pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U |
| 195 | where |
| 196 | F: FnOnce(&mut Self, &mut store::Ptr) -> U, |
| 197 | { |
| 198 | // TODO: Does this need to be computed before performing the action? |
| 199 | let is_pending_reset = stream.is_pending_reset_expiration(); |
| 200 | |
| 201 | // Run the action |
| 202 | let ret = f(self, &mut stream); |
| 203 | |
| 204 | self.transition_after(stream, is_pending_reset); |
| 205 | |
| 206 | ret |
| 207 | } |
| 208 | |
| 209 | // TODO: move this to macro? |
| 210 | pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) { |
| 211 | tracing::trace!( |
| 212 | "transition_after; stream= {:?}; state= {:?}; is_closed= {:?}; \ |
| 213 | pending_send_empty= {:?}; buffered_send_data= {}; \ |
| 214 | num_recv= {}; num_send= {}" , |
| 215 | stream.id, |
| 216 | stream.state, |
| 217 | stream.is_closed(), |
| 218 | stream.pending_send.is_empty(), |
| 219 | stream.buffered_send_data, |
| 220 | self.num_recv_streams, |
| 221 | self.num_send_streams |
| 222 | ); |
| 223 | |
| 224 | if stream.is_closed() { |
| 225 | if !stream.is_pending_reset_expiration() { |
| 226 | stream.unlink(); |
| 227 | if is_reset_counted { |
| 228 | self.dec_num_reset_streams(); |
| 229 | } |
| 230 | } |
| 231 | |
| 232 | if !stream.state.is_scheduled_reset() && stream.is_counted { |
| 233 | tracing::trace!("dec_num_streams; stream= {:?}" , stream.id); |
| 234 | // Decrement the number of active streams. |
| 235 | self.dec_num_streams(&mut stream); |
| 236 | } |
| 237 | } |
| 238 | |
| 239 | // Release the stream if it requires releasing |
| 240 | if stream.is_released() { |
| 241 | stream.remove(); |
| 242 | } |
| 243 | } |
| 244 | |
| 245 | /// Returns the maximum number of streams that can be initiated by this |
| 246 | /// peer. |
| 247 | pub(crate) fn max_send_streams(&self) -> usize { |
| 248 | self.max_send_streams |
| 249 | } |
| 250 | |
| 251 | /// Returns the maximum number of streams that can be initiated by the |
| 252 | /// remote peer. |
| 253 | pub(crate) fn max_recv_streams(&self) -> usize { |
| 254 | self.max_recv_streams |
| 255 | } |
| 256 | |
| 257 | fn dec_num_streams(&mut self, stream: &mut store::Ptr) { |
| 258 | assert!(stream.is_counted); |
| 259 | |
| 260 | if self.peer.is_local_init(stream.id) { |
| 261 | assert!(self.num_send_streams > 0); |
| 262 | self.num_send_streams -= 1; |
| 263 | stream.is_counted = false; |
| 264 | } else { |
| 265 | assert!(self.num_recv_streams > 0); |
| 266 | self.num_recv_streams -= 1; |
| 267 | stream.is_counted = false; |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | fn dec_num_reset_streams(&mut self) { |
| 272 | assert!(self.num_local_reset_streams > 0); |
| 273 | self.num_local_reset_streams -= 1; |
| 274 | } |
| 275 | } |
| 276 | |
| 277 | impl Drop for Counts { |
| 278 | fn drop(&mut self) { |
| 279 | use std::thread; |
| 280 | |
| 281 | if !thread::panicking() { |
| 282 | debug_assert!(!self.has_streams()); |
| 283 | } |
| 284 | } |
| 285 | } |
| 286 | |