1 | use std::ops::{Deref, DerefMut}; |
2 | #[cfg (any(unix, target_os = "wasi" ))] |
3 | use std::os::fd::AsRawFd; |
4 | // TODO: once <https://github.com/rust-lang/rust/issues/126198> is fixed this |
5 | // can use `std::os::fd` and be merged with the above. |
6 | #[cfg (target_os = "hermit" )] |
7 | use std::os::hermit::io::AsRawFd; |
8 | #[cfg (windows)] |
9 | use std::os::windows::io::AsRawSocket; |
10 | #[cfg (debug_assertions)] |
11 | use std::sync::atomic::{AtomicUsize, Ordering}; |
12 | use std::{fmt, io}; |
13 | |
14 | use crate::sys::IoSourceState; |
15 | use crate::{event, Interest, Registry, Token}; |
16 | |
17 | /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`] |
18 | /// implementation. |
19 | /// |
20 | /// `IoSource` enables registering any FD or socket wrapper with [`Poll`]. |
21 | /// |
22 | /// While only implementations for TCP, UDP, and UDS (Unix only) are provided, |
23 | /// Mio supports registering any FD or socket that can be registered with the |
24 | /// underlying OS selector. `IoSource` provides the necessary bridge. |
25 | /// |
26 | /// [`RawFd`]: std::os::fd::RawFd |
27 | /// [`RawSocket`]: std::os::windows::io::RawSocket |
28 | /// |
29 | /// # Notes |
30 | /// |
31 | /// To handle the registrations and events properly **all** I/O operations (such |
32 | /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the |
33 | /// internal state is updated accordingly. |
34 | /// |
35 | /// [`Poll`]: crate::Poll |
36 | /// [`do_io`]: IoSource::do_io |
37 | pub struct IoSource<T> { |
38 | state: IoSourceState, |
39 | inner: T, |
40 | #[cfg (debug_assertions)] |
41 | selector_id: SelectorId, |
42 | } |
43 | |
44 | impl<T> IoSource<T> { |
45 | /// Create a new `IoSource`. |
46 | pub fn new(io: T) -> IoSource<T> { |
47 | IoSource { |
48 | state: IoSourceState::new(), |
49 | inner: io, |
50 | #[cfg (debug_assertions)] |
51 | selector_id: SelectorId::new(), |
52 | } |
53 | } |
54 | |
55 | /// Execute an I/O operations ensuring that the socket receives more events |
56 | /// if it hits a [`WouldBlock`] error. |
57 | /// |
58 | /// # Notes |
59 | /// |
60 | /// This method is required to be called for **all** I/O operations to |
61 | /// ensure the user will receive events once the socket is ready again after |
62 | /// returning a [`WouldBlock`] error. |
63 | /// |
64 | /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
65 | pub fn do_io<F, R>(&self, f: F) -> io::Result<R> |
66 | where |
67 | F: FnOnce(&T) -> io::Result<R>, |
68 | { |
69 | self.state.do_io(f, &self.inner) |
70 | } |
71 | |
72 | /// Returns the I/O source, dropping the state. |
73 | /// |
74 | /// # Notes |
75 | /// |
76 | /// To ensure no more events are to be received for this I/O source first |
77 | /// [`deregister`] it. |
78 | /// |
79 | /// [`deregister`]: Registry::deregister |
80 | pub fn into_inner(self) -> T { |
81 | self.inner |
82 | } |
83 | } |
84 | |
85 | /// Be careful when using this method. All I/O operations that may block must go |
86 | /// through the [`do_io`] method. |
87 | /// |
88 | /// [`do_io`]: IoSource::do_io |
89 | impl<T> Deref for IoSource<T> { |
90 | type Target = T; |
91 | |
92 | fn deref(&self) -> &Self::Target { |
93 | &self.inner |
94 | } |
95 | } |
96 | |
97 | /// Be careful when using this method. All I/O operations that may block must go |
98 | /// through the [`do_io`] method. |
99 | /// |
100 | /// [`do_io`]: IoSource::do_io |
101 | impl<T> DerefMut for IoSource<T> { |
102 | fn deref_mut(&mut self) -> &mut Self::Target { |
103 | &mut self.inner |
104 | } |
105 | } |
106 | |
107 | #[cfg (any(unix, target_os = "hermit" ))] |
108 | impl<T> event::Source for IoSource<T> |
109 | where |
110 | T: AsRawFd, |
111 | { |
112 | fn register( |
113 | &mut self, |
114 | registry: &Registry, |
115 | token: Token, |
116 | interests: Interest, |
117 | ) -> io::Result<()> { |
118 | #[cfg (debug_assertions)] |
119 | self.selector_id.associate(registry)?; |
120 | self.state |
121 | .register(registry, token, interests, self.inner.as_raw_fd()) |
122 | } |
123 | |
124 | fn reregister( |
125 | &mut self, |
126 | registry: &Registry, |
127 | token: Token, |
128 | interests: Interest, |
129 | ) -> io::Result<()> { |
130 | #[cfg (debug_assertions)] |
131 | self.selector_id.check_association(registry)?; |
132 | self.state |
133 | .reregister(registry, token, interests, self.inner.as_raw_fd()) |
134 | } |
135 | |
136 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
137 | #[cfg (debug_assertions)] |
138 | self.selector_id.remove_association(registry)?; |
139 | self.state.deregister(registry, self.inner.as_raw_fd()) |
140 | } |
141 | } |
142 | |
143 | #[cfg (windows)] |
144 | impl<T> event::Source for IoSource<T> |
145 | where |
146 | T: AsRawSocket, |
147 | { |
148 | fn register( |
149 | &mut self, |
150 | registry: &Registry, |
151 | token: Token, |
152 | interests: Interest, |
153 | ) -> io::Result<()> { |
154 | #[cfg (debug_assertions)] |
155 | self.selector_id.associate(registry)?; |
156 | self.state |
157 | .register(registry, token, interests, self.inner.as_raw_socket()) |
158 | } |
159 | |
160 | fn reregister( |
161 | &mut self, |
162 | registry: &Registry, |
163 | token: Token, |
164 | interests: Interest, |
165 | ) -> io::Result<()> { |
166 | #[cfg (debug_assertions)] |
167 | self.selector_id.check_association(registry)?; |
168 | self.state.reregister(registry, token, interests) |
169 | } |
170 | |
171 | fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { |
172 | #[cfg (debug_assertions)] |
173 | self.selector_id.remove_association(_registry)?; |
174 | self.state.deregister() |
175 | } |
176 | } |
177 | |
178 | #[cfg (target_os = "wasi" )] |
179 | impl<T> event::Source for IoSource<T> |
180 | where |
181 | T: AsRawFd, |
182 | { |
183 | fn register( |
184 | &mut self, |
185 | registry: &Registry, |
186 | token: Token, |
187 | interests: Interest, |
188 | ) -> io::Result<()> { |
189 | #[cfg (debug_assertions)] |
190 | self.selector_id.associate(registry)?; |
191 | registry |
192 | .selector() |
193 | .register(self.inner.as_raw_fd() as _, token, interests) |
194 | } |
195 | |
196 | fn reregister( |
197 | &mut self, |
198 | registry: &Registry, |
199 | token: Token, |
200 | interests: Interest, |
201 | ) -> io::Result<()> { |
202 | #[cfg (debug_assertions)] |
203 | self.selector_id.check_association(registry)?; |
204 | registry |
205 | .selector() |
206 | .reregister(self.inner.as_raw_fd() as _, token, interests) |
207 | } |
208 | |
209 | fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
210 | #[cfg (debug_assertions)] |
211 | self.selector_id.remove_association(registry)?; |
212 | registry.selector().deregister(self.inner.as_raw_fd() as _) |
213 | } |
214 | } |
215 | |
216 | impl<T> fmt::Debug for IoSource<T> |
217 | where |
218 | T: fmt::Debug, |
219 | { |
220 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
221 | self.inner.fmt(f) |
222 | } |
223 | } |
224 | |
225 | /// Used to associate an `IoSource` with a `sys::Selector`. |
226 | #[cfg (debug_assertions)] |
227 | #[derive (Debug)] |
228 | struct SelectorId { |
229 | id: AtomicUsize, |
230 | } |
231 | |
232 | #[cfg (debug_assertions)] |
233 | impl SelectorId { |
234 | /// Value of `id` if `SelectorId` is not associated with any |
235 | /// `sys::Selector`. Valid selector ids start at 1. |
236 | const UNASSOCIATED: usize = 0; |
237 | |
238 | /// Create a new `SelectorId`. |
239 | const fn new() -> SelectorId { |
240 | SelectorId { |
241 | id: AtomicUsize::new(Self::UNASSOCIATED), |
242 | } |
243 | } |
244 | |
245 | /// Associate an I/O source with `registry`, returning an error if its |
246 | /// already registered. |
247 | fn associate(&self, registry: &Registry) -> io::Result<()> { |
248 | let registry_id = registry.selector().id(); |
249 | let previous_id = self.id.swap(registry_id, Ordering::AcqRel); |
250 | |
251 | if previous_id == Self::UNASSOCIATED { |
252 | Ok(()) |
253 | } else { |
254 | Err(io::Error::new( |
255 | io::ErrorKind::AlreadyExists, |
256 | "I/O source already registered with a `Registry`" , |
257 | )) |
258 | } |
259 | } |
260 | |
261 | /// Check the association of an I/O source with `registry`, returning an |
262 | /// error if its registered with a different `Registry` or not registered at |
263 | /// all. |
264 | fn check_association(&self, registry: &Registry) -> io::Result<()> { |
265 | let registry_id = registry.selector().id(); |
266 | let id = self.id.load(Ordering::Acquire); |
267 | |
268 | if id == registry_id { |
269 | Ok(()) |
270 | } else if id == Self::UNASSOCIATED { |
271 | Err(io::Error::new( |
272 | io::ErrorKind::NotFound, |
273 | "I/O source not registered with `Registry`" , |
274 | )) |
275 | } else { |
276 | Err(io::Error::new( |
277 | io::ErrorKind::AlreadyExists, |
278 | "I/O source already registered with a different `Registry`" , |
279 | )) |
280 | } |
281 | } |
282 | |
283 | /// Remove a previously made association from `registry`, returns an error |
284 | /// if it was not previously associated with `registry`. |
285 | fn remove_association(&self, registry: &Registry) -> io::Result<()> { |
286 | let registry_id = registry.selector().id(); |
287 | let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel); |
288 | |
289 | if previous_id == registry_id { |
290 | Ok(()) |
291 | } else { |
292 | Err(io::Error::new( |
293 | io::ErrorKind::NotFound, |
294 | "I/O source not registered with `Registry`" , |
295 | )) |
296 | } |
297 | } |
298 | } |
299 | |
300 | #[cfg (debug_assertions)] |
301 | impl Clone for SelectorId { |
302 | fn clone(&self) -> SelectorId { |
303 | SelectorId { |
304 | id: AtomicUsize::new(self.id.load(Ordering::Acquire)), |
305 | } |
306 | } |
307 | } |
308 | |