1use std::ops::{Deref, DerefMut};
2#[cfg(any(unix, target_os = "wasi"))]
3use std::os::fd::AsRawFd;
4// TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this
5// can use `std::os::fd` and be merged with the above.
6#[cfg(target_os = "hermit")]
7use std::os::hermit::io::AsRawFd;
8#[cfg(windows)]
9use std::os::windows::io::AsRawSocket;
10#[cfg(debug_assertions)]
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::{fmt, io};
13
14use crate::sys::IoSourceState;
15use crate::{event, Interest, Registry, Token};
16
/// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
/// implementation.
///
/// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
///
/// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
/// Mio supports registering any FD or socket that can be registered with the
/// underlying OS selector. `IoSource` provides the necessary bridge.
///
/// [`RawFd`]: std::os::fd::RawFd
/// [`RawSocket`]: std::os::windows::io::RawSocket
///
/// # Notes
///
/// To handle the registrations and events properly **all** I/O operations (such
/// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
/// internal state is updated accordingly.
///
/// [`Poll`]: crate::Poll
/// [`do_io`]: IoSource::do_io
pub struct IoSource<T> {
    // Bookkeeping supplied by `crate::sys`; `do_io` is forwarded to it, and the
    // Unix/Hermit and Windows `event::Source` impls route (de)registration
    // through it as well.
    state: IoSourceState,
    // The wrapped I/O object; exposed via `Deref`/`DerefMut` and handed back
    // by `into_inner`.
    inner: T,
    // Only present in builds with debug assertions: records which selector this
    // source is associated with so that mismatched `register`/`reregister`/
    // `deregister` calls are reported as errors.
    #[cfg(debug_assertions)]
    selector_id: SelectorId,
}
43
44impl<T> IoSource<T> {
45 /// Create a new `IoSource`.
46 pub fn new(io: T) -> IoSource<T> {
47 IoSource {
48 state: IoSourceState::new(),
49 inner: io,
50 #[cfg(debug_assertions)]
51 selector_id: SelectorId::new(),
52 }
53 }
54
55 /// Execute an I/O operations ensuring that the socket receives more events
56 /// if it hits a [`WouldBlock`] error.
57 ///
58 /// # Notes
59 ///
60 /// This method is required to be called for **all** I/O operations to
61 /// ensure the user will receive events once the socket is ready again after
62 /// returning a [`WouldBlock`] error.
63 ///
64 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
65 pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
66 where
67 F: FnOnce(&T) -> io::Result<R>,
68 {
69 self.state.do_io(f, &self.inner)
70 }
71
72 /// Returns the I/O source, dropping the state.
73 ///
74 /// # Notes
75 ///
76 /// To ensure no more events are to be received for this I/O source first
77 /// [`deregister`] it.
78 ///
79 /// [`deregister`]: Registry::deregister
80 pub fn into_inner(self) -> T {
81 self.inner
82 }
83}
84
85/// Be careful when using this method. All I/O operations that may block must go
86/// through the [`do_io`] method.
87///
88/// [`do_io`]: IoSource::do_io
89impl<T> Deref for IoSource<T> {
90 type Target = T;
91
92 fn deref(&self) -> &Self::Target {
93 &self.inner
94 }
95}
96
97/// Be careful when using this method. All I/O operations that may block must go
98/// through the [`do_io`] method.
99///
100/// [`do_io`]: IoSource::do_io
101impl<T> DerefMut for IoSource<T> {
102 fn deref_mut(&mut self) -> &mut Self::Target {
103 &mut self.inner
104 }
105}
106
107#[cfg(any(unix, target_os = "hermit"))]
108impl<T> event::Source for IoSource<T>
109where
110 T: AsRawFd,
111{
112 fn register(
113 &mut self,
114 registry: &Registry,
115 token: Token,
116 interests: Interest,
117 ) -> io::Result<()> {
118 #[cfg(debug_assertions)]
119 self.selector_id.associate(registry)?;
120 self.state
121 .register(registry, token, interests, self.inner.as_raw_fd())
122 }
123
124 fn reregister(
125 &mut self,
126 registry: &Registry,
127 token: Token,
128 interests: Interest,
129 ) -> io::Result<()> {
130 #[cfg(debug_assertions)]
131 self.selector_id.check_association(registry)?;
132 self.state
133 .reregister(registry, token, interests, self.inner.as_raw_fd())
134 }
135
136 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
137 #[cfg(debug_assertions)]
138 self.selector_id.remove_association(registry)?;
139 self.state.deregister(registry, self.inner.as_raw_fd())
140 }
141}
142
#[cfg(windows)]
impl<T> event::Source for IoSource<T>
where
    T: AsRawSocket,
{
    // Registers the wrapped raw socket via the internal state. With debug
    // assertions enabled the source is first associated with `registry`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        let raw_socket = self.inner.as_raw_socket();
        self.state.register(registry, token, interests, raw_socket)
    }

    // Re-registers via the internal state; the raw socket is not passed again
    // here. With debug assertions enabled the existing association is verified
    // first.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        let state = &mut self.state;
        state.reregister(registry, token, interests)
    }

    // Deregisters via the internal state. `_registry` is only consulted when
    // debug assertions are enabled, hence the underscore prefix.
    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;

        let state = &mut self.state;
        state.deregister()
    }
}
177
#[cfg(target_os = "wasi")]
impl<T> event::Source for IoSource<T>
where
    T: AsRawFd,
{
    // Registers the wrapped file descriptor directly with the registry's
    // selector. With debug assertions enabled the source is first associated
    // with `registry`.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        let selector = registry.selector();
        selector.register(self.inner.as_raw_fd() as _, token, interests)
    }

    // Re-registers the wrapped file descriptor directly with the registry's
    // selector. With debug assertions enabled the existing association is
    // verified first.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        let selector = registry.selector();
        selector.reregister(self.inner.as_raw_fd() as _, token, interests)
    }

    // Deregisters the wrapped file descriptor directly from the registry's
    // selector. With debug assertions enabled the association with `registry`
    // is removed first.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(registry)?;

        let selector = registry.selector();
        selector.deregister(self.inner.as_raw_fd() as _)
    }
}
215
216impl<T> fmt::Debug for IoSource<T>
217where
218 T: fmt::Debug,
219{
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 self.inner.fmt(f)
222 }
223}
224
/// Used to associate an `IoSource` with a `sys::Selector`.
///
/// Only compiled when debug assertions are enabled.
#[cfg(debug_assertions)]
#[derive(Debug)]
struct SelectorId {
    // Id of the associated selector, or `SelectorId::UNASSOCIATED` (0) when
    // there is none. Stored atomically so it can be updated through `&self`.
    id: AtomicUsize,
}
231
232#[cfg(debug_assertions)]
233impl SelectorId {
234 /// Value of `id` if `SelectorId` is not associated with any
235 /// `sys::Selector`. Valid selector ids start at 1.
236 const UNASSOCIATED: usize = 0;
237
238 /// Create a new `SelectorId`.
239 const fn new() -> SelectorId {
240 SelectorId {
241 id: AtomicUsize::new(Self::UNASSOCIATED),
242 }
243 }
244
245 /// Associate an I/O source with `registry`, returning an error if its
246 /// already registered.
247 fn associate(&self, registry: &Registry) -> io::Result<()> {
248 let registry_id = registry.selector().id();
249 let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
250
251 if previous_id == Self::UNASSOCIATED {
252 Ok(())
253 } else {
254 Err(io::Error::new(
255 io::ErrorKind::AlreadyExists,
256 "I/O source already registered with a `Registry`",
257 ))
258 }
259 }
260
261 /// Check the association of an I/O source with `registry`, returning an
262 /// error if its registered with a different `Registry` or not registered at
263 /// all.
264 fn check_association(&self, registry: &Registry) -> io::Result<()> {
265 let registry_id = registry.selector().id();
266 let id = self.id.load(Ordering::Acquire);
267
268 if id == registry_id {
269 Ok(())
270 } else if id == Self::UNASSOCIATED {
271 Err(io::Error::new(
272 io::ErrorKind::NotFound,
273 "I/O source not registered with `Registry`",
274 ))
275 } else {
276 Err(io::Error::new(
277 io::ErrorKind::AlreadyExists,
278 "I/O source already registered with a different `Registry`",
279 ))
280 }
281 }
282
283 /// Remove a previously made association from `registry`, returns an error
284 /// if it was not previously associated with `registry`.
285 fn remove_association(&self, registry: &Registry) -> io::Result<()> {
286 let registry_id = registry.selector().id();
287 let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
288
289 if previous_id == registry_id {
290 Ok(())
291 } else {
292 Err(io::Error::new(
293 io::ErrorKind::NotFound,
294 "I/O source not registered with `Registry`",
295 ))
296 }
297 }
298}
299
300#[cfg(debug_assertions)]
301impl Clone for SelectorId {
302 fn clone(&self) -> SelectorId {
303 SelectorId {
304 id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
305 }
306 }
307}
308