1//! Async I/O and timers.
2//!
3//! This crate provides two tools:
4//!
5//! * [`Async`], an adapter for standard networking types (and [many other] types) to use in
6//! async programs.
7//! * [`Timer`], a future or stream that emits timed events.
8//!
9//! For concrete async networking types built on top of this crate, see [`async-net`].
10//!
11//! [many other]: https://github.com/smol-rs/async-io/tree/master/examples
12//! [`async-net`]: https://docs.rs/async-net
13//!
14//! # Implementation
15//!
16//! The first time [`Async`] or [`Timer`] is used, a thread named "async-io" will be spawned.
17//! The purpose of this thread is to wait for I/O events reported by the operating system, and then
18//! wake appropriate futures blocked on I/O or timers when they can be resumed.
19//!
20//! To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
21//! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [IOCP] on Windows. That
22//! functionality is provided by the [`polling`] crate.
23//!
24//! However, note that you can also process I/O events and wake futures on any thread using the
25//! [`block_on()`] function. The "async-io" thread is therefore just a fallback mechanism
26//! processing I/O events in case no other threads are.
27//!
28//! [epoll]: https://en.wikipedia.org/wiki/Epoll
29//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
30//! [event ports]: https://illumos.org/man/port_create
31//! [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
32//! [`polling`]: https://docs.rs/polling
33//!
34//! # Examples
35//!
36//! Connect to `example.com:80`, or time out after 10 seconds.
37//!
38//! ```
39//! use async_io::{Async, Timer};
40//! use futures_lite::{future::FutureExt, io};
41//!
42//! use std::net::{TcpStream, ToSocketAddrs};
43//! use std::time::Duration;
44//!
45//! # futures_lite::future::block_on(async {
46//! let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
47//!
48//! let stream = Async::<TcpStream>::connect(addr).or(async {
49//! Timer::after(Duration::from_secs(10)).await;
50//! Err(io::ErrorKind::TimedOut.into())
51//! })
52//! .await?;
53//! # std::io::Result::Ok(()) });
54//! ```
55
56#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
57
58use std::convert::TryFrom;
59use std::future::Future;
60use std::io::{self, IoSlice, IoSliceMut, Read, Write};
61use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
62use std::pin::Pin;
63use std::sync::Arc;
64use std::task::{Context, Poll, Waker};
65use std::time::{Duration, Instant};
66
67#[cfg(all(not(async_io_no_io_safety), unix))]
68use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
69#[cfg(unix)]
70use std::{
71 os::unix::io::{AsRawFd, RawFd},
72 os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
73 path::Path,
74};
75
76#[cfg(windows)]
77use std::os::windows::io::{AsRawSocket, RawSocket};
78#[cfg(all(not(async_io_no_io_safety), windows))]
79use std::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket};
80
81use futures_lite::io::{AsyncRead, AsyncWrite};
82use futures_lite::stream::{self, Stream};
83use futures_lite::{future, pin, ready};
84use socket2::{Domain, Protocol, SockAddr, Socket, Type};
85
86use crate::reactor::{Reactor, Source};
87
88mod driver;
89mod reactor;
90
91pub use driver::block_on;
92pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
93
/// Returns the largest representable [`Duration`].
///
/// `Timer::never()` and `Timer::at()` store this value as the timer's period.
///
/// Use `Duration::MAX` once `duration_constants` are stabilized.
fn duration_max() -> Duration {
    // `u64::MAX` seconds plus the largest valid sub-second nanosecond count.
    Duration::new(std::u64::MAX, 1_000_000_000 - 1)
}
98
/// A future or stream that emits timed events.
///
/// Timers are futures that output a single [`Instant`] when they fire.
///
/// Timers are also streams that can output [`Instant`]s periodically.
///
/// # Precision
///
/// There is a limit on the maximum precision that a `Timer` can provide. This limit is
/// dependent on the current platform; for instance, on Windows, the maximum precision is
/// about 16 milliseconds. Because of this limit, the timer may sleep for longer than the
/// requested duration. It will never sleep for less.
///
/// # Examples
///
/// Sleep for 1 second:
///
/// ```
/// use async_io::Timer;
/// use std::time::Duration;
///
/// # futures_lite::future::block_on(async {
/// Timer::after(Duration::from_secs(1)).await;
/// # });
/// ```
///
/// Time out after 10 seconds:
///
/// ```
/// use async_io::Timer;
/// use futures_lite::FutureExt;
/// use std::time::Duration;
///
/// # futures_lite::future::block_on(async {
/// let addrs = async_net::resolve("google.com:80")
///     .or(async {
///         Timer::after(Duration::from_secs(10)).await;
///         Err(std::io::ErrorKind::TimedOut.into())
///     })
///     .await?;
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Debug)]
pub struct Timer {
    /// This timer's ID and last waker that polled it.
    ///
    /// When this field is set to `None`, this timer is not registered in the reactor.
    id_and_waker: Option<(usize, Waker)>,

    /// The next instant at which this timer fires.
    ///
    /// If this timer is a blank timer, this value is None. If the timer
    /// must be set, this value contains the next instant at which the
    /// timer must fire.
    when: Option<Instant>,

