1use std::cell::{Cell, RefCell};
2use std::fmt::Debug;
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use std::{io, slice};
8
9#[cfg(feature = "block_on")]
10use std::future::Future;
11
12#[cfg(unix)]
13use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
14#[cfg(windows)]
15use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle};
16
17use log::trace;
18use polling::Poller;
19
20use crate::list::{SourceEntry, SourceList};
21use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher};
22use crate::sys::{Notifier, PollEvent};
23use crate::token::TokenInner;
24use crate::{
25 AdditionalLifecycleEventsSet, InsertError, Poll, PostAction, Readiness, Token, TokenFactory,
26};
27
/// Shared, type-erased idle callback as stored in the loop's idle queue.
type IdleCallback<'i, Data> = Rc<RefCell<dyn IdleDispatcher<Data> + 'i>>;
29
/// A token representing a registration in the [`EventLoop`].
///
/// This token is given to you by the [`EventLoop`] when an [`EventSource`] is inserted or
/// a [`Dispatcher`] is registered. You can use it to [disable](LoopHandle#method.disable),
/// [enable](LoopHandle#method.enable), [update](LoopHandle#method.update) or
/// [remove](LoopHandle#method.remove) it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RegistrationToken {
    // Raw key identifying the source inside the loop's source list.
    inner: TokenInner,
}
40
41impl RegistrationToken {
42 /// Create the RegistrationToken corresponding to the given raw key
43 /// This is needed because some methods use `RegistrationToken`s as
44 /// raw usizes within this crate
45 pub(crate) fn new(inner: TokenInner) -> Self {
46 Self { inner }
47 }
48}
49
/// State shared between an [`EventLoop`] and all of its [`LoopHandle`]s.
pub(crate) struct LoopInner<'l, Data> {
    /// The polling backend the sources register with.
    pub(crate) poll: RefCell<Poll>,
    // The `Option` is used to keep slots of the slab occupied, to prevent id reuse
    // while in-flight events might still refer to a recently destroyed event source.
    /// The sources currently known to the loop.
    pub(crate) sources: RefCell<SourceList<'l, Data>>,
    /// Sources whose `before_sleep` / `before_handle_events` hooks are run around each poll.
    pub(crate) sources_with_additional_lifecycle_events: RefCell<AdditionalLifecycleEventsSet>,
    /// Idle callbacks queued by `insert_idle`, drained at the end of each dispatch.
    idles: RefCell<Vec<IdleCallback<'l, Data>>>,
    /// Action requested through the handle from inside a source callback; applied once the
    /// callback has returned.
    pending_action: Cell<PostAction>,
}
59
/// A handle to an event loop
///
/// This handle allows you to insert new sources and idles in this event loop.
/// It can be cloned, and it is possible to insert new sources from within a source
/// callback.
pub struct LoopHandle<'l, Data> {
    // Reference-counted state shared with the `EventLoop` and every other handle.
    inner: Rc<LoopInner<'l, Data>>,
}
68
69impl<'l, Data> std::fmt::Debug for LoopHandle<'l, Data> {
70 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.write_str(data:"LoopHandle { ... }")
73 }
74}
75
76impl<'l, Data> Clone for LoopHandle<'l, Data> {
77 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
78 fn clone(&self) -> Self {
79 LoopHandle {
80 inner: self.inner.clone(),
81 }
82 }
83}
84
85impl<'l, Data> LoopHandle<'l, Data> {
86 /// Inserts a new event source in the loop.
87 ///
88 /// The provided callback will be called during the dispatching cycles whenever the
89 /// associated source generates events, see `EventLoop::dispatch(..)` for details.
90 ///
91 /// This function takes ownership of the event source. Use `register_dispatcher`
92 /// if you need access to the event source after this call.
93 pub fn insert_source<S, F>(
94 &self,
95 source: S,
96 callback: F,
97 ) -> Result<RegistrationToken, InsertError<S>>
98 where
99 S: EventSource + 'l,
100 F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'l,
101 {
102 let dispatcher = Dispatcher::new(source, callback);
103 self.register_dispatcher(dispatcher.clone())
104 .map_err(|error| InsertError {
105 error,
106 inserted: dispatcher.into_source_inner(),
107 })
108 }
109
110 /// Registers a `Dispatcher` in the loop.
111 ///
112 /// Use this function if you need access to the event source after its insertion in the loop.
113 ///
114 /// See also `insert_source`.
115 #[cfg_attr(feature = "nightly_coverage", coverage(off))] // Contains a branch we can't hit w/o OOM
116 pub fn register_dispatcher<S>(
117 &self,
118 dispatcher: Dispatcher<'l, S, Data>,
119 ) -> crate::Result<RegistrationToken>
120 where
121 S: EventSource + 'l,
122 {
123 let mut sources = self.inner.sources.borrow_mut();
124 let mut poll = self.inner.poll.borrow_mut();
125
126 // Find an empty slot if any
127 let slot = sources.vacant_entry();
128
129 slot.source = Some(dispatcher.clone_as_event_dispatcher());
130 trace!("[calloop] Inserting new source #{}", slot.token.get_id());
131 let ret = slot.source.as_ref().unwrap().register(
132 &mut poll,
133 &mut self
134 .inner
135 .sources_with_additional_lifecycle_events
136 .borrow_mut(),
137 &mut TokenFactory::new(slot.token),
138 );
139
140 if let Err(error) = ret {
141 slot.source = None;
142 return Err(error);
143 }
144
145 Ok(RegistrationToken { inner: slot.token })
146 }
147
148 /// Inserts an idle callback.
149 ///
150 /// This callback will be called during a dispatching cycle when the event loop has
151 /// finished processing all pending events from the sources and becomes idle.
152 pub fn insert_idle<'i, F: FnOnce(&mut Data) + 'l + 'i>(&self, callback: F) -> Idle<'i> {
153 let mut opt_cb = Some(callback);
154 let callback = Rc::new(RefCell::new(Some(move |data: &mut Data| {
155 if let Some(cb) = opt_cb.take() {
156 cb(data);
157 }
158 })));
159 self.inner.idles.borrow_mut().push(callback.clone());
160 Idle { callback }
161 }
162
163 /// Enables this previously disabled event source.
164 ///
165 /// This previously disabled source will start generating events again.
166 ///
167 /// **Note:** this cannot be done from within the source callback.
168 pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
169 if let &SourceEntry {
170 token: entry_token,
171 source: Some(ref source),
172 } = self.inner.sources.borrow().get(token.inner)?
173 {
174 trace!("[calloop] Registering source #{}", entry_token.get_id());
175 source.register(
176 &mut self.inner.poll.borrow_mut(),
177 &mut self
178 .inner
179 .sources_with_additional_lifecycle_events
180 .borrow_mut(),
181 &mut TokenFactory::new(entry_token),
182 )
183 } else {
184 Err(crate::Error::InvalidToken)
185 }
186 }
187
188 /// Makes this source update its registration.
189 ///
190 /// If after accessing the source you changed its parameters in a way that requires
191 /// updating its registration.
192 pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
193 if let &SourceEntry {
194 token: entry_token,
195 source: Some(ref source),
196 } = self.inner.sources.borrow().get(token.inner)?
197 {
198 trace!(
199 "[calloop] Updating registration of source #{}",
200 entry_token.get_id()
201 );
202 if !source.reregister(
203 &mut self.inner.poll.borrow_mut(),
204 &mut self
205 .inner
206 .sources_with_additional_lifecycle_events
207 .borrow_mut(),
208 &mut TokenFactory::new(entry_token),
209 )? {
210 trace!("[calloop] Cannot do it now, storing for later.");
211 // we are in a callback, store for later processing
212 self.inner.pending_action.set(PostAction::Reregister);
213 }
214 Ok(())
215 } else {
216 Err(crate::Error::InvalidToken)
217 }
218 }
219
220 /// Disables this event source.
221 ///
222 /// The source remains in the event loop, but it'll no longer generate events
223 pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
224 if let &SourceEntry {
225 token: entry_token,
226 source: Some(ref source),
227 } = self.inner.sources.borrow().get(token.inner)?
228 {
229 if !token.inner.same_source_as(entry_token) {
230 // The token provided by the user is no longer valid
231 return Err(crate::Error::InvalidToken);
232 }
233 trace!("[calloop] Unregistering source #{}", entry_token.get_id());
234 if !source.unregister(
235 &mut self.inner.poll.borrow_mut(),
236 &mut self
237 .inner
238 .sources_with_additional_lifecycle_events
239 .borrow_mut(),
240 *token,
241 )? {
242 trace!("[calloop] Cannot do it now, storing for later.");
243 // we are in a callback, store for later processing
244 self.inner.pending_action.set(PostAction::Disable);
245 }
246 Ok(())
247 } else {
248 Err(crate::Error::InvalidToken)
249 }
250 }
251
252 /// Removes this source from the event loop.
253 pub fn remove(&self, token: RegistrationToken) {
254 if let Ok(&mut SourceEntry {
255 token: entry_token,
256 ref mut source,
257 }) = self.inner.sources.borrow_mut().get_mut(token.inner)
258 {
259 if let Some(source) = source.take() {
260 trace!("[calloop] Removing source #{}", entry_token.get_id());
261 if let Err(e) = source.unregister(
262 &mut self.inner.poll.borrow_mut(),
263 &mut self
264 .inner
265 .sources_with_additional_lifecycle_events
266 .borrow_mut(),
267 token,
268 ) {
269 log::warn!(
270 "[calloop] Failed to unregister source from the polling system: {:?}",
271 e
272 );
273 }
274 }
275 }
276 }
277
278 /// Wrap an IO object into an async adapter
279 ///
280 /// This adapter turns the IO object into an async-aware one that can be used in futures.
281 /// The readiness of these futures will be driven by the event loop.
282 ///
283 /// The produced futures can be polled in any executor, and notably the one provided by
284 /// calloop.
285 pub fn adapt_io<F: AsFd>(&self, fd: F) -> crate::Result<crate::io::Async<'l, F>> {
286 crate::io::Async::new(self.inner.clone(), fd)
287 }
288}
289
/// An event loop
///
/// This loop can host several event sources, which can be dynamically added or removed.
pub struct EventLoop<'l, Data> {
    // Clone of the poller owned by the loop's `Poll`; only read by the platform-specific
    // `AsRawFd`/`AsFd`/`AsRawHandle`/`AsHandle` impls, hence the `dead_code` allowance.
    #[allow(dead_code)]
    poller: Arc<Poller>,
    // Handle giving access to the state shared with user-held `LoopHandle`s.
    handle: LoopHandle<'l, Data>,
    // Flags shared with every `LoopSignal` created from this loop.
    signals: Arc<Signals>,
    // A caching vector for synthetic poll events
    synthetic_events: Vec<PollEvent>,
}
301
302impl<'l, Data> std::fmt::Debug for EventLoop<'l, Data> {
303 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
304 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305 f.write_str(data:"EventLoop { ... }")
306 }
307}
308
/// Signals related to the event loop.
///
/// Shared between the loop and its `LoopSignal`s through an `Arc`.
struct Signals {
    /// Signal to stop the event loop.
    stop: AtomicBool,

