1 | use std::error::Error as StdError; |
2 | use std::future::Future; |
3 | use std::io::{Cursor, IoSlice}; |
4 | use std::mem; |
5 | use std::pin::Pin; |
6 | use std::task::{Context, Poll}; |
7 | |
8 | use bytes::{Buf, Bytes}; |
9 | use futures_util::ready; |
10 | use h2::{Reason, RecvStream, SendStream}; |
11 | use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE}; |
12 | use http::HeaderMap; |
13 | use pin_project_lite::pin_project; |
14 | |
15 | use crate::body::Body; |
16 | use crate::proto::h2::ping::Recorder; |
17 | use crate::rt::{Read, ReadBufCursor, Write}; |
18 | |
19 | pub(crate) mod ping; |
20 | |
21 | cfg_client! { |
22 | pub(crate) mod client; |
23 | pub(crate) use self::client::ClientTask; |
24 | } |
25 | |
26 | cfg_server! { |
27 | pub(crate) mod server; |
28 | pub(crate) use self::server::Server; |
29 | } |
30 | |
/// Default initial flow-control window size for a stream, in octets.
///
/// HTTP/2 fixes this at 2^16 - 1 (65,535) until a peer advertises another value through
/// `SETTINGS_INITIAL_WINDOW_SIZE` (RFC 9113, Section 6.9.2).
pub(crate) const SPEC_WINDOW_SIZE: u32 = (1 << 16) - 1;
33 | |
34 | // List of connection headers from RFC 9110 Section 7.6.1 |
35 | // |
36 | // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're |
37 | // tested separately. |
38 | static CONNECTION_HEADERS: [HeaderName; 4] = [ |
39 | HeaderName::from_static(src:"keep-alive" ), |
40 | HeaderName::from_static(src:"proxy-connection" ), |
41 | TRANSFER_ENCODING, |
42 | UPGRADE, |
43 | ]; |
44 | |
45 | fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { |
46 | for header in &CONNECTION_HEADERS { |
47 | if headers.remove(header).is_some() { |
48 | warn!("Connection header illegal in HTTP/2: {}" , header.as_str()); |
49 | } |
50 | } |
51 | |
52 | if is_request { |
53 | if headers |
54 | .get(TE) |
55 | .map_or(false, |te_header| te_header != "trailers" ) |
56 | { |
57 | warn!("TE headers not set to \"trailers \" are illegal in HTTP/2 requests" ); |
58 | headers.remove(TE); |
59 | } |
60 | } else if headers.remove(TE).is_some() { |
61 | warn!("TE headers illegal in HTTP/2 responses" ); |
62 | } |
63 | |
64 | if let Some(header) = headers.remove(CONNECTION) { |
65 | warn!( |
66 | "Connection header illegal in HTTP/2: {}" , |
67 | CONNECTION.as_str() |
68 | ); |
69 | let header_contents = header.to_str().unwrap(); |
70 | |
71 | // A `Connection` header may have a comma-separated list of names of other headers that |
72 | // are meant for only this specific connection. |
73 | // |
74 | // Iterate these names and remove them as headers. Connection-specific headers are |
75 | // forbidden in HTTP2, as that information has been moved into frame types of the h2 |
76 | // protocol. |
77 | for name in header_contents.split(',' ) { |
78 | let name = name.trim(); |
79 | headers.remove(name); |
80 | } |
81 | } |
82 | } |
83 | |
84 | // body adapters used by both Client and Server |
85 | |
pin_project! {
    /// Future that forwards the frames produced by a [`Body`] into an h2 [`SendStream`].
    ///
    /// The forwarding logic lives in this type's `Future` implementation.
    pub(crate) struct PipeToSendStream<S>
    where
        S: Body,
    {
        // h2 send half that DATA frames and trailers are written to
        body_tx: SendStream<SendBuf<S::Data>>,
        // set to `false` by `new`
        data_done: bool,
        // the body being drained; pinned because it is polled through `Pin<&mut S>`
        #[pin]
        stream: S,
    }
}
97 | |
98 | impl<S> PipeToSendStream<S> |
99 | where |
100 | S: Body, |
101 | { |
102 | fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> { |
103 | PipeToSendStream { |
104 | body_tx: tx, |
105 | data_done: false, |
106 | stream, |
107 | } |
108 | } |
109 | } |
110 | |
111 | impl<S> Future for PipeToSendStream<S> |
112 | where |
113 | S: Body, |
114 | S::Error: Into<Box<dyn StdError + Send + Sync>>, |
115 | { |
116 | type Output = crate::Result<()>; |
117 | |
118 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
119 | let mut me = self.project(); |
120 | loop { |
121 | // we don't have the next chunk of data yet, so just reserve 1 byte to make |
122 | // sure there's some capacity available. h2 will handle the capacity management |
123 | // for the actual body chunk. |
124 | me.body_tx.reserve_capacity(1); |
125 | |
126 | if me.body_tx.capacity() == 0 { |
127 | loop { |
128 | match ready!(me.body_tx.poll_capacity(cx)) { |
129 | Some(Ok(0)) => {} |
130 | Some(Ok(_)) => break, |
131 | Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))), |
132 | None => { |
133 | // None means the stream is no longer in a |
134 | // streaming state, we either finished it |
135 | // somehow, or the remote reset us. |
136 | return Poll::Ready(Err(crate::Error::new_body_write( |
137 | "send stream capacity unexpectedly closed" , |
138 | ))); |
139 | } |
140 | } |
141 | } |
142 | } else if let Poll::Ready(reason) = me |
143 | .body_tx |
144 | .poll_reset(cx) |
145 | .map_err(crate::Error::new_body_write)? |
146 | { |
147 | debug!("stream received RST_STREAM: {:?}" , reason); |
148 | return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason)))); |
149 | } |
150 | |
151 | match ready!(me.stream.as_mut().poll_frame(cx)) { |
152 | Some(Ok(frame)) => { |
153 | if frame.is_data() { |
154 | let chunk = frame.into_data().unwrap_or_else(|_| unreachable!()); |
155 | let is_eos = me.stream.is_end_stream(); |
156 | trace!( |
157 | "send body chunk: {} bytes, eos={}" , |
158 | chunk.remaining(), |
159 | is_eos, |
160 | ); |
161 | |
162 | let buf = SendBuf::Buf(chunk); |
163 | me.body_tx |
164 | .send_data(buf, is_eos) |
165 | .map_err(crate::Error::new_body_write)?; |
166 | |
167 | if is_eos { |
168 | return Poll::Ready(Ok(())); |
169 | } |
170 | } else if frame.is_trailers() { |
171 | // no more DATA, so give any capacity back |
172 | me.body_tx.reserve_capacity(0); |
173 | me.body_tx |
174 | .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!())) |
175 | .map_err(crate::Error::new_body_write)?; |
176 | return Poll::Ready(Ok(())); |
177 | } else { |
178 | trace!("discarding unknown frame" ); |
179 | // loop again |
180 | } |
181 | } |
182 | Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), |
183 | None => { |
184 | // no more frames means we're done here |
185 | // but at this point, we haven't sent an EOS DATA, or |
186 | // any trailers, so send an empty EOS DATA. |
187 | return Poll::Ready(me.body_tx.send_eos_frame()); |
188 | } |
189 | } |
190 | } |
191 | } |
192 | } |
193 | |
194 | trait SendStreamExt { |
195 | fn on_user_err<E>(&mut self, err: E) -> crate::Error |
196 | where |
197 | E: Into<Box<dyn std::error::Error + Send + Sync>>; |
198 | fn send_eos_frame(&mut self) -> crate::Result<()>; |
199 | } |
200 | |
201 | impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> { |
202 | fn on_user_err<E>(&mut self, err: E) -> crate::Error |
203 | where |
204 | E: Into<Box<dyn std::error::Error + Send + Sync>>, |
205 | { |
206 | let err: Error = crate::Error::new_user_body(cause:err); |
207 | debug!("send body user stream error: {}" , err); |
208 | self.send_reset(err.h2_reason()); |
209 | err |
210 | } |
211 | |
212 | fn send_eos_frame(&mut self) -> crate::Result<()> { |
213 | trace!("send body eos" ); |
214 | self.send_data(SendBuf::None, true) |
215 | .map_err(op:crate::Error::new_body_write) |
216 | } |
217 | } |
218 | |
/// Buffer type handed to h2 when queueing DATA frames.
// NOTE(review): `repr(usize)` presumably pins the layout for the transmute performed in
// `UpgradedSendStream::new` — confirm before changing it.
#[repr(usize)]
enum SendBuf<B> {
    /// A chunk produced by a body.
    Buf(B),
    /// An owned copy of bytes, as created by `UpgradedSendStream::write`.
    Cursor(Cursor<Box<[u8]>>),
    /// No payload; used for the empty end-of-stream frame.
    None,
}
225 | |
226 | impl<B: Buf> Buf for SendBuf<B> { |
227 | #[inline ] |
228 | fn remaining(&self) -> usize { |
229 | match *self { |
230 | Self::Buf(ref b) => b.remaining(), |
231 | Self::Cursor(ref c) => Buf::remaining(c), |
232 | Self::None => 0, |
233 | } |
234 | } |
235 | |
236 | #[inline ] |
237 | fn chunk(&self) -> &[u8] { |
238 | match *self { |
239 | Self::Buf(ref b) => b.chunk(), |
240 | Self::Cursor(ref c) => c.chunk(), |
241 | Self::None => &[], |
242 | } |
243 | } |
244 | |
245 | #[inline ] |
246 | fn advance(&mut self, cnt: usize) { |
247 | match *self { |
248 | Self::Buf(ref mut b) => b.advance(cnt), |
249 | Self::Cursor(ref mut c) => c.advance(cnt), |
250 | Self::None => {} |
251 | } |
252 | } |
253 | |
254 | fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { |
255 | match *self { |
256 | Self::Buf(ref b) => b.chunks_vectored(dst), |
257 | Self::Cursor(ref c) => c.chunks_vectored(dst), |
258 | Self::None => 0, |
259 | } |
260 | } |
261 | } |
262 | |
/// Byte-stream view over one h2 stream, implementing `Read` and `Write` on top of the
/// stream's receive and send halves.
struct H2Upgraded<B>
where
    B: Buf,
{
    /// Notified of the size of every received data chunk (see `poll_read`).
    ping: Recorder,
    /// Outgoing half; written to by `poll_write` and closed by `poll_shutdown`.
    send_stream: UpgradedSendStream<B>,
    /// Incoming half; polled for data by `poll_read`.
    recv_stream: RecvStream,
    /// Received bytes that have not been copied out to a reader yet.
    buf: Bytes,
}
272 | |
273 | impl<B> Read for H2Upgraded<B> |
274 | where |
275 | B: Buf, |
276 | { |
277 | fn poll_read( |
278 | mut self: Pin<&mut Self>, |
279 | cx: &mut Context<'_>, |
280 | mut read_buf: ReadBufCursor<'_>, |
281 | ) -> Poll<Result<(), std::io::Error>> { |
282 | if self.buf.is_empty() { |
283 | self.buf = loop { |
284 | match ready!(self.recv_stream.poll_data(cx)) { |
285 | None => return Poll::Ready(Ok(())), |
286 | Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => { |
287 | continue |
288 | } |
289 | Some(Ok(buf)) => { |
290 | self.ping.record_data(buf.len()); |
291 | break buf; |
292 | } |
293 | Some(Err(e)) => { |
294 | return Poll::Ready(match e.reason() { |
295 | Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()), |
296 | Some(Reason::STREAM_CLOSED) => { |
297 | Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) |
298 | } |
299 | _ => Err(h2_to_io_error(e)), |
300 | }) |
301 | } |
302 | } |
303 | }; |
304 | } |
305 | let cnt = std::cmp::min(self.buf.len(), read_buf.remaining()); |
306 | read_buf.put_slice(&self.buf[..cnt]); |
307 | self.buf.advance(cnt); |
308 | let _ = self.recv_stream.flow_control().release_capacity(cnt); |
309 | Poll::Ready(Ok(())) |
310 | } |
311 | } |
312 | |
313 | impl<B> Write for H2Upgraded<B> |
314 | where |
315 | B: Buf, |
316 | { |
317 | fn poll_write( |
318 | mut self: Pin<&mut Self>, |
319 | cx: &mut Context<'_>, |
320 | buf: &[u8], |
321 | ) -> Poll<Result<usize, std::io::Error>> { |
322 | if buf.is_empty() { |
323 | return Poll::Ready(Ok(0)); |
324 | } |
325 | self.send_stream.reserve_capacity(buf.len()); |
326 | |
327 | // We ignore all errors returned by `poll_capacity` and `write`, as we |
328 | // will get the correct from `poll_reset` anyway. |
329 | let cnt = match ready!(self.send_stream.poll_capacity(cx)) { |
330 | None => Some(0), |
331 | Some(Ok(cnt)) => self |
332 | .send_stream |
333 | .write(&buf[..cnt], false) |
334 | .ok() |
335 | .map(|()| cnt), |
336 | Some(Err(_)) => None, |
337 | }; |
338 | |
339 | if let Some(cnt) = cnt { |
340 | return Poll::Ready(Ok(cnt)); |
341 | } |
342 | |
343 | Poll::Ready(Err(h2_to_io_error( |
344 | match ready!(self.send_stream.poll_reset(cx)) { |
345 | Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { |
346 | return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) |
347 | } |
348 | Ok(reason) => reason.into(), |
349 | Err(e) => e, |
350 | }, |
351 | ))) |
352 | } |
353 | |
354 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { |
355 | Poll::Ready(Ok(())) |
356 | } |
357 | |
358 | fn poll_shutdown( |
359 | mut self: Pin<&mut Self>, |
360 | cx: &mut Context<'_>, |
361 | ) -> Poll<Result<(), std::io::Error>> { |
362 | if self.send_stream.write(&[], true).is_ok() { |
363 | return Poll::Ready(Ok(())); |
364 | } |
365 | |
366 | Poll::Ready(Err(h2_to_io_error( |
367 | match ready!(self.send_stream.poll_reset(cx)) { |
368 | Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())), |
369 | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { |
370 | return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) |
371 | } |
372 | Ok(reason) => reason.into(), |
373 | Err(e) => e, |
374 | }, |
375 | ))) |
376 | } |
377 | } |
378 | |
379 | fn h2_to_io_error(e: h2::Error) -> std::io::Error { |
380 | if e.is_io() { |
381 | e.into_io().unwrap() |
382 | } else { |
383 | std::io::Error::new(kind:std::io::ErrorKind::Other, error:e) |
384 | } |
385 | } |
386 | |
/// Send half of an upgraded h2 stream whose buffer type parameter has been swapped for
/// `Neutered<B>`.
///
/// Every method reaches the underlying `SendStream<SendBuf<B>>` through
/// `as_inner_unchecked`.
struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);

