1//! Async I/O and timers.
2//!
3//! This crate provides two tools:
4//!
5//! * [`Async`], an adapter for standard networking types (and [many other] types) to use in
6//! async programs.
7//! * [`Timer`], a future or stream that emits timed events.
8//!
9//! For concrete async networking types built on top of this crate, see [`async-net`].
10//!
11//! [many other]: https://github.com/smol-rs/async-io/tree/master/examples
12//! [`async-net`]: https://docs.rs/async-net
13//!
14//! # Implementation
15//!
16//! The first time [`Async`] or [`Timer`] is used, a thread named "async-io" will be spawned.
17//! The purpose of this thread is to wait for I/O events reported by the operating system, and then
18//! wake appropriate futures blocked on I/O or timers when they can be resumed.
19//!
20//! To wait for the next I/O event, the "async-io" thread uses [epoll] on Linux/Android/illumos,
21//! [kqueue] on macOS/iOS/BSD, [event ports] on illumos/Solaris, and [IOCP] on Windows. That
22//! functionality is provided by the [`polling`] crate.
23//!
24//! However, note that you can also process I/O events and wake futures on any thread using the
25//! [`block_on()`] function. The "async-io" thread is therefore just a fallback mechanism
26//! processing I/O events in case no other threads are.
27//!
28//! [epoll]: https://en.wikipedia.org/wiki/Epoll
29//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
30//! [event ports]: https://illumos.org/man/port_create
31//! [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
32//! [`polling`]: https://docs.rs/polling
33//!
34//! # Examples
35//!
36//! Connect to `example.com:80`, or time out after 10 seconds.
37//!
38//! ```
39//! use async_io::{Async, Timer};
40//! use futures_lite::{future::FutureExt, io};
41//!
42//! use std::net::{TcpStream, ToSocketAddrs};
43//! use std::time::Duration;
44//!
45//! # futures_lite::future::block_on(async {
46//! let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
47//!
48//! let stream = Async::<TcpStream>::connect(addr).or(async {
49//! Timer::after(Duration::from_secs(10)).await;
50//! Err(io::ErrorKind::TimedOut.into())
51//! })
52//! .await?;
53//! # std::io::Result::Ok(()) });
54//! ```
55
56#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
57#![doc(
58 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
59)]
60#![doc(
61 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
62)]
63
64use std::future::Future;
65use std::io::{self, IoSlice, IoSliceMut, Read, Write};
66use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
67use std::pin::Pin;
68use std::sync::Arc;
69use std::task::{Context, Poll, Waker};
70use std::time::{Duration, Instant};
71
72#[cfg(unix)]
73use std::{
74 os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd},
75 os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
76 path::Path,
77};
78
79#[cfg(windows)]
80use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
81
82use futures_io::{AsyncRead, AsyncWrite};
83use futures_lite::stream::{self, Stream};
84use futures_lite::{future, pin, ready};
85
86use rustix::io as rio;
87use rustix::net as rn;
88
89use crate::reactor::{Reactor, Registration, Source};
90
91mod driver;
92mod reactor;
93
94pub mod os;
95
96pub use driver::block_on;
97pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};
98
99/// A future or stream that emits timed events.
100///
101/// Timers are futures that output a single [`Instant`] when they fire.
102///
103/// Timers are also streams that can output [`Instant`]s periodically.
104///
105/// # Precision
106///
107/// There is a limit on the maximum precision that a `Timer` can provide. This limit is
108/// dependent on the current platform; for instance, on Windows, the maximum precision is
109/// about 16 milliseconds. Because of this limit, the timer may sleep for longer than the
110/// requested duration. It will never sleep for less.
111///
112/// # Examples
113///
114/// Sleep for 1 second:
115///
116/// ```
117/// use async_io::Timer;
118/// use std::time::Duration;
119///
120/// # futures_lite::future::block_on(async {
121/// Timer::after(Duration::from_secs(1)).await;
122/// # });
123/// ```
124///
125/// Timeout after 1 second:
126///
127/// ```
128/// use async_io::Timer;
129/// use futures_lite::FutureExt;
130/// use std::time::Duration;
131///
132/// # futures_lite::future::block_on(async {
133/// let addrs = async_net::resolve("google.com:80")
134/// .or(async {
135/// Timer::after(Duration::from_secs(1)).await;
136/// Err(std::io::ErrorKind::TimedOut.into())
137/// })
138/// .await?;
139/// # std::io::Result::Ok(()) });
140/// ```
141#[derive(Debug)]
142pub struct Timer {
143 /// This timer's ID and last waker that polled it.
144 ///
145 /// When this field is set to `None`, this timer is not registered in the reactor.
146 id_and_waker: Option<(usize, Waker)>,
147
148 /// The next instant at which this timer fires.
149 ///
150 /// If this timer is a blank timer, this value is None. If the timer
151 /// must be set, this value contains the next instant at which the
152 /// timer must fire.
153 when: Option<Instant>,
154
155 /// The period.
156 period: Duration,
157}
158
159impl Timer {
160 /// Creates a timer that will never fire.
161 ///
162 /// # Examples
163 ///
164 /// This function may also be useful for creating a function with an optional timeout.
165 ///
166 /// ```
167 /// # futures_lite::future::block_on(async {
168 /// use async_io::Timer;
169 /// use futures_lite::prelude::*;
170 /// use std::time::Duration;
171 ///
172 /// async fn run_with_timeout(timeout: Option<Duration>) {
173 /// let timer = timeout
174 /// .map(|timeout| Timer::after(timeout))
175 /// .unwrap_or_else(Timer::never);
176 ///
177 /// run_lengthy_operation().or(timer).await;
178 /// }
179 /// # // Note that since a Timer as a Future returns an Instant,
180 /// # // this function needs to return an Instant to be used
181 /// # // in "or".
182 /// # async fn run_lengthy_operation() -> std::time::Instant {
183 /// # std::time::Instant::now()
184 /// # }
185 ///
186 /// // Times out after 5 seconds.
187 /// run_with_timeout(Some(Duration::from_secs(5))).await;
188 /// // Does not time out.
189 /// run_with_timeout(None).await;
190 /// # });
191 /// ```
192 pub fn never() -> Timer {
193 Timer {
194 id_and_waker: None,
195 when: None,
196 period: Duration::MAX,
197 }
198 }
199
200 /// Creates a timer that emits an event once after the given duration of time.
201 ///
202 /// # Examples
203 ///
204 /// ```
205 /// use async_io::Timer;
206 /// use std::time::Duration;
207 ///
208 /// # futures_lite::future::block_on(async {
209 /// Timer::after(Duration::from_secs(1)).await;
210 /// # });
211 /// ```
212 pub fn after(duration: Duration) -> Timer {
213 Instant::now()
214 .checked_add(duration)
215 .map_or_else(Timer::never, Timer::at)
216 }
217
218 /// Creates a timer that emits an event once at the given time instant.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use async_io::Timer;
224 /// use std::time::{Duration, Instant};
225 ///
226 /// # futures_lite::future::block_on(async {
227 /// let now = Instant::now();
228 /// let when = now + Duration::from_secs(1);
229 /// Timer::at(when).await;
230 /// # });
231 /// ```
232 pub fn at(instant: Instant) -> Timer {
233 Timer::interval_at(instant, Duration::MAX)
234 }
235
236 /// Creates a timer that emits events periodically.
237 ///
238 /// # Examples
239 ///
240 /// ```
241 /// use async_io::Timer;
242 /// use futures_lite::StreamExt;
243 /// use std::time::{Duration, Instant};
244 ///
245 /// # futures_lite::future::block_on(async {
246 /// let period = Duration::from_secs(1);
247 /// Timer::interval(period).next().await;
248 /// # });
249 /// ```
250 pub fn interval(period: Duration) -> Timer {
251 Instant::now()
252 .checked_add(period)
253 .map_or_else(Timer::never, |at| Timer::interval_at(at, period))
254 }
255
256 /// Creates a timer that emits events periodically, starting at `start`.
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use async_io::Timer;
262 /// use futures_lite::StreamExt;
263 /// use std::time::{Duration, Instant};
264 ///
265 /// # futures_lite::future::block_on(async {
266 /// let start = Instant::now();
267 /// let period = Duration::from_secs(1);
268 /// Timer::interval_at(start, period).next().await;
269 /// # });
270 /// ```
271 pub fn interval_at(start: Instant, period: Duration) -> Timer {
272 Timer {
273 id_and_waker: None,
274 when: Some(start),
275 period,
276 }
277 }
278
279 /// Indicates whether or not this timer will ever fire.
280 ///
281 /// [`never()`] will never fire, and timers created with [`after()`] or [`at()`] will fire
282 /// if the duration is not too large.
283 ///
284 /// [`never()`]: Timer::never()
285 /// [`after()`]: Timer::after()
286 /// [`at()`]: Timer::at()
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// # futures_lite::future::block_on(async {
292 /// use async_io::Timer;
293 /// use futures_lite::prelude::*;
294 /// use std::time::Duration;
295 ///
296 /// // `never` will never fire.
297 /// assert!(!Timer::never().will_fire());
298 ///
299 /// // `after` will fire if the duration is not too large.
300 /// assert!(Timer::after(Duration::from_secs(1)).will_fire());
301 /// assert!(!Timer::after(Duration::MAX).will_fire());
302 ///
303 /// // However, once an `after` timer has fired, it will never fire again.
304 /// let mut t = Timer::after(Duration::from_secs(1));
305 /// assert!(t.will_fire());
306 /// (&mut t).await;
307 /// assert!(!t.will_fire());
308 ///
309 /// // Interval timers will fire periodically.
310 /// let mut t = Timer::interval(Duration::from_secs(1));
311 /// assert!(t.will_fire());
312 /// t.next().await;
313 /// assert!(t.will_fire());
314 /// # });
315 /// ```
316 #[inline]
317 pub fn will_fire(&self) -> bool {
318 self.when.is_some()
319 }
320
321 /// Sets the timer to emit an en event once after the given duration of time.
322 ///
323 /// Note that resetting a timer is different from creating a new timer because
324 /// [`set_after()`][`Timer::set_after()`] does not remove the waker associated with the task
325 /// that is polling the timer.
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use async_io::Timer;
331 /// use std::time::Duration;
332 ///
333 /// # futures_lite::future::block_on(async {
334 /// let mut t = Timer::after(Duration::from_secs(1));
335 /// t.set_after(Duration::from_millis(100));
336 /// # });
337 /// ```
338 pub fn set_after(&mut self, duration: Duration) {
339 match Instant::now().checked_add(duration) {
340 Some(instant) => self.set_at(instant),
341 None => {
342 // Overflow to never going off.
343 self.clear();
344 self.when = None;
345 }
346 }
347 }
348
349 /// Sets the timer to emit an event once at the given time instant.
350 ///
351 /// Note that resetting a timer is different from creating a new timer because
352 /// [`set_at()`][`Timer::set_at()`] does not remove the waker associated with the task
353 /// that is polling the timer.
354 ///
355 /// # Examples
356 ///
357 /// ```
358 /// use async_io::Timer;
359 /// use std::time::{Duration, Instant};
360 ///
361 /// # futures_lite::future::block_on(async {
362 /// let mut t = Timer::after(Duration::from_secs(1));
363 ///
364 /// let now = Instant::now();
365 /// let when = now + Duration::from_secs(1);
366 /// t.set_at(when);
367 /// # });
368 /// ```
369 pub fn set_at(&mut self, instant: Instant) {
370 self.clear();
371
372 // Update the timeout.
373 self.when = Some(instant);
374
375 if let Some((id, waker)) = self.id_and_waker.as_mut() {
376 // Re-register the timer with the new timeout.
377 *id = Reactor::get().insert_timer(instant, waker);
378 }
379 }
380
381 /// Sets the timer to emit events periodically.
382 ///
383 /// Note that resetting a timer is different from creating a new timer because
384 /// [`set_interval()`][`Timer::set_interval()`] does not remove the waker associated with the
385 /// task that is polling the timer.
386 ///
387 /// # Examples
388 ///
389 /// ```
390 /// use async_io::Timer;
391 /// use futures_lite::StreamExt;
392 /// use std::time::{Duration, Instant};
393 ///
394 /// # futures_lite::future::block_on(async {
395 /// let mut t = Timer::after(Duration::from_secs(1));
396 ///
397 /// let period = Duration::from_secs(2);
398 /// t.set_interval(period);
399 /// # });
400 /// ```
401 pub fn set_interval(&mut self, period: Duration) {
402 match Instant::now().checked_add(period) {
403 Some(instant) => self.set_interval_at(instant, period),
404 None => {
405 // Overflow to never going off.
406 self.clear();
407 self.when = None;
408 }
409 }
410 }
411
412 /// Sets the timer to emit events periodically, starting at `start`.
413 ///
414 /// Note that resetting a timer is different from creating a new timer because
415 /// [`set_interval_at()`][`Timer::set_interval_at()`] does not remove the waker associated with
416 /// the task that is polling the timer.
417 ///
418 /// # Examples
419 ///
420 /// ```
421 /// use async_io::Timer;
422 /// use futures_lite::StreamExt;
423 /// use std::time::{Duration, Instant};
424 ///
425 /// # futures_lite::future::block_on(async {
426 /// let mut t = Timer::after(Duration::from_secs(1));
427 ///
428 /// let start = Instant::now();
429 /// let period = Duration::from_secs(2);
430 /// t.set_interval_at(start, period);
431 /// # });
432 /// ```
433 pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
434 self.clear();
435
436 self.when = Some(start);
437 self.period = period;
438
439 if let Some((id, waker)) = self.id_and_waker.as_mut() {
440 // Re-register the timer with the new timeout.
441 *id = Reactor::get().insert_timer(start, waker);
442 }
443 }
444
445 /// Helper function to clear the current timer.
446 fn clear(&mut self) {
447 if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
448 // Deregister the timer from the reactor.
449 Reactor::get().remove_timer(when, *id);
450 }
451 }
452}
453
454impl Drop for Timer {
455 fn drop(&mut self) {
456 if let (Some(when: Instant), Some((id: usize, _))) = (self.when, self.id_and_waker.take()) {
457 // Deregister the timer from the reactor.
458 Reactor::get().remove_timer(when, id);
459 }
460 }
461}
462
463impl Future for Timer {
464 type Output = Instant;
465
466 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
467 match self.poll_next(cx) {
468 Poll::Ready(Some(when: Instant)) => Poll::Ready(when),
469 Poll::Pending => Poll::Pending,
470 Poll::Ready(None) => unreachable!(),
471 }
472 }
473}
474
475impl Stream for Timer {
476 type Item = Instant;
477
478 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
479 let this = self.get_mut();
480
481 if let Some(ref mut when) = this.when {
482 // Check if the timer has already fired.
483 if Instant::now() >= *when {
484 if let Some((id, _)) = this.id_and_waker.take() {
485 // Deregister the timer from the reactor.
486 Reactor::get().remove_timer(*when, id);
487 }
488 let result_time = *when;
489 if let Some(next) = (*when).checked_add(this.period) {
490 *when = next;
491 // Register the timer in the reactor.
492 let id = Reactor::get().insert_timer(next, cx.waker());
493 this.id_and_waker = Some((id, cx.waker().clone()));
494 } else {
495 this.when = None;
496 }
497 return Poll::Ready(Some(result_time));
498 } else {
499 match &this.id_and_waker {
500 None => {
501 // Register the timer in the reactor.
502 let id = Reactor::get().insert_timer(*when, cx.waker());
503 this.id_and_waker = Some((id, cx.waker().clone()));
504 }
505 Some((id, w)) if !w.will_wake(cx.waker()) => {
506 // Deregister the timer from the reactor to remove the old waker.
507 Reactor::get().remove_timer(*when, *id);
508
509 // Register the timer in the reactor with the new waker.
510 let id = Reactor::get().insert_timer(*when, cx.waker());
511 this.id_and_waker = Some((id, cx.waker().clone()));
512 }
513 Some(_) => {}
514 }
515 }
516 }
517
518 Poll::Pending
519 }
520}
521
522/// Async adapter for I/O types.
523///
524/// This type puts an I/O handle into non-blocking mode, registers it in
525/// [epoll]/[kqueue]/[event ports]/[IOCP], and then provides an async interface for it.
526///
527/// [epoll]: https://en.wikipedia.org/wiki/Epoll
528/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
529/// [event ports]: https://illumos.org/man/port_create
530/// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
531///
532/// # Caveats
533///
534/// [`Async`] is a low-level primitive, and as such it comes with some caveats.
535///
536/// For higher-level primitives built on top of [`Async`], look into [`async-net`] or
537/// [`async-process`] (on Unix).
538///
539/// The most notable caveat is that it is unsafe to access the inner I/O source mutably
540/// using this primitive. Traits likes [`AsyncRead`] and [`AsyncWrite`] are not implemented by
541/// default unless it is guaranteed that the resource won't be invalidated by reading or writing.
542/// See the [`IoSafe`] trait for more information.
543///
544/// [`async-net`]: https://github.com/smol-rs/async-net
545/// [`async-process`]: https://github.com/smol-rs/async-process
546/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html
547/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html
548///
549/// ### Supported types
550///
551/// [`Async`] supports all networking types, as well as some OS-specific file descriptors like
552/// [timerfd] and [inotify].
553///
554/// However, do not use [`Async`] with types like [`File`][`std::fs::File`],
555/// [`Stdin`][`std::io::Stdin`], [`Stdout`][`std::io::Stdout`], or [`Stderr`][`std::io::Stderr`]
556/// because all operating systems have issues with them when put in non-blocking mode.
557///
558/// [timerfd]: https://github.com/smol-rs/async-io/blob/master/examples/linux-timerfd.rs
559/// [inotify]: https://github.com/smol-rs/async-io/blob/master/examples/linux-inotify.rs
560///
561/// ### Concurrent I/O
562///
563/// Note that [`&Async<T>`][`Async`] implements [`AsyncRead`] and [`AsyncWrite`] if `&T`
564/// implements those traits, which means tasks can concurrently read and write using shared
565/// references.
566///
567/// But there is a catch: only one task can read a time, and only one task can write at a time. It
568/// is okay to have two tasks where one is reading and the other is writing at the same time, but
569/// it is not okay to have two tasks reading at the same time or writing at the same time. If you
570/// try to do that, conflicting tasks will just keep waking each other in turn, thus wasting CPU
571/// time.
572///
573/// Besides [`AsyncRead`] and [`AsyncWrite`], this caveat also applies to
574/// [`poll_readable()`][`Async::poll_readable()`] and
575/// [`poll_writable()`][`Async::poll_writable()`].
576///
577/// However, any number of tasks can be concurrently calling other methods like
578/// [`readable()`][`Async::readable()`] or [`read_with()`][`Async::read_with()`].
579///
580/// ### Closing
581///
582/// Closing the write side of [`Async`] with [`close()`][`futures_lite::AsyncWriteExt::close()`]
583/// simply flushes. If you want to shutdown a TCP or Unix socket, use
584/// [`Shutdown`][`std::net::Shutdown`].
585///
586/// # Examples
587///
588/// Connect to a server and echo incoming messages back to the server:
589///
590/// ```no_run
591/// use async_io::Async;
592/// use futures_lite::io;
593/// use std::net::TcpStream;
594///
595/// # futures_lite::future::block_on(async {
596/// // Connect to a local server.
597/// let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;
598///
599/// // Echo all messages from the read side of the stream into the write side.
600/// io::copy(&stream, &stream).await?;
601/// # std::io::Result::Ok(()) });
602/// ```
603///
604/// You can use either predefined async methods or wrap blocking I/O operations in
605/// [`Async::read_with()`], [`Async::read_with_mut()`], [`Async::write_with()`], and
606/// [`Async::write_with_mut()`]:
607///
608/// ```no_run
609/// use async_io::Async;
610/// use std::net::TcpListener;
611///
612/// # futures_lite::future::block_on(async {
613/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
614///
615/// // These two lines are equivalent:
616/// let (stream, addr) = listener.accept().await?;
617/// let (stream, addr) = listener.read_with(|inner| inner.accept()).await?;
618/// # std::io::Result::Ok(()) });
619/// ```
620#[derive(Debug)]
621pub struct Async<T> {
622 /// A source registered in the reactor.
623 source: Arc<Source>,
624
625 /// The inner I/O handle.
626 io: Option<T>,
627}
628
629impl<T> Unpin for Async<T> {}
630
631#[cfg(unix)]
632impl<T: AsFd> Async<T> {
633 /// Creates an async I/O handle.
634 ///
635 /// This method will put the handle in non-blocking mode and register it in
636 /// [epoll]/[kqueue]/[event ports]/[IOCP].
637 ///
638 /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
639 /// `AsSocket`.
640 ///
641 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
642 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
643 /// [event ports]: https://illumos.org/man/port_create
644 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
645 ///
646 /// # Examples
647 ///
648 /// ```
649 /// use async_io::Async;
650 /// use std::net::{SocketAddr, TcpListener};
651 ///
652 /// # futures_lite::future::block_on(async {
653 /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
654 /// let listener = Async::new(listener)?;
655 /// # std::io::Result::Ok(()) });
656 /// ```
657 pub fn new(io: T) -> io::Result<Async<T>> {
658 // Put the file descriptor in non-blocking mode.
659 set_nonblocking(io.as_fd())?;
660
661 Self::new_nonblocking(io)
662 }
663
664 /// Creates an async I/O handle without setting it to non-blocking mode.
665 ///
666 /// This method will register the handle in [epoll]/[kqueue]/[event ports]/[IOCP].
667 ///
668 /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
669 /// `AsSocket`.
670 ///
671 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
672 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
673 /// [event ports]: https://illumos.org/man/port_create
674 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
675 ///
676 /// # Caveats
677 ///
678 /// The caller should ensure that the handle is set to non-blocking mode or that it is okay if
679 /// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
680 /// and cause a deadlock in an asynchronous context.
681 pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
682 // SAFETY: It is impossible to drop the I/O source while it is registered through
683 // this type.
684 let registration = unsafe { Registration::new(io.as_fd()) };
685
686 Ok(Async {
687 source: Reactor::get().insert_io(registration)?,
688 io: Some(io),
689 })
690 }
691}
692
693#[cfg(unix)]
694impl<T: AsRawFd> AsRawFd for Async<T> {
695 fn as_raw_fd(&self) -> RawFd {
696 self.get_ref().as_raw_fd()
697 }
698}
699
700#[cfg(unix)]
701impl<T: AsFd> AsFd for Async<T> {
702 fn as_fd(&self) -> BorrowedFd<'_> {
703 self.get_ref().as_fd()
704 }
705}
706
707#[cfg(unix)]
708impl<T: AsFd + From<OwnedFd>> TryFrom<OwnedFd> for Async<T> {
709 type Error = io::Error;
710
711 fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
712 Async::new(io:value.into())
713 }
714}
715
716#[cfg(unix)]
717impl<T: Into<OwnedFd>> TryFrom<Async<T>> for OwnedFd {
718 type Error = io::Error;
719
720 fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
721 value.into_inner().map(op:Into::into)
722 }
723}
724
725#[cfg(windows)]
726impl<T: AsSocket> Async<T> {
727 /// Creates an async I/O handle.
728 ///
729 /// This method will put the handle in non-blocking mode and register it in
730 /// [epoll]/[kqueue]/[event ports]/[IOCP].
731 ///
732 /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
733 /// `AsSocket`.
734 ///
735 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
736 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
737 /// [event ports]: https://illumos.org/man/port_create
738 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
739 ///
740 /// # Examples
741 ///
742 /// ```
743 /// use async_io::Async;
744 /// use std::net::{SocketAddr, TcpListener};
745 ///
746 /// # futures_lite::future::block_on(async {
747 /// let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
748 /// let listener = Async::new(listener)?;
749 /// # std::io::Result::Ok(()) });
750 /// ```
751 pub fn new(io: T) -> io::Result<Async<T>> {
752 // Put the socket in non-blocking mode.
753 set_nonblocking(io.as_socket())?;
754
755 Self::new_nonblocking(io)
756 }
757
758 /// Creates an async I/O handle without setting it to non-blocking mode.
759 ///
760 /// This method will register the handle in [epoll]/[kqueue]/[event ports]/[IOCP].
761 ///
762 /// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
763 /// `AsSocket`.
764 ///
765 /// [epoll]: https://en.wikipedia.org/wiki/Epoll
766 /// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
767 /// [event ports]: https://illumos.org/man/port_create
768 /// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
769 ///
770 /// # Caveats
771 ///
772 /// The caller should ensure that the handle is set to non-blocking mode or that it is okay if
773 /// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
774 /// and cause a deadlock in an asynchronous context.
775 pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
776 // Create the registration.
777 //
778 // SAFETY: It is impossible to drop the I/O source while it is registered through
779 // this type.
780 let registration = unsafe { Registration::new(io.as_socket()) };
781
782 Ok(Async {
783 source: Reactor::get().insert_io(registration)?,
784 io: Some(io),
785 })
786 }
787}
788
789#[cfg(windows)]
790impl<T: AsRawSocket> AsRawSocket for Async<T> {
791 fn as_raw_socket(&self) -> RawSocket {
792 self.get_ref().as_raw_socket()
793 }
794}
795
796#[cfg(windows)]
797impl<T: AsSocket> AsSocket for Async<T> {
798 fn as_socket(&self) -> BorrowedSocket<'_> {
799 self.get_ref().as_socket()
800 }
801}
802
803#[cfg(windows)]
804impl<T: AsSocket + From<OwnedSocket>> TryFrom<OwnedSocket> for Async<T> {
805 type Error = io::Error;
806
807 fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
808 Async::new(value.into())
809 }
810}
811
812#[cfg(windows)]
813impl<T: Into<OwnedSocket>> TryFrom<Async<T>> for OwnedSocket {
814 type Error = io::Error;
815
816 fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
817 value.into_inner().map(Into::into)
818 }
819}
820
821impl<T> Async<T> {
822 /// Gets a reference to the inner I/O handle.
823 ///
824 /// # Examples
825 ///
826 /// ```
827 /// use async_io::Async;
828 /// use std::net::TcpListener;
829 ///
830 /// # futures_lite::future::block_on(async {
831 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
832 /// let inner = listener.get_ref();
833 /// # std::io::Result::Ok(()) });
834 /// ```
835 pub fn get_ref(&self) -> &T {
836 self.io.as_ref().unwrap()
837 }
838
839 /// Gets a mutable reference to the inner I/O handle.
840 ///
841 /// # Safety
842 ///
843 /// The underlying I/O source must not be dropped using this function.
844 ///
845 /// # Examples
846 ///
847 /// ```
848 /// use async_io::Async;
849 /// use std::net::TcpListener;
850 ///
851 /// # futures_lite::future::block_on(async {
852 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
853 /// let inner = unsafe { listener.get_mut() };
854 /// # std::io::Result::Ok(()) });
855 /// ```
856 pub unsafe fn get_mut(&mut self) -> &mut T {
857 self.io.as_mut().unwrap()
858 }
859
860 /// Unwraps the inner I/O handle.
861 ///
862 /// This method will **not** put the I/O handle back into blocking mode.
863 ///
864 /// # Examples
865 ///
866 /// ```
867 /// use async_io::Async;
868 /// use std::net::TcpListener;
869 ///
870 /// # futures_lite::future::block_on(async {
871 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
872 /// let inner = listener.into_inner()?;
873 ///
874 /// // Put the listener back into blocking mode.
875 /// inner.set_nonblocking(false)?;
876 /// # std::io::Result::Ok(()) });
877 /// ```
878 pub fn into_inner(mut self) -> io::Result<T> {
879 let io = self.io.take().unwrap();
880 Reactor::get().remove_io(&self.source)?;
881 Ok(io)
882 }
883
884 /// Waits until the I/O handle is readable.
885 ///
886 /// This method completes when a read operation on this I/O handle wouldn't block.
887 ///
888 /// # Examples
889 ///
890 /// ```no_run
891 /// use async_io::Async;
892 /// use std::net::TcpListener;
893 ///
894 /// # futures_lite::future::block_on(async {
895 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
896 ///
897 /// // Wait until a client can be accepted.
898 /// listener.readable().await?;
899 /// # std::io::Result::Ok(()) });
900 /// ```
901 pub fn readable(&self) -> Readable<'_, T> {
902 Source::readable(self)
903 }
904
905 /// Waits until the I/O handle is readable.
906 ///
907 /// This method completes when a read operation on this I/O handle wouldn't block.
908 pub fn readable_owned(self: Arc<Self>) -> ReadableOwned<T> {
909 Source::readable_owned(self)
910 }
911
912 /// Waits until the I/O handle is writable.
913 ///
914 /// This method completes when a write operation on this I/O handle wouldn't block.
915 ///
916 /// # Examples
917 ///
918 /// ```
919 /// use async_io::Async;
920 /// use std::net::{TcpStream, ToSocketAddrs};
921 ///
922 /// # futures_lite::future::block_on(async {
923 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
924 /// let stream = Async::<TcpStream>::connect(addr).await?;
925 ///
926 /// // Wait until the stream is writable.
927 /// stream.writable().await?;
928 /// # std::io::Result::Ok(()) });
929 /// ```
930 pub fn writable(&self) -> Writable<'_, T> {
931 Source::writable(self)
932 }
933
934 /// Waits until the I/O handle is writable.
935 ///
936 /// This method completes when a write operation on this I/O handle wouldn't block.
937 pub fn writable_owned(self: Arc<Self>) -> WritableOwned<T> {
938 Source::writable_owned(self)
939 }
940
941 /// Polls the I/O handle for readability.
942 ///
943 /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
944 /// indicating readability since the last time this task has called the method and received
945 /// [`Poll::Pending`].
946 ///
947 /// # Caveats
948 ///
949 /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
950 /// will just keep waking each other in turn, thus wasting CPU time.
951 ///
952 /// Note that the [`AsyncRead`] implementation for [`Async`] also uses this method.
953 ///
954 /// # Examples
955 ///
956 /// ```no_run
957 /// use async_io::Async;
958 /// use futures_lite::future;
959 /// use std::net::TcpListener;
960 ///
961 /// # futures_lite::future::block_on(async {
962 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
963 ///
964 /// // Wait until a client can be accepted.
965 /// future::poll_fn(|cx| listener.poll_readable(cx)).await?;
966 /// # std::io::Result::Ok(()) });
967 /// ```
968 pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
969 self.source.poll_readable(cx)
970 }
971
972 /// Polls the I/O handle for writability.
973 ///
974 /// When this method returns [`Poll::Ready`], that means the OS has delivered an event
975 /// indicating writability since the last time this task has called the method and received
976 /// [`Poll::Pending`].
977 ///
978 /// # Caveats
979 ///
980 /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
981 /// will just keep waking each other in turn, thus wasting CPU time.
982 ///
983 /// Note that the [`AsyncWrite`] implementation for [`Async`] also uses this method.
984 ///
985 /// # Examples
986 ///
987 /// ```
988 /// use async_io::Async;
989 /// use futures_lite::future;
990 /// use std::net::{TcpStream, ToSocketAddrs};
991 ///
992 /// # futures_lite::future::block_on(async {
993 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
994 /// let stream = Async::<TcpStream>::connect(addr).await?;
995 ///
996 /// // Wait until the stream is writable.
997 /// future::poll_fn(|cx| stream.poll_writable(cx)).await?;
998 /// # std::io::Result::Ok(()) });
999 /// ```
1000 pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1001 self.source.poll_writable(cx)
1002 }
1003
1004 /// Performs a read operation asynchronously.
1005 ///
1006 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1007 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1008 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1009 /// sends a notification that the I/O handle is readable.
1010 ///
1011 /// The closure receives a shared reference to the I/O handle.
1012 ///
1013 /// # Examples
1014 ///
1015 /// ```no_run
1016 /// use async_io::Async;
1017 /// use std::net::TcpListener;
1018 ///
1019 /// # futures_lite::future::block_on(async {
1020 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1021 ///
1022 /// // Accept a new client asynchronously.
1023 /// let (stream, addr) = listener.read_with(|l| l.accept()).await?;
1024 /// # std::io::Result::Ok(()) });
1025 /// ```
1026 pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
1027 let mut op = op;
1028 loop {
1029 match op(self.get_ref()) {
1030 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1031 res => return res,
1032 }
1033 optimistic(self.readable()).await?;
1034 }
1035 }
1036
1037 /// Performs a read operation asynchronously.
1038 ///
1039 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1040 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1041 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1042 /// sends a notification that the I/O handle is readable.
1043 ///
1044 /// The closure receives a mutable reference to the I/O handle.
1045 ///
1046 /// # Safety
1047 ///
1048 /// In the closure, the underlying I/O source must not be dropped.
1049 ///
1050 /// # Examples
1051 ///
1052 /// ```no_run
1053 /// use async_io::Async;
1054 /// use std::net::TcpListener;
1055 ///
1056 /// # futures_lite::future::block_on(async {
1057 /// let mut listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1058 ///
1059 /// // Accept a new client asynchronously.
1060 /// let (stream, addr) = unsafe { listener.read_with_mut(|l| l.accept()).await? };
1061 /// # std::io::Result::Ok(()) });
1062 /// ```
1063 pub async unsafe fn read_with_mut<R>(
1064 &mut self,
1065 op: impl FnMut(&mut T) -> io::Result<R>,
1066 ) -> io::Result<R> {
1067 let mut op = op;
1068 loop {
1069 match op(self.get_mut()) {
1070 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1071 res => return res,
1072 }
1073 optimistic(self.readable()).await?;
1074 }
1075 }
1076
1077 /// Performs a write operation asynchronously.
1078 ///
1079 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1080 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1081 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1082 /// sends a notification that the I/O handle is writable.
1083 ///
1084 /// The closure receives a shared reference to the I/O handle.
1085 ///
1086 /// # Examples
1087 ///
1088 /// ```no_run
1089 /// use async_io::Async;
1090 /// use std::net::UdpSocket;
1091 ///
1092 /// # futures_lite::future::block_on(async {
1093 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1094 /// socket.get_ref().connect("127.0.0.1:9000")?;
1095 ///
1096 /// let msg = b"hello";
1097 /// let len = socket.write_with(|s| s.send(msg)).await?;
1098 /// # std::io::Result::Ok(()) });
1099 /// ```
1100 pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
1101 let mut op = op;
1102 loop {
1103 match op(self.get_ref()) {
1104 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1105 res => return res,
1106 }
1107 optimistic(self.writable()).await?;
1108 }
1109 }
1110
1111 /// Performs a write operation asynchronously.
1112 ///
1113 /// The I/O handle is registered in the reactor and put in non-blocking mode. This method
1114 /// invokes the `op` closure in a loop until it succeeds or returns an error other than
1115 /// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
1116 /// sends a notification that the I/O handle is writable.
1117 ///
1118 /// # Safety
1119 ///
1120 /// The closure receives a mutable reference to the I/O handle. In the closure, the underlying
1121 /// I/O source must not be dropped.
1122 ///
1123 /// # Examples
1124 ///
1125 /// ```no_run
1126 /// use async_io::Async;
1127 /// use std::net::UdpSocket;
1128 ///
1129 /// # futures_lite::future::block_on(async {
1130 /// let mut socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1131 /// socket.get_ref().connect("127.0.0.1:9000")?;
1132 ///
1133 /// let msg = b"hello";
1134 /// let len = unsafe { socket.write_with_mut(|s| s.send(msg)).await? };
1135 /// # std::io::Result::Ok(()) });
1136 /// ```
1137 pub async unsafe fn write_with_mut<R>(
1138 &mut self,
1139 op: impl FnMut(&mut T) -> io::Result<R>,
1140 ) -> io::Result<R> {
1141 let mut op = op;
1142 loop {
1143 match op(self.get_mut()) {
1144 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1145 res => return res,
1146 }
1147 optimistic(self.writable()).await?;
1148 }
1149 }
1150}
1151
1152impl<T> AsRef<T> for Async<T> {
1153 fn as_ref(&self) -> &T {
1154 self.get_ref()
1155 }
1156}
1157
1158impl<T> Drop for Async<T> {
1159 fn drop(&mut self) {
1160 if self.io.is_some() {
1161 // Deregister and ignore errors because destructors should not panic.
1162 Reactor::get().remove_io(&self.source).ok();
1163
1164 // Drop the I/O handle to close it.
1165 self.io.take();
1166 }
1167 }
1168}
1169
1170/// Types whose I/O trait implementations do not drop the underlying I/O source.
1171///
1172/// The resource contained inside of the [`Async`] cannot be invalidated. This invalidation can
1173/// happen if the inner resource (the [`TcpStream`], [`UnixListener`] or other `T`) is moved out
1174/// and dropped before the [`Async`]. Because of this, functions that grant mutable access to
1175/// the inner type are unsafe, as there is no way to guarantee that the source won't be dropped
1176/// and a dangling handle won't be left behind.
1177///
1178/// Unfortunately this extends to implementations of [`Read`] and [`Write`]. Since methods on those
1179/// traits take `&mut`, there is no guarantee that the implementor of those traits won't move the
1180/// source out while the method is being run.
1181///
1182/// This trait is an antidote to this predicament. By implementing this trait, the user pledges
1183/// that using any I/O traits won't destroy the source. This way, [`Async`] can implement the
1184/// `async` version of these I/O traits, like [`AsyncRead`] and [`AsyncWrite`].
1185///
1186/// # Safety
1187///
1188/// Any I/O trait implementations for this type must not drop the underlying I/O source. Traits
1189/// affected by this trait include [`Read`], [`Write`], [`Seek`] and [`BufRead`].
1190///
1191/// This trait is implemented by default on top of `libstd` types. In addition, it is implemented
1192/// for immutable reference types, as it is impossible to invalidate any outstanding references
1193/// while holding an immutable reference, even with interior mutability. As Rust's current pinning
1194/// system relies on similar guarantees, I believe that this approach is robust.
1195///
1196/// [`BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
1197/// [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
1198/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
1199/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
1200///
1201/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html
1202/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html
1203pub unsafe trait IoSafe {}
1204
1205/// Reference types can't be mutated.
1206///
1207/// The worst thing that can happen is that external state is used to change what kind of pointer
1208/// `as_fd()` returns. For instance:
1209///
1210/// ```
1211/// # #[cfg(unix)] {
1212/// use std::cell::Cell;
1213/// use std::net::TcpStream;
1214/// use std::os::unix::io::{AsFd, BorrowedFd};
1215///
1216/// struct Bar {
1217/// flag: Cell<bool>,
1218/// a: TcpStream,
1219/// b: TcpStream
1220/// }
1221///
1222/// impl AsFd for Bar {
1223/// fn as_fd(&self) -> BorrowedFd<'_> {
1224/// if self.flag.replace(!self.flag.get()) {
1225/// self.a.as_fd()
1226/// } else {
1227/// self.b.as_fd()
1228/// }
1229/// }
1230/// }
1231/// # }
1232/// ```
1233///
1234/// We solve this problem by only calling `as_fd()` once to get the original source. Implementations
1235/// like this are considered buggy (but not unsound) and are thus not really supported by `async-io`.
1236unsafe impl<T: ?Sized> IoSafe for &T {}
1237
1238// Can be implemented on top of libstd types.
1239unsafe impl IoSafe for std::fs::File {}
1240unsafe impl IoSafe for std::io::Stderr {}
1241unsafe impl IoSafe for std::io::Stdin {}
1242unsafe impl IoSafe for std::io::Stdout {}
1243unsafe impl IoSafe for std::io::StderrLock<'_> {}
1244unsafe impl IoSafe for std::io::StdinLock<'_> {}
1245unsafe impl IoSafe for std::io::StdoutLock<'_> {}
1246unsafe impl IoSafe for std::net::TcpStream {}
1247unsafe impl IoSafe for std::process::ChildStdin {}
1248unsafe impl IoSafe for std::process::ChildStdout {}
1249unsafe impl IoSafe for std::process::ChildStderr {}
1250
1251#[cfg(unix)]
1252unsafe impl IoSafe for std::os::unix::net::UnixStream {}
1253
1254unsafe impl<T: IoSafe + Read> IoSafe for std::io::BufReader<T> {}
1255unsafe impl<T: IoSafe + Write> IoSafe for std::io::BufWriter<T> {}
1256unsafe impl<T: IoSafe + Write> IoSafe for std::io::LineWriter<T> {}
1257unsafe impl<T: IoSafe + ?Sized> IoSafe for &mut T {}
1258unsafe impl<T: IoSafe + ?Sized> IoSafe for Box<T> {}
1259unsafe impl<T: Clone + IoSafe + ?Sized> IoSafe for std::borrow::Cow<'_, T> {}
1260
1261impl<T: IoSafe + Read> AsyncRead for Async<T> {
1262 fn poll_read(
1263 mut self: Pin<&mut Self>,
1264 cx: &mut Context<'_>,
1265 buf: &mut [u8],
1266 ) -> Poll<io::Result<usize>> {
1267 loop {
1268 match unsafe { (*self).get_mut() }.read(buf) {
1269 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1270 res => return Poll::Ready(res),
1271 }
1272 ready!(self.poll_readable(cx))?;
1273 }
1274 }
1275
1276 fn poll_read_vectored(
1277 mut self: Pin<&mut Self>,
1278 cx: &mut Context<'_>,
1279 bufs: &mut [IoSliceMut<'_>],
1280 ) -> Poll<io::Result<usize>> {
1281 loop {
1282 match unsafe { (*self).get_mut() }.read_vectored(bufs) {
1283 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1284 res => return Poll::Ready(res),
1285 }
1286 ready!(self.poll_readable(cx))?;
1287 }
1288 }
1289}
1290
1291// Since this is through a reference, we can't mutate the inner I/O source.
1292// Therefore this is safe!
1293impl<T> AsyncRead for &Async<T>
1294where
1295 for<'a> &'a T: Read,
1296{
1297 fn poll_read(
1298 self: Pin<&mut Self>,
1299 cx: &mut Context<'_>,
1300 buf: &mut [u8],
1301 ) -> Poll<io::Result<usize>> {
1302 loop {
1303 match (*self).get_ref().read(buf) {
1304 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1305 res => return Poll::Ready(res),
1306 }
1307 ready!(self.poll_readable(cx))?;
1308 }
1309 }
1310
1311 fn poll_read_vectored(
1312 self: Pin<&mut Self>,
1313 cx: &mut Context<'_>,
1314 bufs: &mut [IoSliceMut<'_>],
1315 ) -> Poll<io::Result<usize>> {
1316 loop {
1317 match (*self).get_ref().read_vectored(bufs) {
1318 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1319 res => return Poll::Ready(res),
1320 }
1321 ready!(self.poll_readable(cx))?;
1322 }
1323 }
1324}
1325
1326impl<T: IoSafe + Write> AsyncWrite for Async<T> {
1327 fn poll_write(
1328 mut self: Pin<&mut Self>,
1329 cx: &mut Context<'_>,
1330 buf: &[u8],
1331 ) -> Poll<io::Result<usize>> {
1332 loop {
1333 match unsafe { (*self).get_mut() }.write(buf) {
1334 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1335 res => return Poll::Ready(res),
1336 }
1337 ready!(self.poll_writable(cx))?;
1338 }
1339 }
1340
1341 fn poll_write_vectored(
1342 mut self: Pin<&mut Self>,
1343 cx: &mut Context<'_>,
1344 bufs: &[IoSlice<'_>],
1345 ) -> Poll<io::Result<usize>> {
1346 loop {
1347 match unsafe { (*self).get_mut() }.write_vectored(bufs) {
1348 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1349 res => return Poll::Ready(res),
1350 }
1351 ready!(self.poll_writable(cx))?;
1352 }
1353 }
1354
1355 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1356 loop {
1357 match unsafe { (*self).get_mut() }.flush() {
1358 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1359 res => return Poll::Ready(res),
1360 }
1361 ready!(self.poll_writable(cx))?;
1362 }
1363 }
1364
1365 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1366 self.poll_flush(cx)
1367 }
1368}
1369
1370impl<T> AsyncWrite for &Async<T>
1371where
1372 for<'a> &'a T: Write,
1373{
1374 fn poll_write(
1375 self: Pin<&mut Self>,
1376 cx: &mut Context<'_>,
1377 buf: &[u8],
1378 ) -> Poll<io::Result<usize>> {
1379 loop {
1380 match (*self).get_ref().write(buf) {
1381 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1382 res => return Poll::Ready(res),
1383 }
1384 ready!(self.poll_writable(cx))?;
1385 }
1386 }
1387
1388 fn poll_write_vectored(
1389 self: Pin<&mut Self>,
1390 cx: &mut Context<'_>,
1391 bufs: &[IoSlice<'_>],
1392 ) -> Poll<io::Result<usize>> {
1393 loop {
1394 match (*self).get_ref().write_vectored(bufs) {
1395 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1396 res => return Poll::Ready(res),
1397 }
1398 ready!(self.poll_writable(cx))?;
1399 }
1400 }
1401
1402 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1403 loop {
1404 match (*self).get_ref().flush() {
1405 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
1406 res => return Poll::Ready(res),
1407 }
1408 ready!(self.poll_writable(cx))?;
1409 }
1410 }
1411
1412 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1413 self.poll_flush(cx)
1414 }
1415}
1416
1417impl Async<TcpListener> {
1418 /// Creates a TCP listener bound to the specified address.
1419 ///
1420 /// Binding with port number 0 will request an available port from the OS.
1421 ///
1422 /// # Examples
1423 ///
1424 /// ```
1425 /// use async_io::Async;
1426 /// use std::net::TcpListener;
1427 ///
1428 /// # futures_lite::future::block_on(async {
1429 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
1430 /// println!("Listening on {}", listener.get_ref().local_addr()?);
1431 /// # std::io::Result::Ok(()) });
1432 /// ```
1433 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
1434 let addr = addr.into();
1435 Async::new(TcpListener::bind(addr)?)
1436 }
1437
1438 /// Accepts a new incoming TCP connection.
1439 ///
1440 /// When a connection is established, it will be returned as a TCP stream together with its
1441 /// remote address.
1442 ///
1443 /// # Examples
1444 ///
1445 /// ```no_run
1446 /// use async_io::Async;
1447 /// use std::net::TcpListener;
1448 ///
1449 /// # futures_lite::future::block_on(async {
1450 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1451 /// let (stream, addr) = listener.accept().await?;
1452 /// println!("Accepted client: {}", addr);
1453 /// # std::io::Result::Ok(()) });
1454 /// ```
1455 pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
1456 let (stream, addr) = self.read_with(|io| io.accept()).await?;
1457 Ok((Async::new(stream)?, addr))
1458 }
1459
1460 /// Returns a stream of incoming TCP connections.
1461 ///
1462 /// The stream is infinite, i.e. it never stops with a [`None`].
1463 ///
1464 /// # Examples
1465 ///
1466 /// ```no_run
1467 /// use async_io::Async;
1468 /// use futures_lite::{pin, stream::StreamExt};
1469 /// use std::net::TcpListener;
1470 ///
1471 /// # futures_lite::future::block_on(async {
1472 /// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
1473 /// let incoming = listener.incoming();
1474 /// pin!(incoming);
1475 ///
1476 /// while let Some(stream) = incoming.next().await {
1477 /// let stream = stream?;
1478 /// println!("Accepted client: {}", stream.get_ref().peer_addr()?);
1479 /// }
1480 /// # std::io::Result::Ok(()) });
1481 /// ```
1482 pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
1483 stream::unfold(self, |listener| async move {
1484 let res = listener.accept().await.map(|(stream, _)| stream);
1485 Some((res, listener))
1486 })
1487 }
1488}
1489
1490impl TryFrom<std::net::TcpListener> for Async<std::net::TcpListener> {
1491 type Error = io::Error;
1492
1493 fn try_from(listener: std::net::TcpListener) -> io::Result<Self> {
1494 Async::new(io:listener)
1495 }
1496}
1497
1498impl Async<TcpStream> {
1499 /// Creates a TCP connection to the specified address.
1500 ///
1501 /// # Examples
1502 ///
1503 /// ```
1504 /// use async_io::Async;
1505 /// use std::net::{TcpStream, ToSocketAddrs};
1506 ///
1507 /// # futures_lite::future::block_on(async {
1508 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1509 /// let stream = Async::<TcpStream>::connect(addr).await?;
1510 /// # std::io::Result::Ok(()) });
1511 /// ```
1512 pub async fn connect<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpStream>> {
1513 // Figure out how to handle this address.
1514 let addr = addr.into();
1515 let (domain, sock_addr) = match addr {
1516 SocketAddr::V4(v4) => (rn::AddressFamily::INET, rn::SocketAddrAny::V4(v4)),
1517 SocketAddr::V6(v6) => (rn::AddressFamily::INET6, rn::SocketAddrAny::V6(v6)),
1518 };
1519
1520 // Begin async connect.
1521 let socket = connect(sock_addr, domain, Some(rn::ipproto::TCP))?;
1522 // Use new_nonblocking because connect already sets socket to non-blocking mode.
1523 let stream = Async::new_nonblocking(TcpStream::from(socket))?;
1524
1525 // The stream becomes writable when connected.
1526 stream.writable().await?;
1527
1528 // Check if there was an error while connecting.
1529 match stream.get_ref().take_error()? {
1530 None => Ok(stream),
1531 Some(err) => Err(err),
1532 }
1533 }
1534
1535 /// Reads data from the stream without removing it from the buffer.
1536 ///
1537 /// Returns the number of bytes read. Successive calls of this method read the same data.
1538 ///
1539 /// # Examples
1540 ///
1541 /// ```
1542 /// use async_io::Async;
1543 /// use futures_lite::{io::AsyncWriteExt, stream::StreamExt};
1544 /// use std::net::{TcpStream, ToSocketAddrs};
1545 ///
1546 /// # futures_lite::future::block_on(async {
1547 /// let addr = "example.com:80".to_socket_addrs()?.next().unwrap();
1548 /// let mut stream = Async::<TcpStream>::connect(addr).await?;
1549 ///
1550 /// stream
1551 /// .write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
1552 /// .await?;
1553 ///
1554 /// let mut buf = [0u8; 1024];
1555 /// let len = stream.peek(&mut buf).await?;
1556 /// # std::io::Result::Ok(()) });
1557 /// ```
1558 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1559 self.read_with(|io| io.peek(buf)).await
1560 }
1561}
1562
1563impl TryFrom<std::net::TcpStream> for Async<std::net::TcpStream> {
1564 type Error = io::Error;
1565
1566 fn try_from(stream: std::net::TcpStream) -> io::Result<Self> {
1567 Async::new(io:stream)
1568 }
1569}
1570
1571impl Async<UdpSocket> {
1572 /// Creates a UDP socket bound to the specified address.
1573 ///
1574 /// Binding with port number 0 will request an available port from the OS.
1575 ///
1576 /// # Examples
1577 ///
1578 /// ```
1579 /// use async_io::Async;
1580 /// use std::net::UdpSocket;
1581 ///
1582 /// # futures_lite::future::block_on(async {
1583 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1584 /// println!("Bound to {}", socket.get_ref().local_addr()?);
1585 /// # std::io::Result::Ok(()) });
1586 /// ```
1587 pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<UdpSocket>> {
1588 let addr = addr.into();
1589 Async::new(UdpSocket::bind(addr)?)
1590 }
1591
1592 /// Receives a single datagram message.
1593 ///
1594 /// Returns the number of bytes read and the address the message came from.
1595 ///
1596 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1597 /// If the message is too long to fit, excess bytes may get discarded.
1598 ///
1599 /// # Examples
1600 ///
1601 /// ```no_run
1602 /// use async_io::Async;
1603 /// use std::net::UdpSocket;
1604 ///
1605 /// # futures_lite::future::block_on(async {
1606 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1607 ///
1608 /// let mut buf = [0u8; 1024];
1609 /// let (len, addr) = socket.recv_from(&mut buf).await?;
1610 /// # std::io::Result::Ok(()) });
1611 /// ```
1612 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1613 self.read_with(|io| io.recv_from(buf)).await
1614 }
1615
1616 /// Receives a single datagram message without removing it from the queue.
1617 ///
1618 /// Returns the number of bytes read and the address the message came from.
1619 ///
1620 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1621 /// If the message is too long to fit, excess bytes may get discarded.
1622 ///
1623 /// # Examples
1624 ///
1625 /// ```no_run
1626 /// use async_io::Async;
1627 /// use std::net::UdpSocket;
1628 ///
1629 /// # futures_lite::future::block_on(async {
1630 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1631 ///
1632 /// let mut buf = [0u8; 1024];
1633 /// let (len, addr) = socket.peek_from(&mut buf).await?;
1634 /// # std::io::Result::Ok(()) });
1635 /// ```
1636 pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1637 self.read_with(|io| io.peek_from(buf)).await
1638 }
1639
1640 /// Sends data to the specified address.
1641 ///
1642 /// Returns the number of bytes writen.
1643 ///
1644 /// # Examples
1645 ///
1646 /// ```no_run
1647 /// use async_io::Async;
1648 /// use std::net::UdpSocket;
1649 ///
1650 /// # futures_lite::future::block_on(async {
1651 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?;
1652 /// let addr = socket.get_ref().local_addr()?;
1653 ///
1654 /// let msg = b"hello";
1655 /// let len = socket.send_to(msg, addr).await?;
1656 /// # std::io::Result::Ok(()) });
1657 /// ```
1658 pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
1659 let addr = addr.into();
1660 self.write_with(|io| io.send_to(buf, addr)).await
1661 }
1662
1663 /// Receives a single datagram message from the connected peer.
1664 ///
1665 /// Returns the number of bytes read.
1666 ///
1667 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1668 /// If the message is too long to fit, excess bytes may get discarded.
1669 ///
1670 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1671 /// This method will fail if the socket is not connected.
1672 ///
1673 /// # Examples
1674 ///
1675 /// ```no_run
1676 /// use async_io::Async;
1677 /// use std::net::UdpSocket;
1678 ///
1679 /// # futures_lite::future::block_on(async {
1680 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1681 /// socket.get_ref().connect("127.0.0.1:9000")?;
1682 ///
1683 /// let mut buf = [0u8; 1024];
1684 /// let len = socket.recv(&mut buf).await?;
1685 /// # std::io::Result::Ok(()) });
1686 /// ```
1687 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
1688 self.read_with(|io| io.recv(buf)).await
1689 }
1690
1691 /// Receives a single datagram message from the connected peer without removing it from the
1692 /// queue.
1693 ///
1694 /// Returns the number of bytes read and the address the message came from.
1695 ///
1696 /// This method must be called with a valid byte slice of sufficient size to hold the message.
1697 /// If the message is too long to fit, excess bytes may get discarded.
1698 ///
1699 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1700 /// This method will fail if the socket is not connected.
1701 ///
1702 /// # Examples
1703 ///
1704 /// ```no_run
1705 /// use async_io::Async;
1706 /// use std::net::UdpSocket;
1707 ///
1708 /// # futures_lite::future::block_on(async {
1709 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1710 /// socket.get_ref().connect("127.0.0.1:9000")?;
1711 ///
1712 /// let mut buf = [0u8; 1024];
1713 /// let len = socket.peek(&mut buf).await?;
1714 /// # std::io::Result::Ok(()) });
1715 /// ```
1716 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1717 self.read_with(|io| io.peek(buf)).await
1718 }
1719
1720 /// Sends data to the connected peer.
1721 ///
1722 /// Returns the number of bytes written.
1723 ///
1724 /// The [`connect`][`UdpSocket::connect()`] method connects this socket to a remote address.
1725 /// This method will fail if the socket is not connected.
1726 ///
1727 /// # Examples
1728 ///
1729 /// ```no_run
1730 /// use async_io::Async;
1731 /// use std::net::UdpSocket;
1732 ///
1733 /// # futures_lite::future::block_on(async {
1734 /// let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;
1735 /// socket.get_ref().connect("127.0.0.1:9000")?;
1736 ///
1737 /// let msg = b"hello";
1738 /// let len = socket.send(msg).await?;
1739 /// # std::io::Result::Ok(()) });
1740 /// ```
1741 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
1742 self.write_with(|io| io.send(buf)).await
1743 }
1744}
1745
1746impl TryFrom<std::net::UdpSocket> for Async<std::net::UdpSocket> {
1747 type Error = io::Error;
1748
1749 fn try_from(socket: std::net::UdpSocket) -> io::Result<Self> {
1750 Async::new(io:socket)
1751 }
1752}
1753
1754#[cfg(unix)]
1755impl Async<UnixListener> {
1756 /// Creates a UDS listener bound to the specified path.
1757 ///
1758 /// # Examples
1759 ///
1760 /// ```no_run
1761 /// use async_io::Async;
1762 /// use std::os::unix::net::UnixListener;
1763 ///
1764 /// # futures_lite::future::block_on(async {
1765 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1766 /// println!("Listening on {:?}", listener.get_ref().local_addr()?);
1767 /// # std::io::Result::Ok(()) });
1768 /// ```
1769 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
1770 let path = path.as_ref().to_owned();
1771 Async::new(UnixListener::bind(path)?)
1772 }
1773
1774 /// Accepts a new incoming UDS stream connection.
1775 ///
1776 /// When a connection is established, it will be returned as a stream together with its remote
1777 /// address.
1778 ///
1779 /// # Examples
1780 ///
1781 /// ```no_run
1782 /// use async_io::Async;
1783 /// use std::os::unix::net::UnixListener;
1784 ///
1785 /// # futures_lite::future::block_on(async {
1786 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1787 /// let (stream, addr) = listener.accept().await?;
1788 /// println!("Accepted client: {:?}", addr);
1789 /// # std::io::Result::Ok(()) });
1790 /// ```
1791 pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
1792 let (stream, addr) = self.read_with(|io| io.accept()).await?;
1793 Ok((Async::new(stream)?, addr))
1794 }
1795
1796 /// Returns a stream of incoming UDS connections.
1797 ///
1798 /// The stream is infinite, i.e. it never stops with a [`None`] item.
1799 ///
1800 /// # Examples
1801 ///
1802 /// ```no_run
1803 /// use async_io::Async;
1804 /// use futures_lite::{pin, stream::StreamExt};
1805 /// use std::os::unix::net::UnixListener;
1806 ///
1807 /// # futures_lite::future::block_on(async {
1808 /// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
1809 /// let incoming = listener.incoming();
1810 /// pin!(incoming);
1811 ///
1812 /// while let Some(stream) = incoming.next().await {
1813 /// let stream = stream?;
1814 /// println!("Accepted client: {:?}", stream.get_ref().peer_addr()?);
1815 /// }
1816 /// # std::io::Result::Ok(()) });
1817 /// ```
1818 pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
1819 stream::unfold(self, |listener| async move {
1820 let res = listener.accept().await.map(|(stream, _)| stream);
1821 Some((res, listener))
1822 })
1823 }
1824}
1825
1826#[cfg(unix)]
1827impl TryFrom<std::os::unix::net::UnixListener> for Async<std::os::unix::net::UnixListener> {
1828 type Error = io::Error;
1829
1830 fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
1831 Async::new(io:listener)
1832 }
1833}
1834
1835#[cfg(unix)]
1836impl Async<UnixStream> {
1837 /// Creates a UDS stream connected to the specified path.
1838 ///
1839 /// # Examples
1840 ///
1841 /// ```no_run
1842 /// use async_io::Async;
1843 /// use std::os::unix::net::UnixStream;
1844 ///
1845 /// # futures_lite::future::block_on(async {
1846 /// let stream = Async::<UnixStream>::connect("/tmp/socket").await?;
1847 /// # std::io::Result::Ok(()) });
1848 /// ```
1849 pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
1850 let address = convert_path_to_socket_address(path.as_ref())?;
1851
1852 // Begin async connect.
1853 let socket = connect(address.into(), rn::AddressFamily::UNIX, None)?;
1854 // Use new_nonblocking because connect already sets socket to non-blocking mode.
1855 let stream = Async::new_nonblocking(UnixStream::from(socket))?;
1856
1857 // The stream becomes writable when connected.
1858 stream.writable().await?;
1859
1860 // On Linux, it appears the socket may become writable even when connecting fails, so we
1861 // must do an extra check here and see if the peer address is retrievable.
1862 stream.get_ref().peer_addr()?;
1863 Ok(stream)
1864 }
1865
1866 /// Creates an unnamed pair of connected UDS stream sockets.
1867 ///
1868 /// # Examples
1869 ///
1870 /// ```no_run
1871 /// use async_io::Async;
1872 /// use std::os::unix::net::UnixStream;
1873 ///
1874 /// # futures_lite::future::block_on(async {
1875 /// let (stream1, stream2) = Async::<UnixStream>::pair()?;
1876 /// # std::io::Result::Ok(()) });
1877 /// ```
1878 pub fn pair() -> io::Result<(Async<UnixStream>, Async<UnixStream>)> {
1879 let (stream1, stream2) = UnixStream::pair()?;
1880 Ok((Async::new(stream1)?, Async::new(stream2)?))
1881 }
1882}
1883
1884#[cfg(unix)]
1885impl TryFrom<std::os::unix::net::UnixStream> for Async<std::os::unix::net::UnixStream> {
1886 type Error = io::Error;
1887
1888 fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<Self> {
1889 Async::new(io:stream)
1890 }
1891}
1892
1893#[cfg(unix)]
1894impl Async<UnixDatagram> {
1895 /// Creates a UDS datagram socket bound to the specified path.
1896 ///
1897 /// # Examples
1898 ///
1899 /// ```no_run
1900 /// use async_io::Async;
1901 /// use std::os::unix::net::UnixDatagram;
1902 ///
1903 /// # futures_lite::future::block_on(async {
1904 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1905 /// # std::io::Result::Ok(()) });
1906 /// ```
1907 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
1908 let path = path.as_ref().to_owned();
1909 Async::new(UnixDatagram::bind(path)?)
1910 }
1911
1912 /// Creates a UDS datagram socket not bound to any address.
1913 ///
1914 /// # Examples
1915 ///
1916 /// ```no_run
1917 /// use async_io::Async;
1918 /// use std::os::unix::net::UnixDatagram;
1919 ///
1920 /// # futures_lite::future::block_on(async {
1921 /// let socket = Async::<UnixDatagram>::unbound()?;
1922 /// # std::io::Result::Ok(()) });
1923 /// ```
1924 pub fn unbound() -> io::Result<Async<UnixDatagram>> {
1925 Async::new(UnixDatagram::unbound()?)
1926 }
1927
1928 /// Creates an unnamed pair of connected Unix datagram sockets.
1929 ///
1930 /// # Examples
1931 ///
1932 /// ```no_run
1933 /// use async_io::Async;
1934 /// use std::os::unix::net::UnixDatagram;
1935 ///
1936 /// # futures_lite::future::block_on(async {
1937 /// let (socket1, socket2) = Async::<UnixDatagram>::pair()?;
1938 /// # std::io::Result::Ok(()) });
1939 /// ```
1940 pub fn pair() -> io::Result<(Async<UnixDatagram>, Async<UnixDatagram>)> {
1941 let (socket1, socket2) = UnixDatagram::pair()?;
1942 Ok((Async::new(socket1)?, Async::new(socket2)?))
1943 }
1944
1945 /// Receives data from the socket.
1946 ///
1947 /// Returns the number of bytes read and the address the message came from.
1948 ///
1949 /// # Examples
1950 ///
1951 /// ```no_run
1952 /// use async_io::Async;
1953 /// use std::os::unix::net::UnixDatagram;
1954 ///
1955 /// # futures_lite::future::block_on(async {
1956 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket")?;
1957 ///
1958 /// let mut buf = [0u8; 1024];
1959 /// let (len, addr) = socket.recv_from(&mut buf).await?;
1960 /// # std::io::Result::Ok(()) });
1961 /// ```
1962 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
1963 self.read_with(|io| io.recv_from(buf)).await
1964 }
1965
1966 /// Sends data to the specified address.
1967 ///
1968 /// Returns the number of bytes written.
1969 ///
1970 /// # Examples
1971 ///
1972 /// ```no_run
1973 /// use async_io::Async;
1974 /// use std::os::unix::net::UnixDatagram;
1975 ///
1976 /// # futures_lite::future::block_on(async {
1977 /// let socket = Async::<UnixDatagram>::unbound()?;
1978 ///
1979 /// let msg = b"hello";
1980 /// let addr = "/tmp/socket";
1981 /// let len = socket.send_to(msg, addr).await?;
1982 /// # std::io::Result::Ok(()) });
1983 /// ```
1984 pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
1985 self.write_with(|io| io.send_to(buf, &path)).await
1986 }
1987
1988 /// Receives data from the connected peer.
1989 ///
1990 /// Returns the number of bytes read and the address the message came from.
1991 ///
1992 /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
1993 /// This method will fail if the socket is not connected.
1994 ///
1995 /// # Examples
1996 ///
1997 /// ```no_run
1998 /// use async_io::Async;
1999 /// use std::os::unix::net::UnixDatagram;
2000 ///
2001 /// # futures_lite::future::block_on(async {
2002 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
2003 /// socket.get_ref().connect("/tmp/socket2")?;
2004 ///
2005 /// let mut buf = [0u8; 1024];
2006 /// let len = socket.recv(&mut buf).await?;
2007 /// # std::io::Result::Ok(()) });
2008 /// ```
2009 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
2010 self.read_with(|io| io.recv(buf)).await
2011 }
2012
2013 /// Sends data to the connected peer.
2014 ///
2015 /// Returns the number of bytes written.
2016 ///
2017 /// The [`connect`][`UnixDatagram::connect()`] method connects this socket to a remote address.
2018 /// This method will fail if the socket is not connected.
2019 ///
2020 /// # Examples
2021 ///
2022 /// ```no_run
2023 /// use async_io::Async;
2024 /// use std::os::unix::net::UnixDatagram;
2025 ///
2026 /// # futures_lite::future::block_on(async {
2027 /// let socket = Async::<UnixDatagram>::bind("/tmp/socket1")?;
2028 /// socket.get_ref().connect("/tmp/socket2")?;
2029 ///
2030 /// let msg = b"hello";
2031 /// let len = socket.send(msg).await?;
2032 /// # std::io::Result::Ok(()) });
2033 /// ```
2034 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
2035 self.write_with(|io| io.send(buf)).await
2036 }
2037}
2038
2039#[cfg(unix)]
2040impl TryFrom<std::os::unix::net::UnixDatagram> for Async<std::os::unix::net::UnixDatagram> {
2041 type Error = io::Error;
2042
2043 fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<Self> {
2044 Async::new(io:socket)
2045 }
2046}
2047
2048/// Polls a future once, waits for a wakeup, and then optimistically assumes the future is ready.
2049async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
2050 let mut polled: bool = false;
2051 pin!(fut);
2052
2053 futurePollFn) -> …>::poll_fn(|cx: &mut Context<'_>| {
2054 if !polled {
2055 polled = true;
2056 fut.as_mut().poll(cx)
2057 } else {
2058 Poll::Ready(Ok(()))
2059 }
2060 })
2061 .await
2062}
2063
2064fn connect(
2065 addr: rn::SocketAddrAny,
2066 domain: rn::AddressFamily,
2067 protocol: Option<rn::Protocol>,
2068) -> io::Result<rustix::fd::OwnedFd> {
2069 #[cfg(windows)]
2070 use rustix::fd::AsFd;
2071
2072 setup_networking();
2073
2074 #[cfg(any(
2075 target_os = "android",
2076 target_os = "dragonfly",
2077 target_os = "freebsd",
2078 target_os = "fuchsia",
2079 target_os = "illumos",
2080 target_os = "linux",
2081 target_os = "netbsd",
2082 target_os = "openbsd"
2083 ))]
2084 let socket = rn::socket_with(
2085 domain,
2086 rn::SocketType::STREAM,
2087 rn::SocketFlags::CLOEXEC | rn::SocketFlags::NONBLOCK,
2088 protocol,
2089 )?;
2090
2091 #[cfg(not(any(
2092 target_os = "android",
2093 target_os = "dragonfly",
2094 target_os = "freebsd",
2095 target_os = "fuchsia",
2096 target_os = "illumos",
2097 target_os = "linux",
2098 target_os = "netbsd",
2099 target_os = "openbsd"
2100 )))]
2101 let socket = {
2102 #[cfg(not(any(
2103 target_os = "aix",
2104 target_os = "macos",
2105 target_os = "ios",
2106 target_os = "tvos",
2107 target_os = "watchos",
2108 target_os = "espidf",
2109 windows,
2110 )))]
2111 let flags = rn::SocketFlags::CLOEXEC;
2112 #[cfg(any(
2113 target_os = "aix",
2114 target_os = "macos",
2115 target_os = "ios",
2116 target_os = "tvos",
2117 target_os = "watchos",
2118 target_os = "espidf",
2119 windows,
2120 ))]
2121 let flags = rn::SocketFlags::empty();
2122
2123 // Create the socket.
2124 let socket = rn::socket_with(domain, rn::SocketType::STREAM, flags, protocol)?;
2125
2126 // Set cloexec if necessary.
2127 #[cfg(any(
2128 target_os = "aix",
2129 target_os = "macos",
2130 target_os = "ios",
2131 target_os = "tvos",
2132 target_os = "watchos",
2133 ))]
2134 rio::fcntl_setfd(&socket, rio::fcntl_getfd(&socket)? | rio::FdFlags::CLOEXEC)?;
2135
2136 // Set non-blocking mode.
2137 set_nonblocking(socket.as_fd())?;
2138
2139 socket
2140 };
2141
2142 // Set nosigpipe if necessary.
2143 #[cfg(any(
2144 target_os = "macos",
2145 target_os = "ios",
2146 target_os = "tvos",
2147 target_os = "watchos",
2148 target_os = "freebsd"
2149 ))]
2150 rn::sockopt::set_socket_nosigpipe(&socket, true)?;
2151
2152 // Set the handle information to HANDLE_FLAG_INHERIT.
2153 #[cfg(windows)]
2154 unsafe {
2155 if windows_sys::Win32::Foundation::SetHandleInformation(
2156 socket.as_raw_socket() as _,
2157 windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
2158 windows_sys::Win32::Foundation::HANDLE_FLAG_INHERIT,
2159 ) == 0
2160 {
2161 return Err(io::Error::last_os_error());
2162 }
2163 }
2164
2165 #[allow(unreachable_patterns)]
2166 match rn::connect_any(&socket, &addr) {
2167 Ok(_) => {}
2168 #[cfg(unix)]
2169 Err(rio::Errno::INPROGRESS) => {}
2170 Err(rio::Errno::AGAIN) | Err(rio::Errno::WOULDBLOCK) => {}
2171 Err(err) => return Err(err.into()),
2172 }
2173 Ok(socket)
2174}
2175
2176#[inline]
2177fn setup_networking() {
2178 #[cfg(windows)]
2179 {
2180 // On Windows, we need to call WSAStartup before calling any networking code.
2181 // Make sure to call it at least once.
2182 static INIT: std::sync::Once = std::sync::Once::new();
2183
2184 INIT.call_once(|| {
2185 let _ = rustix::net::wsa_startup();
2186 });
2187 }
2188}
2189
2190#[inline]
2191fn set_nonblocking(
2192 #[cfg(unix)] fd: BorrowedFd<'_>,
2193 #[cfg(windows)] fd: BorrowedSocket<'_>,
2194) -> io::Result<()> {
2195 cfg_if::cfg_if! {
2196 // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
2197 // for now, as with the standard library, because it seems to behave
2198 // differently depending on the platform.
2199 // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
2200 // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
2201 // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
2202 if #[cfg(any(windows, target_os = "linux"))] {
2203 rustix::io::ioctl_fionbio(fd, true)?;
2204 } else {
2205 let previous = rustix::fs::fcntl_getfl(fd)?;
2206 let new = previous | rustix::fs::OFlags::NONBLOCK;
2207 if new != previous {
2208 rustix::fs::fcntl_setfl(fd, new)?;
2209 }
2210 }
2211 }
2212
2213 Ok(())
2214}
2215
2216/// Converts a `Path` to its socket address representation.
2217///
2218/// This function is abstract socket-aware.
2219#[cfg(unix)]
2220#[inline]
2221fn convert_path_to_socket_address(path: &Path) -> io::Result<rn::SocketAddrUnix> {
2222 // SocketAddrUnix::new() will throw EINVAL when a path with a zero in it is passed in.
2223 // However, some users expect to be able to pass in paths to abstract sockets, which
2224 // triggers this error as it has a zero in it. Therefore, if a path starts with a zero,
2225 // make it an abstract socket.
2226 #[cfg(any(target_os = "linux", target_os = "android"))]
2227 let address: SocketAddrUnix = {
2228 use std::os::unix::ffi::OsStrExt;
2229
2230 let path: &OsStr = path.as_os_str();
2231 match path.as_bytes().first() {
2232 Some(0) => rn::SocketAddrUnix::new_abstract_name(path.as_bytes().get(index:1..).unwrap())?,
2233 _ => rn::SocketAddrUnix::new(path)?,
2234 }
2235 };
2236
2237 // Only Linux and Android support abstract sockets.
2238 #[cfg(not(any(target_os = "linux", target_os = "android")))]
2239 let address = rn::SocketAddrUnix::new(path)?;
2240
2241 Ok(address)
2242}
2243