1use std::{
2 io::{Read, Seek, Write},
3 ops::Deref,
4 os::unix::io::AsRawFd,
5 pin::Pin,
6 sync::atomic::{AtomicBool, Ordering},
7 thread, time,
8};
9
10use libc::c_int;
11use nix::{
12 errno::*,
13 sys::{
14 aio::*,
15 signal::{
16 sigaction, SaFlags, SigAction, SigHandler, SigSet, SigevNotify,
17 Signal,
18 },
19 time::{TimeSpec, TimeValLike},
20 },
21};
22use tempfile::tempfile;
23
/// Flag raised by [`sigfunc`] so that a test can observe that its completion
/// signal has been delivered.
pub static SIGNALED: AtomicBool = AtomicBool::new(false);

/// Signal handler that does nothing except record delivery in [`SIGNALED`].
/// The signal number is ignored.
extern "C" fn sigfunc(_signo: c_int) {
    SIGNALED.store(true, Ordering::Relaxed);
}
29
// Polls an AioCb until it is no longer in progress.
//
// `error()` is queried every 10 ms.  As soon as it reports anything other than
// `EINPROGRESS`, the macro evaluates to that final status (success or error).
macro_rules! poll_aio {
    ($aiocb:expr) => {
        loop {
            match $aiocb.as_mut().error() {
                // Still running: back off briefly, then ask again.
                Err(Errno::EINPROGRESS) => {
                    thread::sleep(time::Duration::from_millis(10));
                }
                finished => break finished,
            }
        }
    };
}
42
mod aio_fsync {
    use super::*;

    /// Everything handed to `AioFsync::new` must be readable back through the
    /// accessors.
    #[test]
    fn test_accessors() {
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioFsync::new(1001, AioFsyncMode::O_SYNC, 42, notify);

        assert_eq!(1001, aiocb.fd());
        assert_eq!(AioFsyncMode::O_SYNC, aiocb.mode());
        assert_eq!(42, aiocb.priority());

        let raw_sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    /// `AioFsync::submit` should not modify the `AioCb` object if
    /// `libc::aio_fsync` returns an error
    // Not built on Linux, because Linux's AIO implementation can't detect
    // errors synchronously
    #[test]
    #[cfg(any(target_os = "freebsd", target_os = "macos"))]
    fn error() {
        use std::mem;

        const INITIAL: &[u8] = b"abcdef123456";

        // Deliberately forge a mode value that is not a real AioFsyncMode, so
        // that submission is expected to fail.
        let bogus_mode = unsafe { mem::transmute::<i32, AioFsyncMode>(666) };

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut sync_op = Box::pin(AioFsync::new(
            file.as_raw_fd(),
            bogus_mode,
            0,
            SigevNotify::SigevNone,
        ));
        sync_op
            .as_mut()
            .submit()
            .expect_err("assertion failed");
    }

    /// Submits an fsync without completion notification, polls until it is
    /// done and checks that both the status and the return value are `Ok`.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut sync_op = Box::pin(AioFsync::new(
            file.as_raw_fd(),
            AioFsyncMode::O_SYNC,
            0,
            SigevNotify::SigevNone,
        ));
        sync_op.as_mut().submit().unwrap();
        poll_aio!(&mut sync_op).unwrap();
        sync_op.as_mut().aio_return().unwrap();
    }
}
107
mod aio_read {
    use super::*;

    /// Everything handed to `AioRead::new` must be readable back through the
    /// accessors; `nbytes` reflects the length of the supplied buffer.
    #[test]
    fn test_accessors() {
        let mut buf = vec![0; 4];
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioRead::new(
            1001, // fd
            2,    // offset
            &mut buf,
            42, // priority
            notify,
        );

        assert_eq!(1001, aiocb.fd());
        assert_eq!(4, aiocb.nbytes());
        assert_eq!(2, aiocb.offset());
        assert_eq!(42, aiocb.priority());

        let raw_sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    // Tests AioRead.cancel.  We aren't trying to test the OS's
    // implementation, only our bindings.  So it's sufficient to check that
    // cancel returned any AioCancelStat value.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn cancel() {
        const INITIAL: &[u8] = b"abcdef123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut buf = vec![0; 4];
        let mut read_op = Box::pin(AioRead::new(
            file.as_raw_fd(),
            2,
            &mut buf,
            0,
            SigevNotify::SigevNone,
        ));
        read_op.as_mut().submit().unwrap();

        read_op.as_mut().cancel().unwrap();

        // Wait for the read to complete, but don't care whether it succeeded
        let _ = poll_aio!(&mut read_op);
        let _ = read_op.as_mut().aio_return();
    }

