1//! Asynchronous I/O
2//!
3//! This crate contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and
4//! `AsyncBufRead` traits, the asynchronous analogs to
5//! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is
6//! that these traits integrate with the asynchronous task system.
7//!
8//! All items of this library are only available when the `std` feature of this
9//! library is activated, and it is activated by default.
10
11#![cfg_attr(not(feature = "std"), no_std)]
12#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)]
13// It cannot be included in the published code because this lints have false positives in the minimum required version.
14#![cfg_attr(test, warn(single_use_lifetimes))]
15#![doc(test(
16 no_crate_inject,
17 attr(
18 deny(warnings, rust_2018_idioms, single_use_lifetimes),
19 allow(dead_code, unused_assignments, unused_variables)
20 )
21))]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23
24#[cfg(feature = "std")]
25mod if_std {
26 use std::io;
27 use std::ops::DerefMut;
28 use std::pin::Pin;
29 use std::task::{Context, Poll};
30
31 // Re-export some types from `std::io` so that users don't have to deal
32 // with conflicts when `use`ing `futures::io` and `std::io`.
33 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
34 #[doc(no_inline)]
35 pub use io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
36
37 /// Read bytes asynchronously.
38 ///
39 /// This trait is analogous to the `std::io::Read` trait, but integrates
40 /// with the asynchronous task system. In particular, the `poll_read`
41 /// method, unlike `Read::read`, will automatically queue the current task
42 /// for wakeup and return if data is not yet available, rather than blocking
43 /// the calling thread.
44 pub trait AsyncRead {
45 /// Attempt to read from the `AsyncRead` into `buf`.
46 ///
47 /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
48 ///
49 /// If no data is available for reading, the method returns
50 /// `Poll::Pending` and arranges for the current task (via
51 /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
52 /// readable or is closed.
53 ///
54 /// # Implementation
55 ///
56 /// This function may not return errors of kind `WouldBlock` or
57 /// `Interrupted`. Implementations must convert `WouldBlock` into
58 /// `Poll::Pending` and either internally retry or convert
59 /// `Interrupted` into another error kind.
60 fn poll_read(
61 self: Pin<&mut Self>,
62 cx: &mut Context<'_>,
63 buf: &mut [u8],
64 ) -> Poll<Result<usize>>;
65
66 /// Attempt to read from the `AsyncRead` into `bufs` using vectored
67 /// IO operations.
68 ///
69 /// This method is similar to `poll_read`, but allows data to be read
70 /// into multiple buffers using a single operation.
71 ///
72 /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
73 ///
74 /// If no data is available for reading, the method returns
75 /// `Poll::Pending` and arranges for the current task (via
76 /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
77 /// readable or is closed.
78 /// By default, this method delegates to using `poll_read` on the first
79 /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
80 /// support vectored IO should override this method.
81 ///
82 /// # Implementation
83 ///
84 /// This function may not return errors of kind `WouldBlock` or
85 /// `Interrupted`. Implementations must convert `WouldBlock` into
86 /// `Poll::Pending` and either internally retry or convert
87 /// `Interrupted` into another error kind.
88 fn poll_read_vectored(
89 self: Pin<&mut Self>,
90 cx: &mut Context<'_>,
91 bufs: &mut [IoSliceMut<'_>],
92 ) -> Poll<Result<usize>> {
93 for b in bufs {
94 if !b.is_empty() {
95 return self.poll_read(cx, b);
96 }
97 }
98
99 self.poll_read(cx, &mut [])
100 }
101 }
102
103 /// Write bytes asynchronously.
104 ///
105 /// This trait is analogous to the `std::io::Write` trait, but integrates
106 /// with the asynchronous task system. In particular, the `poll_write`
107 /// method, unlike `Write::write`, will automatically queue the current task
108 /// for wakeup and return if the writer cannot take more data, rather than blocking
109 /// the calling thread.
110 pub trait AsyncWrite {
111 /// Attempt to write bytes from `buf` into the object.
112 ///
113 /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
114 ///
115 /// If the object is not ready for writing, the method returns
116 /// `Poll::Pending` and arranges for the current task (via
117 /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
118 /// writable or is closed.
119 ///
120 /// # Implementation
121 ///
122 /// This function may not return errors of kind `WouldBlock` or
123 /// `Interrupted`. Implementations must convert `WouldBlock` into
124 /// `Poll::Pending` and either internally retry or convert
125 /// `Interrupted` into another error kind.
126 ///
127 /// `poll_write` must try to make progress by flushing the underlying object if
128 /// that is the only way the underlying object can become writable again.
129 fn poll_write(
130 self: Pin<&mut Self>,
131 cx: &mut Context<'_>,
132 buf: &[u8],
133 ) -> Poll<Result<usize>>;
134
135 /// Attempt to write bytes from `bufs` into the object using vectored
136 /// IO operations.
137 ///
138 /// This method is similar to `poll_write`, but allows data from multiple buffers to be written
139 /// using a single operation.
140 ///
141 /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
142 ///
143 /// If the object is not ready for writing, the method returns
144 /// `Poll::Pending` and arranges for the current task (via
145 /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
146 /// writable or is closed.
147 ///
148 /// By default, this method delegates to using `poll_write` on the first
149 /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
150 /// support vectored IO should override this method.
151 ///
152 /// # Implementation
153 ///
154 /// This function may not return errors of kind `WouldBlock` or
155 /// `Interrupted`. Implementations must convert `WouldBlock` into
156 /// `Poll::Pending` and either internally retry or convert
157 /// `Interrupted` into another error kind.
158 fn poll_write_vectored(
159 self: Pin<&mut Self>,
160 cx: &mut Context<'_>,
161 bufs: &[IoSlice<'_>],
162 ) -> Poll<Result<usize>> {
163 for b in bufs {
164 if !b.is_empty() {
165 return self.poll_write(cx, b);
166 }
167 }
168
169 self.poll_write(cx, &[])
170 }
171
172 /// Attempt to flush the object, ensuring that any buffered data reach
173 /// their destination.
174 ///
175 /// On success, returns `Poll::Ready(Ok(()))`.
176 ///
177 /// If flushing cannot immediately complete, this method returns
178 /// `Poll::Pending` and arranges for the current task (via
179 /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
180 /// progress towards flushing.
181 ///
182 /// # Implementation
183 ///
184 /// This function may not return errors of kind `WouldBlock` or
185 /// `Interrupted`. Implementations must convert `WouldBlock` into
186 /// `Poll::Pending` and either internally retry or convert
187 /// `Interrupted` into another error kind.
188 ///
189 /// It only makes sense to do anything here if you actually buffer data.
190 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
191
192 /// Attempt to close the object.
193 ///
194 /// On success, returns `Poll::Ready(Ok(()))`.
195 ///
196 /// If closing cannot immediately complete, this function returns
197 /// `Poll::Pending` and arranges for the current task (via
198 /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
199 /// progress towards closing.
200 ///
201 /// # Implementation
202 ///
203 /// This function may not return errors of kind `WouldBlock` or
204 /// `Interrupted`. Implementations must convert `WouldBlock` into
205 /// `Poll::Pending` and either internally retry or convert
206 /// `Interrupted` into another error kind.
207 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
208 }
209
210 /// Seek bytes asynchronously.
211 ///
212 /// This trait is analogous to the `std::io::Seek` trait, but integrates
213 /// with the asynchronous task system. In particular, the `poll_seek`
214 /// method, unlike `Seek::seek`, will automatically queue the current task
215 /// for wakeup and return if data is not yet available, rather than blocking
216 /// the calling thread.
217 pub trait AsyncSeek {
218 /// Attempt to seek to an offset, in bytes, in a stream.
219 ///
220 /// A seek beyond the end of a stream is allowed, but behavior is defined
221 /// by the implementation.
222 ///
223 /// If the seek operation completed successfully,
224 /// this method returns the new position from the start of the stream.
225 /// That position can be used later with [`SeekFrom::Start`].
226 ///
227 /// # Errors
228 ///
229 /// Seeking to a negative offset is considered an error.
230 ///
231 /// # Implementation
232 ///
233 /// This function may not return errors of kind `WouldBlock` or
234 /// `Interrupted`. Implementations must convert `WouldBlock` into
235 /// `Poll::Pending` and either internally retry or convert
236 /// `Interrupted` into another error kind.
237 fn poll_seek(
238 self: Pin<&mut Self>,
239 cx: &mut Context<'_>,
240 pos: SeekFrom,
241 ) -> Poll<Result<u64>>;
242 }
243
244 /// Read bytes asynchronously.
245 ///
246 /// This trait is analogous to the `std::io::BufRead` trait, but integrates
247 /// with the asynchronous task system. In particular, the `poll_fill_buf`
248 /// method, unlike `BufRead::fill_buf`, will automatically queue the current task
249 /// for wakeup and return if data is not yet available, rather than blocking
250 /// the calling thread.
251 pub trait AsyncBufRead: AsyncRead {
252 /// Attempt to return the contents of the internal buffer, filling it with more data
253 /// from the inner reader if it is empty.
254 ///
255 /// On success, returns `Poll::Ready(Ok(buf))`.
256 ///
257 /// If no data is available for reading, the method returns
258 /// `Poll::Pending` and arranges for the current task (via
259 /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
260 /// readable or is closed.
261 ///
262 /// This function is a lower-level call. It needs to be paired with the
263 /// [`consume`] method to function properly. When calling this
264 /// method, none of the contents will be "read" in the sense that later
265 /// calling [`poll_read`] may return the same contents. As such, [`consume`] must
266 /// be called with the number of bytes that are consumed from this buffer to
267 /// ensure that the bytes are never returned twice.
268 ///
269 /// [`poll_read`]: AsyncRead::poll_read
270 /// [`consume`]: AsyncBufRead::consume
271 ///
272 /// An empty buffer returned indicates that the stream has reached EOF.
273 ///
274 /// # Implementation
275 ///
276 /// This function may not return errors of kind `WouldBlock` or
277 /// `Interrupted`. Implementations must convert `WouldBlock` into
278 /// `Poll::Pending` and either internally retry or convert
279 /// `Interrupted` into another error kind.
280 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;
281
282 /// Tells this buffer that `amt` bytes have been consumed from the buffer,
283 /// so they should no longer be returned in calls to [`poll_read`].
284 ///
285 /// This function is a lower-level call. It needs to be paired with the
286 /// [`poll_fill_buf`] method to function properly. This function does
287 /// not perform any I/O, it simply informs this object that some amount of
288 /// its buffer, returned from [`poll_fill_buf`], has been consumed and should
289 /// no longer be returned. As such, this function may do odd things if
290 /// [`poll_fill_buf`] isn't called before calling it.
291 ///
292 /// The `amt` must be `<=` the number of bytes in the buffer returned by
293 /// [`poll_fill_buf`].
294 ///
295 /// [`poll_read`]: AsyncRead::poll_read
296 /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
297 fn consume(self: Pin<&mut Self>, amt: usize);
298 }
299
300 macro_rules! deref_async_read {
301 () => {
302 fn poll_read(
303 mut self: Pin<&mut Self>,
304 cx: &mut Context<'_>,
305 buf: &mut [u8],
306 ) -> Poll<Result<usize>> {
307 Pin::new(&mut **self).poll_read(cx, buf)
308 }
309
310 fn poll_read_vectored(
311 mut self: Pin<&mut Self>,
312 cx: &mut Context<'_>,
313 bufs: &mut [IoSliceMut<'_>],
314 ) -> Poll<Result<usize>> {
315 Pin::new(&mut **self).poll_read_vectored(cx, bufs)
316 }
317 };
318 }
319
320 impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
321 deref_async_read!();
322 }
323
324 impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for &mut T {
325 deref_async_read!();
326 }
327
328 impl<P> AsyncRead for Pin<P>
329 where
330 P: DerefMut + Unpin,
331 P::Target: AsyncRead,
332 {
333 fn poll_read(
334 self: Pin<&mut Self>,
335 cx: &mut Context<'_>,
336 buf: &mut [u8],
337 ) -> Poll<Result<usize>> {
338 self.get_mut().as_mut().poll_read(cx, buf)
339 }
340
341 fn poll_read_vectored(
342 self: Pin<&mut Self>,
343 cx: &mut Context<'_>,
344 bufs: &mut [IoSliceMut<'_>],
345 ) -> Poll<Result<usize>> {
346 self.get_mut().as_mut().poll_read_vectored(cx, bufs)
347 }
348 }
349
350 macro_rules! delegate_async_read_to_stdio {
351 () => {
352 fn poll_read(
353 mut self: Pin<&mut Self>,
354 _: &mut Context<'_>,
355 buf: &mut [u8],
356 ) -> Poll<Result<usize>> {
357 Poll::Ready(io::Read::read(&mut *self, buf))
358 }
359
360 fn poll_read_vectored(
361 mut self: Pin<&mut Self>,
362 _: &mut Context<'_>,
363 bufs: &mut [IoSliceMut<'_>],
364 ) -> Poll<Result<usize>> {
365 Poll::Ready(io::Read::read_vectored(&mut *self, bufs))
366 }
367 };
368 }
369
370 impl AsyncRead for &[u8] {
371 delegate_async_read_to_stdio!();
372 }
373
374 macro_rules! deref_async_write {
375 () => {
376 fn poll_write(
377 mut self: Pin<&mut Self>,
378 cx: &mut Context<'_>,
379 buf: &[u8],
380 ) -> Poll<Result<usize>> {
381 Pin::new(&mut **self).poll_write(cx, buf)
382 }
383
384 fn poll_write_vectored(
385 mut self: Pin<&mut Self>,
386 cx: &mut Context<'_>,
387 bufs: &[IoSlice<'_>],
388 ) -> Poll<Result<usize>> {
389 Pin::new(&mut **self).poll_write_vectored(cx, bufs)
390 }
391
392 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
393 Pin::new(&mut **self).poll_flush(cx)
394 }
395
396 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
397 Pin::new(&mut **self).poll_close(cx)
398 }
399 };
400 }
401
402 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
403 deref_async_write!();
404 }
405
406 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
407 deref_async_write!();
408 }
409
410 impl<P> AsyncWrite for Pin<P>
411 where
412 P: DerefMut + Unpin,
413 P::Target: AsyncWrite,
414 {
415 fn poll_write(
416 self: Pin<&mut Self>,
417 cx: &mut Context<'_>,
418 buf: &[u8],
419 ) -> Poll<Result<usize>> {
420 self.get_mut().as_mut().poll_write(cx, buf)
421 }
422
423 fn poll_write_vectored(
424 self: Pin<&mut Self>,
425 cx: &mut Context<'_>,
426 bufs: &[IoSlice<'_>],
427 ) -> Poll<Result<usize>> {
428 self.get_mut().as_mut().poll_write_vectored(cx, bufs)
429 }
430
431 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
432 self.get_mut().as_mut().poll_flush(cx)
433 }
434
435 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
436 self.get_mut().as_mut().poll_close(cx)
437 }
438 }
439
440 macro_rules! delegate_async_write_to_stdio {
441 () => {
442 fn poll_write(
443 mut self: Pin<&mut Self>,
444 _: &mut Context<'_>,
445 buf: &[u8],
446 ) -> Poll<Result<usize>> {
447 Poll::Ready(io::Write::write(&mut *self, buf))
448 }
449
450 fn poll_write_vectored(
451 mut self: Pin<&mut Self>,
452 _: &mut Context<'_>,
453 bufs: &[IoSlice<'_>],
454 ) -> Poll<Result<usize>> {
455 Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
456 }
457
458 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
459 Poll::Ready(io::Write::flush(&mut *self))
460 }
461
462 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
463 self.poll_flush(cx)
464 }
465 };
466 }
467
468 impl AsyncWrite for Vec<u8> {
469 delegate_async_write_to_stdio!();
470 }
471
472 macro_rules! deref_async_seek {
473 () => {
474 fn poll_seek(
475 mut self: Pin<&mut Self>,
476 cx: &mut Context<'_>,
477 pos: SeekFrom,
478 ) -> Poll<Result<u64>> {
479 Pin::new(&mut **self).poll_seek(cx, pos)
480 }
481 };
482 }
483
484 impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
485 deref_async_seek!();
486 }
487
488 impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
489 deref_async_seek!();
490 }
491
492 impl<P> AsyncSeek for Pin<P>
493 where
494 P: DerefMut + Unpin,
495 P::Target: AsyncSeek,
496 {
497 fn poll_seek(
498 self: Pin<&mut Self>,
499 cx: &mut Context<'_>,
500 pos: SeekFrom,
501 ) -> Poll<Result<u64>> {
502 self.get_mut().as_mut().poll_seek(cx, pos)
503 }
504 }
505
506 macro_rules! deref_async_buf_read {
507 () => {
508 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
509 Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
510 }
511
512 fn consume(mut self: Pin<&mut Self>, amt: usize) {
513 Pin::new(&mut **self).consume(amt)
514 }
515 };
516 }
517
518 impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
519 deref_async_buf_read!();
520 }
521
522 impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
523 deref_async_buf_read!();
524 }
525
526 impl<P> AsyncBufRead for Pin<P>
527 where
528 P: DerefMut + Unpin,
529 P::Target: AsyncBufRead,
530 {
531 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
532 self.get_mut().as_mut().poll_fill_buf(cx)
533 }
534
535 fn consume(self: Pin<&mut Self>, amt: usize) {
536 self.get_mut().as_mut().consume(amt)
537 }
538 }
539
540 macro_rules! delegate_async_buf_read_to_stdio {
541 () => {
542 fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
543 Poll::Ready(io::BufRead::fill_buf(self.get_mut()))
544 }
545
546 fn consume(self: Pin<&mut Self>, amt: usize) {
547 io::BufRead::consume(self.get_mut(), amt)
548 }
549 };
550 }
551
552 impl AsyncBufRead for &[u8] {
553 delegate_async_buf_read_to_stdio!();
554 }
555}
556
557#[cfg(feature = "std")]
558pub use self::if_std::*;
559