1#![warn(rust_2018_idioms)]
2#![cfg(all(target_os = "freebsd", feature = "net"))]
3
4use mio_aio::{AioCb, AioFsyncMode, LioCb};
5use std::{
6 future::Future,
7 mem,
8 os::unix::io::{AsRawFd, RawFd},
9 pin::Pin,
10 task::{Context, Poll},
11};
12use tempfile::tempfile;
13use tokio::io::bsd::{Aio, AioSource};
14use tokio_test::assert_pending;
15
16mod aio {
17 use super::*;
18
19 /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource
20 struct WrappedAioCb<'a>(AioCb<'a>);
21 impl<'a> AioSource for WrappedAioCb<'a> {
22 fn register(&mut self, kq: RawFd, token: usize) {
23 self.0.register_raw(kq, token)
24 }
25 fn deregister(&mut self) {
26 self.0.deregister_raw()
27 }
28 }
29
30 /// A very crude implementation of an AIO-based future
31 struct FsyncFut(Aio<WrappedAioCb<'static>>);
32
33 impl Future for FsyncFut {
34 type Output = std::io::Result<()>;
35
36 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37 let poll_result = self.0.poll_ready(cx);
38 match poll_result {
39 Poll::Pending => Poll::Pending,
40 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
41 Poll::Ready(Ok(_ev)) => {
42 // At this point, we could clear readiness. But there's no
43 // point, since we're about to drop the Aio.
44 let result = (*self.0).0.aio_return();
45 match result {
46 Ok(_) => Poll::Ready(Ok(())),
47 Err(e) => Poll::Ready(Err(e.into())),
48 }
49 }
50 }
51 }
52 }
53
54 /// Low-level AIO Source
55 ///
56 /// An example bypassing mio_aio and Nix to demonstrate how the kevent
57 /// registration actually works, under the hood.
58 struct LlSource(Pin<Box<libc::aiocb>>);
59
60 impl AioSource for LlSource {
61 fn register(&mut self, kq: RawFd, token: usize) {
62 let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
63 sev.sigev_notify = libc::SIGEV_KEVENT;
64 sev.sigev_signo = kq;
65 sev.sigev_value = libc::sigval {
66 sival_ptr: token as *mut libc::c_void,
67 };
68 self.0.aio_sigevent = sev;
69 }
70
71 fn deregister(&mut self) {
72 unsafe {
73 self.0.aio_sigevent = mem::zeroed();
74 }
75 }
76 }
77
78 struct LlFut(Aio<LlSource>);
79
80 impl Future for LlFut {
81 type Output = std::io::Result<()>;
82
83 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
84 let poll_result = self.0.poll_ready(cx);
85 match poll_result {
86 Poll::Pending => Poll::Pending,
87 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
88 Poll::Ready(Ok(_ev)) => {
89 let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
90 assert_eq!(0, r);
91 Poll::Ready(Ok(()))
92 }
93 }
94 }
95 }
96
97 /// A very simple object that can implement AioSource and can be reused.
98 ///
99 /// mio_aio normally assumes that each AioCb will be consumed on completion.
100 /// This somewhat contrived example shows how an Aio object can be reused
101 /// anyway.
102 struct ReusableFsyncSource {
103 aiocb: Pin<Box<AioCb<'static>>>,
104 fd: RawFd,
105 token: usize,
106 }
107 impl ReusableFsyncSource {
108 fn fsync(&mut self) {
109 self.aiocb.register_raw(self.fd, self.token);
110 self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
111 }
112 fn new(aiocb: AioCb<'static>) -> Self {
113 ReusableFsyncSource {
114 aiocb: Box::pin(aiocb),
115 fd: 0,
116 token: 0,
117 }
118 }
119 fn reset(&mut self, aiocb: AioCb<'static>) {
120 self.aiocb = Box::pin(aiocb);
121 }
122 }
123 impl AioSource for ReusableFsyncSource {
124 fn register(&mut self, kq: RawFd, token: usize) {
125 self.fd = kq;
126 self.token = token;
127 }
128 fn deregister(&mut self) {
129 self.fd = 0;
130 }
131 }
132
133 struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);
134 impl<'a> Future for ReusableFsyncFut<'a> {
135 type Output = std::io::Result<()>;
136
137 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138 let poll_result = self.0.poll_ready(cx);
139 match poll_result {
140 Poll::Pending => Poll::Pending,
141 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
142 Poll::Ready(Ok(ev)) => {
143 // Since this future uses a reusable Aio, we must clear
144 // its readiness here. That makes the future
145 // non-idempotent; the caller can't poll it repeatedly after
146 // it has already returned Ready. But that's ok; most
147 // futures behave this way.
148 self.0.clear_ready(ev);
149 let result = (*self.0).aiocb.aio_return();
150 match result {
151 Ok(_) => Poll::Ready(Ok(())),
152 Err(e) => Poll::Ready(Err(e.into())),
153 }
154 }
155 }
156 }
157 }
158
159 #[tokio::test]
160 async fn fsync() {
161 let f = tempfile().unwrap();
162 let fd = f.as_raw_fd();
163 let aiocb = AioCb::from_fd(fd, 0);
164 let source = WrappedAioCb(aiocb);
165 let mut poll_aio = Aio::new_for_aio(source).unwrap();
166 (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
167 let fut = FsyncFut(poll_aio);
168 fut.await.unwrap();
169 }
170
171 #[tokio::test]
172 async fn ll_fsync() {
173 let f = tempfile().unwrap();
174 let fd = f.as_raw_fd();
175 let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
176 aiocb.aio_fildes = fd;
177 let source = LlSource(Box::pin(aiocb));
178 let mut poll_aio = Aio::new_for_aio(source).unwrap();
179 let r = unsafe {
180 let p = (*poll_aio).0.as_mut().get_unchecked_mut();
181 libc::aio_fsync(libc::O_SYNC, p)
182 };
183 assert_eq!(0, r);
184 let fut = LlFut(poll_aio);
185 fut.await.unwrap();
186 }
187
188 /// A suitably crafted future type can reuse an Aio object
189 #[tokio::test]
190 async fn reuse() {
191 let f = tempfile().unwrap();
192 let fd = f.as_raw_fd();
193 let aiocb0 = AioCb::from_fd(fd, 0);
194 let source = ReusableFsyncSource::new(aiocb0);
195 let mut poll_aio = Aio::new_for_aio(source).unwrap();
196 poll_aio.fsync();
197 let fut0 = ReusableFsyncFut(&mut poll_aio);
198 fut0.await.unwrap();
199
200 let aiocb1 = AioCb::from_fd(fd, 0);
201 poll_aio.reset(aiocb1);
202 let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
203 assert_pending!(poll_aio.poll_ready(&mut ctx));
204 poll_aio.fsync();
205 let fut1 = ReusableFsyncFut(&mut poll_aio);
206 fut1.await.unwrap();
207 }
208}
209
210mod lio {
211 use super::*;
212
213 struct WrappedLioCb<'a>(LioCb<'a>);
214 impl<'a> AioSource for WrappedLioCb<'a> {
215 fn register(&mut self, kq: RawFd, token: usize) {
216 self.0.register_raw(kq, token)
217 }
218 fn deregister(&mut self) {
219 self.0.deregister_raw()
220 }
221 }
222
223 /// A very crude lio_listio-based Future
224 struct LioFut(Option<Aio<WrappedLioCb<'static>>>);
225
226 impl Future for LioFut {
227 type Output = std::io::Result<Vec<isize>>;
228
229 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230 let poll_result = self.0.as_mut().unwrap().poll_ready(cx);
231 match poll_result {
232 Poll::Pending => Poll::Pending,
233 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
234 Poll::Ready(Ok(_ev)) => {
235 // At this point, we could clear readiness. But there's no
236 // point, since we're about to drop the Aio.
237 let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
238 iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
239 });
240 Poll::Ready(Ok(r))
241 }
242 }
243 }
244 }
245
246 /// Minimal example demonstrating reuse of an Aio object with lio
247 /// readiness. mio_aio::LioCb actually does something similar under the
248 /// hood.
249 struct ReusableLioSource {
250 liocb: Option<LioCb<'static>>,
251 fd: RawFd,
252 token: usize,
253 }
254 impl ReusableLioSource {
255 fn new(liocb: LioCb<'static>) -> Self {
256 ReusableLioSource {
257 liocb: Some(liocb),
258 fd: 0,
259 token: 0,
260 }
261 }
262 fn reset(&mut self, liocb: LioCb<'static>) {
263 self.liocb = Some(liocb);
264 }
265 fn submit(&mut self) {
266 self.liocb
267 .as_mut()
268 .unwrap()
269 .register_raw(self.fd, self.token);
270 self.liocb.as_mut().unwrap().submit().unwrap();
271 }
272 }
273 impl AioSource for ReusableLioSource {
274 fn register(&mut self, kq: RawFd, token: usize) {
275 self.fd = kq;
276 self.token = token;
277 }
278 fn deregister(&mut self) {
279 self.fd = 0;
280 }
281 }
282 struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);
283 impl<'a> Future for ReusableLioFut<'a> {
284 type Output = std::io::Result<Vec<isize>>;
285
286 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
287 let poll_result = self.0.poll_ready(cx);
288 match poll_result {
289 Poll::Pending => Poll::Pending,
290 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
291 Poll::Ready(Ok(ev)) => {
292 // Since this future uses a reusable Aio, we must clear
293 // its readiness here. That makes the future
294 // non-idempotent; the caller can't poll it repeatedly after
295 // it has already returned Ready. But that's ok; most
296 // futures behave this way.
297 self.0.clear_ready(ev);
298 let r = (*self.0).liocb.take().unwrap().into_results(|iter| {
299 iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
300 });
301 Poll::Ready(Ok(r))
302 }
303 }
304 }
305 }
306
307 /// An lio_listio operation with one write element
308 #[tokio::test]
309 async fn onewrite() {
310 const WBUF: &[u8] = b"abcdef";
311 let f = tempfile().unwrap();
312
313 let mut builder = mio_aio::LioCbBuilder::with_capacity(1);
314 builder = builder.emplace_slice(
315 f.as_raw_fd(),
316 0,
317 &WBUF[..],
318 0,
319 mio_aio::LioOpcode::LIO_WRITE,
320 );
321 let liocb = builder.finish();
322 let source = WrappedLioCb(liocb);
323 let mut poll_aio = Aio::new_for_lio(source).unwrap();
324
325 // Send the operation to the kernel
326 (*poll_aio).0.submit().unwrap();
327 let fut = LioFut(Some(poll_aio));
328 let v = fut.await.unwrap();
329 assert_eq!(v.len(), 1);
330 assert_eq!(v[0] as usize, WBUF.len());
331 }
332
333 /// A suitably crafted future type can reuse an Aio object
334 #[tokio::test]
335 async fn reuse() {
336 const WBUF: &[u8] = b"abcdef";
337 let f = tempfile().unwrap();
338
339 let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
340 builder0 = builder0.emplace_slice(
341 f.as_raw_fd(),
342 0,
343 &WBUF[..],
344 0,
345 mio_aio::LioOpcode::LIO_WRITE,
346 );
347 let liocb0 = builder0.finish();
348 let source = ReusableLioSource::new(liocb0);
349 let mut poll_aio = Aio::new_for_aio(source).unwrap();
350 poll_aio.submit();
351 let fut0 = ReusableLioFut(&mut poll_aio);
352 let v = fut0.await.unwrap();
353 assert_eq!(v.len(), 1);
354 assert_eq!(v[0] as usize, WBUF.len());
355
356 // Now reuse the same Aio
357 let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
358 builder1 = builder1.emplace_slice(
359 f.as_raw_fd(),
360 0,
361 &WBUF[..],
362 0,
363 mio_aio::LioOpcode::LIO_WRITE,
364 );
365 let liocb1 = builder1.finish();
366 poll_aio.reset(liocb1);
367 let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
368 assert_pending!(poll_aio.poll_ready(&mut ctx));
369 poll_aio.submit();
370 let fut1 = ReusableLioFut(&mut poll_aio);
371 let v = fut1.await.unwrap();
372 assert_eq!(v.len(), 1);
373 assert_eq!(v[0] as usize, WBUF.len());
374 }
375}
376