    /// `AioRead::submit` should not modify the `AioCb` object if
    /// `libc::aio_read` returns an error
    // Not built on Linux, because Linux's AIO implementation can't detect
    // errors synchronously
    #[test]
    #[cfg(any(target_os = "freebsd", target_os = "macos"))]
    fn error() {
        const INITIAL: &[u8] = b"abcdef123456";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut buf = vec![0; 4];
        let mut read_op = Box::pin(AioRead::new(
            file.as_raw_fd(),
            -1, // an invalid offset
            &mut buf,
            0, // priority
            SigevNotify::SigevNone,
        ));
        read_op
            .as_mut()
            .submit()
            .expect_err("assertion failed");
    }

    // Test a simple aio operation with no completion notification.  We must
    // poll for completion
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"cdef";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut rbuf = vec![0; 4];
        let mut read_op = Box::pin(AioRead::new(
            file.as_raw_fd(),
            2,
            &mut rbuf,
            0,
            SigevNotify::SigevNone,
        ));
        read_op.as_mut().submit().unwrap();

        let status = poll_aio!(&mut read_op);
        assert_eq!(status, Ok(()));
        let nread = read_op.as_mut().aio_return().unwrap();
        assert_eq!(nread, EXPECT.len());

        // Release the operation's borrow of `rbuf` before inspecting it.
        drop(read_op);
        assert_eq!(EXPECT, rbuf.deref());
    }

    // Like ok, but allocates the structure on the stack.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn on_stack() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"cdef";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut rbuf = vec![0; 4];
        {
            let mut unpinned = AioRead::new(
                file.as_raw_fd(),
                2,
                &mut rbuf,
                0,
                SigevNotify::SigevNone,
            );
            // The operation is only ever touched through this pin and stays
            // in place until the end of the enclosing scope.
            let mut read_op = unsafe { Pin::new_unchecked(&mut unpinned) };
            read_op.as_mut().submit().unwrap();

            let status = poll_aio!(&mut read_op);
            assert_eq!(status, Ok(()));
            let nread = read_op.as_mut().aio_return().unwrap();
            assert_eq!(nread, EXPECT.len());
        }
        assert_eq!(EXPECT, rbuf.deref());
    }
}
227
#[cfg(target_os = "freebsd")]
#[cfg(fbsd14)]
mod aio_readv {
    use std::io::IoSliceMut;

    use super::*;

    /// Everything handed to `AioReadv::new` must be readable back through the
    /// accessors; `iovlen` reflects the number of buffers supplied.
    #[test]
    fn test_accessors() {
        let mut first = vec![0; 4];
        let mut second = vec![0; 8];
        let mut iov =
            [IoSliceMut::new(&mut first), IoSliceMut::new(&mut second)];
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioReadv::new(
            1001, // fd
            2,    // offset
            &mut iov,
            42, // priority
            notify,
        );

        assert_eq!(1001, aiocb.fd());
        assert_eq!(2, aiocb.iovlen());
        assert_eq!(2, aiocb.offset());
        assert_eq!(42, aiocb.priority());

        let raw_sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    /// Scatter-reads six bytes starting at offset 2 into a 4-byte and a
    /// 2-byte buffer, polling for completion.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT0: &[u8] = b"cdef";
        const EXPECT1: &[u8] = b"12";

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut rbuf0 = vec![0; 4];
        let mut rbuf1 = vec![0; 2];
        let mut iov =
            [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
        {
            let mut read_op = Box::pin(AioReadv::new(
                file.as_raw_fd(),
                2,
                &mut iov,
                0,
                SigevNotify::SigevNone,
            ));
            read_op.as_mut().submit().unwrap();

            let status = poll_aio!(&mut read_op);
            assert_eq!(status, Ok(()));
            let nread = read_op.as_mut().aio_return().unwrap();
            assert_eq!(nread, EXPECT0.len() + EXPECT1.len());
        }
        assert_eq!(&EXPECT0, &rbuf0);
        assert_eq!(&EXPECT1, &rbuf1);
    }
}
294
mod aio_write {
    use super::*;

