1 | use crate::cell::Cell; |
2 | use crate::sync as public; |
3 | use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; |
4 | use crate::sync::poison::once::ExclusiveState; |
5 | use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake_all}; |
6 | |
7 | // On some platforms, the OS is very nice and handles the waiter queue for us. |
8 | // This means we only need one atomic value with 4 states: |
9 | |
10 | /// No initialization has run yet, and no thread is currently using the Once. |
11 | const INCOMPLETE: Primitive = 0; |
12 | /// Some thread has previously attempted to initialize the Once, but it panicked, |
13 | /// so the Once is now poisoned. There are no other threads currently accessing |
14 | /// this Once. |
15 | const POISONED: Primitive = 1; |
16 | /// Some thread is currently attempting to run initialization. It may succeed, |
17 | /// so all future threads need to wait for it to finish. |
18 | const RUNNING: Primitive = 2; |
19 | /// Initialization has completed and all future calls should finish immediately. |
20 | const COMPLETE: Primitive = 3; |
21 | |
22 | // An additional bit indicates whether there are waiting threads: |
23 | |
24 | /// May only be set if the state is not COMPLETE. |
25 | const QUEUED: Primitive = 4; |
26 | |
27 | // Threads wait by setting the QUEUED bit and calling `futex_wait` on the state |
28 | // variable. When the running thread finishes, it will wake all waiting threads using |
29 | // `futex_wake_all`. |
30 | |
31 | const STATE_MASK: Primitive = 0b11; |
32 | |
33 | pub struct OnceState { |
34 | poisoned: bool, |
35 | set_state_to: Cell<Primitive>, |
36 | } |
37 | |
38 | impl OnceState { |
39 | #[inline ] |
40 | pub fn is_poisoned(&self) -> bool { |
41 | self.poisoned |
42 | } |
43 | |
44 | #[inline ] |
45 | pub fn poison(&self) { |
46 | self.set_state_to.set(POISONED); |
47 | } |
48 | } |
49 | |
50 | struct CompletionGuard<'a> { |
51 | state_and_queued: &'a Futex, |
52 | set_state_on_drop_to: Primitive, |
53 | } |
54 | |
55 | impl<'a> Drop for CompletionGuard<'a> { |
56 | fn drop(&mut self) { |
57 | // Use release ordering to propagate changes to all threads checking |
58 | // up on the Once. `futex_wake_all` does its own synchronization, hence |
59 | // we do not need `AcqRel`. |
60 | if self.state_and_queued.swap(self.set_state_on_drop_to, order:Release) & QUEUED != 0 { |
61 | futex_wake_all(self.state_and_queued); |
62 | } |
63 | } |
64 | } |
65 | |
66 | pub struct Once { |
67 | state_and_queued: Futex, |
68 | } |
69 | |
70 | impl Once { |
71 | #[inline ] |
72 | pub const fn new() -> Once { |
73 | Once { state_and_queued: Futex::new(INCOMPLETE) } |
74 | } |
75 | |
76 | #[inline ] |
77 | pub fn is_completed(&self) -> bool { |
78 | // Use acquire ordering to make all initialization changes visible to the |
79 | // current thread. |
80 | self.state_and_queued.load(Acquire) == COMPLETE |
81 | } |
82 | |
83 | #[inline ] |
84 | pub(crate) fn state(&mut self) -> ExclusiveState { |
85 | match *self.state_and_queued.get_mut() { |
86 | INCOMPLETE => ExclusiveState::Incomplete, |
87 | POISONED => ExclusiveState::Poisoned, |
88 | COMPLETE => ExclusiveState::Complete, |
89 | _ => unreachable!("invalid Once state" ), |
90 | } |
91 | } |
92 | |
93 | #[inline ] |
94 | pub(crate) fn set_state(&mut self, new_state: ExclusiveState) { |
95 | *self.state_and_queued.get_mut() = match new_state { |
96 | ExclusiveState::Incomplete => INCOMPLETE, |
97 | ExclusiveState::Poisoned => POISONED, |
98 | ExclusiveState::Complete => COMPLETE, |
99 | }; |
100 | } |
101 | |
102 | #[cold ] |
103 | #[track_caller ] |
104 | pub fn wait(&self, ignore_poisoning: bool) { |
105 | let mut state_and_queued = self.state_and_queued.load(Acquire); |
106 | loop { |
107 | let state = state_and_queued & STATE_MASK; |
108 | let queued = state_and_queued & QUEUED != 0; |
109 | match state { |
110 | COMPLETE => return, |
111 | POISONED if !ignore_poisoning => { |
112 | // Panic to propagate the poison. |
113 | panic!("Once instance has previously been poisoned" ); |
114 | } |
115 | _ => { |
116 | // Set the QUEUED bit if it has not already been set. |
117 | if !queued { |
118 | state_and_queued += QUEUED; |
119 | if let Err(new) = self.state_and_queued.compare_exchange_weak( |
120 | state, |
121 | state_and_queued, |
122 | Relaxed, |
123 | Acquire, |
124 | ) { |
125 | state_and_queued = new; |
126 | continue; |
127 | } |
128 | } |
129 | |
130 | futex_wait(&self.state_and_queued, state_and_queued, None); |
131 | state_and_queued = self.state_and_queued.load(Acquire); |
132 | } |
133 | } |
134 | } |
135 | } |
136 | |
137 | #[cold ] |
138 | #[track_caller ] |
139 | pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) { |
140 | let mut state_and_queued = self.state_and_queued.load(Acquire); |
141 | loop { |
142 | let state = state_and_queued & STATE_MASK; |
143 | let queued = state_and_queued & QUEUED != 0; |
144 | match state { |
145 | COMPLETE => return, |
146 | POISONED if !ignore_poisoning => { |
147 | // Panic to propagate the poison. |
148 | panic!("Once instance has previously been poisoned" ); |
149 | } |
150 | INCOMPLETE | POISONED => { |
151 | // Try to register the current thread as the one running. |
152 | let next = RUNNING + if queued { QUEUED } else { 0 }; |
153 | if let Err(new) = self.state_and_queued.compare_exchange_weak( |
154 | state_and_queued, |
155 | next, |
156 | Acquire, |
157 | Acquire, |
158 | ) { |
159 | state_and_queued = new; |
160 | continue; |
161 | } |
162 | |
163 | // `waiter_queue` will manage other waiting threads, and |
164 | // wake them up on drop. |
165 | let mut waiter_queue = CompletionGuard { |
166 | state_and_queued: &self.state_and_queued, |
167 | set_state_on_drop_to: POISONED, |
168 | }; |
169 | // Run the function, letting it know if we're poisoned or not. |
170 | let f_state = public::OnceState { |
171 | inner: OnceState { |
172 | poisoned: state == POISONED, |
173 | set_state_to: Cell::new(COMPLETE), |
174 | }, |
175 | }; |
176 | f(&f_state); |
177 | waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get(); |
178 | return; |
179 | } |
180 | _ => { |
181 | // All other values must be RUNNING. |
182 | assert!(state == RUNNING); |
183 | |
184 | // Set the QUEUED bit if it is not already set. |
185 | if !queued { |
186 | state_and_queued += QUEUED; |
187 | if let Err(new) = self.state_and_queued.compare_exchange_weak( |
188 | state, |
189 | state_and_queued, |
190 | Relaxed, |
191 | Acquire, |
192 | ) { |
193 | state_and_queued = new; |
194 | continue; |
195 | } |
196 | } |
197 | |
198 | futex_wait(&self.state_and_queued, state_and_queued, None); |
199 | state_and_queued = self.state_and_queued.load(Acquire); |
200 | } |
201 | } |
202 | } |
203 | } |
204 | } |
205 | |