| 1 | //! Eventfd based implementation of the ping event source. |
| 2 | //! |
| 3 | //! # Implementation notes |
| 4 | //! |
| 5 | //! The eventfd is a much lighter signalling mechanism provided by the Linux |
| 6 | //! kernel. Rather than write an arbitrary sequence of bytes, it only has a |
| 7 | //! 64-bit counter. |
| 8 | //! |
| 9 | //! To avoid closing the eventfd early, we wrap it in a RAII-style closer |
| 10 | //! `CloseOnDrop` in `make_ping()`. When all the senders are dropped, another |
| 11 | //! wrapper `FlagOnDrop` handles signalling this to the event source, which is |
| 12 | //! the sole owner of the eventfd itself. The senders have weak references to |
| 13 | //! the eventfd, and if the source is dropped before the senders, they will |
| 14 | //! simply not do anything (except log a message). |
| 15 | //! |
| 16 | //! To differentiate between regular ping events and close ping events, we add 2 |
| 17 | //! to the counter for regular events and 1 for close events. In the source we |
| 18 | //! can then check the LSB and if it's set, we know it was a close event. This |
| 19 | //! only works if a close event never fires more than once. |
| 20 | |
| 21 | use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; |
| 22 | use std::sync::Arc; |
| 23 | |
| 24 | use rustix::event::{eventfd, EventfdFlags}; |
| 25 | use rustix::io::{read, write, Errno}; |
| 26 | |
| 27 | use super::PingError; |
| 28 | use crate::{ |
| 29 | generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, |
| 30 | }; |
| 31 | |
| 32 | // These are not bitfields! They are increments to add to the eventfd counter. |
| 33 | // Since the fd can only be closed once, we can effectively use the |
| 34 | // INCREMENT_CLOSE value as a bitmask when checking. |
| 35 | const INCREMENT_PING: u64 = 0x2; |
| 36 | const INCREMENT_CLOSE: u64 = 0x1; |
| 37 | |
| 38 | #[inline ] |
| 39 | pub fn make_ping() -> std::io::Result<(Ping, PingSource)> { |
| 40 | let read: OwnedFd = eventfd(initval:0, flags:EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?; |
| 41 | |
| 42 | // We only have one fd for the eventfd. If the sending end closes it when |
| 43 | // all copies are dropped, the receiving end will be closed as well. We need |
| 44 | // to make sure the fd is not closed until all holders of it have dropped |
| 45 | // it. |
| 46 | |
| 47 | let fd: Arc = Arc::new(data:read); |
| 48 | |
| 49 | let ping: Ping = Ping { |
| 50 | event: Arc::new(data:FlagOnDrop(Arc::clone(&fd))), |
| 51 | }; |
| 52 | |
| 53 | let source: PingSource = PingSource { |
| 54 | event: Generic::new(file:ArcAsFd(fd), interest:Interest::READ, Mode::Level), |
| 55 | }; |
| 56 | |
| 57 | Ok((ping, source)) |
| 58 | } |
| 59 | |
| 60 | // Helper functions for the event source IO. |
| 61 | |
| 62 | #[inline ] |
| 63 | fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> { |
| 64 | assert!(count > 0); |
| 65 | match write(fd, &count.to_ne_bytes()) { |
| 66 | // The write succeeded, the ping will wake up the loop. |
| 67 | Ok(_) => Ok(()), |
| 68 | |
| 69 | // The counter hit its cap, which means previous calls to write() will |
| 70 | // wake up the loop. |
| 71 | Err(Errno::AGAIN) => Ok(()), |
| 72 | |
| 73 | // Anything else is a real error. |
| 74 | Err(e: Errno) => Err(e.into()), |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | #[inline ] |
| 79 | fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> { |
| 80 | // The eventfd counter is effectively a u64. |
| 81 | const NBYTES: usize = 8; |
| 82 | let mut buf: [u8; 8] = [0u8; NBYTES]; |
| 83 | |
| 84 | match read(fd, &mut buf) { |
| 85 | // Reading from an eventfd should only ever produce 8 bytes. No looping |
| 86 | // is required. |
| 87 | Ok(NBYTES) => Ok(u64::from_ne_bytes(buf)), |
| 88 | |
| 89 | Ok(_) => unreachable!(), |
| 90 | |
| 91 | // Any other error can be propagated. |
| 92 | Err(e: Errno) => Err(e.into()), |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | // Rust 1.64.0 adds an `AsFd` implementation for `Arc`, so this won't be needed |
| 97 | #[derive (Debug)] |
| 98 | struct ArcAsFd(Arc<OwnedFd>); |
| 99 | |
| 100 | impl AsFd for ArcAsFd { |
| 101 | fn as_fd(&self) -> BorrowedFd { |
| 102 | self.0.as_fd() |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | // The event source is simply a generic source with one of the eventfds. |
| 107 | #[derive (Debug)] |
| 108 | pub struct PingSource { |
| 109 | event: Generic<ArcAsFd>, |
| 110 | } |
| 111 | |
| 112 | impl EventSource for PingSource { |
| 113 | type Event = (); |
| 114 | type Metadata = (); |
| 115 | type Ret = (); |
| 116 | type Error = PingError; |
| 117 | |
| 118 | fn process_events<C>( |
| 119 | &mut self, |
| 120 | readiness: Readiness, |
| 121 | token: Token, |
| 122 | mut callback: C, |
| 123 | ) -> Result<PostAction, Self::Error> |
| 124 | where |
| 125 | C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
| 126 | { |
| 127 | self.event |
| 128 | .process_events(readiness, token, |_, fd| { |
| 129 | let counter = drain_ping(fd.as_fd())?; |
| 130 | |
| 131 | // If the LSB is set, it means we were closed. If anything else |
| 132 | // is also set, it means we were pinged. The two are not |
| 133 | // mutually exclusive. |
| 134 | let close = (counter & INCREMENT_CLOSE) != 0; |
| 135 | let ping = (counter & (u64::MAX - 1)) != 0; |
| 136 | |
| 137 | if ping { |
| 138 | callback((), &mut ()); |
| 139 | } |
| 140 | |
| 141 | if close { |
| 142 | Ok(PostAction::Remove) |
| 143 | } else { |
| 144 | Ok(PostAction::Continue) |
| 145 | } |
| 146 | }) |
| 147 | .map_err(|e| PingError(e.into())) |
| 148 | } |
| 149 | |
| 150 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
| 151 | self.event.register(poll, token_factory) |
| 152 | } |
| 153 | |
| 154 | fn reregister( |
| 155 | &mut self, |
| 156 | poll: &mut Poll, |
| 157 | token_factory: &mut TokenFactory, |
| 158 | ) -> crate::Result<()> { |
| 159 | self.event.reregister(poll, token_factory) |
| 160 | } |
| 161 | |
| 162 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
| 163 | self.event.unregister(poll) |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | #[derive (Clone, Debug)] |
| 168 | pub struct Ping { |
| 169 | // This is an Arc because it's potentially shared with clones. The last one |
| 170 | // dropped needs to signal to the event source via the eventfd. |
| 171 | event: Arc<FlagOnDrop>, |
| 172 | } |
| 173 | |
| 174 | impl Ping { |
| 175 | /// Send a ping to the `PingSource`. |
| 176 | pub fn ping(&self) { |
| 177 | if let Err(e: Error) = send_ping(self.event.0.as_fd(), INCREMENT_PING) { |
| 178 | log::warn!("[calloop] Failed to write a ping: {:?}" , e); |
| 179 | } |
| 180 | } |
| 181 | } |
| 182 | |
| 183 | /// This manages signalling to the PingSource when it's dropped. There should |
| 184 | /// only ever be one of these per PingSource. |
| 185 | #[derive (Debug)] |
| 186 | struct FlagOnDrop(Arc<OwnedFd>); |
| 187 | |
| 188 | impl Drop for FlagOnDrop { |
| 189 | fn drop(&mut self) { |
| 190 | if let Err(e: Error) = send_ping(self.0.as_fd(), INCREMENT_CLOSE) { |
| 191 | log::warn!("[calloop] Failed to send close ping: {:?}" , e); |
| 192 | } |
| 193 | } |
| 194 | } |
| 195 | |