1 | use std::sync::atomic::{AtomicUsize, Ordering}; |
2 | |
/// A single `AtomicUsize` that packs the sleeping-thread count, the
/// inactive-thread count and the jobs event counter into one word, so that
/// all three can be read or updated with one atomic operation.
pub(super) struct AtomicCounters {
    /// Packs together a number of counters. The counters are ordered as
    /// follows, from least to most significant bits, where `N` stands for
    /// [`THREADS_BITS`] (16 on 64-bit targets, 8 on 32-bit targets):
    ///
    /// * Bits `0..N`: stores the number of **sleeping threads**
    /// * Bits `N..2N`: stores the number of **inactive threads**
    /// * Bits `2N..`: stores the **job event counter** (JEC)
    ///
    /// Each thread counter therefore uses [`THREADS_BITS`] bits. Note that
    /// the total number of bits (and hence the number of bits left for the
    /// JEC) depends on whether we are using a 32- or 64-bit architecture.
    value: AtomicUsize,
}
17 | |
/// A plain, copyable snapshot of the packed counter word.
///
/// Snapshots are produced by `AtomicCounters::load` and
/// `AtomicCounters::increment_jobs_event_counter_if`; the individual
/// counters are decoded with `sleeping_threads`, `inactive_threads` and
/// `jobs_counter`.
#[derive(Copy, Clone)]
pub(super) struct Counters {
    /// Raw packed value, using the same bit layout as the word stored in
    /// `AtomicCounters`.
    word: usize,
}
22 | |
/// A value read from the **Jobs Event Counter** (JEC).
///
/// The parity of the wrapped `usize` carries the state: an even value means
/// the JEC "is sleepy", an odd value means it "is active".
///
/// See the [`README.md`](README.md) for more
/// coverage of how the jobs event counter works.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub(super) struct JobsEventCounter(usize);
28 | |
29 | impl JobsEventCounter { |
30 | pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX); |
31 | |
32 | #[inline ] |
33 | pub(super) fn as_usize(self) -> usize { |
34 | self.0 |
35 | } |
36 | |
37 | /// The JEC "is sleepy" if the last thread to increment it was in the |
38 | /// process of becoming sleepy. This is indicated by its value being *even*. |
39 | /// When new jobs are posted, they check if the JEC is sleepy, and if so |
40 | /// they incremented it. |
41 | #[inline ] |
42 | pub(super) fn is_sleepy(self) -> bool { |
43 | (self.as_usize() & 1) == 0 |
44 | } |
45 | |
46 | /// The JEC "is active" if the last thread to increment it was posting new |
47 | /// work. This is indicated by its value being *odd*. When threads get |
48 | /// sleepy, they will check if the JEC is active, and increment it. |
49 | #[inline ] |
50 | pub(super) fn is_active(self) -> bool { |
51 | !self.is_sleepy() |
52 | } |
53 | } |
54 | |
/// Width, in bits, of each thread counter on 64-bit targets.
#[cfg(target_pointer_width = "64")]
const THREADS_BITS: usize = 16;

/// Width, in bits, of each thread counter on 32-bit targets.
#[cfg(target_pointer_width = "32")]
const THREADS_BITS: usize = 8;

/// Position of the lowest bit of the sleeping-thread counter, which sits at
/// the very bottom of the word (passed to `select_thread`).
const SLEEPING_SHIFT: usize = 0;

/// Position of the lowest bit of the inactive-thread counter, stacked
/// directly above the sleeping-thread counter (passed to `select_thread`).
const INACTIVE_SHIFT: usize = SLEEPING_SHIFT + THREADS_BITS;

/// Position of the lowest bit of the JEC, which takes every bit above the
/// two thread counters (used by `select_jec`).
const JEC_SHIFT: usize = INACTIVE_SHIFT + THREADS_BITS;

/// Largest value a single thread counter can hold; also the mask that
/// isolates one thread counter once it has been shifted down.
pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;

