| 1 | use crate::frame::Reason; |
| 2 | use crate::proto::{WindowSize, MAX_WINDOW_SIZE}; |
| 3 | |
| 4 | use std::fmt; |
| 5 | |
| 6 | // We don't want to send WINDOW_UPDATE frames for tiny changes, but instead |
| 7 | // aggregate them when the changes are significant. Many implementations do |
// this by keeping a "ratio" of the update versus the allowed window size.
| 9 | // |
| 10 | // While some may wish to represent this ratio as percentage, using a f32, |
| 11 | // we skip having to deal with float math and stick to integers. To do so, |
| 12 | // the "ratio" is represented by 2 i32s, split into the numerator and |
| 13 | // denominator. For example, a 50% ratio is simply represented as 1/2. |
| 14 | // |
// An example applying this ratio: If a stream has an allowed window size of
// 100 bytes, WINDOW_UPDATE frames are scheduled once the unclaimed change
// reaches 1/2 of that window, i.e. 50 bytes.
/// Numerator of the ratio used to compute the unclaimed-capacity threshold.
const UNCLAIMED_NUMERATOR: i32 = 1;
/// Denominator of the ratio used to compute the unclaimed-capacity threshold.
const UNCLAIMED_DENOMINATOR: i32 = 2;
| 20 | |
/// Sanity-checks the unclaimed ratio constants: the numerator must be
/// non-negative and strictly smaller than a positive denominator, i.e. the
/// ratio is a proper fraction in `[0, 1)`.
#[test]
// The assertions deliberately involve only constants; that is the whole point
// of this check, so the clippy lint is silenced.
#[allow(clippy::assertions_on_constants)]
fn sanity_unclaimed_ratio() {
    // The ratio must stay below 1.
    assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR);
    // The ratio must not be negative.
    assert!(UNCLAIMED_NUMERATOR >= 0);
    // The denominator is used as a divisor, so it must be positive.
    assert!(UNCLAIMED_DENOMINATOR > 0);
}
| 28 | |
/// Flow-control state, tracked as two windows: the window the peer knows
/// about and the window that is known (available) locally.
///
/// Both windows are signed and may become negative; see the field
/// documentation for when that happens.
#[derive(Copy, Clone, Debug)]
pub struct FlowControl {
    /// Window the peer knows about.
    ///
    /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received.
    ///
    /// For example, say the peer sends a request and uses 32kb of the window.
    /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust
    /// its understanding of the capacity of the window, and that would be:
    ///
    /// ```notrust
    /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb
    /// ```
    window_size: Window,