impl<B> UpgradedSendStream<B>
where
    B: Buf,
{
    /// Wraps `inner` by transmuting it to the `Neutered<B>`-parameterised stream type.
    ///
    /// # Panics
    ///
    /// Panics if `B` and `Neutered<B>` do not have the same size.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller contract is not spelled out in this file. The transmute
    /// presumably relies on `Neutered` being `repr(transparent)` and on `SendBuf` being
    /// `repr(usize)` — confirm before touching either type.
    unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
        assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
        Self(mem::transmute(inner))
    }

    /// Forwards to `reserve_capacity` on the underlying send stream.
    fn reserve_capacity(&mut self, cnt: usize) {
        unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
    }

    /// Forwards to `poll_capacity` on the underlying send stream.
    fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
        unsafe { self.as_inner_unchecked().poll_capacity(cx) }
    }

    /// Forwards to `poll_reset` on the underlying send stream.
    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
        unsafe { self.as_inner_unchecked().poll_reset(cx) }
    }

    /// Copies `buf` into an owned boxed slice and queues it as a DATA frame, setting the
    /// end-of-stream flag when `end_of_stream` is true.
    ///
    /// h2 failures are converted with `h2_to_io_error`.
    fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), std::io::Error> {
        // Always sent as `SendBuf::Cursor`, so no `B` value is created here.
        let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
        unsafe {
            self.as_inner_unchecked()
                .send_data(send_buf, end_of_stream)
                .map_err(h2_to_io_error)
        }
    }

    /// Reinterprets the stored stream as the `SendStream<SendBuf<B>>` it was created from.
    ///
    /// # Safety
    ///
    /// NOTE(review): this reverses the transmute in `new` with a raw pointer cast; its
    /// soundness presumably rests on the same layout assumptions — confirm.
    unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
        &mut *(&mut self.0 as *mut _ as *mut _)
    }
}
423 | |
/// Stand-in for `B` that cannot be used as a value: its `impossible` field has an
/// uninhabited type, which is what its `Buf` implementation matches on.
///
/// Declared `repr(transparent)` around the `_inner: B` field.
#[repr(transparent)]
struct Neutered<B> {
    _inner: B,
    impossible: Impossible,
}

/// Uninhabited type: it has no variants, so no value of it can exist.
enum Impossible {}

// Declares `Neutered<B>` as `Send` for every `B`, including a `B` that is not `Send`.
// NOTE(review): presumably justified because no `Neutered<B>` value can ever exist — confirm.
unsafe impl<B> Send for Neutered<B> {}
433 | |
/// `Buf` implementation whose methods have no code path: each one matches on the
/// uninhabited `impossible` field, which has no variants and therefore needs no arms.
impl<B> Buf for Neutered<B> {
    fn remaining(&self) -> usize {
        match self.impossible {}
    }

    fn chunk(&self) -> &[u8] {
        match self.impossible {}
    }

    fn advance(&mut self, _cnt: usize) {
        match self.impossible {}
    }
}
447 | |