    /// Everything handed to `AioWrite::new` must be readable back through the
    /// accessors; `nbytes` reflects the length of the supplied buffer.
    #[test]
    fn test_accessors() {
        let payload = vec![0; 4];
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioWrite::new(
            1001, // fd
            2,    // offset
            &payload,
            42, // priority
            notify,
        );

        assert_eq!(1001, aiocb.fd());
        assert_eq!(4, aiocb.nbytes());
        assert_eq!(2, aiocb.offset());
        assert_eq!(42, aiocb.priority());

        let raw_sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    // Tests AioWrite.cancel.  We aren't trying to test the OS's
    // implementation, only our bindings.  So it's sufficient to check that
    // cancel returned any AioCancelStat value.
    #[test]
    #[cfg_attr(target_env = "musl", ignore)]
    fn cancel() {
        let payload: &[u8] = b"CDEF";

        let file = tempfile().unwrap();
        let mut write_op = Box::pin(AioWrite::new(
            file.as_raw_fd(),
            0,
            payload,
            0,
            SigevNotify::SigevNone,
        ));
        write_op.as_mut().submit().unwrap();

        // Right after submission the write is either done or still running.
        let status = write_op.as_mut().error();
        assert!(status == Ok(()) || status == Err(Errno::EINPROGRESS));

        write_op.as_mut().cancel().unwrap();

        // Wait for the write to complete, but don't care whether it succeeded
        let _ = poll_aio!(&mut write_op);
        let _ = write_op.as_mut().aio_return();
    }

    // Test a simple aio operation with no completion notification.  We must
    // poll for completion.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"abCDEF123456";

        let payload = "CDEF".to_string().into_bytes();

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut write_op = Box::pin(AioWrite::new(
            file.as_raw_fd(),
            2,
            &payload,
            0,
            SigevNotify::SigevNone,
        ));
        write_op.as_mut().submit().unwrap();

        let status = poll_aio!(&mut write_op);
        assert_eq!(status, Ok(()));
        let nwritten = write_op.as_mut().aio_return().unwrap();
        assert_eq!(nwritten, payload.len());

        // Read the whole file back and compare it with the expected contents.
        let mut contents = Vec::new();
        file.rewind().unwrap();
        let len = file.read_to_end(&mut contents).unwrap();
        assert_eq!(len, EXPECT.len());
        assert_eq!(contents, EXPECT);
    }

    // Like ok, but allocates the structure on the stack.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn on_stack() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"abCDEF123456";

        let payload = "CDEF".to_string().into_bytes();

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut unpinned = AioWrite::new(
            file.as_raw_fd(),
            2, // offset
            &payload,
            0, // priority
            SigevNotify::SigevNone,
        );
        // The operation is only ever touched through this pin and stays in
        // place until the end of the function.
        let mut write_op = unsafe { Pin::new_unchecked(&mut unpinned) };
        write_op.as_mut().submit().unwrap();

        let status = poll_aio!(&mut write_op);
        assert_eq!(status, Ok(()));
        let nwritten = write_op.as_mut().aio_return().unwrap();
        assert_eq!(nwritten, payload.len());

        // Read the whole file back and compare it with the expected contents.
        let mut contents = Vec::new();
        file.rewind().unwrap();
        let len = file.read_to_end(&mut contents).unwrap();
        assert_eq!(len, EXPECT.len());
        assert_eq!(contents, EXPECT);
    }

