| 1 | use std::sync::atomic::{AtomicUsize, Ordering}; |
| 2 | |
| 3 | pub(super) struct AtomicCounters { |
| 4 | /// Packs together a number of counters. The counters are ordered as |
| 5 | /// follows, from least to most significant bits (here, we assuming |
| 6 | /// that [`THREADS_BITS`] is equal to 10): |
| 7 | /// |
| 8 | /// * Bits 0..10: Stores the number of **sleeping threads** |
| 9 | /// * Bits 10..20: Stores the number of **inactive threads** |
| 10 | /// * Bits 20..: Stores the **job event counter** (JEC) |
| 11 | /// |
| 12 | /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note |
| 13 | /// that the total number of bits (and hence the number of bits used for the |
| 14 | /// JEC) will depend on whether we are using a 32- or 64-bit architecture. |
| 15 | value: AtomicUsize, |
| 16 | } |
| 17 | |
| 18 | #[derive (Copy, Clone)] |
| 19 | pub(super) struct Counters { |
| 20 | word: usize, |
| 21 | } |
| 22 | |
| 23 | /// A value read from the **Jobs Event Counter**. |
| 24 | /// See the [`README.md`](README.md) for more |
| 25 | /// coverage of how the jobs event counter works. |
| 26 | #[derive (Copy, Clone, Debug, PartialEq, PartialOrd)] |
| 27 | pub(super) struct JobsEventCounter(usize); |
| 28 | |
| 29 | impl JobsEventCounter { |
| 30 | pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX); |
| 31 | |
| 32 | #[inline ] |
| 33 | pub(super) fn as_usize(self) -> usize { |
| 34 | self.0 |
| 35 | } |
| 36 | |
| 37 | /// The JEC "is sleepy" if the last thread to increment it was in the |
| 38 | /// process of becoming sleepy. This is indicated by its value being *even*. |
| 39 | /// When new jobs are posted, they check if the JEC is sleepy, and if so |
| 40 | /// they incremented it. |
| 41 | #[inline ] |
| 42 | pub(super) fn is_sleepy(self) -> bool { |
| 43 | (self.as_usize() & 1) == 0 |
| 44 | } |
| 45 | |
| 46 | /// The JEC "is active" if the last thread to increment it was posting new |
| 47 | /// work. This is indicated by its value being *odd*. When threads get |
| 48 | /// sleepy, they will check if the JEC is active, and increment it. |
| 49 | #[inline ] |
| 50 | pub(super) fn is_active(self) -> bool { |
| 51 | !self.is_sleepy() |
| 52 | } |
| 53 | } |
| 54 | |
/// Number of bits used for each of the two thread counters (sleeping and
/// inactive) packed into the counter word.
#[cfg(target_pointer_width = "64")]
const THREADS_BITS: usize = 16;

/// Number of bits used for each of the two thread counters (sleeping and
/// inactive) packed into the counter word.
#[cfg(target_pointer_width = "32")]
const THREADS_BITS: usize = 8;

/// Bits to shift to select the sleeping threads
/// (used with `select_thread`).
// Written as `0 * THREADS_BITS` to mirror the other shifts; clippy would
// otherwise flag the multiplication as always producing zero.
#[allow(clippy::erasing_op)]
const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;

/// Bits to shift to select the inactive threads
/// (used with `select_thread`).
// Written as `1 * THREADS_BITS` to mirror the other shifts; clippy would
// otherwise flag the multiplication by one as redundant.
#[allow(clippy::identity_op)]
const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;

/// Bits to shift to select the JEC (used with `select_jec`). The JEC
/// occupies every bit from this position up to the top of the word.
const JEC_SHIFT: usize = 2 * THREADS_BITS;

/// Max value for the thread counters.
pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;

/// Constant that can be added to add one sleeping thread
/// (equal to `1 << SLEEPING_SHIFT`).
const ONE_SLEEPING: usize = 1;

/// Constant that can be added to add one inactive thread.
/// An inactive thread is either idle, sleepy, or sleeping.
const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;

