1 | /// HTTP2 Ping usage |
2 | /// |
3 | /// hyper uses HTTP2 pings for two purposes: |
4 | /// |
5 | /// 1. Adaptive flow control using BDP |
6 | /// 2. Connection keep-alive |
7 | /// |
8 | /// Both cases are optional. |
9 | /// |
10 | /// # BDP Algorithm |
11 | /// |
12 | /// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: |
13 | /// 1a. Record current time. |
14 | /// 1b. Send a BDP ping. |
15 | /// 2. Increment the number of received bytes. |
16 | /// 3. When the BDP ping ack is received: |
17 | /// 3a. Record duration from sent time. |
18 | /// 3b. Merge RTT with a running average. |
19 | /// 3c. Calculate bdp as bytes/rtt. |
20 | /// 3d. If bdp is over 2/3 max, set new max to bdp and update windows. |
21 | use std::fmt; |
22 | use std::future::Future; |
23 | use std::pin::Pin; |
24 | use std::sync::{Arc, Mutex}; |
25 | use std::task::{self, Poll}; |
26 | use std::time::{Duration, Instant}; |
27 | |
28 | use h2::{Ping, PingPong}; |
29 | |
30 | use crate::common::time::Time; |
31 | use crate::rt::Sleep; |
32 | |
33 | type WindowSize = u32; |
34 | |
35 | pub(super) fn disabled() -> Recorder { |
36 | Recorder { shared: None } |
37 | } |
38 | |
39 | pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) { |
40 | debug_assert!( |
41 | config.is_enabled(), |
42 | "ping channel requires bdp or keep-alive config" , |
43 | ); |
44 | |
45 | let bdp = config.bdp_initial_window.map(|wnd| Bdp { |
46 | bdp: wnd, |
47 | max_bandwidth: 0.0, |
48 | rtt: 0.0, |
49 | ping_delay: Duration::from_millis(100), |
50 | stable_count: 0, |
51 | }); |
52 | |
53 | let (bytes, next_bdp_at) = if bdp.is_some() { |
54 | (Some(0), Some(Instant::now())) |
55 | } else { |
56 | (None, None) |
57 | }; |
58 | |
59 | let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive { |
60 | interval, |
61 | timeout: config.keep_alive_timeout, |
62 | while_idle: config.keep_alive_while_idle, |
63 | sleep: __timer.sleep(interval), |
64 | state: KeepAliveState::Init, |
65 | timer: __timer, |
66 | }); |
67 | |
68 | let last_read_at = keep_alive.as_ref().map(|_| Instant::now()); |
69 | |
70 | let shared = Arc::new(Mutex::new(Shared { |
71 | bytes, |
72 | last_read_at, |
73 | is_keep_alive_timed_out: false, |
74 | ping_pong, |
75 | ping_sent_at: None, |
76 | next_bdp_at, |
77 | })); |
78 | |
79 | ( |
80 | Recorder { |
81 | shared: Some(shared.clone()), |
82 | }, |
83 | Ponger { |
84 | bdp, |
85 | keep_alive, |
86 | shared, |
87 | }, |
88 | ) |
89 | } |
90 | |
91 | #[derive (Clone)] |
92 | pub(super) struct Config { |
93 | pub(super) bdp_initial_window: Option<WindowSize>, |
94 | /// If no frames are received in this amount of time, a PING frame is sent. |
95 | pub(super) keep_alive_interval: Option<Duration>, |
96 | /// After sending a keepalive PING, the connection will be closed if |
97 | /// a pong is not received in this amount of time. |
98 | pub(super) keep_alive_timeout: Duration, |
99 | /// If true, sends pings even when there are no active streams. |
100 | pub(super) keep_alive_while_idle: bool, |
101 | } |
102 | |
103 | #[derive (Clone)] |
104 | pub(crate) struct Recorder { |
105 | shared: Option<Arc<Mutex<Shared>>>, |
106 | } |
107 | |
108 | pub(super) struct Ponger { |
109 | bdp: Option<Bdp>, |
110 | keep_alive: Option<KeepAlive>, |
111 | shared: Arc<Mutex<Shared>>, |
112 | } |
113 | |
114 | struct Shared { |
115 | ping_pong: PingPong, |
116 | ping_sent_at: Option<Instant>, |
117 | |
118 | // bdp |
119 | /// If `Some`, bdp is enabled, and this tracks how many bytes have been |
120 | /// read during the current sample. |
121 | bytes: Option<usize>, |
122 | /// We delay a variable amount of time between BDP pings. This allows us |
123 | /// to send less pings as the bandwidth stabilizes. |
124 | next_bdp_at: Option<Instant>, |
125 | |
126 | // keep-alive |
127 | /// If `Some`, keep-alive is enabled, and the Instant is how long ago |
128 | /// the connection read the last frame. |
129 | last_read_at: Option<Instant>, |
130 | |
131 | is_keep_alive_timed_out: bool, |
132 | } |
133 | |
134 | struct Bdp { |
135 | /// Current BDP in bytes |
136 | bdp: u32, |
137 | /// Largest bandwidth we've seen so far. |
138 | max_bandwidth: f64, |
139 | /// Round trip time in seconds |
140 | rtt: f64, |
141 | /// Delay the next ping by this amount. |
142 | /// |
143 | /// This will change depending on how stable the current bandwidth is. |
144 | ping_delay: Duration, |
145 | /// The count of ping round trips where BDP has stayed the same. |
146 | stable_count: u32, |
147 | } |
148 | |
149 | struct KeepAlive { |
150 | /// If no frames are received in this amount of time, a PING frame is sent. |
151 | interval: Duration, |
152 | /// After sending a keepalive PING, the connection will be closed if |
153 | /// a pong is not received in this amount of time. |
154 | timeout: Duration, |
155 | /// If true, sends pings even when there are no active streams. |
156 | while_idle: bool, |
157 | state: KeepAliveState, |
158 | sleep: Pin<Box<dyn Sleep>>, |
159 | timer: Time, |
160 | } |
161 | |
162 | enum KeepAliveState { |
163 | Init, |
164 | Scheduled(Instant), |
165 | PingSent, |
166 | } |
167 | |
168 | pub(super) enum Ponged { |
169 | SizeUpdate(WindowSize), |
170 | KeepAliveTimedOut, |
171 | } |
172 | |
173 | #[derive (Debug)] |
174 | pub(super) struct KeepAliveTimedOut; |
175 | |
176 | // ===== impl Config ===== |
177 | |
178 | impl Config { |
179 | pub(super) fn is_enabled(&self) -> bool { |
180 | self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some() |
181 | } |
182 | } |
183 | |
184 | // ===== impl Recorder ===== |
185 | |
186 | impl Recorder { |
187 | pub(crate) fn record_data(&self, len: usize) { |
188 | let shared = if let Some(ref shared) = self.shared { |
189 | shared |
190 | } else { |
191 | return; |
192 | }; |
193 | |
194 | let mut locked = shared.lock().unwrap(); |
195 | |
196 | locked.update_last_read_at(); |
197 | |
198 | // are we ready to send another bdp ping? |
199 | // if not, we don't need to record bytes either |
200 | |
201 | if let Some(ref next_bdp_at) = locked.next_bdp_at { |
202 | if Instant::now() < *next_bdp_at { |
203 | return; |
204 | } else { |
205 | locked.next_bdp_at = None; |
206 | } |
207 | } |
208 | |
209 | if let Some(ref mut bytes) = locked.bytes { |
210 | *bytes += len; |
211 | } else { |
212 | // no need to send bdp ping if bdp is disabled |
213 | return; |
214 | } |
215 | |
216 | if !locked.is_ping_sent() { |
217 | locked.send_ping(); |
218 | } |
219 | } |
220 | |
221 | pub(crate) fn record_non_data(&self) { |
222 | let shared = if let Some(ref shared) = self.shared { |
223 | shared |
224 | } else { |
225 | return; |
226 | }; |
227 | |
228 | let mut locked = shared.lock().unwrap(); |
229 | |
230 | locked.update_last_read_at(); |
231 | } |
232 | |
233 | /// If the incoming stream is already closed, convert self into |
234 | /// a disabled reporter. |
235 | #[cfg (feature = "client" )] |
236 | pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self { |
237 | if stream.is_end_stream() { |
238 | disabled() |
239 | } else { |
240 | self |
241 | } |
242 | } |
243 | |
244 | pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> { |
245 | if let Some(ref shared) = self.shared { |
246 | let locked = shared.lock().unwrap(); |
247 | if locked.is_keep_alive_timed_out { |
248 | return Err(KeepAliveTimedOut.crate_error()); |
249 | } |
250 | } |
251 | |
252 | // else |
253 | Ok(()) |
254 | } |
255 | } |
256 | |
257 | // ===== impl Ponger ===== |
258 | |
259 | impl Ponger { |
260 | pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> { |
261 | let now = Instant::now(); |
262 | let mut locked = self.shared.lock().unwrap(); |
263 | let is_idle = self.is_idle(); |
264 | |
265 | if let Some(ref mut ka) = self.keep_alive { |
266 | ka.maybe_schedule(is_idle, &locked); |
267 | ka.maybe_ping(cx, is_idle, &mut locked); |
268 | } |
269 | |
270 | if !locked.is_ping_sent() { |
271 | // XXX: this doesn't register a waker...? |
272 | return Poll::Pending; |
273 | } |
274 | |
275 | match locked.ping_pong.poll_pong(cx) { |
276 | Poll::Ready(Ok(_pong)) => { |
277 | let start = locked |
278 | .ping_sent_at |
279 | .expect("pong received implies ping_sent_at" ); |
280 | locked.ping_sent_at = None; |
281 | let rtt = now - start; |
282 | trace!("recv pong" ); |
283 | |
284 | if let Some(ref mut ka) = self.keep_alive { |
285 | locked.update_last_read_at(); |
286 | ka.maybe_schedule(is_idle, &locked); |
287 | ka.maybe_ping(cx, is_idle, &mut locked); |
288 | } |
289 | |
290 | if let Some(ref mut bdp) = self.bdp { |
291 | let bytes = locked.bytes.expect("bdp enabled implies bytes" ); |
292 | locked.bytes = Some(0); // reset |
293 | trace!("received BDP ack; bytes = {}, rtt = {:?}" , bytes, rtt); |
294 | |
295 | let update = bdp.calculate(bytes, rtt); |
296 | locked.next_bdp_at = Some(now + bdp.ping_delay); |
297 | if let Some(update) = update { |
298 | return Poll::Ready(Ponged::SizeUpdate(update)); |
299 | } |
300 | } |
301 | } |
302 | Poll::Ready(Err(_e)) => { |
303 | debug!("pong error: {}" , _e); |
304 | } |
305 | Poll::Pending => { |
306 | if let Some(ref mut ka) = self.keep_alive { |
307 | if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) { |
308 | self.keep_alive = None; |
309 | locked.is_keep_alive_timed_out = true; |
310 | return Poll::Ready(Ponged::KeepAliveTimedOut); |
311 | } |
312 | } |
313 | } |
314 | } |
315 | |
316 | // XXX: this doesn't register a waker...? |
317 | Poll::Pending |
318 | } |
319 | |
320 | fn is_idle(&self) -> bool { |
321 | Arc::strong_count(&self.shared) <= 2 |
322 | } |
323 | } |
324 | |
325 | // ===== impl Shared ===== |
326 | |
327 | impl Shared { |
328 | fn send_ping(&mut self) { |
329 | match self.ping_pong.send_ping(Ping::opaque()) { |
330 | Ok(()) => { |
331 | self.ping_sent_at = Some(Instant::now()); |
332 | trace!("sent ping" ); |
333 | } |
334 | Err(_err) => { |
335 | debug!("error sending ping: {}" , _err); |
336 | } |
337 | } |
338 | } |
339 | |
340 | fn is_ping_sent(&self) -> bool { |
341 | self.ping_sent_at.is_some() |
342 | } |
343 | |
344 | fn update_last_read_at(&mut self) { |
345 | if self.last_read_at.is_some() { |
346 | self.last_read_at = Some(Instant::now()); |
347 | } |
348 | } |
349 | |
350 | fn last_read_at(&self) -> Instant { |
351 | self.last_read_at.expect("keep_alive expects last_read_at" ) |
352 | } |
353 | } |
354 | |
355 | // ===== impl Bdp ===== |
356 | |
357 | /// Any higher than this likely will be hitting the TCP flow control. |
358 | const BDP_LIMIT: usize = 1024 * 1024 * 16; |
359 | |
360 | impl Bdp { |
361 | fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> { |
362 | // No need to do any math if we're at the limit. |
363 | if self.bdp as usize == BDP_LIMIT { |
364 | self.stabilize_delay(); |
365 | return None; |
366 | } |
367 | |
368 | // average the rtt |
369 | let rtt = seconds(rtt); |
370 | if self.rtt == 0.0 { |
371 | // First sample means rtt is first rtt. |
372 | self.rtt = rtt; |
373 | } else { |
374 | // Weigh this rtt as 1/8 for a moving average. |
375 | self.rtt += (rtt - self.rtt) * 0.125; |
376 | } |
377 | |
378 | // calculate the current bandwidth |
379 | let bw = (bytes as f64) / (self.rtt * 1.5); |
380 | trace!("current bandwidth = {:.1}B/s" , bw); |
381 | |
382 | if bw < self.max_bandwidth { |
383 | // not a faster bandwidth, so don't update |
384 | self.stabilize_delay(); |
385 | return None; |
386 | } else { |
387 | self.max_bandwidth = bw; |
388 | } |
389 | |
390 | // if the current `bytes` sample is at least 2/3 the previous |
391 | // bdp, increase to double the current sample. |
392 | if bytes >= self.bdp as usize * 2 / 3 { |
393 | self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; |
394 | trace!("BDP increased to {}" , self.bdp); |
395 | |
396 | self.stable_count = 0; |
397 | self.ping_delay /= 2; |
398 | Some(self.bdp) |
399 | } else { |
400 | self.stabilize_delay(); |
401 | None |
402 | } |
403 | } |
404 | |
405 | fn stabilize_delay(&mut self) { |
406 | if self.ping_delay < Duration::from_secs(10) { |
407 | self.stable_count += 1; |
408 | |
409 | if self.stable_count >= 2 { |
410 | self.ping_delay *= 4; |
411 | self.stable_count = 0; |
412 | } |
413 | } |
414 | } |
415 | } |
416 | |
417 | fn seconds(dur: Duration) -> f64 { |
418 | const NANOS_PER_SEC: f64 = 1_000_000_000.0; |
419 | let secs: f64 = dur.as_secs() as f64; |
420 | secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC |
421 | } |
422 | |
423 | // ===== impl KeepAlive ===== |
424 | |
425 | impl KeepAlive { |
426 | fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) { |
427 | match self.state { |
428 | KeepAliveState::Init => { |
429 | if !self.while_idle && is_idle { |
430 | return; |
431 | } |
432 | |
433 | self.schedule(shared); |
434 | } |
435 | KeepAliveState::PingSent => { |
436 | if shared.is_ping_sent() { |
437 | return; |
438 | } |
439 | self.schedule(shared); |
440 | } |
441 | KeepAliveState::Scheduled(..) => (), |
442 | } |
443 | } |
444 | |
445 | fn schedule(&mut self, shared: &Shared) { |
446 | let interval = shared.last_read_at() + self.interval; |
447 | self.state = KeepAliveState::Scheduled(interval); |
448 | self.timer.reset(&mut self.sleep, interval); |
449 | } |
450 | |
451 | fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) { |
452 | match self.state { |
453 | KeepAliveState::Scheduled(at) => { |
454 | if Pin::new(&mut self.sleep).poll(cx).is_pending() { |
455 | return; |
456 | } |
457 | // check if we've received a frame while we were scheduled |
458 | if shared.last_read_at() + self.interval > at { |
459 | self.state = KeepAliveState::Init; |
460 | cx.waker().wake_by_ref(); // schedule us again |
461 | return; |
462 | } |
463 | if !self.while_idle && is_idle { |
464 | trace!("keep-alive no need to ping when idle and while_idle=false" ); |
465 | return; |
466 | } |
467 | trace!("keep-alive interval ({:?}) reached" , self.interval); |
468 | shared.send_ping(); |
469 | self.state = KeepAliveState::PingSent; |
470 | let timeout = Instant::now() + self.timeout; |
471 | self.timer.reset(&mut self.sleep, timeout); |
472 | } |
473 | KeepAliveState::Init | KeepAliveState::PingSent => (), |
474 | } |
475 | } |
476 | |
477 | fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> { |
478 | match self.state { |
479 | KeepAliveState::PingSent => { |
480 | if Pin::new(&mut self.sleep).poll(cx).is_pending() { |
481 | return Ok(()); |
482 | } |
483 | trace!("keep-alive timeout ({:?}) reached" , self.timeout); |
484 | Err(KeepAliveTimedOut) |
485 | } |
486 | KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()), |
487 | } |
488 | } |
489 | } |
490 | |
491 | // ===== impl KeepAliveTimedOut ===== |
492 | |
493 | impl KeepAliveTimedOut { |
494 | pub(super) fn crate_error(self) -> crate::Error { |
495 | crate::Error::new(crate::error::Kind::Http2).with(self) |
496 | } |
497 | } |
498 | |
499 | impl fmt::Display for KeepAliveTimedOut { |
500 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
501 | f.write_str(data:"keep-alive timed out" ) |
502 | } |
503 | } |
504 | |
505 | impl std::error::Error for KeepAliveTimedOut { |
506 | fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { |
507 | Some(&crate::error::TimedOut) |
508 | } |
509 | } |
510 | |