1 | use std::fmt; |
2 | use std::mem::MaybeUninit; |
3 | use std::ops::DerefMut; |
4 | use std::pin::Pin; |
5 | use std::task::{Context, Poll}; |
6 | |
7 | // New IO traits? What?! Why, are you bonkers? |
8 | // |
9 | // I mean, yes, probably. But, here's the goals: |
10 | // |
11 | // 1. Supports poll-based IO operations. |
12 | // 2. Opt-in vectored IO. |
13 | // 3. Can use an optional buffer pool. |
14 | // 4. Able to add completion-based (uring) IO eventually. |
15 | // |
16 | // Frankly, the last point is the entire reason we're doing this. We want to |
17 | // have forwards-compatibility with an eventually stable io-uring runtime. We |
18 | // don't need that to work right away. But it must be possible to add in here |
19 | // without breaking hyper 1.0. |
20 | // |
21 | // While in here, if there's small tweaks to poll_read or poll_write that would |
22 | // allow even the "slow" path to be faster, such as if someone didn't remember |
23 | // to forward along an `is_completion` call. |
24 | |
25 | /// Reads bytes from a source. |
26 | /// |
27 | /// This trait is similar to `std::io::Read`, but supports asynchronous reads. |
28 | pub trait Read { |
29 | /// Attempts to read bytes into the `buf`. |
30 | /// |
31 | /// On success, returns `Poll::Ready(Ok(()))` and places data in the |
32 | /// unfilled portion of `buf`. If no data was read (`buf.remaining()` is |
33 | /// unchanged), it implies that EOF has been reached. |
34 | /// |
35 | /// If no data is available for reading, the method returns `Poll::Pending` |
36 | /// and arranges for the current task (via `cx.waker()`) to receive a |
37 | /// notification when the object becomes readable or is closed. |
38 | fn poll_read( |
39 | self: Pin<&mut Self>, |
40 | cx: &mut Context<'_>, |
41 | buf: ReadBufCursor<'_>, |
42 | ) -> Poll<Result<(), std::io::Error>>; |
43 | } |
44 | |
45 | /// Write bytes asynchronously. |
46 | /// |
47 | /// This trait is similar to `std::io::Write`, but for asynchronous writes. |
48 | pub trait Write { |
49 | /// Attempt to write bytes from `buf` into the destination. |
50 | /// |
51 | /// On success, returns `Poll::Ready(Ok(num_bytes_written)))`. If |
52 | /// successful, it must be guaranteed that `n <= buf.len()`. A return value |
53 | /// of `0` means that the underlying object is no longer able to accept |
54 | /// bytes, or that the provided buffer is empty. |
55 | /// |
56 | /// If the object is not ready for writing, the method returns |
57 | /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to |
58 | /// receive a notification when the object becomes writable or is closed. |
59 | fn poll_write( |
60 | self: Pin<&mut Self>, |
61 | cx: &mut Context<'_>, |
62 | buf: &[u8], |
63 | ) -> Poll<Result<usize, std::io::Error>>; |
64 | |
65 | /// Attempts to flush the object. |
66 | /// |
67 | /// On success, returns `Poll::Ready(Ok(()))`. |
68 | /// |
69 | /// If flushing cannot immediately complete, this method returns |
70 | /// `Poll::Pending` and arranges for the current task (via `cx.waker()`) to |
71 | /// receive a notification when the object can make progress. |
72 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>>; |
73 | |
74 | /// Attempts to shut down this writer. |
75 | fn poll_shutdown( |
76 | self: Pin<&mut Self>, |
77 | cx: &mut Context<'_>, |
78 | ) -> Poll<Result<(), std::io::Error>>; |
79 | |
80 | /// Returns whether this writer has an efficient `poll_write_vectored` |
81 | /// implementation. |
82 | /// |
83 | /// The default implementation returns `false`. |
84 | fn is_write_vectored(&self) -> bool { |
85 | false |
86 | } |
87 | |
88 | /// Like `poll_write`, except that it writes from a slice of buffers. |
89 | fn poll_write_vectored( |
90 | self: Pin<&mut Self>, |
91 | cx: &mut Context<'_>, |
92 | bufs: &[std::io::IoSlice<'_>], |
93 | ) -> Poll<Result<usize, std::io::Error>> { |
94 | let buf = bufs |
95 | .iter() |
96 | .find(|b| !b.is_empty()) |
97 | .map_or(&[][..], |b| &**b); |
98 | self.poll_write(cx, buf) |
99 | } |
100 | } |
101 | |
102 | /// A wrapper around a byte buffer that is incrementally filled and initialized. |
103 | /// |
104 | /// This type is a sort of "double cursor". It tracks three regions in the |
105 | /// buffer: a region at the beginning of the buffer that has been logically |
106 | /// filled with data, a region that has been initialized at some point but not |
107 | /// yet logically filled, and a region at the end that may be uninitialized. |
108 | /// The filled region is guaranteed to be a subset of the initialized region. |
109 | /// |
110 | /// In summary, the contents of the buffer can be visualized as: |
111 | /// |
112 | /// ```not_rust |
113 | /// [ capacity ] |
114 | /// [ filled | unfilled ] |
115 | /// [ initialized | uninitialized ] |
116 | /// ``` |
117 | /// |
118 | /// It is undefined behavior to de-initialize any bytes from the uninitialized |
119 | /// region, since it is merely unknown whether this region is uninitialized or |
120 | /// not, and if part of it turns out to be initialized, it must stay initialized. |
121 | pub struct ReadBuf<'a> { |
122 | raw: &'a mut [MaybeUninit<u8>], |
123 | filled: usize, |
124 | init: usize, |
125 | } |
126 | |
127 | /// The cursor part of a [`ReadBuf`]. |
128 | /// |
129 | /// This is created by calling `ReadBuf::unfilled()`. |
130 | #[derive (Debug)] |
131 | pub struct ReadBufCursor<'a> { |
132 | buf: &'a mut ReadBuf<'a>, |
133 | } |
134 | |
135 | impl<'data> ReadBuf<'data> { |
136 | /// Create a new `ReadBuf` with a slice of initialized bytes. |
137 | #[inline ] |
138 | pub fn new(raw: &'data mut [u8]) -> Self { |
139 | let len = raw.len(); |
140 | Self { |
141 | // SAFETY: We never de-init the bytes ourselves. |
142 | raw: unsafe { &mut *(raw as *mut [u8] as *mut [MaybeUninit<u8>]) }, |
143 | filled: 0, |
144 | init: len, |
145 | } |
146 | } |
147 | |
148 | /// Create a new `ReadBuf` with a slice of uninitialized bytes. |
149 | #[inline ] |
150 | pub fn uninit(raw: &'data mut [MaybeUninit<u8>]) -> Self { |
151 | Self { |
152 | raw, |
153 | filled: 0, |
154 | init: 0, |
155 | } |
156 | } |
157 | |
158 | /// Get a slice of the buffer that has been filled in with bytes. |
159 | #[inline ] |
160 | pub fn filled(&self) -> &[u8] { |
161 | // SAFETY: We only slice the filled part of the buffer, which is always valid |
162 | unsafe { &*(&self.raw[0..self.filled] as *const [MaybeUninit<u8>] as *const [u8]) } |
163 | } |
164 | |
165 | /// Get a cursor to the unfilled portion of the buffer. |
166 | #[inline ] |
167 | pub fn unfilled<'cursor>(&'cursor mut self) -> ReadBufCursor<'cursor> { |
168 | ReadBufCursor { |
169 | // SAFETY: self.buf is never re-assigned, so its safe to narrow |
170 | // the lifetime. |
171 | buf: unsafe { |
172 | std::mem::transmute::<&'cursor mut ReadBuf<'data>, &'cursor mut ReadBuf<'cursor>>( |
173 | self, |
174 | ) |
175 | }, |
176 | } |
177 | } |
178 | |
179 | #[inline ] |
180 | #[cfg (all(any(feature = "client" , feature = "server" ), feature = "http2" ))] |
181 | pub(crate) unsafe fn set_init(&mut self, n: usize) { |
182 | self.init = self.init.max(n); |
183 | } |
184 | |
185 | #[inline ] |
186 | #[cfg (all(any(feature = "client" , feature = "server" ), feature = "http2" ))] |
187 | pub(crate) unsafe fn set_filled(&mut self, n: usize) { |
188 | self.filled = self.filled.max(n); |
189 | } |
190 | |
191 | #[inline ] |
192 | #[cfg (all(any(feature = "client" , feature = "server" ), feature = "http2" ))] |
193 | pub(crate) fn len(&self) -> usize { |
194 | self.filled |
195 | } |
196 | |
197 | #[inline ] |
198 | #[cfg (all(any(feature = "client" , feature = "server" ), feature = "http2" ))] |
199 | pub(crate) fn init_len(&self) -> usize { |
200 | self.init |
201 | } |
202 | |
203 | #[inline ] |
204 | fn remaining(&self) -> usize { |
205 | self.capacity() - self.filled |
206 | } |
207 | |
208 | #[inline ] |
209 | fn capacity(&self) -> usize { |
210 | self.raw.len() |
211 | } |
212 | } |
213 | |
214 | impl fmt::Debug for ReadBuf<'_> { |
215 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
216 | f&mut DebugStruct<'_, '_>.debug_struct("ReadBuf" ) |
217 | .field("filled" , &self.filled) |
218 | .field("init" , &self.init) |
219 | .field(name:"capacity" , &self.capacity()) |
220 | .finish() |
221 | } |
222 | } |
223 | |
224 | impl ReadBufCursor<'_> { |
225 | /// Access the unfilled part of the buffer. |
226 | /// |
227 | /// # Safety |
228 | /// |
229 | /// The caller must not uninitialize any bytes that may have been |
230 | /// initialized before. |
231 | #[inline ] |
232 | pub unsafe fn as_mut(&mut self) -> &mut [MaybeUninit<u8>] { |
233 | &mut self.buf.raw[self.buf.filled..] |
234 | } |
235 | |
236 | /// Advance the `filled` cursor by `n` bytes. |
237 | /// |
238 | /// # Safety |
239 | /// |
240 | /// The caller must take care that `n` more bytes have been initialized. |
241 | #[inline ] |
242 | pub unsafe fn advance(&mut self, n: usize) { |
243 | self.buf.filled = self.buf.filled.checked_add(n).expect("overflow" ); |
244 | self.buf.init = self.buf.filled.max(self.buf.init); |
245 | } |
246 | |
247 | /// Returns the number of bytes that can be written from the current |
248 | /// position until the end of the buffer is reached. |
249 | /// |
250 | /// This value is equal to the length of the slice returned by `as_mut()``. |
251 | #[inline ] |
252 | pub fn remaining(&self) -> usize { |
253 | self.buf.remaining() |
254 | } |
255 | |
256 | /// Transfer bytes into `self`` from `src` and advance the cursor |
257 | /// by the number of bytes written. |
258 | /// |
259 | /// # Panics |
260 | /// |
261 | /// `self` must have enough remaining capacity to contain all of `src`. |
262 | #[inline ] |
263 | pub fn put_slice(&mut self, src: &[u8]) { |
264 | assert!( |
265 | self.buf.remaining() >= src.len(), |
266 | "src.len() must fit in remaining()" |
267 | ); |
268 | |
269 | let amt = src.len(); |
270 | // Cannot overflow, asserted above |
271 | let end = self.buf.filled + amt; |
272 | |
273 | // Safety: the length is asserted above |
274 | unsafe { |
275 | self.buf.raw[self.buf.filled..end] |
276 | .as_mut_ptr() |
277 | .cast::<u8>() |
278 | .copy_from_nonoverlapping(src.as_ptr(), amt); |
279 | } |
280 | |
281 | if self.buf.init < end { |
282 | self.buf.init = end; |
283 | } |
284 | self.buf.filled = end; |
285 | } |
286 | } |
287 | |
288 | macro_rules! deref_async_read { |
289 | () => { |
290 | fn poll_read( |
291 | mut self: Pin<&mut Self>, |
292 | cx: &mut Context<'_>, |
293 | buf: ReadBufCursor<'_>, |
294 | ) -> Poll<std::io::Result<()>> { |
295 | Pin::new(&mut **self).poll_read(cx, buf) |
296 | } |
297 | }; |
298 | } |
299 | |
300 | impl<T: ?Sized + Read + Unpin> Read for Box<T> { |
301 | deref_async_read!(); |
302 | } |
303 | |
304 | impl<T: ?Sized + Read + Unpin> Read for &mut T { |
305 | deref_async_read!(); |
306 | } |
307 | |
308 | impl<P> Read for Pin<P> |
309 | where |
310 | P: DerefMut, |
311 | P::Target: Read, |
312 | { |
313 | fn poll_read( |
314 | self: Pin<&mut Self>, |
315 | cx: &mut Context<'_>, |
316 | buf: ReadBufCursor<'_>, |
317 | ) -> Poll<std::io::Result<()>> { |
318 | pin_as_deref_mut(self).poll_read(cx, buf) |
319 | } |
320 | } |
321 | |
322 | macro_rules! deref_async_write { |
323 | () => { |
324 | fn poll_write( |
325 | mut self: Pin<&mut Self>, |
326 | cx: &mut Context<'_>, |
327 | buf: &[u8], |
328 | ) -> Poll<std::io::Result<usize>> { |
329 | Pin::new(&mut **self).poll_write(cx, buf) |
330 | } |
331 | |
332 | fn poll_write_vectored( |
333 | mut self: Pin<&mut Self>, |
334 | cx: &mut Context<'_>, |
335 | bufs: &[std::io::IoSlice<'_>], |
336 | ) -> Poll<std::io::Result<usize>> { |
337 | Pin::new(&mut **self).poll_write_vectored(cx, bufs) |
338 | } |
339 | |
340 | fn is_write_vectored(&self) -> bool { |
341 | (**self).is_write_vectored() |
342 | } |
343 | |
344 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { |
345 | Pin::new(&mut **self).poll_flush(cx) |
346 | } |
347 | |
348 | fn poll_shutdown( |
349 | mut self: Pin<&mut Self>, |
350 | cx: &mut Context<'_>, |
351 | ) -> Poll<std::io::Result<()>> { |
352 | Pin::new(&mut **self).poll_shutdown(cx) |
353 | } |
354 | }; |
355 | } |
356 | |
357 | impl<T: ?Sized + Write + Unpin> Write for Box<T> { |
358 | deref_async_write!(); |
359 | } |
360 | |
361 | impl<T: ?Sized + Write + Unpin> Write for &mut T { |
362 | deref_async_write!(); |
363 | } |
364 | |
365 | impl<P> Write for Pin<P> |
366 | where |
367 | P: DerefMut, |
368 | P::Target: Write, |
369 | { |
370 | fn poll_write( |
371 | self: Pin<&mut Self>, |
372 | cx: &mut Context<'_>, |
373 | buf: &[u8], |
374 | ) -> Poll<std::io::Result<usize>> { |
375 | pin_as_deref_mut(self).poll_write(cx, buf) |
376 | } |
377 | |
378 | fn poll_write_vectored( |
379 | self: Pin<&mut Self>, |
380 | cx: &mut Context<'_>, |
381 | bufs: &[std::io::IoSlice<'_>], |
382 | ) -> Poll<std::io::Result<usize>> { |
383 | pin_as_deref_mut(self).poll_write_vectored(cx, bufs) |
384 | } |
385 | |
386 | fn is_write_vectored(&self) -> bool { |
387 | (**self).is_write_vectored() |
388 | } |
389 | |
390 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { |
391 | pin_as_deref_mut(self).poll_flush(cx) |
392 | } |
393 | |
394 | fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { |
395 | pin_as_deref_mut(self).poll_shutdown(cx) |
396 | } |
397 | } |
398 | |
399 | /// Polyfill for Pin::as_deref_mut() |
400 | /// TODO: use Pin::as_deref_mut() instead once stabilized |
401 | fn pin_as_deref_mut<P: DerefMut>(pin: Pin<&mut Pin<P>>) -> Pin<&mut P::Target> { |
402 | // SAFETY: we go directly from Pin<&mut Pin<P>> to Pin<&mut P::Target>, without moving or |
403 | // giving out the &mut Pin<P> in the process. See Pin::as_deref_mut() for more detail. |
404 | unsafe { pin.get_unchecked_mut() }.as_mut() |
405 | } |
406 | |