1 | /// HTTP2 Ping usage |
2 | /// |
3 | /// hyper uses HTTP2 pings for two purposes: |
4 | /// |
5 | /// 1. Adaptive flow control using BDP |
6 | /// 2. Connection keep-alive |
7 | /// |
8 | /// Both cases are optional. |
9 | /// |
10 | /// # BDP Algorithm |
11 | /// |
12 | /// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: |
13 | /// 1a. Record current time. |
14 | /// 1b. Send a BDP ping. |
15 | /// 2. Increment the number of received bytes. |
16 | /// 3. When the BDP ping ack is received: |
17 | /// 3a. Record duration from sent time. |
18 | /// 3b. Merge RTT with a running average. |
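/// 3c. Calculate the bandwidth as bytes/(rtt * 1.5).
/// 3d. If the bandwidth is at least the largest seen so far and bytes is
///     at least 2/3 of the current bdp, set bdp to twice the bytes
///     (capped at 16 MiB) and update windows.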
21 | |
22 | #[cfg (feature = "runtime" )] |
23 | use std::fmt; |
24 | #[cfg (feature = "runtime" )] |
25 | use std::future::Future; |
26 | #[cfg (feature = "runtime" )] |
27 | use std::pin::Pin; |
28 | use std::sync::{Arc, Mutex}; |
29 | use std::task::{self, Poll}; |
30 | use std::time::Duration; |
31 | #[cfg (not(feature = "runtime" ))] |
32 | use std::time::Instant; |
33 | |
34 | use h2::{Ping, PingPong}; |
35 | #[cfg (feature = "runtime" )] |
36 | use tokio::time::{Instant, Sleep}; |
37 | use tracing::{debug, trace}; |
38 | |
/// Integer type used for BDP / window-size values in this module.
type WindowSize = u32;
40 | |
/// Creates a `Recorder` that carries no shared ping state.
///
/// With `shared` set to `None`, the `record_*` methods return without doing
/// anything and `ensure_not_timed_out` always returns `Ok(())`.
pub(super) fn disabled() -> Recorder {
    Recorder { shared: None }
}
44 | |
45 | pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) { |
46 | debug_assert!( |
47 | config.is_enabled(), |
48 | "ping channel requires bdp or keep-alive config" , |
49 | ); |
50 | |
51 | let bdp = config.bdp_initial_window.map(|wnd| Bdp { |
52 | bdp: wnd, |
53 | max_bandwidth: 0.0, |
54 | rtt: 0.0, |
55 | ping_delay: Duration::from_millis(100), |
56 | stable_count: 0, |
57 | }); |
58 | |
59 | let (bytes, next_bdp_at) = if bdp.is_some() { |
60 | (Some(0), Some(Instant::now())) |
61 | } else { |
62 | (None, None) |
63 | }; |
64 | |
65 | #[cfg (feature = "runtime" )] |
66 | let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive { |
67 | interval, |
68 | timeout: config.keep_alive_timeout, |
69 | while_idle: config.keep_alive_while_idle, |
70 | timer: Box::pin(tokio::time::sleep(interval)), |
71 | state: KeepAliveState::Init, |
72 | }); |
73 | |
74 | #[cfg (feature = "runtime" )] |
75 | let last_read_at = keep_alive.as_ref().map(|_| Instant::now()); |
76 | |
77 | let shared = Arc::new(Mutex::new(Shared { |
78 | bytes, |
79 | #[cfg (feature = "runtime" )] |
80 | last_read_at, |
81 | #[cfg (feature = "runtime" )] |
82 | is_keep_alive_timed_out: false, |
83 | ping_pong, |
84 | ping_sent_at: None, |
85 | next_bdp_at, |
86 | })); |
87 | |
88 | ( |
89 | Recorder { |
90 | shared: Some(shared.clone()), |
91 | }, |
92 | Ponger { |
93 | bdp, |
94 | #[cfg (feature = "runtime" )] |
95 | keep_alive, |
96 | shared, |
97 | }, |
98 | ) |
99 | } |
100 | |
/// Settings that decide which ping features are active.
#[derive(Clone)]
pub(super) struct Config {
    /// If `Some`, BDP pings are enabled and the value is used as the
    /// starting BDP estimate.
    pub(super) bdp_initial_window: Option<WindowSize>,
    /// If no frames are received in this amount of time, a PING frame is sent.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_interval: Option<Duration>,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_while_idle: bool,
}
115 | |
/// Cloneable handle used to record incoming frames into the shared ping
/// state.
#[derive(Clone)]
pub(crate) struct Recorder {
    /// `None` when this recorder is disabled (see `disabled()`).
    shared: Option<Arc<Mutex<Shared>>>,
}
120 | |
/// The half of the ping channel that is polled to process ping acks and
/// drive the keep-alive timer.
pub(super) struct Ponger {
    /// BDP estimator state; `None` when BDP is disabled.
    bdp: Option<Bdp>,
    /// Keep-alive state; `None` when keep-alive is disabled, and also
    /// cleared by `poll` once the keep-alive has timed out.
    #[cfg(feature = "runtime")]
    keep_alive: Option<KeepAlive>,
    /// State shared with every `Recorder` created alongside this `Ponger`.
    shared: Arc<Mutex<Shared>>,
}
127 | |
/// Ping state shared between the `Recorder` clones and the `Ponger`,
/// always accessed through a `Mutex`.
struct Shared {
    /// Handle used to send pings and to poll for their acks.
    ping_pong: PingPong,
    /// When the currently outstanding ping was sent; `None` while no ping
    /// is outstanding.
    ping_sent_at: Option<Instant>,

    // bdp
    /// If `Some`, bdp is enabled, and this tracks how many bytes have been
    /// read during the current sample.
    bytes: Option<usize>,
    /// We delay a variable amount of time between BDP pings. This allows us
    /// to send fewer pings as the bandwidth stabilizes.
    next_bdp_at: Option<Instant>,

    // keep-alive
    /// If `Some`, keep-alive is enabled, and the Instant is when the
    /// connection last read a frame.
    #[cfg(feature = "runtime")]
    last_read_at: Option<Instant>,

    /// Set to `true` by `Ponger::poll` once the keep-alive timeout fires;
    /// read by `Recorder::ensure_not_timed_out`.
    #[cfg(feature = "runtime")]
    is_keep_alive_timed_out: bool,
}
149 | |
/// State of the bandwidth-delay-product estimator.
struct Bdp {
    /// Current BDP in bytes
    bdp: u32,
    /// Largest bandwidth we've seen so far.
    max_bandwidth: f64,
    /// Round trip time in seconds
    ///
    /// Kept as a moving average; `0.0` means no sample has been taken yet.
    rtt: f64,
    /// Delay the next ping by this amount.
    ///
    /// This will change depending on how stable the current bandwidth is.
    ping_delay: Duration,
    /// The count of ping round trips where BDP has stayed the same.
    stable_count: u32,
}
164 | |
/// Keep-alive configuration together with its timer and state machine.
#[cfg(feature = "runtime")]
struct KeepAlive {
    /// If no frames are received in this amount of time, a PING frame is sent.
    interval: Duration,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    while_idle: bool,

    /// Where the keep-alive state machine currently is.
    state: KeepAliveState,
    /// Single timer reused for both the ping interval and the pong timeout.
    timer: Pin<Box<Sleep>>,
}
178 | |
/// States of the keep-alive state machine.
#[cfg(feature = "runtime")]
enum KeepAliveState {
    /// The timer has not been armed for the next ping yet.
    Init,
    /// The timer is armed to fire one interval after the last read.
    Scheduled,
    /// A keep-alive ping was sent and the timer is armed with the timeout.
    PingSent,
}
185 | |
/// Events produced by `Ponger::poll`.
pub(super) enum Ponged {
    /// The BDP estimate increased; carries the new BDP value.
    SizeUpdate(WindowSize),
    /// No pong arrived within the keep-alive timeout.
    #[cfg(feature = "runtime")]
    KeepAliveTimedOut,
}
191 | |
/// Error value signalling that a keep-alive ping was not acked within the
/// configured timeout.
#[cfg(feature = "runtime")]
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;
195 | |
196 | // ===== impl Config ===== |
197 | |
198 | impl Config { |
199 | pub(super) fn is_enabled(&self) -> bool { |
200 | #[cfg (feature = "runtime" )] |
201 | { |
202 | self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some() |
203 | } |
204 | |
205 | #[cfg (not(feature = "runtime" ))] |
206 | { |
207 | self.bdp_initial_window.is_some() |
208 | } |
209 | } |
210 | } |
211 | |
212 | // ===== impl Recorder ===== |
213 | |
214 | impl Recorder { |
215 | pub(crate) fn record_data(&self, len: usize) { |
216 | let shared = if let Some(ref shared) = self.shared { |
217 | shared |
218 | } else { |
219 | return; |
220 | }; |
221 | |
222 | let mut locked = shared.lock().unwrap(); |
223 | |
224 | #[cfg (feature = "runtime" )] |
225 | locked.update_last_read_at(); |
226 | |
227 | // are we ready to send another bdp ping? |
228 | // if not, we don't need to record bytes either |
229 | |
230 | if let Some(ref next_bdp_at) = locked.next_bdp_at { |
231 | if Instant::now() < *next_bdp_at { |
232 | return; |
233 | } else { |
234 | locked.next_bdp_at = None; |
235 | } |
236 | } |
237 | |
238 | if let Some(ref mut bytes) = locked.bytes { |
239 | *bytes += len; |
240 | } else { |
241 | // no need to send bdp ping if bdp is disabled |
242 | return; |
243 | } |
244 | |
245 | if !locked.is_ping_sent() { |
246 | locked.send_ping(); |
247 | } |
248 | } |
249 | |
250 | pub(crate) fn record_non_data(&self) { |
251 | #[cfg (feature = "runtime" )] |
252 | { |
253 | let shared = if let Some(ref shared) = self.shared { |
254 | shared |
255 | } else { |
256 | return; |
257 | }; |
258 | |
259 | let mut locked = shared.lock().unwrap(); |
260 | |
261 | locked.update_last_read_at(); |
262 | } |
263 | } |
264 | |
265 | /// If the incoming stream is already closed, convert self into |
266 | /// a disabled reporter. |
267 | #[cfg (feature = "client" )] |
268 | pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self { |
269 | if stream.is_end_stream() { |
270 | disabled() |
271 | } else { |
272 | self |
273 | } |
274 | } |
275 | |
276 | pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> { |
277 | #[cfg (feature = "runtime" )] |
278 | { |
279 | if let Some(ref shared) = self.shared { |
280 | let locked = shared.lock().unwrap(); |
281 | if locked.is_keep_alive_timed_out { |
282 | return Err(KeepAliveTimedOut.crate_error()); |
283 | } |
284 | } |
285 | } |
286 | |
287 | // else |
288 | Ok(()) |
289 | } |
290 | } |
291 | |
292 | // ===== impl Ponger ===== |
293 | |
294 | impl Ponger { |
295 | pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> { |
296 | let now = Instant::now(); |
297 | let mut locked = self.shared.lock().unwrap(); |
298 | #[cfg (feature = "runtime" )] |
299 | let is_idle = self.is_idle(); |
300 | |
301 | #[cfg (feature = "runtime" )] |
302 | { |
303 | if let Some(ref mut ka) = self.keep_alive { |
304 | ka.schedule(is_idle, &locked); |
305 | ka.maybe_ping(cx, &mut locked); |
306 | } |
307 | } |
308 | |
309 | if !locked.is_ping_sent() { |
310 | // XXX: this doesn't register a waker...? |
311 | return Poll::Pending; |
312 | } |
313 | |
314 | match locked.ping_pong.poll_pong(cx) { |
315 | Poll::Ready(Ok(_pong)) => { |
316 | let start = locked |
317 | .ping_sent_at |
318 | .expect("pong received implies ping_sent_at" ); |
319 | locked.ping_sent_at = None; |
320 | let rtt = now - start; |
321 | trace!("recv pong" ); |
322 | |
323 | #[cfg (feature = "runtime" )] |
324 | { |
325 | if let Some(ref mut ka) = self.keep_alive { |
326 | locked.update_last_read_at(); |
327 | ka.schedule(is_idle, &locked); |
328 | } |
329 | } |
330 | |
331 | if let Some(ref mut bdp) = self.bdp { |
332 | let bytes = locked.bytes.expect("bdp enabled implies bytes" ); |
333 | locked.bytes = Some(0); // reset |
334 | trace!("received BDP ack; bytes = {}, rtt = {:?}" , bytes, rtt); |
335 | |
336 | let update = bdp.calculate(bytes, rtt); |
337 | locked.next_bdp_at = Some(now + bdp.ping_delay); |
338 | if let Some(update) = update { |
339 | return Poll::Ready(Ponged::SizeUpdate(update)); |
340 | } |
341 | } |
342 | } |
343 | Poll::Ready(Err(e)) => { |
344 | debug!("pong error: {}" , e); |
345 | } |
346 | Poll::Pending => { |
347 | #[cfg (feature = "runtime" )] |
348 | { |
349 | if let Some(ref mut ka) = self.keep_alive { |
350 | if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) { |
351 | self.keep_alive = None; |
352 | locked.is_keep_alive_timed_out = true; |
353 | return Poll::Ready(Ponged::KeepAliveTimedOut); |
354 | } |
355 | } |
356 | } |
357 | } |
358 | } |
359 | |
360 | // XXX: this doesn't register a waker...? |
361 | Poll::Pending |
362 | } |
363 | |
364 | #[cfg (feature = "runtime" )] |
365 | fn is_idle(&self) -> bool { |
366 | Arc::strong_count(&self.shared) <= 2 |
367 | } |
368 | } |
369 | |
370 | // ===== impl Shared ===== |
371 | |
372 | impl Shared { |
373 | fn send_ping(&mut self) { |
374 | match self.ping_pong.send_ping(Ping::opaque()) { |
375 | Ok(()) => { |
376 | self.ping_sent_at = Some(Instant::now()); |
377 | trace!("sent ping" ); |
378 | } |
379 | Err(err) => { |
380 | debug!("error sending ping: {}" , err); |
381 | } |
382 | } |
383 | } |
384 | |
385 | fn is_ping_sent(&self) -> bool { |
386 | self.ping_sent_at.is_some() |
387 | } |
388 | |
389 | #[cfg (feature = "runtime" )] |
390 | fn update_last_read_at(&mut self) { |
391 | if self.last_read_at.is_some() { |
392 | self.last_read_at = Some(Instant::now()); |
393 | } |
394 | } |
395 | |
396 | #[cfg (feature = "runtime" )] |
397 | fn last_read_at(&self) -> Instant { |
398 | self.last_read_at.expect("keep_alive expects last_read_at" ) |
399 | } |
400 | } |
401 | |
402 | // ===== impl Bdp ===== |
403 | |
/// Upper bound for the BDP estimate (16 MiB).
///
/// Any higher than this likely will be hitting the TCP flow control.
const BDP_LIMIT: usize = 1024 * 1024 * 16;
406 | |
407 | impl Bdp { |
408 | fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> { |
409 | // No need to do any math if we're at the limit. |
410 | if self.bdp as usize == BDP_LIMIT { |
411 | self.stabilize_delay(); |
412 | return None; |
413 | } |
414 | |
415 | // average the rtt |
416 | let rtt = seconds(rtt); |
417 | if self.rtt == 0.0 { |
418 | // First sample means rtt is first rtt. |
419 | self.rtt = rtt; |
420 | } else { |
421 | // Weigh this rtt as 1/8 for a moving average. |
422 | self.rtt += (rtt - self.rtt) * 0.125; |
423 | } |
424 | |
425 | // calculate the current bandwidth |
426 | let bw = (bytes as f64) / (self.rtt * 1.5); |
427 | trace!("current bandwidth = {:.1}B/s" , bw); |
428 | |
429 | if bw < self.max_bandwidth { |
430 | // not a faster bandwidth, so don't update |
431 | self.stabilize_delay(); |
432 | return None; |
433 | } else { |
434 | self.max_bandwidth = bw; |
435 | } |
436 | |
437 | // if the current `bytes` sample is at least 2/3 the previous |
438 | // bdp, increase to double the current sample. |
439 | if bytes >= self.bdp as usize * 2 / 3 { |
440 | self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; |
441 | trace!("BDP increased to {}" , self.bdp); |
442 | |
443 | self.stable_count = 0; |
444 | self.ping_delay /= 2; |
445 | Some(self.bdp) |
446 | } else { |
447 | self.stabilize_delay(); |
448 | None |
449 | } |
450 | } |
451 | |
452 | fn stabilize_delay(&mut self) { |
453 | if self.ping_delay < Duration::from_secs(10) { |
454 | self.stable_count += 1; |
455 | |
456 | if self.stable_count >= 2 { |
457 | self.ping_delay *= 4; |
458 | self.stable_count = 0; |
459 | } |
460 | } |
461 | } |
462 | } |
463 | |
/// Converts a `Duration` into a fractional number of seconds.
fn seconds(dur: Duration) -> f64 {
    dur.as_secs_f64()
}
469 | |
470 | // ===== impl KeepAlive ===== |
471 | |
#[cfg(feature = "runtime")]
impl KeepAlive {
    /// Arms the timer for the next keep-alive ping when the current state
    /// allows it, moving to `Scheduled`.
    ///
    /// From `Init` this happens unless the connection is idle and
    /// `while_idle` is off. From `PingSent` it happens once no ping is
    /// outstanding any more. In `Scheduled` nothing changes.
    ///
    /// # Panics
    ///
    /// Panics if `shared` has no last-read timestamp.
    fn schedule(&mut self, is_idle: bool, shared: &Shared) {
        let should_arm = match self.state {
            // Idle connections are only pinged when `while_idle` is set.
            KeepAliveState::Init => self.while_idle || !is_idle,
            // Wait until the outstanding ping is no longer pending.
            KeepAliveState::PingSent => !shared.is_ping_sent(),
            KeepAliveState::Scheduled => false,
        };
        if !should_arm {
            return;
        }

        self.state = KeepAliveState::Scheduled;
        let deadline = shared.last_read_at() + self.interval;
        self.timer.as_mut().reset(deadline);
    }

    /// Sends a keep-alive ping if the scheduled interval has elapsed
    /// without a frame being read in the meantime.
    ///
    /// If a frame was read while the timer was running, the state goes
    /// back to `Init` and the task is woken so it gets rescheduled.
    /// Otherwise a ping is sent, the state becomes `PingSent`, and the
    /// timer is re-armed with the pong timeout.
    fn maybe_ping(&mut self, cx: &mut task::Context<'_>, shared: &mut Shared) {
        match self.state {
            KeepAliveState::Scheduled => {}
            KeepAliveState::Init | KeepAliveState::PingSent => return,
        }

        if self.timer.as_mut().poll(cx).is_pending() {
            return;
        }

        // A read while we were scheduled pushes the real deadline past the
        // one the timer was armed with.
        let refreshed_deadline = shared.last_read_at() + self.interval;
        if refreshed_deadline > self.timer.deadline() {
            self.state = KeepAliveState::Init;
            cx.waker().wake_by_ref(); // schedule us again
            return;
        }

        trace!("keep-alive interval ( {:?}) reached", self.interval);
        shared.send_ping();
        self.state = KeepAliveState::PingSent;
        let timeout = Instant::now() + self.timeout;
        self.timer.as_mut().reset(timeout);
    }

    /// Returns `Err(KeepAliveTimedOut)` when a keep-alive ping is
    /// outstanding and the timeout timer has fired; `Ok(())` otherwise.
    fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
        match self.state {
            KeepAliveState::PingSent => {}
            KeepAliveState::Init | KeepAliveState::Scheduled => return Ok(()),
        }

        if self.timer.as_mut().poll(cx).is_pending() {
            return Ok(());
        }

        trace!("keep-alive timeout ( {:?}) reached", self.timeout);
        Err(KeepAliveTimedOut)
    }
}
533 | |
534 | // ===== impl KeepAliveTimedOut ===== |
535 | |
#[cfg(feature = "runtime")]
impl KeepAliveTimedOut {
    /// Converts this timeout into a `crate::Error` of kind `Http2`, with
    /// `self` attached through `with`.
    pub(super) fn crate_error(self) -> crate::Error {
        crate::Error::new(crate::error::Kind::Http2).with(self)
    }
}
542 | |
#[cfg(feature = "runtime")]
impl fmt::Display for KeepAliveTimedOut {
    /// Writes the fixed message `keep-alive timed out`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `write_str` takes its argument positionally; a `data:` label in
        // front of it is not valid Rust call syntax.
        f.write_str("keep-alive timed out")
    }
}
549 | |
#[cfg(feature = "runtime")]
impl std::error::Error for KeepAliveTimedOut {
    /// Reports the crate's `TimedOut` error as the underlying source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&crate::error::TimedOut)
    }
}
556 | |