//! The global data and participant for garbage collection.
//!
//! # Registration
//!
//! In order to track all participants in one place, we need some form of participant
//! registration. When a participant is created, it is registered to a global lock-free
//! singly-linked list of registries; and when a participant leaves, it is unregistered from the
//! list.
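//!
//! For illustration, a rough sketch of how registration surfaces through the crate's public
//! `Collector` and `LocalHandle` API:
//!
//! ```
//! use crossbeam_epoch::Collector;
//!
//! // Creating a collector allocates the global data; registering a participant returns a
//! // `LocalHandle` that unregisters itself when dropped.
//! let collector = Collector::new();
//! let handle = collector.register();
//! let guard = handle.pin();
//! drop(guard);
//! ```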
//!
//! # Pinning
//!
//! Every participant contains an integer that tells whether the participant is pinned and, if so,
//! what the global epoch was at the time it was pinned. Participants also hold a pin counter that
//! aids in periodic global epoch advancement.
//!
//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
//! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
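//!
//! A minimal sketch of pinning through the crate's public API:
//!
//! ```
//! use crossbeam_epoch::{self as epoch, Atomic};
//! use std::sync::atomic::Ordering;
//!
//! let a = Atomic::new(7);
//!
//! // Pinning the current participant returns a `Guard`; while it is alive, objects loaded
//! // through `Atomic`s are protected from destruction by other threads.
//! let guard = epoch::pin();
//! let p = a.load(Ordering::Acquire, &guard);
//! assert_eq!(unsafe { p.as_ref() }, Some(&7));
//!
//! // Clean up the heap allocation owned by `a` (fine here: no other thread can reach it).
//! drop(guard);
//! unsafe { drop(a.into_owned()); }
//! ```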
//!
//! # Thread-local bag
//!
//! Objects that get unlinked from concurrent data structures must be stashed away until the global
//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
//! are pushed into a thread-local bag, and when the bag becomes full, it is marked with the current
//! global epoch and pushed into the global queue of bags. We store objects in thread-local storage
//! in order to amortize the synchronization cost of pushing the garbage to the global queue.
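//!
//! For illustration, deferred functions enter the thread-local bag through the public
//! `Guard::defer` family of methods; a rough sketch:
//!
//! ```
//! use crossbeam_epoch as epoch;
//!
//! let guard = epoch::pin();
//! // The closure is stashed in the current thread's bag and executed only after the global
//! // epoch has sufficiently advanced.
//! guard.defer(|| println!("deferred"));
//! ```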
//!
//! # Global queue
//!
//! Whenever a bag is pushed into the queue, the objects in some bags in the queue are collected and
//! destroyed along the way. This design reduces contention on data structures. The global queue
//! cannot be explicitly accessed: the only way to interact with it is by calling `defer()`, which
//! adds an object to the thread-local bag, or `collect()`, which manually triggers garbage
//! collection.
//!
//! Ideally, each instance of a concurrent data structure would have its own queue that is fully
//! destroyed as soon as the data structure is dropped.
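//!
//! A rough sketch of triggering collection through the public API: `Guard::flush()` seals the
//! thread-local bag, pushes it into the global queue, and collects some expired bags.
//!
//! ```
//! use crossbeam_epoch as epoch;
//!
//! let guard = epoch::pin();
//! guard.defer(|| ());
//! // Push the thread-local bag into the global queue and collect some garbage.
//! guard.flush();
//! ```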

use crate::primitive::cell::UnsafeCell;
use crate::primitive::sync::atomic::{self, Ordering};
use core::cell::Cell;
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::{fmt, ptr};

use crossbeam_utils::CachePadded;
use memoffset::offset_of;

use crate::atomic::{Owned, Shared};
use crate::collector::{Collector, LocalHandle};
use crate::deferred::Deferred;
use crate::epoch::{AtomicEpoch, Epoch};
use crate::guard::{unprotected, Guard};
use crate::sync::list::{Entry, IsElement, IterError, List};
use crate::sync::queue::Queue;

/// Maximum number of objects a bag can contain.
#[cfg(not(any(crossbeam_sanitize, miri)))]
const MAX_OBJECTS: usize = 64;
// Makes it more likely to trigger any potential data races.
#[cfg(any(crossbeam_sanitize, miri))]
const MAX_OBJECTS: usize = 4;

/// A bag of deferred functions.
pub(crate) struct Bag {
    /// Stashed objects.
    deferreds: [Deferred; MAX_OBJECTS],
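    /// The number of deferred functions currently stored in the bag.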
    len: usize,
}

/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
unsafe impl Send for Bag {}

impl Bag {
    /// Returns a new, empty bag.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if the bag is empty.
    pub(crate) fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Attempts to insert a deferred function into the bag.
    ///
    /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
    /// full.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
        if self.len < MAX_OBJECTS {
            self.deferreds[self.len] = deferred;
            self.len += 1;
            Ok(())
        } else {
            Err(deferred)
        }
    }

    /// Seals the bag with the given epoch.
    fn seal(self, epoch: Epoch) -> SealedBag {
        SealedBag { epoch, _bag: self }
    }
}

impl Default for Bag {
    fn default() -> Self {
        Bag {
            len: 0,
            deferreds: [Deferred::NO_OP; MAX_OBJECTS],
        }
    }
}

impl Drop for Bag {
    fn drop(&mut self) {
        // Call all deferred functions.
        for deferred in &mut self.deferreds[..self.len] {
            let no_op = Deferred::NO_OP;
            let owned_deferred = mem::replace(deferred, no_op);
            owned_deferred.call();
        }
    }
}

// can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
impl fmt::Debug for Bag {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Bag")
            .field("deferreds", &&self.deferreds[..self.len])
            .finish()
    }
}

/// A pair of an epoch and a bag.
#[derive(Default, Debug)]
struct SealedBag {
    epoch: Epoch,
    _bag: Bag,
}

/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
unsafe impl Sync for SealedBag {}

impl SealedBag {
    /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
    fn is_expired(&self, global_epoch: Epoch) -> bool {
        // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
        // is within one epoch of the current one cannot be destroyed yet.
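        // For example, a bag sealed at epoch 4 may only be destroyed once the global epoch has
        // reached 6 or beyond.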
        global_epoch.wrapping_sub(self.epoch) >= 2
    }
}

/// The global data for a garbage collector.
pub(crate) struct Global {
    /// The intrusive linked list of `Local`s.
    locals: List<Local>,

    /// The global queue of bags of deferred functions.
    queue: Queue<SealedBag>,

    /// The global epoch.
    pub(crate) epoch: CachePadded<AtomicEpoch>,
}

impl Global {
    /// Number of bags to destroy.
    const COLLECT_STEPS: usize = 8;

    /// Creates a new global data for garbage collection.
    #[inline]
    pub(crate) fn new() -> Self {
        Self {
            locals: List::new(),
            queue: Queue::new(),
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
        }
    }

    /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
    pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
        let bag = mem::replace(bag, Bag::new());

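        // Full fence so that the epoch loaded below is not stale with respect to the deferred
        // functions that were just moved into the bag.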
        atomic::fence(Ordering::SeqCst);

        let epoch = self.epoch.load(Ordering::Relaxed);
        self.queue.push(bag.seal(epoch), guard);
    }

    /// Collects several bags from the global queue and executes deferred functions in them.
    ///
    /// Note: This may itself produce garbage and in turn allocate new bags.
    ///
    /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
    /// path. In other words, we want the compiler to optimize branching for the case when
    /// `collect()` is not called.
    #[cold]
    pub(crate) fn collect(&self, guard: &Guard) {
        let global_epoch = self.try_advance(guard);

        let steps = if cfg!(crossbeam_sanitize) {
            usize::max_value()
        } else {
            Self::COLLECT_STEPS
        };

        for _ in 0..steps {
            match self.queue.try_pop_if(
                &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
                guard,
            ) {
                None => break,
                Some(sealed_bag) => drop(sealed_bag),
            }
        }
    }

    /// Attempts to advance the global epoch.
    ///
    /// The global epoch can advance only if all currently pinned participants have been pinned in
    /// the current epoch.
    ///
    /// Returns the current global epoch.
    ///
    /// `try_advance()` is annotated `#[cold]` because it is rarely called.
    #[cold]
    pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
        let global_epoch = self.epoch.load(Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);

        // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
        // easy to implement in a lock-free manner. However, traversal can be slow due to cache
        // misses and data dependencies. We should experiment with other data structures as well.
        for local in self.locals.iter(guard) {
            match local {
                Err(IterError::Stalled) => {
                    // A concurrent thread stalled this iteration. That thread might also try to
                    // advance the epoch, in which case we leave the job to it. Otherwise, the
                    // epoch will not be advanced.
                    return global_epoch;
                }
                Ok(local) => {
                    let local_epoch = local.epoch.load(Ordering::Relaxed);

                    // If the participant was pinned in a different epoch, we cannot advance the
                    // global epoch just yet.
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
                        return global_epoch;
                    }
                }
            }
        }
        atomic::fence(Ordering::Acquire);

        // All pinned participants were pinned in the current global epoch.
        // Now let's advance the global epoch...
        //
        // Note that if another thread already advanced it before us, this store will simply
        // overwrite the global epoch with the same value. This is true because `try_advance` was
        // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
        // advanced two steps ahead of it.
        let new_epoch = global_epoch.successor();
        self.epoch.store(new_epoch, Ordering::Release);
        new_epoch
    }
}

/// Participant for garbage collection.
pub(crate) struct Local {
    /// A node in the intrusive linked list of `Local`s.
    entry: Entry,

    /// A reference to the global data.
    ///
    /// When all guards and handles get dropped, this reference is destroyed.
    collector: UnsafeCell<ManuallyDrop<Collector>>,

    /// The local bag of deferred functions.
    pub(crate) bag: UnsafeCell<Bag>,

    /// The number of guards keeping this participant pinned.
    guard_count: Cell<usize>,

    /// The number of active handles.
    handle_count: Cell<usize>,

    /// Total number of pinnings performed.
    ///
    /// This is just an auxiliary counter that sometimes kicks off collection.
    pin_count: Cell<Wrapping<usize>>,

    /// The local epoch.
    epoch: CachePadded<AtomicEpoch>,
}

// Make sure `Local` is less than or equal to 2048 bytes.
// https://github.com/crossbeam-rs/crossbeam/issues/551
#[cfg(not(any(crossbeam_sanitize, miri)))] // `crossbeam_sanitize` and `miri` reduce the size of `Local`
#[test]
fn local_size() {
    // TODO: https://github.com/crossbeam-rs/crossbeam/issues/869
    // assert!(
    //     core::mem::size_of::<Local>() <= 2048,
    //     "An allocation of `Local` should be <= 2048 bytes."
    // );
}

impl Local {
    /// Number of pinnings after which a participant will execute some deferred functions from the
    /// global queue.
    const PINNINGS_BETWEEN_COLLECT: usize = 128;

    /// Registers a new `Local` in the provided `Global`.
    pub(crate) fn register(collector: &Collector) -> LocalHandle {
        unsafe {
            // Since we dereference no pointers in this block, it is safe to use `unprotected`.

            let local = Owned::new(Local {
                entry: Entry::default(),
                collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
                bag: UnsafeCell::new(Bag::new()),
                guard_count: Cell::new(0),
                handle_count: Cell::new(1),
                pin_count: Cell::new(Wrapping(0)),
                epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
            })
            .into_shared(unprotected());
            collector.global.locals.insert(local, unprotected());
            LocalHandle {
                local: local.as_raw(),
            }
        }
    }

    /// Returns a reference to the `Global` in which this `Local` resides.
    #[inline]
    pub(crate) fn global(&self) -> &Global {
        &self.collector().global
    }

    /// Returns a reference to the `Collector` in which this `Local` resides.
    #[inline]
    pub(crate) fn collector(&self) -> &Collector {
        self.collector.with(|c| unsafe { &**c })
    }

    /// Returns `true` if the current participant is pinned.
    #[inline]
    pub(crate) fn is_pinned(&self) -> bool {
        self.guard_count.get() > 0
    }

    /// Adds `deferred` to the thread-local bag.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
        let bag = self.bag.with_mut(|b| &mut *b);

        while let Err(d) = bag.try_push(deferred) {
            self.global().push_bag(bag, guard);
            deferred = d;
        }
    }

    pub(crate) fn flush(&self, guard: &Guard) {
        let bag = self.bag.with_mut(|b| unsafe { &mut *b });

        if !bag.is_empty() {
            self.global().push_bag(bag, guard);
        }

        self.global().collect(guard);
    }

    /// Pins the `Local`.
    #[inline]
    pub(crate) fn pin(&self) -> Guard {
        let guard = Guard { local: self };

        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
            let new_epoch = global_epoch.pinned();

            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
            // The fence makes sure that any future loads from `Atomic`s will not happen before
            // this store.
            if cfg!(all(
                any(target_arch = "x86", target_arch = "x86_64"),
                not(miri)
            )) {
                // HACK(stjepang): On x86 architectures there are two different ways of executing
                // a `SeqCst` fence.
                //
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
                //    instruction.
                //
                // Both instructions have the effect of a full barrier, but benchmarks have shown
                // that the second one makes pinning faster in this particular case. It is not
                // clear that this is permitted by the C++ memory model (SC fences work very
                // differently from SC accesses), but experimental evidence suggests that this
                // works fine. Using inline assembly would be a viable (and correct) alternative,
                // but alas, that is not possible on stable Rust.
                let current = Epoch::starting();
                let res = self.epoch.compare_exchange(
                    current,
                    new_epoch,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
                // We add a compiler fence to make it less likely for LLVM to do something wrong
                // here. Formally, this is not enough to get rid of data races; practically,
                // it should go a long way.
                atomic::compiler_fence(Ordering::SeqCst);
            } else {
                self.epoch.store(new_epoch, Ordering::Relaxed);
                atomic::fence(Ordering::SeqCst);
            }

            // Increment the pin counter.
            let count = self.pin_count.get();
            self.pin_count.set(count + Wrapping(1));

            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and
            // collecting some garbage.
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
                self.global().collect(&guard);
            }
        }

        guard
    }

    /// Unpins the `Local`.
    #[inline]
    pub(crate) fn unpin(&self) {
        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count - 1);

        if guard_count == 1 {
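            // Mark this participant as unpinned. The `Release` store orders all memory accesses
            // made while pinned before the unpinning becomes visible to a thread that later
            // advances the epoch (which reads local epochs and then issues an `Acquire` fence).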
            self.epoch.store(Epoch::starting(), Ordering::Release);

            if self.handle_count.get() == 0 {
                self.finalize();
            }
        }
    }

    /// Unpins and then pins the `Local`.
    #[inline]
    pub(crate) fn repin(&self) {
        let guard_count = self.guard_count.get();

        // Update the local epoch only if there's only one guard.
        if guard_count == 1 {
            let epoch = self.epoch.load(Ordering::Relaxed);
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();

            // Update the local epoch only if the global epoch is greater than the local epoch.
            if epoch != global_epoch {
                // We store the new epoch with `Release` because we need to ensure any memory
                // accesses from the previous epoch do not leak into the new one.
                self.epoch.store(global_epoch, Ordering::Release);

                // However, we don't need a following `SeqCst` fence, because it is safe for memory
                // accesses from the new epoch to be executed before updating the local epoch. At
                // worst, other threads will see the new epoch late and delay GC slightly.
            }
        }
    }

    /// Increments the handle count.
    #[inline]
    pub(crate) fn acquire_handle(&self) {
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count + 1);
    }

    /// Decrements the handle count.
    #[inline]
    pub(crate) fn release_handle(&self) {
        let guard_count = self.guard_count.get();
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count - 1);

        if guard_count == 0 && handle_count == 1 {
            self.finalize();
        }
    }

    /// Removes the `Local` from the global linked list.
    #[cold]
    fn finalize(&self) {
        debug_assert_eq!(self.guard_count.get(), 0);
        debug_assert_eq!(self.handle_count.get(), 0);

        // Temporarily increment handle count. This is required so that the following call to `pin`
        // doesn't call `finalize` again.
        self.handle_count.set(1);
        unsafe {
            // Pin and move the local bag into the global queue. It's important that `push_bag`
            // doesn't defer destruction on any new garbage.
            let guard = &self.pin();
            self.global()
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
        }
        // Revert the handle count back to zero.
        self.handle_count.set(0);

        unsafe {
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
            // by a guard at this time, it's crucial that the reference is read before marking the
            // `Local` as deleted.
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));

            // Mark this node in the linked list as deleted.
            self.entry.delete(unprotected());

            // Finally, drop the reference to the global. Note that this might be the last reference
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
            // in its queue will be executed.
            drop(collector);
        }
    }
}

impl IsElement<Local> for Local {
    fn entry_of(local: &Local) -> &Entry {
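        // Compute the address of the `entry` field by adding its byte offset within `Local`
        // to the address of the `Local` itself.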
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
        unsafe { &*entry_ptr }
    }

    unsafe fn element_of(entry: &Entry) -> &Local {
        // offset_of! macro uses unsafe, but it's unnecessary in this context.
        #[allow(unused_unsafe)]
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
        &*local_ptr
    }

    unsafe fn finalize(entry: &Entry, guard: &Guard) {
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
    }
}

#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    #[test]
    fn check_defer() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn set() {
            FLAG.store(42, Ordering::Relaxed);
        }

        let d = Deferred::new(set);
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        d.call();
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn check_bag() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn incr() {
            FLAG.fetch_add(1, Ordering::Relaxed);
        }

        let mut bag = Bag::new();
        assert!(bag.is_empty());

        for _ in 0..MAX_OBJECTS {
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
            assert!(!bag.is_empty());
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        }

        let result = unsafe { bag.try_push(Deferred::new(incr)) };
        assert!(result.is_err());
        assert!(!bag.is_empty());
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);

        drop(bag);
        assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
    }
}