1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (all(target_os = "freebsd" , feature = "net" ))] |
3 | |
4 | use mio_aio::{AioCb, AioFsyncMode, LioCb}; |
5 | use std::{ |
6 | future::Future, |
7 | mem, |
8 | os::unix::io::{AsRawFd, RawFd}, |
9 | pin::Pin, |
10 | task::{Context, Poll}, |
11 | }; |
12 | use tempfile::tempfile; |
13 | use tokio::io::bsd::{Aio, AioSource}; |
14 | use tokio_test::assert_pending; |
15 | |
16 | mod aio { |
17 | use super::*; |
18 | |
19 | /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource |
20 | struct WrappedAioCb<'a>(AioCb<'a>); |
21 | impl<'a> AioSource for WrappedAioCb<'a> { |
22 | fn register(&mut self, kq: RawFd, token: usize) { |
23 | self.0.register_raw(kq, token) |
24 | } |
25 | fn deregister(&mut self) { |
26 | self.0.deregister_raw() |
27 | } |
28 | } |
29 | |
30 | /// A very crude implementation of an AIO-based future |
31 | struct FsyncFut(Aio<WrappedAioCb<'static>>); |
32 | |
33 | impl Future for FsyncFut { |
34 | type Output = std::io::Result<()>; |
35 | |
36 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
37 | let poll_result = self.0.poll_ready(cx); |
38 | match poll_result { |
39 | Poll::Pending => Poll::Pending, |
40 | Poll::Ready(Err(e)) => Poll::Ready(Err(e)), |
41 | Poll::Ready(Ok(_ev)) => { |
42 | // At this point, we could clear readiness. But there's no |
43 | // point, since we're about to drop the Aio. |
44 | let result = (*self.0).0.aio_return(); |
45 | match result { |
46 | Ok(_) => Poll::Ready(Ok(())), |
47 | Err(e) => Poll::Ready(Err(e.into())), |
48 | } |
49 | } |
50 | } |
51 | } |
52 | } |
53 | |
54 | /// Low-level AIO Source |
55 | /// |
56 | /// An example bypassing mio_aio and Nix to demonstrate how the kevent |
57 | /// registration actually works, under the hood. |
58 | struct LlSource(Pin<Box<libc::aiocb>>); |
59 | |
60 | impl AioSource for LlSource { |
61 | fn register(&mut self, kq: RawFd, token: usize) { |
62 | let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; |
63 | sev.sigev_notify = libc::SIGEV_KEVENT; |
64 | sev.sigev_signo = kq; |
65 | sev.sigev_value = libc::sigval { |
66 | sival_ptr: token as *mut libc::c_void, |
67 | }; |
68 | self.0.aio_sigevent = sev; |
69 | } |
70 | |
71 | fn deregister(&mut self) { |
72 | unsafe { |
73 | self.0.aio_sigevent = mem::zeroed(); |
74 | } |
75 | } |
76 | } |
77 | |
78 | struct LlFut(Aio<LlSource>); |
79 | |
80 | impl Future for LlFut { |
81 | type Output = std::io::Result<()>; |
82 | |
83 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
84 | let poll_result = self.0.poll_ready(cx); |
85 | match poll_result { |
86 | Poll::Pending => Poll::Pending, |
87 | Poll::Ready(Err(e)) => Poll::Ready(Err(e)), |
88 | Poll::Ready(Ok(_ev)) => { |
89 | let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) }; |
90 | assert_eq!(0, r); |
91 | Poll::Ready(Ok(())) |
92 | } |
93 | } |
94 | } |
95 | } |
96 | |
97 | /// A very simple object that can implement AioSource and can be reused. |
98 | /// |
99 | /// mio_aio normally assumes that each AioCb will be consumed on completion. |
100 | /// This somewhat contrived example shows how an Aio object can be reused |
101 | /// anyway. |
102 | struct ReusableFsyncSource { |
103 | aiocb: Pin<Box<AioCb<'static>>>, |
104 | fd: RawFd, |
105 | token: usize, |
106 | } |
107 | impl ReusableFsyncSource { |
108 | fn fsync(&mut self) { |
109 | self.aiocb.register_raw(self.fd, self.token); |
110 | self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap(); |
111 | } |
112 | fn new(aiocb: AioCb<'static>) -> Self { |
113 | ReusableFsyncSource { |
114 | aiocb: Box::pin(aiocb), |
115 | fd: 0, |
116 | token: 0, |
117 | } |
118 | } |
119 | fn reset(&mut self, aiocb: AioCb<'static>) { |
120 | self.aiocb = Box::pin(aiocb); |
121 | } |
122 | } |
123 | impl AioSource for ReusableFsyncSource { |
124 | fn register(&mut self, kq: RawFd, token: usize) { |
125 | self.fd = kq; |
126 | self.token = token; |
127 | } |
128 | fn deregister(&mut self) { |
129 | self.fd = 0; |
130 | } |
131 | } |
132 | |
133 | struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>); |
134 | impl<'a> Future for ReusableFsyncFut<'a> { |
135 | type Output = std::io::Result<()>; |
136 | |
137 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
138 | let poll_result = self.0.poll_ready(cx); |
139 | match poll_result { |
140 | Poll::Pending => Poll::Pending, |
141 | Poll::Ready(Err(e)) => Poll::Ready(Err(e)), |
142 | Poll::Ready(Ok(ev)) => { |
143 | // Since this future uses a reusable Aio, we must clear |
144 | // its readiness here. That makes the future |
145 | // non-idempotent; the caller can't poll it repeatedly after |
146 | // it has already returned Ready. But that's ok; most |
147 | // futures behave this way. |
148 | self.0.clear_ready(ev); |
149 | let result = (*self.0).aiocb.aio_return(); |
150 | match result { |
151 | Ok(_) => Poll::Ready(Ok(())), |
152 | Err(e) => Poll::Ready(Err(e.into())), |
153 | } |
154 | } |
155 | } |
156 | } |
157 | } |
158 | |
159 | #[tokio::test ] |
160 | async fn fsync() { |
161 | let f = tempfile().unwrap(); |
162 | let fd = f.as_raw_fd(); |
163 | let aiocb = AioCb::from_fd(fd, 0); |
164 | let source = WrappedAioCb(aiocb); |
165 | let mut poll_aio = Aio::new_for_aio(source).unwrap(); |
166 | (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap(); |
167 | let fut = FsyncFut(poll_aio); |
168 | fut.await.unwrap(); |
169 | } |
170 | |
171 | #[tokio::test ] |
172 | async fn ll_fsync() { |
173 | let f = tempfile().unwrap(); |
174 | let fd = f.as_raw_fd(); |
175 | let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() }; |
176 | aiocb.aio_fildes = fd; |
177 | let source = LlSource(Box::pin(aiocb)); |
178 | let mut poll_aio = Aio::new_for_aio(source).unwrap(); |
179 | let r = unsafe { |
180 | let p = (*poll_aio).0.as_mut().get_unchecked_mut(); |
181 | libc::aio_fsync(libc::O_SYNC, p) |
182 | }; |
183 | assert_eq!(0, r); |
184 | let fut = LlFut(poll_aio); |
185 | fut.await.unwrap(); |
186 | } |
187 | |
188 | /// A suitably crafted future type can reuse an Aio object |
189 | #[tokio::test ] |
190 | async fn reuse() { |
191 | let f = tempfile().unwrap(); |
192 | let fd = f.as_raw_fd(); |
193 | let aiocb0 = AioCb::from_fd(fd, 0); |
194 | let source = ReusableFsyncSource::new(aiocb0); |
195 | let mut poll_aio = Aio::new_for_aio(source).unwrap(); |
196 | poll_aio.fsync(); |
197 | let fut0 = ReusableFsyncFut(&mut poll_aio); |
198 | fut0.await.unwrap(); |
199 | |
200 | let aiocb1 = AioCb::from_fd(fd, 0); |
201 | poll_aio.reset(aiocb1); |
202 | let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); |
203 | assert_pending!(poll_aio.poll_ready(&mut ctx)); |
204 | poll_aio.fsync(); |
205 | let fut1 = ReusableFsyncFut(&mut poll_aio); |
206 | fut1.await.unwrap(); |
207 | } |
208 | } |
209 | |
210 | mod lio { |
211 | use super::*; |
212 | |
213 | struct WrappedLioCb<'a>(LioCb<'a>); |
214 | impl<'a> AioSource for WrappedLioCb<'a> { |
215 | fn register(&mut self, kq: RawFd, token: usize) { |
216 | self.0.register_raw(kq, token) |
217 | } |
218 | fn deregister(&mut self) { |
219 | self.0.deregister_raw() |
220 | } |
221 | } |
222 | |
223 | /// A very crude lio_listio-based Future |
224 | struct LioFut(Option<Aio<WrappedLioCb<'static>>>); |
225 | |
226 | impl Future for LioFut { |
227 | type Output = std::io::Result<Vec<isize>>; |
228 | |
229 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
230 | let poll_result = self.0.as_mut().unwrap().poll_ready(cx); |
231 | match poll_result { |
232 | Poll::Pending => Poll::Pending, |
233 | Poll::Ready(Err(e)) => Poll::Ready(Err(e)), |
234 | Poll::Ready(Ok(_ev)) => { |
235 | // At this point, we could clear readiness. But there's no |
236 | // point, since we're about to drop the Aio. |
237 | let r = self.0.take().unwrap().into_inner().0.into_results(|iter| { |
238 | iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>() |
239 | }); |
240 | Poll::Ready(Ok(r)) |
241 | } |
242 | } |
243 | } |
244 | } |
245 | |
246 | /// Minimal example demonstrating reuse of an Aio object with lio |
247 | /// readiness. mio_aio::LioCb actually does something similar under the |
248 | /// hood. |
249 | struct ReusableLioSource { |
250 | liocb: Option<LioCb<'static>>, |
251 | fd: RawFd, |
252 | token: usize, |
253 | } |
254 | impl ReusableLioSource { |
255 | fn new(liocb: LioCb<'static>) -> Self { |
256 | ReusableLioSource { |
257 | liocb: Some(liocb), |
258 | fd: 0, |
259 | token: 0, |
260 | } |
261 | } |
262 | fn reset(&mut self, liocb: LioCb<'static>) { |
263 | self.liocb = Some(liocb); |
264 | } |
265 | fn submit(&mut self) { |
266 | self.liocb |
267 | .as_mut() |
268 | .unwrap() |
269 | .register_raw(self.fd, self.token); |
270 | self.liocb.as_mut().unwrap().submit().unwrap(); |
271 | } |
272 | } |
273 | impl AioSource for ReusableLioSource { |
274 | fn register(&mut self, kq: RawFd, token: usize) { |
275 | self.fd = kq; |
276 | self.token = token; |
277 | } |
278 | fn deregister(&mut self) { |
279 | self.fd = 0; |
280 | } |
281 | } |
282 | struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>); |
283 | impl<'a> Future for ReusableLioFut<'a> { |
284 | type Output = std::io::Result<Vec<isize>>; |
285 | |
286 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
287 | let poll_result = self.0.poll_ready(cx); |
288 | match poll_result { |
289 | Poll::Pending => Poll::Pending, |
290 | Poll::Ready(Err(e)) => Poll::Ready(Err(e)), |
291 | Poll::Ready(Ok(ev)) => { |
292 | // Since this future uses a reusable Aio, we must clear |
293 | // its readiness here. That makes the future |
294 | // non-idempotent; the caller can't poll it repeatedly after |
295 | // it has already returned Ready. But that's ok; most |
296 | // futures behave this way. |
297 | self.0.clear_ready(ev); |
298 | let r = (*self.0).liocb.take().unwrap().into_results(|iter| { |
299 | iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>() |
300 | }); |
301 | Poll::Ready(Ok(r)) |
302 | } |
303 | } |
304 | } |
305 | } |
306 | |
307 | /// An lio_listio operation with one write element |
308 | #[tokio::test ] |
309 | async fn onewrite() { |
310 | const WBUF: &[u8] = b"abcdef" ; |
311 | let f = tempfile().unwrap(); |
312 | |
313 | let mut builder = mio_aio::LioCbBuilder::with_capacity(1); |
314 | builder = builder.emplace_slice( |
315 | f.as_raw_fd(), |
316 | 0, |
317 | &WBUF[..], |
318 | 0, |
319 | mio_aio::LioOpcode::LIO_WRITE, |
320 | ); |
321 | let liocb = builder.finish(); |
322 | let source = WrappedLioCb(liocb); |
323 | let mut poll_aio = Aio::new_for_lio(source).unwrap(); |
324 | |
325 | // Send the operation to the kernel |
326 | (*poll_aio).0.submit().unwrap(); |
327 | let fut = LioFut(Some(poll_aio)); |
328 | let v = fut.await.unwrap(); |
329 | assert_eq!(v.len(), 1); |
330 | assert_eq!(v[0] as usize, WBUF.len()); |
331 | } |
332 | |
333 | /// A suitably crafted future type can reuse an Aio object |
334 | #[tokio::test ] |
335 | async fn reuse() { |
336 | const WBUF: &[u8] = b"abcdef" ; |
337 | let f = tempfile().unwrap(); |
338 | |
339 | let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1); |
340 | builder0 = builder0.emplace_slice( |
341 | f.as_raw_fd(), |
342 | 0, |
343 | &WBUF[..], |
344 | 0, |
345 | mio_aio::LioOpcode::LIO_WRITE, |
346 | ); |
347 | let liocb0 = builder0.finish(); |
348 | let source = ReusableLioSource::new(liocb0); |
349 | let mut poll_aio = Aio::new_for_aio(source).unwrap(); |
350 | poll_aio.submit(); |
351 | let fut0 = ReusableLioFut(&mut poll_aio); |
352 | let v = fut0.await.unwrap(); |
353 | assert_eq!(v.len(), 1); |
354 | assert_eq!(v[0] as usize, WBUF.len()); |
355 | |
356 | // Now reuse the same Aio |
357 | let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1); |
358 | builder1 = builder1.emplace_slice( |
359 | f.as_raw_fd(), |
360 | 0, |
361 | &WBUF[..], |
362 | 0, |
363 | mio_aio::LioOpcode::LIO_WRITE, |
364 | ); |
365 | let liocb1 = builder1.finish(); |
366 | poll_aio.reset(liocb1); |
367 | let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); |
368 | assert_pending!(poll_aio.poll_ready(&mut ctx)); |
369 | poll_aio.submit(); |
370 | let fut1 = ReusableLioFut(&mut poll_aio); |
371 | let v = fut1.await.unwrap(); |
372 | assert_eq!(v.len(), 1); |
373 | assert_eq!(v[0] as usize, WBUF.len()); |
374 | } |
375 | } |
376 | |