1 | // Signal handling |
2 | cfg_signal_internal_and_unix! { |
3 | mod signal; |
4 | } |
5 | |
6 | use crate::io::interest::Interest; |
7 | use crate::io::ready::Ready; |
8 | use crate::loom::sync::Mutex; |
9 | use crate::runtime::driver; |
10 | use crate::runtime::io::registration_set; |
11 | use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo}; |
12 | |
13 | use mio::event::Source; |
14 | use std::fmt; |
15 | use std::io; |
16 | use std::sync::Arc; |
17 | use std::time::Duration; |
18 | |
19 | /// I/O driver, backed by Mio. |
20 | pub(crate) struct Driver { |
21 | /// True when an event with the signal token is received |
22 | signal_ready: bool, |
23 | |
24 | /// Reuse the `mio::Events` value across calls to poll. |
25 | events: mio::Events, |
26 | |
27 | /// The system event queue. |
28 | poll: mio::Poll, |
29 | } |
30 | |
31 | /// A reference to an I/O driver. |
32 | pub(crate) struct Handle { |
33 | /// Registers I/O resources. |
34 | registry: mio::Registry, |
35 | |
36 | /// Tracks all registrations |
37 | registrations: RegistrationSet, |
38 | |
39 | /// State that should be synchronized |
40 | synced: Mutex<registration_set::Synced>, |
41 | |
42 | /// Used to wake up the reactor from a call to `turn`. |
43 | /// Not supported on `Wasi` due to lack of threading support. |
44 | #[cfg (not(target_os = "wasi" ))] |
45 | waker: mio::Waker, |
46 | |
47 | pub(crate) metrics: IoDriverMetrics, |
48 | } |
49 | |
50 | #[derive (Debug)] |
51 | pub(crate) struct ReadyEvent { |
52 | pub(super) tick: u8, |
53 | pub(crate) ready: Ready, |
54 | pub(super) is_shutdown: bool, |
55 | } |
56 | |
57 | cfg_net_unix!( |
58 | impl ReadyEvent { |
59 | pub(crate) fn with_ready(&self, ready: Ready) -> Self { |
60 | Self { |
61 | ready, |
62 | tick: self.tick, |
63 | is_shutdown: self.is_shutdown, |
64 | } |
65 | } |
66 | } |
67 | ); |
68 | |
69 | #[derive (Debug, Eq, PartialEq, Clone, Copy)] |
70 | pub(super) enum Direction { |
71 | Read, |
72 | Write, |
73 | } |
74 | |
75 | pub(super) enum Tick { |
76 | Set, |
77 | Clear(u8), |
78 | } |
79 | |
80 | const TOKEN_WAKEUP: mio::Token = mio::Token(0); |
81 | const TOKEN_SIGNAL: mio::Token = mio::Token(1); |
82 | |
83 | fn _assert_kinds() { |
84 | fn _assert<T: Send + Sync>() {} |
85 | |
86 | _assert::<Handle>(); |
87 | } |
88 | |
89 | // ===== impl Driver ===== |
90 | |
91 | impl Driver { |
92 | /// Creates a new event loop, returning any error that happened during the |
93 | /// creation. |
94 | pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> { |
95 | let poll = mio::Poll::new()?; |
96 | #[cfg (not(target_os = "wasi" ))] |
97 | let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; |
98 | let registry = poll.registry().try_clone()?; |
99 | |
100 | let driver = Driver { |
101 | signal_ready: false, |
102 | events: mio::Events::with_capacity(nevents), |
103 | poll, |
104 | }; |
105 | |
106 | let (registrations, synced) = RegistrationSet::new(); |
107 | |
108 | let handle = Handle { |
109 | registry, |
110 | registrations, |
111 | synced: Mutex::new(synced), |
112 | #[cfg (not(target_os = "wasi" ))] |
113 | waker, |
114 | metrics: IoDriverMetrics::default(), |
115 | }; |
116 | |
117 | Ok((driver, handle)) |
118 | } |
119 | |
120 | pub(crate) fn park(&mut self, rt_handle: &driver::Handle) { |
121 | let handle = rt_handle.io(); |
122 | self.turn(handle, None); |
123 | } |
124 | |
125 | pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { |
126 | let handle = rt_handle.io(); |
127 | self.turn(handle, Some(duration)); |
128 | } |
129 | |
130 | pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { |
131 | let handle = rt_handle.io(); |
132 | let ios = handle.registrations.shutdown(&mut handle.synced.lock()); |
133 | |
134 | // `shutdown()` must be called without holding the lock. |
135 | for io in ios { |
136 | io.shutdown(); |
137 | } |
138 | } |
139 | |
140 | fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) { |
141 | debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock())); |
142 | |
143 | handle.release_pending_registrations(); |
144 | |
145 | let events = &mut self.events; |
146 | |
147 | // Block waiting for an event to happen, peeling out how many events |
148 | // happened. |
149 | match self.poll.poll(events, max_wait) { |
150 | Ok(()) => {} |
151 | Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} |
152 | #[cfg (target_os = "wasi" )] |
153 | Err(e) if e.kind() == io::ErrorKind::InvalidInput => { |
154 | // In case of wasm32_wasi this error happens, when trying to poll without subscriptions |
155 | // just return from the park, as there would be nothing, which wakes us up. |
156 | } |
157 | Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}" ), |
158 | } |
159 | |
160 | // Process all the events that came in, dispatching appropriately |
161 | let mut ready_count = 0; |
162 | for event in events.iter() { |
163 | let token = event.token(); |
164 | |
165 | if token == TOKEN_WAKEUP { |
166 | // Nothing to do, the event is used to unblock the I/O driver |
167 | } else if token == TOKEN_SIGNAL { |
168 | self.signal_ready = true; |
169 | } else { |
170 | let ready = Ready::from_mio(event); |
171 | let ptr = super::EXPOSE_IO.from_exposed_addr(token.0); |
172 | |
173 | // Safety: we ensure that the pointers used as tokens are not freed |
174 | // until they are both deregistered from mio **and** we know the I/O |
175 | // driver is not concurrently polling. The I/O driver holds ownership of |
176 | // an `Arc<ScheduledIo>` so we can safely cast this to a ref. |
177 | let io: &ScheduledIo = unsafe { &*ptr }; |
178 | |
179 | io.set_readiness(Tick::Set, |curr| curr | ready); |
180 | io.wake(ready); |
181 | |
182 | ready_count += 1; |
183 | } |
184 | } |
185 | |
186 | handle.metrics.incr_ready_count_by(ready_count); |
187 | } |
188 | } |
189 | |
190 | impl fmt::Debug for Driver { |
191 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
192 | write!(f, "Driver" ) |
193 | } |
194 | } |
195 | |
196 | impl Handle { |
197 | /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise |
198 | /// makes the next call to `turn` return immediately. |
199 | /// |
200 | /// This method is intended to be used in situations where a notification |
201 | /// needs to otherwise be sent to the main reactor. If the reactor is |
202 | /// currently blocked inside of `turn` then it will wake up and soon return |
203 | /// after this method has been called. If the reactor is not currently |
204 | /// blocked in `turn`, then the next call to `turn` will not block and |
205 | /// return immediately. |
206 | pub(crate) fn unpark(&self) { |
207 | #[cfg (not(target_os = "wasi" ))] |
208 | self.waker.wake().expect("failed to wake I/O driver" ); |
209 | } |
210 | |
211 | /// Registers an I/O resource with the reactor for a given `mio::Ready` state. |
212 | /// |
213 | /// The registration token is returned. |
214 | pub(super) fn add_source( |
215 | &self, |
216 | source: &mut impl mio::event::Source, |
217 | interest: Interest, |
218 | ) -> io::Result<Arc<ScheduledIo>> { |
219 | let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?; |
220 | let token = scheduled_io.token(); |
221 | |
222 | // we should remove the `scheduled_io` from the `registrations` set if registering |
223 | // the `source` with the OS fails. Otherwise it will leak the `scheduled_io`. |
224 | if let Err(e) = self.registry.register(source, token, interest.to_mio()) { |
225 | // safety: `scheduled_io` is part of the `registrations` set. |
226 | unsafe { |
227 | self.registrations |
228 | .remove(&mut self.synced.lock(), &scheduled_io) |
229 | }; |
230 | |
231 | return Err(e); |
232 | } |
233 | |
234 | // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList` |
235 | self.metrics.incr_fd_count(); |
236 | |
237 | Ok(scheduled_io) |
238 | } |
239 | |
240 | /// Deregisters an I/O resource from the reactor. |
241 | pub(super) fn deregister_source( |
242 | &self, |
243 | registration: &Arc<ScheduledIo>, |
244 | source: &mut impl Source, |
245 | ) -> io::Result<()> { |
246 | // Deregister the source with the OS poller **first** |
247 | self.registry.deregister(source)?; |
248 | |
249 | if self |
250 | .registrations |
251 | .deregister(&mut self.synced.lock(), registration) |
252 | { |
253 | self.unpark(); |
254 | } |
255 | |
256 | self.metrics.dec_fd_count(); |
257 | |
258 | Ok(()) |
259 | } |
260 | |
261 | fn release_pending_registrations(&self) { |
262 | if self.registrations.needs_release() { |
263 | self.registrations.release(&mut self.synced.lock()); |
264 | } |
265 | } |
266 | } |
267 | |
268 | impl fmt::Debug for Handle { |
269 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
270 | write!(f, "Handle" ) |
271 | } |
272 | } |
273 | |
274 | impl Direction { |
275 | pub(super) fn mask(self) -> Ready { |
276 | match self { |
277 | Direction::Read => Ready::READABLE | Ready::READ_CLOSED, |
278 | Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, |
279 | } |
280 | } |
281 | } |
282 | |