1//! Wrapper for a transient Calloop event source.
2//!
//! If you have a high-level event source that you expect to remain in the event
4//! loop indefinitely, and another event source nested inside that one that you
5//! expect to require removal or disabling from time to time, this module can
6//! handle it for you.
7
8/// A [`TransientSource`] wraps a Calloop event source and manages its
9/// registration. A user of this type only needs to perform the usual Calloop
10/// calls (`process_events()` and `*register()`) and the return value of
11/// [`process_events()`](crate::EventSource::process_events).
12///
13/// Rather than needing to check for the full set of
14/// [`PostAction`](crate::PostAction) values returned from `process_events()`,
15/// you can just check for `Continue` or `Reregister` and pass that back out
16/// through your own `process_events()` implementation. In your registration
/// functions, you then only need to call the same function on this type, i.e.
/// `register()` inside `register()` etc.
19///
20/// For example, say you have a source that contains a channel along with some
21/// other logic. If the channel's sending end has been dropped, it needs to be
22/// removed from the loop. So to manage this, you use this in your struct:
23///
24/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
25/// struct CompositeSource {
26/// // Event source for channel.
27/// mpsc_receiver: TransientSource<calloop::channel::Channel<T>>,
28///
29/// // Any other fields go here...
30/// }
31/// ```
32///
33/// To create the transient source, you can simply use the `Into`
34/// implementation:
35///
36/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
37/// let (sender, source) = channel();
38/// let mpsc_receiver: TransientSource<Channel> = source.into();
39/// ```
40///
41/// (If you want to start off with an empty `TransientSource`, you can just use
42/// `Default::default()` instead.)
43///
44/// `TransientSource` implements [`EventSource`](crate::EventSource) and passes
45/// through `process_events()` calls, so in the parent's `process_events()`
46/// implementation you can just do this:
47///
48/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
49/// fn process_events<F>(
50/// &mut self,
51/// readiness: calloop::Readiness,
52/// token: calloop::Token,
53/// callback: F,
54/// ) -> Result<calloop::PostAction, Self::Error>
55/// where
56/// F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
57/// {
58/// let channel_return = self.mpsc_receiver.process_events(readiness, token, callback)?;
59///
60/// // Perform other logic here...
61///
62/// Ok(channel_return)
63/// }
64/// ```
65///
66/// Note that:
67///
68/// - You can call `process_events()` on the `TransientSource<Channel>` even
69/// if the channel has been unregistered and dropped. All that will happen
70/// is that you won't get any events from it.
71///
72/// - The [`PostAction`](crate::PostAction) returned from `process_events()`
73/// will only ever be `PostAction::Continue` or `PostAction::Reregister`.
74/// You will still need to combine this with the result of any other sources
75/// (transient or not).
76///
77/// Once you return `channel_return` from your `process_events()` method (and
78/// assuming it propagates all the way up to the event loop itself through any
79/// other event sources), the event loop might call `reregister()` on your
80/// source. All your source has to do is:
81///
82/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
83/// fn reregister(
84/// &mut self,
85/// poll: &mut calloop::Poll,
86/// token_factory: &mut calloop::TokenFactory,
87/// ) -> crate::Result<()> {
88/// self.mpsc_receiver.reregister(poll, token_factory)?;
89///
90/// // Other registration actions...
91///
92/// Ok(())
93/// }
94/// ```
95///
96/// The `TransientSource` will take care of updating the registration of the
97/// inner source, even if it actually needs to be unregistered or initially
98/// registered.
99///
100/// ## Replacing or removing `TransientSource`s
101///
102/// Not properly removing or replacing `TransientSource`s can cause spurious
103/// wakeups of the event loop, and in some cases can leak file descriptors or
104/// fail to free entries in Calloop's internal data structures. No unsoundness
105/// or undefined behaviour will result, but leaking file descriptors can result
106/// in errors or panics.
107///
108/// If you want to remove a source before it returns `PostAction::Remove`, use
109/// the [`TransientSource::remove()`] method. If you want to replace a source
110/// with another one, use the [`TransientSource::replace()`] method. Either of
111/// these may be called at any time during processing or from outside the event
112/// loop. Both require either returning `PostAction::Reregister` from the
/// `process_events()` call that does this, or reregistering the event source
/// some other way, e.g. via the top-level loop handle.
115///
116/// If, instead, you directly assign a new source to the variable holding the
117/// `TransientSource`, the inner source will be dropped before it can be
118/// unregistered. For example:
119///
120/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
121/// self.mpsc_receiver = Default::default();
122/// self.mpsc_receiver = new_channel.into();
123/// ```
#[derive(Debug)]
pub struct TransientSource<T> {
    // The wrapped source (if any) together with the registration action that
    // is still pending for it.
    state: TransientSourceState<T>,
}