    /// Signal that the future is ready.
    ///
    /// Set by the waker built in `block_on`, cleared when `block_on` polls the future.
    #[cfg(feature = "block_on")]
    future_ready: AtomicBool,
}
318
319impl<'l, Data> EventLoop<'l, Data> {
320 /// Create a new event loop
321 ///
322 /// Fails if the initialization of the polling system failed.
323 pub fn try_new() -> crate::Result<Self> {
324 let poll = Poll::new()?;
325 let poller = poll.poller.clone();
326 let handle = LoopHandle {
327 inner: Rc::new(LoopInner {
328 poll: RefCell::new(poll),
329 sources: RefCell::new(SourceList::new()),
330 idles: RefCell::new(Vec::new()),
331 pending_action: Cell::new(PostAction::Continue),
332 sources_with_additional_lifecycle_events: Default::default(),
333 }),
334 };
335
336 Ok(EventLoop {
337 handle,
338 signals: Arc::new(Signals {
339 stop: AtomicBool::new(false),
340 #[cfg(feature = "block_on")]
341 future_ready: AtomicBool::new(false),
342 }),
343 poller,
344 synthetic_events: vec![],
345 })
346 }
347
348 /// Retrieve a loop handle
349 pub fn handle(&self) -> LoopHandle<'l, Data> {
350 self.handle.clone()
351 }
352
353 fn dispatch_events(
354 &mut self,
355 mut timeout: Option<Duration>,
356 data: &mut Data,
357 ) -> crate::Result<()> {
358 let now = Instant::now();
359 {
360 let mut extra_lifecycle_sources = self
361 .handle
362 .inner
363 .sources_with_additional_lifecycle_events
364 .borrow_mut();
365 let sources = &self.handle.inner.sources.borrow();
366 for source in &mut *extra_lifecycle_sources.values {
367 if let Ok(SourceEntry {
368 source: Some(disp), ..
369 }) = sources.get(source.inner)
370 {
371 if let Some((readiness, token)) = disp.before_sleep()? {
372 // Wake up instantly after polling if we recieved an event
373 timeout = Some(Duration::ZERO);
374 self.synthetic_events.push(PollEvent { readiness, token });
375 }
376 } else {
377 unreachable!()
378 }
379 }
380 }
381 let events = {
382 let poll = self.handle.inner.poll.borrow();
383 loop {
384 let result = poll.poll(timeout);
385
386 match result {
387 Ok(events) => break events,
388 Err(crate::Error::IoError(err)) if err.kind() == io::ErrorKind::Interrupted => {
389 // Interrupted by a signal. Update timeout and retry.
390 if let Some(to) = timeout {
391 let elapsed = now.elapsed();
392 if elapsed >= to {
393 return Ok(());
394 } else {
395 timeout = Some(to - elapsed);
396 }
397 }
398 }
399 Err(err) => return Err(err),
400 };
401 }
402 };
403 {
404 let mut extra_lifecycle_sources = self
405 .handle
406 .inner
407 .sources_with_additional_lifecycle_events
408 .borrow_mut();
409 if !extra_lifecycle_sources.values.is_empty() {
410 for source in &mut *extra_lifecycle_sources.values {
411 if let Ok(SourceEntry {
412 source: Some(disp), ..
413 }) = self.handle.inner.sources.borrow().get(source.inner)
414 {
415 let iter = EventIterator {
416 inner: events.iter(),
417 registration_token: *source,
418 };
419 disp.before_handle_events(iter);
420 } else {
421 unreachable!()
422 }
423 }
424 }
425 }
426
427 for event in self.synthetic_events.drain(..).chain(events) {
428 // Get the registration token associated with the event.
429 let reg_token = event.token.inner.forget_sub_id();
430
431 let opt_disp = self
432 .handle
433 .inner
434 .sources
435 .borrow()
436 .get(reg_token)
437 .ok()
438 .and_then(|entry| entry.source.clone());
439
440 if let Some(disp) = opt_disp {
441 trace!(
442 "[calloop] Dispatching events for source #{}",
443 reg_token.get_id()
444 );
445 let mut ret = disp.process_events(event.readiness, event.token, data)?;
446
447 // if the returned PostAction is Continue, it may be overwritten by an user-specified pending action
448 let pending_action = self
449 .handle
450 .inner
451 .pending_action
452 .replace(PostAction::Continue);
453 if let PostAction::Continue = ret {
454 ret = pending_action;
455 }
456
457 match ret {
458 PostAction::Reregister => {
459 trace!(
460 "[calloop] Postaction reregister for source #{}",
461 reg_token.get_id()
462 );
463 disp.reregister(
464 &mut self.handle.inner.poll.borrow_mut(),
465 &mut self
466 .handle
467 .inner
468 .sources_with_additional_lifecycle_events
469 .borrow_mut(),
470 &mut TokenFactory::new(reg_token),
471 )?;
472 }
473 PostAction::Disable => {
474 trace!(
475 "[calloop] Postaction unregister for source #{}",
476 reg_token.get_id()
477 );
478 disp.unregister(
479 &mut self.handle.inner.poll.borrow_mut(),
480 &mut self
481 .handle
482 .inner
483 .sources_with_additional_lifecycle_events
484 .borrow_mut(),
485 RegistrationToken::new(reg_token),
486 )?;
487 }
488 PostAction::Remove => {
489 trace!(
490 "[calloop] Postaction remove for source #{}",
491 reg_token.get_id()
492 );
493 if let Ok(entry) = self.handle.inner.sources.borrow_mut().get_mut(reg_token)
494 {
495 entry.source = None;
496 }
497 }
498 PostAction::Continue => {}
499 }
500
501 if self
502 .handle
503 .inner
504 .sources
505 .borrow()
506 .get(reg_token)
507 .ok()
508 .map(|entry| entry.source.is_none())
509 .unwrap_or(true)
510 {
511 // the source has been removed from within its callback, unregister it
512 let mut poll = self.handle.inner.poll.borrow_mut();
513 if let Err(e) = disp.unregister(
514 &mut poll,
515 &mut self
516 .handle
517 .inner
518 .sources_with_additional_lifecycle_events
519 .borrow_mut(),
520 RegistrationToken::new(reg_token),
521 ) {
522 log::warn!(
523 "[calloop] Failed to unregister source from the polling system: {:?}",
524 e
525 );
526 }
527 }
528 } else {
529 log::warn!(
530 "[calloop] Received an event for non-existence source: {:?}",
531 reg_token
532 );
533 }
534 }
535
536 Ok(())
537 }
538
539 fn dispatch_idles(&mut self, data: &mut Data) {
540 let idles = std::mem::take(&mut *self.handle.inner.idles.borrow_mut());
541 for idle in idles {
542 idle.borrow_mut().dispatch(data);
543 }
544 }
545
546 /// Dispatch pending events to their callbacks
547 ///
548 /// If some sources have events available, their callbacks will be immediatly called.
549 /// Otherwise this will wait until an event is receive or the provided `timeout`
550 /// is reached. If `timeout` is `None`, it will wait without a duration limit.
551 ///
552 /// Once pending events have been processed or the timeout is reached, all pending
553 /// idle callbacks will be fired before this method returns.
554 pub fn dispatch<D: Into<Option<Duration>>>(
555 &mut self,
556 timeout: D,
557 data: &mut Data,
558 ) -> crate::Result<()> {
559 self.dispatch_events(timeout.into(), data)?;
560 self.dispatch_idles(data);
561
562 Ok(())
563 }
564
565 /// Get a signal to stop this event loop from running
566 ///
567 /// To be used in conjunction with the `run()` method.
568 pub fn get_signal(&self) -> LoopSignal {
569 LoopSignal {
570 signal: self.signals.clone(),
571 notifier: self.handle.inner.poll.borrow().notifier(),
572 }
573 }
574
575 /// Run this event loop
576 ///
577 /// This will repeatedly try to dispatch events (see the `dispatch()` method) on
578 /// this event loop, waiting at most `timeout` every time.
579 ///
580 /// Between each dispatch wait, your provided callback will be called.
581 ///
582 /// You can use the `get_signal()` method to retrieve a way to stop or wakeup
583 /// the event loop from anywhere.
584 pub fn run<F, D: Into<Option<Duration>>>(
585 &mut self,
586 timeout: D,
587 data: &mut Data,
588 mut cb: F,
589 ) -> crate::Result<()>
590 where
591 F: FnMut(&mut Data),
592 {
593 let timeout = timeout.into();
594 self.signals.stop.store(false, Ordering::Release);
595 while !self.signals.stop.load(Ordering::Acquire) {
596 self.dispatch(timeout, data)?;
597 cb(data);
598 }
599 Ok(())
600 }
601
602 /// Block a future on this event loop.
603 ///
604 /// This will run the provided future on this event loop, blocking until it is
605 /// resolved.
606 ///
607 /// If [`LoopSignal::stop()`] is called before the future is resolved, this function returns
608 /// `None`.
609 #[cfg(feature = "block_on")]
610 pub fn block_on<R>(
611 &mut self,
612 future: impl Future<Output = R>,
613 data: &mut Data,
614 mut cb: impl FnMut(&mut Data),
615 ) -> crate::Result<Option<R>> {
616 use std::task::{Context, Poll, Wake, Waker};
617
618 /// A waker that will wake up the event loop when it is ready to make progress.
619 struct EventLoopWaker(LoopSignal);
620
621 impl Wake for EventLoopWaker {
622 fn wake(self: Arc<Self>) {
623 // Set the waker.
624 self.0.signal.future_ready.store(true, Ordering::Release);
625 self.0.notifier.notify().ok();
626 }
627
628 fn wake_by_ref(self: &Arc<Self>) {
629 // Set the waker.
630 self.0.signal.future_ready.store(true, Ordering::Release);
631 self.0.notifier.notify().ok();
632 }
633 }
634
635 // Pin the future to the stack.
636 pin_utils::pin_mut!(future);
637
638 // Create a waker that will wake up the event loop when it is ready to make progress.
639 let waker = {
640 let handle = EventLoopWaker(self.get_signal());
641
642 Waker::from(Arc::new(handle))
643 };
644 let mut context = Context::from_waker(&waker);
645
646 // Begin running the loop.
647 let mut output = None;
648
649 self.signals.stop.store(false, Ordering::Release);
650 self.signals.future_ready.store(true, Ordering::Release);
651
652 while !self.signals.stop.load(Ordering::Acquire) {
653 // If the future is ready to be polled, poll it.
654 if self.signals.future_ready.swap(false, Ordering::AcqRel) {
655 // Poll the future and break the loop if it's ready.
656 if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
657 output = Some(result);
658 break;
659 }
660 }
661
662 // Otherwise, block on the event loop.
663 self.dispatch_events(None, data)?;
664 self.dispatch_idles(data);
665 cb(data);
666 }
667
668 Ok(output)
669 }
670}
671
#[cfg(unix)]
impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
    /// Get the underlying raw-fd of the poller.
    ///
    /// This can be used to create a [`Generic`] source out of the current loop
    /// and insert it into some other [`EventLoop`]. It's recommended to clone `fd`
    /// before doing so.
    ///
    /// [`Generic`]: crate::generic::Generic
    fn as_raw_fd(&self) -> RawFd {
        // Delegates to the poller kept alongside the loop.
        self.poller.as_raw_fd()
    }
}
685
#[cfg(unix)]
impl<'l, Data> AsFd for EventLoop<'l, Data> {
    /// Get the underlying fd of the poller.
    ///
    /// This can be used to create a [`Generic`] source out of the current loop
    /// and insert it into some other [`EventLoop`].
    ///
    /// [`Generic`]: crate::generic::Generic
    fn as_fd(&self) -> BorrowedFd<'_> {
        // Delegates to the poller kept alongside the loop.
        self.poller.as_fd()
    }
}
698
#[cfg(windows)]
impl<Data> AsRawHandle for EventLoop<'_, Data> {
    /// Get the underlying raw handle of the poller.
    fn as_raw_handle(&self) -> RawHandle {
        // Delegates to the poller kept alongside the loop.
        self.poller.as_raw_handle()
    }
}
705
#[cfg(windows)]
impl<Data> AsHandle for EventLoop<'_, Data> {
    /// Get the underlying handle of the poller.
    fn as_handle(&self) -> BorrowedHandle<'_> {
        // Delegates to the poller kept alongside the loop.
        self.poller.as_handle()
    }
}
712
#[derive(Clone, Debug)]
/// The EventIterator is an `Iterator` over the events relevant to a particular source.
///
/// This type is used in the [`EventSource::before_handle_events`] methods for
/// two main reasons:
/// - to avoid dynamic dispatch overhead;
/// - to allow this type to be `Clone`, which is not possible with dynamic dispatch.
pub struct EventIterator<'a> {
    // All the events returned by the poll, not yet filtered.
    inner: slice::Iter<'a, PollEvent>,
    // Token of the source whose events are yielded.
    registration_token: RegistrationToken,
}
724
725impl<'a> Iterator for EventIterator<'a> {
726 type Item = (Readiness, Token);
727
728 fn next(&mut self) -> Option<Self::Item> {
729 for next: &PollEvent in self.inner.by_ref() {
730 if nextTokenInner
731 .token
732 .inner
733 .same_source_as(self.registration_token.inner)
734 {
735 return Some((next.readiness, next.token));
736 }
737 }
738 None
739 }
740}
741
/// A signal that can be shared between threads to stop or wake up a running
/// event loop
#[derive(Clone)]
pub struct LoopSignal {
    // Flags shared with the event loop this signal was created from.
    signal: Arc<Signals>,
    // Used to interrupt the loop's wait for events.
    notifier: Notifier,
}
749
750impl std::fmt::Debug for LoopSignal {
751 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
752 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753 f.write_str(data:"LoopSignal { ... }")
754 }
755}
756
757impl LoopSignal {
758 /// Stop the event loop
759 ///
760 /// Once this method is called, the next time the event loop has finished
761 /// waiting for events, it will return rather than starting to wait again.
762 ///
763 /// This is only useful if you are using the `EventLoop::run()` method.
764 pub fn stop(&self) {
765 self.signal.stop.store(val:true, order:Ordering::Release);
766 }
767
768 /// Wake up the event loop
769 ///
770 /// This sends a dummy event to the event loop to simulate the reception
771 /// of an event, making the wait return early. Called after `stop()`, this
772 /// ensures the event loop will terminate quickly if you specified a long
773 /// timeout (or no timeout at all) to the `dispatch` or `run` method.
774 pub fn wakeup(&self) {
775 self.notifier.notify().ok();
776 }
777}
778
779#[cfg(test)]
780mod tests {
781 use std::{cell::Cell, rc::Rc, time::Duration};
782
783 use crate::{
784 channel::{channel, Channel},
785 ping::*,
786 EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token,
787 TokenFactory,
788 };
789
790 #[cfg(unix)]
791 use crate::{generic::Generic, Dispatcher, Interest, Mode};
792
793 use super::EventLoop;
794
795 #[test]
796 fn dispatch_idle() {
797 let mut event_loop = EventLoop::try_new().unwrap();
798
799 let mut dispatched = false;
800
801 event_loop.handle().insert_idle(|d| {
802 *d = true;
803 });
804
805 event_loop
806 .dispatch(Some(Duration::ZERO), &mut dispatched)
807 .unwrap();
808
809 assert!(dispatched);
810 }
811
812 #[test]
813 fn cancel_idle() {
814 let mut event_loop = EventLoop::try_new().unwrap();
815
816 let mut dispatched = false;
817
818 let handle = event_loop.handle();
819 let idle = handle.insert_idle(move |d| {
820 *d = true;
821 });
822
823 idle.cancel();
824
825 event_loop
826 .dispatch(Duration::ZERO, &mut dispatched)
827 .unwrap();
828
829 assert!(!dispatched);
830 }
831
832 #[test]
833 fn wakeup() {
834 let mut event_loop = EventLoop::try_new().unwrap();
835
836 let signal = event_loop.get_signal();
837
838 ::std::thread::spawn(move || {
839 ::std::thread::sleep(Duration::from_millis(500));
840 signal.wakeup();
841 });
842
843 // the test should return
844 event_loop.dispatch(None, &mut ()).unwrap();
845 }
846
847 #[test]
848 fn wakeup_stop() {
849 let mut event_loop = EventLoop::try_new().unwrap();
850
851 let signal = event_loop.get_signal();
852
853 ::std::thread::spawn(move || {
854 ::std::thread::sleep(Duration::from_millis(500));
855 signal.stop();
856 signal.wakeup();
857 });
858
859 // the test should return
860 event_loop.run(None, &mut (), |_| {}).unwrap();
861 }
862
    /// Checks that `before_sleep` and `before_handle_events` are called around the poll
    /// for a source that opts into the additional lifecycle events, and that they stop
    /// being called while the source is disabled or after it is removed.
    #[test]
    fn additional_events() {
        let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
        let mut lock = Lock {
            lock: Rc::new((
                // Whether the lock is locked
                Cell::new(false),
                // The total number of times the lock has been taken (see `Lock::lock`)
                Cell::new(0),
                // The total number of events processed in before_handle_events
                // This is used to ensure that the count seen in before_handle_events is expected
                Cell::new(0),
            )),
        };
        let (sender, channel) = channel();
        let token = event_loop
            .handle()
            .insert_source(
                LockingSource {
                    channel,
                    lock: lock.clone(),
                },
                |_, _, lock| {
                    // The lock must be free here: `before_handle_events` released it.
                    lock.lock();
                    lock.unlock();
                },
            )
            .unwrap();
        sender.send(()).unwrap();

        event_loop.dispatch(None, &mut lock).unwrap();
        // We should have been locked twice so far
        // (once in `before_sleep`, once in the callback)
        assert_eq!(lock.lock.1.get(), 2);
        // And we should have received one event
        assert_eq!(lock.lock.2.get(), 1);
        // While disabled, the lifecycle hooks must not run: the lock count is unchanged.
        event_loop.handle().disable(&token).unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut lock)
            .unwrap();
        assert_eq!(lock.lock.1.get(), 2);

        // Once re-enabled, `before_sleep` runs again and takes the lock one more time.
        event_loop.handle().enable(&token).unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut lock)
            .unwrap();
        assert_eq!(lock.lock.1.get(), 3);
        // After removal, neither counter moves anymore.
        event_loop.handle().remove(token);
        event_loop
            .dispatch(Some(Duration::ZERO), &mut lock)
            .unwrap();
        assert_eq!(lock.lock.1.get(), 3);
        assert_eq!(lock.lock.2.get(), 1);

        // Non-reentrant lock that panics on misuse and counts how often it was taken.
        #[derive(Clone)]
        struct Lock {
            lock: Rc<(Cell<bool>, Cell<u32>, Cell<u32>)>,
        }
        impl Lock {
            // Takes the lock, panicking if it is already held.
            fn lock(&self) {
                if self.lock.0.get() {
                    panic!();
                }
                // Increase the count
                self.lock.1.set(self.lock.1.get() + 1);
                self.lock.0.set(true)
            }
            // Releases the lock, panicking if it is not held.
            fn unlock(&self) {
                if !self.lock.0.get() {
                    panic!();
                }
                self.lock.0.set(false);
            }
        }
        // Channel source that holds the lock between `before_sleep` and
        // `before_handle_events`.
        struct LockingSource {
            channel: Channel<()>,
            lock: Lock,
        }
        impl EventSource for LockingSource {
            type Event = <Channel<()> as EventSource>::Event;

            type Metadata = <Channel<()> as EventSource>::Metadata;

            type Ret = <Channel<()> as EventSource>::Ret;

            type Error = <Channel<()> as EventSource>::Error;

            fn process_events<F>(
                &mut self,
                readiness: Readiness,
                token: Token,
                callback: F,
            ) -> Result<PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                self.channel.process_events(readiness, token, callback)
            }

            fn register(
                &mut self,
                poll: &mut Poll,
                token_factory: &mut TokenFactory,
            ) -> crate::Result<()> {
                self.channel.register(poll, token_factory)
            }

            fn reregister(
                &mut self,
                poll: &mut Poll,
                token_factory: &mut TokenFactory,
            ) -> crate::Result<()> {
                self.channel.reregister(poll, token_factory)
            }

            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
                self.channel.unregister(poll)
            }

            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;

            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
                self.lock.lock();
                Ok(None)
            }

            fn before_handle_events(&mut self, events: EventIterator) {
                // Record how many events the loop reports for this source.
                let events_count = events.count();
                let lock = &self.lock.lock;
                lock.2.set(lock.2.get() + events_count as u32);
                self.lock.unlock();
            }
        }
    }
    /// A source may opt into the additional lifecycle events without overriding
    /// `before_sleep` / `before_handle_events`; dispatching must still work.
    #[test]
    fn default_additional_events() {
        let (sender, channel) = channel();
        let mut test_source = NoopWithDefaultHandlers { channel };
        let mut event_loop = EventLoop::try_new().unwrap();
        event_loop
            .handle()
            .insert_source(Box::new(&mut test_source), |_, _, _| {})
            .unwrap();
        sender.send(()).unwrap();

        event_loop.dispatch(None, &mut ()).unwrap();
        // Channel wrapper that relies on the trait's default lifecycle hooks.
        struct NoopWithDefaultHandlers {
            channel: Channel<()>,
        }
        impl EventSource for NoopWithDefaultHandlers {
            type Event = <Channel<()> as EventSource>::Event;

            type Metadata = <Channel<()> as EventSource>::Metadata;

            type Ret = <Channel<()> as EventSource>::Ret;

            type Error = <Channel<()> as EventSource>::Error;

            fn process_events<F>(
                &mut self,
                readiness: Readiness,
                token: Token,
                callback: F,
            ) -> Result<PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                self.channel.process_events(readiness, token, callback)
            }

            fn register(
                &mut self,
                poll: &mut Poll,
                token_factory: &mut TokenFactory,
            ) -> crate::Result<()> {
                self.channel.register(poll, token_factory)
            }

            fn reregister(
                &mut self,
                poll: &mut Poll,
                token_factory: &mut TokenFactory,
            ) -> crate::Result<()> {
                self.channel.reregister(poll, token_factory)
            }

            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
                self.channel.unregister(poll)
            }

            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
        }
    }
