1#[cfg(any(target_os = "linux", target_os = "android"))]
2mod eventfd {
3 use crate::sys::Selector;
4 use crate::{Interest, Token};
5
6 use std::fs::File;
7 use std::io::{self, Read, Write};
8 use std::os::unix::io::FromRawFd;
9
10 /// Waker backed by `eventfd`.
11 ///
12 /// `eventfd` is effectively an 64 bit counter. All writes must be of 8
13 /// bytes (64 bits) and are converted (native endian) into an 64 bit
14 /// unsigned integer and added to the count. Reads must also be 8 bytes and
15 /// reset the count to 0, returning the count.
16 #[derive(Debug)]
17 pub struct Waker {
18 fd: File,
19 }
20
21 impl Waker {
22 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
23 let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
24 let file = unsafe { File::from_raw_fd(fd) };
25
26 selector.register(fd, token, Interest::READABLE)?;
27 Ok(Waker { fd: file })
28 }
29
30 pub fn wake(&self) -> io::Result<()> {
31 let buf: [u8; 8] = 1u64.to_ne_bytes();
32 match (&self.fd).write(&buf) {
33 Ok(_) => Ok(()),
34 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
35 // Writing only blocks if the counter is going to overflow.
36 // So we'll reset the counter to 0 and wake it again.
37 self.reset()?;
38 self.wake()
39 }
40 Err(err) => Err(err),
41 }
42 }
43
44 /// Reset the eventfd object, only need to call this if `wake` fails.
45 fn reset(&self) -> io::Result<()> {
46 let mut buf: [u8; 8] = 0u64.to_ne_bytes();
47 match (&self.fd).read(&mut buf) {
48 Ok(_) => Ok(()),
49 // If the `Waker` hasn't been awoken yet this will return a
50 // `WouldBlock` error which we can safely ignore.
51 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
52 Err(err) => Err(err),
53 }
54 }
55 }
56}
57
58#[cfg(any(target_os = "linux", target_os = "android"))]
59pub use self::eventfd::Waker;
60
61#[cfg(any(
62 target_os = "freebsd",
63 target_os = "ios",
64 target_os = "macos",
65 target_os = "tvos",
66 target_os = "watchos",
67))]
68mod kqueue {
69 use crate::sys::Selector;
70 use crate::Token;
71
72 use std::io;
73
74 /// Waker backed by kqueue user space notifications (`EVFILT_USER`).
75 ///
76 /// The implementation is fairly simple, first the kqueue must be setup to
77 /// receive waker events this done by calling `Selector.setup_waker`. Next
78 /// we need access to kqueue, thus we need to duplicate the file descriptor.
79 /// Now waking is as simple as adding an event to the kqueue.
80 #[derive(Debug)]
81 pub struct Waker {
82 selector: Selector,
83 token: Token,
84 }
85
86 impl Waker {
87 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
88 let selector = selector.try_clone()?;
89 selector.setup_waker(token)?;
90 Ok(Waker { selector, token })
91 }
92
93 pub fn wake(&self) -> io::Result<()> {
94 self.selector.wake(self.token)
95 }
96 }
97}
98
99#[cfg(any(
100 target_os = "freebsd",
101 target_os = "ios",
102 target_os = "macos",
103 target_os = "tvos",
104 target_os = "watchos",
105))]
106pub use self::kqueue::Waker;
107
108#[cfg(any(
109 target_os = "dragonfly",
110 target_os = "illumos",
111 target_os = "netbsd",
112 target_os = "openbsd",
113 target_os = "redox",
114))]
115mod pipe {
116 use crate::sys::unix::Selector;
117 use crate::{Interest, Token};
118
119 use std::fs::File;
120 use std::io::{self, Read, Write};
121 use std::os::unix::io::FromRawFd;
122
123 /// Waker backed by a unix pipe.
124 ///
125 /// Waker controls both the sending and receiving ends and empties the pipe
126 /// if writing to it (waking) fails.
127 #[derive(Debug)]
128 pub struct Waker {
129 sender: File,
130 receiver: File,
131 }
132
133 impl Waker {
134 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
135 let mut fds = [-1; 2];
136 syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?;
137 let sender = unsafe { File::from_raw_fd(fds[1]) };
138 let receiver = unsafe { File::from_raw_fd(fds[0]) };
139
140 selector.register(fds[0], token, Interest::READABLE)?;
141 Ok(Waker { sender, receiver })
142 }
143
144 pub fn wake(&self) -> io::Result<()> {
145 // The epoll emulation on some illumos systems currently requires
146 // the pipe buffer to be completely empty for an edge-triggered
147 // wakeup on the pipe read side.
148 #[cfg(target_os = "illumos")]
149 self.empty();
150
151 match (&self.sender).write(&[1]) {
152 Ok(_) => Ok(()),
153 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
154 // The reading end is full so we'll empty the buffer and try
155 // again.
156 self.empty();
157 self.wake()
158 }
159 Err(ref err) if err.kind() == io::ErrorKind::Interrupted => self.wake(),
160 Err(err) => Err(err),
161 }
162 }
163
164 /// Empty the pipe's buffer, only need to call this if `wake` fails.
165 /// This ignores any errors.
166 fn empty(&self) {
167 let mut buf = [0; 4096];
168 loop {
169 match (&self.receiver).read(&mut buf) {
170 Ok(n) if n > 0 => continue,
171 _ => return,
172 }
173 }
174 }
175 }
176}
177
178#[cfg(any(
179 target_os = "dragonfly",
180 target_os = "illumos",
181 target_os = "netbsd",
182 target_os = "openbsd",
183 target_os = "redox",
184))]
185pub use self::pipe::Waker;
186