// `Default` is implemented by hand rather than derived: the derive would add
// a `T: Default` bound, which would make the empty wrapper unavailable for
// event sources that do not implement `Default` themselves.
impl<T> Default for TransientSource<T> {
    /// Creates an empty wrapper that holds no event source.
    fn default() -> Self {
        Self {
            state: TransientSourceState::None,
        }
    }
}

/// This is the internal state of the [`TransientSource`], as a separate type so
/// it's not exposed.
#[derive(Debug)]
enum TransientSourceState<T> {
    /// The source should be kept in the loop.
    Keep(T),
    /// The source needs to be registered with the loop.
    Register(T),
    /// The source needs to be disabled but kept.
    Disable(T),
    /// The source needs to be removed from the loop.
    Remove(T),
    /// The source is being replaced by another. For most API purposes (eg.
    /// `map()`), this will be treated as the `Register` state enclosing the new
    /// source.
    Replace {
        /// The new source, which will be registered and used from now on.
        new: T,
        /// The old source, which will be unregistered and dropped.
        old: T,
    },
    /// The source has been removed from the loop and dropped (this might also
    /// be observed if there is a panic while changing states).
    None,
}

impl<T> Default for TransientSourceState<T> {
    /// The default state is the empty one, `None`.
    fn default() -> Self {
        Self::None
    }
}

impl<T> TransientSourceState<T> {
    /// If a caller needs to flag the contained source for removal or
    /// registration, we need to replace the enum variant safely. This requires
    /// having a `None` value in there temporarily while we do the swap.
    ///
    /// If the variant is `None` the value will not change and `replacer` will
    /// not be called. If the variant is `Replace` then `replacer` will be
    /// called **on the new source**, which may cause the old source to leak
    /// registration in the event loop if it has not yet been unregistered.
    ///
    /// The `replacer` function here is expected to be one of the enum variant
    /// constructors eg. `replace(TransientSource::Remove)`.
    fn replace_state<F>(&mut self, replacer: F)
    where
        F: FnOnce(T) -> Self,
    {
        *self = match std::mem::take(self) {
            Self::Keep(source)
            | Self::Register(source)
            | Self::Remove(source)
            | Self::Disable(source)
            | Self::Replace { new: source, .. } => replacer(source),
            Self::None => return,
        };
    }
}

impl<T> TransientSource<T> {
    /// Apply a function to the enclosed source, if it exists and is not about
    /// to be removed.
    ///
    /// Returns `Some` with the result of `f`, or `None` if `f` was not called.
    /// While a replacement is pending, `f` is applied to the new source.
    pub fn map<F, U>(&mut self, f: F) -> Option<U>
    where
        F: FnOnce(&mut T) -> U,
    {
        match &mut self.state {
            TransientSourceState::Keep(source)
            | TransientSourceState::Register(source)
            | TransientSourceState::Disable(source)
            | TransientSourceState::Replace { new: source, .. } => Some(f(source)),
            TransientSourceState::Remove(_) | TransientSourceState::None => None,
        }
    }

    /// Returns `true` if there is no wrapped event source.
    pub fn is_none(&self) -> bool {
        matches!(self.state, TransientSourceState::None)
    }

