| 1 | //! HTTP Upgrades |
| 2 | //! |
| 3 | //! This module deals with managing [HTTP Upgrades][mdn] in hyper. Since |
| 4 | //! several concepts in HTTP allow for first talking HTTP, and then converting |
| 5 | //! to a different protocol, this module conflates them into a single API. |
| 6 | //! Those include: |
| 7 | //! |
| 8 | //! - HTTP/1.1 Upgrades |
| 9 | //! - HTTP `CONNECT` |
| 10 | //! |
| 11 | //! You are responsible for any other pre-requisites to establish an upgrade, |
| 12 | //! such as sending the appropriate headers, methods, and status codes. You can |
| 13 | //! then use [`on`][] to grab a `Future` which will resolve to the upgraded |
| 14 | //! connection object, or an error if the upgrade fails. |
| 15 | //! |
| 16 | //! [mdn]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism |
| 17 | //! |
| 18 | //! # Client |
| 19 | //! |
| 20 | //! Sending an HTTP upgrade from the [`client`](super::client) involves setting |
| 21 | //! either the appropriate method, if wanting to `CONNECT`, or headers such as |
| 22 | //! `Upgrade` and `Connection`, on the `http::Request`. Once receiving the |
| 23 | //! `http::Response` back, you must check for the specific information that the |
| 24 | //! upgrade is agreed upon by the server (such as a `101` status code), and then |
| 25 | //! get the `Future` from the `Response`. |
| 26 | //! |
| 27 | //! # Server |
| 28 | //! |
| 29 | //! Receiving upgrade requests in a server requires you to check the relevant |
| 30 | //! headers in a `Request`, and if an upgrade should be done, you then send the |
| 31 | //! corresponding headers in a response. To then wait for hyper to finish the |
| 32 | //! upgrade, you call `on()` with the `Request`, and then can spawn a task |
| 33 | //! awaiting it. |
| 34 | //! |
| 35 | //! # Example |
| 36 | //! |
| 37 | //! See [this example][example] showing how upgrades work with both |
| 38 | //! Clients and Servers. |
| 39 | //! |
| 40 | //! [example]: https://github.com/hyperium/hyper/blob/master/examples/upgrades.rs |
| 41 | |
| 42 | use std::any::TypeId; |
| 43 | use std::error::Error as StdError; |
| 44 | use std::fmt; |
| 45 | use std::future::Future; |
| 46 | use std::io; |
| 47 | use std::pin::Pin; |
| 48 | use std::sync::{Arc, Mutex}; |
| 49 | use std::task::{Context, Poll}; |
| 50 | |
| 51 | use crate::rt::{Read, ReadBufCursor, Write}; |
| 52 | use bytes::Bytes; |
| 53 | use tokio::sync::oneshot; |
| 54 | |
| 55 | use crate::common::io::Rewind; |
| 56 | |
| 57 | /// An upgraded HTTP connection. |
| 58 | /// |
| 59 | /// This type holds a trait object internally of the original IO that |
| 60 | /// was used to speak HTTP before the upgrade. It can be used directly |
| 61 | /// as a [`Read`] or [`Write`] for convenience. |
| 62 | /// |
| 63 | /// Alternatively, if the exact type is known, this can be deconstructed |
| 64 | /// into its parts. |
| 65 | pub struct Upgraded { |
| 66 | io: Rewind<Box<dyn Io + Send>>, |
| 67 | } |
| 68 | |
| 69 | /// A future for a possible HTTP upgrade. |
| 70 | /// |
| 71 | /// If no upgrade was available, or it doesn't succeed, yields an `Error`. |
| 72 | #[derive (Clone)] |
| 73 | pub struct OnUpgrade { |
| 74 | rx: Option<Arc<Mutex<oneshot::Receiver<crate::Result<Upgraded>>>>>, |
| 75 | } |
| 76 | |
| 77 | /// The deconstructed parts of an [`Upgraded`] type. |
| 78 | /// |
| 79 | /// Includes the original IO type, and a read buffer of bytes that the |
| 80 | /// HTTP state machine may have already read before completing an upgrade. |
| 81 | #[derive (Debug)] |
| 82 | #[non_exhaustive ] |
| 83 | pub struct Parts<T> { |
| 84 | /// The original IO object used before the upgrade. |
| 85 | pub io: T, |
| 86 | /// A buffer of bytes that have been read but not processed as HTTP. |
| 87 | /// |
| 88 | /// For instance, if the `Connection` is used for an HTTP upgrade request, |
| 89 | /// it is possible the server sent back the first bytes of the new protocol |
| 90 | /// along with the response upgrade. |
| 91 | /// |
| 92 | /// You will want to check for any existing bytes if you plan to continue |
| 93 | /// communicating on the IO object. |
| 94 | pub read_buf: Bytes, |
| 95 | } |
| 96 | |
| 97 | /// Gets a pending HTTP upgrade from this message. |
| 98 | /// |
| 99 | /// This can be called on the following types: |
| 100 | /// |
| 101 | /// - `http::Request<B>` |
| 102 | /// - `http::Response<B>` |
| 103 | /// - `&mut http::Request<B>` |
| 104 | /// - `&mut http::Response<B>` |
| 105 | pub fn on<T: sealed::CanUpgrade>(msg: T) -> OnUpgrade { |
| 106 | msg.on_upgrade() |
| 107 | } |
| 108 | |
| 109 | #[cfg (all( |
| 110 | any(feature = "client" , feature = "server" ), |
| 111 | any(feature = "http1" , feature = "http2" ), |
| 112 | ))] |
| 113 | pub(super) struct Pending { |
| 114 | tx: oneshot::Sender<crate::Result<Upgraded>>, |
| 115 | } |
| 116 | |
| 117 | #[cfg (all( |
| 118 | any(feature = "client" , feature = "server" ), |
| 119 | any(feature = "http1" , feature = "http2" ), |
| 120 | ))] |
| 121 | pub(super) fn pending() -> (Pending, OnUpgrade) { |
| 122 | let (tx: Sender>, rx: Receiver>) = oneshot::channel(); |
| 123 | ( |
| 124 | Pending { tx }, |
| 125 | OnUpgrade { |
| 126 | rx: Some(Arc::new(data:Mutex::new(rx))), |
| 127 | }, |
| 128 | ) |
| 129 | } |
| 130 | |
| 131 | // ===== impl Upgraded ===== |
| 132 | |
| 133 | impl Upgraded { |
| 134 | #[cfg (all( |
| 135 | any(feature = "client" , feature = "server" ), |
| 136 | any(feature = "http1" , feature = "http2" ) |
| 137 | ))] |
| 138 | pub(super) fn new<T>(io: T, read_buf: Bytes) -> Self |
| 139 | where |
| 140 | T: Read + Write + Unpin + Send + 'static, |
| 141 | { |
| 142 | Upgraded { |
| 143 | io: Rewind::new_buffered(Box::new(io), read_buf), |
| 144 | } |
| 145 | } |
| 146 | |
| 147 | /// Tries to downcast the internal trait object to the type passed. |
| 148 | /// |
| 149 | /// On success, returns the downcasted parts. On error, returns the |
| 150 | /// `Upgraded` back. |
| 151 | pub fn downcast<T: Read + Write + Unpin + 'static>(self) -> Result<Parts<T>, Self> { |
| 152 | let (io, buf) = self.io.into_inner(); |
| 153 | match io.__hyper_downcast() { |
| 154 | Ok(t) => Ok(Parts { |
| 155 | io: *t, |
| 156 | read_buf: buf, |
| 157 | }), |
| 158 | Err(io) => Err(Upgraded { |
| 159 | io: Rewind::new_buffered(io, buf), |
| 160 | }), |
| 161 | } |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | impl Read for Upgraded { |
| 166 | fn poll_read( |
| 167 | mut self: Pin<&mut Self>, |
| 168 | cx: &mut Context<'_>, |
| 169 | buf: ReadBufCursor<'_>, |
| 170 | ) -> Poll<io::Result<()>> { |
| 171 | Pin::new(&mut self.io).poll_read(cx, buf) |
| 172 | } |
| 173 | } |
| 174 | |
| 175 | impl Write for Upgraded { |
| 176 | fn poll_write( |
| 177 | mut self: Pin<&mut Self>, |
| 178 | cx: &mut Context<'_>, |
| 179 | buf: &[u8], |
| 180 | ) -> Poll<io::Result<usize>> { |
| 181 | Pin::new(&mut self.io).poll_write(cx, buf) |
| 182 | } |
| 183 | |
| 184 | fn poll_write_vectored( |
| 185 | mut self: Pin<&mut Self>, |
| 186 | cx: &mut Context<'_>, |
| 187 | bufs: &[io::IoSlice<'_>], |
| 188 | ) -> Poll<io::Result<usize>> { |
| 189 | Pin::new(&mut self.io).poll_write_vectored(cx, bufs) |
| 190 | } |
| 191 | |
| 192 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 193 | Pin::new(&mut self.io).poll_flush(cx) |
| 194 | } |
| 195 | |
| 196 | fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 197 | Pin::new(&mut self.io).poll_shutdown(cx) |
| 198 | } |
| 199 | |
| 200 | fn is_write_vectored(&self) -> bool { |
| 201 | self.io.is_write_vectored() |
| 202 | } |
| 203 | } |
| 204 | |
| 205 | impl fmt::Debug for Upgraded { |
| 206 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 207 | f.debug_struct(name:"Upgraded" ).finish() |
| 208 | } |
| 209 | } |
| 210 | |
| 211 | // ===== impl OnUpgrade ===== |
| 212 | |
| 213 | impl OnUpgrade { |
| 214 | pub(super) fn none() -> Self { |
| 215 | OnUpgrade { rx: None } |
| 216 | } |
| 217 | |
| 218 | #[cfg (all(any(feature = "client" , feature = "server" ), feature = "http1" ))] |
| 219 | pub(super) fn is_none(&self) -> bool { |
| 220 | self.rx.is_none() |
| 221 | } |
| 222 | } |
| 223 | |
| 224 | impl Future for OnUpgrade { |
| 225 | type Output = Result<Upgraded, crate::Error>; |
| 226 | |
| 227 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 228 | match self.rx { |
| 229 | Some(ref rx: &Arc>>>) => PinPoll, …>>::new(&mut *rx.lock().unwrap()) |
| 230 | .poll(cx) |
| 231 | .map(|res: Result, …>| match res { |
| 232 | Ok(Ok(upgraded: Upgraded)) => Ok(upgraded), |
| 233 | Ok(Err(err: Error)) => Err(err), |
| 234 | Err(_oneshot_canceled: RecvError) => { |
| 235 | Err(crate::Error::new_canceled().with(cause:UpgradeExpected)) |
| 236 | } |
| 237 | }), |
| 238 | None => Poll::Ready(Err(crate::Error::new_user_no_upgrade())), |
| 239 | } |
| 240 | } |
| 241 | } |
| 242 | |
| 243 | impl fmt::Debug for OnUpgrade { |
| 244 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 245 | f.debug_struct(name:"OnUpgrade" ).finish() |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | // ===== impl Pending ===== |
| 250 | |
| 251 | #[cfg (all( |
| 252 | any(feature = "client" , feature = "server" ), |
| 253 | any(feature = "http1" , feature = "http2" ) |
| 254 | ))] |
| 255 | impl Pending { |
| 256 | pub(super) fn fulfill(self, upgraded: Upgraded) { |
| 257 | trace!("pending upgrade fulfill" ); |
| 258 | let _ = self.tx.send(Ok(upgraded)); |
| 259 | } |
| 260 | |
| 261 | #[cfg (feature = "http1" )] |
| 262 | /// Don't fulfill the pending Upgrade, but instead signal that |
| 263 | /// upgrades are handled manually. |
| 264 | pub(super) fn manual(self) { |
| 265 | #[cfg (any(feature = "http1" , feature = "http2" ))] |
| 266 | trace!("pending upgrade handled manually" ); |
| 267 | let _ = self.tx.send(Err(crate::Error::new_user_manual_upgrade())); |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | // ===== impl UpgradeExpected ===== |
| 272 | |
| 273 | /// Error cause returned when an upgrade was expected but canceled |
| 274 | /// for whatever reason. |
| 275 | /// |
| 276 | /// This likely means the actual `Conn` future wasn't polled and upgraded. |
| 277 | #[derive (Debug)] |
| 278 | struct UpgradeExpected; |
| 279 | |
| 280 | impl fmt::Display for UpgradeExpected { |
| 281 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 282 | f.write_str(data:"upgrade expected but not completed" ) |
| 283 | } |
| 284 | } |
| 285 | |
| 286 | impl StdError for UpgradeExpected {} |
| 287 | |
| 288 | // ===== impl Io ===== |
| 289 | |
| 290 | pub(super) trait Io: Read + Write + Unpin + 'static { |
| 291 | fn __hyper_type_id(&self) -> TypeId { |
| 292 | TypeId::of::<Self>() |
| 293 | } |
| 294 | } |
| 295 | |
| 296 | impl<T: Read + Write + Unpin + 'static> Io for T {} |
| 297 | |
| 298 | impl dyn Io + Send { |
| 299 | fn __hyper_is<T: Io>(&self) -> bool { |
| 300 | let t: TypeId = TypeId::of::<T>(); |
| 301 | self.__hyper_type_id() == t |
| 302 | } |
| 303 | |
| 304 | fn __hyper_downcast<T: Io>(self: Box<Self>) -> Result<Box<T>, Box<Self>> { |
| 305 | if self.__hyper_is::<T>() { |
| 306 | // Taken from `std::error::Error::downcast()`. |
| 307 | unsafe { |
| 308 | let raw: *mut dyn Io = Box::into_raw(self); |
| 309 | Ok(Box::from_raw(raw as *mut T)) |
| 310 | } |
| 311 | } else { |
| 312 | Err(self) |
| 313 | } |
| 314 | } |
| 315 | } |
| 316 | |
| 317 | mod sealed { |
| 318 | use super::OnUpgrade; |
| 319 | |
| 320 | pub trait CanUpgrade { |
| 321 | fn on_upgrade(self) -> OnUpgrade; |
| 322 | } |
| 323 | |
| 324 | impl<B> CanUpgrade for http::Request<B> { |
| 325 | fn on_upgrade(mut self) -> OnUpgrade { |
| 326 | self.extensions_mut() |
| 327 | .remove::<OnUpgrade>() |
| 328 | .unwrap_or_else(OnUpgrade::none) |
| 329 | } |
| 330 | } |
| 331 | |
| 332 | impl<B> CanUpgrade for &'_ mut http::Request<B> { |
| 333 | fn on_upgrade(self) -> OnUpgrade { |
| 334 | self.extensions_mut() |
| 335 | .remove::<OnUpgrade>() |
| 336 | .unwrap_or_else(OnUpgrade::none) |
| 337 | } |
| 338 | } |
| 339 | |
| 340 | impl<B> CanUpgrade for http::Response<B> { |
| 341 | fn on_upgrade(mut self) -> OnUpgrade { |
| 342 | self.extensions_mut() |
| 343 | .remove::<OnUpgrade>() |
| 344 | .unwrap_or_else(OnUpgrade::none) |
| 345 | } |
| 346 | } |
| 347 | |
| 348 | impl<B> CanUpgrade for &'_ mut http::Response<B> { |
| 349 | fn on_upgrade(self) -> OnUpgrade { |
| 350 | self.extensions_mut() |
| 351 | .remove::<OnUpgrade>() |
| 352 | .unwrap_or_else(OnUpgrade::none) |
| 353 | } |
| 354 | } |
| 355 | } |
| 356 | |
| 357 | #[cfg (all( |
| 358 | any(feature = "client" , feature = "server" ), |
| 359 | any(feature = "http1" , feature = "http2" ), |
| 360 | ))] |
| 361 | #[cfg (test)] |
| 362 | mod tests { |
| 363 | use super::*; |
| 364 | |
| 365 | #[test ] |
| 366 | fn upgraded_downcast() { |
| 367 | let upgraded = Upgraded::new(Mock, Bytes::new()); |
| 368 | |
| 369 | let upgraded = upgraded |
| 370 | .downcast::<crate::common::io::Compat<std::io::Cursor<Vec<u8>>>>() |
| 371 | .unwrap_err(); |
| 372 | |
| 373 | upgraded.downcast::<Mock>().unwrap(); |
| 374 | } |
| 375 | |
| 376 | // TODO: replace with tokio_test::io when it can test write_buf |
| 377 | struct Mock; |
| 378 | |
| 379 | impl Read for Mock { |
| 380 | fn poll_read( |
| 381 | self: Pin<&mut Self>, |
| 382 | _cx: &mut Context<'_>, |
| 383 | _buf: ReadBufCursor<'_>, |
| 384 | ) -> Poll<io::Result<()>> { |
| 385 | unreachable!("Mock::poll_read" ) |
| 386 | } |
| 387 | } |
| 388 | |
| 389 | impl Write for Mock { |
| 390 | fn poll_write( |
| 391 | self: Pin<&mut Self>, |
| 392 | _: &mut Context<'_>, |
| 393 | buf: &[u8], |
| 394 | ) -> Poll<io::Result<usize>> { |
| 395 | // panic!("poll_write shouldn't be called"); |
| 396 | Poll::Ready(Ok(buf.len())) |
| 397 | } |
| 398 | |
| 399 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 400 | unreachable!("Mock::poll_flush" ) |
| 401 | } |
| 402 | |
| 403 | fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 404 | unreachable!("Mock::poll_shutdown" ) |
| 405 | } |
| 406 | } |
| 407 | } |
| 408 | |