| 1 | //! The global data and participant for garbage collection. |
| 2 | //! |
| 3 | //! # Registration |
| 4 | //! |
| 5 | //! In order to track all participants in one place, we need some form of participant |
| 6 | //! registration. When a participant is created, it is registered to a global lock-free |
| 7 | //! singly-linked list of registries; and when a participant is leaving, it is unregistered from the |
| 8 | //! list. |
| 9 | //! |
| 10 | //! # Pinning |
| 11 | //! |
| 12 | //! Every participant contains an integer that tells whether the participant is pinned and if so, |
| 13 | //! what was the global epoch at the time it was pinned. Participants also hold a pin counter that |
| 14 | //! aids in periodic global epoch advancement. |
| 15 | //! |
| 16 | //! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned. |
| 17 | //! Guards are necessary for performing atomic operations, and for freeing/dropping locations. |
| 18 | //! |
| 19 | //! # Thread-local bag |
| 20 | //! |
| 21 | //! Objects that get unlinked from concurrent data structures must be stashed away until the global |
| 22 | //! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects |
| 23 | //! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current |
//! global epoch and pushed into the global queue of bags. We store objects in thread-local storage
//! to amortize the synchronization cost of pushing garbage to the global queue.
| 26 | //! |
| 27 | //! # Global queue |
| 28 | //! |
| 29 | //! Whenever a bag is pushed into a queue, the objects in some bags in the queue are collected and |
| 30 | //! destroyed along the way. This design reduces contention on data structures. The global queue |
//! cannot be explicitly accessed: the only way to interact with it is by calling `defer()`, which
//! adds an object to the thread-local bag, or `collect()`, which manually triggers garbage
//! collection.
| 34 | //! |
| 35 | //! Ideally each instance of concurrent data structure may have its own queue that gets fully |
| 36 | //! destroyed as soon as the data structure gets dropped. |
| 37 | |
| 38 | use crate::primitive::cell::UnsafeCell; |
| 39 | use crate::primitive::sync::atomic::{self, Ordering}; |
| 40 | use core::cell::Cell; |
| 41 | use core::mem::{self, ManuallyDrop}; |
| 42 | use core::num::Wrapping; |
| 43 | use core::{fmt, ptr}; |
| 44 | |
| 45 | use crossbeam_utils::CachePadded; |
| 46 | |
| 47 | use crate::atomic::{Owned, Shared}; |
| 48 | use crate::collector::{Collector, LocalHandle}; |
| 49 | use crate::deferred::Deferred; |
| 50 | use crate::epoch::{AtomicEpoch, Epoch}; |
| 51 | use crate::guard::{unprotected, Guard}; |
| 52 | use crate::sync::list::{Entry, IsElement, IterError, List}; |
| 53 | use crate::sync::queue::Queue; |
| 54 | |
/// Maximum number of objects a bag can contain.
///
/// Kept deliberately tiny under sanitizers and Miri: bags fill up and are handed to the global
/// queue far more often, which makes it more likely to trigger any potential data races.
#[cfg(any(crossbeam_sanitize, miri))]
const MAX_OBJECTS: usize = 4;
/// Maximum number of objects a bag can contain.
#[cfg(not(any(crossbeam_sanitize, miri)))]
const MAX_OBJECTS: usize = 64;
| 61 | |
| 62 | /// A bag of deferred functions. |
| 63 | pub(crate) struct Bag { |
| 64 | /// Stashed objects. |
| 65 | deferreds: [Deferred; MAX_OBJECTS], |
| 66 | len: usize, |
| 67 | } |
| 68 | |
| 69 | /// `Bag::try_push()` requires that it is safe for another thread to execute the given functions. |
| 70 | unsafe impl Send for Bag {} |
| 71 | |
| 72 | impl Bag { |
| 73 | /// Returns a new, empty bag. |
| 74 | pub(crate) fn new() -> Self { |
| 75 | Self::default() |
| 76 | } |
| 77 | |
| 78 | /// Returns `true` if the bag is empty. |
| 79 | pub(crate) fn is_empty(&self) -> bool { |
| 80 | self.len == 0 |
| 81 | } |
| 82 | |
| 83 | /// Attempts to insert a deferred function into the bag. |
| 84 | /// |
| 85 | /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is |
| 86 | /// full. |
| 87 | /// |
| 88 | /// # Safety |
| 89 | /// |
| 90 | /// It should be safe for another thread to execute the given function. |
| 91 | pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> { |
| 92 | if self.len < MAX_OBJECTS { |
| 93 | self.deferreds[self.len] = deferred; |
| 94 | self.len += 1; |
| 95 | Ok(()) |
| 96 | } else { |
| 97 | Err(deferred) |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | /// Seals the bag with the given epoch. |
| 102 | fn seal(self, epoch: Epoch) -> SealedBag { |
| 103 | SealedBag { epoch, _bag: self } |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | impl Default for Bag { |
| 108 | fn default() -> Self { |
| 109 | Bag { |
| 110 | len: 0, |
| 111 | deferreds: [Deferred::NO_OP; MAX_OBJECTS], |
| 112 | } |
| 113 | } |
| 114 | } |
| 115 | |
| 116 | impl Drop for Bag { |
| 117 | fn drop(&mut self) { |
| 118 | // Call all deferred functions. |
| 119 | for deferred: &mut Deferred in &mut self.deferreds[..self.len] { |
| 120 | let no_op: Deferred = Deferred::NO_OP; |
| 121 | let owned_deferred: Deferred = mem::replace(dest:deferred, src:no_op); |
| 122 | owned_deferred.call(); |
| 123 | } |
| 124 | } |
| 125 | } |
| 126 | |
| 127 | // can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long |
| 128 | impl fmt::Debug for Bag { |
| 129 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 130 | f&mut DebugStruct<'_, '_>.debug_struct("Bag" ) |
| 131 | .field(name:"deferreds" , &&self.deferreds[..self.len]) |
| 132 | .finish() |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | /// A pair of an epoch and a bag. |
| 137 | #[derive (Default, Debug)] |
| 138 | struct SealedBag { |
| 139 | epoch: Epoch, |
| 140 | _bag: Bag, |
| 141 | } |
| 142 | |
| 143 | /// It is safe to share `SealedBag` because `is_expired` only inspects the epoch. |
| 144 | unsafe impl Sync for SealedBag {} |
| 145 | |
| 146 | impl SealedBag { |
| 147 | /// Checks if it is safe to drop the bag w.r.t. the given global epoch. |
| 148 | fn is_expired(&self, global_epoch: Epoch) -> bool { |
| 149 | // A pinned participant can witness at most one epoch advancement. Therefore, any bag that |
| 150 | // is within one epoch of the current one cannot be destroyed yet. |
| 151 | global_epoch.wrapping_sub(self.epoch) >= 2 |
| 152 | } |
| 153 | } |
| 154 | |
| 155 | /// The global data for a garbage collector. |
| 156 | pub(crate) struct Global { |
| 157 | /// The intrusive linked list of `Local`s. |
| 158 | locals: List<Local>, |
| 159 | |
| 160 | /// The global queue of bags of deferred functions. |
| 161 | queue: Queue<SealedBag>, |
| 162 | |
| 163 | /// The global epoch. |
| 164 | pub(crate) epoch: CachePadded<AtomicEpoch>, |
| 165 | } |
| 166 | |
| 167 | impl Global { |
| 168 | /// Number of bags to destroy. |
| 169 | const COLLECT_STEPS: usize = 8; |
| 170 | |
| 171 | /// Creates a new global data for garbage collection. |
| 172 | #[inline ] |
| 173 | pub(crate) fn new() -> Self { |
| 174 | Self { |
| 175 | locals: List::new(), |
| 176 | queue: Queue::new(), |
| 177 | epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), |
| 178 | } |
| 179 | } |
| 180 | |
| 181 | /// Pushes the bag into the global queue and replaces the bag with a new empty bag. |
| 182 | pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) { |
| 183 | let bag = mem::replace(bag, Bag::new()); |
| 184 | |
| 185 | atomic::fence(Ordering::SeqCst); |
| 186 | |
| 187 | let epoch = self.epoch.load(Ordering::Relaxed); |
| 188 | self.queue.push(bag.seal(epoch), guard); |
| 189 | } |
| 190 | |
| 191 | /// Collects several bags from the global queue and executes deferred functions in them. |
| 192 | /// |
| 193 | /// Note: This may itself produce garbage and in turn allocate new bags. |
| 194 | /// |
| 195 | /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold |
| 196 | /// path. In other words, we want the compiler to optimize branching for the case when |
| 197 | /// `collect()` is not called. |
| 198 | #[cold ] |
| 199 | pub(crate) fn collect(&self, guard: &Guard) { |
| 200 | let global_epoch = self.try_advance(guard); |
| 201 | |
| 202 | let steps = if cfg!(crossbeam_sanitize) { |
| 203 | usize::max_value() |
| 204 | } else { |
| 205 | Self::COLLECT_STEPS |
| 206 | }; |
| 207 | |
| 208 | for _ in 0..steps { |
| 209 | match self.queue.try_pop_if( |
| 210 | &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch), |
| 211 | guard, |
| 212 | ) { |
| 213 | None => break, |
| 214 | Some(sealed_bag) => drop(sealed_bag), |
| 215 | } |
| 216 | } |
| 217 | } |
| 218 | |
| 219 | /// Attempts to advance the global epoch. |
| 220 | /// |
| 221 | /// The global epoch can advance only if all currently pinned participants have been pinned in |
| 222 | /// the current epoch. |
| 223 | /// |
| 224 | /// Returns the current global epoch. |
| 225 | /// |
| 226 | /// `try_advance()` is annotated `#[cold]` because it is rarely called. |
| 227 | #[cold ] |
| 228 | pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch { |
| 229 | let global_epoch = self.epoch.load(Ordering::Relaxed); |
| 230 | atomic::fence(Ordering::SeqCst); |
| 231 | |
| 232 | // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly |
| 233 | // easy to implement in a lock-free manner. However, traversal can be slow due to cache |
| 234 | // misses and data dependencies. We should experiment with other data structures as well. |
| 235 | for local in self.locals.iter(guard) { |
| 236 | match local { |
| 237 | Err(IterError::Stalled) => { |
| 238 | // A concurrent thread stalled this iteration. That thread might also try to |
| 239 | // advance the epoch, in which case we leave the job to it. Otherwise, the |
| 240 | // epoch will not be advanced. |
| 241 | return global_epoch; |
| 242 | } |
| 243 | Ok(local) => { |
| 244 | let local_epoch = local.epoch.load(Ordering::Relaxed); |
| 245 | |
| 246 | // If the participant was pinned in a different epoch, we cannot advance the |
| 247 | // global epoch just yet. |
| 248 | if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch { |
| 249 | return global_epoch; |
| 250 | } |
| 251 | } |
| 252 | } |
| 253 | } |
| 254 | atomic::fence(Ordering::Acquire); |
| 255 | |
| 256 | // All pinned participants were pinned in the current global epoch. |
| 257 | // Now let's advance the global epoch... |
| 258 | // |
| 259 | // Note that if another thread already advanced it before us, this store will simply |
| 260 | // overwrite the global epoch with the same value. This is true because `try_advance` was |
| 261 | // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be |
| 262 | // advanced two steps ahead of it. |
| 263 | let new_epoch = global_epoch.successor(); |
| 264 | self.epoch.store(new_epoch, Ordering::Release); |
| 265 | new_epoch |
| 266 | } |
| 267 | } |
| 268 | |
| 269 | /// Participant for garbage collection. |
| 270 | #[repr (C)] // Note: `entry` must be the first field |
| 271 | pub(crate) struct Local { |
| 272 | /// A node in the intrusive linked list of `Local`s. |
| 273 | entry: Entry, |
| 274 | |
| 275 | /// A reference to the global data. |
| 276 | /// |
| 277 | /// When all guards and handles get dropped, this reference is destroyed. |
| 278 | collector: UnsafeCell<ManuallyDrop<Collector>>, |
| 279 | |
| 280 | /// The local bag of deferred functions. |
| 281 | pub(crate) bag: UnsafeCell<Bag>, |
| 282 | |
| 283 | /// The number of guards keeping this participant pinned. |
| 284 | guard_count: Cell<usize>, |
| 285 | |
| 286 | /// The number of active handles. |
| 287 | handle_count: Cell<usize>, |
| 288 | |
| 289 | /// Total number of pinnings performed. |
| 290 | /// |
| 291 | /// This is just an auxiliary counter that sometimes kicks off collection. |
| 292 | pin_count: Cell<Wrapping<usize>>, |
| 293 | |
| 294 | /// The local epoch. |
| 295 | epoch: CachePadded<AtomicEpoch>, |
| 296 | } |
| 297 | |
// Intended to check that `Local` takes at most 2048 bytes.
// https://github.com/crossbeam-rs/crossbeam/issues/551
#[cfg(not(any(crossbeam_sanitize, miri)))] // `crossbeam_sanitize` and `miri` reduce the size of `Local`
#[test]
fn local_size() {
    // The size check is currently switched off, so this test asserts nothing.
    // TODO: https://github.com/crossbeam-rs/crossbeam/issues/869
    // assert!(
    //     core::mem::size_of::<Local>() <= 2048,
    //     "An allocation of `Local` should be <= 2048 bytes."
    // );
}
| 309 | |
| 310 | impl Local { |
| 311 | /// Number of pinnings after which a participant will execute some deferred functions from the |
| 312 | /// global queue. |
| 313 | const PINNINGS_BETWEEN_COLLECT: usize = 128; |
| 314 | |
| 315 | /// Registers a new `Local` in the provided `Global`. |
| 316 | pub(crate) fn register(collector: &Collector) -> LocalHandle { |
| 317 | unsafe { |
| 318 | // Since we dereference no pointers in this block, it is safe to use `unprotected`. |
| 319 | |
| 320 | let local = Owned::new(Local { |
| 321 | entry: Entry::default(), |
| 322 | collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())), |
| 323 | bag: UnsafeCell::new(Bag::new()), |
| 324 | guard_count: Cell::new(0), |
| 325 | handle_count: Cell::new(1), |
| 326 | pin_count: Cell::new(Wrapping(0)), |
| 327 | epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), |
| 328 | }) |
| 329 | .into_shared(unprotected()); |
| 330 | collector.global.locals.insert(local, unprotected()); |
| 331 | LocalHandle { |
| 332 | local: local.as_raw(), |
| 333 | } |
| 334 | } |
| 335 | } |
| 336 | |
| 337 | /// Returns a reference to the `Global` in which this `Local` resides. |
| 338 | #[inline ] |
| 339 | pub(crate) fn global(&self) -> &Global { |
| 340 | &self.collector().global |
| 341 | } |
| 342 | |
| 343 | /// Returns a reference to the `Collector` in which this `Local` resides. |
| 344 | #[inline ] |
| 345 | pub(crate) fn collector(&self) -> &Collector { |
| 346 | self.collector.with(|c| unsafe { &**c }) |
| 347 | } |
| 348 | |
| 349 | /// Returns `true` if the current participant is pinned. |
| 350 | #[inline ] |
| 351 | pub(crate) fn is_pinned(&self) -> bool { |
| 352 | self.guard_count.get() > 0 |
| 353 | } |
| 354 | |
| 355 | /// Adds `deferred` to the thread-local bag. |
| 356 | /// |
| 357 | /// # Safety |
| 358 | /// |
| 359 | /// It should be safe for another thread to execute the given function. |
| 360 | pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) { |
| 361 | let bag = self.bag.with_mut(|b| &mut *b); |
| 362 | |
| 363 | while let Err(d) = bag.try_push(deferred) { |
| 364 | self.global().push_bag(bag, guard); |
| 365 | deferred = d; |
| 366 | } |
| 367 | } |
| 368 | |
| 369 | pub(crate) fn flush(&self, guard: &Guard) { |
| 370 | let bag = self.bag.with_mut(|b| unsafe { &mut *b }); |
| 371 | |
| 372 | if !bag.is_empty() { |
| 373 | self.global().push_bag(bag, guard); |
| 374 | } |
| 375 | |
| 376 | self.global().collect(guard); |
| 377 | } |
| 378 | |
| 379 | /// Pins the `Local`. |
| 380 | #[inline ] |
| 381 | pub(crate) fn pin(&self) -> Guard { |
| 382 | let guard = Guard { local: self }; |
| 383 | |
| 384 | let guard_count = self.guard_count.get(); |
| 385 | self.guard_count.set(guard_count.checked_add(1).unwrap()); |
| 386 | |
| 387 | if guard_count == 0 { |
| 388 | let global_epoch = self.global().epoch.load(Ordering::Relaxed); |
| 389 | let new_epoch = global_epoch.pinned(); |
| 390 | |
| 391 | // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence. |
| 392 | // The fence makes sure that any future loads from `Atomic`s will not happen before |
| 393 | // this store. |
| 394 | if cfg!(all( |
| 395 | any(target_arch = "x86" , target_arch = "x86_64" ), |
| 396 | not(miri) |
| 397 | )) { |
| 398 | // HACK(stjepang): On x86 architectures there are two different ways of executing |
| 399 | // a `SeqCst` fence. |
| 400 | // |
| 401 | // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. |
| 402 | // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` |
| 403 | // instruction. |
| 404 | // |
| 405 | // Both instructions have the effect of a full barrier, but benchmarks have shown |
| 406 | // that the second one makes pinning faster in this particular case. It is not |
| 407 | // clear that this is permitted by the C++ memory model (SC fences work very |
| 408 | // differently from SC accesses), but experimental evidence suggests that this |
| 409 | // works fine. Using inline assembly would be a viable (and correct) alternative, |
| 410 | // but alas, that is not possible on stable Rust. |
| 411 | let current = Epoch::starting(); |
| 412 | let res = self.epoch.compare_exchange( |
| 413 | current, |
| 414 | new_epoch, |
| 415 | Ordering::SeqCst, |
| 416 | Ordering::SeqCst, |
| 417 | ); |
| 418 | debug_assert!(res.is_ok(), "participant was expected to be unpinned" ); |
| 419 | // We add a compiler fence to make it less likely for LLVM to do something wrong |
| 420 | // here. Formally, this is not enough to get rid of data races; practically, |
| 421 | // it should go a long way. |
| 422 | atomic::compiler_fence(Ordering::SeqCst); |
| 423 | } else { |
| 424 | self.epoch.store(new_epoch, Ordering::Relaxed); |
| 425 | atomic::fence(Ordering::SeqCst); |
| 426 | } |
| 427 | |
| 428 | // Increment the pin counter. |
| 429 | let count = self.pin_count.get(); |
| 430 | self.pin_count.set(count + Wrapping(1)); |
| 431 | |
| 432 | // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting |
| 433 | // some garbage. |
| 434 | if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 { |
| 435 | self.global().collect(&guard); |
| 436 | } |
| 437 | } |
| 438 | |
| 439 | guard |
| 440 | } |
| 441 | |
| 442 | /// Unpins the `Local`. |
| 443 | #[inline ] |
| 444 | pub(crate) fn unpin(&self) { |
| 445 | let guard_count = self.guard_count.get(); |
| 446 | self.guard_count.set(guard_count - 1); |
| 447 | |
| 448 | if guard_count == 1 { |
| 449 | self.epoch.store(Epoch::starting(), Ordering::Release); |
| 450 | |
| 451 | if self.handle_count.get() == 0 { |
| 452 | self.finalize(); |
| 453 | } |
| 454 | } |
| 455 | } |
| 456 | |
| 457 | /// Unpins and then pins the `Local`. |
| 458 | #[inline ] |
| 459 | pub(crate) fn repin(&self) { |
| 460 | let guard_count = self.guard_count.get(); |
| 461 | |
| 462 | // Update the local epoch only if there's only one guard. |
| 463 | if guard_count == 1 { |
| 464 | let epoch = self.epoch.load(Ordering::Relaxed); |
| 465 | let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned(); |
| 466 | |
| 467 | // Update the local epoch only if the global epoch is greater than the local epoch. |
| 468 | if epoch != global_epoch { |
| 469 | // We store the new epoch with `Release` because we need to ensure any memory |
| 470 | // accesses from the previous epoch do not leak into the new one. |
| 471 | self.epoch.store(global_epoch, Ordering::Release); |
| 472 | |
| 473 | // However, we don't need a following `SeqCst` fence, because it is safe for memory |
| 474 | // accesses from the new epoch to be executed before updating the local epoch. At |
| 475 | // worse, other threads will see the new epoch late and delay GC slightly. |
| 476 | } |
| 477 | } |
| 478 | } |
| 479 | |
| 480 | /// Increments the handle count. |
| 481 | #[inline ] |
| 482 | pub(crate) fn acquire_handle(&self) { |
| 483 | let handle_count = self.handle_count.get(); |
| 484 | debug_assert!(handle_count >= 1); |
| 485 | self.handle_count.set(handle_count + 1); |
| 486 | } |
| 487 | |
| 488 | /// Decrements the handle count. |
| 489 | #[inline ] |
| 490 | pub(crate) fn release_handle(&self) { |
| 491 | let guard_count = self.guard_count.get(); |
| 492 | let handle_count = self.handle_count.get(); |
| 493 | debug_assert!(handle_count >= 1); |
| 494 | self.handle_count.set(handle_count - 1); |
| 495 | |
| 496 | if guard_count == 0 && handle_count == 1 { |
| 497 | self.finalize(); |
| 498 | } |
| 499 | } |
| 500 | |
| 501 | /// Removes the `Local` from the global linked list. |
| 502 | #[cold ] |
| 503 | fn finalize(&self) { |
| 504 | debug_assert_eq!(self.guard_count.get(), 0); |
| 505 | debug_assert_eq!(self.handle_count.get(), 0); |
| 506 | |
| 507 | // Temporarily increment handle count. This is required so that the following call to `pin` |
| 508 | // doesn't call `finalize` again. |
| 509 | self.handle_count.set(1); |
| 510 | unsafe { |
| 511 | // Pin and move the local bag into the global queue. It's important that `push_bag` |
| 512 | // doesn't defer destruction on any new garbage. |
| 513 | let guard = &self.pin(); |
| 514 | self.global() |
| 515 | .push_bag(self.bag.with_mut(|b| &mut *b), guard); |
| 516 | } |
| 517 | // Revert the handle count back to zero. |
| 518 | self.handle_count.set(0); |
| 519 | |
| 520 | unsafe { |
| 521 | // Take the reference to the `Global` out of this `Local`. Since we're not protected |
| 522 | // by a guard at this time, it's crucial that the reference is read before marking the |
| 523 | // `Local` as deleted. |
| 524 | let collector: Collector = ptr::read(self.collector.with(|c| &*(*c))); |
| 525 | |
| 526 | // Mark this node in the linked list as deleted. |
| 527 | self.entry.delete(unprotected()); |
| 528 | |
| 529 | // Finally, drop the reference to the global. Note that this might be the last reference |
| 530 | // to the `Global`. If so, the global data will be destroyed and all deferred functions |
| 531 | // in its queue will be executed. |
| 532 | drop(collector); |
| 533 | } |
| 534 | } |
| 535 | } |
| 536 | |
| 537 | impl IsElement<Self> for Local { |
| 538 | fn entry_of(local: &Self) -> &Entry { |
| 539 | // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it. |
| 540 | unsafe { |
| 541 | let entry_ptr: *const Entry = (local as *const Self).cast::<Entry>(); |
| 542 | &*entry_ptr |
| 543 | } |
| 544 | } |
| 545 | |
| 546 | unsafe fn element_of(entry: &Entry) -> &Self { |
| 547 | // SAFETY: `Local` is `repr(C)` and `entry` is the first field of it. |
| 548 | let local_ptr: *const Local = (entry as *const Entry).cast::<Self>(); |
| 549 | &*local_ptr |
| 550 | } |
| 551 | |
| 552 | unsafe fn finalize(entry: &Entry, guard: &Guard) { |
| 553 | guard.defer_destroy(ptr:Shared::from(Self::element_of(entry) as *const _)); |
| 554 | } |
| 555 | } |
| 556 | |
#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// A `Deferred` runs its function when called, and not before.
    #[test]
    fn check_defer() {
        static MARKER: AtomicUsize = AtomicUsize::new(0);
        fn mark() {
            MARKER.store(42, Ordering::Relaxed);
        }

        let deferred = Deferred::new(mark);
        assert_eq!(MARKER.load(Ordering::Relaxed), 0);
        deferred.call();
        assert_eq!(MARKER.load(Ordering::Relaxed), 42);
    }

    /// A bag accepts exactly `MAX_OBJECTS` functions, rejects the next one, and runs the stored
    /// functions only when it is dropped.
    #[test]
    fn check_bag() {
        static CALLS: AtomicUsize = AtomicUsize::new(0);
        fn bump() {
            CALLS.fetch_add(1, Ordering::Relaxed);
        }

        let mut bag = Bag::new();
        assert!(bag.is_empty());

        // Fill the bag to capacity; nothing may run while it is being filled.
        for _ in 0..MAX_OBJECTS {
            let pushed = unsafe { bag.try_push(Deferred::new(bump)) };
            assert!(pushed.is_ok());
            assert!(!bag.is_empty());
            assert_eq!(CALLS.load(Ordering::Relaxed), 0);
        }

        // One more push must be rejected, still without running anything.
        let overflow = unsafe { bag.try_push(Deferred::new(bump)) };
        assert!(overflow.is_err());
        assert!(!bag.is_empty());
        assert_eq!(CALLS.load(Ordering::Relaxed), 0);

        // Dropping the bag runs every stored function exactly once.
        drop(bag);
        assert_eq!(CALLS.load(Ordering::Relaxed), MAX_OBJECTS);
    }
}
| 601 | |