1 | //! A shutdown channel. |
2 | //! |
3 | //! Each worker holds the `Sender` half. When all the `Sender` halves are |
4 | //! dropped, the `Receiver` receives a notification. |
5 | |
6 | use crate::loom::sync::Arc; |
7 | use crate::sync::oneshot; |
8 | |
9 | use std::time::Duration; |
10 | |
11 | #[derive(Debug, Clone)] |
12 | pub(super) struct Sender { |
13 | _tx: Arc<oneshot::Sender<()>>, |
14 | } |
15 | |
16 | #[derive(Debug)] |
17 | pub(super) struct Receiver { |
18 | rx: oneshot::Receiver<()>, |
19 | } |
20 | |
21 | pub(super) fn channel() -> (Sender, Receiver) { |
22 | let (tx, rx) = oneshot::channel(); |
23 | let tx = Sender { _tx: Arc::new(tx) }; |
24 | let rx = Receiver { rx }; |
25 | |
26 | (tx, rx) |
27 | } |
28 | |
29 | impl Receiver { |
30 | /// Blocks the current thread until all `Sender` handles drop. |
31 | /// |
32 | /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout` |
33 | /// duration. If `timeout` is `None`, then the thread is blocked until the |
34 | /// shutdown signal is received. |
35 | /// |
36 | /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`. |
37 | pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool { |
38 | use crate::runtime::context::try_enter_blocking_region; |
39 | |
40 | if timeout == Some(Duration::from_nanos(0)) { |
41 | return false; |
42 | } |
43 | |
44 | let mut e = match try_enter_blocking_region() { |
45 | Some(enter) => enter, |
46 | _ => { |
47 | if std::thread::panicking() { |
48 | // Don't panic in a panic |
49 | return false; |
50 | } else { |
51 | panic!( |
52 | "Cannot drop a runtime in a context where blocking is not allowed. \ |
53 | This happens when a runtime is dropped from within an asynchronous context." |
54 | ); |
55 | } |
56 | } |
57 | }; |
58 | |
59 | // The oneshot completes with an Err |
60 | // |
61 | // If blocking fails to wait, this indicates a problem parking the |
62 | // current thread (usually, shutting down a runtime stored in a |
63 | // thread-local). |
64 | if let Some(timeout) = timeout { |
65 | e.block_on_timeout(&mut self.rx, timeout).is_ok() |
66 | } else { |
67 | let _ = e.block_on(&mut self.rx); |
68 | true |
69 | } |
70 | } |
71 | } |
72 | |