| 1 | //! A shutdown channel. |
| 2 | //! |
| 3 | //! Each worker holds the `Sender` half. When all the `Sender` halves are |
| 4 | //! dropped, the `Receiver` receives a notification. |
| 5 | |
| 6 | use crate::loom::sync::Arc; |
| 7 | use crate::sync::oneshot; |
| 8 | |
| 9 | use std::time::Duration; |
| 10 | |
| 11 | #[derive (Debug, Clone)] |
| 12 | pub(super) struct Sender { |
| 13 | _tx: Arc<oneshot::Sender<()>>, |
| 14 | } |
| 15 | |
| 16 | #[derive (Debug)] |
| 17 | pub(super) struct Receiver { |
| 18 | rx: oneshot::Receiver<()>, |
| 19 | } |
| 20 | |
| 21 | pub(super) fn channel() -> (Sender, Receiver) { |
| 22 | let (tx: Sender<()>, rx: Receiver<()>) = oneshot::channel(); |
| 23 | let tx: Sender = Sender { _tx: Arc::new(data:tx) }; |
| 24 | let rx: Receiver = Receiver { rx }; |
| 25 | |
| 26 | (tx, rx) |
| 27 | } |
| 28 | |
| 29 | impl Receiver { |
| 30 | /// Blocks the current thread until all `Sender` handles drop. |
| 31 | /// |
| 32 | /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout` |
| 33 | /// duration. If `timeout` is `None`, then the thread is blocked until the |
| 34 | /// shutdown signal is received. |
| 35 | /// |
| 36 | /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`. |
| 37 | pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool { |
| 38 | use crate::runtime::context::try_enter_blocking_region; |
| 39 | |
| 40 | if timeout == Some(Duration::from_nanos(0)) { |
| 41 | return false; |
| 42 | } |
| 43 | |
| 44 | let mut e = match try_enter_blocking_region() { |
| 45 | Some(enter) => enter, |
| 46 | _ => { |
| 47 | if std::thread::panicking() { |
| 48 | // Don't panic in a panic |
| 49 | return false; |
| 50 | } else { |
| 51 | panic!( |
| 52 | "Cannot drop a runtime in a context where blocking is not allowed. \ |
| 53 | This happens when a runtime is dropped from within an asynchronous context." |
| 54 | ); |
| 55 | } |
| 56 | } |
| 57 | }; |
| 58 | |
| 59 | // The oneshot completes with an Err |
| 60 | // |
| 61 | // If blocking fails to wait, this indicates a problem parking the |
| 62 | // current thread (usually, shutting down a runtime stored in a |
| 63 | // thread-local). |
| 64 | if let Some(timeout) = timeout { |
| 65 | e.block_on_timeout(&mut self.rx, timeout).is_ok() |
| 66 | } else { |
| 67 | let _ = e.block_on(&mut self.rx); |
| 68 | true |
| 69 | } |
| 70 | } |
| 71 | } |
| 72 | |