1use bytes::{Buf, Bytes};
2use h2::{Reason, RecvStream, SendStream};
3use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE};
4use http::HeaderMap;
5use pin_project_lite::pin_project;
6use std::error::Error as StdError;
7use std::future::Future;
8use std::io::{self, Cursor, IoSlice};
9use std::mem;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
13use tracing::{debug, trace, warn};
14
15use crate::body::HttpBody;
16use crate::proto::h2::ping::Recorder;
17
18pub(crate) mod ping;
19
20cfg_client! {
21 pub(crate) mod client;
22 pub(crate) use self::client::ClientTask;
23}
24
25cfg_server! {
26 pub(crate) mod server;
27 pub(crate) use self::server::Server;
28}
29
/// Default initial stream window size defined in the HTTP/2 spec.
pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
32
33fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
34 // List of connection headers from:
35 // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
36 //
37 // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're
38 // tested separately.
39 let connection_headers = [
40 HeaderName::from_lowercase(b"keep-alive").unwrap(),
41 HeaderName::from_lowercase(b"proxy-connection").unwrap(),
42 TRAILER,
43 TRANSFER_ENCODING,
44 UPGRADE,
45 ];
46
47 for header in connection_headers.iter() {
48 if headers.remove(header).is_some() {
49 warn!("Connection header illegal in HTTP/2: {}", header.as_str());
50 }
51 }
52
53 if is_request {
54 if headers
55 .get(TE)
56 .map(|te_header| te_header != "trailers")
57 .unwrap_or(false)
58 {
59 warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
60 headers.remove(TE);
61 }
62 } else if headers.remove(TE).is_some() {
63 warn!("TE headers illegal in HTTP/2 responses");
64 }
65
66 if let Some(header) = headers.remove(CONNECTION) {
67 warn!(
68 "Connection header illegal in HTTP/2: {}",
69 CONNECTION.as_str()
70 );
71 let header_contents = header.to_str().unwrap();
72
73 // A `Connection` header may have a comma-separated list of names of other headers that
74 // are meant for only this specific connection.
75 //
76 // Iterate these names and remove them as headers. Connection-specific headers are
77 // forbidden in HTTP2, as that information has been moved into frame types of the h2
78 // protocol.
79 for name in header_contents.split(',') {
80 let name = name.trim();
81 headers.remove(name);
82 }
83 }
84}
85
86// body adapters used by both Client and Server
87
// Future that drives an `HttpBody` to completion, forwarding its data chunks and
// trailers into an h2 `SendStream`.
pin_project! {
    struct PipeToSendStream<S>
    where
        S: HttpBody,
    {
        // The h2 send half that body chunks, trailers and the end-of-stream frame are
        // written to.
        body_tx: SendStream<SendBuf<S::Data>>,
        // Becomes `true` once the body has yielded all of its data without reporting
        // end-of-stream; from then on only trailers are polled.
        data_done: bool,
        // The body being piped; pinned because it is polled in place.
        #[pin]
        stream: S,
    }
}
99
100impl<S> PipeToSendStream<S>
101where
102 S: HttpBody,
103{
104 fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
105 PipeToSendStream {
106 body_tx: tx,
107 data_done: false,
108 stream,
109 }
110 }
111}
112
113impl<S> Future for PipeToSendStream<S>
114where
115 S: HttpBody,
116 S::Error: Into<Box<dyn StdError + Send + Sync>>,
117{
118 type Output = crate::Result<()>;
119
120 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
121 let mut me = self.project();
122 loop {
123 if !*me.data_done {
124 // we don't have the next chunk of data yet, so just reserve 1 byte to make
125 // sure there's some capacity available. h2 will handle the capacity management
126 // for the actual body chunk.
127 me.body_tx.reserve_capacity(1);
128
129 if me.body_tx.capacity() == 0 {
130 loop {
131 match ready!(me.body_tx.poll_capacity(cx)) {
132 Some(Ok(0)) => {}
133 Some(Ok(_)) => break,
134 Some(Err(e)) => {
135 return Poll::Ready(Err(crate::Error::new_body_write(e)))
136 }
137 None => {
138 // None means the stream is no longer in a
139 // streaming state, we either finished it
140 // somehow, or the remote reset us.
141 return Poll::Ready(Err(crate::Error::new_body_write(
142 "send stream capacity unexpectedly closed",
143 )));
144 }
145 }
146 }
147 } else if let Poll::Ready(reason) = me
148 .body_tx
149 .poll_reset(cx)
150 .map_err(crate::Error::new_body_write)?
151 {
152 debug!("stream received RST_STREAM: {:?}", reason);
153 return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
154 reason,
155 ))));
156 }
157
158 match ready!(me.stream.as_mut().poll_data(cx)) {
159 Some(Ok(chunk)) => {
160 let is_eos = me.stream.is_end_stream();
161 trace!(
162 "send body chunk: {} bytes, eos={}",
163 chunk.remaining(),
164 is_eos,
165 );
166
167 let buf = SendBuf::Buf(chunk);
168 me.body_tx
169 .send_data(buf, is_eos)
170 .map_err(crate::Error::new_body_write)?;
171
172 if is_eos {
173 return Poll::Ready(Ok(()));
174 }
175 }
176 Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
177 None => {
178 me.body_tx.reserve_capacity(0);
179 let is_eos = me.stream.is_end_stream();
180 if is_eos {
181 return Poll::Ready(me.body_tx.send_eos_frame());
182 } else {
183 *me.data_done = true;
184 // loop again to poll_trailers
185 }
186 }
187 }
188 } else {
189 if let Poll::Ready(reason) = me
190 .body_tx
191 .poll_reset(cx)
192 .map_err(crate::Error::new_body_write)?
193 {
194 debug!("stream received RST_STREAM: {:?}", reason);
195 return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
196 reason,
197 ))));
198 }
199
200 match ready!(me.stream.poll_trailers(cx)) {
201 Ok(Some(trailers)) => {
202 me.body_tx
203 .send_trailers(trailers)
204 .map_err(crate::Error::new_body_write)?;
205 return Poll::Ready(Ok(()));
206 }
207 Ok(None) => {
208 // There were no trailers, so send an empty DATA frame...
209 return Poll::Ready(me.body_tx.send_eos_frame());
210 }
211 Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
212 }
213 }
214 }
215 }
216}
217
218trait SendStreamExt {
219 fn on_user_err<E>(&mut self, err: E) -> crate::Error
220 where
221 E: Into<Box<dyn std::error::Error + Send + Sync>>;
222 fn send_eos_frame(&mut self) -> crate::Result<()>;
223}
224
225impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
226 fn on_user_err<E>(&mut self, err: E) -> crate::Error
227 where
228 E: Into<Box<dyn std::error::Error + Send + Sync>>,
229 {
230 let err: Error = crate::Error::new_user_body(cause:err);
231 debug!("send body user stream error: {}", err);
232 self.send_reset(err.h2_reason());
233 err
234 }
235
236 fn send_eos_frame(&mut self) -> crate::Result<()> {
237 trace!("send body eos");
238 self.send_data(SendBuf::None, true)
239 .map_err(op:crate::Error::new_body_write)
240 }
241}
242
/// Payload type written to an h2 `SendStream` by this module.
// NOTE(review): `#[repr(usize)]` presumably pins the layout so that the transmute between
// `SendBuf<B>` and `SendBuf<Neutered<B>>` in `UpgradedSendStream` is well-defined; confirm.
#[repr(usize)]
enum SendBuf<B> {
    /// A data chunk produced by the user's body.
    Buf(B),
    /// An owned copy of raw bytes, used when writing to an upgraded stream.
    Cursor(Cursor<Box<[u8]>>),
    /// No payload; used for the empty end-of-stream DATA frame.
    None,
}
249
250impl<B: Buf> Buf for SendBuf<B> {
251 #[inline]
252 fn remaining(&self) -> usize {
253 match *self {
254 Self::Buf(ref b) => b.remaining(),
255 Self::Cursor(ref c) => Buf::remaining(c),
256 Self::None => 0,
257 }
258 }
259
260 #[inline]
261 fn chunk(&self) -> &[u8] {
262 match *self {
263 Self::Buf(ref b) => b.chunk(),
264 Self::Cursor(ref c) => c.chunk(),
265 Self::None => &[],
266 }
267 }
268
269 #[inline]
270 fn advance(&mut self, cnt: usize) {
271 match *self {
272 Self::Buf(ref mut b) => b.advance(cnt),
273 Self::Cursor(ref mut c) => c.advance(cnt),
274 Self::None => {}
275 }
276 }
277
278 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
279 match *self {
280 Self::Buf(ref b) => b.chunks_vectored(dst),
281 Self::Cursor(ref c) => c.chunks_vectored(dst),
282 Self::None => 0,
283 }
284 }
285}
286
/// Byte-stream view (`AsyncRead` + `AsyncWrite`) over an upgraded HTTP/2 stream.
struct H2Upgraded<B>
where
    B: Buf,
{
    /// Notified with the length of every data chunk received on `recv_stream`.
    ping: Recorder,
    /// Send half of the stream; written to by the `AsyncWrite` implementation.
    send_stream: UpgradedSendStream<B>,
    /// Receive half of the stream; polled by the `AsyncRead` implementation.
    recv_stream: RecvStream,
    /// Bytes of the most recently received chunk that the reader has not consumed yet.
    buf: Bytes,
}
296
297impl<B> AsyncRead for H2Upgraded<B>
298where
299 B: Buf,
300{
301 fn poll_read(
302 mut self: Pin<&mut Self>,
303 cx: &mut Context<'_>,
304 read_buf: &mut ReadBuf<'_>,
305 ) -> Poll<Result<(), io::Error>> {
306 if self.buf.is_empty() {
307 self.buf = loop {
308 match ready!(self.recv_stream.poll_data(cx)) {
309 None => return Poll::Ready(Ok(())),
310 Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
311 continue
312 }
313 Some(Ok(buf)) => {
314 self.ping.record_data(buf.len());
315 break buf;
316 }
317 Some(Err(e)) => {
318 return Poll::Ready(match e.reason() {
319 Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
320 Some(Reason::STREAM_CLOSED) => {
321 Err(io::Error::new(io::ErrorKind::BrokenPipe, e))
322 }
323 _ => Err(h2_to_io_error(e)),
324 })
325 }
326 }
327 };
328 }
329 let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
330 read_buf.put_slice(&self.buf[..cnt]);
331 self.buf.advance(cnt);
332 let _ = self.recv_stream.flow_control().release_capacity(cnt);
333 Poll::Ready(Ok(()))
334 }
335}
336
337impl<B> AsyncWrite for H2Upgraded<B>
338where
339 B: Buf,
340{
341 fn poll_write(
342 mut self: Pin<&mut Self>,
343 cx: &mut Context<'_>,
344 buf: &[u8],
345 ) -> Poll<Result<usize, io::Error>> {
346 if buf.is_empty() {
347 return Poll::Ready(Ok(0));
348 }
349 self.send_stream.reserve_capacity(buf.len());
350
351 // We ignore all errors returned by `poll_capacity` and `write`, as we
352 // will get the correct from `poll_reset` anyway.
353 let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
354 None => Some(0),
355 Some(Ok(cnt)) => self
356 .send_stream
357 .write(&buf[..cnt], false)
358 .ok()
359 .map(|()| cnt),
360 Some(Err(_)) => None,
361 };
362
363 if let Some(cnt) = cnt {
364 return Poll::Ready(Ok(cnt));
365 }
366
367 Poll::Ready(Err(h2_to_io_error(
368 match ready!(self.send_stream.poll_reset(cx)) {
369 Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
370 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
371 }
372 Ok(reason) => reason.into(),
373 Err(e) => e,
374 },
375 )))
376 }
377
378 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
379 Poll::Ready(Ok(()))
380 }
381
382 fn poll_shutdown(
383 mut self: Pin<&mut Self>,
384 cx: &mut Context<'_>,
385 ) -> Poll<Result<(), io::Error>> {
386 if self.send_stream.write(&[], true).is_ok() {
387 return Poll::Ready(Ok(()));
388 }
389
390 Poll::Ready(Err(h2_to_io_error(
391 match ready!(self.send_stream.poll_reset(cx)) {
392 Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())),
393 Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
394 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
395 }
396 Ok(reason) => reason.into(),
397 Err(e) => e,
398 },
399 )))
400 }
401}
402
403fn h2_to_io_error(e: h2::Error) -> io::Error {
404 if e.is_io() {
405 e.into_io().unwrap()
406 } else {
407 io::Error::new(kind:io::ErrorKind::Other, error:e)
408 }
409}
410
411struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);
412
413impl<B> UpgradedSendStream<B>
414where
415 B: Buf,
416{
417 unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
418 assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
419 Self(mem::transmute(inner))
420 }
421
422 fn reserve_capacity(&mut self, cnt: usize) {
423 unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
424 }
425
426 fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
427 unsafe { self.as_inner_unchecked().poll_capacity(cx) }
428 }
429
430 fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
431 unsafe { self.as_inner_unchecked().poll_reset(cx) }
432 }
433
434 fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), io::Error> {
435 let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
436 unsafe {
437 self.as_inner_unchecked()
438 .send_data(send_buf, end_of_stream)
439 .map_err(h2_to_io_error)
440 }
441 }
442
443 unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
444 &mut *(&mut self.0 as *mut _ as *mut _)
445 }
446}
447
/// Stand-in for a buffer type `B` that can never be constructed, because it contains a
/// field of the uninhabited type `Impossible`.
///
/// `#[repr(transparent)]` gives it the same layout as `B`.
#[repr(transparent)]
struct Neutered<B> {
    _inner: B,
    impossible: Impossible,
}

/// Uninhabited type: it has no variants, so no value of it can exist.
enum Impossible {}

// NOTE(review): `Send` is implemented regardless of `B`, presumably justified by
// `Neutered<B>` being uninhabited (no `B` value is ever actually held); confirm.
unsafe impl<B> Send for Neutered<B> {}
457
458impl<B> Buf for Neutered<B> {
459 fn remaining(&self) -> usize {
460 match self.impossible {}
461 }
462
463 fn chunk(&self) -> &[u8] {
464 match self.impossible {}
465 }
466
467 fn advance(&mut self, _cnt: usize) {
468 match self.impossible {}
469 }
470}
471