use std::fmt;
use std::mem::MaybeUninit;
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};

// New IO traits? What?! Why, are you bonkers?
//
// I mean, yes, probably. But, here's the goals:
//
// 1. Supports poll-based IO operations.
// 2. Opt-in vectored IO.
// 3. Can use an optional buffer pool.
// 4. Able to add completion-based (uring) IO eventually.
//
// Frankly, the last point is the entire reason we're doing this. We want to
// have forwards-compatibility with an eventually stable io-uring runtime. We
// don't need that to work right away. But it must be possible to add in here
// without breaking hyper 1.0.
//
// While we're in here, we can also make small tweaks to poll_read or
// poll_write that keep even the "slow" path fast, such as handling the case
// where someone didn't remember to forward along an `is_completion` call.

/// Reads bytes from a source.
///
/// This trait is similar to `std::io::Read`, but supports asynchronous reads.
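///
/// # Example
///
/// A minimal sketch of implementing this trait over an in-memory slice. The
/// `SliceReader` type here is made up purely for illustration and is not part
/// of this crate:
///
/// ```ignore
/// struct SliceReader<'a> {
///     data: &'a [u8],
///     pos: usize,
/// }
///
/// impl Read for SliceReader<'_> {
///     fn poll_read(
///         mut self: Pin<&mut Self>,
///         _cx: &mut Context<'_>,
///         mut buf: ReadBufCursor<'_>,
///     ) -> Poll<Result<(), std::io::Error>> {
///         // Copy as much as fits; copying nothing signals EOF to the caller.
///         let unread = &self.data[self.pos..];
///         let n = unread.len().min(buf.remaining());
///         buf.put_slice(&unread[..n]);
///         self.pos += n;
///         Poll::Ready(Ok(()))
///     }
/// }
/// ```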
pub trait Read {
    /// Attempts to read bytes into the `buf`.
    ///
    /// On success, returns `Poll::Ready(Ok(()))` and places data in the
    /// unfilled portion of `buf`. If no data was read (`buf.remaining()` is
    /// unchanged), it implies that EOF has been reached.
    ///
    /// If no data is available for reading, the method returns `Poll::Pending`
    /// and arranges for the current task (via `cx.waker()`) to receive a
    /// notification when the object becomes readable or is closed.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: ReadBufCursor<'_>,
    ) -> Poll<Result<(), std::io::Error>>;
}

/// Write bytes asynchronously.
///
/// This trait is similar to `std::io::Write`, but for asynchronous writes.
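///
/// # Example
///
/// A minimal sketch of implementing this trait over an in-memory buffer. The
/// `VecWriter` type here is made up purely for illustration and is not part
/// of this crate:
///
/// ```ignore
/// struct VecWriter {
///     data: Vec<u8>,
/// }
///
/// impl Write for VecWriter {
///     fn poll_write(
///         mut self: Pin<&mut Self>,
///         _cx: &mut Context<'_>,
///         buf: &[u8],
///     ) -> Poll<Result<usize, std::io::Error>> {
///         // An in-memory sink is always ready and can take the whole buffer.
///         self.data.extend_from_slice(buf);
///         Poll::Ready(Ok(buf.len()))
///     }
///
///     fn poll_flush(
///         self: Pin<&mut Self>,
///         _cx: &mut Context<'_>,
///     ) -> Poll<Result<(), std::io::Error>> {
///         // Nothing is buffered outside of `data`, so flushing is a no-op.
///         Poll::Ready(Ok(()))
///     }
///
///     fn poll_shutdown(
///         self: Pin<&mut Self>,
///         _cx: &mut Context<'_>,
///     ) -> Poll<Result<(), std::io::Error>> {
///         Poll::Ready(Ok(()))
///     }
/// }
/// ```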
pub trait Write {
    /// Attempt to write bytes from `buf` into the destination.
    ///
    /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. If
    /// successful, it must be guaranteed that `num_bytes_written <= buf.len()`.
    /// A return value of `0` means that the underlying object is no longer
    /// able to accept bytes, or that the provided buffer is empty.
    ///
    /// If the object is not ready for writing, the method returns
    /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
    /// receive a notification when the object becomes writable or is closed.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>>;

    /// Attempts to flush the object.
    ///
    /// On success, returns `Poll::Ready(Ok(()))`.
    ///
    /// If flushing cannot immediately complete, this method returns
    /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to
    /// receive a notification when the object can make progress.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>>;

    /// Attempts to shut down this writer.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>>;

    /// Returns whether this writer has an efficient `poll_write_vectored`
    /// implementation.
    ///
    /// The default implementation returns `false`.
    fn is_write_vectored(&self) -> bool {
        false
    }

    /// Like `poll_write`, except that it writes from a slice of buffers.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<Result<usize, std::io::Error>> {
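        // Default: forward the first non-empty buffer (or an empty slice if
        // there is none) to `poll_write`. Writers with a real vectored fast
        // path should override this and return `true` from `is_write_vectored`.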
        let buf = bufs
            .iter()
            .find(|b| !b.is_empty())
            .map_or(&[][..], |b| &**b);
        self.poll_write(cx, buf)
    }
}

/// A wrapper around a byte buffer that is incrementally filled and initialized.
///
/// This type is a sort of "double cursor". It tracks three regions in the
/// buffer: a region at the beginning of the buffer that has been logically
/// filled with data, a region that has been initialized at some point but not
/// yet logically filled, and a region at the end that may be uninitialized.
/// The filled region is guaranteed to be a subset of the initialized region.
///
/// In summary, the contents of the buffer can be visualized as:
///
/// ```not_rust
/// [             capacity              ]
/// [ filled |         unfilled         ]
/// [    initialized    | uninitialized ]
/// ```
///
/// It is undefined behavior to de-initialize any bytes from the uninitialized
/// region, since it is merely unknown whether this region is uninitialized or
/// not, and if part of it turns out to be initialized, it must stay initialized.
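///
/// # Example
///
/// A short sketch of how the two cursors move (the buffer size and bytes here
/// are arbitrary, chosen only for illustration):
///
/// ```ignore
/// let mut storage = [0u8; 8];
/// let mut read_buf = ReadBuf::new(&mut storage);
///
/// // Nothing is filled yet, but every byte is already initialized.
/// assert_eq!(read_buf.filled().len(), 0);
///
/// // Writing through the cursor advances the `filled` region.
/// read_buf.unfilled().put_slice(b"hi");
/// assert_eq!(read_buf.filled(), b"hi");
/// ```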
pub struct ReadBuf<'a> {
    raw: &'a mut [MaybeUninit<u8>],
    filled: usize,
    init: usize,
}

/// The cursor part of a [`ReadBuf`].
///
/// This is created by calling `ReadBuf::unfilled()`.
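///
/// # Example
///
/// A sketch of the two ways a `poll_read` implementation typically fills the
/// cursor (the byte values are arbitrary, for illustration only):
///
/// ```ignore
/// fn fill(mut cursor: ReadBufCursor<'_>) {
///     // Safe path: copy from an existing slice.
///     if cursor.remaining() >= 3 {
///         cursor.put_slice(b"abc");
///     }
///
///     // Unsafe path: write into the raw unfilled region, then declare how
///     // many bytes were actually initialized.
///     unsafe {
///         let raw = cursor.as_mut();
///         if !raw.is_empty() {
///             raw[0].write(b'd');
///             cursor.advance(1);
///         }
///     }
/// }
/// ```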
#[derive(Debug)]
pub struct ReadBufCursor<'a> {
    buf: &'a mut ReadBuf<'a>,
}

impl<'data> ReadBuf<'data> {
    /// Create a new `ReadBuf` with a slice of initialized bytes.
    #[inline]
    pub fn new(raw: &'data mut [u8]) -> Self {
        let len = raw.len();
        Self {
            // SAFETY: We never de-init the bytes ourselves.
            raw: unsafe { &mut *(raw as *mut [u8] as *mut [MaybeUninit<u8>]) },
            filled: 0,
            init: len,
        }
    }

    /// Create a new `ReadBuf` with a slice of uninitialized bytes.
    #[inline]
    pub fn uninit(raw: &'data mut [MaybeUninit<u8>]) -> Self {
        Self {
            raw,
            filled: 0,
            init: 0,
        }
    }

    /// Get a slice of the buffer that has been filled in with bytes.
    #[inline]
    pub fn filled(&self) -> &[u8] {
        // SAFETY: We only slice the filled part of the buffer, which is always valid
        unsafe { &*(&self.raw[0..self.filled] as *const [MaybeUninit<u8>] as *const [u8]) }
    }

    /// Get a cursor to the unfilled portion of the buffer.
    #[inline]
    pub fn unfilled<'cursor>(&'cursor mut self) -> ReadBufCursor<'cursor> {
        ReadBufCursor {
            // SAFETY: self.buf is never re-assigned, so it's safe to narrow
            // the lifetime.
            buf: unsafe {
                std::mem::transmute::<&'cursor mut ReadBuf<'data>, &'cursor mut ReadBuf<'cursor>>(
                    self,
                )
            },
        }
    }

    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) unsafe fn set_init(&mut self, n: usize) {
        self.init = self.init.max(n);
    }

    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) unsafe fn set_filled(&mut self, n: usize) {
        self.filled = self.filled.max(n);
    }

    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) fn len(&self) -> usize {
        self.filled
    }

    #[inline]
    #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
    pub(crate) fn init_len(&self) -> usize {
        self.init
    }

    #[inline]
    fn remaining(&self) -> usize {
        self.capacity() - self.filled
    }

    #[inline]
    fn capacity(&self) -> usize {
        self.raw.len()
    }
}

impl fmt::Debug for ReadBuf<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadBuf")
            .field("filled", &self.filled)
            .field("init", &self.init)
            .field("capacity", &self.capacity())
            .finish()
    }
}

impl ReadBufCursor<'_> {
    /// Access the unfilled part of the buffer.
    ///
    /// # Safety
    ///
    /// The caller must not uninitialize any bytes that may have been
    /// initialized before.
    #[inline]
    pub unsafe fn as_mut(&mut self) -> &mut [MaybeUninit<u8>] {
        &mut self.buf.raw[self.buf.filled..]
    }

    /// Advance the `filled` cursor by `n` bytes.
    ///
    /// # Safety
    ///
    /// The caller must take care that `n` more bytes have been initialized.
    #[inline]
    pub unsafe fn advance(&mut self, n: usize) {
        self.buf.filled = self.buf.filled.checked_add(n).expect("overflow");
        self.buf.init = self.buf.filled.max(self.buf.init);
    }

    /// Returns the number of bytes that can be written from the current
    /// position until the end of the buffer is reached.
    ///
    /// This value is equal to the length of the slice returned by `as_mut()`.
    #[inline]
    pub fn remaining(&self) -> usize {
        self.buf.remaining()
    }

    /// Transfer bytes into `self` from `src` and advance the cursor
    /// by the number of bytes written.
    ///
    /// # Panics
    ///
    /// `self` must have enough remaining capacity to contain all of `src`.
    #[inline]
    pub fn put_slice(&mut self, src: &[u8]) {
        assert!(
            self.buf.remaining() >= src.len(),
            "src.len() must fit in remaining()"
        );

        let amt = src.len();
        // Cannot overflow, asserted above
        let end = self.buf.filled + amt;

        // Safety: the length is asserted above
        unsafe {
            self.buf.raw[self.buf.filled..end]
                .as_mut_ptr()
                .cast::<u8>()
                .copy_from_nonoverlapping(src.as_ptr(), amt);
        }

        if self.buf.init < end {
            self.buf.init = end;
        }
        self.buf.filled = end;
    }
}

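// The impls below forward `Read` and `Write` through common wrapper types
// (`Box<T>`, `&mut T`, and `Pin<P>`) by delegating every method to the
// inner IO object.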
macro_rules! deref_async_read {
    () => {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: ReadBufCursor<'_>,
        ) -> Poll<std::io::Result<()>> {
            Pin::new(&mut **self).poll_read(cx, buf)
        }
    };
}

impl<T: ?Sized + Read + Unpin> Read for Box<T> {
    deref_async_read!();
}

impl<T: ?Sized + Read + Unpin> Read for &mut T {
    deref_async_read!();
}

impl<P> Read for Pin<P>
where
    P: DerefMut,
    P::Target: Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: ReadBufCursor<'_>,
    ) -> Poll<std::io::Result<()>> {
        pin_as_deref_mut(self).poll_read(cx, buf)
    }
}

macro_rules! deref_async_write {
    () => {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            Pin::new(&mut **self).poll_write(cx, buf)
        }

        fn poll_write_vectored(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[std::io::IoSlice<'_>],
        ) -> Poll<std::io::Result<usize>> {
            Pin::new(&mut **self).poll_write_vectored(cx, bufs)
        }

        fn is_write_vectored(&self) -> bool {
            (**self).is_write_vectored()
        }

        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Pin::new(&mut **self).poll_flush(cx)
        }

        fn poll_shutdown(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<std::io::Result<()>> {
            Pin::new(&mut **self).poll_shutdown(cx)
        }
    };
}

impl<T: ?Sized + Write + Unpin> Write for Box<T> {
    deref_async_write!();
}

impl<T: ?Sized + Write + Unpin> Write for &mut T {
    deref_async_write!();
}

impl<P> Write for Pin<P>
where
    P: DerefMut,
    P::Target: Write,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        pin_as_deref_mut(self).poll_write(cx, buf)
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<std::io::Result<usize>> {
        pin_as_deref_mut(self).poll_write_vectored(cx, bufs)
    }

    fn is_write_vectored(&self) -> bool {
        (**self).is_write_vectored()
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        pin_as_deref_mut(self).poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        pin_as_deref_mut(self).poll_shutdown(cx)
    }
}

/// Polyfill for Pin::as_deref_mut()
/// TODO: use Pin::as_deref_mut() instead once stabilized
fn pin_as_deref_mut<P: DerefMut>(pin: Pin<&mut Pin<P>>) -> Pin<&mut P::Target> {
    // SAFETY: we go directly from Pin<&mut Pin<P>> to Pin<&mut P::Target>, without moving or
    // giving out the &mut Pin<P> in the process. See Pin::as_deref_mut() for more detail.
    unsafe { pin.get_unchecked_mut() }.as_mut()
}