1#[cfg(all(
2 not(mio_unsupported_force_poll_poll),
3 not(all(
4 not(mio_unsupported_force_waker_pipe),
5 any(
6 target_os = "freebsd",
7 target_os = "ios",
8 target_os = "macos",
9 target_os = "tvos",
10 target_os = "watchos",
11 )
12 )),
13 not(any(target_os = "solaris", target_os = "vita")),
14))]
15mod fdbased {
16 #[cfg(all(
17 not(mio_unsupported_force_waker_pipe),
18 any(target_os = "linux", target_os = "android"),
19 ))]
20 use crate::sys::unix::waker::eventfd::WakerInternal;
21 #[cfg(any(
22 mio_unsupported_force_waker_pipe,
23 target_os = "aix",
24 target_os = "dragonfly",
25 target_os = "illumos",
26 target_os = "netbsd",
27 target_os = "openbsd",
28 target_os = "redox",
29 ))]
30 use crate::sys::unix::waker::pipe::WakerInternal;
31 use crate::sys::Selector;
32 use crate::{Interest, Token};
33 use std::io;
34 use std::os::unix::io::AsRawFd;
35
36 #[derive(Debug)]
37 pub struct Waker {
38 waker: WakerInternal,
39 }
40
41 impl Waker {
42 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
43 let waker = WakerInternal::new()?;
44 selector.register(waker.as_raw_fd(), token, Interest::READABLE)?;
45 Ok(Waker { waker })
46 }
47
48 pub fn wake(&self) -> io::Result<()> {
49 self.waker.wake()
50 }
51 }
52}
53
54#[cfg(all(
55 not(mio_unsupported_force_poll_poll),
56 not(all(
57 not(mio_unsupported_force_waker_pipe),
58 any(
59 target_os = "freebsd",
60 target_os = "ios",
61 target_os = "macos",
62 target_os = "tvos",
63 target_os = "watchos",
64 )
65 )),
66 not(any(target_os = "solaris", target_os = "vita")),
67))]
68pub use self::fdbased::Waker;
69
70#[cfg(all(
71 not(mio_unsupported_force_waker_pipe),
72 any(target_os = "linux", target_os = "android", target_os = "espidf")
73))]
74mod eventfd {
75 use std::fs::File;
76 use std::io::{self, Read, Write};
77 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
78
79 /// Waker backed by `eventfd`.
80 ///
81 /// `eventfd` is effectively an 64 bit counter. All writes must be of 8
82 /// bytes (64 bits) and are converted (native endian) into an 64 bit
83 /// unsigned integer and added to the count. Reads must also be 8 bytes and
84 /// reset the count to 0, returning the count.
85 #[derive(Debug)]
86 pub struct WakerInternal {
87 fd: File,
88 }
89
90 impl WakerInternal {
91 pub fn new() -> io::Result<WakerInternal> {
92 #[cfg(not(target_os = "espidf"))]
93 let flags = libc::EFD_CLOEXEC | libc::EFD_NONBLOCK;
94 // ESP-IDF is EFD_NONBLOCK by default and errors if you try to pass this flag.
95 #[cfg(target_os = "espidf")]
96 let flags = 0;
97 let fd = syscall!(eventfd(0, flags))?;
98
99 let file = unsafe { File::from_raw_fd(fd) };
100 Ok(WakerInternal { fd: file })
101 }
102
103 pub fn wake(&self) -> io::Result<()> {
104 let buf: [u8; 8] = 1u64.to_ne_bytes();
105 match (&self.fd).write(&buf) {
106 Ok(_) => Ok(()),
107 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
108 // Writing only blocks if the counter is going to overflow.
109 // So we'll reset the counter to 0 and wake it again.
110 self.reset()?;
111 self.wake()
112 }
113 Err(err) => Err(err),
114 }
115 }
116
117 #[cfg(mio_unsupported_force_poll_poll)]
118 pub fn ack_and_reset(&self) {
119 let _ = self.reset();
120 }
121
122 /// Reset the eventfd object, only need to call this if `wake` fails.
123 fn reset(&self) -> io::Result<()> {
124 let mut buf: [u8; 8] = 0u64.to_ne_bytes();
125 match (&self.fd).read(&mut buf) {
126 Ok(_) => Ok(()),
127 // If the `Waker` hasn't been awoken yet this will return a
128 // `WouldBlock` error which we can safely ignore.
129 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
130 Err(err) => Err(err),
131 }
132 }
133 }
134
135 impl AsRawFd for WakerInternal {
136 fn as_raw_fd(&self) -> RawFd {
137 self.fd.as_raw_fd()
138 }
139 }
140}
141
142#[cfg(all(
143 mio_unsupported_force_poll_poll,
144 not(mio_unsupported_force_waker_pipe),
145 any(target_os = "linux", target_os = "android", target_os = "espidf")
146))]
147pub(crate) use self::eventfd::WakerInternal;
148
149#[cfg(all(
150 not(mio_unsupported_force_waker_pipe),
151 any(
152 target_os = "freebsd",
153 target_os = "ios",
154 target_os = "macos",
155 target_os = "tvos",
156 target_os = "watchos",
157 )
158))]
159mod kqueue {
160 use crate::sys::Selector;
161 use crate::Token;
162
163 use std::io;
164
165 /// Waker backed by kqueue user space notifications (`EVFILT_USER`).
166 ///
167 /// The implementation is fairly simple, first the kqueue must be setup to
168 /// receive waker events this done by calling `Selector.setup_waker`. Next
169 /// we need access to kqueue, thus we need to duplicate the file descriptor.
170 /// Now waking is as simple as adding an event to the kqueue.
171 #[derive(Debug)]
172 pub struct Waker {
173 selector: Selector,
174 token: Token,
175 }
176
177 impl Waker {
178 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
179 let selector = selector.try_clone()?;
180 selector.setup_waker(token)?;
181 Ok(Waker { selector, token })
182 }
183
184 pub fn wake(&self) -> io::Result<()> {
185 self.selector.wake(self.token)
186 }
187 }
188}
189
190#[cfg(all(
191 not(mio_unsupported_force_waker_pipe),
192 any(
193 target_os = "freebsd",
194 target_os = "ios",
195 target_os = "macos",
196 target_os = "tvos",
197 target_os = "watchos",
198 )
199))]
200pub use self::kqueue::Waker;
201
202#[cfg(any(
203 mio_unsupported_force_waker_pipe,
204 target_os = "aix",
205 target_os = "dragonfly",
206 target_os = "illumos",
207 target_os = "netbsd",
208 target_os = "openbsd",
209 target_os = "redox",
210 target_os = "solaris",
211 target_os = "vita",
212))]
213mod pipe {
214 use crate::sys::unix::pipe;
215 use std::fs::File;
216 use std::io::{self, Read, Write};
217 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
218
219 /// Waker backed by a unix pipe.
220 ///
221 /// Waker controls both the sending and receiving ends and empties the pipe
222 /// if writing to it (waking) fails.
223 #[derive(Debug)]
224 pub struct WakerInternal {
225 sender: File,
226 receiver: File,
227 }
228
229 impl WakerInternal {
230 pub fn new() -> io::Result<WakerInternal> {
231 let [receiver, sender] = pipe::new_raw()?;
232 let sender = unsafe { File::from_raw_fd(sender) };
233 let receiver = unsafe { File::from_raw_fd(receiver) };
234 Ok(WakerInternal { sender, receiver })
235 }
236
237 pub fn wake(&self) -> io::Result<()> {
238 // The epoll emulation on some illumos systems currently requires
239 // the pipe buffer to be completely empty for an edge-triggered
240 // wakeup on the pipe read side.
241 #[cfg(target_os = "illumos")]
242 self.empty();
243
244 match (&self.sender).write(&[1]) {
245 Ok(_) => Ok(()),
246 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
247 // The reading end is full so we'll empty the buffer and try
248 // again.
249 self.empty();
250 self.wake()
251 }
252 Err(ref err) if err.kind() == io::ErrorKind::Interrupted => self.wake(),
253 Err(err) => Err(err),
254 }
255 }
256
257 #[cfg(any(
258 mio_unsupported_force_poll_poll,
259 target_os = "solaris",
260 target_os = "vita"
261 ))]
262 pub fn ack_and_reset(&self) {
263 self.empty();
264 }
265
266 /// Empty the pipe's buffer, only need to call this if `wake` fails.
267 /// This ignores any errors.
268 fn empty(&self) {
269 let mut buf = [0; 4096];
270 loop {
271 match (&self.receiver).read(&mut buf) {
272 Ok(n) if n > 0 => continue,
273 _ => return,
274 }
275 }
276 }
277 }
278
279 impl AsRawFd for WakerInternal {
280 fn as_raw_fd(&self) -> RawFd {
281 self.receiver.as_raw_fd()
282 }
283 }
284}
285
286#[cfg(any(
287 all(
288 mio_unsupported_force_poll_poll,
289 any(
290 mio_unsupported_force_waker_pipe,
291 target_os = "aix",
292 target_os = "dragonfly",
293 target_os = "illumos",
294 target_os = "netbsd",
295 target_os = "openbsd",
296 target_os = "redox",
297 )
298 ),
299 target_os = "solaris",
300 target_os = "vita",
301))]
302pub(crate) use self::pipe::WakerInternal;
303
304#[cfg(any(
305 mio_unsupported_force_poll_poll,
306 target_os = "solaris",
307 target_os = "vita"
308))]
309mod poll {
310 use crate::sys::Selector;
311 use crate::Token;
312 use std::io;
313
314 #[derive(Debug)]
315 pub struct Waker {
316 selector: Selector,
317 token: Token,
318 }
319
320 impl Waker {
321 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
322 Ok(Waker {
323 selector: selector.try_clone()?,
324 token,
325 })
326 }
327
328 pub fn wake(&self) -> io::Result<()> {
329 self.selector.wake(self.token)
330 }
331 }
332}
333
334#[cfg(any(
335 mio_unsupported_force_poll_poll,
336 target_os = "solaris",
337 target_os = "vita"
338))]
339pub use self::poll::Waker;
340