| 1 | //! Code that decides when workers should go to sleep. See README.md | 
| 2 | //! for an overview. | 
| 3 |  | 
| 4 | use crate::latch::CoreLatch; | 
| 5 | use crate::sync::{Condvar, Mutex}; | 
| 6 | use crossbeam_utils::CachePadded; | 
| 7 | use std::sync::atomic::Ordering; | 
| 8 | use std::thread; | 
| 9 | use std::usize; | 
| 10 |  | 
| 11 | mod counters; | 
| 12 | pub(crate) use self::counters::THREADS_MAX; | 
| 13 | use self::counters::{AtomicCounters, JobsEventCounter}; | 
| 14 |  | 
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Packed atomic counters tracking idle threads, sleeping threads, and
    /// the jobs event counter (JEC). See the `counters` submodule.
    counters: AtomicCounters,
}
| 29 |  | 
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is the worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
| 46 |  | 
/// The "sleep state" for an individual worker.
#[derive (Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condition variable the sleeping worker blocks on; paired with
    /// `is_blocked`, which guards against spurious and missed wakeups.
    condvar: Condvar,
}
| 56 |  | 
/// Number of rounds an idle worker spins (yielding each round) before it
/// announces itself as "sleepy" and records the jobs event counter.
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// One additional round after becoming sleepy; once a worker has circled
/// this many rounds without finding work, it actually goes to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
| 59 |  | 
impl Sleep {
    /// Creates the sleep state for a registry with `n_threads` workers.
    ///
    /// # Panics
    ///
    /// Panics if `n_threads` exceeds `THREADS_MAX`, the most the packed
    /// counters can represent.
    pub(super) fn new(n_threads: usize) -> Sleep {
        assert!(n_threads <= THREADS_MAX);
        Sleep {
            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
            counters: AtomicCounters::new(),
        }
    }

    /// Invoked when a worker runs out of work and starts searching for jobs
    /// to steal. Registers the thread as inactive and returns the `IdleState`
    /// token that tracks its progress toward going to sleep.
    #[inline ]
    pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
        self.counters.add_inactive_thread();

