1use crate::frame::Reason;
2use crate::proto::{WindowSize, MAX_WINDOW_SIZE};
3
4use std::fmt;
5
// We don't want to send WINDOW_UPDATE frames for tiny changes, but instead
// aggregate them until the change is significant. Many implementations do
// this by keeping a "ratio" of the pending update versus the allowed window
// size.
//
// While some may wish to represent this ratio as a percentage, using an f32,
// we skip having to deal with float math and stick to integers. To do so,
// the "ratio" is represented by two i32s, split into the numerator and
// denominator. For example, a 50% ratio is simply represented as 1/2.
//
// An example applying this ratio: If a stream has an allowed window size of
// 100 bytes, WINDOW_UPDATE frames are scheduled once the unclaimed change
// reaches 1/2 of it, or 50 bytes.
/// Numerator of the fraction of the window that must be unclaimed before a
/// WINDOW_UPDATE is worth scheduling.
const UNCLAIMED_NUMERATOR: i32 = 1;
/// Denominator of the same fraction; must be strictly positive.
const UNCLAIMED_DENOMINATOR: i32 = 2;

/// Guards against someone editing the ratio into something that is not a
/// proper, non-negative fraction (which would break the threshold math).
#[test]
#[allow(clippy::assertions_on_constants)]
fn sanity_unclaimed_ratio() {
    // Dividing by the denominator must be well defined and sign-preserving.
    assert!(UNCLAIMED_DENOMINATOR > 0);
    // The fraction may be zero but never negative...
    assert!(UNCLAIMED_NUMERATOR >= 0);
    // ...and must stay strictly below one.
    assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR);
}
28
29#[derive(Copy, Clone, Debug)]
30pub struct FlowControl {
31 /// Window the peer knows about.
32 ///
33 /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received.
34 ///
35 /// For example, say the peer sends a request and uses 32kb of the window.
36 /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust
37 /// its understanding of the capacity of the window, and that would be:
38 ///
39 /// ```notrust
40 /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb
41 /// ```
42 window_size: Window,
43
44 /// Window that we know about.
45 ///
46 /// This can go negative if a user declares a smaller target window than
47 /// the peer knows about.
48 available: Window,
49}
50
51impl FlowControl {
52 pub fn new() -> FlowControl {
53 FlowControl {
54 window_size: Window(0),
55 available: Window(0),
56 }
57 }
58
59 /// Returns the window size as known by the peer
60 pub fn window_size(&self) -> WindowSize {
61 self.window_size.as_size()
62 }
63
64 /// Returns the window size available to the consumer
65 pub fn available(&self) -> Window {
66 self.available
67 }
68
69 /// Returns true if there is unavailable window capacity
70 pub fn has_unavailable(&self) -> bool {
71 if self.window_size < 0 {
72 return false;
73 }
74
75 self.window_size > self.available
76 }
77
78 pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
79 self.available.decrease_by(capacity)
80 }
81
82 pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
83 self.available.increase_by(capacity)
84 }
85
86 /// If a WINDOW_UPDATE frame should be sent, returns a positive number
87 /// representing the increment to be used.
88 ///
89 /// If there is no available bytes to be reclaimed, or the number of
90 /// available bytes does not reach the threshold, this returns `None`.
91 ///
92 /// This represents pending outbound WINDOW_UPDATE frames.
93 pub fn unclaimed_capacity(&self) -> Option<WindowSize> {
94 let available = self.available;
95
96 if self.window_size >= available {
97 return None;
98 }
99
100 let unclaimed = available.0 - self.window_size.0;
101 let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR;
102
103 if unclaimed < threshold {
104 None
105 } else {
106 Some(unclaimed as WindowSize)
107 }
108 }
109
110 /// Increase the window size.
111 ///
112 /// This is called after receiving a WINDOW_UPDATE frame
113 pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
114 let (val, overflow) = self.window_size.0.overflowing_add(sz as i32);
115
116 if overflow {
117 return Err(Reason::FLOW_CONTROL_ERROR);
118 }
119
120 if val > MAX_WINDOW_SIZE as i32 {
121 return Err(Reason::FLOW_CONTROL_ERROR);
122 }
123
124 tracing::trace!(
125 "inc_window; sz={}; old={}; new={}",
126 sz,
127 self.window_size,
128 val
129 );
130
131 self.window_size = Window(val);
132 Ok(())
133 }
134
135 /// Decrement the send-side window size.
136 ///
137 /// This is called after receiving a SETTINGS frame with a lower
138 /// INITIAL_WINDOW_SIZE value.
139 pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
140 tracing::trace!(
141 "dec_window; sz={}; window={}, available={}",
142 sz,
143 self.window_size,
144 self.available
145 );
146 // ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can.
147 self.window_size.decrease_by(sz)?;
148 Ok(())
149 }
150
151 /// Decrement the recv-side window size.
152 ///
153 /// This is called after receiving a SETTINGS ACK frame with a lower
154 /// INITIAL_WINDOW_SIZE value.
155 pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
156 tracing::trace!(
157 "dec_recv_window; sz={}; window={}, available={}",
158 sz,
159 self.window_size,
160 self.available
161 );
162 // This should not be able to overflow `window_size` from the bottom.
163 self.window_size.decrease_by(sz)?;
164 self.available.decrease_by(sz)?;
165 Ok(())
166 }
167
168 /// Decrements the window reflecting data has actually been sent. The caller
169 /// must ensure that the window has capacity.
170 pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
171 tracing::trace!(
172 "send_data; sz={}; window={}; available={}",
173 sz,
174 self.window_size,
175 self.available
176 );
177
178 // If send size is zero it's meaningless to update flow control window
179 if sz > 0 {
180 // Ensure that the argument is correct
181 assert!(self.window_size.0 >= sz as i32);
182
183 // Update values
184 self.window_size.decrease_by(sz)?;
185 self.available.decrease_by(sz)?;
186 }
187 Ok(())
188 }
189}
190
191/// The current capacity of a flow-controlled Window.
192///
193/// This number can go negative when either side has used a certain amount
194/// of capacity when the other side advertises a reduction in size.
195///
196/// This type tries to centralize the knowledge of addition and subtraction
197/// to this capacity, instead of having integer casts throughout the source.
198#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd)]
199pub struct Window(i32);
200
201impl Window {
202 pub fn as_size(&self) -> WindowSize {
203 if self.0 < 0 {
204 0
205 } else {
206 self.0 as WindowSize
207 }
208 }
209
210 pub fn checked_size(&self) -> WindowSize {
211 assert!(self.0 >= 0, "negative Window");
212 self.0 as WindowSize
213 }
214
215 pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
216 if let Some(v) = self.0.checked_sub(other as i32) {
217 self.0 = v;
218 Ok(())
219 } else {
220 Err(Reason::FLOW_CONTROL_ERROR)
221 }
222 }
223
224 pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
225 let other = self.add(other)?;
226 self.0 = other.0;
227 Ok(())
228 }
229
230 pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
231 if let Some(v) = self.0.checked_add(other as i32) {
232 Ok(Self(v))
233 } else {
234 Err(Reason::FLOW_CONTROL_ERROR)
235 }
236 }
237}
238
239impl PartialEq<usize> for Window {
240 fn eq(&self, other: &usize) -> bool {
241 if self.0 < 0 {
242 false
243 } else {
244 (self.0 as usize).eq(other)
245 }
246 }
247}
248
249impl PartialOrd<usize> for Window {
250 fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> {
251 if self.0 < 0 {
252 Some(::std::cmp::Ordering::Less)
253 } else {
254 (self.0 as usize).partial_cmp(other)
255 }
256 }
257}
258
259impl fmt::Display for Window {
260 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
261 fmt::Display::fmt(&self.0, f)
262 }
263}
264
265impl From<Window> for isize {
266 fn from(w: Window) -> isize {
267 w.0 as isize
268 }
269}
270