1 | use crate::cell::Cell; |
2 | use crate::sync as public; |
3 | use crate::sync::atomic::{ |
4 | AtomicU32, |
5 | Ordering::{Acquire, Relaxed, Release}, |
6 | }; |
7 | use crate::sync::once::ExclusiveState; |
8 | use crate::sys::futex::{futex_wait, futex_wake_all}; |
9 | |
10 | // On some platforms, the OS is very nice and handles the waiter queue for us. |
11 | // This means we only need one atomic value with 5 states: |
12 | |
/// Initial state: the initializer has not run and no thread is using the Once.
const INCOMPLETE: u32 = 0;
/// An earlier initialization attempt panicked, or its closure explicitly asked
/// for poisoning, so the Once is poisoned. No thread is currently accessing
/// the Once while it is in this state.
const POISONED: u32 = 1;
/// A thread is currently running the initializer. The attempt may still
/// succeed, so threads arriving later have to wait for its outcome.
const RUNNING: u32 = 2;
/// A thread is currently running the initializer and at least one other
/// thread is waiting for it to finish.
const QUEUED: u32 = 3;
/// The initializer has finished; every later call returns immediately.
const COMPLETE: u32 = 4;
27 | |
28 | // Threads wait by setting the state to QUEUED and calling `futex_wait` on the state |
29 | // variable. When the running thread finishes, it will wake all waiting threads using |
30 | // `futex_wake_all`. |
31 | |
/// Per-attempt state that `Once::call` wraps in the public `OnceState` and
/// hands to the initialization closure.
pub struct OnceState {
    /// Whether the `Once` was poisoned when this initialization attempt began.
    poisoned: bool,
    /// State the `Once` is moved to after the closure returns. Stored in a
    /// `Cell` so it can be changed through the shared reference the closure
    /// receives.
    set_state_to: Cell<u32>,
}
36 | |
37 | impl OnceState { |
38 | #[inline ] |
39 | pub fn is_poisoned(&self) -> bool { |
40 | self.poisoned |
41 | } |
42 | |
43 | #[inline ] |
44 | pub fn poison(&self) { |
45 | self.set_state_to.set(POISONED); |
46 | } |
47 | } |
48 | |
/// Guard held by the thread that is running the initializer. Its `Drop` impl
/// stores the final state and wakes any waiting threads.
struct CompletionGuard<'a> {
    /// The state word of the `Once` being initialized.
    state: &'a AtomicU32,
    /// Value written into `state` when the guard is dropped.
    set_state_on_drop_to: u32,
}
53 | |
54 | impl<'a> Drop for CompletionGuard<'a> { |
55 | fn drop(&mut self) { |
56 | // Use release ordering to propagate changes to all threads checking |
57 | // up on the Once. `futex_wake_all` does its own synchronization, hence |
58 | // we do not need `AcqRel`. |
59 | if self.state.swap(self.set_state_on_drop_to, order:Release) == QUEUED { |
60 | futex_wake_all(&self.state); |
61 | } |
62 | } |
63 | } |
64 | |
/// One-time initialization primitive whose whole state lives in a single
/// atomic word.
pub struct Once {
    /// Holds one of `INCOMPLETE`, `POISONED`, `RUNNING`, `QUEUED` or `COMPLETE`.
    state: AtomicU32,
}
68 | |
69 | impl Once { |
70 | #[inline ] |
71 | pub const fn new() -> Once { |
72 | Once { state: AtomicU32::new(INCOMPLETE) } |
73 | } |
74 | |
75 | #[inline ] |
76 | pub fn is_completed(&self) -> bool { |
77 | // Use acquire ordering to make all initialization changes visible to the |
78 | // current thread. |
79 | self.state.load(Acquire) == COMPLETE |
80 | } |
81 | |
82 | #[inline ] |
83 | pub(crate) fn state(&mut self) -> ExclusiveState { |
84 | match *self.state.get_mut() { |
85 | INCOMPLETE => ExclusiveState::Incomplete, |
86 | POISONED => ExclusiveState::Poisoned, |
87 | COMPLETE => ExclusiveState::Complete, |
88 | _ => unreachable!("invalid Once state" ), |
89 | } |
90 | } |
91 | |
92 | // This uses FnMut to match the API of the generic implementation. As this |
93 | // implementation is quite light-weight, it is generic over the closure and |
94 | // so avoids the cost of dynamic dispatch. |
95 | #[cold ] |
96 | #[track_caller ] |
97 | pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) { |
98 | let mut state = self.state.load(Acquire); |
99 | loop { |
100 | match state { |
101 | POISONED if !ignore_poisoning => { |
102 | // Panic to propagate the poison. |
103 | panic!("Once instance has previously been poisoned" ); |
104 | } |
105 | INCOMPLETE | POISONED => { |
106 | // Try to register the current thread as the one running. |
107 | if let Err(new) = |
108 | self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire) |
109 | { |
110 | state = new; |
111 | continue; |
112 | } |
113 | // `waiter_queue` will manage other waiting threads, and |
114 | // wake them up on drop. |
115 | let mut waiter_queue = |
116 | CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED }; |
117 | // Run the function, letting it know if we're poisoned or not. |
118 | let f_state = public::OnceState { |
119 | inner: OnceState { |
120 | poisoned: state == POISONED, |
121 | set_state_to: Cell::new(COMPLETE), |
122 | }, |
123 | }; |
124 | f(&f_state); |
125 | waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get(); |
126 | return; |
127 | } |
128 | RUNNING | QUEUED => { |
129 | // Set the state to QUEUED if it is not already. |
130 | if state == RUNNING |
131 | && let Err(new) = |
132 | self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire) |
133 | { |
134 | state = new; |
135 | continue; |
136 | } |
137 | |
138 | futex_wait(&self.state, QUEUED, None); |
139 | state = self.state.load(Acquire); |
140 | } |
141 | COMPLETE => return, |
142 | _ => unreachable!("state is never set to invalid values" ), |
143 | } |
144 | } |
145 | } |
146 | } |
147 | |