1 | use std::ops::{Deref, DerefMut}; |
2 | #[cfg (unix)] |
3 | use std::os::unix::io::AsRawFd; |
4 | #[cfg (target_os = "wasi" )] |
5 | use std::os::wasi::io::AsRawFd; |
6 | #[cfg (windows)] |
7 | use std::os::windows::io::AsRawSocket; |
8 | #[cfg (debug_assertions)] |
9 | use std::sync::atomic::{AtomicUsize, Ordering}; |
10 | use std::{fmt, io}; |
11 | |
12 | use crate::sys::IoSourceState; |
13 | use crate::{event, Interest, Registry, Token}; |
14 | |
15 | /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`] |
16 | /// implementation. |
17 | /// |
18 | /// `IoSource` enables registering any FD or socket wrapper with [`Poll`]. |
19 | /// |
20 | /// While only implementations for TCP, UDP, and UDS (Unix only) are provided, |
21 | /// Mio supports registering any FD or socket that can be registered with the |
22 | /// underlying OS selector. `IoSource` provides the necessary bridge. |
23 | /// |
24 | /// [`RawFd`]: std::os::unix::io::RawFd |
25 | /// [`RawSocket`]: std::os::windows::io::RawSocket |
26 | /// |
27 | /// # Notes |
28 | /// |
29 | /// To handle the registrations and events properly **all** I/O operations (such |
30 | /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the |
31 | /// internal state is updated accordingly. |
32 | /// |
33 | /// [`Poll`]: crate::Poll |
34 | /// [`do_io`]: IoSource::do_io |
35 | /* |
36 | /// |
37 | /// # Examples |
38 | /// |
39 | /// Basic usage. |
40 | /// |
41 | /// ``` |
42 | /// # use std::error::Error; |
43 | /// # fn main() -> Result<(), Box<dyn Error>> { |
44 | /// use mio::{Interest, Poll, Token}; |
45 | /// use mio::IoSource; |
46 | /// |
47 | /// use std::net; |
48 | /// |
49 | /// let poll = Poll::new()?; |
50 | /// |
51 | /// // Bind a std TCP listener. |
52 | /// let listener = net::TcpListener::bind("127.0.0.1:0")?; |
53 | /// // Wrap it in the `IoSource` type. |
54 | /// let mut listener = IoSource::new(listener); |
55 | /// |
56 | /// // Register the listener. |
57 | /// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?; |
58 | /// # Ok(()) |
59 | /// # } |
60 | /// ``` |
61 | */ |
62 | pub struct IoSource<T> { |
63 | state: IoSourceState, |
64 | inner: T, |
65 | #[cfg (debug_assertions)] |
66 | selector_id: SelectorId, |
67 | } |
68 | |
69 | impl<T> IoSource<T> { |
70 | /// Create a new `IoSource`. |
71 | pub fn new(io: T) -> IoSource<T> { |
72 | IoSource { |
73 | state: IoSourceState::new(), |
74 | inner: io, |
75 | #[cfg (debug_assertions)] |
76 | selector_id: SelectorId::new(), |
77 | } |
78 | } |
79 | |
80 | /// Execute an I/O operations ensuring that the socket receives more events |
81 | /// if it hits a [`WouldBlock`] error. |
82 | /// |
83 | /// # Notes |
84 | /// |
85 | /// This method is required to be called for **all** I/O operations to |
86 | /// ensure the user will receive events once the socket is ready again after |
87 | /// returning a [`WouldBlock`] error. |
88 | /// |
89 | /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
90 | pub fn do_io<F, R>(&self, f: F) -> io::Result<R> |
91 | where |
92 | F: FnOnce(&T) -> io::Result<R>, |
93 | { |
94 | self.state.do_io(f, &self.inner) |
95 | } |
96 | |
97 | /// Returns the I/O source, dropping the state. |
98 | /// |
99 | /// # Notes |
100 | /// |
101 | /// To ensure no more events are to be received for this I/O source first |
102 | /// [`deregister`] it. |
103 | /// |
104 | /// [`deregister`]: Registry::deregister |
105 | pub fn into_inner(self) -> T { |
106 | self.inner |
107 | } |
108 | } |
109 | |
110 | /// Be careful when using this method. All I/O operations that may block must go |
111 | /// through the [`do_io`] method. |
112 | /// |
113 | /// [`do_io`]: IoSource::do_io |
114 | impl<T> Deref for IoSource<T> { |
115 | type Target = T; |
116 | |
117 | fn deref(&self) -> &Self::Target { |
118 | &self.inner |
119 | } |
120 | } |
121 | |
122 | /// Be careful when using this method. All I/O operations that may block must go |
123 | /// through the [`do_io`] method. |
124 | /// |
125 | /// [`do_io`]: IoSource::do_io |
126 | impl<T> DerefMut for IoSource<T> { |
127 | fn deref_mut(&mut self) -> &mut Self::Target { |
128 | &mut self.inner |
129 | } |
130 | } |
131 | |
132 | #[cfg (unix)] |
133 | impl<T> event::Source for IoSource<T> |
134 | where |
135 | T: AsRawFd, |
136 | { |
137 | fn register( |
138 | &mut self, |
139 | registry: &Registry, |
140 | token: Token, |
141 | interests: Interest, |
142 | ) -> io::Result<()> { |
143 | #[cfg (debug_assertions)] |
144 | self.selector_id.associate(registry)?; |
145 | self.state |
146 | .register(registry, token, interests, self.inner.as_raw_fd()) |
147 | } |
148 | |
149 | fn reregister( |
150 | &mut self, |
151 | registry: &Registry, |
152 | token: Token, |
153 | interests: Interest, |
154 | ) -> io::Result<()> { |
155 | #[cfg (debug_assertions)] |
156 | self.selector_id.check_association(registry)?; |
157 | self.state |
158 | .reregister(registry, token, interests, self.inner.as_raw_fd()) |
159 | } |
160 | |
161 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
162 | #[cfg (debug_assertions)] |
163 | self.selector_id.remove_association(registry)?; |
164 | self.state.deregister(registry, self.inner.as_raw_fd()) |
165 | } |
166 | } |
167 | |
168 | #[cfg (windows)] |
169 | impl<T> event::Source for IoSource<T> |
170 | where |
171 | T: AsRawSocket, |
172 | { |
173 | fn register( |
174 | &mut self, |
175 | registry: &Registry, |
176 | token: Token, |
177 | interests: Interest, |
178 | ) -> io::Result<()> { |
179 | #[cfg (debug_assertions)] |
180 | self.selector_id.associate(registry)?; |
181 | self.state |
182 | .register(registry, token, interests, self.inner.as_raw_socket()) |
183 | } |
184 | |
185 | fn reregister( |
186 | &mut self, |
187 | registry: &Registry, |
188 | token: Token, |
189 | interests: Interest, |
190 | ) -> io::Result<()> { |
191 | #[cfg (debug_assertions)] |
192 | self.selector_id.check_association(registry)?; |
193 | self.state.reregister(registry, token, interests) |
194 | } |
195 | |
196 | fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { |
197 | #[cfg (debug_assertions)] |
198 | self.selector_id.remove_association(_registry)?; |
199 | self.state.deregister() |
200 | } |
201 | } |
202 | |
203 | #[cfg (target_os = "wasi" )] |
204 | impl<T> event::Source for IoSource<T> |
205 | where |
206 | T: AsRawFd, |
207 | { |
208 | fn register( |
209 | &mut self, |
210 | registry: &Registry, |
211 | token: Token, |
212 | interests: Interest, |
213 | ) -> io::Result<()> { |
214 | #[cfg (debug_assertions)] |
215 | self.selector_id.associate(registry)?; |
216 | registry |
217 | .selector() |
218 | .register(self.inner.as_raw_fd() as _, token, interests) |
219 | } |
220 | |
221 | fn reregister( |
222 | &mut self, |
223 | registry: &Registry, |
224 | token: Token, |
225 | interests: Interest, |
226 | ) -> io::Result<()> { |
227 | #[cfg (debug_assertions)] |
228 | self.selector_id.check_association(registry)?; |
229 | registry |
230 | .selector() |
231 | .reregister(self.inner.as_raw_fd() as _, token, interests) |
232 | } |
233 | |
234 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
235 | #[cfg (debug_assertions)] |
236 | self.selector_id.remove_association(registry)?; |
237 | registry.selector().deregister(self.inner.as_raw_fd() as _) |
238 | } |
239 | } |
240 | |
241 | impl<T> fmt::Debug for IoSource<T> |
242 | where |
243 | T: fmt::Debug, |
244 | { |
245 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
246 | self.inner.fmt(f) |
247 | } |
248 | } |
249 | |
250 | /// Used to associate an `IoSource` with a `sys::Selector`. |
251 | #[cfg (debug_assertions)] |
252 | #[derive (Debug)] |
253 | struct SelectorId { |
254 | id: AtomicUsize, |
255 | } |
256 | |
257 | #[cfg (debug_assertions)] |
258 | impl SelectorId { |
259 | /// Value of `id` if `SelectorId` is not associated with any |
260 | /// `sys::Selector`. Valid selector ids start at 1. |
261 | const UNASSOCIATED: usize = 0; |
262 | |
263 | /// Create a new `SelectorId`. |
264 | const fn new() -> SelectorId { |
265 | SelectorId { |
266 | id: AtomicUsize::new(Self::UNASSOCIATED), |
267 | } |
268 | } |
269 | |
270 | /// Associate an I/O source with `registry`, returning an error if its |
271 | /// already registered. |
272 | fn associate(&self, registry: &Registry) -> io::Result<()> { |
273 | let registry_id = registry.selector().id(); |
274 | let previous_id = self.id.swap(registry_id, Ordering::AcqRel); |
275 | |
276 | if previous_id == Self::UNASSOCIATED { |
277 | Ok(()) |
278 | } else { |
279 | Err(io::Error::new( |
280 | io::ErrorKind::AlreadyExists, |
281 | "I/O source already registered with a `Registry`" , |
282 | )) |
283 | } |
284 | } |
285 | |
286 | /// Check the association of an I/O source with `registry`, returning an |
287 | /// error if its registered with a different `Registry` or not registered at |
288 | /// all. |
289 | fn check_association(&self, registry: &Registry) -> io::Result<()> { |
290 | let registry_id = registry.selector().id(); |
291 | let id = self.id.load(Ordering::Acquire); |
292 | |
293 | if id == registry_id { |
294 | Ok(()) |
295 | } else if id == Self::UNASSOCIATED { |
296 | Err(io::Error::new( |
297 | io::ErrorKind::NotFound, |
298 | "I/O source not registered with `Registry`" , |
299 | )) |
300 | } else { |
301 | Err(io::Error::new( |
302 | io::ErrorKind::AlreadyExists, |
303 | "I/O source already registered with a different `Registry`" , |
304 | )) |
305 | } |
306 | } |
307 | |
308 | /// Remove a previously made association from `registry`, returns an error |
309 | /// if it was not previously associated with `registry`. |
310 | fn remove_association(&self, registry: &Registry) -> io::Result<()> { |
311 | let registry_id = registry.selector().id(); |
312 | let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel); |
313 | |
314 | if previous_id == registry_id { |
315 | Ok(()) |
316 | } else { |
317 | Err(io::Error::new( |
318 | io::ErrorKind::NotFound, |
319 | "I/O source not registered with `Registry`" , |
320 | )) |
321 | } |
322 | } |
323 | } |
324 | |
325 | #[cfg (debug_assertions)] |
326 | impl Clone for SelectorId { |
327 | fn clone(&self) -> SelectorId { |
328 | SelectorId { |
329 | id: AtomicUsize::new(self.id.load(order:Ordering::Acquire)), |
330 | } |
331 | } |
332 | } |
333 | |