    /// Removes the wrapped event source from the event loop and this wrapper.
    ///
    /// A source that is still waiting for its first registration is dropped
    /// immediately, since there is nothing to unregister. If a replacement is
    /// pending, the replacement is dropped and the source it was going to
    /// replace is the one flagged for removal.
    ///
    /// If this is called from outside of the event loop, you will need to wake
    /// up the event loop for any changes to take place. If it is called from
    /// within the event loop, you must return `PostAction::Reregister` from
    /// your own event source's `process_events()`, and the source will be
    /// unregistered as needed after it exits.
    pub fn remove(&mut self) {
        self.state = match std::mem::take(&mut self.state) {
            // These sources may be registered with the loop, so they must be
            // kept around until they can be unregistered. For a pending
            // replacement that is the *old* source; the new one was never
            // registered and can simply be dropped.
            TransientSourceState::Keep(source)
            | TransientSourceState::Disable(source)
            | TransientSourceState::Remove(source)
            | TransientSourceState::Replace { old: source, .. } => {
                TransientSourceState::Remove(source)
            }
            // Never registered (or already empty): nothing to unregister.
            TransientSourceState::Register(_) | TransientSourceState::None => {
                TransientSourceState::None
            }
        };
    }

    /// Replace the currently wrapped source with the given one. No more events
    /// will be generated from the old source after this point. The old source
    /// will not be dropped immediately if it may still be registered; it will
    /// be kept so that it can be deregistered.
    ///
    /// If the wrapper is empty, or the current source is still waiting for its
    /// first registration, `new` simply becomes the source awaiting
    /// registration. If another replacement is already pending, its
    /// never-registered source is dropped in favour of `new`, and the original
    /// source is still kept for deregistration.
    ///
    /// If this is called from outside of the event loop, you will need to wake
    /// up the event loop for any changes to take place. If it is called from
    /// within the event loop, you must return `PostAction::Reregister` from
    /// your own event source's `process_events()`, and the sources will be
    /// registered and unregistered as needed after it exits.
    pub fn replace(&mut self, new: T) {
        self.state = match std::mem::take(&mut self.state) {
            // The old source may be registered: keep it until it has been
            // unregistered. In the pending `Replace` case the superseded
            // replacement is dropped, not the registered original.
            TransientSourceState::Keep(old)
            | TransientSourceState::Disable(old)
            | TransientSourceState::Remove(old)
            | TransientSourceState::Replace { old, .. } => {
                TransientSourceState::Replace { new, old }
            }
            // Nothing is registered, so there is nothing to unregister; the
            // new source just needs registering.
            TransientSourceState::Register(_) | TransientSourceState::None => {
                TransientSourceState::Register(new)
            }
        };
    }
}
235
236impl<T: crate::EventSource> From<T> for TransientSource<T> {
237 fn from(source: T) -> Self {
238 Self {
239 state: TransientSourceState::Register(source),
240 }
241 }
242}
243
244impl<T: crate::EventSource> crate::EventSource for TransientSource<T> {
245 type Event = T::Event;
246 type Metadata = T::Metadata;
247 type Ret = T::Ret;
248 type Error = T::Error;
249
250 fn process_events<F>(
251 &mut self,
252 readiness: crate::Readiness,
253 token: crate::Token,
254 callback: F,
255 ) -> Result<crate::PostAction, Self::Error>
256 where
257 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
258 {
259 let reregister = if let TransientSourceState::Keep(source) = &mut self.state {
260 let child_post_action = source.process_events(readiness, token, callback)?;
261
262 match child_post_action {
263 // Nothing needs to change.
264 crate::PostAction::Continue => false,
265
266 // Our child source needs re-registration, therefore this
267 // wrapper needs re-registration.
268 crate::PostAction::Reregister => true,
269
270 // If our nested source needs to be removed or disabled, we need
271 // to swap it out for the "Remove" or "Disable" variant.
272 crate::PostAction::Disable => {
273 self.state.replace_state(TransientSourceState::Disable);
274 true
275 }
276
277 crate::PostAction::Remove => {
278 self.state.replace_state(TransientSourceState::Remove);
279 true
280 }
281 }
282 } else {
283 false
284 };
285
286 let post_action = if reregister {
287 crate::PostAction::Reregister
288 } else {
289 crate::PostAction::Continue
290 };
291
292 Ok(post_action)
293 }
294
295 fn register(
296 &mut self,
297 poll: &mut crate::Poll,
298 token_factory: &mut crate::TokenFactory,
299 ) -> crate::Result<()> {
300 match &mut self.state {
301 TransientSourceState::Keep(source) => {
302 source.register(poll, token_factory)?;
303 }
304 TransientSourceState::Register(source)
305 | TransientSourceState::Disable(source)
306 | TransientSourceState::Replace { new: source, .. } => {
307 source.register(poll, token_factory)?;
308 self.state.replace_state(TransientSourceState::Keep);
309 // Drops the disposed source in the Replace case.
310 }
311 TransientSourceState::Remove(_source) => {
312 self.state.replace_state(|_| TransientSourceState::None);
313 }
314 TransientSourceState::None => (),
315 }
316 Ok(())
317 }
318
319 fn reregister(
320 &mut self,
321 poll: &mut crate::Poll,
322 token_factory: &mut crate::TokenFactory,
323 ) -> crate::Result<()> {
324 match &mut self.state {
325 TransientSourceState::Keep(source) => source.reregister(poll, token_factory)?,
326 TransientSourceState::Register(source) => {
327 source.register(poll, token_factory)?;
328 self.state.replace_state(TransientSourceState::Keep);
329 }
330 TransientSourceState::Disable(source) => {
331 source.unregister(poll)?;
332 }
333 TransientSourceState::Remove(source) => {
334 source.unregister(poll)?;
335 self.state.replace_state(|_| TransientSourceState::None);
336 }
337 TransientSourceState::Replace { new, old } => {
338 old.unregister(poll)?;
339 new.register(poll, token_factory)?;
340 self.state.replace_state(TransientSourceState::Keep);
341 // Drops 'dispose'.
342 }
343 TransientSourceState::None => (),
344 }
345 Ok(())
346 }
347
348 fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
349 match &mut self.state {
350 TransientSourceState::Keep(source)
351 | TransientSourceState::Register(source)
352 | TransientSourceState::Disable(source) => source.unregister(poll)?,
353 TransientSourceState::Remove(source) => {
354 source.unregister(poll)?;
355 self.state.replace_state(|_| TransientSourceState::None);
356 }
357 TransientSourceState::Replace { new, old } => {
358 old.unregister(poll)?;
359 new.unregister(poll)?;
360 self.state.replace_state(TransientSourceState::Register);
361 }
362 TransientSourceState::None => (),
363 }
364 Ok(())
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use super::*;
371 use crate::{
372 channel::{channel, Channel, Event},
373 ping::{make_ping, PingSource},
374 Dispatcher, EventSource, PostAction,
375 };
376 use std::{
377 rc::Rc,
378 sync::atomic::{AtomicBool, Ordering},
379 time::Duration,
380 };
381
    // Checks that the inner source is dropped once it asks for removal, while
    // the `TransientSource` wrapper itself stays in the event loop.
    #[test]
    fn test_transient_drop() {
        // A test source that sets a flag when it's dropped.
        struct TestSource<'a> {
            dropped: &'a AtomicBool,
            ping: PingSource,
        }

        impl<'a> Drop for TestSource<'a> {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::Relaxed)
            }
        }

        impl<'a> crate::EventSource for TestSource<'a> {
            type Event = ();
            type Metadata = ();
            type Ret = ();
            type Error = Box<dyn std::error::Error + Sync + Send>;

            fn process_events<F>(
                &mut self,
                readiness: crate::Readiness,
                token: crate::Token,
                callback: F,
            ) -> Result<crate::PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                // Forward to the ping source, then always ask to be removed.
                self.ping.process_events(readiness, token, callback)?;
                Ok(PostAction::Remove)
            }

            fn register(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.ping.register(poll, token_factory)
            }

            fn reregister(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.ping.reregister(poll, token_factory)
            }

            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
                self.ping.unregister(poll)
            }
        }

        // Test that the inner source is actually dropped when it asks to be
        // removed from the loop, while the TransientSource remains. We use two
        // flags for this:
        // - fired: should be set only when the inner event source has an event
        // - dropped: set by the drop handler for the inner source (it's an
        //   AtomicBool because it requires a longer lifetime than the fired
        //   flag)
        let mut fired = false;
        let dropped = false.into();

        // The inner source that should be dropped after the first loop run.
        let (pinger, ping) = make_ping().unwrap();
        let inner = TestSource {
            dropped: &dropped,
            ping,
        };

        // The TransientSource wrapper.
        let outer: TransientSource<_> = inner.into();

        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        let _token = handle
            .insert_source(outer, |_, _, fired| {
                *fired = true;
            })
            .unwrap();

        // First loop run: the ping generates an event for the inner source.
        pinger.ping();

        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();

        assert!(fired);
        assert!(dropped.load(Ordering::Relaxed));

        // Second loop run: the ping does nothing because the receiver has been
        // dropped.
        fired = false;

        pinger.ping();

        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
        assert!(!fired);
    }
