1 | use bytes::{Buf, Bytes}; |
2 | use h2::{Reason, RecvStream, SendStream}; |
3 | use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE}; |
4 | use http::HeaderMap; |
5 | use pin_project_lite::pin_project; |
6 | use std::error::Error as StdError; |
7 | use std::io::{self, Cursor, IoSlice}; |
8 | use std::mem; |
9 | use std::task::Context; |
10 | use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; |
11 | use tracing::{debug, trace, warn}; |
12 | |
13 | use crate::body::HttpBody; |
14 | use crate::common::{task, Future, Pin, Poll}; |
15 | use crate::proto::h2::ping::Recorder; |
16 | |
17 | pub(crate) mod ping; |
18 | |
19 | cfg_client! { |
20 | pub(crate) mod client; |
21 | pub(crate) use self::client::ClientTask; |
22 | } |
23 | |
24 | cfg_server! { |
25 | pub(crate) mod server; |
26 | pub(crate) use self::server::Server; |
27 | } |
28 | |
/// Default initial stream window size defined in the HTTP/2 spec (65,535 octets).
pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
31 | |
32 | fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { |
33 | // List of connection headers from: |
34 | // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection |
35 | // |
36 | // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're |
37 | // tested separately. |
38 | let connection_headers = [ |
39 | HeaderName::from_lowercase(b"keep-alive" ).unwrap(), |
40 | HeaderName::from_lowercase(b"proxy-connection" ).unwrap(), |
41 | TRAILER, |
42 | TRANSFER_ENCODING, |
43 | UPGRADE, |
44 | ]; |
45 | |
46 | for header in connection_headers.iter() { |
47 | if headers.remove(header).is_some() { |
48 | warn!("Connection header illegal in HTTP/2: {}" , header.as_str()); |
49 | } |
50 | } |
51 | |
52 | if is_request { |
53 | if headers |
54 | .get(TE) |
55 | .map(|te_header| te_header != "trailers" ) |
56 | .unwrap_or(false) |
57 | { |
58 | warn!("TE headers not set to \"trailers \" are illegal in HTTP/2 requests" ); |
59 | headers.remove(TE); |
60 | } |
61 | } else if headers.remove(TE).is_some() { |
62 | warn!("TE headers illegal in HTTP/2 responses" ); |
63 | } |
64 | |
65 | if let Some(header) = headers.remove(CONNECTION) { |
66 | warn!( |
67 | "Connection header illegal in HTTP/2: {}" , |
68 | CONNECTION.as_str() |
69 | ); |
70 | let header_contents = header.to_str().unwrap(); |
71 | |
72 | // A `Connection` header may have a comma-separated list of names of other headers that |
73 | // are meant for only this specific connection. |
74 | // |
75 | // Iterate these names and remove them as headers. Connection-specific headers are |
76 | // forbidden in HTTP2, as that information has been moved into frame types of the h2 |
77 | // protocol. |
78 | for name in header_contents.split(',' ) { |
79 | let name = name.trim(); |
80 | headers.remove(name); |
81 | } |
82 | } |
83 | } |
84 | |
85 | // body adapters used by both Client and Server |
86 | |
// Future that drives an `HttpBody` to completion, forwarding its data chunks (and then its
// trailers, if any) into an h2 `SendStream`.
pin_project! {
    struct PipeToSendStream<S>
    where
        S: HttpBody,
    {
        // The h2 send half that body chunks and trailers are written to.
        body_tx: SendStream<SendBuf<S::Data>>,
        // Set once `poll_data` has yielded `None` without the body reporting end-of-stream;
        // from then on only trailers are polled.
        data_done: bool,
        // The body being forwarded; pinned because `HttpBody` is polled through `Pin<&mut S>`.
        #[pin]
        stream: S,
    }
}
98 | |
99 | impl<S> PipeToSendStream<S> |
100 | where |
101 | S: HttpBody, |
102 | { |
103 | fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> { |
104 | PipeToSendStream { |
105 | body_tx: tx, |
106 | data_done: false, |
107 | stream, |
108 | } |
109 | } |
110 | } |
111 | |
112 | impl<S> Future for PipeToSendStream<S> |
113 | where |
114 | S: HttpBody, |
115 | S::Error: Into<Box<dyn StdError + Send + Sync>>, |
116 | { |
117 | type Output = crate::Result<()>; |
118 | |
119 | fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { |
120 | let mut me = self.project(); |
121 | loop { |
122 | if !*me.data_done { |
123 | // we don't have the next chunk of data yet, so just reserve 1 byte to make |
124 | // sure there's some capacity available. h2 will handle the capacity management |
125 | // for the actual body chunk. |
126 | me.body_tx.reserve_capacity(1); |
127 | |
128 | if me.body_tx.capacity() == 0 { |
129 | loop { |
130 | match ready!(me.body_tx.poll_capacity(cx)) { |
131 | Some(Ok(0)) => {} |
132 | Some(Ok(_)) => break, |
133 | Some(Err(e)) => { |
134 | return Poll::Ready(Err(crate::Error::new_body_write(e))) |
135 | } |
136 | None => { |
137 | // None means the stream is no longer in a |
138 | // streaming state, we either finished it |
139 | // somehow, or the remote reset us. |
140 | return Poll::Ready(Err(crate::Error::new_body_write( |
141 | "send stream capacity unexpectedly closed" , |
142 | ))); |
143 | } |
144 | } |
145 | } |
146 | } else if let Poll::Ready(reason) = me |
147 | .body_tx |
148 | .poll_reset(cx) |
149 | .map_err(crate::Error::new_body_write)? |
150 | { |
151 | debug!("stream received RST_STREAM: {:?}" , reason); |
152 | return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( |
153 | reason, |
154 | )))); |
155 | } |
156 | |
157 | match ready!(me.stream.as_mut().poll_data(cx)) { |
158 | Some(Ok(chunk)) => { |
159 | let is_eos = me.stream.is_end_stream(); |
160 | trace!( |
161 | "send body chunk: {} bytes, eos= {}" , |
162 | chunk.remaining(), |
163 | is_eos, |
164 | ); |
165 | |
166 | let buf = SendBuf::Buf(chunk); |
167 | me.body_tx |
168 | .send_data(buf, is_eos) |
169 | .map_err(crate::Error::new_body_write)?; |
170 | |
171 | if is_eos { |
172 | return Poll::Ready(Ok(())); |
173 | } |
174 | } |
175 | Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), |
176 | None => { |
177 | me.body_tx.reserve_capacity(0); |
178 | let is_eos = me.stream.is_end_stream(); |
179 | if is_eos { |
180 | return Poll::Ready(me.body_tx.send_eos_frame()); |
181 | } else { |
182 | *me.data_done = true; |
183 | // loop again to poll_trailers |
184 | } |
185 | } |
186 | } |
187 | } else { |
188 | if let Poll::Ready(reason) = me |
189 | .body_tx |
190 | .poll_reset(cx) |
191 | .map_err(crate::Error::new_body_write)? |
192 | { |
193 | debug!("stream received RST_STREAM: {:?}" , reason); |
194 | return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( |
195 | reason, |
196 | )))); |
197 | } |
198 | |
199 | match ready!(me.stream.poll_trailers(cx)) { |
200 | Ok(Some(trailers)) => { |
201 | me.body_tx |
202 | .send_trailers(trailers) |
203 | .map_err(crate::Error::new_body_write)?; |
204 | return Poll::Ready(Ok(())); |
205 | } |
206 | Ok(None) => { |
207 | // There were no trailers, so send an empty DATA frame... |
208 | return Poll::Ready(me.body_tx.send_eos_frame()); |
209 | } |
210 | Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), |
211 | } |
212 | } |
213 | } |
214 | } |
215 | } |
216 | |
217 | trait SendStreamExt { |
218 | fn on_user_err<E>(&mut self, err: E) -> crate::Error |
219 | where |
220 | E: Into<Box<dyn std::error::Error + Send + Sync>>; |
221 | fn send_eos_frame(&mut self) -> crate::Result<()>; |
222 | } |
223 | |
224 | impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> { |
225 | fn on_user_err<E>(&mut self, err: E) -> crate::Error |
226 | where |
227 | E: Into<Box<dyn std::error::Error + Send + Sync>>, |
228 | { |
229 | let err: Error = crate::Error::new_user_body(cause:err); |
230 | debug!("send body user stream error: {}" , err); |
231 | self.send_reset(err.h2_reason()); |
232 | err |
233 | } |
234 | |
235 | fn send_eos_frame(&mut self) -> crate::Result<()> { |
236 | trace!("send body eos" ); |
237 | self.send_data(SendBuf::None, true) |
238 | .map_err(op:crate::Error::new_body_write) |
239 | } |
240 | } |
241 | |
/// Buffer type handed to the h2 send stream.
// NOTE(review): the explicit `repr` presumably pins the layout for the `transmute` in
// `UpgradedSendStream::new` — confirm before changing the repr or the variant order.
#[repr(usize)]
enum SendBuf<B> {
    /// A chunk produced by a body.
    Buf(B),
    /// An owned copy of bytes written through the upgraded IO type.
    Cursor(Cursor<Box<[u8]>>),
    /// No data; used for the empty end-of-stream frame.
    None,
}
248 | |
249 | impl<B: Buf> Buf for SendBuf<B> { |
250 | #[inline ] |
251 | fn remaining(&self) -> usize { |
252 | match *self { |
253 | Self::Buf(ref b) => b.remaining(), |
254 | Self::Cursor(ref c) => Buf::remaining(c), |
255 | Self::None => 0, |
256 | } |
257 | } |
258 | |
259 | #[inline ] |
260 | fn chunk(&self) -> &[u8] { |
261 | match *self { |
262 | Self::Buf(ref b) => b.chunk(), |
263 | Self::Cursor(ref c) => c.chunk(), |
264 | Self::None => &[], |
265 | } |
266 | } |
267 | |
268 | #[inline ] |
269 | fn advance(&mut self, cnt: usize) { |
270 | match *self { |
271 | Self::Buf(ref mut b) => b.advance(cnt), |
272 | Self::Cursor(ref mut c) => c.advance(cnt), |
273 | Self::None => {} |
274 | } |
275 | } |
276 | |
277 | fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { |
278 | match *self { |
279 | Self::Buf(ref b) => b.chunks_vectored(dst), |
280 | Self::Cursor(ref c) => c.chunks_vectored(dst), |
281 | Self::None => 0, |
282 | } |
283 | } |
284 | } |
285 | |
/// Byte-stream view of an h2 stream: reads come from `recv_stream`, writes go to
/// `send_stream` (see the `AsyncRead` / `AsyncWrite` impls).
struct H2Upgraded<B>
where
    B: Buf,
{
    /// Notified with the length of every data frame received.
    ping: Recorder,
    /// Send half of the stream.
    send_stream: UpgradedSendStream<B>,
    /// Receive half of the stream.
    recv_stream: RecvStream,
    /// Bytes of the most recently received frame that have not been handed to the reader yet.
    buf: Bytes,
}
295 | |
296 | impl<B> AsyncRead for H2Upgraded<B> |
297 | where |
298 | B: Buf, |
299 | { |
300 | fn poll_read( |
301 | mut self: Pin<&mut Self>, |
302 | cx: &mut Context<'_>, |
303 | read_buf: &mut ReadBuf<'_>, |
304 | ) -> Poll<Result<(), io::Error>> { |
305 | if self.buf.is_empty() { |
306 | self.buf = loop { |
307 | match ready!(self.recv_stream.poll_data(cx)) { |
308 | None => return Poll::Ready(Ok(())), |
309 | Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => { |
310 | continue |
311 | } |
312 | Some(Ok(buf)) => { |
313 | self.ping.record_data(buf.len()); |
314 | break buf; |
315 | } |
316 | Some(Err(e)) => { |
317 | return Poll::Ready(match e.reason() { |
318 | Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()), |
319 | Some(Reason::STREAM_CLOSED) => { |
320 | Err(io::Error::new(io::ErrorKind::BrokenPipe, e)) |
321 | } |
322 | _ => Err(h2_to_io_error(e)), |
323 | }) |
324 | } |
325 | } |
326 | }; |
327 | } |
328 | let cnt = std::cmp::min(self.buf.len(), read_buf.remaining()); |
329 | read_buf.put_slice(&self.buf[..cnt]); |
330 | self.buf.advance(cnt); |
331 | let _ = self.recv_stream.flow_control().release_capacity(cnt); |
332 | Poll::Ready(Ok(())) |
333 | } |
334 | } |
335 | |
336 | impl<B> AsyncWrite for H2Upgraded<B> |
337 | where |
338 | B: Buf, |
339 | { |
340 | fn poll_write( |
341 | mut self: Pin<&mut Self>, |
342 | cx: &mut Context<'_>, |
343 | buf: &[u8], |
344 | ) -> Poll<Result<usize, io::Error>> { |
345 | if buf.is_empty() { |
346 | return Poll::Ready(Ok(0)); |
347 | } |
348 | self.send_stream.reserve_capacity(buf.len()); |
349 | |
350 | // We ignore all errors returned by `poll_capacity` and `write`, as we |
351 | // will get the correct from `poll_reset` anyway. |
352 | let cnt = match ready!(self.send_stream.poll_capacity(cx)) { |
353 | None => Some(0), |
354 | Some(Ok(cnt)) => self |
355 | .send_stream |
356 | .write(&buf[..cnt], false) |
357 | .ok() |
358 | .map(|()| cnt), |
359 | Some(Err(_)) => None, |
360 | }; |
361 | |
362 | if let Some(cnt) = cnt { |
363 | return Poll::Ready(Ok(cnt)); |
364 | } |
365 | |
366 | Poll::Ready(Err(h2_to_io_error( |
367 | match ready!(self.send_stream.poll_reset(cx)) { |
368 | Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { |
369 | return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())) |
370 | } |
371 | Ok(reason) => reason.into(), |
372 | Err(e) => e, |
373 | }, |
374 | ))) |
375 | } |
376 | |
377 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
378 | Poll::Ready(Ok(())) |
379 | } |
380 | |
381 | fn poll_shutdown( |
382 | mut self: Pin<&mut Self>, |
383 | cx: &mut Context<'_>, |
384 | ) -> Poll<Result<(), io::Error>> { |
385 | if self.send_stream.write(&[], true).is_ok() { |
386 | return Poll::Ready(Ok(())) |
387 | } |
388 | |
389 | Poll::Ready(Err(h2_to_io_error( |
390 | match ready!(self.send_stream.poll_reset(cx)) { |
391 | Ok(Reason::NO_ERROR) => { |
392 | return Poll::Ready(Ok(())) |
393 | } |
394 | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { |
395 | return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())) |
396 | } |
397 | Ok(reason) => reason.into(), |
398 | Err(e) => e, |
399 | }, |
400 | ))) |
401 | } |
402 | } |
403 | |
404 | fn h2_to_io_error(e: h2::Error) -> io::Error { |
405 | if e.is_io() { |
406 | e.into_io().unwrap() |
407 | } else { |
408 | io::Error::new(kind:io::ErrorKind::Other, error:e) |
409 | } |
410 | } |
411 | |
/// Send half used by `H2Upgraded`. It stores the stream typed over `Neutered<B>` and is
/// accessed as a `SendStream<SendBuf<B>>` through the casts in the impl block.
struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);
413 | |
414 | impl<B> UpgradedSendStream<B> |
415 | where |
416 | B: Buf, |
417 | { |
418 | unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self { |
419 | assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>()); |
420 | Self(mem::transmute(inner)) |
421 | } |
422 | |
423 | fn reserve_capacity(&mut self, cnt: usize) { |
424 | unsafe { self.as_inner_unchecked().reserve_capacity(cnt) } |
425 | } |
426 | |
427 | fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> { |
428 | unsafe { self.as_inner_unchecked().poll_capacity(cx) } |
429 | } |
430 | |
431 | fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> { |
432 | unsafe { self.as_inner_unchecked().poll_reset(cx) } |
433 | } |
434 | |
435 | fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), io::Error> { |
436 | let send_buf = SendBuf::Cursor(Cursor::new(buf.into())); |
437 | unsafe { |
438 | self.as_inner_unchecked() |
439 | .send_data(send_buf, end_of_stream) |
440 | .map_err(h2_to_io_error) |
441 | } |
442 | } |
443 | |
444 | unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> { |
445 | &mut *(&mut self.0 as *mut _ as *mut _) |
446 | } |
447 | } |
448 | |
/// Stand-in for `B` that can never be instantiated: its `impossible` field has the
/// uninhabited type `Impossible`.
#[repr(transparent)]
struct Neutered<B> {
    _inner: B,
    impossible: Impossible,
}
454 | |
/// Enum with no variants, so no value of this type can exist.
enum Impossible {}
456 | |
// Declared `Send` for every `B`, with no bound on `B`.
// NOTE(review): presumably justified because a `Neutered<B>` value can never exist — confirm.
unsafe impl<B> Send for Neutered<B> {}
458 | |
/// `Buf` impl whose methods can never run: each body is an empty `match` on the uninhabited
/// `impossible` field.
impl<B> Buf for Neutered<B> {
    fn remaining(&self) -> usize {
        match self.impossible {}
    }

    fn chunk(&self) -> &[u8] {
        match self.impossible {}
    }

    fn advance(&mut self, _cnt: usize) {
        match self.impossible {}
    }
}
472 | |