1use bytes::{Buf, Bytes};
2use h2::{Reason, RecvStream, SendStream};
3use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE};
4use http::HeaderMap;
5use pin_project_lite::pin_project;
6use std::error::Error as StdError;
7use std::io::{self, Cursor, IoSlice};
8use std::mem;
9use std::task::Context;
10use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
11use tracing::{debug, trace, warn};
12
13use crate::body::HttpBody;
14use crate::common::{task, Future, Pin, Poll};
15use crate::proto::h2::ping::Recorder;
16
17pub(crate) mod ping;
18
19cfg_client! {
20 pub(crate) mod client;
21 pub(crate) use self::client::ClientTask;
22}
23
24cfg_server! {
25 pub(crate) mod server;
26 pub(crate) use self::server::Server;
27}
28
/// Default initial stream window size defined in the HTTP/2 spec (65,535 octets).
pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
31
32fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
33 // List of connection headers from:
34 // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
35 //
36 // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're
37 // tested separately.
38 let connection_headers = [
39 HeaderName::from_lowercase(b"keep-alive").unwrap(),
40 HeaderName::from_lowercase(b"proxy-connection").unwrap(),
41 TRAILER,
42 TRANSFER_ENCODING,
43 UPGRADE,
44 ];
45
46 for header in connection_headers.iter() {
47 if headers.remove(header).is_some() {
48 warn!("Connection header illegal in HTTP/2: {}", header.as_str());
49 }
50 }
51
52 if is_request {
53 if headers
54 .get(TE)
55 .map(|te_header| te_header != "trailers")
56 .unwrap_or(false)
57 {
58 warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
59 headers.remove(TE);
60 }
61 } else if headers.remove(TE).is_some() {
62 warn!("TE headers illegal in HTTP/2 responses");
63 }
64
65 if let Some(header) = headers.remove(CONNECTION) {
66 warn!(
67 "Connection header illegal in HTTP/2: {}",
68 CONNECTION.as_str()
69 );
70 let header_contents = header.to_str().unwrap();
71
72 // A `Connection` header may have a comma-separated list of names of other headers that
73 // are meant for only this specific connection.
74 //
75 // Iterate these names and remove them as headers. Connection-specific headers are
76 // forbidden in HTTP2, as that information has been moved into frame types of the h2
77 // protocol.
78 for name in header_contents.split(',') {
79 let name = name.trim();
80 headers.remove(name);
81 }
82 }
83}
84
85// body adapters used by both Client and Server
86
// Future that forwards an `HttpBody` into an h2 `SendStream`: first the data chunks, then
// (if the body is not finished after its data) the trailers. See the `Future` impl below.
pin_project! {
    struct PipeToSendStream<S>
    where
        S: HttpBody,
    {
        // Sending half of the h2 stream that body chunks and trailers are written to.
        body_tx: SendStream<SendBuf<S::Data>>,
        // Set once `poll_data` has yielded `None` without the body reporting end-of-stream;
        // from then on only trailers are polled.
        data_done: bool,
        // The body being forwarded. Pinned because `poll_data`/`poll_trailers` are called
        // through a pinned reference.
        #[pin]
        stream: S,
    }
}
98
99impl<S> PipeToSendStream<S>
100where
101 S: HttpBody,
102{
103 fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
104 PipeToSendStream {
105 body_tx: tx,
106 data_done: false,
107 stream,
108 }
109 }
110}
111
impl<S> Future for PipeToSendStream<S>
where
    S: HttpBody,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Output = crate::Result<()>;

    /// Drives the body to completion, writing it to the h2 send stream.
    ///
    /// The future runs in two phases, selected by `data_done`:
    ///
    /// 1. **Data**: wait for send capacity (or notice a reset), poll the next body chunk and
    ///    send it. When the body yields `None`, either finish with an empty end-of-stream
    ///    frame (body reports end-of-stream) or switch to phase 2.
    /// 2. **Trailers**: notice a reset, poll the trailers, then finish by sending them or,
    ///    if there are none, an empty end-of-stream frame.
    ///
    /// Resolves to `Ok(())` once the end of the stream has been sent. Errors produced by the
    /// body itself go through `on_user_err`; failures writing to the stream are wrapped with
    /// `crate::Error::new_body_write`.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut me = self.project();
        loop {
            if !*me.data_done {
                // we don't have the next chunk of data yet, so just reserve 1 byte to make
                // sure there's some capacity available. h2 will handle the capacity management
                // for the actual body chunk.
                me.body_tx.reserve_capacity(1);

                if me.body_tx.capacity() == 0 {
                    // No capacity yet: wait until a non-zero amount is reported.
                    loop {
                        match ready!(me.body_tx.poll_capacity(cx)) {
                            Some(Ok(0)) => {}
                            Some(Ok(_)) => break,
                            Some(Err(e)) => {
                                return Poll::Ready(Err(crate::Error::new_body_write(e)))
                            }
                            None => {
                                // None means the stream is no longer in a
                                // streaming state, we either finished it
                                // somehow, or the remote reset us.
                                return Poll::Ready(Err(crate::Error::new_body_write(
                                    "send stream capacity unexpectedly closed",
                                )));
                            }
                        }
                    }
                } else if let Poll::Ready(reason) = me
                    .body_tx
                    .poll_reset(cx)
                    .map_err(crate::Error::new_body_write)?
                {
                    // Capacity is available, but the stream has already been reset:
                    // stop and report the reason instead of polling the body.
                    debug!("stream received RST_STREAM: {:?}", reason);
                    return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
                        reason,
                    ))));
                }

                match ready!(me.stream.as_mut().poll_data(cx)) {
                    Some(Ok(chunk)) => {
                        // Ask the body whether this chunk was the last one so the
                        // end-of-stream flag can ride on the same DATA frame.
                        let is_eos = me.stream.is_end_stream();
                        trace!(
                            "send body chunk: {} bytes, eos={}",
                            chunk.remaining(),
                            is_eos,
                        );

                        let buf = SendBuf::Buf(chunk);
                        me.body_tx
                            .send_data(buf, is_eos)
                            .map_err(crate::Error::new_body_write)?;

                        if is_eos {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
                    None => {
                        // No more data: drop the 1-byte capacity reservation made above.
                        me.body_tx.reserve_capacity(0);
                        let is_eos = me.stream.is_end_stream();
                        if is_eos {
                            return Poll::Ready(me.body_tx.send_eos_frame());
                        } else {
                            *me.data_done = true;
                            // loop again to poll_trailers
                        }
                    }
                }
            } else {
                // Trailers phase: bail out early if the stream has been reset.
                if let Poll::Ready(reason) = me
                    .body_tx
                    .poll_reset(cx)
                    .map_err(crate::Error::new_body_write)?
                {
                    debug!("stream received RST_STREAM: {:?}", reason);
                    return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
                        reason,
                    ))));
                }

                // Every arm below returns, so this phase never loops again.
                match ready!(me.stream.poll_trailers(cx)) {
                    Ok(Some(trailers)) => {
                        me.body_tx
                            .send_trailers(trailers)
                            .map_err(crate::Error::new_body_write)?;
                        return Poll::Ready(Ok(()));
                    }
                    Ok(None) => {
                        // There were no trailers, so send an empty DATA frame...
                        return Poll::Ready(me.body_tx.send_eos_frame());
                    }
                    Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
                }
            }
        }
    }
}
216
/// Convenience operations on an h2 send stream shared by the body-forwarding code.
trait SendStreamExt {
    /// Converts an error produced by the user's body into a `crate::Error`, resets the
    /// stream with the reason derived from that error, and returns the error.
    fn on_user_err<E>(&mut self, err: E) -> crate::Error
    where
        E: Into<Box<dyn std::error::Error + Send + Sync>>;
    /// Ends the stream by sending an empty data frame flagged as end-of-stream.
    fn send_eos_frame(&mut self) -> crate::Result<()>;
}
223
224impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
225 fn on_user_err<E>(&mut self, err: E) -> crate::Error
226 where
227 E: Into<Box<dyn std::error::Error + Send + Sync>>,
228 {
229 let err: Error = crate::Error::new_user_body(cause:err);
230 debug!("send body user stream error: {}", err);
231 self.send_reset(err.h2_reason());
232 err
233 }
234
235 fn send_eos_frame(&mut self) -> crate::Result<()> {
236 trace!("send body eos");
237 self.send_data(SendBuf::None, true)
238 .map_err(op:crate::Error::new_body_write)
239 }
240}
241
/// Payload type written to the h2 send stream.
///
/// NOTE(review): `#[repr(usize)]` presumably pins the enum's layout so that
/// `UpgradedSendStream::new` may transmute between `SendBuf<B>` and
/// `SendBuf<Neutered<B>>` — confirm before changing the representation.
#[repr(usize)]
enum SendBuf<B> {
    /// A data chunk of the body's own buffer type.
    Buf(B),
    /// An owned copy of a byte slice (built by `UpgradedSendStream::write`).
    Cursor(Cursor<Box<[u8]>>),
    /// No bytes at all; used for the empty end-of-stream frame.
    None,
}
248
249impl<B: Buf> Buf for SendBuf<B> {
250 #[inline]
251 fn remaining(&self) -> usize {
252 match *self {
253 Self::Buf(ref b) => b.remaining(),
254 Self::Cursor(ref c) => Buf::remaining(c),
255 Self::None => 0,
256 }
257 }
258
259 #[inline]
260 fn chunk(&self) -> &[u8] {
261 match *self {
262 Self::Buf(ref b) => b.chunk(),
263 Self::Cursor(ref c) => c.chunk(),
264 Self::None => &[],
265 }
266 }
267
268 #[inline]
269 fn advance(&mut self, cnt: usize) {
270 match *self {
271 Self::Buf(ref mut b) => b.advance(cnt),
272 Self::Cursor(ref mut c) => c.advance(cnt),
273 Self::None => {}
274 }
275 }
276
277 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
278 match *self {
279 Self::Buf(ref b) => b.chunks_vectored(dst),
280 Self::Cursor(ref c) => c.chunks_vectored(dst),
281 Self::None => 0,
282 }
283 }
284}
285
/// Byte-stream view (`AsyncRead` + `AsyncWrite`) over the two halves of an h2 stream.
struct H2Upgraded<B>
where
    B: Buf,
{
    /// Recorder that is told the length of every data chunk received.
    ping: Recorder,
    /// Sending half; written to by `poll_write` / `poll_shutdown`.
    send_stream: UpgradedSendStream<B>,
    /// Receiving half; polled for data by `poll_read`.
    recv_stream: RecvStream,
    /// Bytes from the most recently received chunk not yet copied out to a reader.
    buf: Bytes,
}
295
296impl<B> AsyncRead for H2Upgraded<B>
297where
298 B: Buf,
299{
300 fn poll_read(
301 mut self: Pin<&mut Self>,
302 cx: &mut Context<'_>,
303 read_buf: &mut ReadBuf<'_>,
304 ) -> Poll<Result<(), io::Error>> {
305 if self.buf.is_empty() {
306 self.buf = loop {
307 match ready!(self.recv_stream.poll_data(cx)) {
308 None => return Poll::Ready(Ok(())),
309 Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
310 continue
311 }
312 Some(Ok(buf)) => {
313 self.ping.record_data(buf.len());
314 break buf;
315 }
316 Some(Err(e)) => {
317 return Poll::Ready(match e.reason() {
318 Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
319 Some(Reason::STREAM_CLOSED) => {
320 Err(io::Error::new(io::ErrorKind::BrokenPipe, e))
321 }
322 _ => Err(h2_to_io_error(e)),
323 })
324 }
325 }
326 };
327 }
328 let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
329 read_buf.put_slice(&self.buf[..cnt]);
330 self.buf.advance(cnt);
331 let _ = self.recv_stream.flow_control().release_capacity(cnt);
332 Poll::Ready(Ok(()))
333 }
334}
335
336impl<B> AsyncWrite for H2Upgraded<B>
337where
338 B: Buf,
339{
340 fn poll_write(
341 mut self: Pin<&mut Self>,
342 cx: &mut Context<'_>,
343 buf: &[u8],
344 ) -> Poll<Result<usize, io::Error>> {
345 if buf.is_empty() {
346 return Poll::Ready(Ok(0));
347 }
348 self.send_stream.reserve_capacity(buf.len());
349
350 // We ignore all errors returned by `poll_capacity` and `write`, as we
351 // will get the correct from `poll_reset` anyway.
352 let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
353 None => Some(0),
354 Some(Ok(cnt)) => self
355 .send_stream
356 .write(&buf[..cnt], false)
357 .ok()
358 .map(|()| cnt),
359 Some(Err(_)) => None,
360 };
361
362 if let Some(cnt) = cnt {
363 return Poll::Ready(Ok(cnt));
364 }
365
366 Poll::Ready(Err(h2_to_io_error(
367 match ready!(self.send_stream.poll_reset(cx)) {
368 Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
369 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
370 }
371 Ok(reason) => reason.into(),
372 Err(e) => e,
373 },
374 )))
375 }
376
377 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
378 Poll::Ready(Ok(()))
379 }
380
381 fn poll_shutdown(
382 mut self: Pin<&mut Self>,
383 cx: &mut Context<'_>,
384 ) -> Poll<Result<(), io::Error>> {
385 if self.send_stream.write(&[], true).is_ok() {
386 return Poll::Ready(Ok(()))
387 }
388
389 Poll::Ready(Err(h2_to_io_error(
390 match ready!(self.send_stream.poll_reset(cx)) {
391 Ok(Reason::NO_ERROR) => {
392 return Poll::Ready(Ok(()))
393 }
394 Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
395 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
396 }
397 Ok(reason) => reason.into(),
398 Err(e) => e,
399 },
400 )))
401 }
402}
403
404fn h2_to_io_error(e: h2::Error) -> io::Error {
405 if e.is_io() {
406 e.into_io().unwrap()
407 } else {
408 io::Error::new(kind:io::ErrorKind::Other, error:e)
409 }
410}
411
/// Wrapper around an h2 `SendStream` whose buffer type parameter has been swapped from
/// `B` to `Neutered<B>`.
///
/// Every method casts the wrapped stream back to `SendStream<SendBuf<B>>` before use.
// NOTE(review): presumably the wrapper exists so this type can be `Send` regardless of
// `B` (see `unsafe impl Send for Neutered<B>`) — confirm against the callers.
struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);

