1 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
2 | mod eventfd { |
3 | use crate::sys::Selector; |
4 | use crate::{Interest, Token}; |
5 | |
6 | use std::fs::File; |
7 | use std::io::{self, Read, Write}; |
8 | use std::os::unix::io::FromRawFd; |
9 | |
10 | /// Waker backed by `eventfd`. |
11 | /// |
12 | /// `eventfd` is effectively an 64 bit counter. All writes must be of 8 |
13 | /// bytes (64 bits) and are converted (native endian) into an 64 bit |
14 | /// unsigned integer and added to the count. Reads must also be 8 bytes and |
15 | /// reset the count to 0, returning the count. |
16 | #[derive (Debug)] |
17 | pub struct Waker { |
18 | fd: File, |
19 | } |
20 | |
21 | impl Waker { |
22 | pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { |
23 | let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?; |
24 | let file = unsafe { File::from_raw_fd(fd) }; |
25 | |
26 | selector.register(fd, token, Interest::READABLE)?; |
27 | Ok(Waker { fd: file }) |
28 | } |
29 | |
30 | pub fn wake(&self) -> io::Result<()> { |
31 | let buf: [u8; 8] = 1u64.to_ne_bytes(); |
32 | match (&self.fd).write(&buf) { |
33 | Ok(_) => Ok(()), |
34 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { |
35 | // Writing only blocks if the counter is going to overflow. |
36 | // So we'll reset the counter to 0 and wake it again. |
37 | self.reset()?; |
38 | self.wake() |
39 | } |
40 | Err(err) => Err(err), |
41 | } |
42 | } |
43 | |
44 | /// Reset the eventfd object, only need to call this if `wake` fails. |
45 | fn reset(&self) -> io::Result<()> { |
46 | let mut buf: [u8; 8] = 0u64.to_ne_bytes(); |
47 | match (&self.fd).read(&mut buf) { |
48 | Ok(_) => Ok(()), |
49 | // If the `Waker` hasn't been awoken yet this will return a |
50 | // `WouldBlock` error which we can safely ignore. |
51 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()), |
52 | Err(err) => Err(err), |
53 | } |
54 | } |
55 | } |
56 | } |
57 | |
58 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
59 | pub use self::eventfd::Waker; |
60 | |
#[cfg(any(
    target_os = "freebsd",
    target_os = "ios",
    target_os = "macos",
    target_os = "tvos",
    target_os = "watchos",
))]
mod kqueue {
    use crate::sys::Selector;
    use crate::Token;

    use std::io;

    /// Waker built on kqueue user space notifications (`EVFILT_USER`).
    ///
    /// The waker needs its own access to the kqueue, so it keeps a duplicate
    /// of the selector obtained through `Selector::try_clone`. The kqueue is
    /// prepared to receive waker events by calling `Selector::setup_waker`;
    /// after that, waking amounts to adding an event to the kqueue.
    #[derive(Debug)]
    pub struct Waker {
        selector: Selector,
        token: Token,
    }

    impl Waker {
        /// Duplicates `selector` and sets the duplicate up to deliver waker
        /// events for `token`.
        ///
        /// # Errors
        ///
        /// Returns the error from `Selector::try_clone` or
        /// `Selector::setup_waker`.
        pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
            let waker = Waker {
                selector: selector.try_clone()?,
                token,
            };
            waker.selector.setup_waker(token)?;
            Ok(waker)
        }

        /// Wakes the selector by forwarding the stored token to
        /// `Selector::wake`.
        pub fn wake(&self) -> io::Result<()> {
            let Waker { selector, token } = self;
            selector.wake(*token)
        }
    }
}
98 | |
99 | #[cfg (any( |
100 | target_os = "freebsd" , |
101 | target_os = "ios" , |
102 | target_os = "macos" , |
103 | target_os = "tvos" , |
104 | target_os = "watchos" , |
105 | ))] |
106 | pub use self::kqueue::Waker; |
107 | |
#[cfg(any(
    target_os = "dragonfly",
    target_os = "illumos",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "redox",
))]
mod pipe {
    use crate::sys::unix::Selector;
    use crate::{Interest, Token};

    use std::fs::File;
    use std::io::{self, Read, Write};
    use std::os::unix::io::FromRawFd;

    /// Waker built on a unix pipe.
    ///
    /// The waker owns both the sending and the receiving end of the pipe and
    /// drains the pipe whenever writing to it (waking) fails.
    #[derive(Debug)]
    pub struct Waker {
        sender: File,
        receiver: File,
    }

    impl Waker {
        /// Creates a non-blocking, close-on-exec pipe and registers its read
        /// end with `selector` for readable interest under `token`.
        ///
        /// # Errors
        ///
        /// Returns the error from the `pipe2` call or from
        /// `Selector::register`. If registration fails, both ends are closed
        /// again when the `File`s are dropped.
        pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
            let mut fds = [-1; 2];
            syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?;
            let [read_fd, write_fd] = fds;
            // SAFETY: `?` above has already returned on failure, so both
            // descriptors were presumably just filled in by `pipe2` and are
            // owned by nothing else; each `File` takes sole ownership of one.
            let sender = unsafe { File::from_raw_fd(write_fd) };
            let receiver = unsafe { File::from_raw_fd(read_fd) };

            selector.register(read_fd, token, Interest::READABLE)?;
            Ok(Waker { sender, receiver })
        }

        /// Wakes the selector by writing a single byte into the pipe.
        ///
        /// The write is retried after draining the pipe when it would block,
        /// and retried as-is when it was interrupted.
        ///
        /// # Errors
        ///
        /// Returns any write error other than `WouldBlock` or `Interrupted`.
        pub fn wake(&self) -> io::Result<()> {
            loop {
                // The epoll emulation on some illumos systems currently needs
                // the pipe buffer to be completely empty before an
                // edge-triggered wakeup is delivered on the read side, so
                // drain before every attempt.
                #[cfg(target_os = "illumos")]
                self.empty();

                let err = match (&self.sender).write(&[1]) {
                    Ok(_) => return Ok(()),
                    Err(err) => err,
                };
                match err.kind() {
                    // The pipe is full: make room, then try again.
                    io::ErrorKind::WouldBlock => self.empty(),
                    // Interrupted before anything was written: just retry.
                    io::ErrorKind::Interrupted => {}
                    _ => return Err(err),
                }
            }
        }

        /// Drains the pipe's buffer; only needed when `wake` fails.
        ///
        /// Reading stops at the first empty read or the first error, and any
        /// such error is deliberately ignored.
        fn empty(&self) {
            let mut scratch = [0; 4096];
            while let Ok(n) = (&self.receiver).read(&mut scratch) {
                if n == 0 {
                    break;
                }
            }
        }
    }
}
177 | |
178 | #[cfg (any( |
179 | target_os = "dragonfly" , |
180 | target_os = "illumos" , |
181 | target_os = "netbsd" , |
182 | target_os = "openbsd" , |
183 | target_os = "redox" , |
184 | ))] |
185 | pub use self::pipe::Waker; |
186 | |