use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem;
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawSocket;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use concurrent_queue::ConcurrentQueue;
use futures_lite::ready;
use polling::{Event, Poller};
use slab::Slab;

const READ: usize = 0;
const WRITE: usize = 1;

/// The reactor.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
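///
/// The reactor drives two kinds of work: readiness events for registered I/O sources and wakeups
/// for registered timers.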
pub(crate) struct Reactor {
    /// Portable bindings to epoll/kqueue/event ports/IOCP.
    ///
    /// This is where I/O is polled, producing I/O events.
    poller: Poller,

    /// Ticker bumped before polling.
    ///
    /// This is useful for checking which "round" of `ReactorLock::react()` is current when
    /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
    /// methods must make sure they don't receive stale I/O events - they only accept events from a
    /// fresh "round" of `ReactorLock::react()`.
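    ///
    /// For example (illustrative): a task remembers the ticker value at the moment it registers
    /// interest; a later I/O event is treated as fresh only if it was delivered by a round with a
    /// different tick (see `Source::poll_ready()` and `Ready::poll()`).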
    ticker: AtomicUsize,

    /// Registered sources.
    sources: Mutex<Slab<Arc<Source>>>,

    /// Temporary storage for I/O events when polling the reactor.
    ///
    /// Holding a lock on this event list implies the exclusive right to poll I/O.
    events: Mutex<Vec<Event>>,

    /// An ordered map of registered timers.
    ///
    /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
    /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
    /// timer.
    timers: Mutex<BTreeMap<(Instant, usize), Waker>>,

    /// A queue of timer operations (insert and remove).
    ///
    /// When inserting or removing a timer, we don't process it immediately - we just push it into
    /// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
    timer_ops: ConcurrentQueue<TimerOp>,
}

impl Reactor {
    /// Returns a reference to the reactor.
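    ///
    /// A minimal sketch of how the rest of the crate reaches the reactor (the call site here is
    /// hypothetical; `insert_io` is defined below):
    ///
    /// ```ignore
    /// let source = Reactor::get().insert_io(raw_fd)?;
    /// ```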
    pub(crate) fn get() -> &'static Reactor {
        static REACTOR: OnceCell<Reactor> = OnceCell::new();

        REACTOR.get_or_init_blocking(|| {
            crate::driver::init();
            Reactor {
                poller: Poller::new().expect("cannot initialize I/O event notification"),
                ticker: AtomicUsize::new(0),
                sources: Mutex::new(Slab::new()),
                events: Mutex::new(Vec::new()),
                timers: Mutex::new(BTreeMap::new()),
                timer_ops: ConcurrentQueue::bounded(1000),
            }
        })
    }

    /// Returns the current ticker.
    pub(crate) fn ticker(&self) -> usize {
        self.ticker.load(Ordering::SeqCst)
    }

    /// Registers an I/O source in the reactor.
    pub(crate) fn insert_io(
        &self,
        #[cfg(unix)] raw: RawFd,
        #[cfg(windows)] raw: RawSocket,
    ) -> io::Result<Arc<Source>> {
        // Create an I/O source for this file descriptor.
        let source = {
            let mut sources = self.sources.lock().unwrap();
            let key = sources.vacant_entry().key();
            let source = Arc::new(Source {
                raw,
                key,
                state: Default::default(),
            });
            sources.insert(source.clone());
            source
        };

        // Register the file descriptor.
        if let Err(err) = self.poller.add(raw, Event::none(source.key)) {
            let mut sources = self.sources.lock().unwrap();
            sources.remove(source.key);
            return Err(err);
        }

        Ok(source)
    }

    /// Deregisters an I/O source from the reactor.
    pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
        let mut sources = self.sources.lock().unwrap();
        sources.remove(source.key);
        self.poller.delete(source.raw)
    }

    /// Registers a timer in the reactor.
    ///
    /// Returns the inserted timer's ID.
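    ///
    /// A hedged sketch of a typical caller (for example, a timer future's `poll`, with a
    /// hypothetical `when: Instant` deadline), assuming the caller keeps the returned ID so it
    /// can cancel later:
    ///
    /// ```ignore
    /// let id = Reactor::get().insert_timer(when, cx.waker());
    /// // ... and if the future is dropped before the deadline:
    /// Reactor::get().remove_timer(when, id);
    /// ```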
    pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
        // Generate a new timer ID.
        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);

        // Push an insert operation.
        while self
            .timer_ops
            .push(TimerOp::Insert(when, id, waker.clone()))
            .is_err()
        {
            // If the queue is full, drain it and try again.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }

        // Notify that a timer has been inserted.
        self.notify();

        id
    }

    /// Deregisters a timer from the reactor.
    pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
        // Push a remove operation.
        while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
            // If the queue is full, drain it and try again.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }
    }

    /// Notifies the thread blocked on the reactor.
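    ///
    /// This is called, for example, right after a timer is inserted, so a thread blocked in
    /// `ReactorLock::react()` returns early rather than sleeping past the new timer's deadline.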
    pub(crate) fn notify(&self) {
        self.poller.notify().expect("failed to notify reactor");
    }

    /// Locks the reactor, potentially blocking if the lock is held by another thread.
    pub(crate) fn lock(&self) -> ReactorLock<'_> {
        let reactor = self;
        let events = self.events.lock().unwrap();
        ReactorLock { reactor, events }
    }

    /// Attempts to lock the reactor.
    pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
        self.events.try_lock().ok().map(|events| {
            let reactor = self;
            ReactorLock { reactor, events }
        })
    }

    /// Processes ready timers and extends the list of wakers to wake.
    ///
    /// Returns the duration until the next timer is due, or `None` if no timers are registered.
    fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
        let mut timers = self.timers.lock().unwrap();
        self.process_timer_ops(&mut timers);

        let now = Instant::now();

        // Split timers into ready and pending timers.
        //
        // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered
        // ready.
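        //
        // For example (illustrative): with `now = t`, a timer keyed `(t, id)` falls into the ready
        // half, while `(t + 1ns, id)` is the first key kept as pending.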
        let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
        let ready = mem::replace(&mut *timers, pending);

        // Calculate the duration until the next event.
        let dur = if ready.is_empty() {
            // Duration until the next timer.
            timers
                .keys()
                .next()
                .map(|(when, _)| when.saturating_duration_since(now))
        } else {
            // Timers are about to fire right now.
            Some(Duration::from_secs(0))
        };

        // Drop the lock before waking.
        drop(timers);

        // Add wakers to the list.
        log::trace!("process_timers: {} ready wakers", ready.len());
        for (_, waker) in ready {
            wakers.push(waker);
        }

        dur
    }

    /// Processes queued timer operations.
    fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
        // Process only as much as fits into the queue, or else this loop could in theory run
        // forever.
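        //
        // The queue is bounded (see `Reactor::get()`), so at most `capacity` operations are
        // drained per call, which keeps this loop finite even if other threads keep pushing.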
        for _ in 0..self.timer_ops.capacity().unwrap() {
            match self.timer_ops.pop() {
                Ok(TimerOp::Insert(when, id, waker)) => {
                    timers.insert((when, id), waker);
                }
                Ok(TimerOp::Remove(when, id)) => {
                    timers.remove(&(when, id));
                }
                Err(_) => break,
            }
        }
    }
}

/// A lock on the reactor.
pub(crate) struct ReactorLock<'a> {
    reactor: &'a Reactor,
    events: MutexGuard<'a, Vec<Event>>,
}

impl ReactorLock<'_> {
    /// Processes new events, blocking until the first event or the timeout.
    pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        let mut wakers = Vec::new();

        // Process ready timers.
        let next_timer = self.reactor.process_timers(&mut wakers);

        // Compute the timeout for blocking on I/O events.
        let timeout = match (next_timer, timeout) {
            (None, None) => None,
            (Some(t), None) | (None, Some(t)) => Some(t),
            (Some(a), Some(b)) => Some(a.min(b)),
        };

        // Bump the ticker before polling I/O.
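        //
        // (`fetch_add` returns the previous value, so the extra `wrapping_add(1)` yields the tick
        // that this round's I/O events will be stamped with below.)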
        let tick = self
            .reactor
            .ticker
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);

        self.events.clear();

        // Block on I/O events.
        let res = match self.reactor.poller.wait(&mut self.events, timeout) {
            // No I/O events occurred.
            Ok(0) => {
                if timeout != Some(Duration::from_secs(0)) {
                    // The non-zero timeout was hit, so fire ready timers.
                    self.reactor.process_timers(&mut wakers);
                }
                Ok(())
            }

            // At least one I/O event occurred.
            Ok(_) => {
                // Iterate over sources in the event list.
                let sources = self.reactor.sources.lock().unwrap();

                for ev in self.events.iter() {
                    // Check if there is a source in the table with this key.
                    if let Some(source) = sources.get(ev.key) {
                        let mut state = source.state.lock().unwrap();

                        // Collect wakers if a readability or writability event was emitted.
                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                            if emitted {
                                state[dir].tick = tick;
                                state[dir].drain_into(&mut wakers);
                            }
                        }

                        // Re-register if there are still writers or readers. This can happen if,
                        // e.g., we were previously interested in both readability and writability,
                        // but only one of them was emitted.
                        if !state[READ].is_empty() || !state[WRITE].is_empty() {
                            self.reactor.poller.modify(
                                source.raw,
                                Event {
                                    key: source.key,
                                    readable: !state[READ].is_empty(),
                                    writable: !state[WRITE].is_empty(),
                                },
                            )?;
                        }
                    }
                }

                Ok(())
            }

            // The syscall was interrupted.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),

            // An actual error occurred.
            Err(err) => Err(err),
        };

        // Wake up ready tasks.
        log::trace!("react: {} ready wakers", wakers.len());
        for waker in wakers {
            // Don't let a panicking waker blow everything up.
            panic::catch_unwind(|| waker.wake()).ok();
        }

        res
    }
}

/// A single timer operation.
enum TimerOp {
    Insert(Instant, usize, Waker),
    Remove(Instant, usize),
}

/// A registered source of I/O events.
#[derive(Debug)]
pub(crate) struct Source {
    /// Raw file descriptor on Unix platforms.
    #[cfg(unix)]
    pub(crate) raw: RawFd,

    /// Raw socket handle on Windows.
    #[cfg(windows)]
    pub(crate) raw: RawSocket,

    /// The key of this source obtained during registration.
    key: usize,

    /// Inner state with registered wakers.
    state: Mutex<[Direction; 2]>,
}

/// A read or write direction.
#[derive(Debug, Default)]
struct Direction {
    /// Last reactor tick that delivered an event.
    tick: usize,

    /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
    ticks: Option<(usize, usize)>,

    /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
    waker: Option<Waker>,

    /// Wakers of tasks waiting for the next event.
    ///
    /// Registered by `Async::readable()` and `Async::writable()`.
    wakers: Slab<Option<Waker>>,
}

impl Direction {
    /// Returns `true` if there are no wakers interested in this direction.
    fn is_empty(&self) -> bool {
        self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
    }

    /// Moves all wakers into a `Vec`.
    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
        if let Some(w) = self.waker.take() {
            dst.push(w);
        }
        for (_, opt) in self.wakers.iter_mut() {
            if let Some(w) = opt.take() {
                dst.push(w);
            }
        }
    }
}

impl Source {
    /// Polls the I/O source for readability.
    pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }

    /// Polls the I/O source for writability.
    pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(WRITE, cx)
    }

    /// Registers a waker from `poll_readable()` or `poll_writable()`.
    ///
    /// If a different waker is already registered, it gets replaced and woken.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = state[dir].ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[dir].tick != a && state[dir].tick != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[dir].is_empty();

        // Register the current task's waker.
        if let Some(w) = state[dir].waker.take() {
            if w.will_wake(cx.waker()) {
                state[dir].waker = Some(w);
                return Poll::Pending;
            }
            // Wake the previous waker because it's going to get replaced.
            panic::catch_unwind(|| w.wake()).ok();
        }
        state[dir].waker = Some(cx.waker().clone());
        state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));
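        // Note: both the current global ticker and this direction's last delivered tick are
        // remembered; the check at the top of this function treats an event from either of those
        // rounds as stale.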

        // Update interest in this I/O handle.
        if was_empty {
            Reactor::get().poller.modify(
                self.raw,
                Event {
                    key: self.key,
                    readable: !state[READ].is_empty(),
                    writable: !state[WRITE].is_empty(),
                },
            )?;
        }

        Poll::Pending
    }

    /// Waits until the I/O source is readable.
    pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
        Readable(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is readable.
    pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
        ReadableOwned(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is writable.
    pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
        Writable(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is writable.
    pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
        WritableOwned(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is readable or writable.
    fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
        Ready {
            handle,
            dir,
            ticks: None,
            index: None,
            _capture: PhantomData,
        }
    }
}

/// Future for [`Async::readable`](crate::Async::readable).
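///
/// A hedged usage sketch (run inside an async context; the listener setup is illustrative, and
/// the future itself is created by [`Async::readable`](crate::Async::readable)):
///
/// ```ignore
/// use std::net::TcpListener;
///
/// use async_io::Async;
///
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
/// // Resolves once the listener reports a pending incoming connection.
/// listener.readable().await?;
/// ```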
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);

impl<T> Future for Readable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        log::trace!("readable: fd={}", self.0.handle.source.raw);
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for Readable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Readable").finish()
    }
}

/// Future for [`Async::readable_owned`](crate::Async::readable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);

impl<T> Future for ReadableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        log::trace!("readable_owned: fd={}", self.0.handle.source.raw);
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for ReadableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadableOwned").finish()
    }
}

/// Future for [`Async::writable`](crate::Async::writable).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);

impl<T> Future for Writable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        log::trace!("writable: fd={}", self.0.handle.source.raw);
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for Writable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Writable").finish()
    }
}

/// Future for [`Async::writable_owned`](crate::Async::writable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);

impl<T> Future for WritableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        log::trace!("writable_owned: fd={}", self.0.handle.source.raw);
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for WritableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WritableOwned").finish()
    }
}

struct Ready<H: Borrow<crate::Async<T>>, T> {
    handle: H,
    dir: usize,
    ticks: Option<(usize, usize)>,
    index: Option<usize>,
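    // Marker tying this future to `T` without storing one. (Presumably `fn() -> T` is used so the
    // marker stays covariant in `T` and never requires `T` to be `Send` or `Sync`.)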
    _capture: PhantomData<fn() -> T>,
}

impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}

impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            ref handle,
            dir,
            ticks,
            index,
            ..
        } = &mut *self;

        let mut state = handle.borrow().source.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = *ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[*dir].tick != a && state[*dir].tick != b {
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[*dir].is_empty();

        // Register the current task's waker.
        let i = match *index {
            Some(i) => i,
            None => {
                let i = state[*dir].wakers.insert(None);
                *index = Some(i);
                *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
                i
            }
        };
        state[*dir].wakers[i] = Some(cx.waker().clone());

        // Update interest in this I/O handle.
        if was_empty {
            Reactor::get().poller.modify(
                handle.borrow().source.raw,
                Event {
                    key: handle.borrow().source.key,
                    readable: !state[READ].is_empty(),
                    writable: !state[WRITE].is_empty(),
                },
            )?;
        }

        Poll::Pending
    }
}

impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
    fn drop(&mut self) {
        // Remove our waker when dropped.
        if let Some(key) = self.index {
            let mut state = self.handle.borrow().source.state.lock().unwrap();
            let wakers = &mut state[self.dir].wakers;
            if wakers.contains(key) {
                wakers.remove(key);
            }
        }
    }
}