/// Adding this to the packed word adds one sleeping thread.
const ONE_SLEEPING: usize = 1 << SLEEPING_SHIFT;

/// Adding this to the packed word adds one inactive thread.
/// An inactive thread is either idle, sleepy, or sleeping.
const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;

/// Adding this to the packed word adds one to the JEC.
const ONE_JEC: usize = 1 << JEC_SHIFT;
88 | |
89 | impl AtomicCounters { |
90 | #[inline ] |
91 | pub(super) fn new() -> AtomicCounters { |
92 | AtomicCounters { |
93 | value: AtomicUsize::new(0), |
94 | } |
95 | } |
96 | |
97 | /// Load and return the current value of the various counters. |
98 | /// This value can then be given to other method which will |
99 | /// attempt to update the counters via compare-and-swap. |
100 | #[inline ] |
101 | pub(super) fn load(&self, ordering: Ordering) -> Counters { |
102 | Counters::new(self.value.load(ordering)) |
103 | } |
104 | |
105 | #[inline ] |
106 | fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool { |
107 | self.value |
108 | .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed) |
109 | .is_ok() |
110 | } |
111 | |
112 | /// Adds an inactive thread. This cannot fail. |
113 | /// |
114 | /// This should be invoked when a thread enters its idle loop looking |
115 | /// for work. It is decremented when work is found. Note that it is |
116 | /// not decremented if the thread transitions from idle to sleepy or sleeping; |
117 | /// so the number of inactive threads is always greater-than-or-equal |
118 | /// to the number of sleeping threads. |
119 | #[inline ] |
120 | pub(super) fn add_inactive_thread(&self) { |
121 | self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst); |
122 | } |
123 | |
124 | /// Increments the jobs event counter if `increment_when`, when applied to |
125 | /// the current value, is true. Used to toggle the JEC from even (sleepy) to |
126 | /// odd (active) or vice versa. Returns the final value of the counters, for |
127 | /// which `increment_when` is guaranteed to return false. |
128 | pub(super) fn increment_jobs_event_counter_if( |
129 | &self, |
130 | increment_when: impl Fn(JobsEventCounter) -> bool, |
131 | ) -> Counters { |
132 | loop { |
133 | let old_value = self.load(Ordering::SeqCst); |
134 | if increment_when(old_value.jobs_counter()) { |
135 | let new_value = old_value.increment_jobs_counter(); |
136 | if self.try_exchange(old_value, new_value, Ordering::SeqCst) { |
137 | return new_value; |
138 | } |
139 | } else { |
140 | return old_value; |
141 | } |
142 | } |
143 | } |
144 | |
145 | /// Subtracts an inactive thread. This cannot fail. It is invoked |
146 | /// when a thread finds work and hence becomes active. It returns the |
147 | /// number of sleeping threads to wake up (if any). |
148 | /// |
149 | /// See `add_inactive_thread`. |
150 | #[inline ] |
151 | pub(super) fn sub_inactive_thread(&self) -> usize { |
152 | let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst)); |
153 | debug_assert!( |
154 | old_value.inactive_threads() > 0, |
155 | "sub_inactive_thread: old_value {:?} has no inactive threads" , |
156 | old_value, |
157 | ); |
158 | debug_assert!( |
159 | old_value.sleeping_threads() <= old_value.inactive_threads(), |
160 | "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads" , |
161 | old_value, |
162 | old_value.sleeping_threads(), |
163 | old_value.inactive_threads(), |
164 | ); |
165 | |
166 | // Current heuristic: whenever an inactive thread goes away, if |
167 | // there are any sleeping threads, wake 'em up. |
168 | let sleeping_threads = old_value.sleeping_threads(); |
169 | std::cmp::min(sleeping_threads, 2) |
170 | } |
171 | |
172 | /// Subtracts a sleeping thread. This cannot fail, but it is only |
173 | /// safe to do if you you know the number of sleeping threads is |
174 | /// non-zero (i.e., because you have just awoken a sleeping |
175 | /// thread). |
176 | #[inline ] |
177 | pub(super) fn sub_sleeping_thread(&self) { |
178 | let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst)); |
179 | debug_assert!( |
180 | old_value.sleeping_threads() > 0, |
181 | "sub_sleeping_thread: old_value {:?} had no sleeping threads" , |
182 | old_value, |
183 | ); |
184 | debug_assert!( |
185 | old_value.sleeping_threads() <= old_value.inactive_threads(), |
186 | "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads" , |
187 | old_value, |
188 | old_value.sleeping_threads(), |
189 | old_value.inactive_threads(), |
190 | ); |
191 | } |
192 | |
193 | #[inline ] |
194 | pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool { |
195 | debug_assert!( |
196 | old_value.inactive_threads() > 0, |
197 | "try_add_sleeping_thread: old_value {:?} has no inactive threads" , |
198 | old_value, |
199 | ); |
200 | debug_assert!( |
201 | old_value.sleeping_threads() < THREADS_MAX, |
202 | "try_add_sleeping_thread: old_value {:?} has too many sleeping threads" , |
203 | old_value, |
204 | ); |
205 | |
206 | let mut new_value = old_value; |
207 | new_value.word += ONE_SLEEPING; |
208 | |
209 | self.try_exchange(old_value, new_value, Ordering::SeqCst) |
210 | } |
211 | } |
212 | |
213 | #[inline ] |
214 | fn select_thread(word: usize, shift: usize) -> usize { |
215 | (word >> shift) & THREADS_MAX |
216 | } |
217 | |
218 | #[inline ] |
219 | fn select_jec(word: usize) -> usize { |
220 | word >> JEC_SHIFT |
221 | } |
222 | |
223 | impl Counters { |
224 | #[inline ] |
225 | fn new(word: usize) -> Counters { |
226 | Counters { word } |
227 | } |
228 | |
229 | #[inline ] |
230 | fn increment_jobs_counter(self) -> Counters { |
231 | // We can freely add to JEC because it occupies the most significant bits. |
232 | // Thus it doesn't overflow into the other counters, just wraps itself. |
233 | Counters { |
234 | word: self.word.wrapping_add(ONE_JEC), |
235 | } |
236 | } |
237 | |
238 | #[inline ] |
239 | pub(super) fn jobs_counter(self) -> JobsEventCounter { |
240 | JobsEventCounter(select_jec(self.word)) |
241 | } |
242 | |
243 | /// The number of threads that are not actively |
244 | /// executing work. They may be idle, sleepy, or asleep. |
245 | #[inline ] |
246 | pub(super) fn inactive_threads(self) -> usize { |
247 | select_thread(self.word, INACTIVE_SHIFT) |
248 | } |
249 | |
250 | #[inline ] |
251 | pub(super) fn awake_but_idle_threads(self) -> usize { |
252 | debug_assert!( |
253 | self.sleeping_threads() <= self.inactive_threads(), |
254 | "sleeping threads: {} > raw idle threads {}" , |
255 | self.sleeping_threads(), |
256 | self.inactive_threads() |
257 | ); |
258 | self.inactive_threads() - self.sleeping_threads() |
259 | } |
260 | |
261 | #[inline ] |
262 | pub(super) fn sleeping_threads(self) -> usize { |
263 | select_thread(self.word, SLEEPING_SHIFT) |
264 | } |
265 | } |
266 | |
267 | impl std::fmt::Debug for Counters { |
268 | fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
269 | let word = format!("{:016x}" , self.word); |
270 | fmt.debug_struct("Counters" ) |
271 | .field("word" , &word) |
272 | .field("jobs" , &self.jobs_counter().0) |
273 | .field("inactive" , &self.inactive_threads()) |
274 | .field("sleeping" , &self.sleeping_threads()) |
275 | .finish() |
276 | } |
277 | } |
278 | |