1// Signal handling
2cfg_signal_internal_and_unix! {
3 mod signal;
4}
5
6use crate::io::interest::Interest;
7use crate::io::ready::Ready;
8use crate::loom::sync::Mutex;
9use crate::runtime::driver;
10use crate::runtime::io::registration_set;
11use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
12
13use mio::event::Source;
14use std::fmt;
15use std::io;
16use std::sync::Arc;
17use std::time::Duration;
18
19/// I/O driver, backed by Mio.
20pub(crate) struct Driver {
21 /// True when an event with the signal token is received
22 signal_ready: bool,
23
24 /// Reuse the `mio::Events` value across calls to poll.
25 events: mio::Events,
26
27 /// The system event queue.
28 poll: mio::Poll,
29}
30
31/// A reference to an I/O driver.
32pub(crate) struct Handle {
33 /// Registers I/O resources.
34 registry: mio::Registry,
35
36 /// Tracks all registrations
37 registrations: RegistrationSet,
38
39 /// State that should be synchronized
40 synced: Mutex<registration_set::Synced>,
41
42 /// Used to wake up the reactor from a call to `turn`.
43 /// Not supported on `Wasi` due to lack of threading support.
44 #[cfg(not(target_os = "wasi"))]
45 waker: mio::Waker,
46
47 pub(crate) metrics: IoDriverMetrics,
48}
49
50#[derive(Debug)]
51pub(crate) struct ReadyEvent {
52 pub(super) tick: u8,
53 pub(crate) ready: Ready,
54 pub(super) is_shutdown: bool,
55}
56
57cfg_net_unix!(
58 impl ReadyEvent {
59 pub(crate) fn with_ready(&self, ready: Ready) -> Self {
60 Self {
61 ready,
62 tick: self.tick,
63 is_shutdown: self.is_shutdown,
64 }
65 }
66 }
67);
68
69#[derive(Debug, Eq, PartialEq, Clone, Copy)]
70pub(super) enum Direction {
71 Read,
72 Write,
73}
74
75pub(super) enum Tick {
76 Set,
77 Clear(u8),
78}
79
80const TOKEN_WAKEUP: mio::Token = mio::Token(0);
81const TOKEN_SIGNAL: mio::Token = mio::Token(1);
82
83fn _assert_kinds() {
84 fn _assert<T: Send + Sync>() {}
85
86 _assert::<Handle>();
87}
88
89// ===== impl Driver =====
90
91impl Driver {
92 /// Creates a new event loop, returning any error that happened during the
93 /// creation.
94 pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
95 let poll = mio::Poll::new()?;
96 #[cfg(not(target_os = "wasi"))]
97 let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
98 let registry = poll.registry().try_clone()?;
99
100 let driver = Driver {
101 signal_ready: false,
102 events: mio::Events::with_capacity(nevents),
103 poll,
104 };
105
106 let (registrations, synced) = RegistrationSet::new();
107
108 let handle = Handle {
109 registry,
110 registrations,
111 synced: Mutex::new(synced),
112 #[cfg(not(target_os = "wasi"))]
113 waker,
114 metrics: IoDriverMetrics::default(),
115 };
116
117 Ok((driver, handle))
118 }
119
120 pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
121 let handle = rt_handle.io();
122 self.turn(handle, None);
123 }
124
125 pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
126 let handle = rt_handle.io();
127 self.turn(handle, Some(duration));
128 }
129
130 pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
131 let handle = rt_handle.io();
132 let ios = handle.registrations.shutdown(&mut handle.synced.lock());
133
134 // `shutdown()` must be called without holding the lock.
135 for io in ios {
136 io.shutdown();
137 }
138 }
139
140 fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
141 debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
142
143 handle.release_pending_registrations();
144
145 let events = &mut self.events;
146
147 // Block waiting for an event to happen, peeling out how many events
148 // happened.
149 match self.poll.poll(events, max_wait) {
150 Ok(()) => {}
151 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
152 #[cfg(target_os = "wasi")]
153 Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
154 // In case of wasm32_wasi this error happens, when trying to poll without subscriptions
155 // just return from the park, as there would be nothing, which wakes us up.
156 }
157 Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e),
158 }
159
160 // Process all the events that came in, dispatching appropriately
161 let mut ready_count = 0;
162 for event in events.iter() {
163 let token = event.token();
164
165 if token == TOKEN_WAKEUP {
166 // Nothing to do, the event is used to unblock the I/O driver
167 } else if token == TOKEN_SIGNAL {
168 self.signal_ready = true;
169 } else {
170 let ready = Ready::from_mio(event);
171 // Use std::ptr::from_exposed_addr when stable
172 let ptr: *const ScheduledIo = token.0 as *const _;
173
174 // Safety: we ensure that the pointers used as tokens are not freed
175 // until they are both deregistered from mio **and** we know the I/O
176 // driver is not concurrently polling. The I/O driver holds ownership of
177 // an `Arc<ScheduledIo>` so we can safely cast this to a ref.
178 let io: &ScheduledIo = unsafe { &*ptr };
179
180 io.set_readiness(Tick::Set, |curr| curr | ready);
181 io.wake(ready);
182
183 ready_count += 1;
184 }
185 }
186
187 handle.metrics.incr_ready_count_by(ready_count);
188 }
189}
190
191impl fmt::Debug for Driver {
192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193 write!(f, "Driver")
194 }
195}
196
197impl Handle {
198 /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
199 /// makes the next call to `turn` return immediately.
200 ///
201 /// This method is intended to be used in situations where a notification
202 /// needs to otherwise be sent to the main reactor. If the reactor is
203 /// currently blocked inside of `turn` then it will wake up and soon return
204 /// after this method has been called. If the reactor is not currently
205 /// blocked in `turn`, then the next call to `turn` will not block and
206 /// return immediately.
207 pub(crate) fn unpark(&self) {
208 #[cfg(not(target_os = "wasi"))]
209 self.waker.wake().expect("failed to wake I/O driver");
210 }
211
212 /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
213 ///
214 /// The registration token is returned.
215 pub(super) fn add_source(
216 &self,
217 source: &mut impl mio::event::Source,
218 interest: Interest,
219 ) -> io::Result<Arc<ScheduledIo>> {
220 let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
221 let token = scheduled_io.token();
222
223 // TODO: if this returns an err, the `ScheduledIo` leaks...
224 self.registry.register(source, token, interest.to_mio())?;
225
226 // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
227 self.metrics.incr_fd_count();
228
229 Ok(scheduled_io)
230 }
231
232 /// Deregisters an I/O resource from the reactor.
233 pub(super) fn deregister_source(
234 &self,
235 registration: &Arc<ScheduledIo>,
236 source: &mut impl Source,
237 ) -> io::Result<()> {
238 // Deregister the source with the OS poller **first**
239 self.registry.deregister(source)?;
240
241 if self
242 .registrations
243 .deregister(&mut self.synced.lock(), registration)
244 {
245 self.unpark();
246 }
247
248 self.metrics.dec_fd_count();
249
250 Ok(())
251 }
252
253 fn release_pending_registrations(&self) {
254 if self.registrations.needs_release() {
255 self.registrations.release(&mut self.synced.lock());
256 }
257 }
258}
259
260impl fmt::Debug for Handle {
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 write!(f, "Handle")
263 }
264}
265
266impl Direction {
267 pub(super) fn mask(self) -> Ready {
268 match self {
269 Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
270 Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
271 }
272 }
273}
274