1use crate::{Interest, Token};
2
3use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLRDHUP};
4use std::os::unix::io::{AsRawFd, RawFd};
5#[cfg(debug_assertions)]
6use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7use std::time::Duration;
8use std::{cmp, i32, io, ptr};
9
/// Unique id for use as `SelectorId`.
///
/// Debug-builds only. `Selector::new` hands out ids via a relaxed
/// `fetch_add`, so every newly created selector (but not its clones,
/// see `try_clone`) gets a distinct id.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
13
#[derive(Debug)]
pub struct Selector {
    /// Unique id of this selector (debug builds only). Clones made via
    /// `try_clone` share the same id, since they share the epoll instance.
    #[cfg(debug_assertions)]
    id: usize,
    /// Raw epoll file descriptor; closed in `Drop`.
    ep: RawFd,
    /// Whether a waker has been registered with this selector
    /// (debug builds only, see `register_waker`).
    #[cfg(debug_assertions)]
    has_waker: AtomicBool,
}
22
23impl Selector {
24 pub fn new() -> io::Result<Selector> {
25 #[cfg(not(target_os = "android"))]
26 let res = syscall!(epoll_create1(libc::EPOLL_CLOEXEC));
27
28 // On Android < API level 16 `epoll_create1` is not defined, so use a
29 // raw system call.
30 // According to libuv, `EPOLL_CLOEXEC` is not defined on Android API <
31 // 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform,
32 // so we use it instead.
33 #[cfg(target_os = "android")]
34 let res = syscall!(syscall(libc::SYS_epoll_create1, libc::O_CLOEXEC));
35
36 let ep = match res {
37 Ok(ep) => ep as RawFd,
38 Err(err) => {
39 // When `epoll_create1` is not available fall back to use
40 // `epoll_create` followed by `fcntl`.
41 if let Some(libc::ENOSYS) = err.raw_os_error() {
42 match syscall!(epoll_create(1024)) {
43 Ok(ep) => match syscall!(fcntl(ep, libc::F_SETFD, libc::FD_CLOEXEC)) {
44 Ok(ep) => ep as RawFd,
45 Err(err) => {
46 // `fcntl` failed, cleanup `ep`.
47 let _ = unsafe { libc::close(ep) };
48 return Err(err);
49 }
50 },
51 Err(err) => return Err(err),
52 }
53 } else {
54 return Err(err);
55 }
56 }
57 };
58
59 Ok(Selector {
60 #[cfg(debug_assertions)]
61 id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
62 ep,
63 #[cfg(debug_assertions)]
64 has_waker: AtomicBool::new(false),
65 })
66 }
67
68 pub fn try_clone(&self) -> io::Result<Selector> {
69 syscall!(fcntl(self.ep, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|ep| Selector {
70 // It's the same selector, so we use the same id.
71 #[cfg(debug_assertions)]
72 id: self.id,
73 ep,
74 #[cfg(debug_assertions)]
75 has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
76 })
77 }
78
79 pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
80 // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
81 // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
82 // architectures. The magic number is the same constant used by libuv.
83 #[cfg(target_pointer_width = "32")]
84 const MAX_SAFE_TIMEOUT: u128 = 1789569;
85 #[cfg(not(target_pointer_width = "32"))]
86 const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128;
87
88 let timeout = timeout
89 .map(|to| {
90 // `Duration::as_millis` truncates, so round up. This avoids
91 // turning sub-millisecond timeouts into a zero timeout, unless
92 // the caller explicitly requests that by specifying a zero
93 // timeout.
94 let to_ms = to
95 .checked_add(Duration::from_nanos(999_999))
96 .unwrap_or(to)
97 .as_millis();
98 cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int
99 })
100 .unwrap_or(-1);
101
102 events.clear();
103 syscall!(epoll_wait(
104 self.ep,
105 events.as_mut_ptr(),
106 events.capacity() as i32,
107 timeout,
108 ))
109 .map(|n_events| {
110 // This is safe because `epoll_wait` ensures that `n_events` are
111 // assigned.
112 unsafe { events.set_len(n_events as usize) };
113 })
114 }
115
116 pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
117 let mut event = libc::epoll_event {
118 events: interests_to_epoll(interests),
119 u64: usize::from(token) as u64,
120 #[cfg(target_os = "redox")]
121 _pad: 0,
122 };
123
124 syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
125 }
126
127 pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
128 let mut event = libc::epoll_event {
129 events: interests_to_epoll(interests),
130 u64: usize::from(token) as u64,
131 #[cfg(target_os = "redox")]
132 _pad: 0,
133 };
134
135 syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
136 }
137
138 pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
139 syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
140 }
141
142 #[cfg(debug_assertions)]
143 pub fn register_waker(&self) -> bool {
144 self.has_waker.swap(true, Ordering::AcqRel)
145 }
146}
147
cfg_io_source! {
    impl Selector {
        /// Returns this selector's unique id (debug builds only); clones made
        /// via `try_clone` return the same id.
        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.id
        }
    }
}
156
impl AsRawFd for Selector {
    /// Returns the raw epoll file descriptor backing this selector.
    fn as_raw_fd(&self) -> RawFd {
        self.ep
    }
}
162
163impl Drop for Selector {
164 fn drop(&mut self) {
165 if let Err(err: Error) = syscall!(close(self.ep)) {
166 error!("error closing epoll: {}", err);
167 }
168 }
169}
170
171fn interests_to_epoll(interests: Interest) -> u32 {
172 let mut kind: i32 = EPOLLET;
173
174 if interests.is_readable() {
175 kind = kind | EPOLLIN | EPOLLRDHUP;
176 }
177
178 if interests.is_writable() {
179 kind |= EPOLLOUT;
180 }
181
182 if interests.is_priority() {
183 kind |= EPOLLPRI;
184 }
185
186 kind as u32
187}
188
/// A single readiness event as filled in by `epoll_wait`.
pub type Event = libc::epoll_event;
/// Buffer of events populated by `Selector::select`.
pub type Events = Vec<Event>;
191
pub mod event {
    //! Helpers for interpreting the raw `epoll_event`s produced by the
    //! selector: extracting the token and classifying readiness flags.