482
    // Checks that all events from a wrapped channel, including the final
    // "closed" event, reach the callback.
    #[test]
    fn test_transient_passthrough() {
        // Test that event processing works when a source is nested inside a
        // TransientSource. In particular, we want to ensure that the final
        // event is received even if it corresponds to that same event source
        // returning `PostAction::Remove`.
        let (sender, receiver) = channel();
        let outer: TransientSource<_> = receiver.into();

        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        // Our callback puts the received events in here for us to check later.
        let mut msg_queue = vec![];

        let _token = handle
            .insert_source(outer, |msg, _, queue: &mut Vec<_>| {
                queue.push(msg);
            })
            .unwrap();

        // Send some data and drop the sender. We specifically want to test that
        // we get the "closed" message.
        sender.send(0u32).unwrap();
        sender.send(1u32).unwrap();
        sender.send(2u32).unwrap();
        sender.send(3u32).unwrap();
        drop(sender);

        // Run loop once to process events.
        event_loop.dispatch(Duration::ZERO, &mut msg_queue).unwrap();

        // All four messages must arrive in order, followed by the close.
        assert!(matches!(
            msg_queue.as_slice(),
            &[
                Event::Msg(0u32),
                Event::Msg(1u32),
                Event::Msg(2u32),
                Event::Msg(3u32),
                Event::Closed
            ]
        ));
    }