        IdleState {
            worker_index,
            rounds: 0,
            jobs_counter: JobsEventCounter::DUMMY,
        }
    }

    /// Invoked when a previously idle worker finds work; ends its idle period.
    #[inline ]
    pub(super) fn work_found(&self) {
        // If we were the last idle thread and other threads are still sleeping,
        // then we should wake up another thread.
        let threads_to_wake = self.counters.sub_inactive_thread();
        self.wake_any_threads(threads_to_wake as u32);
    }

    /// Invoked each time an idle worker fails to find any work. Spins and
    /// yields for the first `ROUNDS_UNTIL_SLEEPY` rounds, then announces
    /// sleepiness (recording the jobs event counter), and after
    /// `ROUNDS_UNTIL_SLEEPING` rounds finally blocks the thread in `sleep`.
    ///
    /// `has_injected_jobs` is a last-chance check for externally injected
    /// jobs, consulted in `sleep` just before actually blocking.
    #[inline ]
    pub(super) fn no_work_found(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            idle_state.rounds += 1;
        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
            idle_state.jobs_counter = self.announce_sleepy();
            idle_state.rounds += 1;
            thread::yield_now();
        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
            idle_state.rounds += 1;
            thread::yield_now();
        } else {
            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
            self.sleep(idle_state, latch, has_injected_jobs);
        }
    }

    /// Marks this worker as "sleepy": bumps the jobs event counter (JEC) if
    /// it is currently in the active state, and returns the value observed.
    /// `sleep` later compares the JEC against this value to detect jobs that
    /// were posted in the interim.
    #[cold ]
    fn announce_sleepy(&self) -> JobsEventCounter {
        self.counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_active)
            .jobs_counter()
    }

    /// Puts the worker to sleep, blocking on its condvar until notified.
    /// Before blocking, it backs out of sleep if any of the following
    /// indicates work may be available: the latch was set, the JEC changed
    /// since we became sleepy, or `has_injected_jobs()` reports new external
    /// jobs. The ordering of the mutex acquisition, the sleeping-thread
    /// counter update, and the fence below is load-bearing; see the inline
    /// comments and the module README.
    #[cold ]
    fn sleep(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        let worker_index = idle_state.worker_index;

        // If the latch refuses the transition to "sleepy", it was already
        // set; bail out without touching any sleep state.
        if !latch.get_sleepy() {
            return;
        }

        // Acquire the mutex *before* registering as sleeping (below); a waker
        // must then wait for our `condvar.wait` to release it, guaranteeing
        // it observes `is_blocked == true`.
        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // Our latch was signalled. We should wake back up fully as we
        // will have some stuff to do.
        if !latch.fall_asleep() {
            idle_state.wake_fully();
            return;
        }

        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
        }

        // Successfully registered as asleep.

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers the rollover over the JEC such that we don't see it
        // - we are the last active worker thread
        std::sync::atomic::fence(Ordering::SeqCst);
        if has_injected_jobs() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we incremented the "sleepy counter". This means
            // that whomever is coming to wake us will have to wait until we
            // release the mutex in the call to `wait`, so they will see this
            // boolean as true.)
            *is_blocked = true;
            // Loop guards against spurious condvar wakeups: only a real waker
            // sets `is_blocked` back to false.
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }
        }

        // Update other state:
        idle_state.wake_fully();
        latch.wake_up();
    }

    /// Notify the given thread that it should wake up (if it is
    /// sleeping).  When this method is invoked, we typically know the
    /// thread is asleep, though in rare cases it could have been
    /// awoken by (e.g.) new work having been posted.
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.wake_specific_thread(target_worker_index);
    }

    /// Signals that `num_jobs` new jobs were injected into the thread
    /// pool from outside. This function will ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline ]
    pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // This fence is needed to guarantee that threads
        // as they are about to fall asleep, observe any
        // new jobs that may have been injected.
        std::sync::atomic::fence(Ordering::SeqCst);

        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Signals that `num_jobs` new jobs were pushed onto a thread's
    /// local deque. This function will try to ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary. However, this is not guaranteed: under certain
    /// race conditions, the function may fail to wake any new
    /// threads; in that case the existing thread should eventually
    /// pop the job.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline ]
    pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
    #[inline ]
    fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // Read the counters and -- if sleepy workers have announced themselves
        // -- announce that there is now work available. The final value of `counters`
        // with which we exit the loop thus corresponds to a state in which any
        // pending "sleepy" announcement has been answered by bumping the JEC,
        // so sleepy workers re-checking the JEC will notice this job.
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
        let num_awake_but_idle = counters.awake_but_idle_threads();
        let num_sleepers = counters.sleeping_threads();

        if num_sleepers == 0 {
            // nobody to wake
            return;
        }

        // Promote from u16 to u32 so we can interoperate with
        // num_jobs more easily.
        let num_awake_but_idle = num_awake_but_idle as u32;
        let num_sleepers = num_sleepers as u32;

        // If the queue is non-empty, then we always wake up a worker
        // -- clearly the existing idle jobs aren't enough. Otherwise,
        // check to see if we have enough idle workers.
        if !queue_was_empty {
            let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
            self.wake_any_threads(num_to_wake);
        } else if num_awake_but_idle < num_jobs {
            let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
            self.wake_any_threads(num_to_wake);
        }
    }

    /// Wakes up to `num_to_wake` sleeping workers, scanning the sleep states
    /// in index order. May wake fewer if not enough workers are blocked.
    #[cold ]
    fn wake_any_threads(&self, mut num_to_wake: u32) {
        if num_to_wake > 0 {
            for i in 0..self.worker_sleep_states.len() {
                if self.wake_specific_thread(i) {
                    num_to_wake -= 1;
                    if num_to_wake == 0 {
                        return;
                    }
                }
            }
        }
    }

    /// Wakes the worker at `index` if it is currently blocked. Returns true
    /// only if a sleeping thread was actually notified.
    fn wake_specific_thread(&self, index: usize) -> bool {
        let sleep_state = &self.worker_sleep_states[index];

        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        if *is_blocked {
            *is_blocked = false;
            sleep_state.condvar.notify_one();

            // When the thread went to sleep, it will have incremented
            // this value. When we wake it, its our job to decrement
            // it. We could have the thread do it, but that would
            // introduce a delay between when the thread was
            // *notified* and when this counter was decremented. That
            // might mislead people with new work into thinking that
            // there are sleeping threads that they should try to
            // wake, when in fact there is nothing left for them to
            // do.
            self.counters.sub_sleeping_thread();

            true
        } else {
            false
        }
    }
}
| 314 |  | 
| 315 | impl IdleState { | 
| 316 |     fn wake_fully(&mut self) { | 
| 317 |         self.rounds = 0; | 
| 318 |         self.jobs_counter = JobsEventCounter::DUMMY; | 
| 319 |     } | 
| 320 |  | 
| 321 |     fn wake_partly(&mut self) { | 
| 322 |         self.rounds = ROUNDS_UNTIL_SLEEPY; | 
| 323 |         self.jobs_counter = JobsEventCounter::DUMMY; | 
| 324 |     } | 
| 325 | } | 
| 326 |  |