use crate::primitive::sync::atomic;
use core::cell::Cell;
use core::fmt;

/// Number of times the busy-spin iteration count doubles in `Backoff::spin`
/// before it stops growing (and before `Backoff::snooze` starts yielding to
/// the OS scheduler instead of spinning).
const SPIN_LIMIT: u32 = 6;
/// Number of steps after which `Backoff::is_completed` reports that blocking
/// the thread is advised.
const YIELD_LIMIT: u32 = 10;

/// Performs exponential backoff in spin loops.
///
/// Backing off in spin loops reduces contention and improves overall performance.
///
/// This primitive can execute *YIELD* and *PAUSE* instructions, yield the current thread to the OS
/// scheduler, and tell when it is a good time to block the thread using a different synchronization
/// mechanism. Each step of the backoff procedure takes roughly twice as long as the previous
/// step.
///
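/// As a rough illustration of that schedule (using the limits defined in this file,
/// `SPIN_LIMIT = 6` and `YIELD_LIMIT = 10`), the number of loop iterations `spin`
/// executes at each step is:
///
/// ```text
/// step:  0  1  2  3   4   5   6 and above
/// spins: 1  2  4  8  16  32  64 (capped at 1 << SPIN_LIMIT)
/// ```
///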
/// # Examples
///
/// Backing off in a lock-free loop:
///
/// ```
/// use crossbeam_utils::Backoff;
/// use std::sync::atomic::AtomicUsize;
/// use std::sync::atomic::Ordering::SeqCst;
///
/// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize {
///     let backoff = Backoff::new();
///     loop {
///         let val = a.load(SeqCst);
///         if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() {
///             return val;
///         }
///         backoff.spin();
///     }
/// }
/// ```
///
/// Waiting for an [`AtomicBool`] to become `true`:
///
/// ```
/// use crossbeam_utils::Backoff;
/// use std::sync::atomic::AtomicBool;
/// use std::sync::atomic::Ordering::SeqCst;
///
/// fn spin_wait(ready: &AtomicBool) {
///     let backoff = Backoff::new();
///     while !ready.load(SeqCst) {
///         backoff.snooze();
///     }
/// }
/// ```
///
/// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait.
/// Note that whoever sets the atomic variable to `true` must notify the parked thread by calling
/// [`unpark()`]:
///
/// ```
/// use crossbeam_utils::Backoff;
/// use std::sync::atomic::AtomicBool;
/// use std::sync::atomic::Ordering::SeqCst;
/// use std::thread;
///
/// fn blocking_wait(ready: &AtomicBool) {
///     let backoff = Backoff::new();
///     while !ready.load(SeqCst) {
///         if backoff.is_completed() {
///             thread::park();
///         } else {
///             backoff.snooze();
///         }
///     }
/// }
/// ```
///
/// [`is_completed`]: Backoff::is_completed
/// [`std::thread::park()`]: std::thread::park
/// [`Condvar`]: std::sync::Condvar
/// [`AtomicBool`]: std::sync::atomic::AtomicBool
/// [`unpark()`]: std::thread::Thread::unpark
pub struct Backoff {
    step: Cell<u32>,
}

impl Backoff {
    /// Creates a new `Backoff`.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::Backoff;
    ///
    /// let backoff = Backoff::new();
    /// ```
    #[inline]
    pub fn new() -> Self {
        Backoff { step: Cell::new(0) }
    }

    /// Resets the `Backoff`.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::Backoff;
    ///
    /// let backoff = Backoff::new();
    /// backoff.reset();
    /// ```
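    ///
    /// Resetting lets one `Backoff` be reused across successive operations, so that each
    /// new wait starts from the shortest spin again. A sketch (the `try_recv` argument is
    /// a hypothetical stand-in for any fallible, non-blocking operation):
    ///
    /// ```
    /// use crossbeam_utils::Backoff;
    ///
    /// fn recv_two(try_recv: &dyn Fn() -> Option<u32>) -> (u32, u32) {
    ///     let backoff = Backoff::new();
    ///     let first = loop {
    ///         match try_recv() {
    ///             Some(v) => break v,
    ///             None => backoff.snooze(),
    ///         }
    ///     };
    ///     // Start the next wait from the shortest delay again.
    ///     backoff.reset();
    ///     let second = loop {
    ///         match try_recv() {
    ///             Some(v) => break v,
    ///             None => backoff.snooze(),
    ///         }
    ///     };
    ///     (first, second)
    /// }
    ///
    /// assert_eq!(recv_two(&|| Some(7)), (7, 7));
    /// ```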
    #[inline]
    pub fn reset(&self) {
        self.step.set(0);
    }

    /// Backs off in a lock-free loop.
    ///
    /// This method should be used when we need to retry an operation because another thread made
    /// progress.
    ///
    /// The processor may yield using the *YIELD* or *PAUSE* instruction.
    ///
    /// # Examples
    ///
    /// Backing off in a lock-free loop:
    ///
    /// ```
    /// use crossbeam_utils::Backoff;
    /// use std::sync::atomic::AtomicUsize;
    /// use std::sync::atomic::Ordering::SeqCst;
    ///
    /// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize {
    ///     let backoff = Backoff::new();
    ///     loop {
    ///         let val = a.load(SeqCst);
    ///         if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() {
    ///             return val;
    ///         }
    ///         backoff.spin();
    ///     }
    /// }
    ///
    /// let a = AtomicUsize::new(7);
    /// assert_eq!(fetch_mul(&a, 8), 7);
    /// assert_eq!(a.load(SeqCst), 56);
    /// ```
    #[inline]
    pub fn spin(&self) {
        // Busy-spin for 2^step iterations, capping the exponent at SPIN_LIMIT.
        for _ in 0..1 << self.step.get().min(SPIN_LIMIT) {
            // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
            // use [`core::hint::spin_loop`] instead.
            #[allow(deprecated)]
            atomic::spin_loop_hint();
        }

        // Double the spin count on the next call, up to the limit.
        if self.step.get() <= SPIN_LIMIT {
            self.step.set(self.step.get() + 1);
        }
    }

    /// Backs off in a blocking loop.
    ///
    /// This method should be used when we need to wait for another thread to make progress.
    ///
    /// The processor may yield using the *YIELD* or *PAUSE* instruction and the current thread
    /// may yield by giving up a timeslice to the OS scheduler.
    ///
    /// In `#[no_std]` environments, this method is equivalent to [`spin`].
    ///
    /// If possible, use [`is_completed`] to check when it is advised to stop using backoff and
    /// block the current thread using a different synchronization mechanism instead.
    ///
    /// [`spin`]: Backoff::spin
    /// [`is_completed`]: Backoff::is_completed
    ///
    /// # Examples
    ///
    /// Waiting for an [`AtomicBool`] to become `true`:
    ///
    /// ```
    /// use crossbeam_utils::Backoff;
    /// use std::sync::Arc;
    /// use std::sync::atomic::AtomicBool;
    /// use std::sync::atomic::Ordering::SeqCst;
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// fn spin_wait(ready: &AtomicBool) {
    ///     let backoff = Backoff::new();
    ///     while !ready.load(SeqCst) {
    ///         backoff.snooze();
    ///     }
    /// }
    ///
    /// let ready = Arc::new(AtomicBool::new(false));
    /// let ready2 = ready.clone();
    ///
    /// thread::spawn(move || {
    ///     thread::sleep(Duration::from_millis(100));
    ///     ready2.store(true, SeqCst);
    /// });
    ///
    /// assert_eq!(ready.load(SeqCst), false);
    /// spin_wait(&ready);
    /// assert_eq!(ready.load(SeqCst), true);
    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
    /// ```
    ///
    /// [`AtomicBool`]: std::sync::atomic::AtomicBool
    #[inline]
    pub fn snooze(&self) {
        if self.step.get() <= SPIN_LIMIT {
            // Below the spin limit: busy-spin for 2^step iterations.
            for _ in 0..1 << self.step.get() {
                // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
                // use [`core::hint::spin_loop`] instead.
                #[allow(deprecated)]
                atomic::spin_loop_hint();
            }
        } else {
            // Past the spin limit: keep spinning if there is no OS scheduler to yield to.
            #[cfg(not(feature = "std"))]
            for _ in 0..1 << self.step.get() {
                // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
                // use [`core::hint::spin_loop`] instead.
                #[allow(deprecated)]
                atomic::spin_loop_hint();
            }

            // Otherwise, give up the rest of the timeslice.
            #[cfg(feature = "std")]
            ::std::thread::yield_now();
        }

        // Keep lengthening the backoff until the yield limit is reached.
        if self.step.get() <= YIELD_LIMIT {
            self.step.set(self.step.get() + 1);
        }
    }

    /// Returns `true` if exponential backoff has completed and blocking the thread is advised.
    ///
    /// # Examples
    ///
    /// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait:
    ///
    /// ```
    /// use crossbeam_utils::Backoff;
    /// use std::sync::Arc;
    /// use std::sync::atomic::AtomicBool;
    /// use std::sync::atomic::Ordering::SeqCst;
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// fn blocking_wait(ready: &AtomicBool) {
    ///     let backoff = Backoff::new();
    ///     while !ready.load(SeqCst) {
    ///         if backoff.is_completed() {
    ///             thread::park();
    ///         } else {
    ///             backoff.snooze();
    ///         }
    ///     }
    /// }
    ///
    /// let ready = Arc::new(AtomicBool::new(false));
    /// let ready2 = ready.clone();
    /// let waiter = thread::current();
    ///
    /// thread::spawn(move || {
    ///     thread::sleep(Duration::from_millis(100));
    ///     ready2.store(true, SeqCst);
    ///     waiter.unpark();
    /// });
    ///
    /// assert_eq!(ready.load(SeqCst), false);
    /// blocking_wait(&ready);
    /// assert_eq!(ready.load(SeqCst), true);
    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
    /// ```
    ///
    /// [`AtomicBool`]: std::sync::atomic::AtomicBool
    #[inline]
    pub fn is_completed(&self) -> bool {
        self.step.get() > YIELD_LIMIT
    }
}

impl fmt::Debug for Backoff {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Backoff")
            .field("step", &self.step)
            .field("is_completed", &self.is_completed())
            .finish()
    }
}

impl Default for Backoff {
    fn default() -> Backoff {
        Backoff::new()
    }
}
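
// A minimal sanity check of the backoff schedule; a sketch added for illustration,
// relying only on the items defined above (`Backoff` and the private `YIELD_LIMIT`).
#[cfg(test)]
mod tests {
    use super::{Backoff, YIELD_LIMIT};

    #[test]
    fn snooze_completes_after_yield_limit() {
        let backoff = Backoff::new();
        // `snooze` increments the step until it passes YIELD_LIMIT, after which
        // `is_completed` reports that blocking the thread is advised.
        for _ in 0..=YIELD_LIMIT {
            assert!(!backoff.is_completed());
            backoff.snooze();
        }
        assert!(backoff.is_completed());

        // `reset` returns the backoff to its initial state.
        backoff.reset();
        assert!(!backoff.is_completed());
    }
}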