#![warn(rust_2018_idioms)]
#![cfg(all(unix, feature = "full"))]

use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;
use std::{
    future::Future,
    io::{self, ErrorKind, Read, Write},
    task::{Context, Waker},
};

use nix::unistd::{close, read, write};

use futures::poll;

use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio_test::{assert_err, assert_pending};

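// A waker for manual polling in tests: it records whether it has been woken so a test can
// assert on wakeups without needing a full runtime context.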
struct TestWaker {
    inner: Arc<TestWakerInner>,
    waker: Waker,
}

#[derive(Default)]
struct TestWakerInner {
    awoken: AtomicBool,
}

impl futures::task::ArcWake for TestWakerInner {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.awoken.store(true, Ordering::SeqCst);
    }
}

impl TestWaker {
    fn new() -> Self {
        let inner: Arc<TestWakerInner> = Default::default();

        Self {
            inner: inner.clone(),
            waker: futures::task::waker(inner),
        }
    }

    fn awoken(&self) -> bool {
        self.inner.awoken.swap(false, Ordering::SeqCst)
    }

    fn context(&self) -> Context<'_> {
        Context::from_waker(&self.waker)
    }
}

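// A minimal raw-fd wrapper used as the inner type for `AsyncFd` in these tests. It reads and
// writes through nix and closes the descriptor on drop.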
#[derive(Debug)]
struct FileDescriptor {
    fd: RawFd,
}

impl AsRawFd for FileDescriptor {
    fn as_raw_fd(&self) -> RawFd {
        self.fd
    }
}

impl Read for &FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        read(self.fd, buf).map_err(io::Error::from)
    }
}

impl Read for FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (self as &Self).read(buf)
    }
}

impl Write for &FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        write(self.fd, buf).map_err(io::Error::from)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Write for FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (self as &Self).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (self as &Self).flush()
    }
}

impl Drop for FileDescriptor {
    fn drop(&mut self) {
        let _ = close(self.fd);
    }
}

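// Put `fd` into non-blocking mode so reads and writes return `WouldBlock` instead of blocking
// the test.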
fn set_nonblocking(fd: RawFd) {
    use nix::fcntl::{OFlag, F_GETFL, F_SETFL};

    let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)");

    if flags < 0 {
        panic!(
            "bad return value from fcntl(F_GETFL): {} ({:?})",
            flags,
            nix::Error::last()
        );
    }

    let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;

    nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFL)");
}

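// Create a connected, non-blocking Unix stream socket pair wrapped in `FileDescriptor`s.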
fn socketpair() -> (FileDescriptor, FileDescriptor) {
    use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};

    let (fd_a, fd_b) = socket::socketpair(
        AddressFamily::Unix,
        SockType::Stream,
        None,
        SockFlag::empty(),
    )
    .expect("socketpair");
    let fds = (
        FileDescriptor {
            fd: fd_a.into_raw_fd(),
        },
        FileDescriptor {
            fd: fd_b.into_raw_fd(),
        },
    );

    set_nonblocking(fds.0.fd);
    set_nonblocking(fds.1.fd);

    fds
}

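// Read from `fd` until the socket's receive buffer is empty (i.e. until `WouldBlock`).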
fn drain(mut fd: &FileDescriptor) {
    let mut buf = [0u8; 512];

    loop {
        match fd.read(&mut buf[..]) {
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Ok(0) => panic!("unexpected EOF"),
            Err(e) => panic!("unexpected error: {:?}", e),
            Ok(_) => continue,
        }
    }
}

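// A fresh socket pair should report writable readiness immediately, but must not report
// readable readiness until data is actually sent.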
#[tokio::test]
async fn initially_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let afd_b = AsyncFd::new(b).unwrap();

    afd_a.writable().await.unwrap().clear_ready();
    afd_b.writable().await.unwrap().clear_ready();

    tokio::select! {
        biased;
        _ = tokio::time::sleep(Duration::from_millis(10)) => {},
        _ = afd_a.readable() => panic!("Unexpected readable state"),
        _ = afd_b.readable() => panic!("Unexpected readable state"),
    }
}

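// Readable readiness should persist until it is explicitly cleared, and a cleared AsyncFd
// should only become readable again once new data arrives.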
#[tokio::test]
async fn reset_readable() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    let mut guard = readable.await.unwrap();

    guard
        .try_io(|_| afd_a.get_ref().read(&mut [0]))
        .unwrap()
        .unwrap();

    // `a` is not readable, but the reactor still thinks it is
    // (because we have not observed a not-ready error yet)
    afd_a.readable().await.unwrap().retain_ready();

    // Explicitly clear the ready state
    guard.clear_ready();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // We can observe the new readable event
    afd_a.readable().await.unwrap().clear_ready();
}

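// Filling the send buffer via `try_io` should clear writable readiness; draining the peer
// should make the fd writable again.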
#[tokio::test]
async fn reset_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.writable().await.unwrap();

    // Write until we get a WouldBlock. This also clears the ready state.
    while guard
        .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
        .is_ok()
    {}

    // Writable state should be cleared now.
    let writable = afd_a.writable();
    tokio::pin!(writable);

    tokio::select! {
        _ = writable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Read from the other side; we should become writable now.
    drain(&b);

    let _ = writable.await.unwrap();
}

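// A wrapper that delegates AsRawFd to an `Arc`-shared inner fd, used to show that AsyncFd
// defers close-on-drop behavior to the wrapped type.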
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);
impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}

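// Dropping an AsyncFd should close the underlying fd (the peer observes EOF), while
// `into_inner` must hand the fd back without closing it.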
#[tokio::test]
async fn drop_closes() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(afd_a);

    assert_eq!(0, b.read(&mut [0]).unwrap());

    // into_inner does not close the fd

    let (a, mut b) = socketpair();
    let afd_a = AsyncFd::new(a).unwrap();
    let _a: FileDescriptor = afd_a.into_inner();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    // Drop closure behavior is delegated to the inner object
    let (a, mut b) = socketpair();
    let arc_fd = Arc::new(a);
    let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
    std::mem::drop(afd_a);

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
}

#[tokio::test]
async fn reregister() {
    let (a, _b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let a = afd_a.into_inner();
    AsyncFd::new(a).unwrap();
}

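// `try_io` should leave the readiness state untouched on `Ok`, and clear it only when the
// closure reports `WouldBlock`.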
#[tokio::test]
async fn try_io() {
    let (a, mut b) = socketpair();

    b.write_all(b"0").unwrap();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.readable().await.unwrap();

    afd_a.get_ref().read_exact(&mut [0]).unwrap();

    // Should not clear the readable state
    let _ = guard.try_io(|_| Ok(()));

    // Still readable...
    let _ = afd_a.readable().await.unwrap();

    // Should clear the readable state
    let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));

    // Assert not readable
    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Write something down b again and make sure we're reawoken
    b.write_all(b"0").unwrap();
    let _ = readable.await.unwrap();
}

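// Many tasks can wait on the same AsyncFd concurrently; after a single write on the peer,
// every spawned waiter should observe readiness and finish.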
#[tokio::test]
async fn multiple_waiters() {
    let (a, mut b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());

    let barrier = Arc::new(tokio::sync::Barrier::new(11));

    let mut tasks = Vec::new();
    for _ in 0..10 {
        let afd_a = afd_a.clone();
        let barrier = barrier.clone();

        let f = async move {
            let notify_barrier = async {
                barrier.wait().await;
                futures::future::pending::<()>().await;
            };

            tokio::select! {
                biased;
                guard = afd_a.readable() => {
                    tokio::task::yield_now().await;
                    guard.unwrap().clear_ready()
                },
                _ = notify_barrier => unreachable!(),
            }

            std::mem::drop(afd_a);
        };

        tasks.push(tokio::spawn(f));
    }

    let mut all_tasks = futures::future::try_join_all(tasks);

    tokio::select! {
        r = std::pin::Pin::new(&mut all_tasks) => {
            r.unwrap(); // propagate panic
            panic!("Tasks exited unexpectedly")
        },
        _ = barrier.wait() => {}
    };

    b.write_all(b"0").unwrap();

    all_tasks.await.unwrap();
}

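// Exercise the low-level poll_read_ready / poll_write_ready APIs directly: poll_read_ready
// retains only the most recently registered waker, and read and write readiness are tracked
// independently.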
#[tokio::test]
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    while afd_a.get_ref().write(&[0; 512]).is_ok() {}

    let waker = TestWaker::new();

    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref());

    // Now we should be writable (i.e. the waker for poll_write should still be registered after we wake the read side)
    let _ = write_fut.await;
}

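// Poll `f` once with a no-op waker, assert that it is pending, and hand back the pinned
// future so the test can finish waiting on it later.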
fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
    let mut pinned = Box::pin(f);

    assert_pending!(pinned
        .as_mut()
        .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));

    pinned
}

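// Build a current-thread runtime for the driver-shutdown tests below.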
fn rt() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

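// The following tests check that dropping the runtime (and with it the I/O driver) wakes
// futures waiting on readiness and causes subsequent readiness calls to return an error.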
#[test]
fn driver_shutdown_wakes_currently_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let readable = assert_pending(afd_a.readable());

    std::mem::drop(rt);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_future_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}

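// Small helpers that adapt the poll-based readiness APIs into futures for the tests below.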
async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
}

async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
}

#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    std::mem::drop(rt);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}

#[test]
fn driver_shutdown_wakes_poll() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
    assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}

#[test]
fn driver_shutdown_then_clear_readiness() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let mut write_ready = rt.block_on(afd_a.writable()).unwrap();

    std::mem::drop(rt);

    write_ready.clear_ready();
}

#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}

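// On Linux/Android, TCP out-of-band (urgent) data surfaces as PRIORITY readiness on the
// receiving socket.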
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "android"))]
async fn priority_event_on_oob_data() {
    use std::net::SocketAddr;

    use tokio::io::Interest;

    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

    let listener = std::net::TcpListener::bind(addr).unwrap();
    let addr = listener.local_addr().unwrap();

    let client = std::net::TcpStream::connect(addr).unwrap();
    let client = AsyncFd::with_interest(client, Interest::PRIORITY).unwrap();

    let (stream, _) = listener.accept().unwrap();

    // Sending out of band data should trigger priority event.
    send_oob_data(&stream, b"hello").unwrap();

    let _ = client.ready(Interest::PRIORITY).await.unwrap();
}

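// Send `data` as TCP out-of-band (urgent) data using `send(2)` with the MSG_OOB flag.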
#[cfg(any(target_os = "linux", target_os = "android"))]
fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> {
    unsafe {
        let res = libc::send(
            stream.as_raw_fd(),
            data.as_ptr().cast(),
            data.len(),
            libc::MSG_OOB,
        );
        if res == -1 {
            Err(io::Error::last_os_error())
        } else {
            Ok(res as usize)
        }
    }
}

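// `clear_ready_matching` should clear only the readiness bits passed to it, leaving the rest
// of the guard's readiness intact.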
#[tokio::test]
async fn clear_ready_matching_clears_ready() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}

#[tokio::test]
async fn clear_ready_matching_clears_ready_mut() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let mut afd_a = AsyncFd::new(a).unwrap();
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready_mut(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}

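// With software TX timestamping enabled, a send places a timestamp on the socket's error
// queue, which should surface as ERROR readiness.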
#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_timestamping() {
    use std::net::{Ipv4Addr, SocketAddr};

    use tokio::io::{Interest, Ready};

    let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));

    let socket = std::net::UdpSocket::bind(address_a).unwrap();

    socket.set_nonblocking(true).unwrap();

    // configure send timestamps
    configure_timestamping_socket(&socket).unwrap();

    socket.connect(address_b).unwrap();

    let fd = AsyncFd::new(socket).unwrap();

    tokio::select! {
        _ = fd.ready(Interest::ERROR) => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    let buf = b"hello there";
    fd.get_ref().send(buf).unwrap();

    // the send timestamp should now be in the error queue
    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}

#[cfg(target_os = "linux")]
fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int> {
    // enable software timestamping, and specifically software send timestamping
    let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE;

    let res = unsafe {
        libc::setsockopt(
            udp_socket.as_raw_fd(),
            libc::SOL_SOCKET,
            libc::SO_TIMESTAMPING,
            &options as *const _ as *const libc::c_void,
            std::mem::size_of_val(&options) as libc::socklen_t,
        )
    };

    if res == -1 {
        Err(std::io::Error::last_os_error())
    } else {
        Ok(res)
    }
}

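// With IP_RECVERR enabled, sending a UDP datagram to a port nobody listens on queues an error
// on the sending socket, which should surface as ERROR readiness.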
#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_invalid_address() {
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::io::{Interest, Ready};

    let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
    let socket_fd = socket.as_raw_fd();

    // Enable the IP_RECVERR option to receive error messages
    // https://man7.org/linux/man-pages/man7/ip.7.html has some extra information
    let recv_err: libc::c_int = 1;
    unsafe {
        let res = libc::setsockopt(
            socket.as_raw_fd(),
            libc::SOL_IP,
            libc::IP_RECVERR,
            &recv_err as *const _ as *const libc::c_void,
            std::mem::size_of_val(&recv_err) as libc::socklen_t,
        );
        if res == -1 {
            panic!("{:?}", std::io::Error::last_os_error());
        }
    }

    // Spawn a separate task for sending the message
    tokio::spawn(async move {
        // Set the destination address. This address is invalid in this context; the OS will
        // notice that nobody is listening on this port. Normally this is ignored (UDP is "fire
        // and forget"), but because IP_RECVERR is enabled, the error will actually be reported
        // to the sending socket
        let mut dest_addr =
            unsafe { std::mem::MaybeUninit::<libc::sockaddr_in>::zeroed().assume_init() };
        dest_addr.sin_family = libc::AF_INET as _;
        // based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number
        // below 1024 to guarantee that other tests don't select this port by accident when they
        // use port 0 to select an ephemeral port.
        dest_addr.sin_port = 512u16.to_be(); // Destination port

        // Prepare the message data
        let message = "Hello, Socket!";

        // Prepare the message structure for sendmsg
        let mut iov = libc::iovec {
            iov_base: message.as_ptr() as *mut libc::c_void,
            iov_len: message.len(),
        };

        // Prepare the destination address for the sendmsg call
        let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr;
        let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t;

        let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
        msg.msg_name = dest_sockaddr as *mut libc::c_void;
        msg.msg_namelen = dest_addrlen;
        msg.msg_iov = &mut iov;
        msg.msg_iovlen = 1;

        if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 {
            panic!("{:?}", std::io::Error::last_os_error());
        }
    });

    let fd = AsyncFd::new(socket).unwrap();

    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}