1 | //! Wrapper for a transient Calloop event source. |
2 | //! |
//! If you have a high-level event source that you expect to remain in the event
4 | //! loop indefinitely, and another event source nested inside that one that you |
5 | //! expect to require removal or disabling from time to time, this module can |
6 | //! handle it for you. |
7 | |
/// A [`TransientSource`] wraps a Calloop event source and manages its
/// registration. A user of this type only needs to perform the usual Calloop
/// calls (`process_events()` and `*register()`) and pass on the return value of
/// [`process_events()`](crate::EventSource::process_events).
12 | /// |
13 | /// Rather than needing to check for the full set of |
14 | /// [`PostAction`](crate::PostAction) values returned from `process_events()`, |
15 | /// you can just check for `Continue` or `Reregister` and pass that back out |
16 | /// through your own `process_events()` implementation. In your registration |
17 | /// functions, you then only need to call the same function on this type ie. |
18 | /// `register()` inside `register()` etc. |
19 | /// |
20 | /// For example, say you have a source that contains a channel along with some |
21 | /// other logic. If the channel's sending end has been dropped, it needs to be |
22 | /// removed from the loop. So to manage this, you use this in your struct: |
23 | /// |
24 | /// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193 |
25 | /// struct CompositeSource { |
26 | /// // Event source for channel. |
27 | /// mpsc_receiver: TransientSource<calloop::channel::Channel<T>>, |
28 | /// |
29 | /// // Any other fields go here... |
30 | /// } |
31 | /// ``` |
32 | /// |
33 | /// To create the transient source, you can simply use the `Into` |
34 | /// implementation: |
35 | /// |
36 | /// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193 |
37 | /// let (sender, source) = channel(); |
38 | /// let mpsc_receiver: TransientSource<Channel> = source.into(); |
39 | /// ``` |
40 | /// |
41 | /// (If you want to start off with an empty `TransientSource`, you can just use |
42 | /// `Default::default()` instead.) |
43 | /// |
44 | /// `TransientSource` implements [`EventSource`](crate::EventSource) and passes |
45 | /// through `process_events()` calls, so in the parent's `process_events()` |
46 | /// implementation you can just do this: |
47 | /// |
48 | /// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193 |
49 | /// fn process_events<F>( |
50 | /// &mut self, |
51 | /// readiness: calloop::Readiness, |
52 | /// token: calloop::Token, |
53 | /// callback: F, |
54 | /// ) -> Result<calloop::PostAction, Self::Error> |
55 | /// where |
56 | /// F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
57 | /// { |
58 | /// let channel_return = self.mpsc_receiver.process_events(readiness, token, callback)?; |
59 | /// |
60 | /// // Perform other logic here... |
61 | /// |
62 | /// Ok(channel_return) |
63 | /// } |
64 | /// ``` |
65 | /// |
66 | /// Note that: |
67 | /// |
68 | /// - You can call `process_events()` on the `TransientSource<Channel>` even |
69 | /// if the channel has been unregistered and dropped. All that will happen |
70 | /// is that you won't get any events from it. |
71 | /// |
72 | /// - The [`PostAction`](crate::PostAction) returned from `process_events()` |
73 | /// will only ever be `PostAction::Continue` or `PostAction::Reregister`. |
74 | /// You will still need to combine this with the result of any other sources |
75 | /// (transient or not). |
76 | /// |
77 | /// Once you return `channel_return` from your `process_events()` method (and |
78 | /// assuming it propagates all the way up to the event loop itself through any |
79 | /// other event sources), the event loop might call `reregister()` on your |
80 | /// source. All your source has to do is: |
81 | /// |
82 | /// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193 |
83 | /// fn reregister( |
84 | /// &mut self, |
85 | /// poll: &mut calloop::Poll, |
86 | /// token_factory: &mut calloop::TokenFactory, |
/// ) -> calloop::Result<()> {
88 | /// self.mpsc_receiver.reregister(poll, token_factory)?; |
89 | /// |
90 | /// // Other registration actions... |
91 | /// |
92 | /// Ok(()) |
93 | /// } |
94 | /// ``` |
95 | /// |
96 | /// The `TransientSource` will take care of updating the registration of the |
97 | /// inner source, even if it actually needs to be unregistered or initially |
98 | /// registered. |
99 | /// |
100 | /// ## Replacing or removing `TransientSource`s |
101 | /// |
102 | /// Not properly removing or replacing `TransientSource`s can cause spurious |
103 | /// wakeups of the event loop, and in some cases can leak file descriptors or |
104 | /// fail to free entries in Calloop's internal data structures. No unsoundness |
105 | /// or undefined behaviour will result, but leaking file descriptors can result |
106 | /// in errors or panics. |
107 | /// |
108 | /// If you want to remove a source before it returns `PostAction::Remove`, use |
109 | /// the [`TransientSource::remove()`] method. If you want to replace a source |
110 | /// with another one, use the [`TransientSource::replace()`] method. Either of |
111 | /// these may be called at any time during processing or from outside the event |
/// loop. Both require either returning `PostAction::Reregister` from the
/// `process_events()` call that does this, or reregistering the event source
/// some other way eg. via the top-level loop handle.
115 | /// |
/// If, instead, you directly assign a new source to the variable holding the
/// `TransientSource`, the inner source will be dropped before it can be
/// unregistered. For example, avoid doing either of these:
119 | /// |
120 | /// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193 |
121 | /// self.mpsc_receiver = Default::default(); |
122 | /// self.mpsc_receiver = new_channel.into(); |
123 | /// ``` |
124 | #[derive (Debug, Default)] |
125 | pub struct TransientSource<T> { |
126 | state: TransientSourceState<T>, |
127 | } |
128 | |
/// Internal registration state of a [`TransientSource`]. It is kept in its own
/// private type so that none of these details are exposed through the public
/// API.
#[derive(Debug)]
enum TransientSourceState<T> {
    /// The source stays in the loop as it is.
    Keep(T),
    /// The source still has to be registered with the loop.
    Register(T),
    /// The source has to be disabled, but the wrapper holds on to it.
    Disable(T),
    /// The source has to be removed from the loop.
    Remove(T),
    /// One source is being swapped for another. For most API purposes (eg.
    /// `map()`) this behaves like the `Register` state wrapping the new
    /// source.
    Replace {
        /// The incoming source, which will be registered and used from now on.
        new: T,
        /// The outgoing source, which will be unregistered and dropped.
        old: T,
    },
    /// The source was removed from the loop and dropped. This variant may also
    /// be observed if a panic happens in the middle of a state change.
    None,
}
154 | |
155 | impl<T> Default for TransientSourceState<T> { |
156 | fn default() -> Self { |
157 | Self::None |
158 | } |
159 | } |
160 | |
161 | impl<T> TransientSourceState<T> { |
162 | /// If a caller needs to flag the contained source for removal or |
163 | /// registration, we need to replace the enum variant safely. This requires |
164 | /// having a `None` value in there temporarily while we do the swap. |
165 | /// |
166 | /// If the variant is `None` the value will not change and `replacer` will |
167 | /// not be called. If the variant is `Replace` then `replacer` will be |
168 | /// called **on the new source**, which may cause the old source to leak |
169 | /// registration in the event loop if it has not yet been unregistered. |
170 | /// |
171 | /// The `replacer` function here is expected to be one of the enum variant |
172 | /// constructors eg. `replace(TransientSource::Remove)`. |
173 | fn replace_state<F>(&mut self, replacer: F) |
174 | where |
175 | F: FnOnce(T) -> Self, |
176 | { |
177 | *self = match std::mem::take(self) { |
178 | Self::Keep(source) |
179 | | Self::Register(source) |
180 | | Self::Remove(source) |
181 | | Self::Disable(source) |
182 | | Self::Replace { new: source, .. } => replacer(source), |
183 | Self::None => return, |
184 | }; |
185 | } |
186 | } |
187 | |
188 | impl<T> TransientSource<T> { |
189 | /// Apply a function to the enclosed source, if it exists and is not about |
190 | /// to be removed. |
191 | pub fn map<F, U>(&mut self, f: F) -> Option<U> |
192 | where |
193 | F: FnOnce(&mut T) -> U, |
194 | { |
195 | match &mut self.state { |
196 | TransientSourceState::Keep(source) |
197 | | TransientSourceState::Register(source) |
198 | | TransientSourceState::Disable(source) |
199 | | TransientSourceState::Replace { new: source, .. } => Some(f(source)), |
200 | TransientSourceState::Remove(_) | TransientSourceState::None => None, |
201 | } |
202 | } |
203 | |
204 | /// Returns `true` if there is no wrapped event source. |
205 | pub fn is_none(&self) -> bool { |
206 | matches!(self.state, TransientSourceState::None) |
207 | } |
208 | |
209 | /// Removes the wrapped event source from the event loop and this wrapper. |
210 | /// |
211 | /// If this is called from outside of the event loop, you will need to wake |
212 | /// up the event loop for any changes to take place. If it is called from |
213 | /// within the event loop, you must return `PostAction::Reregister` from |
214 | /// your own event source's `process_events()`, and the source will be |
215 | /// unregistered as needed after it exits. |
216 | pub fn remove(&mut self) { |
217 | self.state.replace_state(TransientSourceState::Remove); |
218 | } |
219 | |
220 | /// Replace the currently wrapped source with the given one. No more events |
221 | /// will be generated from the old source after this point. The old source |
222 | /// will not be dropped immediately, it will be kept so that it can be |
223 | /// deregistered. |
224 | /// |
225 | /// If this is called from outside of the event loop, you will need to wake |
226 | /// up the event loop for any changes to take place. If it is called from |
227 | /// within the event loop, you must return `PostAction::Reregister` from |
228 | /// your own event source's `process_events()`, and the sources will be |
229 | /// registered and unregistered as needed after it exits. |
230 | pub fn replace(&mut self, new: T) { |
231 | self.state |
232 | .replace_state(|old| TransientSourceState::Replace { new, old }); |
233 | } |
234 | } |
235 | |
236 | impl<T: crate::EventSource> From<T> for TransientSource<T> { |
237 | fn from(source: T) -> Self { |
238 | Self { |
239 | state: TransientSourceState::Register(source), |
240 | } |
241 | } |
242 | } |
243 | |
244 | impl<T: crate::EventSource> crate::EventSource for TransientSource<T> { |
245 | type Event = T::Event; |
246 | type Metadata = T::Metadata; |
247 | type Ret = T::Ret; |
248 | type Error = T::Error; |
249 | |
250 | fn process_events<F>( |
251 | &mut self, |
252 | readiness: crate::Readiness, |
253 | token: crate::Token, |
254 | callback: F, |
255 | ) -> Result<crate::PostAction, Self::Error> |
256 | where |
257 | F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
258 | { |
259 | let reregister = if let TransientSourceState::Keep(source) = &mut self.state { |
260 | let child_post_action = source.process_events(readiness, token, callback)?; |
261 | |
262 | match child_post_action { |
263 | // Nothing needs to change. |
264 | crate::PostAction::Continue => false, |
265 | |
266 | // Our child source needs re-registration, therefore this |
267 | // wrapper needs re-registration. |
268 | crate::PostAction::Reregister => true, |
269 | |
270 | // If our nested source needs to be removed or disabled, we need |
271 | // to swap it out for the "Remove" or "Disable" variant. |
272 | crate::PostAction::Disable => { |
273 | self.state.replace_state(TransientSourceState::Disable); |
274 | true |
275 | } |
276 | |
277 | crate::PostAction::Remove => { |
278 | self.state.replace_state(TransientSourceState::Remove); |
279 | true |
280 | } |
281 | } |
282 | } else { |
283 | false |
284 | }; |
285 | |
286 | let post_action = if reregister { |
287 | crate::PostAction::Reregister |
288 | } else { |
289 | crate::PostAction::Continue |
290 | }; |
291 | |
292 | Ok(post_action) |
293 | } |
294 | |
295 | fn register( |
296 | &mut self, |
297 | poll: &mut crate::Poll, |
298 | token_factory: &mut crate::TokenFactory, |
299 | ) -> crate::Result<()> { |
300 | match &mut self.state { |
301 | TransientSourceState::Keep(source) => { |
302 | source.register(poll, token_factory)?; |
303 | } |
304 | TransientSourceState::Register(source) |
305 | | TransientSourceState::Disable(source) |
306 | | TransientSourceState::Replace { new: source, .. } => { |
307 | source.register(poll, token_factory)?; |
308 | self.state.replace_state(TransientSourceState::Keep); |
309 | // Drops the disposed source in the Replace case. |
310 | } |
311 | TransientSourceState::Remove(_source) => { |
312 | self.state.replace_state(|_| TransientSourceState::None); |
313 | } |
314 | TransientSourceState::None => (), |
315 | } |
316 | Ok(()) |
317 | } |
318 | |
319 | fn reregister( |
320 | &mut self, |
321 | poll: &mut crate::Poll, |
322 | token_factory: &mut crate::TokenFactory, |
323 | ) -> crate::Result<()> { |
324 | match &mut self.state { |
325 | TransientSourceState::Keep(source) => source.reregister(poll, token_factory)?, |
326 | TransientSourceState::Register(source) => { |
327 | source.register(poll, token_factory)?; |
328 | self.state.replace_state(TransientSourceState::Keep); |
329 | } |
330 | TransientSourceState::Disable(source) => { |
331 | source.unregister(poll)?; |
332 | } |
333 | TransientSourceState::Remove(source) => { |
334 | source.unregister(poll)?; |
335 | self.state.replace_state(|_| TransientSourceState::None); |
336 | } |
337 | TransientSourceState::Replace { new, old } => { |
338 | old.unregister(poll)?; |
339 | new.register(poll, token_factory)?; |
340 | self.state.replace_state(TransientSourceState::Keep); |
341 | // Drops 'dispose'. |
342 | } |
343 | TransientSourceState::None => (), |
344 | } |
345 | Ok(()) |
346 | } |
347 | |
348 | fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> { |
349 | match &mut self.state { |
350 | TransientSourceState::Keep(source) |
351 | | TransientSourceState::Register(source) |
352 | | TransientSourceState::Disable(source) => source.unregister(poll)?, |
353 | TransientSourceState::Remove(source) => { |
354 | source.unregister(poll)?; |
355 | self.state.replace_state(|_| TransientSourceState::None); |
356 | } |
357 | TransientSourceState::Replace { new, old } => { |
358 | old.unregister(poll)?; |
359 | new.unregister(poll)?; |
360 | self.state.replace_state(TransientSourceState::Register); |
361 | } |
362 | TransientSourceState::None => (), |
363 | } |
364 | Ok(()) |
365 | } |
366 | } |
367 | |
368 | #[cfg (test)] |
369 | mod tests { |
370 | use super::*; |
371 | use crate::{ |
372 | channel::{channel, Channel, Event}, |
373 | ping::{make_ping, PingSource}, |
374 | Dispatcher, EventSource, PostAction, |
375 | }; |
376 | use std::{ |
377 | rc::Rc, |
378 | sync::atomic::{AtomicBool, Ordering}, |
379 | time::Duration, |
380 | }; |
381 | |
#[test]
fn test_transient_drop() {
    // Inner source for this test: it raises a flag from its `Drop` impl and
    // asks to be removed after every `process_events()` call.
    struct TestSource<'a> {
        dropped: &'a AtomicBool,
        ping: PingSource,
    }

    impl<'a> Drop for TestSource<'a> {
        fn drop(&mut self) {
            self.dropped.store(true, Ordering::Relaxed);
        }
    }

    impl<'a> crate::EventSource for TestSource<'a> {
        type Event = ();
        type Metadata = ();
        type Ret = ();
        type Error = Box<dyn std::error::Error + Sync + Send>;

        fn process_events<F>(
            &mut self,
            readiness: crate::Readiness,
            token: crate::Token,
            callback: F,
        ) -> Result<crate::PostAction, Self::Error>
        where
            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
        {
            // Whatever the ping itself asks for is ignored.
            let _ = self.ping.process_events(readiness, token, callback)?;
            Ok(PostAction::Remove)
        }

        fn register(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            self.ping.register(poll, token_factory)
        }

        fn reregister(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            self.ping.reregister(poll, token_factory)
        }

        fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
            self.ping.unregister(poll)
        }
    }

    // What is being checked: when the inner source asks to be removed from
    // the loop it is really dropped, while the TransientSource around it stays
    // in place. Two flags track this:
    // - fired: set only when the inner event source produces an event
    // - dropped: set by the inner source's drop handler (an AtomicBool
    //   because it requires a longer lifetime than the fired flag)
    let mut fired = false;
    let dropped = AtomicBool::new(false);

    // The inner source, expected to be dropped after the first loop run,
    // wrapped in the TransientSource under test.
    let (pinger, ping) = make_ping().unwrap();
    let outer = TransientSource::from(TestSource {
        dropped: &dropped,
        ping,
    });

    let mut event_loop = crate::EventLoop::try_new().unwrap();
    let handle = event_loop.handle();

    let _token = handle
        .insert_source(outer, |_, _, flag| {
            *flag = true;
        })
        .unwrap();

    // First run: the ping produces an event for the inner source, which then
    // asks for removal.
    pinger.ping();
    event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();

    assert!(fired);
    assert!(dropped.load(Ordering::Relaxed));

    // Second run: pinging has no effect because the receiving side has been
    // dropped.
    fired = false;
    pinger.ping();
    event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();

    assert!(!fired);
}
482 | |
#[test]
fn test_transient_passthrough() {
    // Checks that events make it through when a source is nested inside a
    // TransientSource. Of particular interest is the final event: it must be
    // delivered even though it coincides with the same source returning
    // `PostAction::Remove`.
    let (sender, receiver) = channel();
    let outer = TransientSource::from(receiver);

    let mut event_loop = crate::EventLoop::try_new().unwrap();
    let handle = event_loop.handle();

    // The callback stores the received events here so they can be checked
    // afterwards.
    let mut msg_queue = Vec::new();

    let _token = handle
        .insert_source(outer, |msg, _, queue: &mut Vec<_>| queue.push(msg))
        .unwrap();

    // Queue up some data, then drop the sender: the "closed" message is the
    // one this test specifically wants to see.
    for value in 0u32..4 {
        sender.send(value).unwrap();
    }
    drop(sender);

    // A single loop run processes everything.
    event_loop.dispatch(Duration::ZERO, &mut msg_queue).unwrap();

    assert!(matches!(
        msg_queue.as_slice(),
        [
            Event::Msg(0u32),
            Event::Msg(1u32),
            Event::Msg(2u32),
            Event::Msg(3u32),
            Event::Closed
        ]
    ));
}
526 | |
#[test]
fn test_transient_map() {
    // Inner source: on every ping it reports its current `id` as the event,
    // and asks to be removed once that id exceeds 2.
    struct IdSource {
        id: u32,
        ping: PingSource,
    }

    impl EventSource for IdSource {
        type Event = u32;
        type Metadata = ();
        type Ret = ();
        type Error = Box<dyn std::error::Error + Sync + Send>;

        fn process_events<F>(
            &mut self,
            readiness: crate::Readiness,
            token: crate::Token,
            mut callback: F,
        ) -> Result<PostAction, Self::Error>
        where
            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
        {
            let current = self.id;
            self.ping
                .process_events(readiness, token, |_, metadata| callback(current, metadata))?;

            if current > 2 {
                Ok(PostAction::Remove)
            } else {
                Ok(PostAction::Continue)
            }
        }

        fn register(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            self.ping.register(poll, token_factory)
        }

        fn reregister(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            self.ping.reregister(poll, token_factory)
        }

        fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
            self.ping.unregister(poll)
        }
    }

    // Top-level source: bumps the inner id through `map()` on every call it
    // forwards to the TransientSource.
    struct WrapperSource(TransientSource<IdSource>);

    impl EventSource for WrapperSource {
        type Event = <IdSource as EventSource>::Event;
        type Metadata = <IdSource as EventSource>::Metadata;
        type Ret = <IdSource as EventSource>::Ret;
        type Error = <IdSource as EventSource>::Error;

        fn process_events<F>(
            &mut self,
            readiness: crate::Readiness,
            token: crate::Token,
            callback: F,
        ) -> Result<PostAction, Self::Error>
        where
            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
        {
            // Process first, increment afterwards.
            let outcome = self.0.process_events(readiness, token, callback);
            self.0.map(|source| source.id += 1);
            outcome
        }

        fn register(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            self.0.map(|source| source.id += 1);
            self.0.register(poll, token_factory)
        }

        fn reregister(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            self.0.map(|source| source.id += 1);
            self.0.reregister(poll, token_factory)
        }

        fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
            self.0.map(|source| source.id += 1);
            self.0.unregister(poll)
        }
    }

    // Receives the id reported by the most recent event.
    let mut id = 0;

    // Inner source -> TransientSource wrapper -> top-level source.
    let (pinger, ping) = make_ping().unwrap();
    let top = WrapperSource(TransientSource::from(IdSource { id, ping }));

    // A dispatcher is used so the source can be inspected afterwards.
    let dispatcher = Dispatcher::new(top, |got_id, _, test_id| {
        *test_id = got_id;
    });

    let mut event_loop = crate::EventLoop::try_new().unwrap();
    let handle = event_loop.handle();

    let token = handle.register_dispatcher(dispatcher.clone()).unwrap();

    // Four loop runs, each triggered by a ping:
    // 1. the ID is 1, following the increment in register();
    // 2. the ID is 2, following the previous process_events();
    // 3. the ID is 3, following another process_events();
    // 4. the inner source no longer calls the callback, so the local ID
    //    stays where it was.
    for expected in [1u32, 2, 3, 3].iter().copied() {
        pinger.ping();
        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
        assert_eq!(id, expected);
    }

    // Take the dispatcher out of the loop so the sources can be inspected.
    handle.remove(token);

    let mut top_after = dispatcher.into_source_inner();

    // The inner source is expected to have been dropped, leaving the
    // TransientSource in its own `None` state (not Option::None). Its map()
    // must therefore not call the closure (hence the unreachable!()) and must
    // return Option::None.
    assert!(top_after.0.map(|_| unreachable!()).is_none());
}
686 | |
#[test]
fn test_transient_disable() {
    // Checks that disabling and re-enabling are handled properly. The inner
    // source asks to be disabled after every `process_events()` call.
    struct DisablingSource(PingSource);

    impl EventSource for DisablingSource {
        type Event = ();
        type Metadata = ();
        type Ret = ();
        type Error = Box<dyn std::error::Error + Sync + Send>;

        fn process_events<F>(
            &mut self,
            readiness: crate::Readiness,
            token: crate::Token,
            callback: F,
        ) -> Result<PostAction, Self::Error>
        where
            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
        {
            // Whatever the ping itself asks for is ignored.
            let _ = self.0.process_events(readiness, token, callback)?;
            Ok(PostAction::Disable)
        }

        fn register(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            self.0.register(poll, token_factory)
        }

        fn reregister(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            self.0.reregister(poll, token_factory)
        }

        fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
            self.0.unregister(poll)
        }
    }

    // Set by the callback whenever the source fires.
    let mut fired = false;

    // Inner source, wrapped in the TransientSource under test.
    let (pinger, ping) = make_ping().unwrap();
    let outer = TransientSource::from(DisablingSource(ping));

    let mut event_loop = crate::EventLoop::try_new().unwrap();
    let handle = event_loop.handle();
    let token = handle
        .insert_source(outer, |_, _, flag| {
            *flag = true;
        })
        .unwrap();

    // The ping is sent here rather than later, to check that disabling a
    // source after an event was triggered but before it was processed does
    // not discard that event.
    pinger.ping();
    event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
    assert!(fired);

    // By now the source should be disabled, so this ping goes unnoticed.
    pinger.ping();
    fired = false;
    event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
    assert!(!fired);

    // Switch the source back on.
    handle.enable(&token).unwrap();

    // A further ping must be delivered again.
    pinger.ping();
    fired = false;
    event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
    assert!(fired);
}
772 | |
773 | #[test ] |
774 | fn test_transient_replace_unregister() { |
775 | // This is a bit of a complex test, but it essentially boils down to: |
776 | // how can a "parent" event source containing a TransientSource replace |
777 | // the "child" source without leaking the source's registration? |
778 | |
779 | // First, a source that finishes immediately. This is so we cover the |
780 | // edge case of replacing a source as soon as it wants to be removed. |
781 | struct FinishImmediatelySource { |
782 | source: PingSource, |
783 | data: Option<i32>, |
784 | registered: bool, |
785 | dropped: Rc<AtomicBool>, |
786 | } |
787 | |
788 | impl FinishImmediatelySource { |
789 | // The constructor passes out the drop flag so we can check that |
790 | // this source was or wasn't dropped. |
791 | fn new(source: PingSource, data: i32) -> (Self, Rc<AtomicBool>) { |
792 | let dropped = Rc::new(false.into()); |
793 | |
794 | ( |
795 | Self { |
796 | source, |
797 | data: Some(data), |
798 | registered: false, |
799 | dropped: Rc::clone(&dropped), |
800 | }, |
801 | dropped, |
802 | ) |
803 | } |
804 | } |
805 | |
806 | impl EventSource for FinishImmediatelySource { |
807 | type Event = i32; |
808 | type Metadata = (); |
809 | type Ret = (); |
810 | type Error = Box<dyn std::error::Error + Sync + Send>; |
811 | |
812 | fn process_events<F>( |
813 | &mut self, |
814 | readiness: crate::Readiness, |
815 | token: crate::Token, |
816 | mut callback: F, |
817 | ) -> Result<PostAction, Self::Error> |
818 | where |
819 | F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
820 | { |
821 | let mut data = self.data.take(); |
822 | |
823 | self.source.process_events(readiness, token, |_, _| { |
824 | if let Some(data) = data.take() { |
825 | callback(data, &mut ()) |
826 | } |
827 | })?; |
828 | |
829 | self.data = data; |
830 | |
831 | Ok(if self.data.is_none() { |
832 | PostAction::Remove |
833 | } else { |
834 | PostAction::Continue |
835 | }) |
836 | } |
837 | |
838 | fn register( |
839 | &mut self, |
840 | poll: &mut crate::Poll, |
841 | token_factory: &mut crate::TokenFactory, |
842 | ) -> crate::Result<()> { |
843 | self.registered = true; |
844 | self.source.register(poll, token_factory) |
845 | } |
846 | |
847 | fn reregister( |
848 | &mut self, |
849 | poll: &mut crate::Poll, |
850 | token_factory: &mut crate::TokenFactory, |
851 | ) -> crate::Result<()> { |
852 | self.source.reregister(poll, token_factory) |
853 | } |
854 | |
855 | fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> { |
856 | self.registered = false; |
857 | self.source.unregister(poll) |
858 | } |
859 | } |
860 | |
861 | // The drop handler sets a flag we can check for debugging (we want to |
862 | // know that the source itself was dropped), and also checks that the |
863 | // source was unregistered. Ultimately neither the source nor its |
864 | // registration should be leaked. |
865 | |
866 | impl Drop for FinishImmediatelySource { |
867 | fn drop(&mut self) { |
868 | assert!(!self.registered, "source dropped while still registered" ); |
869 | self.dropped.store(true, Ordering::Relaxed); |
870 | } |
871 | } |
872 | |
873 | // Our wrapper source handles detecting when the child source finishes, |
874 | // and replacing that child source with another one that will generate |
875 | // more events. This is one intended use case of the TransientSource. |
876 | |
877 | struct WrapperSource { |
878 | current: TransientSource<FinishImmediatelySource>, |
879 | replacement: Option<FinishImmediatelySource>, |
880 | dropped: Rc<AtomicBool>, |
881 | } |
882 | |
883 | impl WrapperSource { |
884 | // The constructor passes out the drop flag so we can check that |
885 | // this source was or wasn't dropped. |
886 | fn new( |
887 | first: FinishImmediatelySource, |
888 | second: FinishImmediatelySource, |
889 | ) -> (Self, Rc<AtomicBool>) { |
890 | let dropped = Rc::new(false.into()); |
891 | |
892 | ( |
893 | Self { |
894 | current: first.into(), |
895 | replacement: second.into(), |
896 | dropped: Rc::clone(&dropped), |
897 | }, |
898 | dropped, |
899 | ) |
900 | } |
901 | } |
902 | |
903 | impl EventSource for WrapperSource { |
904 | type Event = i32; |
905 | type Metadata = (); |
906 | type Ret = (); |
907 | type Error = Box<dyn std::error::Error + Sync + Send>; |
908 | |
909 | fn process_events<F>( |
910 | &mut self, |
911 | readiness: crate::Readiness, |
912 | token: crate::Token, |
913 | mut callback: F, |
914 | ) -> Result<PostAction, Self::Error> |
915 | where |
916 | F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
917 | { |
918 | // Did our event source generate an event? |
919 | let mut fired = false; |
920 | |
921 | let post_action = self.current.process_events(readiness, token, |data, _| { |
922 | callback(data, &mut ()); |
923 | fired = true; |
924 | })?; |
925 | |
926 | if fired { |
927 | // The event source will be unregistered after the current |
928 | // process_events() iteration is finished. The replace() |
929 | // method will handle doing that even while we've added a |
930 | // new source. |
931 | if let Some(replacement) = self.replacement.take() { |
932 | self.current.replace(replacement); |
933 | } |
934 | |
935 | // Parent source is responsible for flagging this, but it's |
936 | // already set. |
937 | assert_eq!(post_action, PostAction::Reregister); |
938 | } |
939 | |
940 | Ok(post_action) |
941 | } |
942 | |
943 | fn register( |
944 | &mut self, |
945 | poll: &mut crate::Poll, |
946 | token_factory: &mut crate::TokenFactory, |
947 | ) -> crate::Result<()> { |
948 | self.current.register(poll, token_factory) |
949 | } |
950 | |
951 | fn reregister( |
952 | &mut self, |
953 | poll: &mut crate::Poll, |
954 | token_factory: &mut crate::TokenFactory, |
955 | ) -> crate::Result<()> { |
956 | self.current.reregister(poll, token_factory) |
957 | } |
958 | |
959 | fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> { |
960 | self.current.unregister(poll) |
961 | } |
962 | } |
963 | |
964 | impl Drop for WrapperSource { |
965 | fn drop(&mut self) { |
966 | self.dropped.store(true, Ordering::Relaxed); |
967 | } |
968 | } |
969 | |
970 | // Construct the various nested sources - FinishImmediatelySource inside |
971 | // TransientSource inside WrapperSource. The numbers let us verify which |
972 | // event source fires first. |
973 | let (ping0_tx, ping0_rx) = crate::ping::make_ping().unwrap(); |
974 | let (ping1_tx, ping1_rx) = crate::ping::make_ping().unwrap(); |
975 | let (inner0, inner0_dropped) = FinishImmediatelySource::new(ping0_rx, 0); |
976 | let (inner1, inner1_dropped) = FinishImmediatelySource::new(ping1_rx, 1); |
977 | let (outer, outer_dropped) = WrapperSource::new(inner0, inner1); |
978 | |
979 | // Now the actual test starts. |
980 | |
981 | let mut event_loop: crate::EventLoop<(Option<i32>, crate::LoopSignal)> = |
982 | crate::EventLoop::try_new().unwrap(); |
983 | let handle = event_loop.handle(); |
984 | let signal = event_loop.get_signal(); |
985 | |
986 | // This is how we communicate with the event sources. |
987 | let mut context = (None, signal); |
988 | |
989 | let _token = handle |
990 | .insert_source(outer, |data, _, (evt, sig)| { |
991 | *evt = Some(data); |
992 | sig.stop(); |
993 | }) |
994 | .unwrap(); |
995 | |
996 | // Ensure our sources fire. |
997 | ping0_tx.ping(); |
998 | ping1_tx.ping(); |
999 | |
1000 | // Use run() rather than dispatch() because it's not strictly part of |
1001 | // any API contract as to how many runs of the event loop it takes to |
1002 | // replace the nested source. |
1003 | event_loop.run(None, &mut context, |_| {}).unwrap(); |
1004 | |
1005 | // First, make sure the inner source actually did fire. |
1006 | assert_eq!(context.0.take(), Some(0), "first inner source did not fire" ); |
1007 | |
1008 | // Make sure that the outer source is still alive. |
1009 | assert!( |
1010 | !outer_dropped.load(Ordering::Relaxed), |
1011 | "outer source already dropped" |
1012 | ); |
1013 | |
1014 | // Make sure that the inner child source IS dropped now. |
1015 | assert!( |
1016 | inner0_dropped.load(Ordering::Relaxed), |
1017 | "first inner source not dropped" |
1018 | ); |
1019 | |
1020 | // Make sure that, in between the first event and second event, the |
1021 | // replacement child source still exists. |
1022 | assert!( |
1023 | !inner1_dropped.load(Ordering::Relaxed), |
1024 | "replacement inner source dropped" |
1025 | ); |
1026 | |
1027 | // Run the event loop until we get a second event. |
1028 | event_loop.run(None, &mut context, |_| {}).unwrap(); |
1029 | |
1030 | // Ensure the replacement source fired (which checks that it was |
1031 | // registered and is being processed by the TransientSource). |
1032 | assert_eq!(context.0.take(), Some(1), "replacement source did not fire" ); |
1033 | } |
1034 | |
#[test]
fn test_transient_remove() {
    // Checks that remove() results in the event source being removed, even
    // when the source itself has not yet asked for its own removal.

    const STOP_AT: i32 = 2;

    // Wrapper that removes its inner channel source after handling a
    // message whose value has reached STOP_AT.
    struct WrapperSource {
        inner: TransientSource<Channel<i32>>,
    }

    impl EventSource for WrapperSource {
        type Event = i32;
        type Metadata = ();
        type Ret = ();
        type Error = Box<dyn std::error::Error + Sync + Send>;

        fn process_events<F>(
            &mut self,
            readiness: crate::Readiness,
            token: crate::Token,
            mut callback: F,
        ) -> Result<PostAction, Self::Error>
        where
            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
        {
            // Reflects the most recent message seen during this call.
            let mut should_remove = false;

            let mut action = self.inner.process_events(readiness, token, |event, _| {
                if let Event::Msg(value) = event {
                    callback(value, &mut ());
                    should_remove = value >= STOP_AT;
                }
            })?;

            if !should_remove {
                return Ok(action);
            }

            // Remove the inner source ourselves and ask to be reregistered.
            self.inner.remove();
            action |= PostAction::Reregister;

            Ok(action)
        }

        fn register(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            let child = &mut self.inner;
            child.register(poll, token_factory)
        }

        fn reregister(
            &mut self,
            poll: &mut crate::Poll,
            token_factory: &mut crate::TokenFactory,
        ) -> crate::Result<()> {
            let child = &mut self.inner;
            child.reregister(poll, token_factory)
        }

        fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
            let child = &mut self.inner;
            child.unregister(poll)
        }
    }

    // Set up the channel, the wrapper around its receiving end, and the
    // event loop.

    let (tx, rx) = channel();
    let wrapper = WrapperSource { inner: rx.into() };

    let mut event_loop = crate::EventLoop::try_new().unwrap();

    // Slot that the loop callback writes each received number into.
    let mut received = None;

    event_loop
        .handle()
        .insert_source(wrapper, |value, _, slot: &mut Option<_>| {
            *slot = Some(value);
        })
        .unwrap();

    // Every value up to and including STOP_AT should reach the callback.
    for expected in 0..=STOP_AT {
        tx.send(expected).unwrap();
        event_loop.dispatch(Duration::ZERO, &mut received).unwrap();
        assert_eq!(received.take(), Some(expected));
    }

    // By now the receiving end should be gone, so sending must fail.
    let late_send = tx.send(STOP_AT + 1);
    assert!(matches!(
        late_send,
        Err(std::sync::mpsc::SendError { .. })
    ));
}
1132 | } |
1133 | |