1055
    /// A synthetic event returned from `before_sleep` must be dispatched to the source
    /// even though nothing is registered with the polling system.
    #[test]
    fn additional_events_synthetic() {
        let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
        let mut lock = Lock {
            lock: Rc::new(Cell::new(false)),
        };
        event_loop
            .handle()
            .insert_source(
                InstantWakeupLockingSource {
                    lock: lock.clone(),
                    token: None,
                },
                |_, _, lock| {
                    lock.lock();
                    lock.unlock();
                },
            )
            .unwrap();

        // Loop should finish, as the source returns a synthetic event from `before_sleep`,
        // which makes the loop poll with a zero timeout.
        event_loop.dispatch(None, &mut lock).unwrap();
        // Non-reentrant lock that panics on misuse.
        #[derive(Clone)]
        struct Lock {
            lock: Rc<Cell<bool>>,
        }
        impl Lock {
            // Takes the lock, panicking if it is already held.
            fn lock(&self) {
                if self.lock.get() {
                    panic!();
                }
                self.lock.set(true)
            }
            // Releases the lock, panicking if it is not held.
            fn unlock(&self) {
                if !self.lock.get() {
                    panic!();
                }
                self.lock.set(false);
            }
        }
        // Source with no underlying IO: its only events are the synthetic ones it
        // produces in `before_sleep`.
        struct InstantWakeupLockingSource {
            lock: Lock,
            token: Option<Token>,
        }
        impl EventSource for InstantWakeupLockingSource {
            type Event = ();

            type Metadata = ();

            type Ret = ();

            type Error = <Channel<()> as EventSource>::Error;

            fn process_events<F>(
                &mut self,
                _: Readiness,
                token: Token,
                mut callback: F,
            ) -> Result<PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                // The synthetic event must carry the token handed out at registration.
                assert_eq!(token, self.token.unwrap());
                callback((), &mut ());
                Ok(PostAction::Continue)
            }

            fn register(
                &mut self,
                _: &mut Poll,
                token_factory: &mut TokenFactory,
            ) -> crate::Result<()> {
                // Nothing to register with the poller; just remember our token.
                self.token = Some(token_factory.token());
                Ok(())
            }

            fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
                unreachable!()
            }

            fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
                unreachable!()
            }

            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;

            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
                self.lock.lock();
                Ok(Some((Readiness::EMPTY, self.token.unwrap())))
            }

            fn before_handle_events(&mut self, _: EventIterator) {
                self.lock.unlock();
            }
        }
    }
