| 1 | use crate::cell::Cell; |
| 2 | use crate::sync as public; |
| 3 | use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; |
| 4 | use crate::sync::poison::once::ExclusiveState; |
| 5 | use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake_all}; |
| 6 | |
| 7 | // On some platforms, the OS is very nice and handles the waiter queue for us. |
| 8 | // This means we only need one atomic value with 4 states: |
| 9 | |
| 10 | /// No initialization has run yet, and no thread is currently using the Once. |
| 11 | const INCOMPLETE: Primitive = 0; |
| 12 | /// Some thread has previously attempted to initialize the Once, but it panicked, |
| 13 | /// so the Once is now poisoned. There are no other threads currently accessing |
| 14 | /// this Once. |
| 15 | const POISONED: Primitive = 1; |
| 16 | /// Some thread is currently attempting to run initialization. It may succeed, |
| 17 | /// so all future threads need to wait for it to finish. |
| 18 | const RUNNING: Primitive = 2; |
| 19 | /// Initialization has completed and all future calls should finish immediately. |
| 20 | const COMPLETE: Primitive = 3; |
| 21 | |
| 22 | // An additional bit indicates whether there are waiting threads: |
| 23 | |
| 24 | /// May only be set if the state is not COMPLETE. |
| 25 | const QUEUED: Primitive = 4; |
| 26 | |
| 27 | // Threads wait by setting the QUEUED bit and calling `futex_wait` on the state |
| 28 | // variable. When the running thread finishes, it will wake all waiting threads using |
| 29 | // `futex_wake_all`. |
| 30 | |
| 31 | const STATE_MASK: Primitive = 0b11; |
| 32 | |
| 33 | pub struct OnceState { |
| 34 | poisoned: bool, |
| 35 | set_state_to: Cell<Primitive>, |
| 36 | } |
| 37 | |
| 38 | impl OnceState { |
| 39 | #[inline ] |
| 40 | pub fn is_poisoned(&self) -> bool { |
| 41 | self.poisoned |
| 42 | } |
| 43 | |
| 44 | #[inline ] |
| 45 | pub fn poison(&self) { |
| 46 | self.set_state_to.set(POISONED); |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | struct CompletionGuard<'a> { |
| 51 | state_and_queued: &'a Futex, |
| 52 | set_state_on_drop_to: Primitive, |
| 53 | } |
| 54 | |
| 55 | impl<'a> Drop for CompletionGuard<'a> { |
| 56 | fn drop(&mut self) { |
| 57 | // Use release ordering to propagate changes to all threads checking |
| 58 | // up on the Once. `futex_wake_all` does its own synchronization, hence |
| 59 | // we do not need `AcqRel`. |
| 60 | if self.state_and_queued.swap(self.set_state_on_drop_to, order:Release) & QUEUED != 0 { |
| 61 | futex_wake_all(self.state_and_queued); |
| 62 | } |
| 63 | } |
| 64 | } |
| 65 | |
| 66 | pub struct Once { |
| 67 | state_and_queued: Futex, |
| 68 | } |
| 69 | |
| 70 | impl Once { |
| 71 | #[inline ] |
| 72 | pub const fn new() -> Once { |
| 73 | Once { state_and_queued: Futex::new(INCOMPLETE) } |
| 74 | } |
| 75 | |
| 76 | #[inline ] |
| 77 | pub fn is_completed(&self) -> bool { |
| 78 | // Use acquire ordering to make all initialization changes visible to the |
| 79 | // current thread. |
| 80 | self.state_and_queued.load(Acquire) == COMPLETE |
| 81 | } |
| 82 | |
| 83 | #[inline ] |
| 84 | pub(crate) fn state(&mut self) -> ExclusiveState { |
| 85 | match *self.state_and_queued.get_mut() { |
| 86 | INCOMPLETE => ExclusiveState::Incomplete, |
| 87 | POISONED => ExclusiveState::Poisoned, |
| 88 | COMPLETE => ExclusiveState::Complete, |
| 89 | _ => unreachable!("invalid Once state" ), |
| 90 | } |
| 91 | } |
| 92 | |
| 93 | #[inline ] |
| 94 | pub(crate) fn set_state(&mut self, new_state: ExclusiveState) { |
| 95 | *self.state_and_queued.get_mut() = match new_state { |
| 96 | ExclusiveState::Incomplete => INCOMPLETE, |
| 97 | ExclusiveState::Poisoned => POISONED, |
| 98 | ExclusiveState::Complete => COMPLETE, |
| 99 | }; |
| 100 | } |
| 101 | |
| 102 | #[cold ] |
| 103 | #[track_caller ] |
| 104 | pub fn wait(&self, ignore_poisoning: bool) { |
| 105 | let mut state_and_queued = self.state_and_queued.load(Acquire); |
| 106 | loop { |
| 107 | let state = state_and_queued & STATE_MASK; |
| 108 | let queued = state_and_queued & QUEUED != 0; |
| 109 | match state { |
| 110 | COMPLETE => return, |
| 111 | POISONED if !ignore_poisoning => { |
| 112 | // Panic to propagate the poison. |
| 113 | panic!("Once instance has previously been poisoned" ); |
| 114 | } |
| 115 | _ => { |
| 116 | // Set the QUEUED bit if it has not already been set. |
| 117 | if !queued { |
| 118 | state_and_queued += QUEUED; |
| 119 | if let Err(new) = self.state_and_queued.compare_exchange_weak( |
| 120 | state, |
| 121 | state_and_queued, |
| 122 | Relaxed, |
| 123 | Acquire, |
| 124 | ) { |
| 125 | state_and_queued = new; |
| 126 | continue; |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | futex_wait(&self.state_and_queued, state_and_queued, None); |
| 131 | state_and_queued = self.state_and_queued.load(Acquire); |
| 132 | } |
| 133 | } |
| 134 | } |
| 135 | } |
| 136 | |
| 137 | #[cold ] |
| 138 | #[track_caller ] |
| 139 | pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) { |
| 140 | let mut state_and_queued = self.state_and_queued.load(Acquire); |
| 141 | loop { |
| 142 | let state = state_and_queued & STATE_MASK; |
| 143 | let queued = state_and_queued & QUEUED != 0; |
| 144 | match state { |
| 145 | COMPLETE => return, |
| 146 | POISONED if !ignore_poisoning => { |
| 147 | // Panic to propagate the poison. |
| 148 | panic!("Once instance has previously been poisoned" ); |
| 149 | } |
| 150 | INCOMPLETE | POISONED => { |
| 151 | // Try to register the current thread as the one running. |
| 152 | let next = RUNNING + if queued { QUEUED } else { 0 }; |
| 153 | if let Err(new) = self.state_and_queued.compare_exchange_weak( |
| 154 | state_and_queued, |
| 155 | next, |
| 156 | Acquire, |
| 157 | Acquire, |
| 158 | ) { |
| 159 | state_and_queued = new; |
| 160 | continue; |
| 161 | } |
| 162 | |
| 163 | // `waiter_queue` will manage other waiting threads, and |
| 164 | // wake them up on drop. |
| 165 | let mut waiter_queue = CompletionGuard { |
| 166 | state_and_queued: &self.state_and_queued, |
| 167 | set_state_on_drop_to: POISONED, |
| 168 | }; |
| 169 | // Run the function, letting it know if we're poisoned or not. |
| 170 | let f_state = public::OnceState { |
| 171 | inner: OnceState { |
| 172 | poisoned: state == POISONED, |
| 173 | set_state_to: Cell::new(COMPLETE), |
| 174 | }, |
| 175 | }; |
| 176 | f(&f_state); |
| 177 | waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get(); |
| 178 | return; |
| 179 | } |
| 180 | _ => { |
| 181 | // All other values must be RUNNING. |
| 182 | assert!(state == RUNNING); |
| 183 | |
| 184 | // Set the QUEUED bit if it is not already set. |
| 185 | if !queued { |
| 186 | state_and_queued += QUEUED; |
| 187 | if let Err(new) = self.state_and_queued.compare_exchange_weak( |
| 188 | state, |
| 189 | state_and_queued, |
| 190 | Relaxed, |
| 191 | Acquire, |
| 192 | ) { |
| 193 | state_and_queued = new; |
| 194 | continue; |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | futex_wait(&self.state_and_queued, state_and_queued, None); |
| 199 | state_and_queued = self.state_and_queued.load(Acquire); |
| 200 | } |
| 201 | } |
| 202 | } |
| 203 | } |
| 204 | } |
| 205 | |