| 1 | use std::error::Error as StdError; |
| 2 | use std::future::Future; |
| 3 | use std::io::{Cursor, IoSlice}; |
| 4 | use std::mem; |
| 5 | use std::pin::Pin; |
| 6 | use std::task::{Context, Poll}; |
| 7 | |
| 8 | use bytes::{Buf, Bytes}; |
| 9 | use futures_util::ready; |
| 10 | use h2::{Reason, RecvStream, SendStream}; |
| 11 | use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE}; |
| 12 | use http::HeaderMap; |
| 13 | use pin_project_lite::pin_project; |
| 14 | |
| 15 | use crate::body::Body; |
| 16 | use crate::proto::h2::ping::Recorder; |
| 17 | use crate::rt::{Read, ReadBufCursor, Write}; |
| 18 | |
| 19 | pub(crate) mod ping; |
| 20 | |
| 21 | cfg_client! { |
| 22 | pub(crate) mod client; |
| 23 | pub(crate) use self::client::ClientTask; |
| 24 | } |
| 25 | |
| 26 | cfg_server! { |
| 27 | pub(crate) mod server; |
| 28 | pub(crate) use self::server::Server; |
| 29 | } |
| 30 | |
| 31 | /// Default initial stream window size defined in HTTP2 spec. |
| 32 | pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535; |
| 33 | |
| 34 | // List of connection headers from RFC 9110 Section 7.6.1 |
| 35 | // |
| 36 | // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're |
| 37 | // tested separately. |
| 38 | static CONNECTION_HEADERS: [HeaderName; 4] = [ |
| 39 | HeaderName::from_static(src:"keep-alive" ), |
| 40 | HeaderName::from_static(src:"proxy-connection" ), |
| 41 | TRANSFER_ENCODING, |
| 42 | UPGRADE, |
| 43 | ]; |
| 44 | |
| 45 | fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { |
| 46 | for header in &CONNECTION_HEADERS { |
| 47 | if headers.remove(header).is_some() { |
| 48 | warn!("Connection header illegal in HTTP/2: {}" , header.as_str()); |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | if is_request { |
| 53 | if headers |
| 54 | .get(TE) |
| 55 | .map_or(false, |te_header| te_header != "trailers" ) |
| 56 | { |
| 57 | warn!("TE headers not set to \"trailers \" are illegal in HTTP/2 requests" ); |
| 58 | headers.remove(TE); |
| 59 | } |
| 60 | } else if headers.remove(TE).is_some() { |
| 61 | warn!("TE headers illegal in HTTP/2 responses" ); |
| 62 | } |
| 63 | |
| 64 | if let Some(header) = headers.remove(CONNECTION) { |
| 65 | warn!( |
| 66 | "Connection header illegal in HTTP/2: {}" , |
| 67 | CONNECTION.as_str() |
| 68 | ); |
| 69 | let header_contents = header.to_str().unwrap(); |
| 70 | |
| 71 | // A `Connection` header may have a comma-separated list of names of other headers that |
| 72 | // are meant for only this specific connection. |
| 73 | // |
| 74 | // Iterate these names and remove them as headers. Connection-specific headers are |
| 75 | // forbidden in HTTP2, as that information has been moved into frame types of the h2 |
| 76 | // protocol. |
| 77 | for name in header_contents.split(',' ) { |
| 78 | let name = name.trim(); |
| 79 | headers.remove(name); |
| 80 | } |
| 81 | } |
| 82 | } |
| 83 | |
| 84 | // body adapters used by both Client and Server |
| 85 | |
| 86 | pin_project! { |
| 87 | pub(crate) struct PipeToSendStream<S> |
| 88 | where |
| 89 | S: Body, |
| 90 | { |
| 91 | body_tx: SendStream<SendBuf<S::Data>>, |
| 92 | data_done: bool, |
| 93 | #[pin] |
| 94 | stream: S, |
| 95 | } |
| 96 | } |
| 97 | |
| 98 | impl<S> PipeToSendStream<S> |
| 99 | where |
| 100 | S: Body, |
| 101 | { |
| 102 | fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> { |
| 103 | PipeToSendStream { |
| 104 | body_tx: tx, |
| 105 | data_done: false, |
| 106 | stream, |
| 107 | } |
| 108 | } |
| 109 | } |
| 110 | |
| 111 | impl<S> Future for PipeToSendStream<S> |
| 112 | where |
| 113 | S: Body, |
| 114 | S::Error: Into<Box<dyn StdError + Send + Sync>>, |
| 115 | { |
| 116 | type Output = crate::Result<()>; |
| 117 | |
| 118 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 119 | let mut me = self.project(); |
| 120 | loop { |
| 121 | // we don't have the next chunk of data yet, so just reserve 1 byte to make |
| 122 | // sure there's some capacity available. h2 will handle the capacity management |
| 123 | // for the actual body chunk. |
| 124 | me.body_tx.reserve_capacity(1); |
| 125 | |
| 126 | if me.body_tx.capacity() == 0 { |
| 127 | loop { |
| 128 | match ready!(me.body_tx.poll_capacity(cx)) { |
| 129 | Some(Ok(0)) => {} |
| 130 | Some(Ok(_)) => break, |
| 131 | Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))), |
| 132 | None => { |
| 133 | // None means the stream is no longer in a |
| 134 | // streaming state, we either finished it |
| 135 | // somehow, or the remote reset us. |
| 136 | return Poll::Ready(Err(crate::Error::new_body_write( |
| 137 | "send stream capacity unexpectedly closed" , |
| 138 | ))); |
| 139 | } |
| 140 | } |
| 141 | } |
| 142 | } else if let Poll::Ready(reason) = me |
| 143 | .body_tx |
| 144 | .poll_reset(cx) |
| 145 | .map_err(crate::Error::new_body_write)? |
| 146 | { |
| 147 | debug!("stream received RST_STREAM: {:?}" , reason); |
| 148 | return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason)))); |
| 149 | } |
| 150 | |
| 151 | match ready!(me.stream.as_mut().poll_frame(cx)) { |
| 152 | Some(Ok(frame)) => { |
| 153 | if frame.is_data() { |
| 154 | let chunk = frame.into_data().unwrap_or_else(|_| unreachable!()); |
| 155 | let is_eos = me.stream.is_end_stream(); |
| 156 | trace!( |
| 157 | "send body chunk: {} bytes, eos={}" , |
| 158 | chunk.remaining(), |
| 159 | is_eos, |
| 160 | ); |
| 161 | |
| 162 | let buf = SendBuf::Buf(chunk); |
| 163 | me.body_tx |
| 164 | .send_data(buf, is_eos) |
| 165 | .map_err(crate::Error::new_body_write)?; |
| 166 | |
| 167 | if is_eos { |
| 168 | return Poll::Ready(Ok(())); |
| 169 | } |
| 170 | } else if frame.is_trailers() { |
| 171 | // no more DATA, so give any capacity back |
| 172 | me.body_tx.reserve_capacity(0); |
| 173 | me.body_tx |
| 174 | .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!())) |
| 175 | .map_err(crate::Error::new_body_write)?; |
| 176 | return Poll::Ready(Ok(())); |
| 177 | } else { |
| 178 | trace!("discarding unknown frame" ); |
| 179 | // loop again |
| 180 | } |
| 181 | } |
| 182 | Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), |
| 183 | None => { |
| 184 | // no more frames means we're done here |
| 185 | // but at this point, we haven't sent an EOS DATA, or |
| 186 | // any trailers, so send an empty EOS DATA. |
| 187 | return Poll::Ready(me.body_tx.send_eos_frame()); |
| 188 | } |
| 189 | } |
| 190 | } |
| 191 | } |
| 192 | } |
| 193 | |
| 194 | trait SendStreamExt { |
| 195 | fn on_user_err<E>(&mut self, err: E) -> crate::Error |
| 196 | where |
| 197 | E: Into<Box<dyn std::error::Error + Send + Sync>>; |
| 198 | fn send_eos_frame(&mut self) -> crate::Result<()>; |
| 199 | } |
| 200 | |
| 201 | impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> { |
| 202 | fn on_user_err<E>(&mut self, err: E) -> crate::Error |
| 203 | where |
| 204 | E: Into<Box<dyn std::error::Error + Send + Sync>>, |
| 205 | { |
| 206 | let err: Error = crate::Error::new_user_body(cause:err); |
| 207 | debug!("send body user stream error: {}" , err); |
| 208 | self.send_reset(err.h2_reason()); |
| 209 | err |
| 210 | } |
| 211 | |
| 212 | fn send_eos_frame(&mut self) -> crate::Result<()> { |
| 213 | trace!("send body eos" ); |
| 214 | self.send_data(SendBuf::None, true) |
| 215 | .map_err(op:crate::Error::new_body_write) |
| 216 | } |
| 217 | } |
| 218 | |
| 219 | #[repr (usize)] |
| 220 | enum SendBuf<B> { |
| 221 | Buf(B), |
| 222 | Cursor(Cursor<Box<[u8]>>), |
| 223 | None, |
| 224 | } |
| 225 | |
| 226 | impl<B: Buf> Buf for SendBuf<B> { |
| 227 | #[inline ] |
| 228 | fn remaining(&self) -> usize { |
| 229 | match *self { |
| 230 | Self::Buf(ref b) => b.remaining(), |
| 231 | Self::Cursor(ref c) => Buf::remaining(c), |
| 232 | Self::None => 0, |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | #[inline ] |
| 237 | fn chunk(&self) -> &[u8] { |
| 238 | match *self { |
| 239 | Self::Buf(ref b) => b.chunk(), |
| 240 | Self::Cursor(ref c) => c.chunk(), |
| 241 | Self::None => &[], |
| 242 | } |
| 243 | } |
| 244 | |
| 245 | #[inline ] |
| 246 | fn advance(&mut self, cnt: usize) { |
| 247 | match *self { |
| 248 | Self::Buf(ref mut b) => b.advance(cnt), |
| 249 | Self::Cursor(ref mut c) => c.advance(cnt), |
| 250 | Self::None => {} |
| 251 | } |
| 252 | } |
| 253 | |
| 254 | fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { |
| 255 | match *self { |
| 256 | Self::Buf(ref b) => b.chunks_vectored(dst), |
| 257 | Self::Cursor(ref c) => c.chunks_vectored(dst), |
| 258 | Self::None => 0, |
| 259 | } |
| 260 | } |
| 261 | } |
| 262 | |
| 263 | struct H2Upgraded<B> |
| 264 | where |
| 265 | B: Buf, |
| 266 | { |
| 267 | ping: Recorder, |
| 268 | send_stream: UpgradedSendStream<B>, |
| 269 | recv_stream: RecvStream, |
| 270 | buf: Bytes, |
| 271 | } |
| 272 | |
| 273 | impl<B> Read for H2Upgraded<B> |
| 274 | where |
| 275 | B: Buf, |
| 276 | { |
| 277 | fn poll_read( |
| 278 | mut self: Pin<&mut Self>, |
| 279 | cx: &mut Context<'_>, |
| 280 | mut read_buf: ReadBufCursor<'_>, |
| 281 | ) -> Poll<Result<(), std::io::Error>> { |
| 282 | if self.buf.is_empty() { |
| 283 | self.buf = loop { |
| 284 | match ready!(self.recv_stream.poll_data(cx)) { |
| 285 | None => return Poll::Ready(Ok(())), |
| 286 | Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => { |
| 287 | continue |
| 288 | } |
| 289 | Some(Ok(buf)) => { |
| 290 | self.ping.record_data(buf.len()); |
| 291 | break buf; |
| 292 | } |
| 293 | Some(Err(e)) => { |
| 294 | return Poll::Ready(match e.reason() { |
| 295 | Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()), |
| 296 | Some(Reason::STREAM_CLOSED) => { |
| 297 | Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) |
| 298 | } |
| 299 | _ => Err(h2_to_io_error(e)), |
| 300 | }) |
| 301 | } |
| 302 | } |
| 303 | }; |
| 304 | } |
| 305 | let cnt = std::cmp::min(self.buf.len(), read_buf.remaining()); |
| 306 | read_buf.put_slice(&self.buf[..cnt]); |
| 307 | self.buf.advance(cnt); |
| 308 | let _ = self.recv_stream.flow_control().release_capacity(cnt); |
| 309 | Poll::Ready(Ok(())) |
| 310 | } |
| 311 | } |
| 312 | |
| 313 | impl<B> Write for H2Upgraded<B> |
| 314 | where |
| 315 | B: Buf, |
| 316 | { |
| 317 | fn poll_write( |
| 318 | mut self: Pin<&mut Self>, |
| 319 | cx: &mut Context<'_>, |
| 320 | buf: &[u8], |
| 321 | ) -> Poll<Result<usize, std::io::Error>> { |
| 322 | if buf.is_empty() { |
| 323 | return Poll::Ready(Ok(0)); |
| 324 | } |
| 325 | self.send_stream.reserve_capacity(buf.len()); |
| 326 | |
| 327 | // We ignore all errors returned by `poll_capacity` and `write`, as we |
| 328 | // will get the correct from `poll_reset` anyway. |
| 329 | let cnt = match ready!(self.send_stream.poll_capacity(cx)) { |
| 330 | None => Some(0), |
| 331 | Some(Ok(cnt)) => self |
| 332 | .send_stream |
| 333 | .write(&buf[..cnt], false) |
| 334 | .ok() |
| 335 | .map(|()| cnt), |
| 336 | Some(Err(_)) => None, |
| 337 | }; |
| 338 | |
| 339 | if let Some(cnt) = cnt { |
| 340 | return Poll::Ready(Ok(cnt)); |
| 341 | } |
| 342 | |
| 343 | Poll::Ready(Err(h2_to_io_error( |
| 344 | match ready!(self.send_stream.poll_reset(cx)) { |
| 345 | Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { |
| 346 | return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) |
| 347 | } |
| 348 | Ok(reason) => reason.into(), |
| 349 | Err(e) => e, |
| 350 | }, |
| 351 | ))) |
| 352 | } |
| 353 | |
| 354 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { |
| 355 | Poll::Ready(Ok(())) |
| 356 | } |
| 357 | |
| 358 | fn poll_shutdown( |
| 359 | mut self: Pin<&mut Self>, |
| 360 | cx: &mut Context<'_>, |
| 361 | ) -> Poll<Result<(), std::io::Error>> { |
| 362 | if self.send_stream.write(&[], true).is_ok() { |
| 363 | return Poll::Ready(Ok(())); |
| 364 | } |
| 365 | |
| 366 | Poll::Ready(Err(h2_to_io_error( |
| 367 | match ready!(self.send_stream.poll_reset(cx)) { |
| 368 | Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())), |
| 369 | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { |
| 370 | return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) |
| 371 | } |
| 372 | Ok(reason) => reason.into(), |
| 373 | Err(e) => e, |
| 374 | }, |
| 375 | ))) |
| 376 | } |
| 377 | } |
| 378 | |
| 379 | fn h2_to_io_error(e: h2::Error) -> std::io::Error { |
| 380 | if e.is_io() { |
| 381 | e.into_io().unwrap() |
| 382 | } else { |
| 383 | std::io::Error::new(kind:std::io::ErrorKind::Other, error:e) |
| 384 | } |
| 385 | } |
| 386 | |
| 387 | struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>); |
| 388 | |
| 389 | impl<B> UpgradedSendStream<B> |
| 390 | where |
| 391 | B: Buf, |
| 392 | { |
| 393 | unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self { |
| 394 | assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>()); |
| 395 | Self(mem::transmute(inner)) |
| 396 | } |
| 397 | |
| 398 | fn reserve_capacity(&mut self, cnt: usize) { |
| 399 | unsafe { self.as_inner_unchecked().reserve_capacity(cnt) } |
| 400 | } |
| 401 | |
| 402 | fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> { |
| 403 | unsafe { self.as_inner_unchecked().poll_capacity(cx) } |
| 404 | } |
| 405 | |
| 406 | fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> { |
| 407 | unsafe { self.as_inner_unchecked().poll_reset(cx) } |
| 408 | } |
| 409 | |
| 410 | fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), std::io::Error> { |
| 411 | let send_buf = SendBuf::Cursor(Cursor::new(buf.into())); |
| 412 | unsafe { |
| 413 | self.as_inner_unchecked() |
| 414 | .send_data(send_buf, end_of_stream) |
| 415 | .map_err(h2_to_io_error) |
| 416 | } |
| 417 | } |
| 418 | |
| 419 | unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> { |
| 420 | &mut *(&mut self.0 as *mut _ as *mut _) |
| 421 | } |
| 422 | } |
| 423 | |
| 424 | #[repr (transparent)] |
| 425 | struct Neutered<B> { |
| 426 | _inner: B, |
| 427 | impossible: Impossible, |
| 428 | } |
| 429 | |
| 430 | enum Impossible {} |
| 431 | |
| 432 | unsafe impl<B> Send for Neutered<B> {} |
| 433 | |
| 434 | impl<B> Buf for Neutered<B> { |
| 435 | fn remaining(&self) -> usize { |
| 436 | match self.impossible {} |
| 437 | } |
| 438 | |
| 439 | fn chunk(&self) -> &[u8] { |
| 440 | match self.impossible {} |
| 441 | } |
| 442 | |
| 443 | fn advance(&mut self, _cnt: usize) { |
| 444 | match self.impossible {} |
| 445 | } |
| 446 | } |
| 447 | |