1 | #[cfg (all( |
2 | not(mio_unsupported_force_poll_poll), |
3 | not(all( |
4 | not(mio_unsupported_force_waker_pipe), |
5 | any( |
6 | target_os = "freebsd" , |
7 | target_os = "ios" , |
8 | target_os = "macos" , |
9 | target_os = "tvos" , |
10 | target_os = "watchos" , |
11 | ) |
12 | )), |
13 | not(any(target_os = "solaris" , target_os = "vita" )), |
14 | ))] |
15 | mod fdbased { |
16 | #[cfg (all( |
17 | not(mio_unsupported_force_waker_pipe), |
18 | any(target_os = "linux" , target_os = "android" ), |
19 | ))] |
20 | use crate::sys::unix::waker::eventfd::WakerInternal; |
21 | #[cfg (any( |
22 | mio_unsupported_force_waker_pipe, |
23 | target_os = "aix" , |
24 | target_os = "dragonfly" , |
25 | target_os = "illumos" , |
26 | target_os = "netbsd" , |
27 | target_os = "openbsd" , |
28 | target_os = "redox" , |
29 | ))] |
30 | use crate::sys::unix::waker::pipe::WakerInternal; |
31 | use crate::sys::Selector; |
32 | use crate::{Interest, Token}; |
33 | use std::io; |
34 | use std::os::unix::io::AsRawFd; |
35 | |
36 | #[derive (Debug)] |
37 | pub struct Waker { |
38 | waker: WakerInternal, |
39 | } |
40 | |
41 | impl Waker { |
42 | pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { |
43 | let waker = WakerInternal::new()?; |
44 | selector.register(waker.as_raw_fd(), token, Interest::READABLE)?; |
45 | Ok(Waker { waker }) |
46 | } |
47 | |
48 | pub fn wake(&self) -> io::Result<()> { |
49 | self.waker.wake() |
50 | } |
51 | } |
52 | } |
53 | |
54 | #[cfg (all( |
55 | not(mio_unsupported_force_poll_poll), |
56 | not(all( |
57 | not(mio_unsupported_force_waker_pipe), |
58 | any( |
59 | target_os = "freebsd" , |
60 | target_os = "ios" , |
61 | target_os = "macos" , |
62 | target_os = "tvos" , |
63 | target_os = "watchos" , |
64 | ) |
65 | )), |
66 | not(any(target_os = "solaris" , target_os = "vita" )), |
67 | ))] |
68 | pub use self::fdbased::Waker; |
69 | |
70 | #[cfg (all( |
71 | not(mio_unsupported_force_waker_pipe), |
72 | any(target_os = "linux" , target_os = "android" , target_os = "espidf" ) |
73 | ))] |
74 | mod eventfd { |
75 | use std::fs::File; |
76 | use std::io::{self, Read, Write}; |
77 | use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; |
78 | |
79 | /// Waker backed by `eventfd`. |
80 | /// |
81 | /// `eventfd` is effectively an 64 bit counter. All writes must be of 8 |
82 | /// bytes (64 bits) and are converted (native endian) into an 64 bit |
83 | /// unsigned integer and added to the count. Reads must also be 8 bytes and |
84 | /// reset the count to 0, returning the count. |
85 | #[derive (Debug)] |
86 | pub struct WakerInternal { |
87 | fd: File, |
88 | } |
89 | |
90 | impl WakerInternal { |
91 | pub fn new() -> io::Result<WakerInternal> { |
92 | #[cfg (not(target_os = "espidf" ))] |
93 | let flags = libc::EFD_CLOEXEC | libc::EFD_NONBLOCK; |
94 | // ESP-IDF is EFD_NONBLOCK by default and errors if you try to pass this flag. |
95 | #[cfg (target_os = "espidf" )] |
96 | let flags = 0; |
97 | let fd = syscall!(eventfd(0, flags))?; |
98 | |
99 | let file = unsafe { File::from_raw_fd(fd) }; |
100 | Ok(WakerInternal { fd: file }) |
101 | } |
102 | |
103 | pub fn wake(&self) -> io::Result<()> { |
104 | let buf: [u8; 8] = 1u64.to_ne_bytes(); |
105 | match (&self.fd).write(&buf) { |
106 | Ok(_) => Ok(()), |
107 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { |
108 | // Writing only blocks if the counter is going to overflow. |
109 | // So we'll reset the counter to 0 and wake it again. |
110 | self.reset()?; |
111 | self.wake() |
112 | } |
113 | Err(err) => Err(err), |
114 | } |
115 | } |
116 | |
117 | #[cfg (mio_unsupported_force_poll_poll)] |
118 | pub fn ack_and_reset(&self) { |
119 | let _ = self.reset(); |
120 | } |
121 | |
122 | /// Reset the eventfd object, only need to call this if `wake` fails. |
123 | fn reset(&self) -> io::Result<()> { |
124 | let mut buf: [u8; 8] = 0u64.to_ne_bytes(); |
125 | match (&self.fd).read(&mut buf) { |
126 | Ok(_) => Ok(()), |
127 | // If the `Waker` hasn't been awoken yet this will return a |
128 | // `WouldBlock` error which we can safely ignore. |
129 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()), |
130 | Err(err) => Err(err), |
131 | } |
132 | } |
133 | } |
134 | |
135 | impl AsRawFd for WakerInternal { |
136 | fn as_raw_fd(&self) -> RawFd { |
137 | self.fd.as_raw_fd() |
138 | } |
139 | } |
140 | } |
141 | |
142 | #[cfg (all( |
143 | mio_unsupported_force_poll_poll, |
144 | not(mio_unsupported_force_waker_pipe), |
145 | any(target_os = "linux" , target_os = "android" , target_os = "espidf" ) |
146 | ))] |
147 | pub(crate) use self::eventfd::WakerInternal; |
148 | |
#[cfg(all(
    not(mio_unsupported_force_waker_pipe),
    any(
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos",
        target_os = "tvos",
        target_os = "watchos",
    )
))]
mod kqueue {
    use crate::sys::Selector;
    use crate::Token;