    /// Window that we know about.
    ///
    /// This can go negative if a user declares a smaller target window than
    /// the peer knows about.
    available: Window,
}
| 50 | |
| 51 | impl FlowControl { |
| 52 | pub fn new() -> FlowControl { |
| 53 | FlowControl { |
| 54 | window_size: Window(0), |
| 55 | available: Window(0), |
| 56 | } |
| 57 | } |
| 58 | |
| 59 | /// Returns the window size as known by the peer |
| 60 | pub fn window_size(&self) -> WindowSize { |
| 61 | self.window_size.as_size() |
| 62 | } |
| 63 | |
| 64 | /// Returns the window size available to the consumer |
| 65 | pub fn available(&self) -> Window { |
| 66 | self.available |
| 67 | } |
| 68 | |
| 69 | /// Returns true if there is unavailable window capacity |
| 70 | pub fn has_unavailable(&self) -> bool { |
| 71 | if self.window_size < 0 { |
| 72 | return false; |
| 73 | } |
| 74 | |
| 75 | self.window_size > self.available |
| 76 | } |
| 77 | |
| 78 | pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> { |
| 79 | self.available.decrease_by(capacity) |
| 80 | } |
| 81 | |
| 82 | pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> { |
| 83 | self.available.increase_by(capacity) |
| 84 | } |
| 85 | |
| 86 | /// If a WINDOW_UPDATE frame should be sent, returns a positive number |
| 87 | /// representing the increment to be used. |
| 88 | /// |
| 89 | /// If there is no available bytes to be reclaimed, or the number of |
| 90 | /// available bytes does not reach the threshold, this returns `None`. |
| 91 | /// |
| 92 | /// This represents pending outbound WINDOW_UPDATE frames. |
| 93 | pub fn unclaimed_capacity(&self) -> Option<WindowSize> { |
| 94 | let available = self.available; |
| 95 | |
| 96 | if self.window_size >= available { |
| 97 | return None; |
| 98 | } |
| 99 | |
| 100 | let unclaimed = available.0 - self.window_size.0; |
| 101 | let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR; |
| 102 | |
| 103 | if unclaimed < threshold { |
| 104 | None |
| 105 | } else { |
| 106 | Some(unclaimed as WindowSize) |
| 107 | } |
| 108 | } |
| 109 | |
| 110 | /// Increase the window size. |
| 111 | /// |
| 112 | /// This is called after receiving a WINDOW_UPDATE frame |
| 113 | pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> { |
| 114 | let (val, overflow) = self.window_size.0.overflowing_add(sz as i32); |
| 115 | |
| 116 | if overflow { |
| 117 | return Err(Reason::FLOW_CONTROL_ERROR); |
| 118 | } |
| 119 | |
| 120 | if val > MAX_WINDOW_SIZE as i32 { |
| 121 | return Err(Reason::FLOW_CONTROL_ERROR); |
| 122 | } |
| 123 | |
| 124 | tracing::trace!( |
| 125 | "inc_window; sz= {}; old= {}; new= {}" , |
| 126 | sz, |
| 127 | self.window_size, |
| 128 | val |
| 129 | ); |
| 130 | |
| 131 | self.window_size = Window(val); |
| 132 | Ok(()) |
| 133 | } |
| 134 | |
| 135 | /// Decrement the send-side window size. |
| 136 | /// |
| 137 | /// This is called after receiving a SETTINGS frame with a lower |
| 138 | /// INITIAL_WINDOW_SIZE value. |
| 139 | pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> { |
| 140 | tracing::trace!( |
| 141 | "dec_window; sz= {}; window= {}, available= {}" , |
| 142 | sz, |
| 143 | self.window_size, |
| 144 | self.available |
| 145 | ); |
| 146 | // ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can. |
| 147 | self.window_size.decrease_by(sz)?; |
| 148 | Ok(()) |
| 149 | } |
| 150 | |
| 151 | /// Decrement the recv-side window size. |
| 152 | /// |
| 153 | /// This is called after receiving a SETTINGS ACK frame with a lower |
| 154 | /// INITIAL_WINDOW_SIZE value. |
| 155 | pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> { |
| 156 | tracing::trace!( |
| 157 | "dec_recv_window; sz= {}; window= {}, available= {}" , |
| 158 | sz, |
| 159 | self.window_size, |
| 160 | self.available |
| 161 | ); |
| 162 | // This should not be able to overflow `window_size` from the bottom. |
| 163 | self.window_size.decrease_by(sz)?; |
| 164 | self.available.decrease_by(sz)?; |
| 165 | Ok(()) |
| 166 | } |
| 167 | |
| 168 | /// Decrements the window reflecting data has actually been sent. The caller |
| 169 | /// must ensure that the window has capacity. |
| 170 | pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> { |
| 171 | tracing::trace!( |
| 172 | "send_data; sz= {}; window= {}; available= {}" , |
| 173 | sz, |
| 174 | self.window_size, |
| 175 | self.available |
| 176 | ); |
| 177 | |
| 178 | // If send size is zero it's meaningless to update flow control window |
| 179 | if sz > 0 { |
| 180 | // Ensure that the argument is correct |
| 181 | assert!(self.window_size.0 >= sz as i32); |
| 182 | |
| 183 | // Update values |
| 184 | self.window_size.decrease_by(sz)?; |
| 185 | self.available.decrease_by(sz)?; |
| 186 | } |
| 187 | Ok(()) |
| 188 | } |
| 189 | } |
| 190 | |
| 191 | /// The current capacity of a flow-controlled Window. |
| 192 | /// |
| 193 | /// This number can go negative when either side has used a certain amount |
| 194 | /// of capacity when the other side advertises a reduction in size. |
| 195 | /// |
| 196 | /// This type tries to centralize the knowledge of addition and subtraction |
| 197 | /// to this capacity, instead of having integer casts throughout the source. |
| 198 | #[derive (Clone, Copy, Debug, PartialEq, Eq, PartialOrd)] |
| 199 | pub struct Window(i32); |
| 200 | |
| 201 | impl Window { |
| 202 | pub fn as_size(&self) -> WindowSize { |
| 203 | if self.0 < 0 { |
| 204 | 0 |
| 205 | } else { |
| 206 | self.0 as WindowSize |
| 207 | } |
| 208 | } |
| 209 | |
| 210 | pub fn checked_size(&self) -> WindowSize { |
| 211 | assert!(self.0 >= 0, "negative Window" ); |
| 212 | self.0 as WindowSize |
| 213 | } |
| 214 | |
| 215 | pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> { |
| 216 | if let Some(v) = self.0.checked_sub(other as i32) { |
| 217 | self.0 = v; |
| 218 | Ok(()) |
| 219 | } else { |
| 220 | Err(Reason::FLOW_CONTROL_ERROR) |
| 221 | } |
| 222 | } |
| 223 | |
| 224 | pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> { |
| 225 | let other = self.add(other)?; |
| 226 | self.0 = other.0; |
| 227 | Ok(()) |
| 228 | } |
| 229 | |
| 230 | pub fn add(&self, other: WindowSize) -> Result<Self, Reason> { |
| 231 | if let Some(v) = self.0.checked_add(other as i32) { |
| 232 | Ok(Self(v)) |
| 233 | } else { |
| 234 | Err(Reason::FLOW_CONTROL_ERROR) |
| 235 | } |
| 236 | } |
| 237 | } |
| 238 | |
| 239 | impl PartialEq<usize> for Window { |
| 240 | fn eq(&self, other: &usize) -> bool { |
| 241 | if self.0 < 0 { |
| 242 | false |
| 243 | } else { |
| 244 | (self.0 as usize).eq(other) |
| 245 | } |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | impl PartialOrd<usize> for Window { |
| 250 | fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> { |
| 251 | if self.0 < 0 { |
| 252 | Some(::std::cmp::Ordering::Less) |
| 253 | } else { |
| 254 | (self.0 as usize).partial_cmp(other) |
| 255 | } |
| 256 | } |
| 257 | } |
| 258 | |
| 259 | impl fmt::Display for Window { |
| 260 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| 261 | fmt::Display::fmt(&self.0, f) |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | impl From<Window> for isize { |
| 266 | fn from(w: Window) -> isize { |
| 267 | w.0 as isize |
| 268 | } |
| 269 | } |
| 270 | |