impl<B> UpgradedSendStream<B>
where
    B: Buf,
{
    /// Wraps `inner` by transmuting it to the `Neutered<B>` form.
    ///
    /// Panics if `B` and `Neutered<B>` differ in size.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller's obligations are not spelled out in this file. The
    /// wrapper only ever writes `SendBuf::Cursor` values (see `write`); presumably the
    /// caller must ensure no `SendBuf::Buf(B)` value is sent through the stream — confirm.
    unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
        assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
        Self(mem::transmute(inner))
    }

    /// Forwards to `SendStream::reserve_capacity`.
    fn reserve_capacity(&mut self, cnt: usize) {
        unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
    }

    /// Forwards to `SendStream::poll_capacity`.
    fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
        unsafe { self.as_inner_unchecked().poll_capacity(cx) }
    }

    /// Forwards to `SendStream::poll_reset`.
    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
        unsafe { self.as_inner_unchecked().poll_reset(cx) }
    }

    /// Copies `buf` into an owned `SendBuf::Cursor` and sends it, optionally ending the
    /// stream. Send failures are converted with `h2_to_io_error`.
    fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), io::Error> {
        let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
        unsafe {
            self.as_inner_unchecked()
                .send_data(send_buf, end_of_stream)
                .map_err(h2_to_io_error)
        }
    }

    /// Reinterprets the wrapped stream as the `SendStream<SendBuf<B>>` it was created
    /// from, via a raw-pointer cast (the inverse of the transmute in `new`).
    unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
        &mut *(&mut self.0 as *mut _ as *mut _)
    }
}
448
/// Stand-in for a buffer type `B` that can never actually be instantiated.
///
/// Because it contains a field of the uninhabited type `Impossible`, no value of
/// `Neutered<B>` can exist. `#[repr(transparent)]` is applied with `_inner: B` as the
/// only field that can carry data (`UpgradedSendStream::new` additionally asserts that
/// the sizes of `B` and `Neutered<B>` match).
#[repr(transparent)]
struct Neutered<B> {
    _inner: B,
    impossible: Impossible,
}

/// Uninhabited type: it has no variants, so no value of it can be constructed.
enum Impossible {}

// NOTE(review): declared `Send` for every `B`; presumably justified by the fact that no
// `Neutered<B>` value can exist to be sent across threads — confirm.
unsafe impl<B> Send for Neutered<B> {}
458
/// `Buf` implementation that exists only to satisfy trait bounds.
///
/// Each method matches on the uninhabited `impossible` field with no arms, so none of
/// these bodies can ever execute.
impl<B> Buf for Neutered<B> {
    fn remaining(&self) -> usize {
        match self.impossible {}
    }

    fn chunk(&self) -> &[u8] {
        match self.impossible {}
    }

    fn advance(&mut self, _cnt: usize) {
        match self.impossible {}
    }
}
472