1use std::ops::{Deref, DerefMut};
2#[cfg(unix)]
3use std::os::unix::io::AsRawFd;
4#[cfg(target_os = "wasi")]
5use std::os::wasi::io::AsRawFd;
6#[cfg(windows)]
7use std::os::windows::io::AsRawSocket;
8#[cfg(debug_assertions)]
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::{fmt, io};
11
12use crate::sys::IoSourceState;
13use crate::{event, Interest, Registry, Token};
14
15/// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
16/// implementation.
17///
18/// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
19///
20/// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
21/// Mio supports registering any FD or socket that can be registered with the
22/// underlying OS selector. `IoSource` provides the necessary bridge.
23///
24/// [`RawFd`]: std::os::unix::io::RawFd
25/// [`RawSocket`]: std::os::windows::io::RawSocket
26///
27/// # Notes
28///
29/// To handle the registrations and events properly **all** I/O operations (such
30/// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
31/// internal state is updated accordingly.
32///
33/// [`Poll`]: crate::Poll
34/// [`do_io`]: IoSource::do_io
35/*
36///
37/// # Examples
38///
39/// Basic usage.
40///
41/// ```
42/// # use std::error::Error;
43/// # fn main() -> Result<(), Box<dyn Error>> {
44/// use mio::{Interest, Poll, Token};
45/// use mio::IoSource;
46///
47/// use std::net;
48///
49/// let poll = Poll::new()?;
50///
51/// // Bind a std TCP listener.
52/// let listener = net::TcpListener::bind("127.0.0.1:0")?;
53/// // Wrap it in the `IoSource` type.
54/// let mut listener = IoSource::new(listener);
55///
56/// // Register the listener.
57/// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
58/// # Ok(())
59/// # }
60/// ```
61*/
62pub struct IoSource<T> {
63 state: IoSourceState,
64 inner: T,
65 #[cfg(debug_assertions)]
66 selector_id: SelectorId,
67}
68
69impl<T> IoSource<T> {
70 /// Create a new `IoSource`.
71 pub fn new(io: T) -> IoSource<T> {
72 IoSource {
73 state: IoSourceState::new(),
74 inner: io,
75 #[cfg(debug_assertions)]
76 selector_id: SelectorId::new(),
77 }
78 }
79
80 /// Execute an I/O operations ensuring that the socket receives more events
81 /// if it hits a [`WouldBlock`] error.
82 ///
83 /// # Notes
84 ///
85 /// This method is required to be called for **all** I/O operations to
86 /// ensure the user will receive events once the socket is ready again after
87 /// returning a [`WouldBlock`] error.
88 ///
89 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
90 pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
91 where
92 F: FnOnce(&T) -> io::Result<R>,
93 {
94 self.state.do_io(f, &self.inner)
95 }
96
97 /// Returns the I/O source, dropping the state.
98 ///
99 /// # Notes
100 ///
101 /// To ensure no more events are to be received for this I/O source first
102 /// [`deregister`] it.
103 ///
104 /// [`deregister`]: Registry::deregister
105 pub fn into_inner(self) -> T {
106 self.inner
107 }
108}
109
110/// Be careful when using this method. All I/O operations that may block must go
111/// through the [`do_io`] method.
112///
113/// [`do_io`]: IoSource::do_io
114impl<T> Deref for IoSource<T> {
115 type Target = T;
116
117 fn deref(&self) -> &Self::Target {
118 &self.inner
119 }
120}
121
122/// Be careful when using this method. All I/O operations that may block must go
123/// through the [`do_io`] method.
124///
125/// [`do_io`]: IoSource::do_io
126impl<T> DerefMut for IoSource<T> {
127 fn deref_mut(&mut self) -> &mut Self::Target {
128 &mut self.inner
129 }
130}
131
132#[cfg(unix)]
133impl<T> event::Source for IoSource<T>
134where
135 T: AsRawFd,
136{
137 fn register(
138 &mut self,
139 registry: &Registry,
140 token: Token,
141 interests: Interest,
142 ) -> io::Result<()> {
143 #[cfg(debug_assertions)]
144 self.selector_id.associate(registry)?;
145 registry
146 .selector()
147 .register(self.inner.as_raw_fd(), token, interests)
148 }
149
150 fn reregister(
151 &mut self,
152 registry: &Registry,
153 token: Token,
154 interests: Interest,
155 ) -> io::Result<()> {
156 #[cfg(debug_assertions)]
157 self.selector_id.check_association(registry)?;
158 registry
159 .selector()
160 .reregister(self.inner.as_raw_fd(), token, interests)
161 }
162
163 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
164 #[cfg(debug_assertions)]
165 self.selector_id.remove_association(registry)?;
166 registry.selector().deregister(self.inner.as_raw_fd())
167 }
168}
169
170#[cfg(windows)]
171impl<T> event::Source for IoSource<T>
172where
173 T: AsRawSocket,
174{
175 fn register(
176 &mut self,
177 registry: &Registry,
178 token: Token,
179 interests: Interest,
180 ) -> io::Result<()> {
181 #[cfg(debug_assertions)]
182 self.selector_id.associate(registry)?;
183 self.state
184 .register(registry, token, interests, self.inner.as_raw_socket())
185 }
186
187 fn reregister(
188 &mut self,
189 registry: &Registry,
190 token: Token,
191 interests: Interest,
192 ) -> io::Result<()> {
193 #[cfg(debug_assertions)]
194 self.selector_id.check_association(registry)?;
195 self.state.reregister(registry, token, interests)
196 }
197
198 fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
199 #[cfg(debug_assertions)]
200 self.selector_id.remove_association(_registry)?;
201 self.state.deregister()
202 }
203}
204
205#[cfg(target_os = "wasi")]
206impl<T> event::Source for IoSource<T>
207where
208 T: AsRawFd,
209{
210 fn register(
211 &mut self,
212 registry: &Registry,
213 token: Token,
214 interests: Interest,
215 ) -> io::Result<()> {
216 #[cfg(debug_assertions)]
217 self.selector_id.associate(registry)?;
218 registry
219 .selector()
220 .register(self.inner.as_raw_fd() as _, token, interests)
221 }
222
223 fn reregister(
224 &mut self,
225 registry: &Registry,
226 token: Token,
227 interests: Interest,
228 ) -> io::Result<()> {
229 #[cfg(debug_assertions)]
230 self.selector_id.check_association(registry)?;
231 registry
232 .selector()
233 .reregister(self.inner.as_raw_fd() as _, token, interests)
234 }
235
236 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
237 #[cfg(debug_assertions)]
238 self.selector_id.remove_association(registry)?;
239 registry.selector().deregister(self.inner.as_raw_fd() as _)
240 }
241}
242
243impl<T> fmt::Debug for IoSource<T>
244where
245 T: fmt::Debug,
246{
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 self.inner.fmt(f)
249 }
250}
251
252/// Used to associate an `IoSource` with a `sys::Selector`.
253#[cfg(debug_assertions)]
254#[derive(Debug)]
255struct SelectorId {
256 id: AtomicUsize,
257}
258
259#[cfg(debug_assertions)]
260impl SelectorId {
261 /// Value of `id` if `SelectorId` is not associated with any
262 /// `sys::Selector`. Valid selector ids start at 1.
263 const UNASSOCIATED: usize = 0;
264
265 /// Create a new `SelectorId`.
266 const fn new() -> SelectorId {
267 SelectorId {
268 id: AtomicUsize::new(Self::UNASSOCIATED),
269 }
270 }
271
272 /// Associate an I/O source with `registry`, returning an error if its
273 /// already registered.
274 fn associate(&self, registry: &Registry) -> io::Result<()> {
275 let registry_id = registry.selector().id();
276 let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
277
278 if previous_id == Self::UNASSOCIATED {
279 Ok(())
280 } else {
281 Err(io::Error::new(
282 io::ErrorKind::AlreadyExists,
283 "I/O source already registered with a `Registry`",
284 ))
285 }
286 }
287
288 /// Check the association of an I/O source with `registry`, returning an
289 /// error if its registered with a different `Registry` or not registered at
290 /// all.
291 fn check_association(&self, registry: &Registry) -> io::Result<()> {
292 let registry_id = registry.selector().id();
293 let id = self.id.load(Ordering::Acquire);
294
295 if id == registry_id {
296 Ok(())
297 } else if id == Self::UNASSOCIATED {
298 Err(io::Error::new(
299 io::ErrorKind::NotFound,
300 "I/O source not registered with `Registry`",
301 ))
302 } else {
303 Err(io::Error::new(
304 io::ErrorKind::AlreadyExists,
305 "I/O source already registered with a different `Registry`",
306 ))
307 }
308 }
309
310 /// Remove a previously made association from `registry`, returns an error
311 /// if it was not previously associated with `registry`.
312 fn remove_association(&self, registry: &Registry) -> io::Result<()> {
313 let registry_id = registry.selector().id();
314 let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
315
316 if previous_id == registry_id {
317 Ok(())
318 } else {
319 Err(io::Error::new(
320 io::ErrorKind::NotFound,
321 "I/O source not registered with `Registry`",
322 ))
323 }
324 }
325}
326
327#[cfg(debug_assertions)]
328impl Clone for SelectorId {
329 fn clone(&self) -> SelectorId {
330 SelectorId {
331 id: AtomicUsize::new(self.id.load(order:Ordering::Acquire)),
332 }
333 }
334}
335