1//! Adapters for async IO objects
2//!
3//! This module mainly hosts the [`Async`] adapter for making IO objects async with readiness
4//! monitoring backed by an [`EventLoop`](crate::EventLoop). See [`LoopHandle::adapt_io`] for
5//! how to create them.
6//!
7//! [`LoopHandle::adapt_io`]: crate::LoopHandle#method.adapt_io
8
9use std::cell::RefCell;
10use std::pin::Pin;
11use std::rc::Rc;
12use std::task::{Context, Poll as TaskPoll, Waker};
13
14#[cfg(unix)]
15use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
16#[cfg(windows)]
17use std::os::windows::io::{
18 AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, RawSocket as RawFd,
19};
20
21#[cfg(feature = "futures-io")]
22use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
23
24use crate::loop_logic::EventIterator;
25use crate::{
26 loop_logic::LoopInner, sources::EventDispatcher, Interest, Mode, Poll, PostAction, Readiness,
27 Token, TokenFactory,
28};
29use crate::{AdditionalLifecycleEventsSet, RegistrationToken};
30
31/// Adapter for async IO manipulations
32///
33/// This type wraps an IO object, providing methods to create futures waiting for its
34/// readiness.
35///
36/// If the `futures-io` cargo feature is enabled, it also implements `AsyncRead` and/or
37/// `AsyncWrite` if the underlying type implements `Read` and/or `Write`.
38///
39/// Note that this adapter and the futures procuded from it and *not* threadsafe.
40///
41/// ## Platform-Specific
42///
43/// - **Windows:** Usually, on drop, the file descriptor is set back to its previous status.
44/// For example, if the file was previously nonblocking it will be set to nonblocking, and
45/// if the file was blocking it will be set to blocking. However, on Windows, it is impossible
46/// to tell what its status was before. Therefore it will always be set to blocking.
47pub struct Async<'l, F: AsFd> {
48 fd: Option<F>,
49 dispatcher: Rc<RefCell<IoDispatcher>>,
50 inner: Rc<dyn IoLoopInner + 'l>,
51 was_nonblocking: bool,
52}
53
54impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> {
55 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("Async").field(name:"fd", &self.fd).finish()
58 }
59}
60
61impl<'l, F: AsFd> Async<'l, F> {
62 pub(crate) fn new<Data>(inner: Rc<LoopInner<'l, Data>>, fd: F) -> crate::Result<Async<'l, F>> {
63 // set non-blocking
64 let was_nonblocking = set_nonblocking(
65 #[cfg(unix)]
66 fd.as_fd(),
67 #[cfg(windows)]
68 fd.as_socket(),
69 true,
70 )?;
71 // register in the loop
72 let dispatcher = Rc::new(RefCell::new(IoDispatcher {
73 #[cfg(unix)]
74 fd: fd.as_fd().as_raw_fd(),
75 #[cfg(windows)]
76 fd: fd.as_socket().as_raw_socket(),
77 token: None,
78 waker: None,
79 is_registered: false,
80 interest: Interest::EMPTY,
81 last_readiness: Readiness::EMPTY,
82 }));
83
84 {
85 let mut sources = inner.sources.borrow_mut();
86 let slot = sources.vacant_entry();
87 slot.source = Some(dispatcher.clone());
88 dispatcher.borrow_mut().token = Some(Token { inner: slot.token });
89 }
90
91 // SAFETY: We are sure to deregister on drop.
92 unsafe {
93 inner.register(&dispatcher)?;
94 }
95
96 // Straightforward casting would require us to add the bound `Data: 'l` but we don't actually need it
97 // as this module never accesses the dispatch data, so we use transmute to erase it
98 let inner: Rc<dyn IoLoopInner + 'l> =
99 unsafe { std::mem::transmute(inner as Rc<dyn IoLoopInner>) };
100
101 Ok(Async {
102 fd: Some(fd),
103 dispatcher,
104 inner,
105 was_nonblocking,
106 })
107 }
108
109 /// Mutably access the underlying IO object
110 pub fn get_mut(&mut self) -> &mut F {
111 self.fd.as_mut().unwrap()
112 }
113
114 /// A future that resolves once the object becomes ready for reading
115 pub fn readable<'s>(&'s mut self) -> Readable<'s, 'l, F> {
116 Readable { io: self }
117 }
118
119 /// A future that resolves once the object becomes ready for writing
120 pub fn writable<'s>(&'s mut self) -> Writable<'s, 'l, F> {
121 Writable { io: self }
122 }
123
124 /// Remove the async adapter and retrieve the underlying object
125 pub fn into_inner(mut self) -> F {
126 self.fd.take().unwrap()
127 }
128
129 fn readiness(&self) -> Readiness {
130 self.dispatcher.borrow_mut().readiness()
131 }
132
133 fn register_waker(&self, interest: Interest, waker: Waker) -> crate::Result<()> {
134 {
135 let mut disp = self.dispatcher.borrow_mut();
136 disp.interest = interest;
137 disp.waker = Some(waker);
138 }
139 self.inner.reregister(&self.dispatcher)
140 }
141}
142
143/// A future that resolves once the associated object becomes ready for reading
144#[derive(Debug)]
145pub struct Readable<'s, 'l, F: AsFd> {
146 io: &'s mut Async<'l, F>,
147}
148
149impl<'s, 'l, F: AsFd> std::future::Future for Readable<'s, 'l, F> {
150 type Output = ();
151 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<()> {
152 let io: &mut &mut Async<'_, F> = &mut self.as_mut().io;
153 let readiness: Readiness = io.readiness();
154 if readiness.readable || readiness.error {
155 TaskPoll::Ready(())
156 } else {
157 let _ = io.register_waker(interest:Interest::READ, waker:cx.waker().clone());
158 TaskPoll::Pending
159 }
160 }
161}
162
163/// A future that resolves once the associated object becomes ready for writing
164#[derive(Debug)]
165pub struct Writable<'s, 'l, F: AsFd> {
166 io: &'s mut Async<'l, F>,
167}
168
169impl<'s, 'l, F: AsFd> std::future::Future for Writable<'s, 'l, F> {
170 type Output = ();
171 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<()> {
172 let io: &mut &mut Async<'_, F> = &mut self.as_mut().io;
173 let readiness: Readiness = io.readiness();
174 if readiness.writable || readiness.error {
175 TaskPoll::Ready(())
176 } else {
177 let _ = io.register_waker(interest:Interest::WRITE, waker:cx.waker().clone());
178 TaskPoll::Pending
179 }
180 }
181}
182
183impl<'l, F: AsFd> Drop for Async<'l, F> {
184 fn drop(&mut self) {
185 self.inner.kill(&self.dispatcher);
186 // restore flags
187 let _ = set_nonblocking(
188 fd:unsafe { BorrowedFd::borrow_raw(self.dispatcher.borrow().fd) },
189 self.was_nonblocking,
190 );
191 }
192}
193
194impl<'l, F: AsFd> Unpin for Async<'l, F> {}
195
196trait IoLoopInner {
197 unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
198 fn reregister(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
199 fn kill(&self, dispatcher: &RefCell<IoDispatcher>);
200}
201
202impl<'l, Data> IoLoopInner for LoopInner<'l, Data> {
203 unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
204 let disp = dispatcher.borrow();
205 self.poll.borrow_mut().register(
206 unsafe { BorrowedFd::borrow_raw(disp.fd) },
207 Interest::EMPTY,
208 Mode::OneShot,
209 disp.token.expect("No token for IO dispatcher"),
210 )
211 }
212
213 fn reregister(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
214 let disp = dispatcher.borrow();
215 self.poll.borrow_mut().reregister(
216 unsafe { BorrowedFd::borrow_raw(disp.fd) },
217 disp.interest,
218 Mode::OneShot,
219 disp.token.expect("No token for IO dispatcher"),
220 )
221 }
222
223 fn kill(&self, dispatcher: &RefCell<IoDispatcher>) {
224 let token = dispatcher
225 .borrow()
226 .token
227 .expect("No token for IO dispatcher");
228 if let Ok(slot) = self.sources.borrow_mut().get_mut(token.inner) {
229 slot.source = None;
230 }
231 }
232}
233
234struct IoDispatcher {
235 fd: RawFd, // FIXME: `BorrowedFd`? How to statically verify it doesn't outlive file?
236 token: Option<Token>,
237 waker: Option<Waker>,
238 is_registered: bool,
239 interest: Interest,
240 last_readiness: Readiness,
241}
242
243impl IoDispatcher {
244 fn readiness(&mut self) -> Readiness {
245 std::mem::replace(&mut self.last_readiness, src:Readiness::EMPTY)
246 }
247}
248
249impl<Data> EventDispatcher<Data> for RefCell<IoDispatcher> {
250 fn process_events(
251 &self,
252 readiness: Readiness,
253 _token: Token,
254 _data: &mut Data,
255 ) -> crate::Result<PostAction> {
256 let mut disp = self.borrow_mut();
257 disp.last_readiness = readiness;
258 if let Some(waker) = disp.waker.take() {
259 waker.wake();
260 }
261 Ok(PostAction::Continue)
262 }
263
264 fn register(
265 &self,
266 _: &mut Poll,
267 _: &mut AdditionalLifecycleEventsSet,
268 _: &mut TokenFactory,
269 ) -> crate::Result<()> {
270 // registration is handled by IoLoopInner
271 unreachable!()
272 }
273
274 fn reregister(
275 &self,
276 _: &mut Poll,
277 _: &mut AdditionalLifecycleEventsSet,
278 _: &mut TokenFactory,
279 ) -> crate::Result<bool> {
280 // registration is handled by IoLoopInner
281 unreachable!()
282 }
283
284 fn unregister(
285 &self,
286 poll: &mut Poll,
287 _: &mut AdditionalLifecycleEventsSet,
288 _: RegistrationToken,
289 ) -> crate::Result<bool> {
290 let disp = self.borrow();
291 if disp.is_registered {
292 poll.unregister(unsafe { BorrowedFd::borrow_raw(disp.fd) })?;
293 }
294 Ok(true)
295 }
296
297 fn before_sleep(&self) -> crate::Result<Option<(Readiness, Token)>> {
298 Ok(None)
299 }
300 fn before_handle_events(&self, _: EventIterator<'_>) {}
301}
302
303/*
304 * Async IO trait implementations
305 */
306
307#[cfg(feature = "futures-io")]
308#[cfg_attr(docsrs, doc(cfg(feature = "futures-io")))]
309impl<'l, F: AsFd + std::io::Read> AsyncRead for Async<'l, F> {
310 fn poll_read(
311 mut self: Pin<&mut Self>,
312 cx: &mut Context<'_>,
313 buf: &mut [u8],
314 ) -> TaskPoll<std::io::Result<usize>> {
315 match (*self).get_mut().read(buf) {
316 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
317 res => return TaskPoll::Ready(res),
318 }
319 self.register_waker(Interest::READ, cx.waker().clone())?;
320 TaskPoll::Pending
321 }
322
323 fn poll_read_vectored(
324 mut self: Pin<&mut Self>,
325 cx: &mut Context<'_>,
326 bufs: &mut [IoSliceMut<'_>],
327 ) -> TaskPoll<std::io::Result<usize>> {
328 match (*self).get_mut().read_vectored(bufs) {
329 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
330 res => return TaskPoll::Ready(res),
331 }
332 self.register_waker(Interest::READ, cx.waker().clone())?;
333 TaskPoll::Pending
334 }
335}
336
337#[cfg(feature = "futures-io")]
338#[cfg_attr(docsrs, doc(cfg(feature = "futures-io")))]
339impl<'l, F: AsFd + std::io::Write> AsyncWrite for Async<'l, F> {
340 fn poll_write(
341 mut self: Pin<&mut Self>,
342 cx: &mut Context<'_>,
343 buf: &[u8],
344 ) -> TaskPoll<std::io::Result<usize>> {
345 match (*self).get_mut().write(buf) {
346 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
347 res => return TaskPoll::Ready(res),
348 }
349 self.register_waker(Interest::WRITE, cx.waker().clone())?;
350 TaskPoll::Pending
351 }
352
353 fn poll_write_vectored(
354 mut self: Pin<&mut Self>,
355 cx: &mut Context<'_>,
356 bufs: &[IoSlice<'_>],
357 ) -> TaskPoll<std::io::Result<usize>> {
358 match (*self).get_mut().write_vectored(bufs) {
359 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
360 res => return TaskPoll::Ready(res),
361 }
362 self.register_waker(Interest::WRITE, cx.waker().clone())?;
363 TaskPoll::Pending
364 }
365
366 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<std::io::Result<()>> {
367 match (*self).get_mut().flush() {
368 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
369 res => return TaskPoll::Ready(res),
370 }
371 self.register_waker(Interest::WRITE, cx.waker().clone())?;
372 TaskPoll::Pending
373 }
374
375 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<std::io::Result<()>> {
376 self.poll_flush(cx)
377 }
378}
379
380// https://github.com/smol-rs/async-io/blob/6499077421495f2200d5b86918399f3a84bbe8e4/src/lib.rs#L2171-L2195
381/// Set the nonblocking status of an FD and return whether it was nonblocking before.
382#[allow(clippy::needless_return)]
383#[inline]
384fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result<bool> {
385 #[cfg(windows)]
386 {
387 rustix::io::ioctl_fionbio(fd, is_nonblocking)?;
388
389 // Unfortunately it is impossible to tell if a socket was nonblocking on Windows.
390 // Just say it wasn't for now.
391 return Ok(false);
392 }
393
394 #[cfg(not(windows))]
395 {
396 let previous = rustix::fs::fcntl_getfl(fd)?;
397 let new = if is_nonblocking {
398 previous | rustix::fs::OFlags::NONBLOCK
399 } else {
400 previous & !(rustix::fs::OFlags::NONBLOCK)
401 };
402 if new != previous {
403 rustix::fs::fcntl_setfl(fd, new)?;
404 }
405
406 return Ok(previous.contains(rustix::fs::OFlags::NONBLOCK));
407 }
408}
409
410#[cfg(all(test, unix, feature = "executor", feature = "futures-io"))]
411mod tests {
412 use futures::io::{AsyncReadExt, AsyncWriteExt};
413
414 use crate::sources::futures::executor;
415
416 #[test]
417 fn read_write() {
418 let mut event_loop = crate::EventLoop::try_new().unwrap();
419 let handle = event_loop.handle();
420 let (exec, sched) = executor().unwrap();
421 handle
422 .insert_source(exec, move |ret, &mut (), got| {
423 *got = ret;
424 })
425 .unwrap();
426
427 let (tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();
428 let mut tx = handle.adapt_io(tx).unwrap();
429 let mut rx = handle.adapt_io(rx).unwrap();
430 let received = std::rc::Rc::new(std::cell::Cell::new(false));
431 let fut_received = received.clone();
432
433 sched
434 .schedule(async move {
435 let mut buf = [0; 12];
436 rx.read_exact(&mut buf).await.unwrap();
437 assert_eq!(&buf, b"Hello World!");
438 fut_received.set(true);
439 })
440 .unwrap();
441
442 // The receiving future alone cannot advance
443 event_loop
444 .dispatch(Some(std::time::Duration::from_millis(10)), &mut ())
445 .unwrap();
446 assert!(!received.get());
447
448 // schedule the writing future as well and wait until finish
449 sched
450 .schedule(async move {
451 tx.write_all(b"Hello World!").await.unwrap();
452 tx.flush().await.unwrap();
453 })
454 .unwrap();
455
456 while !received.get() {
457 event_loop.dispatch(None, &mut ()).unwrap();
458 }
459 }
460
461 #[test]
462 fn read_write_vectored() {
463 let mut event_loop = crate::EventLoop::try_new().unwrap();
464 let handle = event_loop.handle();
465 let (exec, sched) = executor().unwrap();
466 handle
467 .insert_source(exec, move |ret, &mut (), got| {
468 *got = ret;
469 })
470 .unwrap();
471
472 let (tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();
473 let mut tx = handle.adapt_io(tx).unwrap();
474 let mut rx = handle.adapt_io(rx).unwrap();
475 let received = std::rc::Rc::new(std::cell::Cell::new(false));
476 let fut_received = received.clone();
477
478 sched
479 .schedule(async move {
480 let mut buf = [0; 12];
481 let mut ioslices = buf
482 .chunks_mut(2)
483 .map(std::io::IoSliceMut::new)
484 .collect::<Vec<_>>();
485 let count = rx.read_vectored(&mut ioslices).await.unwrap();
486 assert_eq!(count, 12);
487 assert_eq!(&buf, b"Hello World!");
488 fut_received.set(true);
489 })
490 .unwrap();
491
492 // The receiving future alone cannot advance
493 event_loop
494 .dispatch(Some(std::time::Duration::from_millis(10)), &mut ())
495 .unwrap();
496 assert!(!received.get());
497
498 // schedule the writing future as well and wait until finish
499 sched
500 .schedule(async move {
501 let buf = b"Hello World!";
502 let ioslices = buf.chunks(2).map(std::io::IoSlice::new).collect::<Vec<_>>();
503 let count = tx.write_vectored(&ioslices).await.unwrap();
504 assert_eq!(count, 12);
505 tx.flush().await.unwrap();
506 })
507 .unwrap();
508
509 while !received.get() {
510 event_loop.dispatch(None, &mut ()).unwrap();
511 }
512 }
513
514 #[test]
515 fn readable() {
516 use std::io::Write;
517
518 let mut event_loop = crate::EventLoop::try_new().unwrap();
519 let handle = event_loop.handle();
520 let (exec, sched) = executor().unwrap();
521 handle
522 .insert_source(exec, move |(), &mut (), got| {
523 *got = true;
524 })
525 .unwrap();
526
527 let (mut tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();
528
529 let mut rx = handle.adapt_io(rx).unwrap();
530 sched
531 .schedule(async move {
532 rx.readable().await;
533 })
534 .unwrap();
535
536 let mut dispatched = false;
537
538 event_loop
539 .dispatch(Some(std::time::Duration::from_millis(100)), &mut dispatched)
540 .unwrap();
541 // The socket is not yet readable, so the readable() future has not completed
542 assert!(!dispatched);
543
544 tx.write_all(&[42]).unwrap();
545 tx.flush().unwrap();
546
547 // Now we should become readable
548 while !dispatched {
549 event_loop.dispatch(None, &mut dispatched).unwrap();
550 }
551 }
552
553 #[test]
554 fn writable() {
555 use std::io::{BufReader, BufWriter, Read, Write};
556
557 let mut event_loop = crate::EventLoop::try_new().unwrap();
558 let handle = event_loop.handle();
559 let (exec, sched) = executor().unwrap();
560 handle
561 .insert_source(exec, move |(), &mut (), got| {
562 *got = true;
563 })
564 .unwrap();
565
566 let (mut tx, mut rx) = std::os::unix::net::UnixStream::pair().unwrap();
567 tx.set_nonblocking(true).unwrap();
568 rx.set_nonblocking(true).unwrap();
569
570 // First, fill the socket buffers
571 {
572 let mut writer = BufWriter::new(&mut tx);
573 let data = vec![42u8; 1024];
574 loop {
575 if writer.write(&data).is_err() {
576 break;
577 }
578 }
579 }
580
581 // Now, wait for it to be readable
582 let mut tx = handle.adapt_io(tx).unwrap();
583 sched
584 .schedule(async move {
585 tx.writable().await;
586 })
587 .unwrap();
588
589 let mut dispatched = false;
590
591 event_loop
592 .dispatch(Some(std::time::Duration::from_millis(100)), &mut dispatched)
593 .unwrap();
594 // The socket is not yet writable, so the readable() future has not completed
595 assert!(!dispatched);
596
597 // now read everything
598 {
599 let mut reader = BufReader::new(&mut rx);
600 let mut buffer = vec![0u8; 1024];
601 loop {
602 if reader.read(&mut buffer).is_err() {
603 break;
604 }
605 }
606 }
607
608 // Now we should become writable
609 while !dispatched {
610 event_loop.dispatch(None, &mut dispatched).unwrap();
611 }
612 }
613}
614