    use std::fmt;

    use crate::sys::Event;
    use crate::Token;

    /// Returns the `Token` stored in the event's `u64` user-data field
    /// (set in `Selector::register`/`reregister`).
    pub fn token(event: &Event) -> Token {
        Token(event.u64 as usize)
    }

    /// Returns true if the event signals readable data (`EPOLLIN`) or
    /// priority data (`EPOLLPRI`).
    pub fn is_readable(event: &Event) -> bool {
        (event.events as libc::c_int & libc::EPOLLIN) != 0
            || (event.events as libc::c_int & libc::EPOLLPRI) != 0
    }

    /// Returns true if the event signals write readiness (`EPOLLOUT`).
    pub fn is_writable(event: &Event) -> bool {
        (event.events as libc::c_int & libc::EPOLLOUT) != 0
    }

    /// Returns true if the event signals an error condition (`EPOLLERR`).
    pub fn is_error(event: &Event) -> bool {
        (event.events as libc::c_int & libc::EPOLLERR) != 0
    }

    /// Returns true if the read half of the connection is closed.
    pub fn is_read_closed(event: &Event) -> bool {
        // Both halves of the socket have closed
        event.events as libc::c_int & libc::EPOLLHUP != 0
            // Socket has received FIN or called shutdown(SHUT_RD)
            || (event.events as libc::c_int & libc::EPOLLIN != 0
                && event.events as libc::c_int & libc::EPOLLRDHUP != 0)
    }

    /// Returns true if the write half of the connection is closed.
    pub fn is_write_closed(event: &Event) -> bool {
        // Both halves of the socket have closed
        event.events as libc::c_int & libc::EPOLLHUP != 0
            // Unix pipe write end has closed
            || (event.events as libc::c_int & libc::EPOLLOUT != 0
                && event.events as libc::c_int & libc::EPOLLERR != 0)
            // The other side (read end) of a Unix pipe has closed.
            || event.events as libc::c_int == libc::EPOLLERR
    }

    /// Returns true if the event signals priority data (`EPOLLPRI`).
    pub fn is_priority(event: &Event) -> bool {
        (event.events as libc::c_int & libc::EPOLLPRI) != 0
    }

    /// AIO readiness: always false on epoll.
    pub fn is_aio(_: &Event) -> bool {
        // Not supported in the kernel, only in libc.
        false
    }

    /// LIO readiness: always false on epoll.
    pub fn is_lio(_: &Event) -> bool {
        // Not supported.
        false
    }

    /// Formats the raw event for `Debug` output, expanding the `events`
    /// bitmask into the epoll flag names that are set.
    pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
        #[allow(clippy::trivially_copy_pass_by_ref)]
        fn check_events(got: &u32, want: &libc::c_int) -> bool {
            (*got as libc::c_int & want) != 0
        }
        debug_detail!(
            EventsDetails(u32),
            check_events,
            libc::EPOLLIN,
            libc::EPOLLPRI,
            libc::EPOLLOUT,
            libc::EPOLLRDNORM,
            libc::EPOLLRDBAND,
            libc::EPOLLWRNORM,
            libc::EPOLLWRBAND,
            libc::EPOLLMSG,
            libc::EPOLLERR,
            libc::EPOLLHUP,
            libc::EPOLLET,
            libc::EPOLLRDHUP,
            libc::EPOLLONESHOT,
            #[cfg(target_os = "linux")]
            libc::EPOLLEXCLUSIVE,
            #[cfg(any(target_os = "android", target_os = "linux"))]
            libc::EPOLLWAKEUP,
            libc::EPOLL_CLOEXEC,
        );

        // Can't reference fields in packed structures.
        let e_u64 = event.u64;
        f.debug_struct("epoll_event")
            .field("events", &EventsDetails(event.events))
            .field("u64", &e_u64)
            .finish()
    }
}
283
#[cfg(target_os = "android")]
#[test]
fn assert_close_on_exec_flag() {
    // This assertion needs to be true for `Selector::new`, which passes
    // `O_CLOEXEC` to the raw `epoll_create1` system call on Android.
    assert_eq!(libc::O_CLOEXEC, libc::EPOLL_CLOEXEC);
}
290