1 | // Signal handling |
2 | cfg_signal_internal_and_unix! { |
3 | mod signal; |
4 | } |
5 | |
6 | use crate::io::interest::Interest; |
7 | use crate::io::ready::Ready; |
8 | use crate::loom::sync::Mutex; |
9 | use crate::runtime::driver; |
10 | use crate::runtime::io::registration_set; |
11 | use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo}; |
12 | |
13 | use mio::event::Source; |
14 | use std::fmt; |
15 | use std::io; |
16 | use std::sync::Arc; |
17 | use std::time::Duration; |
18 | |
19 | /// I/O driver, backed by Mio. |
20 | pub(crate) struct Driver { |
21 | /// True when an event with the signal token is received |
22 | signal_ready: bool, |
23 | |
24 | /// Reuse the `mio::Events` value across calls to poll. |
25 | events: mio::Events, |
26 | |
27 | /// The system event queue. |
28 | poll: mio::Poll, |
29 | } |
30 | |
31 | /// A reference to an I/O driver. |
32 | pub(crate) struct Handle { |
33 | /// Registers I/O resources. |
34 | registry: mio::Registry, |
35 | |
36 | /// Tracks all registrations |
37 | registrations: RegistrationSet, |
38 | |
39 | /// State that should be synchronized |
40 | synced: Mutex<registration_set::Synced>, |
41 | |
42 | /// Used to wake up the reactor from a call to `turn`. |
43 | /// Not supported on `Wasi` due to lack of threading support. |
44 | #[cfg (not(target_os = "wasi" ))] |
45 | waker: mio::Waker, |
46 | |
47 | pub(crate) metrics: IoDriverMetrics, |
48 | } |
49 | |
50 | #[derive (Debug)] |
51 | pub(crate) struct ReadyEvent { |
52 | pub(super) tick: u8, |
53 | pub(crate) ready: Ready, |
54 | pub(super) is_shutdown: bool, |
55 | } |
56 | |
57 | cfg_net_unix!( |
58 | impl ReadyEvent { |
59 | pub(crate) fn with_ready(&self, ready: Ready) -> Self { |
60 | Self { |
61 | ready, |
62 | tick: self.tick, |
63 | is_shutdown: self.is_shutdown, |
64 | } |
65 | } |
66 | } |
67 | ); |
68 | |
69 | #[derive (Debug, Eq, PartialEq, Clone, Copy)] |
70 | pub(super) enum Direction { |
71 | Read, |
72 | Write, |
73 | } |
74 | |
75 | pub(super) enum Tick { |
76 | Set, |
77 | Clear(u8), |
78 | } |
79 | |
80 | const TOKEN_WAKEUP: mio::Token = mio::Token(0); |
81 | const TOKEN_SIGNAL: mio::Token = mio::Token(1); |
82 | |
83 | fn _assert_kinds() { |
84 | fn _assert<T: Send + Sync>() {} |
85 | |
86 | _assert::<Handle>(); |
87 | } |
88 | |
89 | // ===== impl Driver ===== |
90 | |
91 | impl Driver { |
92 | /// Creates a new event loop, returning any error that happened during the |
93 | /// creation. |
94 | pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> { |
95 | let poll = mio::Poll::new()?; |
96 | #[cfg (not(target_os = "wasi" ))] |
97 | let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; |
98 | let registry = poll.registry().try_clone()?; |
99 | |
100 | let driver = Driver { |
101 | signal_ready: false, |
102 | events: mio::Events::with_capacity(nevents), |
103 | poll, |
104 | }; |
105 | |
106 | let (registrations, synced) = RegistrationSet::new(); |
107 | |
108 | let handle = Handle { |
109 | registry, |
110 | registrations, |
111 | synced: Mutex::new(synced), |
112 | #[cfg (not(target_os = "wasi" ))] |
113 | waker, |
114 | metrics: IoDriverMetrics::default(), |
115 | }; |
116 | |
117 | Ok((driver, handle)) |
118 | } |
119 | |
120 | pub(crate) fn park(&mut self, rt_handle: &driver::Handle) { |
121 | let handle = rt_handle.io(); |
122 | self.turn(handle, None); |
123 | } |
124 | |
125 | pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { |
126 | let handle = rt_handle.io(); |
127 | self.turn(handle, Some(duration)); |
128 | } |
129 | |
130 | pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { |
131 | let handle = rt_handle.io(); |
132 | let ios = handle.registrations.shutdown(&mut handle.synced.lock()); |
133 | |
134 | // `shutdown()` must be called without holding the lock. |
135 | for io in ios { |
136 | io.shutdown(); |
137 | } |
138 | } |
139 | |
140 | fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) { |
141 | debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock())); |
142 | |
143 | handle.release_pending_registrations(); |
144 | |
145 | let events = &mut self.events; |
146 | |
147 | // Block waiting for an event to happen, peeling out how many events |
148 | // happened. |
149 | match self.poll.poll(events, max_wait) { |
150 | Ok(()) => {} |
151 | Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} |
152 | #[cfg (target_os = "wasi" )] |
153 | Err(e) if e.kind() == io::ErrorKind::InvalidInput => { |
154 | // In case of wasm32_wasi this error happens, when trying to poll without subscriptions |
155 | // just return from the park, as there would be nothing, which wakes us up. |
156 | } |
157 | Err(e) => panic!("unexpected error when polling the I/O driver: {:?}" , e), |
158 | } |
159 | |
160 | // Process all the events that came in, dispatching appropriately |
161 | let mut ready_count = 0; |
162 | for event in events.iter() { |
163 | let token = event.token(); |
164 | |
165 | if token == TOKEN_WAKEUP { |
166 | // Nothing to do, the event is used to unblock the I/O driver |
167 | } else if token == TOKEN_SIGNAL { |
168 | self.signal_ready = true; |
169 | } else { |
170 | let ready = Ready::from_mio(event); |
171 | // Use std::ptr::from_exposed_addr when stable |
172 | let ptr: *const ScheduledIo = token.0 as *const _; |
173 | |
174 | // Safety: we ensure that the pointers used as tokens are not freed |
175 | // until they are both deregistered from mio **and** we know the I/O |
176 | // driver is not concurrently polling. The I/O driver holds ownership of |
177 | // an `Arc<ScheduledIo>` so we can safely cast this to a ref. |
178 | let io: &ScheduledIo = unsafe { &*ptr }; |
179 | |
180 | io.set_readiness(Tick::Set, |curr| curr | ready); |
181 | io.wake(ready); |
182 | |
183 | ready_count += 1; |
184 | } |
185 | } |
186 | |
187 | handle.metrics.incr_ready_count_by(ready_count); |
188 | } |
189 | } |
190 | |
191 | impl fmt::Debug for Driver { |
192 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
193 | write!(f, "Driver" ) |
194 | } |
195 | } |
196 | |
197 | impl Handle { |
198 | /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise |
199 | /// makes the next call to `turn` return immediately. |
200 | /// |
201 | /// This method is intended to be used in situations where a notification |
202 | /// needs to otherwise be sent to the main reactor. If the reactor is |
203 | /// currently blocked inside of `turn` then it will wake up and soon return |
204 | /// after this method has been called. If the reactor is not currently |
205 | /// blocked in `turn`, then the next call to `turn` will not block and |
206 | /// return immediately. |
207 | pub(crate) fn unpark(&self) { |
208 | #[cfg (not(target_os = "wasi" ))] |
209 | self.waker.wake().expect("failed to wake I/O driver" ); |
210 | } |
211 | |
212 | /// Registers an I/O resource with the reactor for a given `mio::Ready` state. |
213 | /// |
214 | /// The registration token is returned. |
215 | pub(super) fn add_source( |
216 | &self, |
217 | source: &mut impl mio::event::Source, |
218 | interest: Interest, |
219 | ) -> io::Result<Arc<ScheduledIo>> { |
220 | let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?; |
221 | let token = scheduled_io.token(); |
222 | |
223 | // TODO: if this returns an err, the `ScheduledIo` leaks... |
224 | self.registry.register(source, token, interest.to_mio())?; |
225 | |
226 | // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList` |
227 | self.metrics.incr_fd_count(); |
228 | |
229 | Ok(scheduled_io) |
230 | } |
231 | |
232 | /// Deregisters an I/O resource from the reactor. |
233 | pub(super) fn deregister_source( |
234 | &self, |
235 | registration: &Arc<ScheduledIo>, |
236 | source: &mut impl Source, |
237 | ) -> io::Result<()> { |
238 | // Deregister the source with the OS poller **first** |
239 | self.registry.deregister(source)?; |
240 | |
241 | if self |
242 | .registrations |
243 | .deregister(&mut self.synced.lock(), registration) |
244 | { |
245 | self.unpark(); |
246 | } |
247 | |
248 | self.metrics.dec_fd_count(); |
249 | |
250 | Ok(()) |
251 | } |
252 | |
253 | fn release_pending_registrations(&self) { |
254 | if self.registrations.needs_release() { |
255 | self.registrations.release(&mut self.synced.lock()); |
256 | } |
257 | } |
258 | } |
259 | |
260 | impl fmt::Debug for Handle { |
261 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
262 | write!(f, "Handle" ) |
263 | } |
264 | } |
265 | |
266 | impl Direction { |
267 | pub(super) fn mask(self) -> Ready { |
268 | match self { |
269 | Direction::Read => Ready::READABLE | Ready::READ_CLOSED, |
270 | Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, |
271 | } |
272 | } |
273 | } |
274 | |