| 1 | // Signal handling |
| 2 | cfg_signal_internal_and_unix! { |
| 3 | mod signal; |
| 4 | } |
| 5 | |
| 6 | use crate::io::interest::Interest; |
| 7 | use crate::io::ready::Ready; |
| 8 | use crate::loom::sync::Mutex; |
| 9 | use crate::runtime::driver; |
| 10 | use crate::runtime::io::registration_set; |
| 11 | use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo}; |
| 12 | |
| 13 | use mio::event::Source; |
| 14 | use std::fmt; |
| 15 | use std::io; |
| 16 | use std::sync::Arc; |
| 17 | use std::time::Duration; |
| 18 | |
/// I/O driver, backed by Mio.
///
/// Owns the `mio::Poll` instance and the event buffer that `turn` fills on
/// every poll. It is created together with its companion `Handle` by
/// `Driver::new`.
pub(crate) struct Driver {
    /// True when an event with the signal token is received
    // Set in `turn` when an event carrying `TOKEN_SIGNAL` is seen.
    // NOTE(review): where this flag is read and cleared is not visible here —
    // presumably in the `signal` submodule; confirm.
    signal_ready: bool,

    /// Reuse the `mio::Events` value across calls to poll.
    // Allocated once in `Driver::new` with the caller-supplied capacity.
    events: mio::Events,

    /// The system event queue.
    poll: mio::Poll,
}
| 30 | |
/// A reference to an I/O driver.
///
/// Created by `Driver::new` alongside the driver itself. It is used to
/// register and deregister I/O sources and to wake the driver up, and
/// `_assert_kinds` statically requires it to be `Send + Sync`.
pub(crate) struct Handle {
    /// Registers I/O resources.
    // A clone of the registry belonging to the driver's `mio::Poll`.
    registry: mio::Registry,

    /// Tracks all registrations
    registrations: RegistrationSet,

    /// State that should be synchronized
    // Every access to `registrations` that needs this state locks the mutex
    // and passes the guarded value along.
    synced: Mutex<registration_set::Synced>,

    /// Used to wake up the reactor from a call to `turn`.
    /// Not supported on `Wasi` due to lack of threading support.
    // Registered on the driver's poll instance under `TOKEN_WAKEUP`.
    #[cfg(not(target_os = "wasi"))]
    waker: mio::Waker,

    /// Driver metrics: updated with the number of ready events after each
    /// turn and with the source count on registration and deregistration.
    pub(crate) metrics: IoDriverMetrics,
}
| 49 | |
/// A readiness snapshot: a `Ready` set bundled with a `tick` value and a
/// shutdown flag.
// NOTE(review): the producers and consumers of this type are outside this
// section; the field notes below are assumptions to be confirmed there.
#[derive(Debug)]
pub(crate) struct ReadyEvent {
    // NOTE(review): presumably identifies the driver tick at which `ready`
    // was observed — confirm against `ScheduledIo`.
    pub(super) tick: u8,
    /// The readiness set carried by this event.
    pub(crate) ready: Ready,
    // NOTE(review): presumably true once the I/O driver has been shut down —
    // confirm against `ScheduledIo`.
    pub(super) is_shutdown: bool,
}
| 56 | |
| 57 | cfg_net_unix!( |
| 58 | impl ReadyEvent { |
| 59 | pub(crate) fn with_ready(&self, ready: Ready) -> Self { |
| 60 | Self { |
| 61 | ready, |
| 62 | tick: self.tick, |
| 63 | is_shutdown: self.is_shutdown, |
| 64 | } |
| 65 | } |
| 66 | } |
| 67 | ); |
| 68 | |
/// The direction of an I/O operation.
///
/// `Direction::mask` maps each variant to the readiness bits that are
/// relevant for it.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(super) enum Direction {
    /// Read side: `Ready::READABLE | Ready::READ_CLOSED`.
    Read,
    /// Write side: `Ready::WRITABLE | Ready::WRITE_CLOSED`.
    Write,
}
| 74 | |
/// Tick argument passed to `ScheduledIo::set_readiness` together with the
/// closure that updates the readiness set.
pub(super) enum Tick {
    /// Used by the driver's `turn` when it records newly received readiness.
    Set,
    // NOTE(review): presumably carries the tick value observed by the caller
    // that wants to clear readiness — confirm against `ScheduledIo`.
    Clear(u8),
}
| 79 | |
// Any token other than the two below is interpreted by `turn` as the exposed
// address of a `ScheduledIo`.

