//! The global data and participant for garbage collection.
//!
//! # Registration
//!
//! In order to track all participants in one place, we need some form of participant
//! registration. When a participant is created, it is registered in a global lock-free
//! singly-linked list of registries; when a participant leaves, it is unregistered from the
//! list.
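//!
//! Schematically, registration is driven through the crate's public API (a sketch, not a
//! doctest; the internals below implement it):
//!
//! ```ignore
//! use crossbeam_epoch::Collector;
//!
//! let collector = Collector::new();  // owns the `Global` data
//! let handle = collector.register(); // registers a new `Local` in the global list
//! let guard = handle.pin();          // pins this participant
//! drop(guard);                       // unpins it
//! drop(handle);                      // unregisters the participant
//! ```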
//!
//! # Pinning
//!
//! Every participant contains an integer that tells whether the participant is pinned and, if so,
//! what the global epoch was at the time it was pinned. Participants also hold a pin counter that
//! aids in periodic global epoch advancement.
//!
//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
//! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
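//!
//! For example, a pinned load looks roughly like this in terms of the crate's public API
//! (a sketch, not a doctest; `pin` and `Atomic` are re-exported by the crate):
//!
//! ```ignore
//! use crossbeam_epoch::{self as epoch, Atomic};
//! use std::sync::atomic::Ordering;
//!
//! let a = Atomic::new(42);
//!
//! let guard = epoch::pin(); // the returned `Guard` witnesses that we are pinned
//! let shared = a.load(Ordering::Acquire, &guard);
//! // `shared` may only be dereferenced while `guard` is alive.
//! drop(guard); // unpin
//! ```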
//!
//! # Thread-local bag
//!
//! Objects that get unlinked from concurrent data structures must be stashed away until the global
//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
//! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
//! global epoch and pushed into the global queue of bags. We store objects in thread-local storage
//! to amortize the synchronization cost of pushing garbage to the global queue.
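//!
//! Destruction is deferred through a `Guard` (a sketch using the crate's public API;
//! `defer_destroy` is unsafe because the caller must guarantee the object is unreachable):
//!
//! ```ignore
//! use crossbeam_epoch::{self as epoch, Atomic, Shared};
//! use std::sync::atomic::Ordering;
//!
//! let a = Atomic::new(String::from("hello"));
//! let guard = epoch::pin();
//!
//! // Unlink the object, then defer its destruction. The deferred function lands in the
//! // thread-local bag and, once the bag is full, in the global queue.
//! let old = a.swap(Shared::null(), Ordering::AcqRel, &guard);
//! unsafe {
//!     guard.defer_destroy(old);
//! }
//! ```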
//!
//! # Global queue
//!
//! Whenever a bag is pushed into the queue, objects from some of the bags already in the queue are
//! collected and destroyed along the way. This design reduces contention on data structures. The
//! global queue cannot be accessed directly: the only way to interact with it is by calling
//! `defer()`, which adds an object to the thread-local bag, or `collect()`, which manually triggers
//! garbage collection.
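//!
//! A sketch of how the public `Guard` API drives this (again not a doctest):
//!
//! ```ignore
//! let guard = crossbeam_epoch::pin();
//!
//! // Queue a deferred function in the thread-local bag.
//! guard.defer(|| println!("deferred"));
//!
//! // Push the thread-local bag into the global queue and collect some expired bags.
//! guard.flush();
//! ```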
//!
//! Ideally, each instance of a concurrent data structure would have its own queue that gets fully
//! destroyed as soon as the data structure is dropped.

use crate::primitive::cell::UnsafeCell;
use crate::primitive::sync::atomic;
use core::cell::Cell;
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::sync::atomic::Ordering;
use core::{fmt, ptr};

use crossbeam_utils::CachePadded;
use memoffset::offset_of;

use crate::atomic::{Owned, Shared};
use crate::collector::{Collector, LocalHandle};
use crate::deferred::Deferred;
use crate::epoch::{AtomicEpoch, Epoch};
use crate::guard::{unprotected, Guard};
use crate::sync::list::{Entry, IsElement, IterError, List};
use crate::sync::queue::Queue;

/// Maximum number of objects a bag can contain.
#[cfg(not(any(crossbeam_sanitize, miri)))]
const MAX_OBJECTS: usize = 64;
// Makes it more likely to trigger any potential data races.
#[cfg(any(crossbeam_sanitize, miri))]
const MAX_OBJECTS: usize = 4;

/// A bag of deferred functions.
pub(crate) struct Bag {
    /// Stashed objects.
    deferreds: [Deferred; MAX_OBJECTS],
    len: usize,
}

/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
unsafe impl Send for Bag {}

impl Bag {
    /// Returns a new, empty bag.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if the bag is empty.
    pub(crate) fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Attempts to insert a deferred function into the bag.
    ///
    /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
    /// full.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
        if self.len < MAX_OBJECTS {
            self.deferreds[self.len] = deferred;
            self.len += 1;
            Ok(())
        } else {
            Err(deferred)
        }
    }

    /// Seals the bag with the given epoch.
    fn seal(self, epoch: Epoch) -> SealedBag {
        SealedBag { epoch, _bag: self }
    }
}

impl Default for Bag {
    fn default() -> Self {
        Bag {
            len: 0,
            deferreds: [Deferred::NO_OP; MAX_OBJECTS],
        }
    }
}

impl Drop for Bag {
    fn drop(&mut self) {
        // Call all deferred functions.
        for deferred in &mut self.deferreds[..self.len] {
            let no_op = Deferred::NO_OP;
            let owned_deferred = mem::replace(deferred, no_op);
            owned_deferred.call();
        }
    }
}

// can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
impl fmt::Debug for Bag {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Bag")
            .field("deferreds", &&self.deferreds[..self.len])
            .finish()
    }
}

/// A pair of an epoch and a bag.
#[derive(Default, Debug)]
struct SealedBag {
    epoch: Epoch,
    _bag: Bag,
}

/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
unsafe impl Sync for SealedBag {}

impl SealedBag {
    /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
    fn is_expired(&self, global_epoch: Epoch) -> bool {
        // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
        // is within one epoch of the current one cannot be destroyed yet.
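        //
        // For example, in terms of epoch numbers: with a global epoch of 5, a bag sealed at
        // epoch 3 satisfies the `>= 2` check and may be destroyed, while bags sealed at epoch
        // 4 or 5 must wait for at least one more advancement.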
        global_epoch.wrapping_sub(self.epoch) >= 2
    }
}

/// The global data for a garbage collector.
pub(crate) struct Global {
    /// The intrusive linked list of `Local`s.
    locals: List<Local>,

    /// The global queue of bags of deferred functions.
    queue: Queue<SealedBag>,

    /// The global epoch.
    pub(crate) epoch: CachePadded<AtomicEpoch>,
}

impl Global {
    /// Number of bags to destroy.
    const COLLECT_STEPS: usize = 8;

    /// Creates a new global data for garbage collection.
    #[inline]
    pub(crate) fn new() -> Self {
        Self {
            locals: List::new(),
            queue: Queue::new(),
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
        }
    }

    /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
    pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
        let bag = mem::replace(bag, Bag::new());

        atomic::fence(Ordering::SeqCst);

        let epoch = self.epoch.load(Ordering::Relaxed);
        self.queue.push(bag.seal(epoch), guard);
    }

    /// Collects several bags from the global queue and executes deferred functions in them.
    ///
    /// Note: This may itself produce garbage and in turn allocate new bags.
    ///
    /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
    /// path. In other words, we want the compiler to optimize branching for the case when
    /// `collect()` is not called.
    #[cold]
    pub(crate) fn collect(&self, guard: &Guard) {
        let global_epoch = self.try_advance(guard);

        let steps = if cfg!(crossbeam_sanitize) {
            usize::max_value()
        } else {
            Self::COLLECT_STEPS
        };

        for _ in 0..steps {
            match self.queue.try_pop_if(
                &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
                guard,
            ) {
                None => break,
                Some(sealed_bag) => drop(sealed_bag),
            }
        }
    }

    /// Attempts to advance the global epoch.
    ///
    /// The global epoch can advance only if all currently pinned participants have been pinned in
    /// the current epoch.
    ///
    /// Returns the current global epoch.
    ///
    /// `try_advance()` is annotated `#[cold]` because it is rarely called.
    #[cold]
    pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
        let global_epoch = self.epoch.load(Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);

        // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
        // easy to implement in a lock-free manner. However, traversal can be slow due to cache
        // misses and data dependencies. We should experiment with other data structures as well.
        for local in self.locals.iter(guard) {
            match local {
                Err(IterError::Stalled) => {
                    // A concurrent thread stalled this iteration. That thread might also try to
                    // advance the epoch, in which case we leave the job to it. Otherwise, the
                    // epoch will not be advanced.
                    return global_epoch;
                }
                Ok(local) => {
                    let local_epoch = local.epoch.load(Ordering::Relaxed);

                    // If the participant was pinned in a different epoch, we cannot advance the
                    // global epoch just yet.
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
                        return global_epoch;
                    }
                }
            }
        }
        atomic::fence(Ordering::Acquire);

        // All pinned participants were pinned in the current global epoch.
        // Now let's advance the global epoch...
        //
        // Note that if another thread already advanced it before us, this store will simply
        // overwrite the global epoch with the same value. This is true because `try_advance` was
        // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
        // advanced two steps ahead of it.
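        //
        // For example, if `global_epoch` is 5 and we are pinned in epoch 5, a concurrent
        // `try_advance` can have stored at most 6, which is exactly the value we store below.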
        let new_epoch = global_epoch.successor();
        self.epoch.store(new_epoch, Ordering::Release);
        new_epoch
    }
}

/// Participant for garbage collection.
pub(crate) struct Local {
    /// A node in the intrusive linked list of `Local`s.
    entry: Entry,

    /// The local epoch.
    epoch: AtomicEpoch,

    /// A reference to the global data.
    ///
    /// When all guards and handles get dropped, this reference is destroyed.
    collector: UnsafeCell<ManuallyDrop<Collector>>,

    /// The local bag of deferred functions.
    pub(crate) bag: UnsafeCell<Bag>,

    /// The number of guards keeping this participant pinned.
    guard_count: Cell<usize>,

    /// The number of active handles.
    handle_count: Cell<usize>,

    /// Total number of pinnings performed.
    ///
    /// This is just an auxiliary counter that sometimes kicks off collection.
    pin_count: Cell<Wrapping<usize>>,
}

// Make sure `Local` is less than or equal to 2048 bytes.
// https://github.com/crossbeam-rs/crossbeam/issues/551
#[cfg(not(any(crossbeam_sanitize, miri)))] // `crossbeam_sanitize` and `miri` reduce the size of `Local`
#[test]
fn local_size() {
    // TODO: https://github.com/crossbeam-rs/crossbeam/issues/869
    // assert!(
    //     core::mem::size_of::<Local>() <= 2048,
    //     "An allocation of `Local` should be <= 2048 bytes."
    // );
}

impl Local {
    /// Number of pinnings after which a participant will execute some deferred functions from the
    /// global queue.
    const PINNINGS_BETWEEN_COLLECT: usize = 128;

    /// Registers a new `Local` in the provided `Global`.
    pub(crate) fn register(collector: &Collector) -> LocalHandle {
        unsafe {
            // Since we dereference no pointers in this block, it is safe to use `unprotected`.

            let local = Owned::new(Local {
                entry: Entry::default(),
                epoch: AtomicEpoch::new(Epoch::starting()),
                collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
                bag: UnsafeCell::new(Bag::new()),
                guard_count: Cell::new(0),
                handle_count: Cell::new(1),
                pin_count: Cell::new(Wrapping(0)),
            })
            .into_shared(unprotected());
            collector.global.locals.insert(local, unprotected());
            LocalHandle {
                local: local.as_raw(),
            }
        }
    }

    /// Returns a reference to the `Global` in which this `Local` resides.
    #[inline]
    pub(crate) fn global(&self) -> &Global {
        &self.collector().global
    }

    /// Returns a reference to the `Collector` in which this `Local` resides.
    #[inline]
    pub(crate) fn collector(&self) -> &Collector {
        self.collector.with(|c| unsafe { &**c })
    }

    /// Returns `true` if the current participant is pinned.
    #[inline]
    pub(crate) fn is_pinned(&self) -> bool {
        self.guard_count.get() > 0
    }

    /// Adds `deferred` to the thread-local bag.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
        let bag = self.bag.with_mut(|b| &mut *b);

        while let Err(d) = bag.try_push(deferred) {
            self.global().push_bag(bag, guard);
            deferred = d;
        }
    }

    pub(crate) fn flush(&self, guard: &Guard) {
        let bag = self.bag.with_mut(|b| unsafe { &mut *b });

        if !bag.is_empty() {
            self.global().push_bag(bag, guard);
        }

        self.global().collect(guard);
    }

    /// Pins the `Local`.
    #[inline]
    pub(crate) fn pin(&self) -> Guard {
        let guard = Guard { local: self };

        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
            let new_epoch = global_epoch.pinned();

            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
            // The fence makes sure that any future loads from `Atomic`s will not happen before
            // this store.
            if cfg!(all(
                any(target_arch = "x86", target_arch = "x86_64"),
                not(miri)
            )) {
                // HACK(stjepang): On x86 architectures there are two different ways of executing
                // a `SeqCst` fence.
                //
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a
                //    `lock cmpxchg` instruction.
                //
                // Both instructions have the effect of a full barrier, but benchmarks have shown
                // that the second one makes pinning faster in this particular case. It is not
                // clear that this is permitted by the C++ memory model (SC fences work very
                // differently from SC accesses), but experimental evidence suggests that this
                // works fine. Using inline assembly would be a viable (and correct) alternative,
                // but alas, that is not possible on stable Rust.
                let current = Epoch::starting();
                let res = self.epoch.compare_exchange(
                    current,
                    new_epoch,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
                // We add a compiler fence to make it less likely for LLVM to do something wrong
                // here. Formally, this is not enough to get rid of data races; practically,
                // it should go a long way.
                atomic::compiler_fence(Ordering::SeqCst);
            } else {
                self.epoch.store(new_epoch, Ordering::Relaxed);
                atomic::fence(Ordering::SeqCst);
            }

            // Increment the pin counter.
            let count = self.pin_count.get();
            self.pin_count.set(count + Wrapping(1));

            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and
            // collecting some garbage.
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
                self.global().collect(&guard);
            }
        }

        guard
    }

    /// Unpins the `Local`.
    #[inline]
    pub(crate) fn unpin(&self) {
        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count - 1);

        if guard_count == 1 {
            self.epoch.store(Epoch::starting(), Ordering::Release);

            if self.handle_count.get() == 0 {
                self.finalize();
            }
        }
    }

    /// Unpins and then pins the `Local`.
    #[inline]
    pub(crate) fn repin(&self) {
        let guard_count = self.guard_count.get();

        // Update the local epoch only if there's only one guard.
        if guard_count == 1 {
            let epoch = self.epoch.load(Ordering::Relaxed);
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();

            // Update the local epoch only if the global epoch is greater than the local epoch.
            if epoch != global_epoch {
                // We store the new epoch with `Release` because we need to ensure any memory
                // accesses from the previous epoch do not leak into the new one.
                self.epoch.store(global_epoch, Ordering::Release);

                // However, we don't need a following `SeqCst` fence, because it is safe for memory
                // accesses from the new epoch to be executed before updating the local epoch. At
                // worst, other threads will see the new epoch late and delay GC slightly.
            }
        }
    }

    /// Increments the handle count.
    #[inline]
    pub(crate) fn acquire_handle(&self) {
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count + 1);
    }

    /// Decrements the handle count.
    #[inline]
    pub(crate) fn release_handle(&self) {
        let guard_count = self.guard_count.get();
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count - 1);

        if guard_count == 0 && handle_count == 1 {
            self.finalize();
        }
    }

    /// Removes the `Local` from the global linked list.
    #[cold]
    fn finalize(&self) {
        debug_assert_eq!(self.guard_count.get(), 0);
        debug_assert_eq!(self.handle_count.get(), 0);

        // Temporarily increment handle count. This is required so that the following call to `pin`
        // doesn't call `finalize` again.
        self.handle_count.set(1);
        unsafe {
            // Pin and move the local bag into the global queue. It's important that `push_bag`
            // doesn't defer destruction on any new garbage.
            let guard = &self.pin();
            self.global()
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
        }
        // Revert the handle count back to zero.
        self.handle_count.set(0);

        unsafe {
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
            // by a guard at this time, it's crucial that the reference is read before marking the
            // `Local` as deleted.
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));

            // Mark this node in the linked list as deleted.
            self.entry.delete(unprotected());

            // Finally, drop the reference to the global. Note that this might be the last reference
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
            // in its queue will be executed.
            drop(collector);
        }
    }
}

impl IsElement<Local> for Local {
    fn entry_of(local: &Local) -> &Entry {
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
        unsafe { &*entry_ptr }
    }

    unsafe fn element_of(entry: &Entry) -> &Local {
        // offset_of! macro uses unsafe, but it's unnecessary in this context.
        #[allow(unused_unsafe)]
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
        &*local_ptr
    }

    unsafe fn finalize(entry: &Entry, guard: &Guard) {
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
    }
}

#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    #[test]
    fn check_defer() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn set() {
            FLAG.store(42, Ordering::Relaxed);
        }

        let d = Deferred::new(set);
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        d.call();
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn check_bag() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn incr() {
            FLAG.fetch_add(1, Ordering::Relaxed);
        }

        let mut bag = Bag::new();
        assert!(bag.is_empty());

        for _ in 0..MAX_OBJECTS {
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
            assert!(!bag.is_empty());
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        }

        let result = unsafe { bag.try_push(Deferred::new(incr)) };
        assert!(result.is_err());
        assert!(!bag.is_empty());
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);

        drop(bag);
        assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
    }
}