    /// The period added to `when` after each firing to compute the next deadline.
    ///
    /// One-shot timers store `duration_max()` here, so that addition overflows and the
    /// timer is not registered again after it fires.
    period: Duration,
}
158
159impl Timer {
160 /// Creates a timer that will never fire.
161 ///
162 /// # Examples
163 ///
164 /// This function may also be useful for creating a function with an optional timeout.
165 ///
166 /// ```
167 /// # futures_lite::future::block_on(async {
168 /// use async_io::Timer;
169 /// use futures_lite::prelude::*;
170 /// use std::time::Duration;
171 ///
172 /// async fn run_with_timeout(timeout: Option<Duration>) {
173 /// let timer = timeout
174 /// .map(|timeout| Timer::after(timeout))
175 /// .unwrap_or_else(Timer::never);
176 ///
177 /// run_lengthy_operation().or(timer).await;
178 /// }
179 /// # // Note that since a Timer as a Future returns an Instant,
180 /// # // this function needs to return an Instant to be used
181 /// # // in "or".
182 /// # async fn run_lengthy_operation() -> std::time::Instant {
183 /// # std::time::Instant::now()
184 /// # }
185 ///
186 /// // Times out after 5 seconds.
187 /// run_with_timeout(Some(Duration::from_secs(5))).await;
188 /// // Does not time out.
189 /// run_with_timeout(None).await;
190 /// # });
191 /// ```
192 pub fn never() -> Timer {
193 Timer {
194 id_and_waker: None,
195 when: None,
196 period: duration_max(),
197 }
198 }
199
200 /// Creates a timer that emits an event once after the given duration of time.
201 ///
202 /// # Examples
203 ///
204 /// ```
205 /// use async_io::Timer;
206 /// use std::time::Duration;
207 ///
208 /// # futures_lite::future::block_on(async {
209 /// Timer::after(Duration::from_secs(1)).await;
210 /// # });
211 /// ```
212 pub fn after(duration: Duration) -> Timer {
213 Instant::now()
214 .checked_add(duration)
215 .map_or_else(Timer::never, Timer::at)
216 }
217
218 /// Creates a timer that emits an event once at the given time instant.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use async_io::Timer;
224 /// use std::time::{Duration, Instant};
225 ///
226 /// # futures_lite::future::block_on(async {
227 /// let now = Instant::now();
228 /// let when = now + Duration::from_secs(1);
229 /// Timer::at(when).await;
230 /// # });
231 /// ```
232 pub fn at(instant: Instant) -> Timer {
233 // Use Duration::MAX once duration_constants are stabilized.
234 Timer::interval_at(instant, duration_max())
235 }
236
237 /// Creates a timer that emits events periodically.
238 ///
239 /// # Examples
240 ///
241 /// ```
242 /// use async_io::Timer;
243 /// use futures_lite::StreamExt;
244 /// use std::time::{Duration, Instant};
245 ///
246 /// # futures_lite::future::block_on(async {
247 /// let period = Duration::from_secs(1);
248 /// Timer::interval(period).next().await;
249 /// # });
250 /// ```
251 pub fn interval(period: Duration) -> Timer {
252 Instant::now()
253 .checked_add(period)
254 .map_or_else(Timer::never, |at| Timer::interval_at(at, period))
255 }
256
257 /// Creates a timer that emits events periodically, starting at `start`.
258 ///
259 /// # Examples
260 ///
261 /// ```
262 /// use async_io::Timer;
263 /// use futures_lite::StreamExt;
264 /// use std::time::{Duration, Instant};
265 ///
266 /// # futures_lite::future::block_on(async {
267 /// let start = Instant::now();
268 /// let period = Duration::from_secs(1);
269 /// Timer::interval_at(start, period).next().await;
270 /// # });
271 /// ```
272 pub fn interval_at(start: Instant, period: Duration) -> Timer {
273 Timer {
274 id_and_waker: None,
275 when: Some(start),
276 period,
277 }
278 }
279
280 /// Indicates whether or not this timer will ever fire.
281 ///
282 /// [`never()`] will never fire, and timers created with [`after()`] or [`at()`] will fire
283 /// if the duration is not too large.
284 ///
285 /// # Examples
286 ///
287 /// ```
288 /// use async_io::Timer;
289 /// use std::time::Duration;
290 ///
291 /// // `never` will never fire.
292 /// assert!(!Timer::never().will_fire());
293 ///
294 /// // `after` will fire if the duration is not too large.
295 /// assert!(Timer::after(Duration::from_secs(1)).will_fire());
296 /// assert!(!Timer::after(Duration::MAX).will_fire());
297 /// ```
298 #[inline]
299 pub fn will_fire(&self) -> bool {
300 self.when.is_some()
301 }
302
303 /// Sets the timer to emit an en event once after the given duration of time.
304 ///
305 /// Note that resetting a timer is different from creating a new timer because
306 /// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
307 /// that is polling the timer.
308 ///
309 /// # Examples
310 ///
311 /// ```
312 /// use async_io::Timer;
313 /// use std::time::Duration;
314 ///
315 /// # futures_lite::future::block_on(async {
316 /// let mut t = Timer::after(Duration::from_secs(1));
317 /// t.set_after(Duration::from_millis(100));
318 /// # });
319 /// ```
320 pub fn set_after(&mut self, duration: Duration) {
321 match Instant::now().checked_add(duration) {
322 Some(instant) => self.set_at(instant),
323 None => {
324 // Overflow to never going off.
325 self.clear();
326 self.when = None;
327 }
328 }
329 }
330
331 /// Sets the timer to emit an event once at the given time instant.
332 ///
333 /// Note that resetting a timer is different from creating a new timer because
334 /// [`set_at()`][`Timer::set_at()`] does not remove the waker associated with the task
335 /// that is polling the timer.
336 ///
337 /// # Examples
338 ///
339 /// ```
340 /// use async_io::Timer;
341 /// use std::time::{Duration, Instant};
342 ///
343 /// # futures_lite::future::block_on(async {
344 /// let mut t = Timer::after(Duration::from_secs(1));
345 ///
346 /// let now = Instant::now();
347 /// let when = now + Duration::from_secs(1);
348 /// t.set_at(when);
349 /// # });
350 /// ```
351 pub fn set_at(&mut self, instant: Instant) {
352 self.clear();
353
354 // Update the timeout.
355 self.when = Some(instant);
356
357 if let Some((id, waker)) = self.id_and_waker.as_mut() {
358 // Re-register the timer with the new timeout.
359 *id = Reactor::get().insert_timer(instant, waker);
360 }
361 }
362
363 /// Sets the timer to emit events periodically.
364 ///
365 /// Note that resetting a timer is different from creating a new timer because
366 /// [`set_interval()`][`Timer::set_interval()`] does not remove the waker associated with the
367 /// task that is polling the timer.
368 ///
369 /// # Examples
370 ///
371 /// ```
372 /// use async_io::Timer;
373 /// use futures_lite::StreamExt;
374 /// use std::time::{Duration, Instant};
375 ///
376 /// # futures_lite::future::block_on(async {
377 /// let mut t = Timer::after(Duration::from_secs(1));
378 ///
379 /// let period = Duration::from_secs(2);
380 /// t.set_interval(period);
381 /// # });
382 /// ```
383 pub fn set_interval(&mut self, period: Duration) {
384 match Instant::now().checked_add(period) {
385 Some(instant) => self.set_interval_at(instant, period),
386 None => {
387 // Overflow to never going off.
388 self.clear();
389 self.when = None;
390 }
391 }
392 }
393
394 /// Sets the timer to emit events periodically, starting at `start`.
395 ///
396 /// Note that resetting a timer is different from creating a new timer because
397 /// [`set_interval_at()`][`Timer::set_interval_at()`] does not remove the waker associated with
398 /// the task that is polling the timer.
399 ///
400 /// # Examples
401 ///
402 /// ```
403 /// use async_io::Timer;
404 /// use futures_lite::StreamExt;
405 /// use std::time::{Duration, Instant};
406 ///
407 /// # futures_lite::future::block_on(async {
408 /// let mut t = Timer::after(Duration::from_secs(1));
409 ///
410 /// let start = Instant::now();
411 /// let period = Duration::from_secs(2);
412 /// t.set_interval_at(start, period);
413 /// # });
414 /// ```
415 pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
416 self.clear();
417
418 self.when = Some(start);
419 self.period = period;
420
421 if let Some((id, waker)) = self.id_and_waker.as_mut() {
422 // Re-register the timer with the new timeout.
423 *id = Reactor::get().insert_timer(start, waker);
424 }
425 }
426
427 /// Helper function to clear the current timer.
428 fn clear(&mut self) {
429 if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
430 // Deregister the timer from the reactor.
431 Reactor::get().remove_timer(when, *id);
432 }
433 }
434}
435
436impl Drop for Timer {
437 fn drop(&mut self) {
438 if let (Some(when: Instant), Some((id: usize, _))) = (self.when, self.id_and_waker.take()) {
439 // Deregister the timer from the reactor.
440 Reactor::get().remove_timer(when, id);
441 }
442 }
443}
444
445impl Future for Timer {
446 type Output = Instant;
447
448 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
449 match self.poll_next(cx) {
450 Poll::Ready(Some(when: Instant)) => Poll::Ready(when),
451 Poll::Pending => Poll::Pending,
452 Poll::Ready(None) => unreachable!(),
453 }
454 }
455}
456
457impl Stream for Timer {
458 type Item = Instant;
459
460 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
461 let this = self.get_mut();
462
463 if let Some(ref mut when) = this.when {
464 // Check if the timer has already fired.
465 if Instant::now() >= *when {
466 if let Some((id, _)) = this.id_and_waker.take() {
467 // Deregister the timer from the reactor.
468 Reactor::get().remove_timer(*when, id);
469 }
470 let result_time = *when;
471 if let Some(next) = (*when).checked_add(this.period) {
472 *when = next;
473 // Register the timer in the reactor.
474 let id = Reactor::get().insert_timer(next, cx.waker());
475 this.id_and_waker = Some((id, cx.waker().clone()));
476 }
477 return Poll::Ready(Some(result_time));
478 } else {
479 match &this.id_and_waker {
480 None => {
481 // Register the timer in the reactor.
482 let id = Reactor::get().insert_timer(*when, cx.waker());
483 this.id_and_waker = Some((id, cx.waker().clone()));
484 }
485 Some((id, w)) if !w.will_wake(cx.waker()) => {
486 // Deregister the timer from the reactor to remove the old waker.
487 Reactor::get().remove_timer(*when, *id);
488
489 // Register the timer in the reactor with the new waker.
490 let id = Reactor::get().insert_timer(*when, cx.waker());
491 this.id_and_waker = Some((id, cx.waker().clone()));
492 }
493 Some(_) => {}
494 }
495 }
496 }
497
498 Poll::Pending
499 }
500}
501
/// Async adapter for I/O types.
///
/// This type puts an I/O handle into non-blocking mode, registers it in
/// [epoll]/[kqueue]/[event ports]/[IOCP], and then provides an async interface for it.
///
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
/// [event ports]: https://illumos.org/man/port_create
/// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
///
/// # Caveats
///
/// [`Async`] is a low-level primitive, and as such it comes with some caveats.
///
/// For higher-level primitives built on top of [`Async`], look into [`async-net`] or
/// [`async-process`] (on Unix).
///
/// [`async-net`]: https://github.com/smol-rs/async-net
/// [`async-process`]: https://github.com/smol-rs/async-process
///
/// ### Supported types
///
/// [`Async`] supports all networking types, as well as some OS-specific file descriptors like
/// [timerfd] and [inotify].
///
/// However, do not use [`Async`] with types like [`File`][`std::fs::File`],
/// [`Stdin`][`std::io::Stdin`], [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`]
/// because all operating systems have issues with them when put in non-blocking mode.
///
/// [timerfd]: https://github.com/smol-rs/async-io/blob/master/examples/linux-timerfd.rs
/// [inotify]: https://github.com/smol-rs/async-io/blob/master/examples/linux-inotify.rs
///
/// ### Concurrent I/O
///
/// Note that [`&Async<T>`][`Async`] implements [`AsyncRead`] and [`AsyncWrite`] if `&T`
/// implements those traits, which means tasks can concurrently read and write using shared
/// references.
///
/// But there is a catch: only one task can read at a time, and only one task can write at a
/// time. It is okay to have two tasks where one is reading and the other is writing at the same
/// time, but it is not okay to have two tasks reading at the same time or writing at the same
/// time. If you try to do that, conflicting tasks will just keep waking each other in turn, thus
/// wasting CPU time.
///
/// Besides [`AsyncRead`] and [`AsyncWrite`], this caveat also applies to
/// [`poll_readable()`][`Async::poll_readable()`] and
/// [`poll_writable()`][`Async::poll_writable()`].
///
/// However, any number of tasks can be concurrently calling other methods like
/// [`readable()`][`Async::readable()`] or [`read_with()`][`Async::read_with()`].
///
/// ### Closing
///
/// Closing the write side of [`Async`] with [`close()`][`futures_lite::AsyncWriteExt::close()`]
/// simply flushes. If you want to shutdown a TCP or Unix socket, use
/// [`Shutdown`][`std::net::Shutdown`].
///
/// # Examples
///
/// Connect to a server and echo incoming messages back to the server:
///
/// ```no_run
/// use async_io::Async;
/// use futures_lite::io;
/// use std::net::TcpStream;
///
/// # futures_lite::future::block_on(async {
/// // Connect to a local server.
/// let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
///
/// // Echo all messages from the read side of the stream into the write side.
/// io::copy(&stream, &stream).await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// You can use either predefined async methods or wrap blocking I/O operations in
/// [`Async::read_with()`], [`Async::read_with_mut()`], [`Async::write_with()`], and
/// [`Async::write_with_mut()`]:
///
/// ```no_run
/// use async_io::Async;
/// use std::net::TcpListener;
///
/// # futures_lite::future::block_on(async {
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
///
/// // These two lines are equivalent:
/// let (stream, addr) = listener.accept().await?;
/// let (stream, addr) = listener.read_with(|inner| inner.accept()).await?;
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Debug)]
pub struct Async<T> {
    /// A source registered in the reactor.
    source: Arc<Source>,

