1 | use bytes::{Buf, Bytes}; |
2 | use h2::{Reason, RecvStream, SendStream}; |
3 | use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE}; |
4 | use http::HeaderMap; |
5 | use pin_project_lite::pin_project; |
6 | use std::error::Error as StdError; |
7 | use std::future::Future; |
8 | use std::io::{self, Cursor, IoSlice}; |
9 | use std::mem; |
10 | use std::pin::Pin; |
11 | use std::task::{Context, Poll}; |
12 | use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; |
13 | use tracing::{debug, trace, warn}; |
14 | |
15 | use crate::body::HttpBody; |
16 | use crate::proto::h2::ping::Recorder; |
17 | |
18 | pub(crate) mod ping; |
19 | |
20 | cfg_client! { |
21 | pub(crate) mod client; |
22 | pub(crate) use self::client::ClientTask; |
23 | } |
24 | |
25 | cfg_server! { |
26 | pub(crate) mod server; |
27 | pub(crate) use self::server::Server; |
28 | } |
29 | |
30 | /// Default initial stream window size defined in HTTP2 spec. |
31 | pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535; |
32 | |
33 | fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { |
34 | // List of connection headers from: |
35 | // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection |
36 | // |
37 | // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're |
38 | // tested separately. |
39 | let connection_headers = [ |
40 | HeaderName::from_lowercase(b"keep-alive" ).unwrap(), |
41 | HeaderName::from_lowercase(b"proxy-connection" ).unwrap(), |
42 | TRAILER, |
43 | TRANSFER_ENCODING, |
44 | UPGRADE, |
45 | ]; |
46 | |
47 | for header in connection_headers.iter() { |
48 | if headers.remove(header).is_some() { |
49 | warn!("Connection header illegal in HTTP/2: {}" , header.as_str()); |
50 | } |
51 | } |
52 | |
53 | if is_request { |
54 | if headers |
55 | .get(TE) |
56 | .map(|te_header| te_header != "trailers" ) |
57 | .unwrap_or(false) |
58 | { |
59 | warn!("TE headers not set to \"trailers \" are illegal in HTTP/2 requests" ); |
60 | headers.remove(TE); |
61 | } |
62 | } else if headers.remove(TE).is_some() { |
63 | warn!("TE headers illegal in HTTP/2 responses" ); |
64 | } |
65 | |
66 | if let Some(header) = headers.remove(CONNECTION) { |
67 | warn!( |
68 | "Connection header illegal in HTTP/2: {}" , |
69 | CONNECTION.as_str() |
70 | ); |
71 | let header_contents = header.to_str().unwrap(); |
72 | |
73 | // A `Connection` header may have a comma-separated list of names of other headers that |
74 | // are meant for only this specific connection. |
75 | // |
76 | // Iterate these names and remove them as headers. Connection-specific headers are |
77 | // forbidden in HTTP2, as that information has been moved into frame types of the h2 |
78 | // protocol. |
79 | for name in header_contents.split(',' ) { |
80 | let name = name.trim(); |
81 | headers.remove(name); |
82 | } |
83 | } |
84 | } |
85 | |
86 | // body adapters used by both Client and Server |
87 | |
88 | pin_project! { |
89 | struct PipeToSendStream<S> |
90 | where |
91 | S: HttpBody, |
92 | { |
93 | body_tx: SendStream<SendBuf<S::Data>>, |
94 | data_done: bool, |
95 | #[pin] |
96 | stream: S, |
97 | } |
98 | } |
99 | |
100 | impl<S> PipeToSendStream<S> |
101 | where |
102 | S: HttpBody, |
103 | { |
104 | fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> { |
105 | PipeToSendStream { |
106 | body_tx: tx, |
107 | data_done: false, |
108 | stream, |
109 | } |
110 | } |
111 | } |
112 | |
113 | impl<S> Future for PipeToSendStream<S> |
114 | where |
115 | S: HttpBody, |
116 | S::Error: Into<Box<dyn StdError + Send + Sync>>, |
117 | { |
118 | type Output = crate::Result<()>; |
119 | |
120 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
121 | let mut me = self.project(); |
122 | loop { |
123 | if !*me.data_done { |
124 | // we don't have the next chunk of data yet, so just reserve 1 byte to make |
125 | // sure there's some capacity available. h2 will handle the capacity management |
126 | // for the actual body chunk. |
127 | me.body_tx.reserve_capacity(1); |
128 | |
129 | if me.body_tx.capacity() == 0 { |
130 | loop { |
131 | match ready!(me.body_tx.poll_capacity(cx)) { |
132 | Some(Ok(0)) => {} |
133 | Some(Ok(_)) => break, |
134 | Some(Err(e)) => { |
135 | return Poll::Ready(Err(crate::Error::new_body_write(e))) |
136 | } |
137 | None => { |
138 | // None means the stream is no longer in a |
139 | // streaming state, we either finished it |
140 | // somehow, or the remote reset us. |
141 | return Poll::Ready(Err(crate::Error::new_body_write( |
142 | "send stream capacity unexpectedly closed" , |
143 | ))); |
144 | } |
145 | } |
146 | } |
147 | } else if let Poll::Ready(reason) = me |
148 | .body_tx |
149 | .poll_reset(cx) |
150 | .map_err(crate::Error::new_body_write)? |
151 | { |
152 | debug!("stream received RST_STREAM: {:?}" , reason); |
153 | return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( |
154 | reason, |
155 | )))); |
156 | } |
157 | |
158 | match ready!(me.stream.as_mut().poll_data(cx)) { |
159 | Some(Ok(chunk)) => { |
160 | let is_eos = me.stream.is_end_stream(); |
161 | trace!( |
162 | "send body chunk: {} bytes, eos= {}" , |
163 | chunk.remaining(), |
164 | is_eos, |
165 | ); |
166 | |
167 | let buf = SendBuf::Buf(chunk); |
168 | me.body_tx |
169 | .send_data(buf, is_eos) |
170 | .map_err(crate::Error::new_body_write)?; |
171 | |
172 | if is_eos { |
173 | return Poll::Ready(Ok(())); |
174 | } |
175 | } |
176 | Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), |
177 | None => { |
178 | me.body_tx.reserve_capacity(0); |
179 | let is_eos = me.stream.is_end_stream(); |
180 | if is_eos { |
181 | return Poll::Ready(me.body_tx.send_eos_frame()); |
182 | } else { |
183 | *me.data_done = true; |
184 | // loop again to poll_trailers |
185 | } |
186 | } |
187 | } |
188 | } else { |
189 | if let Poll::Ready(reason) = me |
190 | .body_tx |
191 | .poll_reset(cx) |
192 | .map_err(crate::Error::new_body_write)? |
193 | { |
194 | debug!("stream received RST_STREAM: {:?}" , reason); |
195 | return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( |
196 | reason, |
197 | )))); |
198 | } |
199 | |
200 | match ready!(me.stream.poll_trailers(cx)) { |
201 | Ok(Some(trailers)) => { |
202 | me.body_tx |
203 | .send_trailers(trailers) |
204 | .map_err(crate::Error::new_body_write)?; |
205 | return Poll::Ready(Ok(())); |
206 | } |
207 | Ok(None) => { |
208 | // There were no trailers, so send an empty DATA frame... |
209 | return Poll::Ready(me.body_tx.send_eos_frame()); |
210 | } |
211 | Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), |
212 | } |
213 | } |
214 | } |
215 | } |
216 | } |
217 | |
218 | trait SendStreamExt { |
219 | fn on_user_err<E>(&mut self, err: E) -> crate::Error |
220 | where |
221 | E: Into<Box<dyn std::error::Error + Send + Sync>>; |
222 | fn send_eos_frame(&mut self) -> crate::Result<()>; |
223 | } |
224 | |
225 | impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> { |
226 | fn on_user_err<E>(&mut self, err: E) -> crate::Error |
227 | where |
228 | E: Into<Box<dyn std::error::Error + Send + Sync>>, |
229 | { |
230 | let err: Error = crate::Error::new_user_body(cause:err); |
231 | debug!("send body user stream error: {}" , err); |
232 | self.send_reset(err.h2_reason()); |
233 | err |
234 | } |
235 | |
236 | fn send_eos_frame(&mut self) -> crate::Result<()> { |
237 | trace!("send body eos" ); |
238 | self.send_data(SendBuf::None, true) |
239 | .map_err(op:crate::Error::new_body_write) |
240 | } |
241 | } |
242 | |
243 | #[repr (usize)] |
244 | enum SendBuf<B> { |
245 | Buf(B), |
246 | Cursor(Cursor<Box<[u8]>>), |
247 | None, |
248 | } |
249 | |
250 | impl<B: Buf> Buf for SendBuf<B> { |
251 | #[inline ] |
252 | fn remaining(&self) -> usize { |
253 | match *self { |
254 | Self::Buf(ref b) => b.remaining(), |
255 | Self::Cursor(ref c) => Buf::remaining(c), |
256 | Self::None => 0, |
257 | } |
258 | } |
259 | |
260 | #[inline ] |
261 | fn chunk(&self) -> &[u8] { |
262 | match *self { |
263 | Self::Buf(ref b) => b.chunk(), |
264 | Self::Cursor(ref c) => c.chunk(), |
265 | Self::None => &[], |
266 | } |
267 | } |
268 | |
269 | #[inline ] |
270 | fn advance(&mut self, cnt: usize) { |
271 | match *self { |
272 | Self::Buf(ref mut b) => b.advance(cnt), |
273 | Self::Cursor(ref mut c) => c.advance(cnt), |
274 | Self::None => {} |
275 | } |
276 | } |
277 | |
278 | fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { |
279 | match *self { |
280 | Self::Buf(ref b) => b.chunks_vectored(dst), |
281 | Self::Cursor(ref c) => c.chunks_vectored(dst), |
282 | Self::None => 0, |
283 | } |
284 | } |
285 | } |
286 | |
287 | struct H2Upgraded<B> |
288 | where |
289 | B: Buf, |
290 | { |
291 | ping: Recorder, |
292 | send_stream: UpgradedSendStream<B>, |
293 | recv_stream: RecvStream, |
294 | buf: Bytes, |
295 | } |
296 | |
297 | impl<B> AsyncRead for H2Upgraded<B> |
298 | where |
299 | B: Buf, |
300 | { |
301 | fn poll_read( |
302 | mut self: Pin<&mut Self>, |
303 | cx: &mut Context<'_>, |
304 | read_buf: &mut ReadBuf<'_>, |
305 | ) -> Poll<Result<(), io::Error>> { |
306 | if self.buf.is_empty() { |
307 | self.buf = loop { |
308 | match ready!(self.recv_stream.poll_data(cx)) { |
309 | None => return Poll::Ready(Ok(())), |
310 | Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => { |
311 | continue |
312 | } |
313 | Some(Ok(buf)) => { |
314 | self.ping.record_data(buf.len()); |
315 | break buf; |
316 | } |
317 | Some(Err(e)) => { |
318 | return Poll::Ready(match e.reason() { |
319 | Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()), |
320 | Some(Reason::STREAM_CLOSED) => { |
321 | Err(io::Error::new(io::ErrorKind::BrokenPipe, e)) |
322 | } |
323 | _ => Err(h2_to_io_error(e)), |
324 | }) |
325 | } |
326 | } |
327 | }; |
328 | } |
329 | let cnt = std::cmp::min(self.buf.len(), read_buf.remaining()); |
330 | read_buf.put_slice(&self.buf[..cnt]); |
331 | self.buf.advance(cnt); |
332 | let _ = self.recv_stream.flow_control().release_capacity(cnt); |
333 | Poll::Ready(Ok(())) |
334 | } |
335 | } |
336 | |
337 | impl<B> AsyncWrite for H2Upgraded<B> |
338 | where |
339 | B: Buf, |
340 | { |
341 | fn poll_write( |
342 | mut self: Pin<&mut Self>, |
343 | cx: &mut Context<'_>, |
344 | buf: &[u8], |
345 | ) -> Poll<Result<usize, io::Error>> { |
346 | if buf.is_empty() { |
347 | return Poll::Ready(Ok(0)); |
348 | } |
349 | self.send_stream.reserve_capacity(buf.len()); |
350 | |
351 | // We ignore all errors returned by `poll_capacity` and `write`, as we |
352 | // will get the correct from `poll_reset` anyway. |
353 | let cnt = match ready!(self.send_stream.poll_capacity(cx)) { |
354 | None => Some(0), |
355 | Some(Ok(cnt)) => self |
356 | .send_stream |
357 | .write(&buf[..cnt], false) |
358 | .ok() |
359 | .map(|()| cnt), |
360 | Some(Err(_)) => None, |
361 | }; |
362 | |
363 | if let Some(cnt) = cnt { |
364 | return Poll::Ready(Ok(cnt)); |
365 | } |
366 | |
367 | Poll::Ready(Err(h2_to_io_error( |
368 | match ready!(self.send_stream.poll_reset(cx)) { |
369 | Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { |
370 | return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())) |
371 | } |
372 | Ok(reason) => reason.into(), |
373 | Err(e) => e, |
374 | }, |
375 | ))) |
376 | } |
377 | |
378 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
379 | Poll::Ready(Ok(())) |
380 | } |
381 | |
382 | fn poll_shutdown( |
383 | mut self: Pin<&mut Self>, |
384 | cx: &mut Context<'_>, |
385 | ) -> Poll<Result<(), io::Error>> { |
386 | if self.send_stream.write(&[], true).is_ok() { |
387 | return Poll::Ready(Ok(())); |
388 | } |
389 | |
390 | Poll::Ready(Err(h2_to_io_error( |
391 | match ready!(self.send_stream.poll_reset(cx)) { |
392 | Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())), |
393 | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { |
394 | return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())) |
395 | } |
396 | Ok(reason) => reason.into(), |
397 | Err(e) => e, |
398 | }, |
399 | ))) |
400 | } |
401 | } |
402 | |
403 | fn h2_to_io_error(e: h2::Error) -> io::Error { |
404 | if e.is_io() { |
405 | e.into_io().unwrap() |
406 | } else { |
407 | io::Error::new(kind:io::ErrorKind::Other, error:e) |
408 | } |
409 | } |
410 | |
411 | struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>); |
412 | |
413 | impl<B> UpgradedSendStream<B> |
414 | where |
415 | B: Buf, |
416 | { |
417 | unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self { |
418 | assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>()); |
419 | Self(mem::transmute(inner)) |
420 | } |
421 | |
422 | fn reserve_capacity(&mut self, cnt: usize) { |
423 | unsafe { self.as_inner_unchecked().reserve_capacity(cnt) } |
424 | } |
425 | |
426 | fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> { |
427 | unsafe { self.as_inner_unchecked().poll_capacity(cx) } |
428 | } |
429 | |
430 | fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> { |
431 | unsafe { self.as_inner_unchecked().poll_reset(cx) } |
432 | } |
433 | |
434 | fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), io::Error> { |
435 | let send_buf = SendBuf::Cursor(Cursor::new(buf.into())); |
436 | unsafe { |
437 | self.as_inner_unchecked() |
438 | .send_data(send_buf, end_of_stream) |
439 | .map_err(h2_to_io_error) |
440 | } |
441 | } |
442 | |
443 | unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> { |
444 | &mut *(&mut self.0 as *mut _ as *mut _) |
445 | } |
446 | } |
447 | |
448 | #[repr (transparent)] |
449 | struct Neutered<B> { |
450 | _inner: B, |
451 | impossible: Impossible, |
452 | } |
453 | |
454 | enum Impossible {} |
455 | |
456 | unsafe impl<B> Send for Neutered<B> {} |
457 | |
458 | impl<B> Buf for Neutered<B> { |
459 | fn remaining(&self) -> usize { |
460 | match self.impossible {} |
461 | } |
462 | |
463 | fn chunk(&self) -> &[u8] { |
464 | match self.impossible {} |
465 | } |
466 | |
467 | fn advance(&mut self, _cnt: usize) { |
468 | match self.impossible {} |
469 | } |
470 | } |
471 | |