1 | use std::borrow::Borrow; |
2 | use std::collections::BTreeMap; |
3 | use std::fmt; |
4 | use std::future::Future; |
5 | use std::io; |
6 | use std::marker::PhantomData; |
7 | use std::mem; |
8 | use std::panic; |
9 | use std::pin::Pin; |
10 | use std::sync::atomic::{AtomicUsize, Ordering}; |
11 | use std::sync::{Arc, Mutex, MutexGuard}; |
12 | use std::task::{Context, Poll, Waker}; |
13 | use std::time::{Duration, Instant}; |
14 | |
15 | use async_lock::OnceCell; |
16 | use concurrent_queue::ConcurrentQueue; |
17 | use futures_lite::ready; |
18 | use polling::{Event, Events, Poller}; |
19 | use slab::Slab; |
20 | |
21 | // Choose the proper implementation of `Registration` based on the target platform. |
22 | cfg_if::cfg_if! { |
23 | if #[cfg(windows)] { |
24 | mod windows; |
25 | pub use windows::Registration; |
26 | } else if #[cfg(any( |
27 | target_vendor = "apple" , |
28 | target_os = "freebsd" , |
29 | target_os = "netbsd" , |
30 | target_os = "openbsd" , |
31 | target_os = "dragonfly" , |
32 | ))] { |
33 | mod kqueue; |
34 | pub use kqueue::Registration; |
35 | } else if #[cfg(unix)] { |
36 | mod unix; |
37 | pub use unix::Registration; |
38 | } else { |
39 | compile_error!("unsupported platform" ); |
40 | } |
41 | } |
42 | |
43 | #[cfg (not(target_os = "espidf" ))] |
44 | const TIMER_QUEUE_SIZE: usize = 1000; |
45 | |
46 | /// ESP-IDF - being an embedded OS - does not need so many timers |
47 | /// and this saves ~ 20K RAM which is a lot for an MCU with RAM < 400K |
48 | #[cfg (target_os = "espidf" )] |
49 | const TIMER_QUEUE_SIZE: usize = 100; |
50 | |
51 | const READ: usize = 0; |
52 | const WRITE: usize = 1; |
53 | |
54 | /// The reactor. |
55 | /// |
56 | /// There is only one global instance of this type, accessible by [`Reactor::get()`]. |
57 | pub(crate) struct Reactor { |
58 | /// Portable bindings to epoll/kqueue/event ports/IOCP. |
59 | /// |
60 | /// This is where I/O is polled, producing I/O events. |
61 | pub(crate) poller: Poller, |
62 | |
63 | /// Ticker bumped before polling. |
64 | /// |
65 | /// This is useful for checking what is the current "round" of `ReactorLock::react()` when |
66 | /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those |
67 | /// methods must make sure they don't receive stale I/O events - they only accept events from a |
68 | /// fresh "round" of `ReactorLock::react()`. |
69 | ticker: AtomicUsize, |
70 | |
71 | /// Registered sources. |
72 | sources: Mutex<Slab<Arc<Source>>>, |
73 | |
74 | /// Temporary storage for I/O events when polling the reactor. |
75 | /// |
76 | /// Holding a lock on this event list implies the exclusive right to poll I/O. |
77 | events: Mutex<Events>, |
78 | |
79 | /// An ordered map of registered timers. |
80 | /// |
81 | /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to |
82 | /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the |
83 | /// timer. |
84 | timers: Mutex<BTreeMap<(Instant, usize), Waker>>, |
85 | |
86 | /// A queue of timer operations (insert and remove). |
87 | /// |
88 | /// When inserting or removing a timer, we don't process it immediately - we just push it into |
89 | /// this queue. Timers actually get processed when the queue fills up or the reactor is polled. |
90 | timer_ops: ConcurrentQueue<TimerOp>, |
91 | } |
92 | |
93 | impl Reactor { |
94 | /// Returns a reference to the reactor. |
95 | pub(crate) fn get() -> &'static Reactor { |
96 | static REACTOR: OnceCell<Reactor> = OnceCell::new(); |
97 | |
98 | REACTOR.get_or_init_blocking(|| { |
99 | crate::driver::init(); |
100 | Reactor { |
101 | poller: Poller::new().expect("cannot initialize I/O event notification" ), |
102 | ticker: AtomicUsize::new(0), |
103 | sources: Mutex::new(Slab::new()), |
104 | events: Mutex::new(Events::new()), |
105 | timers: Mutex::new(BTreeMap::new()), |
106 | timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE), |
107 | } |
108 | }) |
109 | } |
110 | |
111 | /// Returns the current ticker. |
112 | pub(crate) fn ticker(&self) -> usize { |
113 | self.ticker.load(Ordering::SeqCst) |
114 | } |
115 | |
116 | /// Registers an I/O source in the reactor. |
117 | pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> { |
118 | // Create an I/O source for this file descriptor. |
119 | let source = { |
120 | let mut sources = self.sources.lock().unwrap(); |
121 | let key = sources.vacant_entry().key(); |
122 | let source = Arc::new(Source { |
123 | registration: raw, |
124 | key, |
125 | state: Default::default(), |
126 | }); |
127 | sources.insert(source.clone()); |
128 | source |
129 | }; |
130 | |
131 | // Register the file descriptor. |
132 | if let Err(err) = source.registration.add(&self.poller, source.key) { |
133 | let mut sources = self.sources.lock().unwrap(); |
134 | sources.remove(source.key); |
135 | return Err(err); |
136 | } |
137 | |
138 | Ok(source) |
139 | } |
140 | |
141 | /// Deregisters an I/O source from the reactor. |
142 | pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> { |
143 | let mut sources = self.sources.lock().unwrap(); |
144 | sources.remove(source.key); |
145 | source.registration.delete(&self.poller) |
146 | } |
147 | |
148 | /// Registers a timer in the reactor. |
149 | /// |
150 | /// Returns the inserted timer's ID. |
151 | pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize { |
152 | // Generate a new timer ID. |
153 | static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1); |
154 | let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed); |
155 | |
156 | // Push an insert operation. |
157 | while self |
158 | .timer_ops |
159 | .push(TimerOp::Insert(when, id, waker.clone())) |
160 | .is_err() |
161 | { |
162 | // If the queue is full, drain it and try again. |
163 | let mut timers = self.timers.lock().unwrap(); |
164 | self.process_timer_ops(&mut timers); |
165 | } |
166 | |
167 | // Notify that a timer has been inserted. |
168 | self.notify(); |
169 | |
170 | id |
171 | } |
172 | |
173 | /// Deregisters a timer from the reactor. |
174 | pub(crate) fn remove_timer(&self, when: Instant, id: usize) { |
175 | // Push a remove operation. |
176 | while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() { |
177 | // If the queue is full, drain it and try again. |
178 | let mut timers = self.timers.lock().unwrap(); |
179 | self.process_timer_ops(&mut timers); |
180 | } |
181 | } |
182 | |
183 | /// Notifies the thread blocked on the reactor. |
184 | pub(crate) fn notify(&self) { |
185 | self.poller.notify().expect("failed to notify reactor" ); |
186 | } |
187 | |
188 | /// Locks the reactor, potentially blocking if the lock is held by another thread. |
189 | pub(crate) fn lock(&self) -> ReactorLock<'_> { |
190 | let reactor = self; |
191 | let events = self.events.lock().unwrap(); |
192 | ReactorLock { reactor, events } |
193 | } |
194 | |
195 | /// Attempts to lock the reactor. |
196 | pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> { |
197 | self.events.try_lock().ok().map(|events| { |
198 | let reactor = self; |
199 | ReactorLock { reactor, events } |
200 | }) |
201 | } |
202 | |
203 | /// Processes ready timers and extends the list of wakers to wake. |
204 | /// |
205 | /// Returns the duration until the next timer before this method was called. |
206 | fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> { |
207 | let span = tracing::trace_span!("process_timers" ); |
208 | let _enter = span.enter(); |
209 | |
210 | let mut timers = self.timers.lock().unwrap(); |
211 | self.process_timer_ops(&mut timers); |
212 | |
213 | let now = Instant::now(); |
214 | |
215 | // Split timers into ready and pending timers. |
216 | // |
217 | // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered |
218 | // ready. |
219 | let pending = timers.split_off(&(now + Duration::from_nanos(1), 0)); |
220 | let ready = mem::replace(&mut *timers, pending); |
221 | |
222 | // Calculate the duration until the next event. |
223 | let dur = if ready.is_empty() { |
224 | // Duration until the next timer. |
225 | timers |
226 | .keys() |
227 | .next() |
228 | .map(|(when, _)| when.saturating_duration_since(now)) |
229 | } else { |
230 | // Timers are about to fire right now. |
231 | Some(Duration::from_secs(0)) |
232 | }; |
233 | |
234 | // Drop the lock before waking. |
235 | drop(timers); |
236 | |
237 | // Add wakers to the list. |
238 | tracing::trace!(" {} ready wakers" , ready.len()); |
239 | |
240 | for (_, waker) in ready { |
241 | wakers.push(waker); |
242 | } |
243 | |
244 | dur |
245 | } |
246 | |
247 | /// Processes queued timer operations. |
248 | fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) { |
249 | // Process only as much as fits into the queue, or else this loop could in theory run |
250 | // forever. |
251 | self.timer_ops |
252 | .try_iter() |
253 | .take(self.timer_ops.capacity().unwrap()) |
254 | .for_each(|op| match op { |
255 | TimerOp::Insert(when, id, waker) => { |
256 | timers.insert((when, id), waker); |
257 | } |
258 | TimerOp::Remove(when, id) => { |
259 | timers.remove(&(when, id)); |
260 | } |
261 | }); |
262 | } |
263 | } |
264 | |
265 | /// A lock on the reactor. |
266 | pub(crate) struct ReactorLock<'a> { |
267 | reactor: &'a Reactor, |
268 | events: MutexGuard<'a, Events>, |
269 | } |
270 | |
271 | impl ReactorLock<'_> { |
272 | /// Processes new events, blocking until the first event or the timeout. |
273 | pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> { |
274 | let span = tracing::trace_span!("react" ); |
275 | let _enter = span.enter(); |
276 | |
277 | let mut wakers = Vec::new(); |
278 | |
279 | // Process ready timers. |
280 | let next_timer = self.reactor.process_timers(&mut wakers); |
281 | |
282 | // compute the timeout for blocking on I/O events. |
283 | let timeout = match (next_timer, timeout) { |
284 | (None, None) => None, |
285 | (Some(t), None) | (None, Some(t)) => Some(t), |
286 | (Some(a), Some(b)) => Some(a.min(b)), |
287 | }; |
288 | |
289 | // Bump the ticker before polling I/O. |
290 | let tick = self |
291 | .reactor |
292 | .ticker |
293 | .fetch_add(1, Ordering::SeqCst) |
294 | .wrapping_add(1); |
295 | |
296 | self.events.clear(); |
297 | |
298 | // Block on I/O events. |
299 | let res = match self.reactor.poller.wait(&mut self.events, timeout) { |
300 | // No I/O events occurred. |
301 | Ok(0) => { |
302 | if timeout != Some(Duration::from_secs(0)) { |
303 | // The non-zero timeout was hit so fire ready timers. |
304 | self.reactor.process_timers(&mut wakers); |
305 | } |
306 | Ok(()) |
307 | } |
308 | |
309 | // At least one I/O event occurred. |
310 | Ok(_) => { |
311 | // Iterate over sources in the event list. |
312 | let sources = self.reactor.sources.lock().unwrap(); |
313 | |
314 | for ev in self.events.iter() { |
315 | // Check if there is a source in the table with this key. |
316 | if let Some(source) = sources.get(ev.key) { |
317 | let mut state = source.state.lock().unwrap(); |
318 | |
319 | // Collect wakers if a writability event was emitted. |
320 | for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] { |
321 | if emitted { |
322 | state[dir].tick = tick; |
323 | state[dir].drain_into(&mut wakers); |
324 | } |
325 | } |
326 | |
327 | // Re-register if there are still writers or readers. This can happen if |
328 | // e.g. we were previously interested in both readability and writability, |
329 | // but only one of them was emitted. |
330 | if !state[READ].is_empty() || !state[WRITE].is_empty() { |
331 | // Create the event that we are interested in. |
332 | let event = { |
333 | let mut event = Event::none(source.key); |
334 | event.readable = !state[READ].is_empty(); |
335 | event.writable = !state[WRITE].is_empty(); |
336 | event |
337 | }; |
338 | |
339 | // Register interest in this event. |
340 | source.registration.modify(&self.reactor.poller, event)?; |
341 | } |
342 | } |
343 | } |
344 | |
345 | Ok(()) |
346 | } |
347 | |
348 | // The syscall was interrupted. |
349 | Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()), |
350 | |
351 | // An actual error occureed. |
352 | Err(err) => Err(err), |
353 | }; |
354 | |
355 | // Wake up ready tasks. |
356 | tracing::trace!(" {} ready wakers" , wakers.len()); |
357 | for waker in wakers { |
358 | // Don't let a panicking waker blow everything up. |
359 | panic::catch_unwind(|| waker.wake()).ok(); |
360 | } |
361 | |
362 | res |
363 | } |
364 | } |
365 | |
366 | /// A single timer operation. |
367 | enum TimerOp { |
368 | Insert(Instant, usize, Waker), |
369 | Remove(Instant, usize), |
370 | } |
371 | |
372 | /// A registered source of I/O events. |
373 | #[derive (Debug)] |
374 | pub(crate) struct Source { |
375 | /// This source's registration into the reactor. |
376 | registration: Registration, |
377 | |
378 | /// The key of this source obtained during registration. |
379 | key: usize, |
380 | |
381 | /// Inner state with registered wakers. |
382 | state: Mutex<[Direction; 2]>, |
383 | } |
384 | |
385 | /// A read or write direction. |
386 | #[derive (Debug, Default)] |
387 | struct Direction { |
388 | /// Last reactor tick that delivered an event. |
389 | tick: usize, |
390 | |
391 | /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`. |
392 | ticks: Option<(usize, usize)>, |
393 | |
394 | /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`. |
395 | waker: Option<Waker>, |
396 | |
397 | /// Wakers of tasks waiting for the next event. |
398 | /// |
399 | /// Registered by `Async::readable()` and `Async::writable()`. |
400 | wakers: Slab<Option<Waker>>, |
401 | } |
402 | |
403 | impl Direction { |
404 | /// Returns `true` if there are no wakers interested in this direction. |
405 | fn is_empty(&self) -> bool { |
406 | self.waker.is_none() && self.wakers.iter().all(|(_, opt: &Option)| opt.is_none()) |
407 | } |
408 | |
409 | /// Moves all wakers into a `Vec`. |
410 | fn drain_into(&mut self, dst: &mut Vec<Waker>) { |
411 | if let Some(w: Waker) = self.waker.take() { |
412 | dst.push(w); |
413 | } |
414 | for (_, opt: &mut Option) in self.wakers.iter_mut() { |
415 | if let Some(w: Waker) = opt.take() { |
416 | dst.push(w); |
417 | } |
418 | } |
419 | } |
420 | } |
421 | |
422 | impl Source { |
423 | /// Polls the I/O source for readability. |
424 | pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
425 | self.poll_ready(READ, cx) |
426 | } |
427 | |
428 | /// Polls the I/O source for writability. |
429 | pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
430 | self.poll_ready(WRITE, cx) |
431 | } |
432 | |
433 | /// Registers a waker from `poll_readable()` or `poll_writable()`. |
434 | /// |
435 | /// If a different waker is already registered, it gets replaced and woken. |
436 | fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
437 | let mut state = self.state.lock().unwrap(); |
438 | |
439 | // Check if the reactor has delivered an event. |
440 | if let Some((a, b)) = state[dir].ticks { |
441 | // If `state[dir].tick` has changed to a value other than the old reactor tick, |
442 | // that means a newer reactor tick has delivered an event. |
443 | if state[dir].tick != a && state[dir].tick != b { |
444 | state[dir].ticks = None; |
445 | return Poll::Ready(Ok(())); |
446 | } |
447 | } |
448 | |
449 | let was_empty = state[dir].is_empty(); |
450 | |
451 | // Register the current task's waker. |
452 | if let Some(w) = state[dir].waker.take() { |
453 | if w.will_wake(cx.waker()) { |
454 | state[dir].waker = Some(w); |
455 | return Poll::Pending; |
456 | } |
457 | // Wake the previous waker because it's going to get replaced. |
458 | panic::catch_unwind(|| w.wake()).ok(); |
459 | } |
460 | state[dir].waker = Some(cx.waker().clone()); |
461 | state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick)); |
462 | |
463 | // Update interest in this I/O handle. |
464 | if was_empty { |
465 | // Create the event that we are interested in. |
466 | let event = { |
467 | let mut event = Event::none(self.key); |
468 | event.readable = !state[READ].is_empty(); |
469 | event.writable = !state[WRITE].is_empty(); |
470 | event |
471 | }; |
472 | |
473 | // Register interest in it. |
474 | self.registration.modify(&Reactor::get().poller, event)?; |
475 | } |
476 | |
477 | Poll::Pending |
478 | } |
479 | |
480 | /// Waits until the I/O source is readable. |
481 | pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> { |
482 | Readable(Self::ready(handle, READ)) |
483 | } |
484 | |
485 | /// Waits until the I/O source is readable. |
486 | pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> { |
487 | ReadableOwned(Self::ready(handle, READ)) |
488 | } |
489 | |
490 | /// Waits until the I/O source is writable. |
491 | pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> { |
492 | Writable(Self::ready(handle, WRITE)) |
493 | } |
494 | |
495 | /// Waits until the I/O source is writable. |
496 | pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> { |
497 | WritableOwned(Self::ready(handle, WRITE)) |
498 | } |
499 | |
500 | /// Waits until the I/O source is readable or writable. |
501 | fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> { |
502 | Ready { |
503 | handle, |
504 | dir, |
505 | ticks: None, |
506 | index: None, |
507 | _capture: PhantomData, |
508 | } |
509 | } |
510 | } |
511 | |
512 | /// Future for [`Async::readable`](crate::Async::readable). |
513 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
514 | pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>); |
515 | |
516 | impl<T> Future for Readable<'_, T> { |
517 | type Output = io::Result<()>; |
518 | |
519 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
520 | ready!(Pin::new(&mut self.0).poll(cx))?; |
521 | tracing::trace!(fd = ?self.0.handle.source.registration, "readable" ); |
522 | Poll::Ready(Ok(())) |
523 | } |
524 | } |
525 | |
526 | impl<T> fmt::Debug for Readable<'_, T> { |
527 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
528 | f.debug_struct(name:"Readable" ).finish() |
529 | } |
530 | } |
531 | |
532 | /// Future for [`Async::readable_owned`](crate::Async::readable_owned). |
533 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
534 | pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>); |
535 | |
536 | impl<T> Future for ReadableOwned<T> { |
537 | type Output = io::Result<()>; |
538 | |
539 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
540 | ready!(Pin::new(&mut self.0).poll(cx))?; |
541 | tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned" ); |
542 | Poll::Ready(Ok(())) |
543 | } |
544 | } |
545 | |
546 | impl<T> fmt::Debug for ReadableOwned<T> { |
547 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
548 | f.debug_struct(name:"ReadableOwned" ).finish() |
549 | } |
550 | } |
551 | |
552 | /// Future for [`Async::writable`](crate::Async::writable). |
553 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
554 | pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>); |
555 | |
556 | impl<T> Future for Writable<'_, T> { |
557 | type Output = io::Result<()>; |
558 | |
559 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
560 | ready!(Pin::new(&mut self.0).poll(cx))?; |
561 | tracing::trace!(fd = ?self.0.handle.source.registration, "writable" ); |
562 | Poll::Ready(Ok(())) |
563 | } |
564 | } |
565 | |
566 | impl<T> fmt::Debug for Writable<'_, T> { |
567 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
568 | f.debug_struct(name:"Writable" ).finish() |
569 | } |
570 | } |
571 | |
572 | /// Future for [`Async::writable_owned`](crate::Async::writable_owned). |
573 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
574 | pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>); |
575 | |
576 | impl<T> Future for WritableOwned<T> { |
577 | type Output = io::Result<()>; |
578 | |
579 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
580 | ready!(Pin::new(&mut self.0).poll(cx))?; |
581 | tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned" ); |
582 | Poll::Ready(Ok(())) |
583 | } |
584 | } |
585 | |
586 | impl<T> fmt::Debug for WritableOwned<T> { |
587 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
588 | f.debug_struct(name:"WritableOwned" ).finish() |
589 | } |
590 | } |
591 | |
592 | struct Ready<H: Borrow<crate::Async<T>>, T> { |
593 | handle: H, |
594 | dir: usize, |
595 | ticks: Option<(usize, usize)>, |
596 | index: Option<usize>, |
597 | _capture: PhantomData<fn() -> T>, |
598 | } |
599 | |
600 | impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {} |
601 | |
602 | impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> { |
603 | type Output = io::Result<()>; |
604 | |
605 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
606 | let Self { |
607 | ref handle, |
608 | dir, |
609 | ticks, |
610 | index, |
611 | .. |
612 | } = &mut *self; |
613 | |
614 | let mut state = handle.borrow().source.state.lock().unwrap(); |
615 | |
616 | // Check if the reactor has delivered an event. |
617 | if let Some((a, b)) = *ticks { |
618 | // If `state[dir].tick` has changed to a value other than the old reactor tick, |
619 | // that means a newer reactor tick has delivered an event. |
620 | if state[*dir].tick != a && state[*dir].tick != b { |
621 | return Poll::Ready(Ok(())); |
622 | } |
623 | } |
624 | |
625 | let was_empty = state[*dir].is_empty(); |
626 | |
627 | // Register the current task's waker. |
628 | let i = match *index { |
629 | Some(i) => i, |
630 | None => { |
631 | let i = state[*dir].wakers.insert(None); |
632 | *index = Some(i); |
633 | *ticks = Some((Reactor::get().ticker(), state[*dir].tick)); |
634 | i |
635 | } |
636 | }; |
637 | state[*dir].wakers[i] = Some(cx.waker().clone()); |
638 | |
639 | // Update interest in this I/O handle. |
640 | if was_empty { |
641 | // Create the event that we are interested in. |
642 | let event = { |
643 | let mut event = Event::none(handle.borrow().source.key); |
644 | event.readable = !state[READ].is_empty(); |
645 | event.writable = !state[WRITE].is_empty(); |
646 | event |
647 | }; |
648 | |
649 | // Indicate that we are interested in this event. |
650 | handle |
651 | .borrow() |
652 | .source |
653 | .registration |
654 | .modify(&Reactor::get().poller, event)?; |
655 | } |
656 | |
657 | Poll::Pending |
658 | } |
659 | } |
660 | |
661 | impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> { |
662 | fn drop(&mut self) { |
663 | // Remove our waker when dropped. |
664 | if let Some(key: usize) = self.index { |
665 | let mut state = self.handle.borrow().source.state.lock().unwrap(); |
666 | let wakers: &mut {unknown} = &mut state[self.dir].wakers; |
667 | if wakers.contains(key) { |
668 | wakers.remove(key); |
669 | } |
670 | } |
671 | } |
672 | } |
673 | |