1152
/// Inserting a `Generic` source wrapping a bogus file descriptor must be
/// reported as an error by `insert_source`.
#[cfg(unix)]
#[test]
fn insert_bad_source() {
    use std::os::unix::io::{FromRawFd, OwnedFd};

    let event_loop = EventLoop::<()>::try_new().unwrap();

    // NOTE(review): 420 is presumably not an open descriptor in the test
    // process; the resulting `OwnedFd` will still try to close it on drop.
    let bogus_fd = unsafe { OwnedFd::from_raw_fd(420) };
    let source = crate::sources::generic::Generic::new(bogus_fd, Interest::READ, Mode::Level);

    let outcome = event_loop
        .handle()
        .insert_source(source, |_, _, _| Ok(PostAction::Continue));
    assert!(outcome.is_err());
}
1166
/// Using a registration token after its source was removed must yield an
/// error rather than succeed.
#[test]
fn invalid_token() {
    // `_ping` keeps the sending half alive for the duration of the test.
    let (_ping, ping_source) = crate::sources::ping::make_ping().unwrap();

    let event_loop = EventLoop::<()>::try_new().unwrap();
    let handle = event_loop.handle();

    let stale_token = handle.insert_source(ping_source, |_, _, _| {}).unwrap();
    handle.remove(stale_token);

    // The token no longer refers to a live source.
    assert!(handle.enable(&stale_token).is_err());
}
1179
/// Registering a source whose interest set is empty must succeed, and the
/// resulting registration must be removable again.
#[cfg(unix)]
#[test]
fn insert_source_no_interest() {
    use rustix::pipe::pipe;

    // Create a pipe to get an arbitrary fd.
    let (read, _write) = pipe().unwrap();

    let source = crate::sources::generic::Generic::new(read, Interest::EMPTY, Mode::Level);
    let dispatcher = Dispatcher::new(source, |_, _, _| Ok(PostAction::Continue));

    let event_loop = EventLoop::<()>::try_new().unwrap();
    let handle = event_loop.handle();

    // Fail the test with the underlying error attached, rather than with a
    // bare `panic!()` that hides why registration was rejected.
    let token = handle
        .register_dispatcher(dispatcher.clone())
        .expect("registering a source with an empty interest set should succeed");

    // Take the source out of the loop again.
    handle.remove(token);
}
1203
/// A disabled source must not be dispatched, and a ping sent while it was
/// disabled must be delivered once the source is enabled again.
#[test]
fn disarm_rearm() {
    let mut event_loop = EventLoop::<bool>::try_new().unwrap();
    let (ping, ping_source) = make_ping().unwrap();

    let ping_token = event_loop
        .handle()
        .insert_source(ping_source, |_, _, fired| *fired = true)
        .unwrap();

    // Baseline: an enabled source receives its ping.
    let mut fired = false;
    ping.ping();
    event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
    assert!(fired);

    // Disable the source: the pending ping must not reach the callback.
    ping.ping();
    event_loop.handle().disable(&ping_token).unwrap();
    fired = false;
    event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
    assert!(!fired);

    // Re-enable it: the ping sent while disabled is dispatched now.
    event_loop.handle().enable(&ping_token).unwrap();
    fired = false;
    event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
    assert!(fired);
}
1240
/// A single event source may wrap several sub-sources, each registering its
/// own token; events must be routed to the right sub-source.
#[test]
fn multiple_tokens() {
    /// Wraps two ping sources and tags their events with `1` and `2`.
    struct DoubleSource {
        ping1: PingSource,
        ping2: PingSource,
    }

    impl crate::EventSource for DoubleSource {
        type Event = u32;
        type Metadata = ();
        type Ret = ();
        type Error = PingError;

        fn process_events<F>(
            &mut self,
            readiness: Readiness,
            token: Token,
            mut callback: F,
        ) -> Result<PostAction, Self::Error>
        where
            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
        {
            // Both sub-sources are offered the event; each one emits its own
            // tag through the shared user callback.
            self.ping1
                .process_events(readiness, token, |_, _| callback(1, &mut ()))?;
            self.ping2
                .process_events(readiness, token, |_, _| callback(2, &mut ()))?;
            Ok(PostAction::Continue)
        }

        fn register(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> crate::Result<()> {
            self.ping1.register(poll, factory)?;
            self.ping2.register(poll, factory)
        }

        fn reregister(&mut self, poll: &mut Poll, factory: &mut TokenFactory) -> crate::Result<()> {
            self.ping1.reregister(poll, factory)?;
            self.ping2.reregister(poll, factory)
        }

        fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
            self.ping1.unregister(poll)?;
            self.ping2.unregister(poll)
        }
    }

    let mut event_loop = EventLoop::<u32>::try_new().unwrap();

    let (ping1, source1) = make_ping().unwrap();
    let (ping2, source2) = make_ping().unwrap();

    event_loop
        .handle()
        .insert_source(
            DoubleSource {
                ping1: source1,
                ping2: source2,
            },
            |value, _, sum| {
                eprintln!("Dispatching {}", value);
                *sum += value;
            },
        )
        .unwrap();

    // (fire ping1, fire ping2, expected sum of the dispatched tags)
    let rounds: [(bool, bool, u32); 3] = [(true, false, 1), (false, true, 2), (true, true, 3)];
    for (fire_first, fire_second, expected) in rounds {
        let mut dispatched = 0;
        if fire_first {
            ping1.ping();
        }
        if fire_second {
            ping2.ping();
        }
        event_loop
            .dispatch(Duration::ZERO, &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, expected);
    }
}
1337
/// Changing the `interest` of a registered `Generic` source and calling
/// `update` must change which kind of readiness triggers the callback.
#[cfg(unix)]
#[test]
fn change_interests() {
    use rustix::io::write;
    use rustix::net::{recv, socketpair, AddressFamily, RecvFlags, SocketFlags, SocketType};

    /// Runs one non-blocking dispatch and reports whether the callback ran.
    fn poll_once(event_loop: &mut EventLoop<'_, bool>) -> bool {
        let mut fired = false;
        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
        fired
    }

    let mut event_loop = EventLoop::<bool>::try_new().unwrap();

    // No non-blocking socket flag is requested: recv with DONTWAIT will
    // suffice for platforms without SockFlag::SOCK_NONBLOCKING such as macOS.
    let (sock1, sock2) = socketpair(
        AddressFamily::UNIX,
        SocketType::STREAM,
        SocketFlags::empty(),
        None,
    )
    .unwrap();

    let source = Generic::new(sock1, Interest::READ, Mode::Level);
    let dispatcher = Dispatcher::new(source, |_, fd, dispatched| {
        *dispatched = true;
        // Drain the socket so that it stops being readable.
        let mut buf = [0u8; 32];
        loop {
            let received = match recv(&*fd, &mut buf, RecvFlags::DONTWAIT) {
                Ok(count) => count,
                Err(errno) => {
                    let err = std::io::Error::from(errno);
                    if err.kind() == std::io::ErrorKind::WouldBlock {
                        // nothing more to read
                        break;
                    }
                    // propagate error
                    return Err(err);
                }
            };
            if received == 0 {
                // closed pipe, we are now inert
                break;
            }
        }
        Ok(PostAction::Continue)
    });

    let sock_token_1 = event_loop
        .handle()
        .register_dispatcher(dispatcher.clone())
        .unwrap();

    // first dispatch, nothing is readable
    assert!(!poll_once(&mut event_loop));

    // write something, the socket becomes readable
    write(&sock2, &[1, 2, 3]).unwrap();
    assert!(poll_once(&mut event_loop));

    // All has been read, no longer readable
    assert!(!poll_once(&mut event_loop));

    // change the interests for writability instead
    dispatcher.as_source_mut().interest = Interest::WRITE;
    event_loop.handle().update(&sock_token_1).unwrap();

    // the socket is writable
    assert!(poll_once(&mut event_loop));

    // change back to readable
    dispatcher.as_source_mut().interest = Interest::READ;
    event_loop.handle().update(&sock_token_1).unwrap();

    // the socket is not readable
    assert!(!poll_once(&mut event_loop));
}
1426
/// A source's callback may remove its own source from the loop while it is
/// being dispatched.
#[test]
fn kill_source() {
    let mut event_loop = EventLoop::<Option<RegistrationToken>>::try_new().unwrap();

    // This handle is moved into the callback so it can remove the source.
    let remover = event_loop.handle();
    let (ping, ping_source) = make_ping().unwrap();
    let ping_token = event_loop
        .handle()
        .insert_source(ping_source, move |_, _, pending| {
            // The shared data carries the token of the source to remove;
            // taking it out also signals to the test body that we ran.
            if let Some(own_token) = pending.take() {
                remover.remove(own_token);
            }
        })
        .unwrap();

    ping.ping();

    let mut pending = Some(ping_token);
    event_loop.dispatch(Duration::ZERO, &mut pending).unwrap();

    assert!(pending.is_none());
}
1450
/// The loop's shared data may borrow from the enclosing scope; it does not
/// need to be `'static`.
#[test]
fn non_static_data() {
    use std::sync::mpsc;

    let (sender, receiver) = mpsc::channel();

    // The event loop lives in an inner scope that ends before `sender` is
    // used again below.
    {
        /// Shared data that merely borrows the channel's sending half.
        struct RefSender<'a>(&'a mpsc::Sender<()>);
        let mut borrowed = RefSender(&sender);

        let mut event_loop = EventLoop::<RefSender<'_>>::try_new().unwrap();
        let (ping, ping_source) = make_ping().unwrap();
        let _ping_token = event_loop
            .handle()
            .insert_source(ping_source, |_, _, shared| {
                shared.0.send(()).unwrap();
            })
            .unwrap();

        ping.ping();

        event_loop.dispatch(Duration::ZERO, &mut borrowed).unwrap();
    }

    // The callback must have sent exactly what we can receive here.
    receiver.recv().unwrap();
    // sender still usable (e.g. for another EventLoop)
    drop(sender);
}
1481
/// `block_on` must drive a future to completion and hand back its output.
#[cfg(feature = "block_on")]
#[test]
fn block_on_test() {
    use crate::sources::timer::TimeoutFuture;
    use std::time::Duration;

    let mut evl = EventLoop::<()>::try_new().unwrap();

    let mut data = 22;
    let future = {
        let data_ref = &mut data;
        let evl_handle = evl.handle();

        async move {
            // Wait on a timer driven by the same loop, then leave a visible
            // side effect so completion can be checked afterwards.
            TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
            *data_ref = 32;
            11
        }
    };

    let output = evl.block_on(future, &mut (), |&mut ()| {}).unwrap();
    assert_eq!(output, Some(11));
    assert_eq!(data, 32);
}
1506
/// Stopping the loop before the future completes must make `block_on`
/// return `None` and leave the future's side effects undone.
#[cfg(feature = "block_on")]
#[test]
fn block_on_early_cancel() {
    use crate::sources::timer;
    use std::time::Duration;

    let mut evl = EventLoop::<()>::try_new().unwrap();

    let mut data = 22;
    let future = {
        let data_ref = &mut data;
        let evl_handle = evl.handle();

        async move {
            timer::TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
            *data_ref = 32;
            11
        }
    };

    // A one-second timer stops the loop before the two-second future fires.
    let stopper = timer::Timer::from_duration(Duration::from_secs(1));
    let signal = evl.get_signal();
    let _timer_token = evl
        .handle()
        .insert_source(stopper, move |_, _, _| {
            signal.stop();
            timer::TimeoutAction::Drop
        })
        .unwrap();

    let output = evl.block_on(future, &mut (), |&mut ()| {}).unwrap();
    assert_eq!(output, None);
    assert_eq!(data, 22);
}
1541
/// A callback may remove its own source and insert a new one in the same
/// dispatch; the newly inserted source must still get to run.
#[test]
fn reuse() {
    use crate::sources::timer;
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    let mut evl = EventLoop::<RegistrationToken>::try_new().unwrap();
    let handle = evl.handle();

    let data = Arc::new(Mutex::new(1));
    let data_for_callback = data.clone();

    let first_timer = timer::Timer::from_duration(Duration::from_secs(1));
    let mut first_timer_token = evl
        .handle()
        .insert_source(first_timer, move |_, _, own_token| {
            // Remove ourselves, then immediately insert a replacement timer.
            handle.remove(*own_token);
            let data_for_second = data_for_callback.clone();
            let _ = handle.insert_source(timer::Timer::immediate(), move |_, _, _| {
                *data_for_second.lock().unwrap() = 2;
                timer::TimeoutAction::Drop
            });
            timer::TimeoutAction::Drop
        })
        .unwrap();

    // Keep dispatching until more than three seconds have passed in total.
    let started = Instant::now();
    loop {
        evl.dispatch(Some(Duration::from_secs(3)), &mut first_timer_token)
            .unwrap();
        if started.elapsed() > Duration::from_secs(3) {
            break;
        }
    }

    // The replacement timer's callback must have run.
    assert_eq!(*data.lock().unwrap(), 2);
}
1579
/// A source that asks to be removed from `process_events` must be dropped
/// cleanly even when it was woken through a token other than the first one
/// it requested.
#[test]
fn drop_of_subsource() {
    /// Source that requests several tokens and reports readiness on the last.
    struct WithSubSource {
        token: Option<Token>,
    }

    impl crate::EventSource for WithSubSource {
        type Event = ();
        type Metadata = ();
        type Ret = ();
        type Error = crate::Error;
        const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;

        fn register(&mut self, _poll: &mut Poll, fact: &mut TokenFactory) -> crate::Result<()> {
            // produce a few tokens to emulate a subsource
            for _ in 0..2 {
                fact.token();
            }
            self.token = Some(fact.token());
            Ok(())
        }

        fn reregister(&mut self, _poll: &mut Poll, _fact: &mut TokenFactory) -> crate::Result<()> {
            Ok(())
        }

        fn unregister(&mut self, _poll: &mut Poll) -> crate::Result<()> {
            Ok(())
        }

        // emulate a readiness
        fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
            let fake_readiness = Readiness {
                readable: true,
                writable: false,
                error: false,
            };
            Ok(self.token.map(|token| (fake_readiness, token)))
        }

        fn process_events<F>(
            &mut self,
            _readiness: Readiness,
            _token: Token,
            mut callback: F,
        ) -> Result<PostAction, Self::Error>
        where
            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
        {
            callback((), &mut ());
            // Drop the source
            Ok(PostAction::Remove)
        }
    }

    // Now the actual test
    let mut evl = EventLoop::<bool>::try_new().unwrap();
    evl.handle()
        .insert_source(WithSubSource { token: None }, |_, _, ran| *ran = true)
        .unwrap();

    let mut ran = false;
    evl.dispatch(Some(Duration::ZERO), &mut ran).unwrap();
    assert!(ran);
}
1652
1653 // A dummy EventSource to test insertion and removal of sources
1654 struct DummySource;
1655
1656 impl crate::EventSource for DummySource {
1657 type Event = ();
1658 type Metadata = ();
1659 type Ret = ();
1660 type Error = crate::Error;
1661
1662 fn process_events<F>(
1663 &mut self,
1664 _: Readiness,
1665 _: Token,
1666 mut callback: F,
1667 ) -> Result<PostAction, Self::Error>
1668 where
1669 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1670 {
1671 callback((), &mut ());
1672 Ok(PostAction::Continue)
1673 }
1674
1675 fn register(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1676 Ok(())
1677 }
1678
1679 fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1680 Ok(())
1681 }
1682
1683 fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1684 Ok(())
1685 }
1686 }
1687}
1688