526
    // Checks that `map()` reaches the inner source while it is present, and
    // is a no-op returning `None` once the inner source has been removed.
    #[test]
    fn test_transient_map() {
        // A source that reports its own `id` as the event for every ping.
        struct IdSource {
            id: u32,
            ping: PingSource,
        }

        impl EventSource for IdSource {
            type Event = u32;
            type Metadata = ();
            type Ret = ();
            type Error = Box<dyn std::error::Error + Sync + Send>;

            fn process_events<F>(
                &mut self,
                readiness: crate::Readiness,
                token: crate::Token,
                mut callback: F,
            ) -> Result<PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                let id = self.id;
                self.ping
                    .process_events(readiness, token, |_, md| callback(id, md))?;

                // Ask to be removed once the id has grown past 2.
                let action = if self.id > 2 {
                    PostAction::Remove
                } else {
                    PostAction::Continue
                };

                Ok(action)
            }

            fn register(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.ping.register(poll, token_factory)
            }

            fn reregister(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.ping.reregister(poll, token_factory)
            }

            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
                self.ping.unregister(poll)
            }
        }

        // A parent source that uses `map()` to bump the inner id on every
        // call it forwards to the TransientSource.
        struct WrapperSource(TransientSource<IdSource>);

        impl EventSource for WrapperSource {
            type Event = <IdSource as EventSource>::Event;
            type Metadata = <IdSource as EventSource>::Metadata;
            type Ret = <IdSource as EventSource>::Ret;
            type Error = <IdSource as EventSource>::Error;

            fn process_events<F>(
                &mut self,
                readiness: crate::Readiness,
                token: crate::Token,
                callback: F,
            ) -> Result<PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                // Here the id is incremented *after* the inner source has
                // processed its events.
                let action = self.0.process_events(readiness, token, callback);
                self.0.map(|inner| inner.id += 1);
                action
            }

            fn register(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.0.map(|inner| inner.id += 1);
                self.0.register(poll, token_factory)
            }

            fn reregister(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.0.map(|inner| inner.id += 1);
                self.0.reregister(poll, token_factory)
            }

            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
                self.0.map(|inner| inner.id += 1);
                self.0.unregister(poll)
            }
        }

        // To test the id later.
        let mut id = 0;

        // Create our source.
        let (pinger, ping) = make_ping().unwrap();
        let inner = IdSource { id, ping };

        // The TransientSource wrapper.
        let outer: TransientSource<_> = inner.into();

        // The top level source.
        let top = WrapperSource(outer);

        // Create a dispatcher so we can check the source afterwards.
        let dispatcher = Dispatcher::new(top, |got_id, _, test_id| {
            *test_id = got_id;
        });

        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        let token = handle.register_dispatcher(dispatcher.clone()).unwrap();

        // First loop run: the ping generates an event for the inner source.
        // The ID should be 1 after the increment in register().
        pinger.ping();
        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
        assert_eq!(id, 1);

        // Second loop run: the ID should be 2 after the previous
        // process_events().
        pinger.ping();
        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
        assert_eq!(id, 2);

        // Third loop run: the ID should be 3 after another process_events().
        pinger.ping();
        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
        assert_eq!(id, 3);

        // Fourth loop run: the callback is no longer called by the inner
        // source, so our local ID is not incremented.
        pinger.ping();
        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
        assert_eq!(id, 3);

        // Remove the dispatcher so we can inspect the sources.
        handle.remove(token);

        let mut top_after = dispatcher.into_source_inner();

        // I expect the inner source to be dropped, so the TransientSource
        // variant is None (its version of None, not Option::None), so its map()
        // won't call the passed-in function (hence the unreachable!()) and its
        // return value should be Option::None.
        assert!(top_after.0.map(|_| unreachable!()).is_none());
    }
