1 | use std::ops::{Deref, DerefMut}; |
2 | #[cfg (unix)] |
3 | use std::os::unix::io::AsRawFd; |
4 | #[cfg (target_os = "wasi" )] |
5 | use std::os::wasi::io::AsRawFd; |
6 | #[cfg (windows)] |
7 | use std::os::windows::io::AsRawSocket; |
8 | #[cfg (debug_assertions)] |
9 | use std::sync::atomic::{AtomicUsize, Ordering}; |
10 | use std::{fmt, io}; |
11 | |
12 | use crate::sys::IoSourceState; |
13 | use crate::{event, Interest, Registry, Token}; |
14 | |
15 | /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`] |
16 | /// implementation. |
17 | /// |
18 | /// `IoSource` enables registering any FD or socket wrapper with [`Poll`]. |
19 | /// |
20 | /// While only implementations for TCP, UDP, and UDS (Unix only) are provided, |
21 | /// Mio supports registering any FD or socket that can be registered with the |
22 | /// underlying OS selector. `IoSource` provides the necessary bridge. |
23 | /// |
24 | /// [`RawFd`]: std::os::unix::io::RawFd |
25 | /// [`RawSocket`]: std::os::windows::io::RawSocket |
26 | /// |
27 | /// # Notes |
28 | /// |
29 | /// To handle the registrations and events properly **all** I/O operations (such |
30 | /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the |
31 | /// internal state is updated accordingly. |
32 | /// |
33 | /// [`Poll`]: crate::Poll |
34 | /// [`do_io`]: IoSource::do_io |
35 | /* |
36 | /// |
37 | /// # Examples |
38 | /// |
39 | /// Basic usage. |
40 | /// |
41 | /// ``` |
42 | /// # use std::error::Error; |
43 | /// # fn main() -> Result<(), Box<dyn Error>> { |
44 | /// use mio::{Interest, Poll, Token}; |
45 | /// use mio::IoSource; |
46 | /// |
47 | /// use std::net; |
48 | /// |
49 | /// let poll = Poll::new()?; |
50 | /// |
51 | /// // Bind a std TCP listener. |
52 | /// let listener = net::TcpListener::bind("127.0.0.1:0")?; |
53 | /// // Wrap it in the `IoSource` type. |
54 | /// let mut listener = IoSource::new(listener); |
55 | /// |
56 | /// // Register the listener. |
57 | /// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?; |
58 | /// # Ok(()) |
59 | /// # } |
60 | /// ``` |
61 | */ |
62 | pub struct IoSource<T> { |
63 | state: IoSourceState, |
64 | inner: T, |
65 | #[cfg (debug_assertions)] |
66 | selector_id: SelectorId, |
67 | } |
68 | |
69 | impl<T> IoSource<T> { |
70 | /// Create a new `IoSource`. |
71 | pub fn new(io: T) -> IoSource<T> { |
72 | IoSource { |
73 | state: IoSourceState::new(), |
74 | inner: io, |
75 | #[cfg (debug_assertions)] |
76 | selector_id: SelectorId::new(), |
77 | } |
78 | } |
79 | |
80 | /// Execute an I/O operations ensuring that the socket receives more events |
81 | /// if it hits a [`WouldBlock`] error. |
82 | /// |
83 | /// # Notes |
84 | /// |
85 | /// This method is required to be called for **all** I/O operations to |
86 | /// ensure the user will receive events once the socket is ready again after |
87 | /// returning a [`WouldBlock`] error. |
88 | /// |
89 | /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
90 | pub fn do_io<F, R>(&self, f: F) -> io::Result<R> |
91 | where |
92 | F: FnOnce(&T) -> io::Result<R>, |
93 | { |
94 | self.state.do_io(f, &self.inner) |
95 | } |
96 | |
97 | /// Returns the I/O source, dropping the state. |
98 | /// |
99 | /// # Notes |
100 | /// |
101 | /// To ensure no more events are to be received for this I/O source first |
102 | /// [`deregister`] it. |
103 | /// |
104 | /// [`deregister`]: Registry::deregister |
105 | pub fn into_inner(self) -> T { |
106 | self.inner |
107 | } |
108 | } |
109 | |
110 | /// Be careful when using this method. All I/O operations that may block must go |
111 | /// through the [`do_io`] method. |
112 | /// |
113 | /// [`do_io`]: IoSource::do_io |
114 | impl<T> Deref for IoSource<T> { |
115 | type Target = T; |
116 | |
117 | fn deref(&self) -> &Self::Target { |
118 | &self.inner |
119 | } |
120 | } |
121 | |
122 | /// Be careful when using this method. All I/O operations that may block must go |
123 | /// through the [`do_io`] method. |
124 | /// |
125 | /// [`do_io`]: IoSource::do_io |
126 | impl<T> DerefMut for IoSource<T> { |
127 | fn deref_mut(&mut self) -> &mut Self::Target { |
128 | &mut self.inner |
129 | } |
130 | } |
131 | |
132 | #[cfg (unix)] |
133 | impl<T> event::Source for IoSource<T> |
134 | where |
135 | T: AsRawFd, |
136 | { |
137 | fn register( |
138 | &mut self, |
139 | registry: &Registry, |
140 | token: Token, |
141 | interests: Interest, |
142 | ) -> io::Result<()> { |
143 | #[cfg (debug_assertions)] |
144 | self.selector_id.associate(registry)?; |
145 | registry |
146 | .selector() |
147 | .register(self.inner.as_raw_fd(), token, interests) |
148 | } |
149 | |
150 | fn reregister( |
151 | &mut self, |
152 | registry: &Registry, |
153 | token: Token, |
154 | interests: Interest, |
155 | ) -> io::Result<()> { |
156 | #[cfg (debug_assertions)] |
157 | self.selector_id.check_association(registry)?; |
158 | registry |
159 | .selector() |
160 | .reregister(self.inner.as_raw_fd(), token, interests) |
161 | } |
162 | |
163 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
164 | #[cfg (debug_assertions)] |
165 | self.selector_id.remove_association(registry)?; |
166 | registry.selector().deregister(self.inner.as_raw_fd()) |
167 | } |
168 | } |
169 | |
// Windows: (de)registration is delegated to the per-source `IoSourceState`,
// which is handed the raw socket when the source is first registered.
#[cfg(windows)]
impl<T> event::Source for IoSource<T>
where
    T: AsRawSocket,
{
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds only: record the owning registry, failing if this
        // source is already associated with one.
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        let socket = self.inner.as_raw_socket();
        self.state.register(registry, token, interests, socket)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds only: the source must already belong to `registry`.
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        let state = &mut self.state;
        state.reregister(registry, token, interests)
    }

    // `_registry` is only read by the debug-build association check, hence
    // the leading underscore.
    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;

        let state = &mut self.state;
        state.deregister()
    }
}
204 | |
// WASI: same flow as on Unix, except that the raw file descriptor is cast to
// whatever descriptor type the selector methods expect.
#[cfg(target_os = "wasi")]
impl<T> event::Source for IoSource<T>
where
    T: AsRawFd,
{
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds only: record the owning registry, failing if this
        // source is already associated with one.
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        let selector = registry.selector();
        selector.register(self.inner.as_raw_fd() as _, token, interests)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds only: the source must already belong to `registry`.
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        let selector = registry.selector();
        selector.reregister(self.inner.as_raw_fd() as _, token, interests)
    }

    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        // Debug builds only: drop the recorded association with `registry`.
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(registry)?;

        let selector = registry.selector();
        selector.deregister(self.inner.as_raw_fd() as _)
    }
}
242 | |
243 | impl<T> fmt::Debug for IoSource<T> |
244 | where |
245 | T: fmt::Debug, |
246 | { |
247 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
248 | self.inner.fmt(f) |
249 | } |
250 | } |
251 | |
/// Debug-only record tying an `IoSource` to the `sys::Selector` it has been
/// registered with.
#[cfg(debug_assertions)]
#[derive(Debug)]
struct SelectorId {
    /// Id of the associated selector, or zero while unassociated.
    id: AtomicUsize,
}
258 | |
259 | #[cfg (debug_assertions)] |
260 | impl SelectorId { |
261 | /// Value of `id` if `SelectorId` is not associated with any |
262 | /// `sys::Selector`. Valid selector ids start at 1. |
263 | const UNASSOCIATED: usize = 0; |
264 | |
265 | /// Create a new `SelectorId`. |
266 | const fn new() -> SelectorId { |
267 | SelectorId { |
268 | id: AtomicUsize::new(Self::UNASSOCIATED), |
269 | } |
270 | } |
271 | |
272 | /// Associate an I/O source with `registry`, returning an error if its |
273 | /// already registered. |
274 | fn associate(&self, registry: &Registry) -> io::Result<()> { |
275 | let registry_id = registry.selector().id(); |
276 | let previous_id = self.id.swap(registry_id, Ordering::AcqRel); |
277 | |
278 | if previous_id == Self::UNASSOCIATED { |
279 | Ok(()) |
280 | } else { |
281 | Err(io::Error::new( |
282 | io::ErrorKind::AlreadyExists, |
283 | "I/O source already registered with a `Registry`" , |
284 | )) |
285 | } |
286 | } |
287 | |
288 | /// Check the association of an I/O source with `registry`, returning an |
289 | /// error if its registered with a different `Registry` or not registered at |
290 | /// all. |
291 | fn check_association(&self, registry: &Registry) -> io::Result<()> { |
292 | let registry_id = registry.selector().id(); |
293 | let id = self.id.load(Ordering::Acquire); |
294 | |
295 | if id == registry_id { |
296 | Ok(()) |
297 | } else if id == Self::UNASSOCIATED { |
298 | Err(io::Error::new( |
299 | io::ErrorKind::NotFound, |
300 | "I/O source not registered with `Registry`" , |
301 | )) |
302 | } else { |
303 | Err(io::Error::new( |
304 | io::ErrorKind::AlreadyExists, |
305 | "I/O source already registered with a different `Registry`" , |
306 | )) |
307 | } |
308 | } |
309 | |
310 | /// Remove a previously made association from `registry`, returns an error |
311 | /// if it was not previously associated with `registry`. |
312 | fn remove_association(&self, registry: &Registry) -> io::Result<()> { |
313 | let registry_id = registry.selector().id(); |
314 | let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel); |
315 | |
316 | if previous_id == registry_id { |
317 | Ok(()) |
318 | } else { |
319 | Err(io::Error::new( |
320 | io::ErrorKind::NotFound, |
321 | "I/O source not registered with `Registry`" , |
322 | )) |
323 | } |
324 | } |
325 | } |
326 | |
327 | #[cfg (debug_assertions)] |
328 | impl Clone for SelectorId { |
329 | fn clone(&self) -> SelectorId { |
330 | SelectorId { |
331 | id: AtomicUsize::new(self.id.load(order:Ordering::Acquire)), |
332 | } |
333 | } |
334 | } |
335 | |