    /// The inner I/O handle.
    ///
    /// Stored as an `Option` so that `into_inner()` can move the handle out of `self`.
    io: Option<T>,
}
601
// `Async<T>` is `Unpin` for every `T`, including handles that are not `Unpin` themselves.
impl<T> Unpin for Async<T> {}
603
604#[cfg(unix)]
605impl<T: AsRawFd> Async<T> {
606 /// Creates an async I/O handle.
607 ///
608 /// This method will put the handle in non-blocking mode and register it in
609 /// [epoll]/[kqueue]/[event ports]/[IOCP].
610 ///
611 /// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
612 /// `AsRawSocket`.
613 ///
614 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
615 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
616 /// [event ports]: https://illumos.org/man/port_create
617 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
618 ///
619 /// # Examples
620 ///
621 /// ```
622 /// use async_io::Async;
623 /// use std::net::{SocketAddr, TcpListener};
624 ///
625 /// # futures_lite::future::block_on(async {
626 /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
627 /// let listener = Async::new(listener)?;
628 /// # std::io::Result::Ok(()) });
629 /// ```
630 pub fn new(io: T) -> io::Result<Async<T>> {
631 let raw = io.as_raw_fd();
632
633 // Put the file descriptor in non-blocking mode.
634 //
635 // Safety: We assume `as_raw_fd()` returns a valid fd. When we can
636 // depend on Rust >= 1.63, where `AsFd` is stabilized, and when
637 // `TimerFd` implements it, we can remove this unsafe and simplify this.
638 let fd = unsafe { rustix::fd::BorrowedFd::borrow_raw(raw) };
639 cfg_if::cfg_if! {
640 // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
641 // for now, as with the standard library, because it seems to behave
642 // differently depending on the platform.
643 // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
644 // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
645 // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
646 if #[cfg(target_os = "linux")] {
647 rustix::io::ioctl_fionbio(fd, true)?;
648 } else {
649 let previous = rustix::fs::fcntl_getfl(fd)?;
650 let new = previous | rustix::fs::OFlags::NONBLOCK;
651 if new != previous {
652 rustix::fs::fcntl_setfl(fd, new)?;
653 }
654 }
655 }
656
657 Ok(Async {
658 source: Reactor::get().insert_io(raw)?,
659 io: Some(io),
660 })
661 }
662}
663
#[cfg(unix)]
impl<T: AsRawFd> AsRawFd for Async<T> {
    /// Returns the raw file descriptor stored in this handle's reactor source.
    fn as_raw_fd(&self) -> RawFd {
        self.source.raw
    }
}
670
#[cfg(all(not(async_io_no_io_safety), unix))]
impl<T: AsFd> AsFd for Async<T> {
    /// Borrows the file descriptor of the inner I/O handle.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.get_ref().as_fd()
    }
}
677
678#[cfg(all(not(async_io_no_io_safety), unix))]
679impl<T: AsRawFd + From<OwnedFd>> TryFrom<OwnedFd> for Async<T> {
680 type Error = io::Error;
681
682 fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
683 Async::new(io:value.into())
684 }
685}
686
687#[cfg(all(not(async_io_no_io_safety), unix))]
688impl<T: Into<OwnedFd>> TryFrom<Async<T>> for OwnedFd {
689 type Error = io::Error;
690
691 fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
692 value.into_inner().map(op:Into::into)
693 }
694}
695
696#[cfg(windows)]
697impl<T: AsRawSocket> Async<T> {
698 /// Creates an async I/O handle.
699 ///
700 /// This method will put the handle in non-blocking mode and register it in
701 /// [epoll]/[kqueue]/[event ports]/[IOCP].
702 ///
703 /// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
704 /// `AsRawSocket`.
705 ///
706 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
707 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
708 /// [event ports]: https://illumos.org/man/port_create
709 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
710 ///
711 /// # Examples
712 ///
713 /// ```
714 /// use async_io::Async;
715 /// use std::net::{SocketAddr, TcpListener};
716 ///
717 /// # futures_lite::future::block_on(async {
718 /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
719 /// let listener = Async::new(listener)?;
720 /// # std::io::Result::Ok(()) });
721 /// ```
722 pub fn new(io: T) -> io::Result<Async<T>> {
723 let sock = io.as_raw_socket();
724 let borrowed = unsafe { rustix::fd::BorrowedFd::borrow_raw(sock) };
725
726 // Put the socket in non-blocking mode.
727 //
728 // Safety: We assume `as_raw_socket()` returns a valid fd. When we can
729 // depend on Rust >= 1.63, where `AsFd` is stabilized, and when
730 // `TimerFd` implements it, we can remove this unsafe and simplify this.
731 rustix::io::ioctl_fionbio(borrowed, true)?;
732
733 Ok(Async {
734 source: Reactor::get().insert_io(sock)?,
735 io: Some(io),
736 })
737 }
738}
739
#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
    /// Returns the raw socket stored in this handle's reactor source.
    fn as_raw_socket(&self) -> RawSocket {
        self.source.raw
    }
}
746
#[cfg(all(not(async_io_no_io_safety), windows))]
impl<T: AsSocket> AsSocket for Async<T> {
    /// Borrows the socket of the inner I/O handle.
    fn as_socket(&self) -> BorrowedSocket<'_> {
        self.get_ref().as_socket()
    }
}
753
754#[cfg(all(not(async_io_no_io_safety), windows))]
755impl<T: AsRawSocket + From<OwnedSocket>> TryFrom<OwnedSocket> for Async<T> {
756 type Error = io::Error;
757
758 fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
759 Async::new(value.into())
760 }
761}
762
763#[cfg(all(not(async_io_no_io_safety), windows))]
764impl<T: Into<OwnedSocket>> TryFrom<Async<T>> for OwnedSocket {
765 type Error = io::Error;
766
767 fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
768 value.into_inner().map(Into::into)
769 }
770}
771
772impl<T> Async<T> {
    /// Gets a reference to the inner I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    /// let inner = listener.get_ref();
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn get_ref(&self) -> &T {
        // `into_inner()` empties `io`, but it consumes `self` in the process.
        self.io.as_ref().unwrap()
    }
789
    /// Gets a mutable reference to the inner I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    /// let inner = listener.get_mut();
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        // `into_inner()` empties `io`, but it consumes `self` in the process.
        self.io.as_mut().unwrap()
    }