686
    // Checks that a wrapped source returning `PostAction::Disable` stops
    // firing, and fires again after the outer source is re-enabled.
    #[test]
    fn test_transient_disable() {
        // Test that disabling and enabling is handled properly.
        struct DisablingSource(PingSource);

        impl EventSource for DisablingSource {
            type Event = ();
            type Metadata = ();
            type Ret = ();
            type Error = Box<dyn std::error::Error + Sync + Send>;

            fn process_events<F>(
                &mut self,
                readiness: crate::Readiness,
                token: crate::Token,
                callback: F,
            ) -> Result<PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                // Forward to the ping source, then always ask to be disabled.
                self.0.process_events(readiness, token, callback)?;
                Ok(PostAction::Disable)
            }

            fn register(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.0.register(poll, token_factory)
            }

            fn reregister(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.0.reregister(poll, token_factory)
            }

            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
                self.0.unregister(poll)
            }
        }

        // Flag for checking when the source fires.
        let mut fired = false;

        // Create our source.
        let (pinger, ping) = make_ping().unwrap();

        let inner = DisablingSource(ping);

        // The TransientSource wrapper.
        let outer: TransientSource<_> = inner.into();

        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let token = handle
            .insert_source(outer, |_, _, fired| {
                *fired = true;
            })
            .unwrap();

        // Ping here and not later, to check that disabling after an event is
        // triggered but not processed does not discard the event.
        pinger.ping();
        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
        assert!(fired);

        // Source should now be disabled.
        pinger.ping();
        fired = false;
        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
        assert!(!fired);

        // Re-enable the source.
        handle.enable(&token).unwrap();

        // Trigger another event.
        pinger.ping();
        fired = false;
        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
        assert!(fired);
    }