    /// `AioWrite::submit` should not modify the `AioCb` object if
    /// `libc::aio_write` returns an error.
    // Not built on Linux, because Linux's AIO implementation can't detect
    // errors synchronously
    #[test]
    #[cfg(any(target_os = "freebsd", target_os = "macos"))]
    fn error() {
        let payload = "CDEF".to_string().into_bytes();
        let mut write_op = Box::pin(AioWrite::new(
            666, // An invalid file descriptor
            0,   // offset
            &payload,
            0, // priority
            SigevNotify::SigevNone,
        ));
        write_op
            .as_mut()
            .submit()
            .expect_err("assertion failed");
        // Dropping the AioWrite at this point should not panic
    }
}
428
#[cfg(target_os = "freebsd")]
#[cfg(fbsd14)]
mod aio_writev {
    use std::io::IoSlice;

    use super::*;

    /// Everything handed to `AioWritev::new` must be readable back through
    /// the accessors; `iovlen` reflects the number of buffers supplied.
    #[test]
    fn test_accessors() {
        let first = vec![0; 4];
        let second = vec![0; 8];
        let iov = [IoSlice::new(&first), IoSlice::new(&second)];
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioWritev::new(
            1001, // fd
            2,    // offset
            &iov,
            42, // priority
            notify,
        );

        assert_eq!(1001, aiocb.fd());
        assert_eq!(2, aiocb.iovlen());
        assert_eq!(2, aiocb.offset());
        assert_eq!(42, aiocb.priority());

        let raw_sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, raw_sev.sigev_signo);
        assert_eq!(99, raw_sev.sigev_value.sival_ptr as i64);
    }

    // Test a simple aio operation with no completion notification.  We must
    // poll for completion.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"aBCDEF123456";

        let head = b"BC";
        let tail = b"DEF";
        let total_len = head.len() + tail.len();
        let iov = [IoSlice::new(head), IoSlice::new(tail)];

        let mut file = tempfile().unwrap();
        file.write_all(INITIAL).unwrap();

        let mut write_op = Box::pin(AioWritev::new(
            file.as_raw_fd(),
            1,
            &iov,
            0,
            SigevNotify::SigevNone,
        ));
        write_op.as_mut().submit().unwrap();

        let status = poll_aio!(&mut write_op);
        assert_eq!(status, Ok(()));
        let nwritten = write_op.as_mut().aio_return().unwrap();
        assert_eq!(nwritten, total_len);

        // Read the whole file back and compare it with the expected contents.
        let mut contents = Vec::new();
        file.rewind().unwrap();
        let len = file.read_to_end(&mut contents).unwrap();
        assert_eq!(len, EXPECT.len());
        assert_eq!(contents, EXPECT);
    }
}
494
// Test an aio operation with completion delivered by a signal.
//
// A handler for SIGUSR2 is installed that sets `SIGNALED`; the test then
// submits a write that requests SIGUSR2 on completion and waits for the flag.
// The wait is bounded so that a signal which never arrives fails the test
// with a clear message instead of hanging the whole test run.
#[test]
#[cfg_attr(
    any(
        all(target_env = "musl", target_arch = "x86_64"),
        target_arch = "mips",
        target_arch = "mips64"
    ),
    ignore
)]
fn sigev_signal() {
    // Upper bound on how long to wait for the completion signal.
    const SIGNAL_TIMEOUT: time::Duration = time::Duration::from_secs(10);

    // Held for the whole test: the signal disposition and `SIGNALED` are
    // process-wide state.
    let _m = crate::SIGNAL_MTX.lock();
    let sa = SigAction::new(
        SigHandler::Handler(sigfunc),
        SaFlags::SA_RESETHAND,
        SigSet::empty(),
    );
    // Clear the flag before the handler can possibly run.
    SIGNALED.store(false, Ordering::Relaxed);
    unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();

    const INITIAL: &[u8] = b"abcdef123456";
    const WBUF: &[u8] = b"CDEF";
    let mut rbuf = Vec::new();
    const EXPECT: &[u8] = b"abCDEF123456";

    let mut f = tempfile().unwrap();
    f.write_all(INITIAL).unwrap();
    let mut aiow = Box::pin(AioWrite::new(
        f.as_raw_fd(),
        2, //offset
        WBUF,
        0, //priority
        SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 0, //TODO: validate in sigfunc
        },
    ));
    aiow.as_mut().submit().unwrap();

    // Wait for the handler to report delivery, but never forever.
    let deadline = time::Instant::now() + SIGNAL_TIMEOUT;
    while !SIGNALED.load(Ordering::Relaxed) {
        assert!(
            time::Instant::now() < deadline,
            "timed out waiting for the AIO completion signal"
        );
        thread::sleep(time::Duration::from_millis(10));
    }

    assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());

    // Read the whole file back and compare it with the expected contents.
    f.rewind().unwrap();
    let len = f.read_to_end(&mut rbuf).unwrap();
    assert_eq!(len, EXPECT.len());
    assert_eq!(rbuf, EXPECT);
}
543
// Tests using aio_cancel_all for all outstanding IOs.
//
// Only the binding is exercised: it is enough that `aio_cancel_all` returns
// `Ok`, whatever happens to the write afterwards.
#[test]
#[cfg_attr(target_env = "musl", ignore)]
fn test_aio_cancel_all() {
    let payload: &[u8] = b"CDEF";

    let file = tempfile().unwrap();
    let fd = file.as_raw_fd();
    let mut write_op = Box::pin(AioWrite::new(
        fd,
        0, // offset
        payload,
        0, // priority
        SigevNotify::SigevNone,
    ));
    write_op.as_mut().submit().unwrap();

    // Right after submission the write is either done or still running.
    let status = write_op.as_mut().error();
    assert!(status == Ok(()) || status == Err(Errno::EINPROGRESS));

    aio_cancel_all(fd).unwrap();

    // Wait for the write to complete, but don't care whether it succeeded
    let _ = poll_aio!(&mut write_op);
    let _ = write_op.as_mut().aio_return();
}
568
// Submits one write and one read on the same file, then blocks in
// `aio_suspend` (retrying on EINTR) until neither operation is in progress.
#[test]
// On Cirrus on Linux, this test fails due to a glibc bug.
// https://github.com/nix-rust/nix/issues/1099
#[cfg_attr(target_os = "linux", ignore)]
// On Cirrus, aio_suspend is failing with EINVAL
// https://github.com/nix-rust/nix/issues/1361
#[cfg_attr(target_os = "macos", ignore)]
fn test_aio_suspend() {
    const INITIAL: &[u8] = b"abcdef123456";
    const WBUF: &[u8] = b"CDEFG";

    let timeout = TimeSpec::seconds(10);
    let mut rbuf = vec![0; 4];
    let rlen = rbuf.len();

    let mut file = tempfile().unwrap();
    file.write_all(INITIAL).unwrap();

    let mut wcb = Box::pin(AioWrite::new(
        file.as_raw_fd(),
        2, // offset
        WBUF,
        0, // priority
        SigevNotify::SigevNone,
    ));
    let mut rcb = Box::pin(AioRead::new(
        file.as_raw_fd(),
        8, // offset
        &mut rbuf,
        0, // priority
        SigevNotify::SigevNone,
    ));
    wcb.as_mut().submit().unwrap();
    rcb.as_mut().submit().unwrap();

    loop {
        {
            // Shared borrows of both control blocks, scoped so that they end
            // before the mutable `error()` calls below.
            let pending = [
                &*wcb as &dyn AsRef<libc::aiocb>,
                &*rcb as &dyn AsRef<libc::aiocb>,
            ];
            match aio_suspend(&pending[..], Some(timeout)) {
                Ok(_) => {}
                // Interrupted by a signal: simply suspend again.
                Err(Errno::EINTR) => continue,
                Err(e) => panic!("aio_suspend returned {e:?}"),
            }
        }

        // Keep suspending while either operation is still running.
        let still_running = rcb.as_mut().error() == Err(Errno::EINPROGRESS)
            || wcb.as_mut().error() == Err(Errno::EINPROGRESS);
        if !still_running {
            break;
        }
    }

    assert_eq!(wcb.as_mut().aio_return().unwrap(), WBUF.len());
    assert_eq!(rcb.as_mut().aio_return().unwrap(), rlen);
}
625