806
    /// Unwraps the inner I/O handle.
    ///
    /// This method will **not** put the I/O handle back into blocking mode.
    ///
    /// # Errors
    ///
    /// Returns an error if removing the source from the reactor fails. The inner handle has
    /// already been moved out of `self` at that point and is dropped with the error.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    /// let inner = listener.into_inner()?;
    ///
    /// // Put the listener back into blocking mode.
    /// inner.set_nonblocking(false)?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn into_inner(mut self) -> io::Result<T> {
        // Move the handle out first, leaving `None` behind in `self.io`.
        let io = self.io.take().unwrap();
        // Deregister the source from the reactor before handing the handle back.
        Reactor::get().remove_io(&self.source)?;
        Ok(io)
    }
830
    /// Waits until the I/O handle is readable.
    ///
    /// This method completes when a read operation on this I/O handle wouldn't block.
    ///
    /// The returned future borrows `self`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use async_io::Async;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    ///
    /// // Wait until a client can be accepted.
    /// listener.readable().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn readable(&self) -> Readable<'_, T> {
        Source::readable(self)
    }
851
    /// Waits until the I/O handle is readable.
    ///
    /// This method completes when a read operation on this I/O handle wouldn't block.
    ///
    /// Unlike [`readable()`][`Async::readable()`], this takes the handle as an `Arc<Self>`, so
    /// the returned future does not borrow from the caller.
    pub fn readable_owned(self: Arc<Self>) -> ReadableOwned<T> {
        Source::readable_owned(self)
    }
858
    /// Waits until the I/O handle is writable.
    ///
    /// This method completes when a write operation on this I/O handle wouldn't block.
    ///
    /// The returned future borrows `self`.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use std::net::{TcpStream, ToSocketAddrs};
    ///
    /// # futures_lite::future::block_on(async {
    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
    /// let stream = Async::<TcpStream>::connect(addr).await?;
    ///
    /// // Wait until the stream is writable.
    /// stream.writable().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn writable(&self) -> Writable<'_, T> {
        Source::writable(self)
    }
880
    /// Waits until the I/O handle is writable.
    ///
    /// This method completes when a write operation on this I/O handle wouldn't block.
    ///
    /// Unlike [`writable()`][`Async::writable()`], this takes the handle as an `Arc<Self>`, so
    /// the returned future does not borrow from the caller.
    pub fn writable_owned(self: Arc<Self>) -> WritableOwned<T> {
        Source::writable_owned(self)
    }
887
    /// Polls the I/O handle for readability.
    ///
    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
    /// indicating readability since the last time this task has called the method and received
    /// [`Poll::Pending`].
    ///
    /// # Caveats
    ///
    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
    /// will just keep waking each other in turn, thus wasting CPU time.
    ///
    /// Note that the [`AsyncRead`] implementation for [`Async`] also uses this method.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use async_io::Async;
    /// use futures_lite::future;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    ///
    /// // Wait until a client can be accepted.
    /// future::poll_fn(|cx| listener.poll_readable(cx)).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Delegates to the reactor source backing this handle.
        self.source.poll_readable(cx)
    }
918
    /// Polls the I/O handle for writability.
    ///
    /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
    /// indicating writability since the last time this task has called the method and received
    /// [`Poll::Pending`].
    ///
    /// # Caveats
    ///
    /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
    /// will just keep waking each other in turn, thus wasting CPU time.
    ///
    /// Note that the [`AsyncWrite`] implementation for [`Async`] also uses this method.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_io::Async;
    /// use futures_lite::future;
    /// use std::net::{TcpStream, ToSocketAddrs};
    ///
    /// # futures_lite::future::block_on(async {
    /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
    /// let stream = Async::<TcpStream>::connect(addr).await?;
    ///
    /// // Wait until the stream is writable.
    /// future::poll_fn(|cx| stream.poll_writable(cx)).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Delegates to the reactor source backing this handle.
        self.source.poll_writable(cx)
    }
950
    /// Performs a read operation asynchronously.
    ///
    /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
    /// invokes the `op` closure in a loop until it succeeds or returns an error other than
    /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
    /// sends a notification that the I/O handle is readable.
    ///
    /// The closure receives a shared reference to the I/O handle.
    ///
    /// # Errors
    ///
    /// Returns the first error from `op` whose kind is not [`io::ErrorKind::WouldBlock`], or an
    /// error produced while waiting for readability.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use async_io::Async;
    /// use std::net::TcpListener;
    ///
    /// # futures_lite::future::block_on(async {
    /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
    ///
    /// // Accept a new client asynchronously.
    /// let (stream, addr) = listener.read_with(|l| l.accept()).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
        let mut op = op;
        loop {
            // Try the operation first. The `match` ends before the `.await` below, so the
            // result of `op` is not held across the suspension point.
            match op(self.get_ref()) {
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                res => return res,
            }
            // The operation would block: wait for readability, then retry.
            optimistic(self.readable()).await?;
        }
    }
