1 | #![cfg_attr (not(all(feature = "rt" , feature = "net" )), allow(dead_code))] |
2 | |
3 | mod registration; |
4 | pub(crate) use registration::Registration; |
5 | |
6 | mod scheduled_io; |
7 | use scheduled_io::ScheduledIo; |
8 | |
9 | mod metrics; |
10 | |
11 | use crate::io::interest::Interest; |
12 | use crate::io::ready::Ready; |
13 | use crate::runtime::driver; |
14 | use crate::util::slab::{self, Slab}; |
15 | use crate::{loom::sync::RwLock, util::bit}; |
16 | |
17 | use metrics::IoDriverMetrics; |
18 | |
19 | use std::fmt; |
20 | use std::io; |
21 | use std::time::Duration; |
22 | |
23 | /// I/O driver, backed by Mio. |
24 | pub(crate) struct Driver { |
25 | /// Tracks the number of times `turn` is called. It is safe for this to wrap |
26 | /// as it is mostly used to determine when to call `compact()`. |
27 | tick: u8, |
28 | |
29 | /// True when an event with the signal token is received |
30 | signal_ready: bool, |
31 | |
32 | /// Reuse the `mio::Events` value across calls to poll. |
33 | events: mio::Events, |
34 | |
35 | /// Primary slab handle containing the state for each resource registered |
36 | /// with this driver. |
37 | resources: Slab<ScheduledIo>, |
38 | |
39 | /// The system event queue. |
40 | poll: mio::Poll, |
41 | } |
42 | |
43 | /// A reference to an I/O driver. |
44 | pub(crate) struct Handle { |
45 | /// Registers I/O resources. |
46 | registry: mio::Registry, |
47 | |
48 | /// Allocates `ScheduledIo` handles when creating new resources. |
49 | io_dispatch: RwLock<IoDispatcher>, |
50 | |
51 | /// Used to wake up the reactor from a call to `turn`. |
52 | /// Not supported on Wasi due to lack of threading support. |
53 | #[cfg (not(tokio_wasi))] |
54 | waker: mio::Waker, |
55 | |
56 | pub(crate) metrics: IoDriverMetrics, |
57 | } |
58 | |
59 | #[derive (Debug)] |
60 | pub(crate) struct ReadyEvent { |
61 | tick: u8, |
62 | pub(crate) ready: Ready, |
63 | is_shutdown: bool, |
64 | } |
65 | |
66 | cfg_net_unix!( |
67 | impl ReadyEvent { |
68 | pub(crate) fn with_ready(&self, ready: Ready) -> Self { |
69 | Self { |
70 | ready, |
71 | tick: self.tick, |
72 | is_shutdown: self.is_shutdown, |
73 | } |
74 | } |
75 | } |
76 | ); |
77 | |
78 | struct IoDispatcher { |
79 | allocator: slab::Allocator<ScheduledIo>, |
80 | is_shutdown: bool, |
81 | } |
82 | |
83 | #[derive (Debug, Eq, PartialEq, Clone, Copy)] |
84 | enum Direction { |
85 | Read, |
86 | Write, |
87 | } |
88 | |
89 | enum Tick { |
90 | Set(u8), |
91 | Clear(u8), |
92 | } |
93 | |
94 | // TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup |
95 | // token. |
96 | const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); |
97 | const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31)); |
98 | |
99 | const ADDRESS: bit::Pack = bit::Pack::least_significant(width:24); |
100 | |
101 | // Packs the generation value in the `readiness` field. |
102 | // |
103 | // The generation prevents a race condition where a slab slot is reused for a |
104 | // new socket while the I/O driver is about to apply a readiness event. The |
105 | // generation value is checked when setting new readiness. If the generation do |
106 | // not match, then the readiness event is discarded. |
107 | const GENERATION: bit::Pack = ADDRESS.then(width:7); |
108 | |
109 | fn _assert_kinds() { |
110 | fn _assert<T: Send + Sync>() {} |
111 | |
112 | _assert::<Handle>(); |
113 | } |
114 | |
115 | // ===== impl Driver ===== |
116 | |
117 | impl Driver { |
118 | /// Creates a new event loop, returning any error that happened during the |
119 | /// creation. |
120 | pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> { |
121 | let poll = mio::Poll::new()?; |
122 | #[cfg (not(tokio_wasi))] |
123 | let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; |
124 | let registry = poll.registry().try_clone()?; |
125 | |
126 | let slab = Slab::new(); |
127 | let allocator = slab.allocator(); |
128 | |
129 | let driver = Driver { |
130 | tick: 0, |
131 | signal_ready: false, |
132 | events: mio::Events::with_capacity(nevents), |
133 | poll, |
134 | resources: slab, |
135 | }; |
136 | |
137 | let handle = Handle { |
138 | registry, |
139 | io_dispatch: RwLock::new(IoDispatcher::new(allocator)), |
140 | #[cfg (not(tokio_wasi))] |
141 | waker, |
142 | metrics: IoDriverMetrics::default(), |
143 | }; |
144 | |
145 | Ok((driver, handle)) |
146 | } |
147 | |
148 | pub(crate) fn park(&mut self, rt_handle: &driver::Handle) { |
149 | let handle = rt_handle.io(); |
150 | self.turn(handle, None); |
151 | } |
152 | |
153 | pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { |
154 | let handle = rt_handle.io(); |
155 | self.turn(handle, Some(duration)); |
156 | } |
157 | |
158 | pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { |
159 | let handle = rt_handle.io(); |
160 | |
161 | if handle.shutdown() { |
162 | self.resources.for_each(|io| { |
163 | // If a task is waiting on the I/O resource, notify it that the |
164 | // runtime is being shutdown. And shutdown will clear all wakers. |
165 | io.shutdown(); |
166 | }); |
167 | } |
168 | } |
169 | |
170 | fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) { |
171 | // How often to call `compact()` on the resource slab |
172 | const COMPACT_INTERVAL: u8 = 255; |
173 | |
174 | self.tick = self.tick.wrapping_add(1); |
175 | |
176 | if self.tick == COMPACT_INTERVAL { |
177 | self.resources.compact() |
178 | } |
179 | |
180 | let events = &mut self.events; |
181 | |
182 | // Block waiting for an event to happen, peeling out how many events |
183 | // happened. |
184 | match self.poll.poll(events, max_wait) { |
185 | Ok(_) => {} |
186 | Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} |
187 | #[cfg (tokio_wasi)] |
188 | Err(e) if e.kind() == io::ErrorKind::InvalidInput => { |
189 | // In case of wasm32_wasi this error happens, when trying to poll without subscriptions |
190 | // just return from the park, as there would be nothing, which wakes us up. |
191 | } |
192 | Err(e) => panic!("unexpected error when polling the I/O driver: {:?}" , e), |
193 | } |
194 | |
195 | // Process all the events that came in, dispatching appropriately |
196 | let mut ready_count = 0; |
197 | for event in events.iter() { |
198 | let token = event.token(); |
199 | |
200 | if token == TOKEN_WAKEUP { |
201 | // Nothing to do, the event is used to unblock the I/O driver |
202 | } else if token == TOKEN_SIGNAL { |
203 | self.signal_ready = true; |
204 | } else { |
205 | Self::dispatch( |
206 | &mut self.resources, |
207 | self.tick, |
208 | token, |
209 | Ready::from_mio(event), |
210 | ); |
211 | ready_count += 1; |
212 | } |
213 | } |
214 | |
215 | handle.metrics.incr_ready_count_by(ready_count); |
216 | } |
217 | |
218 | fn dispatch(resources: &mut Slab<ScheduledIo>, tick: u8, token: mio::Token, ready: Ready) { |
219 | let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); |
220 | |
221 | let io = match resources.get(addr) { |
222 | Some(io) => io, |
223 | None => return, |
224 | }; |
225 | |
226 | let res = io.set_readiness(Some(token.0), Tick::Set(tick), |curr| curr | ready); |
227 | |
228 | if res.is_err() { |
229 | // token no longer valid! |
230 | return; |
231 | } |
232 | |
233 | io.wake(ready); |
234 | } |
235 | } |
236 | |
237 | impl fmt::Debug for Driver { |
238 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
239 | write!(f, "Driver" ) |
240 | } |
241 | } |
242 | |
243 | impl Handle { |
244 | /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise |
245 | /// makes the next call to `turn` return immediately. |
246 | /// |
247 | /// This method is intended to be used in situations where a notification |
248 | /// needs to otherwise be sent to the main reactor. If the reactor is |
249 | /// currently blocked inside of `turn` then it will wake up and soon return |
250 | /// after this method has been called. If the reactor is not currently |
251 | /// blocked in `turn`, then the next call to `turn` will not block and |
252 | /// return immediately. |
253 | pub(crate) fn unpark(&self) { |
254 | #[cfg (not(tokio_wasi))] |
255 | self.waker.wake().expect("failed to wake I/O driver" ); |
256 | } |
257 | |
258 | /// Registers an I/O resource with the reactor for a given `mio::Ready` state. |
259 | /// |
260 | /// The registration token is returned. |
261 | pub(super) fn add_source( |
262 | &self, |
263 | source: &mut impl mio::event::Source, |
264 | interest: Interest, |
265 | ) -> io::Result<slab::Ref<ScheduledIo>> { |
266 | let (address, shared) = self.allocate()?; |
267 | |
268 | let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); |
269 | |
270 | self.registry |
271 | .register(source, mio::Token(token), interest.to_mio())?; |
272 | |
273 | self.metrics.incr_fd_count(); |
274 | |
275 | Ok(shared) |
276 | } |
277 | |
278 | /// Deregisters an I/O resource from the reactor. |
279 | pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { |
280 | self.registry.deregister(source)?; |
281 | |
282 | self.metrics.dec_fd_count(); |
283 | |
284 | Ok(()) |
285 | } |
286 | |
287 | /// shutdown the dispatcher. |
288 | fn shutdown(&self) -> bool { |
289 | let mut io = self.io_dispatch.write().unwrap(); |
290 | if io.is_shutdown { |
291 | return false; |
292 | } |
293 | io.is_shutdown = true; |
294 | true |
295 | } |
296 | |
297 | fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> { |
298 | let io = self.io_dispatch.read().unwrap(); |
299 | if io.is_shutdown { |
300 | return Err(io::Error::new( |
301 | io::ErrorKind::Other, |
302 | crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, |
303 | )); |
304 | } |
305 | io.allocator.allocate().ok_or_else(|| { |
306 | io::Error::new( |
307 | io::ErrorKind::Other, |
308 | "reactor at max registered I/O resources" , |
309 | ) |
310 | }) |
311 | } |
312 | } |
313 | |
314 | impl fmt::Debug for Handle { |
315 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
316 | write!(f, "Handle" ) |
317 | } |
318 | } |
319 | |
320 | // ===== impl IoDispatcher ===== |
321 | |
322 | impl IoDispatcher { |
323 | fn new(allocator: slab::Allocator<ScheduledIo>) -> Self { |
324 | Self { |
325 | allocator, |
326 | is_shutdown: false, |
327 | } |
328 | } |
329 | } |
330 | |
331 | impl Direction { |
332 | pub(super) fn mask(self) -> Ready { |
333 | match self { |
334 | Direction::Read => Ready::READABLE | Ready::READ_CLOSED, |
335 | Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, |
336 | } |
337 | } |
338 | } |
339 | |
340 | // Signal handling |
341 | cfg_signal_internal_and_unix! { |
342 | impl Handle { |
343 | pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> { |
344 | self.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; |
345 | Ok(()) |
346 | } |
347 | } |
348 | |
349 | impl Driver { |
350 | pub(crate) fn consume_signal_ready(&mut self) -> bool { |
351 | let ret = self.signal_ready; |
352 | self.signal_ready = false; |
353 | ret |
354 | } |
355 | } |
356 | } |
357 | |