1 | //! Eventfd based implementation of the ping event source. |
2 | //! |
3 | //! # Implementation notes |
4 | //! |
5 | //! The eventfd is a much lighter signalling mechanism provided by the Linux |
6 | //! kernel. Rather than write an arbitrary sequence of bytes, it only has a |
7 | //! 64-bit counter. |
8 | //! |
//! To avoid closing the eventfd early, `make_ping()` places it in an `Arc`
//! that is shared between the event source and the senders, so the fd is only
//! closed once every holder has dropped it. When all the senders are dropped,
//! a wrapper `FlagOnDrop` handles signalling this to the event source by
//! writing a close event to the eventfd. The senders hold strong references
//! to the eventfd, so if the source is dropped before the senders, their
//! writes still succeed but are simply never read.
15 | //! |
16 | //! To differentiate between regular ping events and close ping events, we add 2 |
17 | //! to the counter for regular events and 1 for close events. In the source we |
18 | //! can then check the LSB and if it's set, we know it was a close event. This |
19 | //! only works if a close event never fires more than once. |
20 | |
21 | use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; |
22 | use std::sync::Arc; |
23 | |
24 | use rustix::event::{eventfd, EventfdFlags}; |
25 | use rustix::io::{read, write, Errno}; |
26 | |
27 | use super::PingError; |
28 | use crate::{ |
29 | generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, |
30 | }; |
31 | |
// Amounts added to the eventfd counter. These are increments, not bit flags!
// Since the close increment is only ever sent once, its value can still be
// used as a mask when checking whether the counter contains a close event.

/// Added to the eventfd counter for every regular ping.
const INCREMENT_PING: u64 = 2;
/// Added to the eventfd counter for the close event.
const INCREMENT_CLOSE: u64 = 1;
37 | |
38 | #[inline ] |
39 | pub fn make_ping() -> std::io::Result<(Ping, PingSource)> { |
40 | let read: OwnedFd = eventfd(initval:0, flags:EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?; |
41 | |
42 | // We only have one fd for the eventfd. If the sending end closes it when |
43 | // all copies are dropped, the receiving end will be closed as well. We need |
44 | // to make sure the fd is not closed until all holders of it have dropped |
45 | // it. |
46 | |
47 | let fd: Arc = Arc::new(data:read); |
48 | |
49 | let ping: Ping = Ping { |
50 | event: Arc::new(data:FlagOnDrop(Arc::clone(&fd))), |
51 | }; |
52 | |
53 | let source: PingSource = PingSource { |
54 | event: Generic::new(file:ArcAsFd(fd), interest:Interest::READ, Mode::Level), |
55 | }; |
56 | |
57 | Ok((ping, source)) |
58 | } |
59 | |
60 | // Helper functions for the event source IO. |
61 | |
62 | #[inline ] |
63 | fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> { |
64 | assert!(count > 0); |
65 | match write(fd, &count.to_ne_bytes()) { |
66 | // The write succeeded, the ping will wake up the loop. |
67 | Ok(_) => Ok(()), |
68 | |
69 | // The counter hit its cap, which means previous calls to write() will |
70 | // wake up the loop. |
71 | Err(Errno::AGAIN) => Ok(()), |
72 | |
73 | // Anything else is a real error. |
74 | Err(e: Errno) => Err(e.into()), |
75 | } |
76 | } |
77 | |
78 | #[inline ] |
79 | fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> { |
80 | // The eventfd counter is effectively a u64. |
81 | const NBYTES: usize = 8; |
82 | let mut buf: [u8; 8] = [0u8; NBYTES]; |
83 | |
84 | match read(fd, &mut buf) { |
85 | // Reading from an eventfd should only ever produce 8 bytes. No looping |
86 | // is required. |
87 | Ok(NBYTES) => Ok(u64::from_ne_bytes(buf)), |
88 | |
89 | Ok(_) => unreachable!(), |
90 | |
91 | // Any other error can be propagated. |
92 | Err(e: Errno) => Err(e.into()), |
93 | } |
94 | } |
95 | |
// Rust 1.64.0 adds an `AsFd` implementation for `Arc`, so this won't be needed

/// Newtype around a shared [`OwnedFd`] that implements [`AsFd`] by borrowing
/// the fd held inside the `Arc`.
#[derive(Debug)]
struct ArcAsFd(Arc<OwnedFd>);

impl AsFd for ArcAsFd {
    /// Borrows the wrapped fd; the borrow is tied to the lifetime of `self`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.0.as_fd()
    }
}
105 | |
106 | // The event source is simply a generic source with one of the eventfds. |
107 | #[derive (Debug)] |
108 | pub struct PingSource { |
109 | event: Generic<ArcAsFd>, |
110 | } |
111 | |
112 | impl EventSource for PingSource { |
113 | type Event = (); |
114 | type Metadata = (); |
115 | type Ret = (); |
116 | type Error = PingError; |
117 | |
118 | fn process_events<C>( |
119 | &mut self, |
120 | readiness: Readiness, |
121 | token: Token, |
122 | mut callback: C, |
123 | ) -> Result<PostAction, Self::Error> |
124 | where |
125 | C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
126 | { |
127 | self.event |
128 | .process_events(readiness, token, |_, fd| { |
129 | let counter = drain_ping(fd.as_fd())?; |
130 | |
131 | // If the LSB is set, it means we were closed. If anything else |
132 | // is also set, it means we were pinged. The two are not |
133 | // mutually exclusive. |
134 | let close = (counter & INCREMENT_CLOSE) != 0; |
135 | let ping = (counter & (u64::MAX - 1)) != 0; |
136 | |
137 | if ping { |
138 | callback((), &mut ()); |
139 | } |
140 | |
141 | if close { |
142 | Ok(PostAction::Remove) |
143 | } else { |
144 | Ok(PostAction::Continue) |
145 | } |
146 | }) |
147 | .map_err(|e| PingError(e.into())) |
148 | } |
149 | |
150 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
151 | self.event.register(poll, token_factory) |
152 | } |
153 | |
154 | fn reregister( |
155 | &mut self, |
156 | poll: &mut Poll, |
157 | token_factory: &mut TokenFactory, |
158 | ) -> crate::Result<()> { |
159 | self.event.reregister(poll, token_factory) |
160 | } |
161 | |
162 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
163 | self.event.unregister(poll) |
164 | } |
165 | } |
166 | |
167 | #[derive (Clone, Debug)] |
168 | pub struct Ping { |
169 | // This is an Arc because it's potentially shared with clones. The last one |
170 | // dropped needs to signal to the event source via the eventfd. |
171 | event: Arc<FlagOnDrop>, |
172 | } |
173 | |
174 | impl Ping { |
175 | /// Send a ping to the `PingSource`. |
176 | pub fn ping(&self) { |
177 | if let Err(e: Error) = send_ping(self.event.0.as_fd(), INCREMENT_PING) { |
178 | log::warn!("[calloop] Failed to write a ping: {:?}" , e); |
179 | } |
180 | } |
181 | } |
182 | |
/// Wrapper around the shared eventfd that takes care of signalling to the
/// `PingSource` when it is dropped. Only one of these should ever exist per
/// `PingSource`.
#[derive(Debug)]
struct FlagOnDrop(Arc<OwnedFd>);
187 | |
188 | impl Drop for FlagOnDrop { |
189 | fn drop(&mut self) { |
190 | if let Err(e: Error) = send_ping(self.0.as_fd(), INCREMENT_CLOSE) { |
191 | log::warn!("[calloop] Failed to send close ping: {:?}" , e); |
192 | } |
193 | } |
194 | } |
195 | |