983
984 /// Performs a read operation asynchronously.
985 ///
986 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
987 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
988 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
989 /// sends a notification that the I/O handle is readable.
990 ///
991 /// The closure receives a mutable reference to the I/O handle.
992 ///
993 /// # Examples
994 ///
995 /// ```no_run
996 /// use async_io::Async;
997 /// use std::net::TcpListener;
998 ///
999 /// # futures_lite::future::block_on(async {
1000 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1001 ///
1002 /// // Accept a new client asynchronously.
1003 /// let (stream, addr) = listener.read_with_mut(|l| l.accept()).await?;
1004 /// # std::io::Result::Ok(()) });
1005 /// ```
1006 pub async fn read_with_mut<R>(
1007 &mut self,
1008 op: impl FnMut(&mut T) -> io::Result<R>,
1009 ) -> io::Result<R> {
1010 let mut op = op;
1011 loop {
1012 match op(self.get_mut()) {
1013 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1014 res => return res,
1015 }
1016 optimistic(self.readable()).await?;
1017 }
1018 }
1019
1020 /// Performs a write operation asynchronously.
1021 ///
1022 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1023 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1024 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1025 /// sends a notification that the I/O handle is writable.
1026 ///
1027 /// The closure receives a shared reference to the I/O handle.
1028 ///
1029 /// # Examples
1030 ///
1031 /// ```no_run
1032 /// use async_io::Async;
1033 /// use std::net::UdpSocket;
1034 ///
1035 /// # futures_lite::future::block_on(async {
1036 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1037 /// socket.get_ref().connect("127.0.0.1:9000")?;
1038 ///
1039 /// let msg = b"hello";
1040 /// let len = socket.write_with(|s| s.send(msg)).await?;
1041 /// # std::io::Result::Ok(()) });
1042 /// ```
1043 pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
1044 let mut op = op;
1045 loop {
1046 match op(self.get_ref()) {
1047 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1048 res => return res,
1049 }
1050 optimistic(self.writable()).await?;
1051 }
1052 }
1053
1054 /// Performs a write operation asynchronously.
1055 ///
1056 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1057 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1058 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1059 /// sends a notification that the I/O handle is writable.
1060 ///
1061 /// The closure receives a mutable reference to the I/O handle.
1062 ///
1063 /// # Examples
1064 ///
1065 /// ```no_run
1066 /// use async_io::Async;
1067 /// use std::net::UdpSocket;
1068 ///
1069 /// # futures_lite::future::block_on(async {
1070 /// let mut socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1071 /// socket.get_ref().connect("127.0.0.1:9000")?;
1072 ///
1073 /// let msg = b"hello";
1074 /// let len = socket.write_with_mut(|s| s.send(msg)).await?;
1075 /// # std::io::Result::Ok(()) });
1076 /// ```
1077 pub async fn write_with_mut<R>(
1078 &mut self,
1079 op: impl FnMut(&mut T) -> io::Result<R>,
1080 ) -> io::Result<R> {
1081 let mut op = op;
1082 loop {
1083 match op(self.get_mut()) {
1084 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1085 res => return res,
1086 }
1087 optimistic(self.writable()).await?;
1088 }
1089 }
1090}
1091
1092impl<T> AsRef<T> for Async<T> {
1093 fn as_ref(&self) -> &T {
1094 self.get_ref()
1095 }
1096}
1097
1098impl<T> AsMut<T> for Async<T> {
1099 fn as_mut(&mut self) -> &mut T {
1100 self.get_mut()
1101 }
1102}
1103
1104impl<T> Drop for Async<T> {
1105 fn drop(&mut self) {
1106 if self.io.is_some() {
1107 // Deregister and ignore errors because destructors should not panic.
1108 Reactor::get().remove_io(&self.source).ok();
1109
1110 // Drop the I/O handle to close it.
1111 self.io.take();
1112 }
1113 }
1114}
1115
1116impl<T: Read> AsyncRead for Async<T> {
1117 fn poll_read(
1118 mut self: Pin<&mut Self>,
1119 cx: &mut Context<'_>,
1120 buf: &mut [u8],
1121 ) -> Poll<io::Result<usize>> {
1122 loop {
1123 match (*self).get_mut().read(buf) {
1124 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1125 res => return Poll::Ready(res),
1126 }
1127 ready!(self.poll_readable(cx))?;
1128 }
1129 }
1130
1131 fn poll_read_vectored(
1132 mut self: Pin<&mut Self>,
1133 cx: &mut Context<'_>,
1134 bufs: &mut [IoSliceMut<'_>],
1135 ) -> Poll<io::Result<usize>> {
1136 loop {
1137 match (*self).get_mut().read_vectored(bufs) {
1138 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1139 res => return Poll::Ready(res),
1140 }
1141 ready!(self.poll_readable(cx))?;
1142 }
1143 }
1144}
1145
1146impl<T> AsyncRead for &Async<T>
1147where
1148 for<'a> &'a T: Read,
1149{
1150 fn poll_read(
1151 self: Pin<&mut Self>,
1152 cx: &mut Context<'_>,
1153 buf: &mut [u8],
1154 ) -> Poll<io::Result<usize>> {
1155 loop {
1156 match (*self).get_ref().read(buf) {
1157 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1158 res => return Poll::Ready(res),
1159 }
1160 ready!(self.poll_readable(cx))?;
1161 }
1162 }
1163
1164 fn poll_read_vectored(
1165 self: Pin<&mut Self>,
1166 cx: &mut Context<'_>,
1167 bufs: &mut [IoSliceMut<'_>],
1168 ) -> Poll<io::Result<usize>> {
1169 loop {
1170 match (*self).get_ref().read_vectored(bufs) {
1171 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1172 res => return Poll::Ready(res),
1173 }
1174 ready!(self.poll_readable(cx))?;
1175 }
1176 }
1177}
1178
1179impl<T: Write> AsyncWrite for Async<T> {
1180 fn poll_write(
1181 mut self: Pin<&mut Self>,
1182 cx: &mut Context<'_>,
1183 buf: &[u8],
1184 ) -> Poll<io::Result<usize>> {
1185 loop {
1186 match (*self).get_mut().write(buf) {
1187 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1188 res => return Poll::Ready(res),
1189 }
1190 ready!(self.poll_writable(cx))?;
1191 }
1192 }
1193
1194 fn poll_write_vectored(
1195 mut self: Pin<&mut Self>,
1196 cx: &mut Context<'_>,
1197 bufs: &[IoSlice<'_>],
1198 ) -> Poll<io::Result<usize>> {
1199 loop {
1200 match (*self).get_mut().write_vectored(bufs) {
1201 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1202 res => return Poll::Ready(res),
1203 }
1204 ready!(self.poll_writable(cx))?;
1205 }
1206 }
1207
1208 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1209 loop {
1210 match (*self).get_mut().flush() {
1211 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1212 res => return Poll::Ready(res),
1213 }
1214 ready!(self.poll_writable(cx))?;
1215 }
1216 }
1217
1218 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1219 self.poll_flush(cx)
1220 }
1221}
1222
1223impl<T> AsyncWrite for &Async<T>
1224where
1225 for<'a> &'a T: Write,
1226{
1227 fn poll_write(
1228 self: Pin<&mut Self>,
1229 cx: &mut Context<'_>,
1230 buf: &[u8],
1231 ) -> Poll<io::Result<usize>> {
1232 loop {
1233 match (*self).get_ref().write(buf) {
1234 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1235 res => return Poll::Ready(res),
1236 }
1237 ready!(self.poll_writable(cx))?;
1238 }
1239 }
1240
1241 fn poll_write_vectored(
1242 self: Pin<&mut Self>,
1243 cx: &mut Context<'_>,
1244 bufs: &[IoSlice<'_>],
1245 ) -> Poll<io::Result<usize>> {
1246 loop {
1247 match (*self).get_ref().write_vectored(bufs) {
1248 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1249 res => return Poll::Ready(res),
1250 }
1251 ready!(self.poll_writable(cx))?;
1252 }
1253 }
1254
1255 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1256 loop {
1257 match (*self).get_ref().flush() {
1258 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1259 res => return Poll::Ready(res),
1260 }
1261 ready!(self.poll_writable(cx))?;
1262 }
1263 }
1264
1265 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1266 self.poll_flush(cx)
1267 }
1268}
1269
1270impl Async<TcpListener> {
1271 /// Creates a TCP listener bound to the specified address.
1272 ///
1273 /// Binding with port number 0 will request an available port from the OS.
1274 ///
1275 /// # Examples
1276 ///
1277 /// ```
1278 /// use async_io::Async;
1279 /// use std::net::TcpListener;
1280 ///
1281 /// # futures_lite::future::block_on(async {
1282 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1283 /// println!("Listening on {}", listener.get_ref().local_addr()?);
1284 /// # std::io::Result::Ok(()) });
1285 /// ```
1286 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
1287 let addr = addr.into();
1288 Async::new(TcpListener::bind(addr)?)
1289 }
1290
1291 /// Accepts a new incoming TCP connection.
1292 ///
1293 /// When a connection is established, it will be returned as a TCP stream together with its
1294 /// remote address.
1295 ///
1296 /// # Examples
1297 ///
1298 /// ```no_run
1299 /// use async_io::Async;
1300 /// use std::net::TcpListener;
1301 ///
1302 /// # futures_lite::future::block_on(async {
1303 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1304 /// let (stream, addr) = listener.accept().await?;
1305 /// println!("Accepted client: {}", addr);
1306 /// # std::io::Result::Ok(()) });
1307 /// ```
1308 pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
1309 let (stream, addr) = self.read_with(|io| io.accept()).await?;
1310 Ok((Async::new(stream)?, addr))
1311 }
1312
1313 /// Returns a stream of incoming TCP connections.
1314 ///
1315 /// The stream is infinite, i.e. it never stops with a [`None`].
1316 ///
1317 /// # Examples
1318 ///
1319 /// ```no_run
1320 /// use async_io::Async;
1321 /// use futures_lite::{pin, stream::StreamExt};
1322 /// use std::net::TcpListener;
1323 ///
1324 /// # futures_lite::future::block_on(async {
1325 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1326 /// let incoming = listener.incoming();
1327 /// pin!(incoming);
1328 ///
1329 /// while let Some(stream) = incoming.next().await {
1330 /// let stream = stream?;
1331 /// println!("Accepted client: {}", stream.get_ref().peer_addr()?);
1332 /// }
1333 /// # std::io::Result::Ok(()) });
1334 /// ```
1335 pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
1336 stream::unfold(self, |listener| async move {
1337 let res = listener.accept().await.map(|(stream, _)| stream);
1338 Some((res, listener))
1339 })
1340 }
1341}
1342
1343impl TryFrom<std::net::TcpListener> for Async<std::net::TcpListener> {
1344 type Error = io::Error;
1345
1346 fn try_from(listener: std::net::TcpListener) -> io::Result<Self> {
1347 Async::new(io:listener)
1348 }
1349}
1350
1351impl Async<TcpStream> {
1352 /// Creates a TCP connection to the specified address.
1353 ///
1354 /// # Examples
1355 ///
1356 /// ```
1357 /// use async_io::Async;
1358 /// use std::net::{TcpStream, ToSocketAddrs};
1359 ///
1360 /// # futures_lite::future::block_on(async {
1361 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1362 /// let stream = Async::<TcpStream>::connect(addr).await?;
1363 /// # std::io::Result::Ok(()) });
1364 /// ```
1365 pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>> {
1366 // Begin async connect.
1367 let addr = addr.into();
1368 let domain = Domain::for_address(addr);
1369 let socket = connect(addr.into(), domain, Some(Protocol::TCP))?;
1370 let stream = Async::new(TcpStream::from(socket))?;
1371
1372 // The stream becomes writable when connected.
1373 stream.writable().await?;
1374
1375 // Check if there was an error while connecting.
1376 match stream.get_ref().take_error()? {
1377 None => Ok(stream),
1378 Some(err) => Err(err),
1379 }
1380 }
1381
1382 /// Reads data from the stream without removing it from the buffer.
1383 ///
1384 /// Returns the number of bytes read. Successive calls of this method read the same data.
1385 ///
1386 /// # Examples
1387 ///
1388 /// ```
1389 /// use async_io::Async;
1390 /// use futures_lite::{io::AsyncWriteExt, stream::StreamExt};
1391 /// use std::net::{TcpStream, ToSocketAddrs};
1392 ///
1393 /// # futures_lite::future::block_on(async {
1394 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1395 /// let mut stream = Async::<TcpStream>::connect(addr).await?;
1396 ///
1397 /// stream
1398 /// .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
1399 /// .await?;
1400 ///
1401 /// let mut buf = [0u8; 1024];
1402 /// let len = stream.peek(&mut buf).await?;
1403 /// # std::io::Result::Ok(()) });
1404 /// ```
1405 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1406 self.read_with(|io| io.peek(buf)).await
1407 }
1408}
1409
1410impl TryFrom<std::net::TcpStream> for Async<std::net::TcpStream> {
1411 type Error = io::Error;
1412
1413 fn try_from(stream: std::net::TcpStream) -> io::Result<Self> {
1414 Async::new(io:stream)
1415 }
1416}
1417
1418impl Async<UdpSocket> {
1419 /// Creates a UDP socket bound to the specified address.
1420 ///
1421 /// Binding with port number 0 will request an available port from the OS.
1422 ///
1423 /// # Examples
1424 ///
1425 /// ```
1426 /// use async_io::Async;
1427 /// use std::net::UdpSocket;
1428 ///
1429 /// # futures_lite::future::block_on(async {
1430 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1431 /// println!("Bound to {}", socket.get_ref().local_addr()?);
1432 /// # std::io::Result::Ok(()) });
1433 /// ```
1434 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
1435 let addr = addr.into();
1436 Async::new(UdpSocket::bind(addr)?)
1437 }
1438
1439 /// Receives a single datagram message.
1440 ///
1441 /// Returns the number of bytes read and the address the message came from.
1442 ///
1443 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1444 /// If the message is too long to fit, excess bytes may get discarded.
1445 ///
1446 /// # Examples
1447 ///
1448 /// ```no_run
1449 /// use async_io::Async;
1450 /// use std::net::UdpSocket;
1451 ///
1452 /// # futures_lite::future::block_on(async {
1453 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1454 ///
1455 /// let mut buf = [0u8; 1024];
1456 /// let (len, addr) = socket.recv_from(&mut buf).await?;
1457 /// # std::io::Result::Ok(()) });
1458 /// ```
1459 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1460 self.read_with(|io| io.recv_from(buf)).await
1461 }
1462
1463 /// Receives a single datagram message without removing it from the queue.
1464 ///
1465 /// Returns the number of bytes read and the address the message came from.
1466 ///
1467 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1468 /// If the message is too long to fit, excess bytes may get discarded.
1469 ///
1470 /// # Examples
1471 ///
1472 /// ```no_run
1473 /// use async_io::Async;
1474 /// use std::net::UdpSocket;
1475 ///
1476 /// # futures_lite::future::block_on(async {
1477 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1478 ///
1479 /// let mut buf = [0u8; 1024];
1480 /// let (len, addr) = socket.peek_from(&mut buf).await?;
1481 /// # std::io::Result::Ok(()) });
1482 /// ```
1483 pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1484 self.read_with(|io| io.peek_from(buf)).await
1485 }
1486
1487 /// Sends data to the specified address.
1488 ///
1489 /// Returns the number of bytes writen.
1490 ///
1491 /// # Examples
1492 ///
1493 /// ```no_run
1494 /// use async_io::Async;
1495 /// use std::net::UdpSocket;
1496 ///
1497 /// # futures_lite::future::block_on(async {
1498 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1499 /// let addr = socket.get_ref().local_addr()?;
1500 ///
1501 /// let msg = b"hello";
1502 /// let len = socket.send_to(msg, addr).await?;
1503 /// # std::io::Result::Ok(()) });
1504 /// ```
1505 pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
1506 let addr = addr.into();
1507 self.write_with(|io| io.send_to(buf, addr)).await
1508 }
1509
1510 /// Receives a single datagram message from the connected peer.
1511 ///
1512 /// Returns the number of bytes read.
1513 ///
1514 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1515 /// If the message is too long to fit, excess bytes may get discarded.
1516 ///
1517 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1518 /// This method will fail if the socket is not connected.
1519 ///
1520 /// # Examples
1521 ///
1522 /// ```no_run
1523 /// use async_io::Async;
1524 /// use std::net::UdpSocket;
1525 ///
1526 /// # futures_lite::future::block_on(async {
1527 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1528 /// socket.get_ref().connect("127.0.0.1:9000")?;
1529 ///
1530 /// let mut buf = [0u8; 1024];
1531 /// let len = socket.recv(&mut buf).await?;
1532 /// # std::io::Result::Ok(()) });
1533 /// ```
1534 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1535 self.read_with(|io| io.recv(buf)).await
1536 }
1537
1538 /// Receives a single datagram message from the connected peer without removing it from the
1539 /// queue.
1540 ///
1541 /// Returns the number of bytes read and the address the message came from.
1542 ///
1543 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1544 /// If the message is too long to fit, excess bytes may get discarded.
1545 ///
1546 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1547 /// This method will fail if the socket is not connected.
1548 ///
1549 /// # Examples
1550 ///
1551 /// ```no_run
1552 /// use async_io::Async;
1553 /// use std::net::UdpSocket;
1554 ///
1555 /// # futures_lite::future::block_on(async {
1556 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1557 /// socket.get_ref().connect("127.0.0.1:9000")?;
1558 ///
1559 /// let mut buf = [0u8; 1024];
1560 /// let len = socket.peek(&mut buf).await?;
1561 /// # std::io::Result::Ok(()) });
1562 /// ```
1563 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1564 self.read_with(|io| io.peek(buf)).await
1565 }
1566
1567 /// Sends data to the connected peer.
1568 ///
1569 /// Returns the number of bytes written.
1570 ///
1571 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1572 /// This method will fail if the socket is not connected.
1573 ///
1574 /// # Examples
1575 ///
1576 /// ```no_run
1577 /// use async_io::Async;
1578 /// use std::net::UdpSocket;
1579 ///
1580 /// # futures_lite::future::block_on(async {
1581 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1582 /// socket.get_ref().connect("127.0.0.1:9000")?;
1583 ///
1584 /// let msg = b"hello";
1585 /// let len = socket.send(msg).await?;
1586 /// # std::io::Result::Ok(()) });
1587 /// ```
1588 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1589 self.write_with(|io| io.send(buf)).await
1590 }
1591}
1592
1593impl TryFrom<std::net::UdpSocket> for Async<std::net::UdpSocket> {
1594 type Error = io::Error;
1595
1596 fn try_from(socket: std::net::UdpSocket) -> io::Result<Self> {
1597 Async::new(io:socket)
1598 }
1599}
1600
1601#[cfg(unix)]
1602impl Async<UnixListener> {
1603 /// Creates a UDS listener bound to the specified path.
1604 ///
1605 /// # Examples
1606 ///
1607 /// ```no_run
1608 /// use async_io::Async;
1609 /// use std::os::unix::net::UnixListener;
1610 ///
1611 /// # futures_lite::future::block_on(async {
1612 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1613 /// println!("Listening on {:?}", listener.get_ref().local_addr()?);
1614 /// # std::io::Result::Ok(()) });
1615 /// ```
1616 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
1617 let path = path.as_ref().to_owned();
1618 Async::new(UnixListener::bind(path)?)
1619 }
1620
1621 /// Accepts a new incoming UDS stream connection.
1622 ///
1623 /// When a connection is established, it will be returned as a stream together with its remote
1624 /// address.
1625 ///
1626 /// # Examples
1627 ///
1628 /// ```no_run
1629 /// use async_io::Async;
1630 /// use std::os::unix::net::UnixListener;
1631 ///
1632 /// # futures_lite::future::block_on(async {
1633 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1634 /// let (stream, addr) = listener.accept().await?;
1635 /// println!("Accepted client: {:?}", addr);
1636 /// # std::io::Result::Ok(()) });
1637 /// ```
1638 pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
1639 let (stream, addr) = self.read_with(|io| io.accept()).await?;
1640 Ok((Async::new(stream)?, addr))
1641 }
1642
1643 /// Returns a stream of incoming UDS connections.
1644 ///
1645 /// The stream is infinite, i.e. it never stops with a [`None`] item.
1646 ///
1647 /// # Examples
1648 ///
1649 /// ```no_run
1650 /// use async_io::Async;
1651 /// use futures_lite::{pin, stream::StreamExt};
1652 /// use std::os::unix::net::UnixListener;
1653 ///
1654 /// # futures_lite::future::block_on(async {
1655 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1656 /// let incoming = listener.incoming();
1657 /// pin!(incoming);
1658 ///
1659 /// while let Some(stream) = incoming.next().await {
1660 /// let stream = stream?;
1661 /// println!("Accepted client: {:?}", stream.get_ref().peer_addr()?);
1662 /// }
1663 /// # std::io::Result::Ok(()) });
1664 /// ```
1665 pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
1666 stream::unfold(self, |listener| async move {
1667 let res = listener.accept().await.map(|(stream, _)| stream);
1668 Some((res, listener))
1669 })
1670 }
1671}
1672
1673#[cfg(unix)]
1674impl TryFrom<std::os::unix::net::UnixListener> for Async<std::os::unix::net::UnixListener> {
1675 type Error = io::Error;
1676
1677 fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
1678 Async::new(io:listener)
1679 }
1680}
1681
1682#[cfg(unix)]
1683impl Async<UnixStream> {
1684 /// Creates a UDS stream connected to the specified path.
1685 ///
1686 /// # Examples
1687 ///
1688 /// ```no_run
1689 /// use async_io::Async;
1690 /// use std::os::unix::net::UnixStream;
1691 ///
1692 /// # futures_lite::future::block_on(async {
1693 /// let stream = Async::<UnixStream>::connect("/tmp/socket").await?;
1694 /// # std::io::Result::Ok(()) });
1695 /// ```
1696 pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
1697 // Begin async connect.
1698 let socket = connect(SockAddr::unix(path)?, Domain::UNIX, None)?;
1699 let stream = Async::new(UnixStream::from(socket))?;
1700
1701 // The stream becomes writable when connected.
1702 stream.writable().await?;
1703
1704 // On Linux, it appears the socket may become writable even when connecting fails, so we
1705 // must do an extra check here and see if the peer address is retrievable.
1706 stream.get_ref().peer_addr()?;
1707 Ok(stream)
1708 }
1709
1710 /// Creates an unnamed pair of connected UDS stream sockets.
1711 ///
1712 /// # Examples
1713 ///
1714 /// ```no_run
1715 /// use async_io::Async;
1716 /// use std::os::unix::net::UnixStream;
1717 ///
1718 /// # futures_lite::future::block_on(async {
1719 /// let (stream1, stream2) = Async::<UnixStream>::pair()?;
1720 /// # std::io::Result::Ok(()) });
1721 /// ```
1722 pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
1723 let (stream1, stream2) = UnixStream::pair()?;
1724 Ok((Async::new(stream1)?, Async::new(stream2)?))
1725 }
1726}
1727
1728#[cfg(unix)]
1729impl TryFrom<std::os::unix::net::UnixStream> for Async<std::os::unix::net::UnixStream> {
1730 type Error = io::Error;
1731
1732 fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
1733 Async::new(io:stream)
1734 }
1735}
1736
1737#[cfg(unix)]
1738impl Async<UnixDatagram> {
1739 /// Creates a UDS datagram socket bound to the specified path.
1740 ///
1741 /// # Examples
1742 ///
1743 /// ```no_run
1744 /// use async_io::Async;
1745 /// use std::os::unix::net::UnixDatagram;
1746 ///
1747 /// # futures_lite::future::block_on(async {
1748 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1749 /// # std::io::Result::Ok(()) });
1750 /// ```
1751 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
1752 let path = path.as_ref().to_owned();
1753 Async::new(UnixDatagram::bind(path)?)
1754 }
1755
1756 /// Creates a UDS datagram socket not bound to any address.
1757 ///
1758 /// # Examples
1759 ///
1760 /// ```no_run
1761 /// use async_io::Async;
1762 /// use std::os::unix::net::UnixDatagram;
1763 ///
1764 /// # futures_lite::future::block_on(async {
1765 /// let socket = Async::<UnixDatagram>::unbound()?;
1766 /// # std::io::Result::Ok(()) });
1767 /// ```
1768 pub fn unbound() -> io::Result<Async<UnixDatagram>> {
1769 Async::new(UnixDatagram::unbound()?)
1770 }
1771
1772 /// Creates an unnamed pair of connected Unix datagram sockets.
1773 ///
1774 /// # Examples
1775 ///
1776 /// ```no_run
1777 /// use async_io::Async;
1778 /// use std::os::unix::net::UnixDatagram;
1779 ///
1780 /// # futures_lite::future::block_on(async {
1781 /// let (socket1, socket2) = Async::<UnixDatagram>::pair()?;
1782 /// # std::io::Result::Ok(()) });
1783 /// ```
1784 pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
1785 let (socket1, socket2) = UnixDatagram::pair()?;
1786 Ok((Async::new(socket1)?, Async::new(socket2)?))
1787 }
1788
1789 /// Receives data from the socket.
1790 ///
1791 /// Returns the number of bytes read and the address the message came from.
1792 ///
1793 /// # Examples
1794 ///
1795 /// ```no_run
1796 /// use async_io::Async;
1797 /// use std::os::unix::net::UnixDatagram;
1798 ///
1799 /// # futures_lite::future::block_on(async {
1800 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1801 ///
1802 /// let mut buf = [0u8; 1024];
1803 /// let (len, addr) = socket.recv_from(&mut buf).await?;
1804 /// # std::io::Result::Ok(()) });
1805 /// ```
1806 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
1807 self.read_with(|io| io.recv_from(buf)).await
1808 }
1809
1810 /// Sends data to the specified address.
1811 ///
1812 /// Returns the number of bytes written.
1813 ///
1814 /// # Examples
1815 ///
1816 /// ```no_run
1817 /// use async_io::Async;
1818 /// use std::os::unix::net::UnixDatagram;
1819 ///
1820 /// # futures_lite::future::block_on(async {
1821 /// let socket = Async::<UnixDatagram>::unbound()?;
1822 ///
1823 /// let msg = b"hello";
1824 /// let addr = "/tmp/socket";
1825 /// let len = socket.send_to(msg, addr).await?;
1826 /// # std::io::Result::Ok(()) });
1827 /// ```
1828 pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
1829 self.write_with(|io| io.send_to(buf, &path)).await
1830 }
1831
1832 /// Receives data from the connected peer.
1833 ///
1834 /// Returns the number of bytes read and the address the message came from.
1835 ///
1836 /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
1837 /// This method will fail if the socket is not connected.
1838 ///
1839 /// # Examples
1840 ///
1841 /// ```no_run
1842 /// use async_io::Async;
1843 /// use std::os::unix::net::UnixDatagram;
1844 ///
1845 /// # futures_lite::future::block_on(async {
1846 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
1847 /// socket.get_ref().connect("/tmp/socket2")?;
1848 ///
1849 /// let mut buf = [0u8; 1024];
1850 /// let len = socket.recv(&mut buf).await?;
1851 /// # std::io::Result::Ok(()) });
1852 /// ```
1853 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1854 self.read_with(|io| io.recv(buf)).await
1855 }
1856
1857 /// Sends data to the connected peer.
1858 ///
1859 /// Returns the number of bytes written.
1860 ///
1861 /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
1862 /// This method will fail if the socket is not connected.
1863 ///
1864 /// # Examples
1865 ///
1866 /// ```no_run
1867 /// use async_io::Async;
1868 /// use std::os::unix::net::UnixDatagram;
1869 ///
1870 /// # futures_lite::future::block_on(async {
1871 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
1872 /// socket.get_ref().connect("/tmp/socket2")?;
1873 ///
1874 /// let msg = b"hello";
1875 /// let len = socket.send(msg).await?;
1876 /// # std::io::Result::Ok(()) });
1877 /// ```
1878 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1879 self.write_with(|io| io.send(buf)).await
1880 }
1881}
1882
1883#[cfg(unix)]
1884impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::UnixDatagram> {
1885 type Error = io::Error;
1886
1887 fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self> {
1888 Async::new(io:socket)
1889 }
1890}
1891
1892/// Polls a future once, waits for a wakeup, and then optimistically assumes the future is ready.
1893async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
1894 let mut polled: bool = false;
1895 pin!(fut);
1896
1897 futurePollFn) -> …>::poll_fn(|cx: &mut Context<'_>| {
1898 if !polled {
1899 polled = true;
1900 fut.as_mut().poll(cx)
1901 } else {
1902 Poll::Ready(Ok(()))
1903 }
1904 })
1905 .await
1906}
1907
1908fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Result<Socket> {
1909 let sock_type = Type::STREAM;
1910 #[cfg(any(
1911 target_os = "android",
1912 target_os = "dragonfly",
1913 target_os = "freebsd",
1914 target_os = "fuchsia",
1915 target_os = "illumos",
1916 target_os = "linux",
1917 target_os = "netbsd",
1918 target_os = "openbsd"
1919 ))]
1920 // If we can, set nonblocking at socket creation for unix
1921 let sock_type = sock_type.nonblocking();
1922 // This automatically handles cloexec on unix, no_inherit on windows and nosigpipe on macos
1923 let socket = Socket::new(domain, sock_type, protocol)?;
1924 #[cfg(not(any(
1925 target_os = "android",
1926 target_os = "dragonfly",
1927 target_os = "freebsd",
1928 target_os = "fuchsia",
1929 target_os = "illumos",
1930 target_os = "linux",
1931 target_os = "netbsd",
1932 target_os = "openbsd"
1933 )))]
1934 // If the current platform doesn't support nonblocking at creation, enable it after creation
1935 socket.set_nonblocking(true)?;
1936 match socket.connect(&addr) {
1937 Ok(_) => {}
1938 #[cfg(unix)]
1939 Err(err) if err.raw_os_error() == Some(rustix::io::Errno::INPROGRESS.raw_os_error()) => {}
1940 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1941 Err(err) => return Err(err),
1942 }
1943 Ok(socket)
1944}
1945