772
    // Checks that `replace()` swaps the child source without leaking either
    // the old source or its registration.
    #[test]
    fn test_transient_replace_unregister() {
        // This is a bit of a complex test, but it essentially boils down to:
        // how can a "parent" event source containing a TransientSource replace
        // the "child" source without leaking the source's registration?

        // First, a source that finishes immediately. This is so we cover the
        // edge case of replacing a source as soon as it wants to be removed.
        struct FinishImmediatelySource {
            source: PingSource,
            data: Option<i32>,
            registered: bool,
            dropped: Rc<AtomicBool>,
        }

        impl FinishImmediatelySource {
            // The constructor passes out the drop flag so we can check that
            // this source was or wasn't dropped.
            fn new(source: PingSource, data: i32) -> (Self, Rc<AtomicBool>) {
                let dropped = Rc::new(false.into());

                (
                    Self {
                        source,
                        data: Some(data),
                        registered: false,
                        dropped: Rc::clone(&dropped),
                    },
                    dropped,
                )
            }
        }

        impl EventSource for FinishImmediatelySource {
            type Event = i32;
            type Metadata = ();
            type Ret = ();
            type Error = Box<dyn std::error::Error + Sync + Send>;

            fn process_events<F>(
                &mut self,
                readiness: crate::Readiness,
                token: crate::Token,
                mut callback: F,
            ) -> Result<PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                // Move the data out so the closure can consume it at most
                // once; whatever is left is put back afterwards.
                let mut data = self.data.take();

                self.source.process_events(readiness, token, |_, _| {
                    if let Some(data) = data.take() {
                        callback(data, &mut ())
                    }
                })?;

                self.data = data;

                // Once the data has been delivered, ask to be removed.
                Ok(if self.data.is_none() {
                    PostAction::Remove
                } else {
                    PostAction::Continue
                })
            }

            fn register(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.registered = true;
                self.source.register(poll, token_factory)
            }

            fn reregister(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.source.reregister(poll, token_factory)
            }

            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
                self.registered = false;
                self.source.unregister(poll)
            }
        }

        // The drop handler sets a flag we can check for debugging (we want to
        // know that the source itself was dropped), and also checks that the
        // source was unregistered. Ultimately neither the source nor its
        // registration should be leaked.

        impl Drop for FinishImmediatelySource {
            fn drop(&mut self) {
                assert!(!self.registered, "source dropped while still registered");
                self.dropped.store(true, Ordering::Relaxed);
            }
        }

        // Our wrapper source handles detecting when the child source finishes,
        // and replacing that child source with another one that will generate
        // more events. This is one intended use case of the TransientSource.

        struct WrapperSource {
            current: TransientSource<FinishImmediatelySource>,
            replacement: Option<FinishImmediatelySource>,
            dropped: Rc<AtomicBool>,
        }

        impl WrapperSource {
            // The constructor passes out the drop flag so we can check that
            // this source was or wasn't dropped.
            fn new(
                first: FinishImmediatelySource,
                second: FinishImmediatelySource,
            ) -> (Self, Rc<AtomicBool>) {
                let dropped = Rc::new(false.into());

                (
                    Self {
                        current: first.into(),
                        replacement: second.into(),
                        dropped: Rc::clone(&dropped),
                    },
                    dropped,
                )
            }
        }

        impl EventSource for WrapperSource {
            type Event = i32;
            type Metadata = ();
            type Ret = ();
            type Error = Box<dyn std::error::Error + Sync + Send>;

            fn process_events<F>(
                &mut self,
                readiness: crate::Readiness,
                token: crate::Token,
                mut callback: F,
            ) -> Result<PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                // Did our event source generate an event?
                let mut fired = false;

                let post_action = self.current.process_events(readiness, token, |data, _| {
                    callback(data, &mut ());
                    fired = true;
                })?;

                if fired {
                    // The event source will be unregistered after the current
                    // process_events() iteration is finished. The replace()
                    // method will handle doing that even while we've added a
                    // new source.
                    if let Some(replacement) = self.replacement.take() {
                        self.current.replace(replacement);
                    }

                    // Parent source is responsible for flagging this, but it's
                    // already set.
                    assert_eq!(post_action, PostAction::Reregister);
                }

                Ok(post_action)
            }

            fn register(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.current.register(poll, token_factory)
            }

            fn reregister(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.current.reregister(poll, token_factory)
            }

            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
                self.current.unregister(poll)
            }
        }

        impl Drop for WrapperSource {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::Relaxed);
            }
        }

        // Construct the various nested sources - FinishImmediatelySource inside
        // TransientSource inside WrapperSource. The numbers let us verify which
        // event source fires first.
        let (ping0_tx, ping0_rx) = crate::ping::make_ping().unwrap();
        let (ping1_tx, ping1_rx) = crate::ping::make_ping().unwrap();
        let (inner0, inner0_dropped) = FinishImmediatelySource::new(ping0_rx, 0);
        let (inner1, inner1_dropped) = FinishImmediatelySource::new(ping1_rx, 1);
        let (outer, outer_dropped) = WrapperSource::new(inner0, inner1);

        // Now the actual test starts.

        let mut event_loop: crate::EventLoop<(Option<i32>, crate::LoopSignal)> =
            crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let signal = event_loop.get_signal();

        // This is how we communicate with the event sources.
        let mut context = (None, signal);

        // The callback records the event data and stops the loop, so each
        // run() below returns after one event.
        let _token = handle
            .insert_source(outer, |data, _, (evt, sig)| {
                *evt = Some(data);
                sig.stop();
            })
            .unwrap();

        // Ensure our sources fire.
        ping0_tx.ping();
        ping1_tx.ping();

        // Use run() rather than dispatch() because it's not strictly part of
        // any API contract as to how many runs of the event loop it takes to
        // replace the nested source.
        event_loop.run(None, &mut context, |_| {}).unwrap();

        // First, make sure the inner source actually did fire.
        assert_eq!(context.0.take(), Some(0), "first inner source did not fire");

        // Make sure that the outer source is still alive.
        assert!(
            !outer_dropped.load(Ordering::Relaxed),
            "outer source already dropped"
        );

        // Make sure that the inner child source IS dropped now.
        assert!(
            inner0_dropped.load(Ordering::Relaxed),
            "first inner source not dropped"
        );

        // Make sure that, in between the first event and second event, the
        // replacement child source still exists.
        assert!(
            !inner1_dropped.load(Ordering::Relaxed),
            "replacement inner source dropped"
        );

        // Run the event loop until we get a second event.
        event_loop.run(None, &mut context, |_| {}).unwrap();

        // Ensure the replacement source fired (which checks that it was
        // registered and is being processed by the TransientSource).
        assert_eq!(context.0.take(), Some(1), "replacement source did not fire");
    }