/// Constant that can be added to add one to the JEC.
const ONE_JEC: usize = 1 << JEC_SHIFT;
| 88 | |
| 89 | impl AtomicCounters { |
| 90 | #[inline ] |
| 91 | pub(super) fn new() -> AtomicCounters { |
| 92 | AtomicCounters { |
| 93 | value: AtomicUsize::new(0), |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | /// Load and return the current value of the various counters. |
| 98 | /// This value can then be given to other method which will |
| 99 | /// attempt to update the counters via compare-and-swap. |
| 100 | #[inline ] |
| 101 | pub(super) fn load(&self, ordering: Ordering) -> Counters { |
| 102 | Counters::new(self.value.load(ordering)) |
| 103 | } |
| 104 | |
| 105 | #[inline ] |
| 106 | fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool { |
| 107 | self.value |
| 108 | .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed) |
| 109 | .is_ok() |
| 110 | } |
| 111 | |
| 112 | /// Adds an inactive thread. This cannot fail. |
| 113 | /// |
| 114 | /// This should be invoked when a thread enters its idle loop looking |
| 115 | /// for work. It is decremented when work is found. Note that it is |
| 116 | /// not decremented if the thread transitions from idle to sleepy or sleeping; |
| 117 | /// so the number of inactive threads is always greater-than-or-equal |
| 118 | /// to the number of sleeping threads. |
| 119 | #[inline ] |
| 120 | pub(super) fn add_inactive_thread(&self) { |
| 121 | self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst); |
| 122 | } |
| 123 | |
| 124 | /// Increments the jobs event counter if `increment_when`, when applied to |
| 125 | /// the current value, is true. Used to toggle the JEC from even (sleepy) to |
| 126 | /// odd (active) or vice versa. Returns the final value of the counters, for |
| 127 | /// which `increment_when` is guaranteed to return false. |
| 128 | pub(super) fn increment_jobs_event_counter_if( |
| 129 | &self, |
| 130 | increment_when: impl Fn(JobsEventCounter) -> bool, |
| 131 | ) -> Counters { |
| 132 | loop { |
| 133 | let old_value = self.load(Ordering::SeqCst); |
| 134 | if increment_when(old_value.jobs_counter()) { |
| 135 | let new_value = old_value.increment_jobs_counter(); |
| 136 | if self.try_exchange(old_value, new_value, Ordering::SeqCst) { |
| 137 | return new_value; |
| 138 | } |
| 139 | } else { |
| 140 | return old_value; |
| 141 | } |
| 142 | } |
| 143 | } |
| 144 | |
| 145 | /// Subtracts an inactive thread. This cannot fail. It is invoked |
| 146 | /// when a thread finds work and hence becomes active. It returns the |
| 147 | /// number of sleeping threads to wake up (if any). |
| 148 | /// |
| 149 | /// See `add_inactive_thread`. |
| 150 | #[inline ] |
| 151 | pub(super) fn sub_inactive_thread(&self) -> usize { |
| 152 | let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst)); |
| 153 | debug_assert!( |
| 154 | old_value.inactive_threads() > 0, |
| 155 | "sub_inactive_thread: old_value {:?} has no inactive threads" , |
| 156 | old_value, |
| 157 | ); |
| 158 | debug_assert!( |
| 159 | old_value.sleeping_threads() <= old_value.inactive_threads(), |
| 160 | "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads" , |
| 161 | old_value, |
| 162 | old_value.sleeping_threads(), |
| 163 | old_value.inactive_threads(), |
| 164 | ); |
| 165 | |
| 166 | // Current heuristic: whenever an inactive thread goes away, if |
| 167 | // there are any sleeping threads, wake 'em up. |
| 168 | let sleeping_threads = old_value.sleeping_threads(); |
| 169 | std::cmp::min(sleeping_threads, 2) |
| 170 | } |
| 171 | |
| 172 | /// Subtracts a sleeping thread. This cannot fail, but it is only |
| 173 | /// safe to do if you you know the number of sleeping threads is |
| 174 | /// non-zero (i.e., because you have just awoken a sleeping |
| 175 | /// thread). |
| 176 | #[inline ] |
| 177 | pub(super) fn sub_sleeping_thread(&self) { |
| 178 | let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst)); |
| 179 | debug_assert!( |
| 180 | old_value.sleeping_threads() > 0, |
| 181 | "sub_sleeping_thread: old_value {:?} had no sleeping threads" , |
| 182 | old_value, |
| 183 | ); |
| 184 | debug_assert!( |
| 185 | old_value.sleeping_threads() <= old_value.inactive_threads(), |
| 186 | "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads" , |
| 187 | old_value, |
| 188 | old_value.sleeping_threads(), |
| 189 | old_value.inactive_threads(), |
| 190 | ); |
| 191 | } |
| 192 | |
| 193 | #[inline ] |
| 194 | pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool { |
| 195 | debug_assert!( |
| 196 | old_value.inactive_threads() > 0, |
| 197 | "try_add_sleeping_thread: old_value {:?} has no inactive threads" , |
| 198 | old_value, |
| 199 | ); |
| 200 | debug_assert!( |
| 201 | old_value.sleeping_threads() < THREADS_MAX, |
| 202 | "try_add_sleeping_thread: old_value {:?} has too many sleeping threads" , |
| 203 | old_value, |
| 204 | ); |
| 205 | |
| 206 | let mut new_value = old_value; |
| 207 | new_value.word += ONE_SLEEPING; |
| 208 | |
| 209 | self.try_exchange(old_value, new_value, Ordering::SeqCst) |
| 210 | } |
| 211 | } |
| 212 | |
| 213 | #[inline ] |
| 214 | fn select_thread(word: usize, shift: usize) -> usize { |
| 215 | ((word >> shift) as usize) & THREADS_MAX |
| 216 | } |
| 217 | |
| 218 | #[inline ] |
| 219 | fn select_jec(word: usize) -> usize { |
| 220 | (word >> JEC_SHIFT) as usize |
| 221 | } |
| 222 | |
| 223 | impl Counters { |
| 224 | #[inline ] |
| 225 | fn new(word: usize) -> Counters { |
| 226 | Counters { word } |
| 227 | } |
| 228 | |
| 229 | #[inline ] |
| 230 | fn increment_jobs_counter(self) -> Counters { |
| 231 | // We can freely add to JEC because it occupies the most significant bits. |
| 232 | // Thus it doesn't overflow into the other counters, just wraps itself. |
| 233 | Counters { |
| 234 | word: self.word.wrapping_add(ONE_JEC), |
| 235 | } |
| 236 | } |
| 237 | |
| 238 | #[inline ] |
| 239 | pub(super) fn jobs_counter(self) -> JobsEventCounter { |
| 240 | JobsEventCounter(select_jec(self.word)) |
| 241 | } |
| 242 | |
| 243 | /// The number of threads that are not actively |
| 244 | /// executing work. They may be idle, sleepy, or asleep. |
| 245 | #[inline ] |
| 246 | pub(super) fn inactive_threads(self) -> usize { |
| 247 | select_thread(self.word, INACTIVE_SHIFT) |
| 248 | } |
| 249 | |
| 250 | #[inline ] |
| 251 | pub(super) fn awake_but_idle_threads(self) -> usize { |
| 252 | debug_assert!( |
| 253 | self.sleeping_threads() <= self.inactive_threads(), |
| 254 | "sleeping threads: {} > raw idle threads {}" , |
| 255 | self.sleeping_threads(), |
| 256 | self.inactive_threads() |
| 257 | ); |
| 258 | self.inactive_threads() - self.sleeping_threads() |
| 259 | } |
| 260 | |
| 261 | #[inline ] |
| 262 | pub(super) fn sleeping_threads(self) -> usize { |
| 263 | select_thread(self.word, SLEEPING_SHIFT) |
| 264 | } |
| 265 | } |
| 266 | |
| 267 | impl std::fmt::Debug for Counters { |
| 268 | fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| 269 | let word: String = format!(" {:016x}" , self.word); |
| 270 | fmt&mut DebugStruct<'_, '_>.debug_struct("Counters" ) |
| 271 | .field("word" , &word) |
| 272 | .field("jobs" , &self.jobs_counter().0) |
| 273 | .field("inactive" , &self.inactive_threads()) |
| 274 | .field(name:"sleeping" , &self.sleeping_threads()) |
| 275 | .finish() |
| 276 | } |
| 277 | } |
| 278 | |