/// Token the `mio::Waker` is registered under. `turn` ignores events that
/// carry it: their only purpose is to unblock the poll call.
const TOKEN_WAKEUP: mio::Token = mio::Token(0);
/// Token for signal readiness. `turn` sets `signal_ready` when it sees it.
// NOTE(review): the registration that uses this token is not visible here —
// presumably it lives in the `signal` submodule; confirm.
const TOKEN_SIGNAL: mio::Token = mio::Token(1);
| 82 | |
| 83 | fn _assert_kinds() { |
| 84 | fn _assert<T: Send + Sync>() {} |
| 85 | |
| 86 | _assert::<Handle>(); |
| 87 | } |
| 88 | |
| 89 | // ===== impl Driver ===== |
| 90 | |
| 91 | impl Driver { |
| 92 | /// Creates a new event loop, returning any error that happened during the |
| 93 | /// creation. |
| 94 | pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> { |
| 95 | let poll = mio::Poll::new()?; |
| 96 | #[cfg (not(target_os = "wasi" ))] |
| 97 | let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; |
| 98 | let registry = poll.registry().try_clone()?; |
| 99 | |
| 100 | let driver = Driver { |
| 101 | signal_ready: false, |
| 102 | events: mio::Events::with_capacity(nevents), |
| 103 | poll, |
| 104 | }; |
| 105 | |
| 106 | let (registrations, synced) = RegistrationSet::new(); |
| 107 | |
| 108 | let handle = Handle { |
| 109 | registry, |
| 110 | registrations, |
| 111 | synced: Mutex::new(synced), |
| 112 | #[cfg (not(target_os = "wasi" ))] |
| 113 | waker, |
| 114 | metrics: IoDriverMetrics::default(), |
| 115 | }; |
| 116 | |
| 117 | Ok((driver, handle)) |
| 118 | } |
| 119 | |
| 120 | pub(crate) fn park(&mut self, rt_handle: &driver::Handle) { |
| 121 | let handle = rt_handle.io(); |
| 122 | self.turn(handle, None); |
| 123 | } |
| 124 | |
| 125 | pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { |
| 126 | let handle = rt_handle.io(); |
| 127 | self.turn(handle, Some(duration)); |
| 128 | } |
| 129 | |
| 130 | pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { |
| 131 | let handle = rt_handle.io(); |
| 132 | let ios = handle.registrations.shutdown(&mut handle.synced.lock()); |
| 133 | |
| 134 | // `shutdown()` must be called without holding the lock. |
| 135 | for io in ios { |
| 136 | io.shutdown(); |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) { |
| 141 | debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock())); |
| 142 | |
| 143 | handle.release_pending_registrations(); |
| 144 | |
| 145 | let events = &mut self.events; |
| 146 | |
| 147 | // Block waiting for an event to happen, peeling out how many events |
| 148 | // happened. |
| 149 | match self.poll.poll(events, max_wait) { |
| 150 | Ok(()) => {} |
| 151 | Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} |
| 152 | #[cfg (target_os = "wasi" )] |
| 153 | Err(e) if e.kind() == io::ErrorKind::InvalidInput => { |
| 154 | // In case of wasm32_wasi this error happens, when trying to poll without subscriptions |
| 155 | // just return from the park, as there would be nothing, which wakes us up. |
| 156 | } |
| 157 | Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}" ), |
| 158 | } |
| 159 | |
| 160 | // Process all the events that came in, dispatching appropriately |
| 161 | let mut ready_count = 0; |
| 162 | for event in events.iter() { |
| 163 | let token = event.token(); |
| 164 | |
| 165 | if token == TOKEN_WAKEUP { |
| 166 | // Nothing to do, the event is used to unblock the I/O driver |
| 167 | } else if token == TOKEN_SIGNAL { |
| 168 | self.signal_ready = true; |
| 169 | } else { |
| 170 | let ready = Ready::from_mio(event); |
| 171 | let ptr = super::EXPOSE_IO.from_exposed_addr(token.0); |
| 172 | |
| 173 | // Safety: we ensure that the pointers used as tokens are not freed |
| 174 | // until they are both deregistered from mio **and** we know the I/O |
| 175 | // driver is not concurrently polling. The I/O driver holds ownership of |
| 176 | // an `Arc<ScheduledIo>` so we can safely cast this to a ref. |
| 177 | let io: &ScheduledIo = unsafe { &*ptr }; |
| 178 | |
| 179 | io.set_readiness(Tick::Set, |curr| curr | ready); |
| 180 | io.wake(ready); |
| 181 | |
| 182 | ready_count += 1; |
| 183 | } |
| 184 | } |
| 185 | |
| 186 | handle.metrics.incr_ready_count_by(ready_count); |
| 187 | } |
| 188 | } |
| 189 | |
| 190 | impl fmt::Debug for Driver { |
| 191 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 192 | write!(f, "Driver" ) |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | impl Handle { |
| 197 | /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise |
| 198 | /// makes the next call to `turn` return immediately. |
| 199 | /// |
| 200 | /// This method is intended to be used in situations where a notification |
| 201 | /// needs to otherwise be sent to the main reactor. If the reactor is |
| 202 | /// currently blocked inside of `turn` then it will wake up and soon return |
| 203 | /// after this method has been called. If the reactor is not currently |
| 204 | /// blocked in `turn`, then the next call to `turn` will not block and |
| 205 | /// return immediately. |
| 206 | pub(crate) fn unpark(&self) { |
| 207 | #[cfg (not(target_os = "wasi" ))] |
| 208 | self.waker.wake().expect("failed to wake I/O driver" ); |
| 209 | } |
| 210 | |
| 211 | /// Registers an I/O resource with the reactor for a given `mio::Ready` state. |
| 212 | /// |
| 213 | /// The registration token is returned. |
| 214 | pub(super) fn add_source( |
| 215 | &self, |
| 216 | source: &mut impl mio::event::Source, |
| 217 | interest: Interest, |
| 218 | ) -> io::Result<Arc<ScheduledIo>> { |
| 219 | let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?; |
| 220 | let token = scheduled_io.token(); |
| 221 | |
| 222 | // we should remove the `scheduled_io` from the `registrations` set if registering |
| 223 | // the `source` with the OS fails. Otherwise it will leak the `scheduled_io`. |
| 224 | if let Err(e) = self.registry.register(source, token, interest.to_mio()) { |
| 225 | // safety: `scheduled_io` is part of the `registrations` set. |
| 226 | unsafe { |
| 227 | self.registrations |
| 228 | .remove(&mut self.synced.lock(), &scheduled_io) |
| 229 | }; |
| 230 | |
| 231 | return Err(e); |
| 232 | } |
| 233 | |
| 234 | // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList` |
| 235 | self.metrics.incr_fd_count(); |
| 236 | |
| 237 | Ok(scheduled_io) |
| 238 | } |
| 239 | |
| 240 | /// Deregisters an I/O resource from the reactor. |
| 241 | pub(super) fn deregister_source( |
| 242 | &self, |
| 243 | registration: &Arc<ScheduledIo>, |
| 244 | source: &mut impl Source, |
| 245 | ) -> io::Result<()> { |
| 246 | // Deregister the source with the OS poller **first** |
| 247 | self.registry.deregister(source)?; |
| 248 | |
| 249 | if self |
| 250 | .registrations |
| 251 | .deregister(&mut self.synced.lock(), registration) |
| 252 | { |
| 253 | self.unpark(); |
| 254 | } |
| 255 | |
| 256 | self.metrics.dec_fd_count(); |
| 257 | |
| 258 | Ok(()) |
| 259 | } |
| 260 | |
| 261 | fn release_pending_registrations(&self) { |
| 262 | if self.registrations.needs_release() { |
| 263 | self.registrations.release(&mut self.synced.lock()); |
| 264 | } |
| 265 | } |
| 266 | } |
| 267 | |
| 268 | impl fmt::Debug for Handle { |
| 269 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 270 | write!(f, "Handle" ) |
| 271 | } |
| 272 | } |
| 273 | |
| 274 | impl Direction { |
| 275 | pub(super) fn mask(self) -> Ready { |
| 276 | match self { |
| 277 | Direction::Read => Ready::READABLE | Ready::READ_CLOSED, |
| 278 | Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, |
| 279 | } |
| 280 | } |
| 281 | } |
| 282 | |