1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
3
4use crate::latch::CoreLatch;
5use crossbeam_utils::CachePadded;
6use std::sync::atomic::Ordering;
7use std::sync::{Condvar, Mutex};
8use std::thread;
9use std::usize;
10
11mod counters;
12pub(crate) use self::counters::THREADS_MAX;
13use self::counters::{AtomicCounters, JobsEventCounter};
14
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker, indexed by worker index. Used to track
    /// whether a worker is sleeping and to have it block on its own mutex and
    /// condition variable.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Atomic counters shared by all workers. The methods of `Sleep` use them
    /// to record inactive and sleeping threads and to read and bump the jobs
    /// event counter.
    counters: AtomicCounters,
}
29
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is the worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, the jobs event counter value that was
    /// returned when we announced ourselves as sleepy.
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
46
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condition variable the worker waits on for as long as `is_blocked`
    /// is true; a thread waking this worker clears the flag and then
    /// notifies it.
    condvar: Condvar,
}
56
/// Number of fruitless `no_work_found` rounds (each one just yields the
/// thread) before an idle worker announces itself as sleepy.
const ROUNDS_UNTIL_SLEEPY: u32 = 32;

/// Round count at which `no_work_found` stops yielding and calls `sleep`.
/// With the current values this is the round immediately after the sleepy
/// announcement, so the intermediate "sleepy but still yielding" branch in
/// `no_work_found` is never taken.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
59
60impl Sleep {
61 pub(super) fn new(n_threads: usize) -> Sleep {
62 assert!(n_threads <= THREADS_MAX);
63 Sleep {
64 worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
65 counters: AtomicCounters::new(),
66 }
67 }
68
69 #[inline]
70 pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
71 self.counters.add_inactive_thread();
72
73 IdleState {
74 worker_index,
75 rounds: 0,
76 jobs_counter: JobsEventCounter::DUMMY,
77 }
78 }
79
80 #[inline]
81 pub(super) fn work_found(&self) {
82 // If we were the last idle thread and other threads are still sleeping,
83 // then we should wake up another thread.
84 let threads_to_wake = self.counters.sub_inactive_thread();
85 self.wake_any_threads(threads_to_wake as u32);
86 }
87
88 #[inline]
89 pub(super) fn no_work_found(
90 &self,
91 idle_state: &mut IdleState,
92 latch: &CoreLatch,
93 has_injected_jobs: impl FnOnce() -> bool,
94 ) {
95 if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
96 thread::yield_now();
97 idle_state.rounds += 1;
98 } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
99 idle_state.jobs_counter = self.announce_sleepy();
100 idle_state.rounds += 1;
101 thread::yield_now();
102 } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
103 idle_state.rounds += 1;
104 thread::yield_now();
105 } else {
106 debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
107 self.sleep(idle_state, latch, has_injected_jobs);
108 }
109 }
110
111 #[cold]
112 fn announce_sleepy(&self) -> JobsEventCounter {
113 self.counters
114 .increment_jobs_event_counter_if(JobsEventCounter::is_active)
115 .jobs_counter()
116 }
117
118 #[cold]
119 fn sleep(
120 &self,
121 idle_state: &mut IdleState,
122 latch: &CoreLatch,
123 has_injected_jobs: impl FnOnce() -> bool,
124 ) {
125 let worker_index = idle_state.worker_index;
126
127 if !latch.get_sleepy() {
128 return;
129 }
130
131 let sleep_state = &self.worker_sleep_states[worker_index];
132 let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
133 debug_assert!(!*is_blocked);
134
135 // Our latch was signalled. We should wake back up fully as we
136 // will have some stuff to do.
137 if !latch.fall_asleep() {
138 idle_state.wake_fully();
139 return;
140 }
141
142 loop {
143 let counters = self.counters.load(Ordering::SeqCst);
144
145 // Check if the JEC has changed since we got sleepy.
146 debug_assert!(idle_state.jobs_counter.is_sleepy());
147 if counters.jobs_counter() != idle_state.jobs_counter {
148 // JEC has changed, so a new job was posted, but for some reason
149 // we didn't see it. We should return to just before the SLEEPY
150 // state so we can do another search and (if we fail to find
151 // work) go back to sleep.
152 idle_state.wake_partly();
153 latch.wake_up();
154 return;
155 }
156
157 // Otherwise, let's move from IDLE to SLEEPING.
158 if self.counters.try_add_sleeping_thread(counters) {
159 break;
160 }
161 }
162
163 // Successfully registered as asleep.
164
165 // We have one last check for injected jobs to do. This protects against
166 // deadlock in the very unlikely event that
167 //
168 // - an external job is being injected while we are sleepy
169 // - that job triggers the rollover over the JEC such that we don't see it
170 // - we are the last active worker thread
171 std::sync::atomic::fence(Ordering::SeqCst);
172 if has_injected_jobs() {
173 // If we see an externally injected job, then we have to 'wake
174 // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
175 // the one that wakes us.)
176 self.counters.sub_sleeping_thread();
177 } else {
178 // If we don't see an injected job (the normal case), then flag
179 // ourselves as asleep and wait till we are notified.
180 //
181 // (Note that `is_blocked` is held under a mutex and the mutex was
182 // acquired *before* we incremented the "sleepy counter". This means
183 // that whomever is coming to wake us will have to wait until we
184 // release the mutex in the call to `wait`, so they will see this
185 // boolean as true.)
186 *is_blocked = true;
187 while *is_blocked {
188 is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
189 }
190 }
191
192 // Update other state:
193 idle_state.wake_fully();
194 latch.wake_up();
195 }
196
197 /// Notify the given thread that it should wake up (if it is
198 /// sleeping). When this method is invoked, we typically know the
199 /// thread is asleep, though in rare cases it could have been
200 /// awoken by (e.g.) new work having been posted.
201 pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
202 self.wake_specific_thread(target_worker_index);
203 }
204
205 /// Signals that `num_jobs` new jobs were injected into the thread
206 /// pool from outside. This function will ensure that there are
207 /// threads available to process them, waking threads from sleep
208 /// if necessary.
209 ///
210 /// # Parameters
211 ///
212 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
213 /// We'll try to get at least one thread per job.
214 #[inline]
215 pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
216 // This fence is needed to guarantee that threads
217 // as they are about to fall asleep, observe any
218 // new jobs that may have been injected.
219 std::sync::atomic::fence(Ordering::SeqCst);
220
221 self.new_jobs(num_jobs, queue_was_empty)
222 }
223
224 /// Signals that `num_jobs` new jobs were pushed onto a thread's
225 /// local deque. This function will try to ensure that there are
226 /// threads available to process them, waking threads from sleep
227 /// if necessary. However, this is not guaranteed: under certain
228 /// race conditions, the function may fail to wake any new
229 /// threads; in that case the existing thread should eventually
230 /// pop the job.
231 ///
232 /// # Parameters
233 ///
234 /// - `num_jobs` -- lower bound on number of jobs available for stealing.
235 /// We'll try to get at least one thread per job.
236 #[inline]
237 pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
238 self.new_jobs(num_jobs, queue_was_empty)
239 }
240
241 /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
242 #[inline]
243 fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
244 // Read the counters and -- if sleepy workers have announced themselves
245 // -- announce that there is now work available. The final value of `counters`
246 // with which we exit the loop thus corresponds to a state when
247 let counters = self
248 .counters
249 .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
250 let num_awake_but_idle = counters.awake_but_idle_threads();
251 let num_sleepers = counters.sleeping_threads();
252
253 if num_sleepers == 0 {
254 // nobody to wake
255 return;
256 }
257
258 // Promote from u16 to u32 so we can interoperate with
259 // num_jobs more easily.
260 let num_awake_but_idle = num_awake_but_idle as u32;
261 let num_sleepers = num_sleepers as u32;
262
263 // If the queue is non-empty, then we always wake up a worker
264 // -- clearly the existing idle jobs aren't enough. Otherwise,
265 // check to see if we have enough idle workers.
266 if !queue_was_empty {
267 let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
268 self.wake_any_threads(num_to_wake);
269 } else if num_awake_but_idle < num_jobs {
270 let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
271 self.wake_any_threads(num_to_wake);
272 }
273 }
274
275 #[cold]
276 fn wake_any_threads(&self, mut num_to_wake: u32) {
277 if num_to_wake > 0 {
278 for i in 0..self.worker_sleep_states.len() {
279 if self.wake_specific_thread(i) {
280 num_to_wake -= 1;
281 if num_to_wake == 0 {
282 return;
283 }
284 }
285 }
286 }
287 }
288
289 fn wake_specific_thread(&self, index: usize) -> bool {
290 let sleep_state = &self.worker_sleep_states[index];
291
292 let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
293 if *is_blocked {
294 *is_blocked = false;
295 sleep_state.condvar.notify_one();
296
297 // When the thread went to sleep, it will have incremented
298 // this value. When we wake it, its our job to decrement
299 // it. We could have the thread do it, but that would
300 // introduce a delay between when the thread was
301 // *notified* and when this counter was decremented. That
302 // might mislead people with new work into thinking that
303 // there are sleeping threads that they should try to
304 // wake, when in fact there is nothing left for them to
305 // do.
306 self.counters.sub_sleeping_thread();
307
308 true
309 } else {
310 false
311 }
312 }
313}
314
315impl IdleState {
316 fn wake_fully(&mut self) {
317 self.rounds = 0;
318 self.jobs_counter = JobsEventCounter::DUMMY;
319 }
320
321 fn wake_partly(&mut self) {
322 self.rounds = ROUNDS_UNTIL_SLEEPY;
323 self.jobs_counter = JobsEventCounter::DUMMY;
324 }
325}
326