1use std::ops::{Deref, DerefMut};
2#[cfg(unix)]
3use std::os::unix::io::AsRawFd;
4#[cfg(target_os = "wasi")]
5use std::os::wasi::io::AsRawFd;
6#[cfg(windows)]
7use std::os::windows::io::AsRawSocket;
8#[cfg(debug_assertions)]
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::{fmt, io};
11
12use crate::sys::IoSourceState;
13use crate::{event, Interest, Registry, Token};
14
15/// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
16/// implementation.
17///
18/// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
19///
20/// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
21/// Mio supports registering any FD or socket that can be registered with the
22/// underlying OS selector. `IoSource` provides the necessary bridge.
23///
24/// [`RawFd`]: std::os::unix::io::RawFd
25/// [`RawSocket`]: std::os::windows::io::RawSocket
26///
27/// # Notes
28///
29/// To handle the registrations and events properly **all** I/O operations (such
30/// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
31/// internal state is updated accordingly.
32///
33/// [`Poll`]: crate::Poll
34/// [`do_io`]: IoSource::do_io
35/*
36///
37/// # Examples
38///
39/// Basic usage.
40///
41/// ```
42/// # use std::error::Error;
43/// # fn main() -> Result<(), Box<dyn Error>> {
44/// use mio::{Interest, Poll, Token};
45/// use mio::IoSource;
46///
47/// use std::net;
48///
49/// let poll = Poll::new()?;
50///
51/// // Bind a std TCP listener.
52/// let listener = net::TcpListener::bind("127.0.0.1:0")?;
53/// // Wrap it in the `IoSource` type.
54/// let mut listener = IoSource::new(listener);
55///
56/// // Register the listener.
57/// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
58/// # Ok(())
59/// # }
60/// ```
61*/
62pub struct IoSource<T> {
63 state: IoSourceState,
64 inner: T,
65 #[cfg(debug_assertions)]
66 selector_id: SelectorId,
67}
68
69impl<T> IoSource<T> {
70 /// Create a new `IoSource`.
71 pub fn new(io: T) -> IoSource<T> {
72 IoSource {
73 state: IoSourceState::new(),
74 inner: io,
75 #[cfg(debug_assertions)]
76 selector_id: SelectorId::new(),
77 }
78 }
79
80 /// Execute an I/O operations ensuring that the socket receives more events
81 /// if it hits a [`WouldBlock`] error.
82 ///
83 /// # Notes
84 ///
85 /// This method is required to be called for **all** I/O operations to
86 /// ensure the user will receive events once the socket is ready again after
87 /// returning a [`WouldBlock`] error.
88 ///
89 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
90 pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
91 where
92 F: FnOnce(&T) -> io::Result<R>,
93 {
94 self.state.do_io(f, &self.inner)
95 }
96
97 /// Returns the I/O source, dropping the state.
98 ///
99 /// # Notes
100 ///
101 /// To ensure no more events are to be received for this I/O source first
102 /// [`deregister`] it.
103 ///
104 /// [`deregister`]: Registry::deregister
105 pub fn into_inner(self) -> T {
106 self.inner
107 }
108}
109
110/// Be careful when using this method. All I/O operations that may block must go
111/// through the [`do_io`] method.
112///
113/// [`do_io`]: IoSource::do_io
114impl<T> Deref for IoSource<T> {
115 type Target = T;
116
117 fn deref(&self) -> &Self::Target {
118 &self.inner
119 }
120}
121
122/// Be careful when using this method. All I/O operations that may block must go
123/// through the [`do_io`] method.
124///
125/// [`do_io`]: IoSource::do_io
126impl<T> DerefMut for IoSource<T> {
127 fn deref_mut(&mut self) -> &mut Self::Target {
128 &mut self.inner
129 }
130}
131
132#[cfg(unix)]
133impl<T> event::Source for IoSource<T>
134where
135 T: AsRawFd,
136{
137 fn register(
138 &mut self,
139 registry: &Registry,
140 token: Token,
141 interests: Interest,
142 ) -> io::Result<()> {
143 #[cfg(debug_assertions)]
144 self.selector_id.associate(registry)?;
145 self.state
146 .register(registry, token, interests, self.inner.as_raw_fd())
147 }
148
149 fn reregister(
150 &mut self,
151 registry: &Registry,
152 token: Token,
153 interests: Interest,
154 ) -> io::Result<()> {
155 #[cfg(debug_assertions)]
156 self.selector_id.check_association(registry)?;
157 self.state
158 .reregister(registry, token, interests, self.inner.as_raw_fd())
159 }
160
161 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
162 #[cfg(debug_assertions)]
163 self.selector_id.remove_association(registry)?;
164 self.state.deregister(registry, self.inner.as_raw_fd())
165 }
166}
167
168#[cfg(windows)]
169impl<T> event::Source for IoSource<T>
170where
171 T: AsRawSocket,
172{
173 fn register(
174 &mut self,
175 registry: &Registry,
176 token: Token,
177 interests: Interest,
178 ) -> io::Result<()> {
179 #[cfg(debug_assertions)]
180 self.selector_id.associate(registry)?;
181 self.state
182 .register(registry, token, interests, self.inner.as_raw_socket())
183 }
184
185 fn reregister(
186 &mut self,
187 registry: &Registry,
188 token: Token,
189 interests: Interest,
190 ) -> io::Result<()> {
191 #[cfg(debug_assertions)]
192 self.selector_id.check_association(registry)?;
193 self.state.reregister(registry, token, interests)
194 }
195
196 fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
197 #[cfg(debug_assertions)]
198 self.selector_id.remove_association(_registry)?;
199 self.state.deregister()
200 }
201}
202
203#[cfg(target_os = "wasi")]
204impl<T> event::Source for IoSource<T>
205where
206 T: AsRawFd,
207{
208 fn register(
209 &mut self,
210 registry: &Registry,
211 token: Token,
212 interests: Interest,
213 ) -> io::Result<()> {
214 #[cfg(debug_assertions)]
215 self.selector_id.associate(registry)?;
216 registry
217 .selector()
218 .register(self.inner.as_raw_fd() as _, token, interests)
219 }
220
221 fn reregister(
222 &mut self,
223 registry: &Registry,
224 token: Token,
225 interests: Interest,
226 ) -> io::Result<()> {
227 #[cfg(debug_assertions)]
228 self.selector_id.check_association(registry)?;
229 registry
230 .selector()
231 .reregister(self.inner.as_raw_fd() as _, token, interests)
232 }
233
234 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
235 #[cfg(debug_assertions)]
236 self.selector_id.remove_association(registry)?;
237 registry.selector().deregister(self.inner.as_raw_fd() as _)
238 }
239}
240
241impl<T> fmt::Debug for IoSource<T>
242where
243 T: fmt::Debug,
244{
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 self.inner.fmt(f)
247 }
248}
249
250/// Used to associate an `IoSource` with a `sys::Selector`.
251#[cfg(debug_assertions)]
252#[derive(Debug)]
253struct SelectorId {
254 id: AtomicUsize,
255}
256
257#[cfg(debug_assertions)]
258impl SelectorId {
259 /// Value of `id` if `SelectorId` is not associated with any
260 /// `sys::Selector`. Valid selector ids start at 1.
261 const UNASSOCIATED: usize = 0;
262
263 /// Create a new `SelectorId`.
264 const fn new() -> SelectorId {
265 SelectorId {
266 id: AtomicUsize::new(Self::UNASSOCIATED),
267 }
268 }
269
270 /// Associate an I/O source with `registry`, returning an error if its
271 /// already registered.
272 fn associate(&self, registry: &Registry) -> io::Result<()> {
273 let registry_id = registry.selector().id();
274 let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
275
276 if previous_id == Self::UNASSOCIATED {
277 Ok(())
278 } else {
279 Err(io::Error::new(
280 io::ErrorKind::AlreadyExists,
281 "I/O source already registered with a `Registry`",
282 ))
283 }
284 }
285
286 /// Check the association of an I/O source with `registry`, returning an
287 /// error if its registered with a different `Registry` or not registered at
288 /// all.
289 fn check_association(&self, registry: &Registry) -> io::Result<()> {
290 let registry_id = registry.selector().id();
291 let id = self.id.load(Ordering::Acquire);
292
293 if id == registry_id {
294 Ok(())
295 } else if id == Self::UNASSOCIATED {
296 Err(io::Error::new(
297 io::ErrorKind::NotFound,
298 "I/O source not registered with `Registry`",
299 ))
300 } else {
301 Err(io::Error::new(
302 io::ErrorKind::AlreadyExists,
303 "I/O source already registered with a different `Registry`",
304 ))
305 }
306 }
307
308 /// Remove a previously made association from `registry`, returns an error
309 /// if it was not previously associated with `registry`.
310 fn remove_association(&self, registry: &Registry) -> io::Result<()> {
311 let registry_id = registry.selector().id();
312 let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
313
314 if previous_id == registry_id {
315 Ok(())
316 } else {
317 Err(io::Error::new(
318 io::ErrorKind::NotFound,
319 "I/O source not registered with `Registry`",
320 ))
321 }
322 }
323}
324
325#[cfg(debug_assertions)]
326impl Clone for SelectorId {
327 fn clone(&self) -> SelectorId {
328 SelectorId {
329 id: AtomicUsize::new(self.id.load(order:Ordering::Acquire)),
330 }
331 }
332}
333