1034
    // Checks that `remove()` removes the inner source even though the source
    // itself never asked to be removed.
    #[test]
    fn test_transient_remove() {
        // This tests that calling remove(), even before an event source has
        // requested its own removal, results in the event source being removed.

        // The message value at which the wrapper removes its inner source.
        const STOP_AT: i32 = 2;

        // A wrapper source to automate the removal of the inner source.
        struct WrapperSource {
            inner: TransientSource<Channel<i32>>,
        }

        impl EventSource for WrapperSource {
            type Event = i32;
            type Metadata = ();
            type Ret = ();
            type Error = Box<dyn std::error::Error + Sync + Send>;

            fn process_events<F>(
                &mut self,
                readiness: crate::Readiness,
                token: crate::Token,
                mut callback: F,
            ) -> Result<PostAction, Self::Error>
            where
                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
            {
                let mut remove = false;

                // Forward each message to the callback and note whether the
                // threshold has been reached.
                let mut post_action = self.inner.process_events(readiness, token, |evt, _| {
                    if let Event::Msg(num) = evt {
                        callback(num, &mut ());
                        remove = num >= STOP_AT;
                    }
                })?;

                if remove {
                    // remove() only flags the source; reregistration is
                    // requested so the removal is carried out.
                    self.inner.remove();
                    post_action |= PostAction::Reregister;
                }

                Ok(post_action)
            }

            fn register(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.inner.register(poll, token_factory)
            }

            fn reregister(
                &mut self,
                poll: &mut crate::Poll,
                token_factory: &mut crate::TokenFactory,
            ) -> crate::Result<()> {
                self.inner.reregister(poll, token_factory)
            }

            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
                self.inner.unregister(poll)
            }
        }

        // Create our sources and loop.

        let (sender, receiver) = channel();
        let wrapper = WrapperSource {
            inner: receiver.into(),
        };

        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        handle
            .insert_source(wrapper, |num, _, out: &mut Option<_>| {
                *out = Some(num);
            })
            .unwrap();

        // Storage for callback data.
        let mut out = None;

        // Send some data we expect to get callbacks for.
        for num in 0..=STOP_AT {
            sender.send(num).unwrap();
            event_loop.dispatch(Duration::ZERO, &mut out).unwrap();
            assert_eq!(out.take(), Some(num));
        }

        // Now we expect the receiver to be gone.
        assert!(matches!(
            sender.send(STOP_AT + 1),
            Err(std::sync::mpsc::SendError { .. })
        ));
    }
1132}
1133