1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (all(unix, feature = "full" ))] |
3 | |
4 | use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; |
5 | use std::sync::{ |
6 | atomic::{AtomicBool, Ordering}, |
7 | Arc, |
8 | }; |
9 | use std::time::Duration; |
10 | use std::{ |
11 | future::Future, |
12 | io::{self, ErrorKind, Read, Write}, |
13 | task::{Context, Waker}, |
14 | }; |
15 | |
16 | use nix::unistd::{close, read, write}; |
17 | |
18 | use futures::poll; |
19 | |
20 | use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard}; |
21 | use tokio_test::{assert_err, assert_pending}; |
22 | |
/// Test helper that pairs a [`Waker`] with the shared state recording whether it fired.
struct TestWaker {
    /// Shared wake target; the same allocation backs `waker`.
    inner: Arc<TestWakerInner>,
    /// Waker exposed to polled code through `TestWaker::context`.
    waker: Waker,
}
27 | |
/// Wake target used by `TestWaker`: remembers whether a wake-up has been delivered.
#[derive(Default)]
struct TestWakerInner {
    /// Stored as `true` on wake; swapped back to `false` when read via `TestWaker::awoken`.
    awoken: AtomicBool,
}
32 | |
33 | impl futures::task::ArcWake for TestWakerInner { |
34 | fn wake_by_ref(arc_self: &Arc<Self>) { |
35 | arc_self.awoken.store(true, Ordering::SeqCst); |
36 | } |
37 | } |
38 | |
39 | impl TestWaker { |
40 | fn new() -> Self { |
41 | let inner: Arc<TestWakerInner> = Default::default(); |
42 | |
43 | Self { |
44 | inner: inner.clone(), |
45 | waker: futures::task::waker(inner), |
46 | } |
47 | } |
48 | |
49 | fn awoken(&self) -> bool { |
50 | self.inner.awoken.swap(false, Ordering::SeqCst) |
51 | } |
52 | |
53 | fn context(&self) -> Context<'_> { |
54 | Context::from_waker(&self.waker) |
55 | } |
56 | } |
57 | |
/// Thin owner of a raw file descriptor; its `Drop` impl closes the descriptor.
#[derive(Debug)]
struct FileDescriptor {
    /// The wrapped raw descriptor.
    fd: RawFd,
}
62 | |
63 | impl AsRawFd for FileDescriptor { |
64 | fn as_raw_fd(&self) -> RawFd { |
65 | self.fd |
66 | } |
67 | } |
68 | |
69 | impl Read for &FileDescriptor { |
70 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
71 | read(self.fd, buf).map_err(io::Error::from) |
72 | } |
73 | } |
74 | |
75 | impl Read for FileDescriptor { |
76 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
77 | (self as &Self).read(buf) |
78 | } |
79 | } |
80 | |
81 | impl Write for &FileDescriptor { |
82 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
83 | write(self.fd, buf).map_err(io::Error::from) |
84 | } |
85 | |
86 | fn flush(&mut self) -> io::Result<()> { |
87 | Ok(()) |
88 | } |
89 | } |
90 | |
91 | impl Write for FileDescriptor { |
92 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
93 | (self as &Self).write(buf) |
94 | } |
95 | |
96 | fn flush(&mut self) -> io::Result<()> { |
97 | (self as &Self).flush() |
98 | } |
99 | } |
100 | |
101 | impl Drop for FileDescriptor { |
102 | fn drop(&mut self) { |
103 | let _ = close(self.fd); |
104 | } |
105 | } |
106 | |
107 | fn set_nonblocking(fd: RawFd) { |
108 | use nix::fcntl::{OFlag, F_GETFL, F_SETFL}; |
109 | |
110 | let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)" ); |
111 | |
112 | if flags < 0 { |
113 | panic!( |
114 | "bad return value from fcntl(F_GETFL): {} ({:?})" , |
115 | flags, |
116 | nix::Error::last() |
117 | ); |
118 | } |
119 | |
120 | let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK; |
121 | |
122 | nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)" ); |
123 | } |
124 | |
125 | fn socketpair() -> (FileDescriptor, FileDescriptor) { |
126 | use nix::sys::socket::{self, AddressFamily, SockFlag, SockType}; |
127 | |
128 | let (fd_a, fd_b) = socket::socketpair( |
129 | AddressFamily::Unix, |
130 | SockType::Stream, |
131 | None, |
132 | SockFlag::empty(), |
133 | ) |
134 | .expect("socketpair" ); |
135 | let fds = ( |
136 | FileDescriptor { |
137 | fd: fd_a.into_raw_fd(), |
138 | }, |
139 | FileDescriptor { |
140 | fd: fd_b.into_raw_fd(), |
141 | }, |
142 | ); |
143 | |
144 | set_nonblocking(fds.0.fd); |
145 | set_nonblocking(fds.1.fd); |
146 | |
147 | fds |
148 | } |
149 | |
150 | fn drain(mut fd: &FileDescriptor) { |
151 | let mut buf = [0u8; 512]; |
152 | |
153 | loop { |
154 | match fd.read(&mut buf[..]) { |
155 | Err(e) if e.kind() == ErrorKind::WouldBlock => break, |
156 | Ok(0) => panic!("unexpected EOF" ), |
157 | Err(e) => panic!("unexpected error: {:?}" , e), |
158 | Ok(_) => continue, |
159 | } |
160 | } |
161 | } |
162 | |
163 | #[tokio::test ] |
164 | async fn initially_writable() { |
165 | let (a, b) = socketpair(); |
166 | |
167 | let afd_a = AsyncFd::new(a).unwrap(); |
168 | let afd_b = AsyncFd::new(b).unwrap(); |
169 | |
170 | afd_a.writable().await.unwrap().clear_ready(); |
171 | afd_b.writable().await.unwrap().clear_ready(); |
172 | |
173 | tokio::select! { |
174 | biased; |
175 | _ = tokio::time::sleep(Duration::from_millis(10)) => {}, |
176 | _ = afd_a.readable() => panic!("Unexpected readable state" ), |
177 | _ = afd_b.readable() => panic!("Unexpected readable state" ), |
178 | } |
179 | } |
180 | |
/// Read readiness stays set until it is explicitly cleared, and a later write to the
/// peer produces a fresh readable event.
#[tokio::test]
async fn reset_readable() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    // Nothing has been written to `b` yet, so the timer must win.
    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    let mut guard = readable.await.unwrap();

    // Consume the single byte through the guard; the read succeeds, so readiness is kept.
    guard
        .try_io(|_| afd_a.get_ref().read(&mut [0]))
        .unwrap()
        .unwrap();

    // `a` is not readable, but the reactor still thinks it is
    // (because we have not observed a not-ready error yet)
    afd_a.readable().await.unwrap().retain_ready();

    // Explicitly clear the ready state
    guard.clear_ready();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    // With readiness cleared and no new data, the timer must win again.
    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // We can observe the new readable event
    afd_a.readable().await.unwrap().clear_ready();
}
224 | |
225 | #[tokio::test ] |
226 | async fn reset_writable() { |
227 | let (a, b) = socketpair(); |
228 | |
229 | let afd_a = AsyncFd::new(a).unwrap(); |
230 | |
231 | let mut guard = afd_a.writable().await.unwrap(); |
232 | |
233 | // Write until we get a WouldBlock. This also clears the ready state. |
234 | while guard |
235 | .try_io(|_| afd_a.get_ref().write(&[0; 512][..])) |
236 | .is_ok() |
237 | {} |
238 | |
239 | // Writable state should be cleared now. |
240 | let writable = afd_a.writable(); |
241 | tokio::pin!(writable); |
242 | |
243 | tokio::select! { |
244 | _ = writable.as_mut() => panic!(), |
245 | _ = tokio::time::sleep(Duration::from_millis(10)) => {} |
246 | } |
247 | |
248 | // Read from the other side; we should become writable now. |
249 | drain(&b); |
250 | |
251 | let _ = writable.await.unwrap(); |
252 | } |
253 | |
/// Wrapper exposing the raw descriptor of a value held behind an `Arc`.
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);

impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    /// Forwards to the shared inner value.
    fn as_raw_fd(&self) -> RawFd {
        let Self(shared) = self;
        shared.as_raw_fd()
    }
}
261 | |
262 | #[tokio::test ] |
263 | async fn drop_closes() { |
264 | let (a, mut b) = socketpair(); |
265 | |
266 | let afd_a = AsyncFd::new(a).unwrap(); |
267 | |
268 | assert_eq!( |
269 | ErrorKind::WouldBlock, |
270 | b.read(&mut [0]).err().unwrap().kind() |
271 | ); |
272 | |
273 | std::mem::drop(afd_a); |
274 | |
275 | assert_eq!(0, b.read(&mut [0]).unwrap()); |
276 | |
277 | // into_inner does not close the fd |
278 | |
279 | let (a, mut b) = socketpair(); |
280 | let afd_a = AsyncFd::new(a).unwrap(); |
281 | let _a: FileDescriptor = afd_a.into_inner(); |
282 | |
283 | assert_eq!( |
284 | ErrorKind::WouldBlock, |
285 | b.read(&mut [0]).err().unwrap().kind() |
286 | ); |
287 | |
288 | // Drop closure behavior is delegated to the inner object |
289 | let (a, mut b) = socketpair(); |
290 | let arc_fd = Arc::new(a); |
291 | let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap(); |
292 | std::mem::drop(afd_a); |
293 | |
294 | assert_eq!( |
295 | ErrorKind::WouldBlock, |
296 | b.read(&mut [0]).err().unwrap().kind() |
297 | ); |
298 | |
299 | std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning |
300 | } |
301 | |
302 | #[tokio::test ] |
303 | async fn reregister() { |
304 | let (a, _b) = socketpair(); |
305 | |
306 | let afd_a = AsyncFd::new(a).unwrap(); |
307 | let a = afd_a.into_inner(); |
308 | AsyncFd::new(a).unwrap(); |
309 | } |
310 | |
/// `try_io` keeps readiness when the closure succeeds and clears it when the closure
/// returns `WouldBlock`.
#[tokio::test]
async fn try_io() {
    let (a, mut b) = socketpair();

    b.write_all(b"0").unwrap();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.readable().await.unwrap();

    // Consume the byte directly on the inner descriptor, bypassing the guard.
    afd_a.get_ref().read_exact(&mut [0]).unwrap();

    // Should not clear the readable state
    let _ = guard.try_io(|_| Ok(()));

    // Still readable...
    let _ = afd_a.readable().await.unwrap();

    // Should clear the readable state
    let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));

    // Assert not readable
    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Write something down b again and make sure we're reawoken
    b.write_all(b"0").unwrap();
    let _ = readable.await.unwrap();
}
345 | |
/// Ten tasks waiting on `readable()` for the same `AsyncFd` are all released by a
/// single write to the peer.
#[tokio::test]
async fn multiple_waiters() {
    let (a, mut b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());

    // Ten waiter tasks plus this test body.
    let barrier = Arc::new(tokio::sync::Barrier::new(11));

    let mut tasks = Vec::new();
    for _ in 0..10 {
        let afd_a = afd_a.clone();
        let barrier = barrier.clone();

        let f = async move {
            // Reaches the barrier and then never completes, so this select arm can
            // never be taken.
            let notify_barrier = async {
                barrier.wait().await;
                futures::future::pending::<()>().await;
            };

            // `biased` polls `readable()` before the barrier future, so the task is
            // already waiting on readiness by the time it arrives at the barrier.
            tokio::select! {
                biased;
                guard = afd_a.readable() => {
                    tokio::task::yield_now().await;
                    guard.unwrap().clear_ready()
                },
                _ = notify_barrier => unreachable!(),
            }

            std::mem::drop(afd_a);
        };

        tasks.push(tokio::spawn(f));
    }

    let mut all_tasks = futures::future::try_join_all(tasks);

    // Wait for every task to reach the barrier; no task may finish before data is written.
    tokio::select! {
        r = std::pin::Pin::new(&mut all_tasks) => {
            r.unwrap(); // propagate panic
            panic!("Tasks exited unexpectedly")
        },
        _ = barrier.wait() => {}
    };

    b.write_all(b"0").unwrap();

    all_tasks.await.unwrap();
}
393 | |
/// Exercises `poll_read_ready` / `poll_write_ready`: each direction keeps its own
/// waker, and only the most recently registered waker per direction is woken.
#[tokio::test]
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    while afd_a.get_ref().write(&[0; 512]).is_ok() {}

    let waker = TestWaker::new();

    // Register the test waker for read readiness; nothing is readable yet.
    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    // Both spawned tasks have polled once (and so registered their wakers) past this point.
    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    // Still nothing to read on A.
    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref());

    // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
    let _ = write_fut.await;
}
466 | |
467 | fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> { |
468 | let mut pinned = Box::pin(f); |
469 | |
470 | assert_pending!(pinned |
471 | .as_mut() |
472 | .poll(&mut Context::from_waker(futures::task::noop_waker_ref()))); |
473 | |
474 | pinned |
475 | } |
476 | |
477 | fn rt() -> tokio::runtime::Runtime { |
478 | tokio::runtime::Builder::new_current_thread() |
479 | .enable_all() |
480 | .build() |
481 | .unwrap() |
482 | } |
483 | |
/// A `readable()` future that is pending when the runtime is dropped resolves to an
/// error, and so does one created afterwards.
#[test]
fn driver_shutdown_wakes_currently_pending() {
    let runtime = rt();

    let (local, _peer) = socketpair();
    let async_fd = {
        // Registration needs the runtime context to be entered.
        let _entered = runtime.enter();
        AsyncFd::new(local).unwrap()
    };

    let pending_readable = assert_pending(async_fd.readable());

    drop(runtime);

    // The future was initialized **before** dropping the rt
    let before_shutdown = futures::executor::block_on(pending_readable);
    assert_err!(before_shutdown);

    // The future is initialized **after** dropping the rt.
    let after_shutdown = futures::executor::block_on(async_fd.readable());
    assert_err!(after_shutdown);
}
504 | |
/// A `readable()` future created after the runtime is dropped resolves to an error.
#[test]
fn driver_shutdown_wakes_future_pending() {
    let runtime = rt();

    let (local, _peer) = socketpair();
    let async_fd = {
        // Registration needs the runtime context to be entered.
        let _entered = runtime.enter();
        AsyncFd::new(local).unwrap()
    };

    drop(runtime);

    let outcome = futures::executor::block_on(async_fd.readable());
    assert_err!(outcome);
}
519 | |
/// Races dropping the runtime on another thread against waiting for readiness.
#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _round in 0..100 {
        let runtime = rt();

        let (local, _peer) = socketpair();
        let async_fd = {
            // Registration needs the runtime context to be entered.
            let _entered = runtime.enter();
            AsyncFd::new(local).unwrap()
        };

        // Shut the driver down concurrently; the thread is left detached.
        let _ = std::thread::spawn(move || drop(runtime));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(async_fd.readable());

        // However retrying will always return an error
        let retried = futures::executor::block_on(async_fd.readable());
        assert_err!(retried);
    }
}
541 | |
542 | async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> { |
543 | futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await |
544 | } |
545 | |
546 | async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> { |
547 | futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await |
548 | } |
549 | |
/// Poll-based readiness futures that are pending when the runtime is dropped resolve
/// to errors.
#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let runtime = rt();

    let (local, _peer) = socketpair();
    let async_fd = {
        // Registration needs the runtime context to be entered.
        let _entered = runtime.enter();
        AsyncFd::new(local).unwrap()
    };

    // make not writable
    let mut sink = async_fd.get_ref();
    loop {
        if sink.write(&[0; 512]).is_err() {
            break;
        }
    }

    let pending_read = assert_pending(poll_readable(&async_fd));
    let pending_write = assert_pending(poll_writable(&async_fd));

    drop(runtime);

    // Attempting to poll readiness when the rt is dropped is an error
    let read_outcome = futures::executor::block_on(pending_read);
    assert_err!(read_outcome);
    let write_outcome = futures::executor::block_on(pending_write);
    assert_err!(write_outcome);
}
571 | |
/// Poll-based readiness futures created after the runtime is dropped resolve to errors.
#[test]
fn driver_shutdown_wakes_poll() {
    let runtime = rt();

    let (local, _peer) = socketpair();
    let async_fd = {
        // Registration needs the runtime context to be entered.
        let _entered = runtime.enter();
        AsyncFd::new(local).unwrap()
    };

    drop(runtime);

    let read_outcome = futures::executor::block_on(poll_readable(&async_fd));
    assert_err!(read_outcome);
    let write_outcome = futures::executor::block_on(poll_writable(&async_fd));
    assert_err!(write_outcome);
}
587 | |
/// Clearing readiness on a guard obtained before the runtime was dropped must not panic.
#[test]
fn driver_shutdown_then_clear_readiness() {
    let runtime = rt();

    let (local, _peer) = socketpair();
    let async_fd = {
        // Registration needs the runtime context to be entered.
        let _entered = runtime.enter();
        AsyncFd::new(local).unwrap()
    };

    let writable = runtime.block_on(async_fd.writable());
    let mut guard = writable.unwrap();

    drop(runtime);

    guard.clear_ready();
}
604 | |
/// Races dropping the runtime on another thread against the poll-based readiness APIs.
#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _round in 0..100 {
        let runtime = rt();

        let (local, _peer) = socketpair();
        let async_fd = {
            // Registration needs the runtime context to be entered.
            let _entered = runtime.enter();
            AsyncFd::new(local).unwrap()
        };

        // make not writable
        let mut sink = async_fd.get_ref();
        loop {
            if sink.write(&[0; 512]).is_err() {
                break;
            }
        }

        // Shut the driver down concurrently; the thread is left detached.
        let _ = std::thread::spawn(move || drop(runtime));

        // The poll variants will always return an error in this case
        let read_outcome = futures::executor::block_on(poll_readable(&async_fd));
        assert_err!(read_outcome);
        let write_outcome = futures::executor::block_on(poll_writable(&async_fd));
        assert_err!(write_outcome);
    }
}
626 | |
627 | #[tokio::test ] |
628 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
629 | async fn priority_event_on_oob_data() { |
630 | use std::net::SocketAddr; |
631 | |
632 | use tokio::io::Interest; |
633 | |
634 | let addr: SocketAddr = "127.0.0.1:0" .parse().unwrap(); |
635 | |
636 | let listener = std::net::TcpListener::bind(addr).unwrap(); |
637 | let addr = listener.local_addr().unwrap(); |
638 | |
639 | let client = std::net::TcpStream::connect(addr).unwrap(); |
640 | let client = AsyncFd::with_interest(client, Interest::PRIORITY).unwrap(); |
641 | |
642 | let (stream, _) = listener.accept().unwrap(); |
643 | |
644 | // Sending out of band data should trigger priority event. |
645 | send_oob_data(&stream, b"hello" ).unwrap(); |
646 | |
647 | let _ = client.ready(Interest::PRIORITY).await.unwrap(); |
648 | } |
649 | |
650 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
651 | fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> { |
652 | unsafe { |
653 | let res = libc::send( |
654 | stream.as_raw_fd(), |
655 | data.as_ptr().cast(), |
656 | data.len(), |
657 | libc::MSG_OOB, |
658 | ); |
659 | if res == -1 { |
660 | Err(io::Error::last_os_error()) |
661 | } else { |
662 | Ok(res as usize) |
663 | } |
664 | } |
665 | } |
666 | |
667 | #[tokio::test ] |
668 | async fn clear_ready_matching_clears_ready() { |
669 | use tokio::io::{Interest, Ready}; |
670 | |
671 | let (a, mut b) = socketpair(); |
672 | |
673 | let afd_a = AsyncFd::new(a).unwrap(); |
674 | b.write_all(b"0" ).unwrap(); |
675 | |
676 | let mut guard = afd_a |
677 | .ready(Interest::READABLE | Interest::WRITABLE) |
678 | .await |
679 | .unwrap(); |
680 | |
681 | assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE); |
682 | |
683 | guard.clear_ready_matching(Ready::READABLE); |
684 | assert_eq!(guard.ready(), Ready::WRITABLE); |
685 | |
686 | guard.clear_ready_matching(Ready::WRITABLE); |
687 | assert_eq!(guard.ready(), Ready::EMPTY); |
688 | } |
689 | |
690 | #[tokio::test ] |
691 | async fn clear_ready_matching_clears_ready_mut() { |
692 | use tokio::io::{Interest, Ready}; |
693 | |
694 | let (a, mut b) = socketpair(); |
695 | |
696 | let mut afd_a = AsyncFd::new(a).unwrap(); |
697 | b.write_all(b"0" ).unwrap(); |
698 | |
699 | let mut guard = afd_a |
700 | .ready_mut(Interest::READABLE | Interest::WRITABLE) |
701 | .await |
702 | .unwrap(); |
703 | |
704 | assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE); |
705 | |
706 | guard.clear_ready_matching(Ready::READABLE); |
707 | assert_eq!(guard.ready(), Ready::WRITABLE); |
708 | |
709 | guard.clear_ready_matching(Ready::WRITABLE); |
710 | assert_eq!(guard.ready(), Ready::EMPTY); |
711 | } |
712 | |
713 | #[tokio::test ] |
714 | #[cfg (target_os = "linux" )] |
715 | async fn await_error_readiness_timestamping() { |
716 | use std::net::{Ipv4Addr, SocketAddr}; |
717 | |
718 | use tokio::io::{Interest, Ready}; |
719 | |
720 | let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); |
721 | let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); |
722 | |
723 | let socket = std::net::UdpSocket::bind(address_a).unwrap(); |
724 | |
725 | socket.set_nonblocking(true).unwrap(); |
726 | |
727 | // configure send timestamps |
728 | configure_timestamping_socket(&socket).unwrap(); |
729 | |
730 | socket.connect(address_b).unwrap(); |
731 | |
732 | let fd = AsyncFd::new(socket).unwrap(); |
733 | |
734 | tokio::select! { |
735 | _ = fd.ready(Interest::ERROR) => panic!(), |
736 | _ = tokio::time::sleep(Duration::from_millis(10)) => {} |
737 | } |
738 | |
739 | let buf = b"hello there" ; |
740 | fd.get_ref().send(buf).unwrap(); |
741 | |
742 | // the send timestamp should now be in the error queue |
743 | let guard = fd.ready(Interest::ERROR).await.unwrap(); |
744 | assert_eq!(guard.ready(), Ready::ERROR); |
745 | } |
746 | |
747 | #[cfg (target_os = "linux" )] |
748 | fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int> { |
749 | // enable software timestamping, and specifically software send timestamping |
750 | let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE; |
751 | |
752 | let res = unsafe { |
753 | libc::setsockopt( |
754 | udp_socket.as_raw_fd(), |
755 | libc::SOL_SOCKET, |
756 | libc::SO_TIMESTAMP, |
757 | &options as *const _ as *const libc::c_void, |
758 | std::mem::size_of_val(&options) as libc::socklen_t, |
759 | ) |
760 | }; |
761 | |
762 | if res == -1 { |
763 | Err(std::io::Error::last_os_error()) |
764 | } else { |
765 | Ok(res) |
766 | } |
767 | } |
768 | |
/// With `IP_RECVERR` enabled, sending a UDP datagram to a port nobody listens on is
/// expected to surface as error readiness on the sending socket.
#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_invalid_address() {
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::io::{Interest, Ready};

    let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
    // Raw descriptor captured for the sending task; `socket` itself is moved into the
    // `AsyncFd` further down.
    let socket_fd = socket.as_raw_fd();

    // Enable IP_RECVERR option to receive error messages
    // https://man7.org/linux/man-pages/man7/ip.7.html has some extra information
    let recv_err: libc::c_int = 1;
    unsafe {
        let res = libc::setsockopt(
            socket.as_raw_fd(),
            libc::SOL_IP,
            libc::IP_RECVERR,
            &recv_err as *const _ as *const libc::c_void,
            std::mem::size_of_val(&recv_err) as libc::socklen_t,
        );
        if res == -1 {
            panic!("{:?}", std::io::Error::last_os_error());
        }
    }

    // Spawn a separate task for sending the message
    tokio::spawn(async move {
        // Set the destination address. This address is invalid in this context: the OS will
        // notice that nobody is listening on this port. Normally this is ignored (UDP is
        // "fire and forget"), but because IP_RECVERR is enabled, the error will actually be
        // reported to the sending socket
        let mut dest_addr =
            unsafe { std::mem::MaybeUninit::<libc::sockaddr_in>::zeroed().assume_init() };
        dest_addr.sin_family = libc::AF_INET as _;
        // based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number
        // below 1024 to guarantee that other tests don't select this port by accident when they
        // use port 0 to select an ephemeral port.
        dest_addr.sin_port = 512u16.to_be(); // Destination port

        // Prepare the message data
        let message = "Hello, Socket!";

        // Prepare the message structure for sendmsg
        let mut iov = libc::iovec {
            iov_base: message.as_ptr() as *mut libc::c_void,
            iov_len: message.len(),
        };

        // Prepare the destination address for the sendmsg call
        let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr;
        let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t;

        // Start from an all-zero header and fill in only the destination and the single buffer.
        let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
        msg.msg_name = dest_sockaddr as *mut libc::c_void;
        msg.msg_namelen = dest_addrlen;
        msg.msg_iov = &mut iov;
        msg.msg_iovlen = 1;

        // A failing send panics inside the spawned task.
        if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 {
            Err(std::io::Error::last_os_error()).unwrap()
        }
    });

    let fd = AsyncFd::new(socket).unwrap();

    // Wait for the error to be reported on the sending socket.
    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}
837 | |