1/// HTTP2 Ping usage
2///
3/// hyper uses HTTP2 pings for two purposes:
4///
5/// 1. Adaptive flow control using BDP
6/// 2. Connection keep-alive
7///
8/// Both cases are optional.
9///
10/// # BDP Algorithm
11///
12/// 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
13/// 1a. Record current time.
14/// 1b. Send a BDP ping.
15/// 2. Increment the number of received bytes.
16/// 3. When the BDP ping ack is received:
17/// 3a. Record duration from sent time.
18/// 3b. Merge RTT with a running average.
19/// 3c. Calculate bdp as bytes/rtt.
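/// 3d. If this is the fastest bandwidth seen so far and the sampled bytes are
///     at least 2/3 of the current bdp, set the bdp to double the sample
///     (capped at 16 MiB) and update windows.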
21
22#[cfg(feature = "runtime")]
23use std::fmt;
24#[cfg(feature = "runtime")]
25use std::future::Future;
26#[cfg(feature = "runtime")]
27use std::pin::Pin;
28use std::sync::{Arc, Mutex};
29use std::task::{self, Poll};
30use std::time::Duration;
31#[cfg(not(feature = "runtime"))]
32use std::time::Instant;
33
34use h2::{Ping, PingPong};
35#[cfg(feature = "runtime")]
36use tokio::time::{Instant, Sleep};
37use tracing::{debug, trace};
38
/// Window size in bytes, as stored in `Bdp::bdp` and carried by
/// `Ponged::SizeUpdate`.
type WindowSize = u32;
40
41pub(super) fn disabled() -> Recorder {
42 Recorder { shared: None }
43}
44
45pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) {
46 debug_assert!(
47 config.is_enabled(),
48 "ping channel requires bdp or keep-alive config",
49 );
50
51 let bdp = config.bdp_initial_window.map(|wnd| Bdp {
52 bdp: wnd,
53 max_bandwidth: 0.0,
54 rtt: 0.0,
55 ping_delay: Duration::from_millis(100),
56 stable_count: 0,
57 });
58
59 let (bytes, next_bdp_at) = if bdp.is_some() {
60 (Some(0), Some(Instant::now()))
61 } else {
62 (None, None)
63 };
64
65 #[cfg(feature = "runtime")]
66 let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
67 interval,
68 timeout: config.keep_alive_timeout,
69 while_idle: config.keep_alive_while_idle,
70 timer: Box::pin(tokio::time::sleep(interval)),
71 state: KeepAliveState::Init,
72 });
73
74 #[cfg(feature = "runtime")]
75 let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
76
77 let shared = Arc::new(Mutex::new(Shared {
78 bytes,
79 #[cfg(feature = "runtime")]
80 last_read_at,
81 #[cfg(feature = "runtime")]
82 is_keep_alive_timed_out: false,
83 ping_pong,
84 ping_sent_at: None,
85 next_bdp_at,
86 }));
87
88 (
89 Recorder {
90 shared: Some(shared.clone()),
91 },
92 Ponger {
93 bdp,
94 #[cfg(feature = "runtime")]
95 keep_alive,
96 shared,
97 },
98 )
99}
100
/// Settings that decide which ping features `channel` turns on.
#[derive(Clone)]
pub(super) struct Config {
    /// If `Some`, BDP estimation is enabled and this value seeds the
    /// estimator's initial BDP.
    pub(super) bdp_initial_window: Option<WindowSize>,
    /// If no frames are received in this amount of time, a PING frame is sent.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_interval: Option<Duration>,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_while_idle: bool,
}
115
/// Cloneable handle used to report received frames to the shared ping state.
#[derive(Clone)]
pub(crate) struct Recorder {
    /// `None` for a disabled recorder, in which case the recording methods
    /// do nothing.
    shared: Option<Arc<Mutex<Shared>>>,
}
120
/// The polled half of the ping machinery: it holds the BDP estimator and the
/// keep-alive timer, and processes pongs in `poll`.
pub(super) struct Ponger {
    /// BDP estimator state; `None` when BDP is disabled.
    bdp: Option<Bdp>,
    /// Keep-alive state; `None` when keep-alive is disabled or after it has
    /// timed out.
    #[cfg(feature = "runtime")]
    keep_alive: Option<KeepAlive>,
    /// State shared with every `Recorder` clone.
    shared: Arc<Mutex<Shared>>,
}
127
/// Mutex-guarded state shared between the `Ponger` and its `Recorder`s.
struct Shared {
    /// The h2 handle used to send pings and poll for pongs.
    ping_pong: PingPong,
    /// Set when a ping has been sent successfully; cleared when its pong is
    /// received. The same slot is used for BDP and keep-alive pings.
    ping_sent_at: Option<Instant>,

    // bdp
    /// If `Some`, bdp is enabled, and this tracks how many bytes have been
    /// read during the current sample.
    bytes: Option<usize>,
    /// We delay a variable amount of time between BDP pings. This allows us
    /// to send fewer pings as the bandwidth stabilizes.
    next_bdp_at: Option<Instant>,

    // keep-alive
    /// If `Some`, keep-alive is enabled, and the `Instant` is when the
    /// connection last read a frame.
    #[cfg(feature = "runtime")]
    last_read_at: Option<Instant>,

    /// Set once the keep-alive pong timeout has fired; checked by
    /// `Recorder::ensure_not_timed_out`.
    #[cfg(feature = "runtime")]
    is_keep_alive_timed_out: bool,
}
149
/// State of the BDP estimator that drives adaptive window sizing.
struct Bdp {
    /// Current BDP in bytes
    bdp: u32,
    /// Largest bandwidth we've seen so far.
    max_bandwidth: f64,
    /// Round trip time in seconds, kept as a moving average (`0.0` until the
    /// first sample arrives).
    rtt: f64,
    /// Delay the next ping by this amount.
    ///
    /// This will change depending on how stable the current bandwidth is.
    ping_delay: Duration,
    /// The count of ping round trips where BDP has stayed the same.
    stable_count: u32,
}
164
/// Keep-alive state machine and its timer.
#[cfg(feature = "runtime")]
struct KeepAlive {
    /// If no frames are received in this amount of time, a PING frame is sent.
    interval: Duration,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    while_idle: bool,

    /// Where the keep-alive cycle currently stands.
    state: KeepAliveState,
    /// A single timer, reset to either the next ping deadline or the pong
    /// timeout depending on `state`.
    timer: Pin<Box<Sleep>>,
}
178
/// Phases of the keep-alive cycle.
#[cfg(feature = "runtime")]
enum KeepAliveState {
    /// No ping deadline has been armed yet.
    Init,
    /// The timer is armed for `last_read_at + interval`.
    Scheduled,
    /// A keep-alive ping was sent and the timer is armed for the pong timeout.
    PingSent,
}
185
/// Events produced by `Ponger::poll`.
pub(super) enum Ponged {
    /// The BDP estimate increased; carries the new window size.
    SizeUpdate(WindowSize),
    /// No pong arrived before the keep-alive timeout elapsed.
    #[cfg(feature = "runtime")]
    KeepAliveTimedOut,
}
191
/// Error marker for a keep-alive ping whose pong did not arrive in time.
#[cfg(feature = "runtime")]
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;
195
196// ===== impl Config =====
197
198impl Config {
199 pub(super) fn is_enabled(&self) -> bool {
200 #[cfg(feature = "runtime")]
201 {
202 self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
203 }
204
205 #[cfg(not(feature = "runtime"))]
206 {
207 self.bdp_initial_window.is_some()
208 }
209 }
210}
211
212// ===== impl Recorder =====
213
214impl Recorder {
215 pub(crate) fn record_data(&self, len: usize) {
216 let shared = if let Some(ref shared) = self.shared {
217 shared
218 } else {
219 return;
220 };
221
222 let mut locked = shared.lock().unwrap();
223
224 #[cfg(feature = "runtime")]
225 locked.update_last_read_at();
226
227 // are we ready to send another bdp ping?
228 // if not, we don't need to record bytes either
229
230 if let Some(ref next_bdp_at) = locked.next_bdp_at {
231 if Instant::now() < *next_bdp_at {
232 return;
233 } else {
234 locked.next_bdp_at = None;
235 }
236 }
237
238 if let Some(ref mut bytes) = locked.bytes {
239 *bytes += len;
240 } else {
241 // no need to send bdp ping if bdp is disabled
242 return;
243 }
244
245 if !locked.is_ping_sent() {
246 locked.send_ping();
247 }
248 }
249
250 pub(crate) fn record_non_data(&self) {
251 #[cfg(feature = "runtime")]
252 {
253 let shared = if let Some(ref shared) = self.shared {
254 shared
255 } else {
256 return;
257 };
258
259 let mut locked = shared.lock().unwrap();
260
261 locked.update_last_read_at();
262 }
263 }
264
265 /// If the incoming stream is already closed, convert self into
266 /// a disabled reporter.
267 #[cfg(feature = "client")]
268 pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
269 if stream.is_end_stream() {
270 disabled()
271 } else {
272 self
273 }
274 }
275
276 pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
277 #[cfg(feature = "runtime")]
278 {
279 if let Some(ref shared) = self.shared {
280 let locked = shared.lock().unwrap();
281 if locked.is_keep_alive_timed_out {
282 return Err(KeepAliveTimedOut.crate_error());
283 }
284 }
285 }
286
287 // else
288 Ok(())
289 }
290}
291
292// ===== impl Ponger =====
293
impl Ponger {
    /// Drives keep-alive scheduling and checks for a pong.
    ///
    /// Returns `Poll::Ready(Ponged::SizeUpdate(..))` when a pong arrives and
    /// the BDP estimator produces a new window size, and
    /// `Poll::Ready(Ponged::KeepAliveTimedOut)` when a keep-alive ping is
    /// outstanding and its timeout timer has elapsed. Every other path
    /// returns `Poll::Pending`.
    pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
        // Captured once up front; used for both the RTT measurement and the
        // time of the next BDP ping.
        let now = Instant::now();
        let mut locked = self.shared.lock().unwrap();
        #[cfg(feature = "runtime")]
        let is_idle = self.is_idle();

        #[cfg(feature = "runtime")]
        {
            if let Some(ref mut ka) = self.keep_alive {
                // Arm the interval timer if needed, then send a keep-alive
                // ping if that timer has already elapsed.
                ka.schedule(is_idle, &locked);
                ka.maybe_ping(cx, &mut locked);
            }
        }

        if !locked.is_ping_sent() {
            // XXX: this doesn't register a waker...?
            return Poll::Pending;
        }

        match locked.ping_pong.poll_pong(cx) {
            Poll::Ready(Ok(_pong)) => {
                let start = locked
                    .ping_sent_at
                    .expect("pong received implies ping_sent_at");
                // Clearing this marks the ping as no longer outstanding.
                locked.ping_sent_at = None;
                let rtt = now - start;
                trace!("recv pong");

                #[cfg(feature = "runtime")]
                {
                    if let Some(ref mut ka) = self.keep_alive {
                        // A pong counts as a read, so the keep-alive cycle
                        // restarts from this moment.
                        locked.update_last_read_at();
                        ka.schedule(is_idle, &locked);
                    }
                }

                if let Some(ref mut bdp) = self.bdp {
                    let bytes = locked.bytes.expect("bdp enabled implies bytes");
                    locked.bytes = Some(0); // reset
                    trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);

                    let update = bdp.calculate(bytes, rtt);
                    // `calculate` may have adjusted `ping_delay`, so read it
                    // only afterwards.
                    locked.next_bdp_at = Some(now + bdp.ping_delay);
                    if let Some(update) = update {
                        return Poll::Ready(Ponged::SizeUpdate(update));
                    }
                }
            }
            Poll::Ready(Err(e)) => {
                // The error is only logged; `ping_sent_at` is left as is.
                debug!("pong error: {}", e);
            }
            Poll::Pending => {
                #[cfg(feature = "runtime")]
                {
                    if let Some(ref mut ka) = self.keep_alive {
                        if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
                            // Drop the keep-alive state and leave a flag in
                            // the shared state for the recorders to observe.
                            self.keep_alive = None;
                            locked.is_keep_alive_timed_out = true;
                            return Poll::Ready(Ponged::KeepAliveTimedOut);
                        }
                    }
                }
            }
        }

        // XXX: this doesn't register a waker...?
        Poll::Pending
    }

    /// Reports whether the connection counts as idle, judged by how many
    /// strong references to the shared state exist.
    #[cfg(feature = "runtime")]
    fn is_idle(&self) -> bool {
        // NOTE(review): presumably the two baseline references are this
        // `Ponger` and the connection's own `Recorder`, with each active
        // stream holding a further clone; confirm against the callers.
        Arc::strong_count(&self.shared) <= 2
    }
}
369
370// ===== impl Shared =====
371
372impl Shared {
373 fn send_ping(&mut self) {
374 match self.ping_pong.send_ping(Ping::opaque()) {
375 Ok(()) => {
376 self.ping_sent_at = Some(Instant::now());
377 trace!("sent ping");
378 }
379 Err(err) => {
380 debug!("error sending ping: {}", err);
381 }
382 }
383 }
384
385 fn is_ping_sent(&self) -> bool {
386 self.ping_sent_at.is_some()
387 }
388
389 #[cfg(feature = "runtime")]
390 fn update_last_read_at(&mut self) {
391 if self.last_read_at.is_some() {
392 self.last_read_at = Some(Instant::now());
393 }
394 }
395
396 #[cfg(feature = "runtime")]
397 fn last_read_at(&self) -> Instant {
398 self.last_read_at.expect("keep_alive expects last_read_at")
399 }
400}
401
402// ===== impl Bdp =====
403
404/// Any higher than this likely will be hitting the TCP flow control.
405const BDP_LIMIT: usize = 1024 * 1024 * 16;
406
407impl Bdp {
408 fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
409 // No need to do any math if we're at the limit.
410 if self.bdp as usize == BDP_LIMIT {
411 self.stabilize_delay();
412 return None;
413 }
414
415 // average the rtt
416 let rtt = seconds(rtt);
417 if self.rtt == 0.0 {
418 // First sample means rtt is first rtt.
419 self.rtt = rtt;
420 } else {
421 // Weigh this rtt as 1/8 for a moving average.
422 self.rtt += (rtt - self.rtt) * 0.125;
423 }
424
425 // calculate the current bandwidth
426 let bw = (bytes as f64) / (self.rtt * 1.5);
427 trace!("current bandwidth = {:.1}B/s", bw);
428
429 if bw < self.max_bandwidth {
430 // not a faster bandwidth, so don't update
431 self.stabilize_delay();
432 return None;
433 } else {
434 self.max_bandwidth = bw;
435 }
436
437 // if the current `bytes` sample is at least 2/3 the previous
438 // bdp, increase to double the current sample.
439 if bytes >= self.bdp as usize * 2 / 3 {
440 self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
441 trace!("BDP increased to {}", self.bdp);
442
443 self.stable_count = 0;
444 self.ping_delay /= 2;
445 Some(self.bdp)
446 } else {
447 self.stabilize_delay();
448 None
449 }
450 }
451
452 fn stabilize_delay(&mut self) {
453 if self.ping_delay < Duration::from_secs(10) {
454 self.stable_count += 1;
455
456 if self.stable_count >= 2 {
457 self.ping_delay *= 4;
458 self.stable_count = 0;
459 }
460 }
461 }
462}
463
/// Converts `dur` into seconds as an `f64`, including the sub-second part.
fn seconds(dur: Duration) -> f64 {
    // The standard library computes whole seconds plus nanoseconds / 1e9,
    // the same formula that used to be spelled out by hand here.
    dur.as_secs_f64()
}
469
470// ===== impl KeepAlive =====
471
#[cfg(feature = "runtime")]
impl KeepAlive {
    /// Arms the interval timer for `last_read_at + interval` when the state
    /// allows it, moving to `Scheduled`.
    ///
    /// From `Init` this happens unless the connection is idle and
    /// `while_idle` is off; from `PingSent` it happens once no ping is
    /// outstanding any more. `Scheduled` is left untouched.
    fn schedule(&mut self, is_idle: bool, shared: &Shared) {
        let should_arm = match self.state {
            KeepAliveState::Init => self.while_idle || !is_idle,
            KeepAliveState::PingSent => !shared.is_ping_sent(),
            KeepAliveState::Scheduled => false,
        };
        if !should_arm {
            return;
        }

        self.state = KeepAliveState::Scheduled;
        let deadline = shared.last_read_at() + self.interval;
        self.timer.as_mut().reset(deadline);
    }

    /// In the `Scheduled` state, polls the interval timer and sends a
    /// keep-alive ping once it has elapsed without an intervening read.
    ///
    /// After the ping is sent the timer is re-armed for the pong timeout
    /// and the state becomes `PingSent`.
    fn maybe_ping(&mut self, cx: &mut task::Context<'_>, shared: &mut Shared) {
        match self.state {
            KeepAliveState::Scheduled => {}
            KeepAliveState::Init | KeepAliveState::PingSent => return,
        }

        if self.timer.as_mut().poll(cx).is_pending() {
            return;
        }

        // check if we've received a frame while we were scheduled
        let read_since_scheduled = shared.last_read_at() + self.interval > self.timer.deadline();
        if read_since_scheduled {
            self.state = KeepAliveState::Init;
            cx.waker().wake_by_ref(); // schedule us again
            return;
        }

        trace!("keep-alive interval ({:?}) reached", self.interval);
        shared.send_ping();
        self.state = KeepAliveState::PingSent;
        let timeout = Instant::now() + self.timeout;
        self.timer.as_mut().reset(timeout);
    }

    /// In the `PingSent` state, polls the timeout timer and returns
    /// `Err(KeepAliveTimedOut)` once it has elapsed.
    ///
    /// Returns `Ok(())` in every other state, or while the timer is still
    /// pending.
    fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
        let timer_elapsed = match self.state {
            KeepAliveState::PingSent => self.timer.as_mut().poll(cx).is_ready(),
            KeepAliveState::Init | KeepAliveState::Scheduled => false,
        };

        if timer_elapsed {
            trace!("keep-alive timeout ({:?}) reached", self.timeout);
            Err(KeepAliveTimedOut)
        } else {
            Ok(())
        }
    }
}
533
534// ===== impl KeepAliveTimedOut =====
535
#[cfg(feature = "runtime")]
impl KeepAliveTimedOut {
    /// Builds a `crate::Error` of kind `Http2` and attaches `self` to it
    /// through `with`.
    pub(super) fn crate_error(self) -> crate::Error {
        let err = crate::Error::new(crate::error::Kind::Http2);
        err.with(self)
    }
}
542
#[cfg(feature = "runtime")]
impl fmt::Display for KeepAliveTimedOut {
    /// Writes the fixed message `keep-alive timed out`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `write_str` takes the string positionally; the stray `data:` label
        // that used to precede the argument was not valid Rust.
        f.write_str("keep-alive timed out")
    }
}
549
#[cfg(feature = "runtime")]
impl std::error::Error for KeepAliveTimedOut {
    /// Always reports `crate::error::TimedOut` as the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&crate::error::TimedOut)
    }
}
556