1 | use crate::primitive::hint; |
2 | use core::cell::Cell; |
3 | use core::fmt; |
4 | |
5 | const SPIN_LIMIT: u32 = 6; |
6 | const YIELD_LIMIT: u32 = 10; |
7 | |
8 | /// Performs exponential backoff in spin loops. |
9 | /// |
10 | /// Backing off in spin loops reduces contention and improves overall performance. |
11 | /// |
12 | /// This primitive can execute *YIELD* and *PAUSE* instructions, yield the current thread to the OS |
13 | /// scheduler, and tell when is a good time to block the thread using a different synchronization |
14 | /// mechanism. Each step of the back off procedure takes roughly twice as long as the previous |
15 | /// step. |
16 | /// |
17 | /// # Examples |
18 | /// |
19 | /// Backing off in a lock-free loop: |
20 | /// |
21 | /// ``` |
22 | /// use crossbeam_utils::Backoff; |
23 | /// use std::sync::atomic::AtomicUsize; |
24 | /// use std::sync::atomic::Ordering::SeqCst; |
25 | /// |
26 | /// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize { |
27 | /// let backoff = Backoff::new(); |
28 | /// loop { |
29 | /// let val = a.load(SeqCst); |
30 | /// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() { |
31 | /// return val; |
32 | /// } |
33 | /// backoff.spin(); |
34 | /// } |
35 | /// } |
36 | /// ``` |
37 | /// |
38 | /// Waiting for an [`AtomicBool`] to become `true`: |
39 | /// |
40 | /// ``` |
41 | /// use crossbeam_utils::Backoff; |
42 | /// use std::sync::atomic::AtomicBool; |
43 | /// use std::sync::atomic::Ordering::SeqCst; |
44 | /// |
45 | /// fn spin_wait(ready: &AtomicBool) { |
46 | /// let backoff = Backoff::new(); |
47 | /// while !ready.load(SeqCst) { |
48 | /// backoff.snooze(); |
49 | /// } |
50 | /// } |
51 | /// ``` |
52 | /// |
53 | /// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait. |
54 | /// Note that whoever sets the atomic variable to `true` must notify the parked thread by calling |
55 | /// [`unpark()`]: |
56 | /// |
57 | /// ``` |
58 | /// use crossbeam_utils::Backoff; |
59 | /// use std::sync::atomic::AtomicBool; |
60 | /// use std::sync::atomic::Ordering::SeqCst; |
61 | /// use std::thread; |
62 | /// |
63 | /// fn blocking_wait(ready: &AtomicBool) { |
64 | /// let backoff = Backoff::new(); |
65 | /// while !ready.load(SeqCst) { |
66 | /// if backoff.is_completed() { |
67 | /// thread::park(); |
68 | /// } else { |
69 | /// backoff.snooze(); |
70 | /// } |
71 | /// } |
72 | /// } |
73 | /// ``` |
74 | /// |
75 | /// [`is_completed`]: Backoff::is_completed |
76 | /// [`std::thread::park()`]: std::thread::park |
77 | /// [`Condvar`]: std::sync::Condvar |
78 | /// [`AtomicBool`]: std::sync::atomic::AtomicBool |
79 | /// [`unpark()`]: std::thread::Thread::unpark |
80 | pub struct Backoff { |
81 | step: Cell<u32>, |
82 | } |
83 | |
84 | impl Backoff { |
85 | /// Creates a new `Backoff`. |
86 | /// |
87 | /// # Examples |
88 | /// |
89 | /// ``` |
90 | /// use crossbeam_utils::Backoff; |
91 | /// |
92 | /// let backoff = Backoff::new(); |
93 | /// ``` |
94 | #[inline ] |
95 | pub fn new() -> Self { |
96 | Backoff { step: Cell::new(0) } |
97 | } |
98 | |
99 | /// Resets the `Backoff`. |
100 | /// |
101 | /// # Examples |
102 | /// |
103 | /// ``` |
104 | /// use crossbeam_utils::Backoff; |
105 | /// |
106 | /// let backoff = Backoff::new(); |
107 | /// backoff.reset(); |
108 | /// ``` |
109 | #[inline ] |
110 | pub fn reset(&self) { |
111 | self.step.set(0); |
112 | } |
113 | |
114 | /// Backs off in a lock-free loop. |
115 | /// |
116 | /// This method should be used when we need to retry an operation because another thread made |
117 | /// progress. |
118 | /// |
119 | /// The processor may yield using the *YIELD* or *PAUSE* instruction. |
120 | /// |
121 | /// # Examples |
122 | /// |
123 | /// Backing off in a lock-free loop: |
124 | /// |
125 | /// ``` |
126 | /// use crossbeam_utils::Backoff; |
127 | /// use std::sync::atomic::AtomicUsize; |
128 | /// use std::sync::atomic::Ordering::SeqCst; |
129 | /// |
130 | /// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize { |
131 | /// let backoff = Backoff::new(); |
132 | /// loop { |
133 | /// let val = a.load(SeqCst); |
134 | /// if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() { |
135 | /// return val; |
136 | /// } |
137 | /// backoff.spin(); |
138 | /// } |
139 | /// } |
140 | /// |
141 | /// let a = AtomicUsize::new(7); |
142 | /// assert_eq!(fetch_mul(&a, 8), 7); |
143 | /// assert_eq!(a.load(SeqCst), 56); |
144 | /// ``` |
145 | #[inline ] |
146 | pub fn spin(&self) { |
147 | for _ in 0..1 << self.step.get().min(SPIN_LIMIT) { |
148 | hint::spin_loop(); |
149 | } |
150 | |
151 | if self.step.get() <= SPIN_LIMIT { |
152 | self.step.set(self.step.get() + 1); |
153 | } |
154 | } |
155 | |
156 | /// Backs off in a blocking loop. |
157 | /// |
158 | /// This method should be used when we need to wait for another thread to make progress. |
159 | /// |
160 | /// The processor may yield using the *YIELD* or *PAUSE* instruction and the current thread |
161 | /// may yield by giving up a timeslice to the OS scheduler. |
162 | /// |
163 | /// In `#[no_std]` environments, this method is equivalent to [`spin`]. |
164 | /// |
165 | /// If possible, use [`is_completed`] to check when it is advised to stop using backoff and |
166 | /// block the current thread using a different synchronization mechanism instead. |
167 | /// |
168 | /// [`spin`]: Backoff::spin |
169 | /// [`is_completed`]: Backoff::is_completed |
170 | /// |
171 | /// # Examples |
172 | /// |
173 | /// Waiting for an [`AtomicBool`] to become `true`: |
174 | /// |
175 | /// ``` |
176 | /// use crossbeam_utils::Backoff; |
177 | /// use std::sync::Arc; |
178 | /// use std::sync::atomic::AtomicBool; |
179 | /// use std::sync::atomic::Ordering::SeqCst; |
180 | /// use std::thread; |
181 | /// use std::time::Duration; |
182 | /// |
183 | /// fn spin_wait(ready: &AtomicBool) { |
184 | /// let backoff = Backoff::new(); |
185 | /// while !ready.load(SeqCst) { |
186 | /// backoff.snooze(); |
187 | /// } |
188 | /// } |
189 | /// |
190 | /// let ready = Arc::new(AtomicBool::new(false)); |
191 | /// let ready2 = ready.clone(); |
192 | /// |
193 | /// thread::spawn(move || { |
194 | /// thread::sleep(Duration::from_millis(100)); |
195 | /// ready2.store(true, SeqCst); |
196 | /// }); |
197 | /// |
198 | /// assert_eq!(ready.load(SeqCst), false); |
199 | /// spin_wait(&ready); |
200 | /// assert_eq!(ready.load(SeqCst), true); |
201 | /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 |
202 | /// ``` |
203 | /// |
204 | /// [`AtomicBool`]: std::sync::atomic::AtomicBool |
205 | #[inline ] |
206 | pub fn snooze(&self) { |
207 | if self.step.get() <= SPIN_LIMIT { |
208 | for _ in 0..1 << self.step.get() { |
209 | hint::spin_loop(); |
210 | } |
211 | } else { |
212 | #[cfg (not(feature = "std" ))] |
213 | for _ in 0..1 << self.step.get() { |
214 | hint::spin_loop(); |
215 | } |
216 | |
217 | #[cfg (feature = "std" )] |
218 | ::std::thread::yield_now(); |
219 | } |
220 | |
221 | if self.step.get() <= YIELD_LIMIT { |
222 | self.step.set(self.step.get() + 1); |
223 | } |
224 | } |
225 | |
226 | /// Returns `true` if exponential backoff has completed and blocking the thread is advised. |
227 | /// |
228 | /// # Examples |
229 | /// |
230 | /// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait: |
231 | /// |
232 | /// ``` |
233 | /// use crossbeam_utils::Backoff; |
234 | /// use std::sync::Arc; |
235 | /// use std::sync::atomic::AtomicBool; |
236 | /// use std::sync::atomic::Ordering::SeqCst; |
237 | /// use std::thread; |
238 | /// use std::time::Duration; |
239 | /// |
240 | /// fn blocking_wait(ready: &AtomicBool) { |
241 | /// let backoff = Backoff::new(); |
242 | /// while !ready.load(SeqCst) { |
243 | /// if backoff.is_completed() { |
244 | /// thread::park(); |
245 | /// } else { |
246 | /// backoff.snooze(); |
247 | /// } |
248 | /// } |
249 | /// } |
250 | /// |
251 | /// let ready = Arc::new(AtomicBool::new(false)); |
252 | /// let ready2 = ready.clone(); |
253 | /// let waiter = thread::current(); |
254 | /// |
255 | /// thread::spawn(move || { |
256 | /// thread::sleep(Duration::from_millis(100)); |
257 | /// ready2.store(true, SeqCst); |
258 | /// waiter.unpark(); |
259 | /// }); |
260 | /// |
261 | /// assert_eq!(ready.load(SeqCst), false); |
262 | /// blocking_wait(&ready); |
263 | /// assert_eq!(ready.load(SeqCst), true); |
264 | /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 |
265 | /// ``` |
266 | /// |
267 | /// [`AtomicBool`]: std::sync::atomic::AtomicBool |
268 | #[inline ] |
269 | pub fn is_completed(&self) -> bool { |
270 | self.step.get() > YIELD_LIMIT |
271 | } |
272 | } |
273 | |
274 | impl fmt::Debug for Backoff { |
275 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
276 | f.debug_struct("Backoff" ) |
277 | .field("step" , &self.step) |
278 | .field("is_completed" , &self.is_completed()) |
279 | .finish() |
280 | } |
281 | } |
282 | |
283 | impl Default for Backoff { |
284 | fn default() -> Backoff { |
285 | Backoff::new() |
286 | } |
287 | } |
288 | |