1 | //! Eventfd based implementation of the ping event source. |
2 | //! |
3 | //! # Implementation notes |
4 | //! |
5 | //! The eventfd is a much lighter signalling mechanism provided by the Linux |
6 | //! kernel. Rather than write an arbitrary sequence of bytes, it only has a |
7 | //! 64-bit counter. |
8 | //! |
9 | //! To avoid closing the eventfd early, we wrap it in a RAII-style closer |
10 | //! `CloseOnDrop` in `make_ping()`. When all the senders are dropped, another |
11 | //! wrapper `FlagOnDrop` handles signalling this to the event source, which is |
12 | //! the sole owner of the eventfd itself. The senders have weak references to |
13 | //! the eventfd, and if the source is dropped before the senders, they will |
14 | //! simply not do anything (except log a message). |
15 | //! |
16 | //! To differentiate between regular ping events and close ping events, we add 2 |
17 | //! to the counter for regular events and 1 for close events. In the source we |
18 | //! can then check the LSB and if it's set, we know it was a close event. This |
19 | //! only works if a close event never fires more than once. |
20 | |
21 | use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; |
22 | use std::sync::Arc; |
23 | |
24 | use rustix::event::{eventfd, EventfdFlags}; |
25 | use rustix::io::{read, write, Errno}; |
26 | use tracing::warn; |
27 | |
28 | use super::PingError; |
29 | use crate::{ |
30 | generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, |
31 | }; |
32 | |
33 | // These are not bitfields! They are increments to add to the eventfd counter. |
34 | // Since the fd can only be closed once, we can effectively use the |
35 | // INCREMENT_CLOSE value as a bitmask when checking. |
36 | const INCREMENT_PING: u64 = 0x2; |
37 | const INCREMENT_CLOSE: u64 = 0x1; |
38 | |
39 | #[inline ] |
40 | pub fn make_ping() -> std::io::Result<(Ping, PingSource)> { |
41 | let read: OwnedFd = eventfd(initval:0, flags:EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?; |
42 | |
43 | // We only have one fd for the eventfd. If the sending end closes it when |
44 | // all copies are dropped, the receiving end will be closed as well. We need |
45 | // to make sure the fd is not closed until all holders of it have dropped |
46 | // it. |
47 | |
48 | let fd: Arc = Arc::new(data:read); |
49 | |
50 | let ping: Ping = Ping { |
51 | event: Arc::new(data:FlagOnDrop(Arc::clone(&fd))), |
52 | }; |
53 | |
54 | let source: PingSource = PingSource { |
55 | event: Generic::new(file:ArcAsFd(fd), interest:Interest::READ, Mode::Level), |
56 | }; |
57 | |
58 | Ok((ping, source)) |
59 | } |
60 | |
61 | // Helper functions for the event source IO. |
62 | |
63 | #[inline ] |
64 | fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> { |
65 | assert!(count > 0); |
66 | match write(fd, &count.to_ne_bytes()) { |
67 | // The write succeeded, the ping will wake up the loop. |
68 | Ok(_) => Ok(()), |
69 | |
70 | // The counter hit its cap, which means previous calls to write() will |
71 | // wake up the loop. |
72 | Err(Errno::AGAIN) => Ok(()), |
73 | |
74 | // Anything else is a real error. |
75 | Err(e: Errno) => Err(e.into()), |
76 | } |
77 | } |
78 | |
79 | #[inline ] |
80 | fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> { |
81 | // The eventfd counter is effectively a u64. |
82 | const NBYTES: usize = 8; |
83 | let mut buf: [u8; 8] = [0u8; NBYTES]; |
84 | |
85 | match read(fd, &mut buf) { |
86 | // Reading from an eventfd should only ever produce 8 bytes. No looping |
87 | // is required. |
88 | Ok(NBYTES) => Ok(u64::from_ne_bytes(buf)), |
89 | |
90 | Ok(_) => unreachable!(), |
91 | |
92 | // Any other error can be propagated. |
93 | Err(e: Errno) => Err(e.into()), |
94 | } |
95 | } |
96 | |
97 | // Rust 1.64.0 adds an `AsFd` implementation for `Arc`, so this won't be needed |
98 | #[derive (Debug)] |
99 | struct ArcAsFd(Arc<OwnedFd>); |
100 | |
101 | impl AsFd for ArcAsFd { |
102 | fn as_fd(&self) -> BorrowedFd { |
103 | self.0.as_fd() |
104 | } |
105 | } |
106 | |
107 | // The event source is simply a generic source with one of the eventfds. |
108 | #[derive (Debug)] |
109 | pub struct PingSource { |
110 | event: Generic<ArcAsFd>, |
111 | } |
112 | |
113 | impl EventSource for PingSource { |
114 | type Event = (); |
115 | type Metadata = (); |
116 | type Ret = (); |
117 | type Error = PingError; |
118 | |
119 | fn process_events<C>( |
120 | &mut self, |
121 | readiness: Readiness, |
122 | token: Token, |
123 | mut callback: C, |
124 | ) -> Result<PostAction, Self::Error> |
125 | where |
126 | C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
127 | { |
128 | self.event |
129 | .process_events(readiness, token, |_, fd| { |
130 | let counter = drain_ping(fd.as_fd())?; |
131 | |
132 | // If the LSB is set, it means we were closed. If anything else |
133 | // is also set, it means we were pinged. The two are not |
134 | // mutually exclusive. |
135 | let close = (counter & INCREMENT_CLOSE) != 0; |
136 | let ping = (counter & (u64::MAX - 1)) != 0; |
137 | |
138 | if ping { |
139 | callback((), &mut ()); |
140 | } |
141 | |
142 | if close { |
143 | Ok(PostAction::Remove) |
144 | } else { |
145 | Ok(PostAction::Continue) |
146 | } |
147 | }) |
148 | .map_err(|e| PingError(e.into())) |
149 | } |
150 | |
151 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
152 | self.event.register(poll, token_factory) |
153 | } |
154 | |
155 | fn reregister( |
156 | &mut self, |
157 | poll: &mut Poll, |
158 | token_factory: &mut TokenFactory, |
159 | ) -> crate::Result<()> { |
160 | self.event.reregister(poll, token_factory) |
161 | } |
162 | |
163 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
164 | self.event.unregister(poll) |
165 | } |
166 | } |
167 | |
168 | #[derive (Clone, Debug)] |
169 | pub struct Ping { |
170 | // This is an Arc because it's potentially shared with clones. The last one |
171 | // dropped needs to signal to the event source via the eventfd. |
172 | event: Arc<FlagOnDrop>, |
173 | } |
174 | |
175 | impl Ping { |
176 | /// Send a ping to the `PingSource`. |
177 | pub fn ping(&self) { |
178 | if let Err(e: Error) = send_ping(self.event.0.as_fd(), INCREMENT_PING) { |
179 | warn!("Failed to write a ping: {e:?}" ); |
180 | } |
181 | } |
182 | } |
183 | |
184 | /// This manages signalling to the PingSource when it's dropped. There should |
185 | /// only ever be one of these per PingSource. |
186 | #[derive (Debug)] |
187 | struct FlagOnDrop(Arc<OwnedFd>); |
188 | |
189 | impl Drop for FlagOnDrop { |
190 | fn drop(&mut self) { |
191 | if let Err(e: Error) = send_ping(self.0.as_fd(), INCREMENT_CLOSE) { |
192 | warn!("Failed to send close ping: {e:?}" ); |
193 | } |
194 | } |
195 | } |
196 | |