1//! Eventfd based implementation of the ping event source.
2//!
3//! # Implementation notes
4//!
5//! The eventfd is a much lighter signalling mechanism provided by the Linux
6//! kernel. Rather than write an arbitrary sequence of bytes, it only has a
7//! 64-bit counter.
8//!
//! To avoid closing the eventfd early, `make_ping()` wraps it in an
//! `Arc<OwnedFd>` that is shared between the event source and the senders, so
//! the descriptor is only closed once every holder has dropped its reference.
//! When all the senders are dropped, another wrapper `FlagOnDrop` handles
//! signalling this to the event source by writing a close event to the
//! eventfd. If the source is dropped before the senders, their writes simply
//! go unread; a write that fails is only logged.
15//!
16//! To differentiate between regular ping events and close ping events, we add 2
17//! to the counter for regular events and 1 for close events. In the source we
18//! can then check the LSB and if it's set, we know it was a close event. This
19//! only works if a close event never fires more than once.
20
21use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
22use std::sync::Arc;
23
24use rustix::event::{eventfd, EventfdFlags};
25use rustix::io::{read, write, Errno};
26use tracing::warn;
27
28use super::PingError;
29use crate::{
30 generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
31};
32
// These values are increments added to the eventfd counter, not bitfields.
// Because the close increment is only ever written once, the lowest bit of
// the counter can nevertheless be tested with INCREMENT_CLOSE as if it were a
// bitmask.

/// Amount added to the eventfd counter exactly once, to signal a close event.
const INCREMENT_CLOSE: u64 = 1;

/// Amount added to the eventfd counter for every regular ping event.
const INCREMENT_PING: u64 = 2;
38
39#[inline]
40pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
41 let read: OwnedFd = eventfd(initval:0, flags:EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
42
43 // We only have one fd for the eventfd. If the sending end closes it when
44 // all copies are dropped, the receiving end will be closed as well. We need
45 // to make sure the fd is not closed until all holders of it have dropped
46 // it.
47
48 let fd: Arc = Arc::new(data:read);
49
50 let ping: Ping = Ping {
51 event: Arc::new(data:FlagOnDrop(Arc::clone(&fd))),
52 };
53
54 let source: PingSource = PingSource {
55 event: Generic::new(file:ArcAsFd(fd), interest:Interest::READ, Mode::Level),
56 };
57
58 Ok((ping, source))
59}
60
61// Helper functions for the event source IO.
62
63#[inline]
64fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> {
65 assert!(count > 0);
66 match write(fd, &count.to_ne_bytes()) {
67 // The write succeeded, the ping will wake up the loop.
68 Ok(_) => Ok(()),
69
70 // The counter hit its cap, which means previous calls to write() will
71 // wake up the loop.
72 Err(Errno::AGAIN) => Ok(()),
73
74 // Anything else is a real error.
75 Err(e: Errno) => Err(e.into()),
76 }
77}
78
79#[inline]
80fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> {
81 // The eventfd counter is effectively a u64.
82 const NBYTES: usize = 8;
83 let mut buf: [u8; 8] = [0u8; NBYTES];
84
85 match read(fd, &mut buf) {
86 // Reading from an eventfd should only ever produce 8 bytes. No looping
87 // is required.
88 Ok(NBYTES) => Ok(u64::from_ne_bytes(buf)),
89
90 Ok(_) => unreachable!(),
91
92 // Any other error can be propagated.
93 Err(e: Errno) => Err(e.into()),
94 }
95}
96
// Rust 1.64.0 adds an `AsFd` implementation for `Arc`, so this won't be needed
/// Newtype that exposes a shared `Arc<OwnedFd>` through the [`AsFd`] trait.
#[derive(Debug)]
struct ArcAsFd(Arc<OwnedFd>);

impl AsFd for ArcAsFd {
    /// Borrows the wrapped descriptor for as long as `self` is borrowed.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.0.as_fd()
    }
}
106
107// The event source is simply a generic source with one of the eventfds.
108#[derive(Debug)]
109pub struct PingSource {
110 event: Generic<ArcAsFd>,
111}
112
113impl EventSource for PingSource {
114 type Event = ();
115 type Metadata = ();
116 type Ret = ();
117 type Error = PingError;
118
119 fn process_events<C>(
120 &mut self,
121 readiness: Readiness,
122 token: Token,
123 mut callback: C,
124 ) -> Result<PostAction, Self::Error>
125 where
126 C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
127 {
128 self.event
129 .process_events(readiness, token, |_, fd| {
130 let counter = drain_ping(fd.as_fd())?;
131
132 // If the LSB is set, it means we were closed. If anything else
133 // is also set, it means we were pinged. The two are not
134 // mutually exclusive.
135 let close = (counter & INCREMENT_CLOSE) != 0;
136 let ping = (counter & (u64::MAX - 1)) != 0;
137
138 if ping {
139 callback((), &mut ());
140 }
141
142 if close {
143 Ok(PostAction::Remove)
144 } else {
145 Ok(PostAction::Continue)
146 }
147 })
148 .map_err(|e| PingError(e.into()))
149 }
150
151 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
152 self.event.register(poll, token_factory)
153 }
154
155 fn reregister(
156 &mut self,
157 poll: &mut Poll,
158 token_factory: &mut TokenFactory,
159 ) -> crate::Result<()> {
160 self.event.reregister(poll, token_factory)
161 }
162
163 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
164 self.event.unregister(poll)
165 }
166}
167
168#[derive(Clone, Debug)]
169pub struct Ping {
170 // This is an Arc because it's potentially shared with clones. The last one
171 // dropped needs to signal to the event source via the eventfd.
172 event: Arc<FlagOnDrop>,
173}
174
175impl Ping {
176 /// Send a ping to the `PingSource`.
177 pub fn ping(&self) {
178 if let Err(e: Error) = send_ping(self.event.0.as_fd(), INCREMENT_PING) {
179 warn!("Failed to write a ping: {e:?}");
180 }
181 }
182}
183
184/// This manages signalling to the PingSource when it's dropped. There should
185/// only ever be one of these per PingSource.
186#[derive(Debug)]
187struct FlagOnDrop(Arc<OwnedFd>);
188
189impl Drop for FlagOnDrop {
190 fn drop(&mut self) {
191 if let Err(e: Error) = send_ping(self.0.as_fd(), INCREMENT_CLOSE) {
192 warn!("Failed to send close ping: {e:?}");
193 }
194 }
195}
196