    use std::io;

    /// Waker backed by kqueue user space notifications (`EVFILT_USER`).
    ///
    /// The implementation is fairly simple. First the kqueue must be set up
    /// to receive waker events; this is done by calling
    /// `Selector.setup_waker`. Next we need access to the kqueue, thus we
    /// need to duplicate the file descriptor. Now waking is as simple as
    /// adding an event to the kqueue.
    #[derive(Debug)]
    pub struct Waker {
        selector: Selector,
        token: Token,
    }

    impl Waker {
        /// Clones `selector`, then calls `setup_waker` on the clone with
        /// `token`. Fails if either step fails.
        pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
            let waker = Waker {
                selector: selector.try_clone()?,
                token,
            };
            waker.selector.setup_waker(token)?;
            Ok(waker)
        }

        /// Wakes through the cloned selector, using the stored token.
        pub fn wake(&self) -> io::Result<()> {
            let Waker { selector, token } = self;
            selector.wake(*token)
        }
    }
}
189 | |
190 | #[cfg (all( |
191 | not(mio_unsupported_force_waker_pipe), |
192 | any( |
193 | target_os = "freebsd" , |
194 | target_os = "ios" , |
195 | target_os = "macos" , |
196 | target_os = "tvos" , |
197 | target_os = "watchos" , |
198 | ) |
199 | ))] |
200 | pub use self::kqueue::Waker; |
201 | |
#[cfg(any(
    mio_unsupported_force_waker_pipe,
    target_os = "aix",
    target_os = "dragonfly",
    target_os = "illumos",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "redox",
    target_os = "solaris",
    target_os = "vita",
))]
mod pipe {
    use crate::sys::unix::pipe;
    use std::fs::File;
    use std::io::{self, Read, Write};
    use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};

    /// Waker backed by a unix pipe.
    ///
    /// The waker owns both the sending and the receiving end of the pipe and
    /// drains the pipe whenever writing to it (waking) fails because it
    /// would block.
    #[derive(Debug)]
    pub struct WakerInternal {
        sender: File,
        receiver: File,
    }

    impl WakerInternal {
        /// Creates a new pipe and wraps both of its ends in `File`s.
        pub fn new() -> io::Result<WakerInternal> {
            // `new_raw` yields the receiving end first, the sending end second.
            let [read_fd, write_fd] = pipe::new_raw()?;
            // SAFETY: each descriptor is wrapped exactly once here.
            // NOTE(review): presumably `new_raw` returns freshly opened
            // descriptors that nothing else owns — confirm against its
            // definition.
            Ok(WakerInternal {
                sender: unsafe { File::from_raw_fd(write_fd) },
                receiver: unsafe { File::from_raw_fd(read_fd) },
            })
        }

        /// Writes a single byte to the sending end, retrying when the write
        /// would block (after draining the pipe) or was interrupted.
        pub fn wake(&self) -> io::Result<()> {
            loop {
                // The epoll emulation on some illumos systems currently
                // requires the pipe buffer to be completely empty for an
                // edge-triggered wakeup on the pipe read side.
                #[cfg(target_os = "illumos")]
                self.empty();

                match (&self.sender).write(&[1]) {
                    Ok(_) => return Ok(()),
                    Err(err) => match err.kind() {
                        // The reading end is full, so empty the buffer and
                        // try again.
                        io::ErrorKind::WouldBlock => self.empty(),
                        // Interrupted writes are simply retried.
                        io::ErrorKind::Interrupted => {}
                        _ => return Err(err),
                    },
                }
            }
        }

        /// Drains the pipe's buffer.
        #[cfg(any(
            mio_unsupported_force_poll_poll,
            target_os = "solaris",
            target_os = "vita"
        ))]
        pub fn ack_and_reset(&self) {
            self.empty();
        }

        /// Empty the pipe's buffer, only need to call this if `wake` fails.
        /// This ignores any errors.
        fn empty(&self) {
            let mut scratch = [0u8; 4096];
            // Keep reading until a read returns zero bytes or an error.
            while let Ok(n) = (&self.receiver).read(&mut scratch) {
                if n == 0 {
                    break;
                }
            }
        }
    }

    impl AsRawFd for WakerInternal {
        /// Returns the raw descriptor of the receiving end of the pipe.
        fn as_raw_fd(&self) -> RawFd {
            AsRawFd::as_raw_fd(&self.receiver)
        }
    }
}
285 | |
286 | #[cfg (any( |
287 | all( |
288 | mio_unsupported_force_poll_poll, |
289 | any( |
290 | mio_unsupported_force_waker_pipe, |
291 | target_os = "aix" , |
292 | target_os = "dragonfly" , |
293 | target_os = "illumos" , |
294 | target_os = "netbsd" , |
295 | target_os = "openbsd" , |
296 | target_os = "redox" , |
297 | ) |
298 | ), |
299 | target_os = "solaris" , |
300 | target_os = "vita" , |
301 | ))] |
302 | pub(crate) use self::pipe::WakerInternal; |
303 | |
#[cfg(any(
    mio_unsupported_force_poll_poll,
    target_os = "solaris",
    target_os = "vita"
))]
mod poll {
    use crate::sys::Selector;
    use crate::Token;
    use std::io;

    /// Waker that keeps a clone of the `Selector` together with a token and
    /// wakes by calling `Selector::wake` with that token.
    #[derive(Debug)]
    pub struct Waker {
        selector: Selector,
        token: Token,
    }

    impl Waker {
        /// Clones `selector` and stores the clone alongside `token`.
        ///
        /// Fails if cloning the selector fails.
        pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
            let selector = selector.try_clone()?;
            Ok(Waker { selector, token })
        }

        /// Wakes through the cloned selector, using the stored token.
        pub fn wake(&self) -> io::Result<()> {
            let Waker { selector, token } = self;
            selector.wake(*token)
        }
    }
}
333 | |
334 | #[cfg (any( |
335 | mio_unsupported_force_poll_poll, |
336 | target_os = "solaris" , |
337 | target_os = "vita" |
338 | ))] |
339 | pub use self::poll::Waker; |
340 | |