1/// Epoch-based garbage collector.
2///
3/// # Examples
4///
5/// ```
6/// use crossbeam_epoch::Collector;
7///
8/// let collector = Collector::new();
9///
10/// let handle = collector.register();
11/// drop(collector); // `handle` still works after dropping `collector`
12///
13/// handle.pin().flush();
14/// ```
15use core::fmt;
16
17use crate::guard::Guard;
18use crate::internal::{Global, Local};
19use crate::primitive::sync::Arc;
20
21/// An epoch-based garbage collector.
22pub struct Collector {
23 pub(crate) global: Arc<Global>,
24}
25
26unsafe impl Send for Collector {}
27unsafe impl Sync for Collector {}
28
29impl Default for Collector {
30 fn default() -> Self {
31 Self {
32 global: Arc::new(data:Global::new()),
33 }
34 }
35}
36
37impl Collector {
38 /// Creates a new collector.
39 pub fn new() -> Self {
40 Self::default()
41 }
42
43 /// Registers a new handle for the collector.
44 pub fn register(&self) -> LocalHandle {
45 Local::register(self)
46 }
47}
48
49impl Clone for Collector {
50 /// Creates another reference to the same garbage collector.
51 fn clone(&self) -> Self {
52 Collector {
53 global: self.global.clone(),
54 }
55 }
56}
57
58impl fmt::Debug for Collector {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 f.pad("Collector { .. }")
61 }
62}
63
64impl PartialEq for Collector {
65 /// Checks if both handles point to the same collector.
66 fn eq(&self, rhs: &Collector) -> bool {
67 Arc::ptr_eq(&self.global, &rhs.global)
68 }
69}
70impl Eq for Collector {}
71
72/// A handle to a garbage collector.
73pub struct LocalHandle {
74 pub(crate) local: *const Local,
75}
76
77impl LocalHandle {
78 /// Pins the handle.
79 #[inline]
80 pub fn pin(&self) -> Guard {
81 unsafe { (*self.local).pin() }
82 }
83
84 /// Returns `true` if the handle is pinned.
85 #[inline]
86 pub fn is_pinned(&self) -> bool {
87 unsafe { (*self.local).is_pinned() }
88 }
89
90 /// Returns the `Collector` associated with this handle.
91 #[inline]
92 pub fn collector(&self) -> &Collector {
93 unsafe { (*self.local).collector() }
94 }
95}
96
97impl Drop for LocalHandle {
98 #[inline]
99 fn drop(&mut self) {
100 unsafe {
101 Local::release_handle(&*self.local);
102 }
103 }
104}
105
106impl fmt::Debug for LocalHandle {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 f.pad("LocalHandle { .. }")
109 }
110}
111
112#[cfg(all(test, not(crossbeam_loom)))]
113mod tests {
114 use std::mem::ManuallyDrop;
115 use std::sync::atomic::{AtomicUsize, Ordering};
116
117 use crossbeam_utils::thread;
118
119 use crate::{Collector, Owned};
120
121 const NUM_THREADS: usize = 8;
122
123 #[test]
124 fn pin_reentrant() {
125 let collector = Collector::new();
126 let handle = collector.register();
127 drop(collector);
128
129 assert!(!handle.is_pinned());
130 {
131 let _guard = &handle.pin();
132 assert!(handle.is_pinned());
133 {
134 let _guard = &handle.pin();
135 assert!(handle.is_pinned());
136 }
137 assert!(handle.is_pinned());
138 }
139 assert!(!handle.is_pinned());
140 }
141
142 #[test]
143 fn flush_local_bag() {
144 let collector = Collector::new();
145 let handle = collector.register();
146 drop(collector);
147
148 for _ in 0..100 {
149 let guard = &handle.pin();
150 unsafe {
151 let a = Owned::new(7).into_shared(guard);
152 guard.defer_destroy(a);
153
154 assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
155
156 while !(*guard.local).bag.with(|b| (*b).is_empty()) {
157 guard.flush();
158 }
159 }
160 }
161 }
162
163 #[test]
164 fn garbage_buffering() {
165 let collector = Collector::new();
166 let handle = collector.register();
167 drop(collector);
168
169 let guard = &handle.pin();
170 unsafe {
171 for _ in 0..10 {
172 let a = Owned::new(7).into_shared(guard);
173 guard.defer_destroy(a);
174 }
175 assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
176 }
177 }
178
179 #[test]
180 fn pin_holds_advance() {
181 #[cfg(miri)]
182 const N: usize = 500;
183 #[cfg(not(miri))]
184 const N: usize = 500_000;
185
186 let collector = Collector::new();
187
188 thread::scope(|scope| {
189 for _ in 0..NUM_THREADS {
190 scope.spawn(|_| {
191 let handle = collector.register();
192 for _ in 0..N {
193 let guard = &handle.pin();
194
195 let before = collector.global.epoch.load(Ordering::Relaxed);
196 collector.global.collect(guard);
197 let after = collector.global.epoch.load(Ordering::Relaxed);
198
199 assert!(after.wrapping_sub(before) <= 2);
200 }
201 });
202 }
203 })
204 .unwrap();
205 }
206
207 #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS`
208 #[test]
209 fn incremental() {
210 #[cfg(miri)]
211 const COUNT: usize = 500;
212 #[cfg(not(miri))]
213 const COUNT: usize = 100_000;
214 static DESTROYS: AtomicUsize = AtomicUsize::new(0);
215
216 let collector = Collector::new();
217 let handle = collector.register();
218
219 unsafe {
220 let guard = &handle.pin();
221 for _ in 0..COUNT {
222 let a = Owned::new(7i32).into_shared(guard);
223 guard.defer_unchecked(move || {
224 drop(a.into_owned());
225 DESTROYS.fetch_add(1, Ordering::Relaxed);
226 });
227 }
228 guard.flush();
229 }
230
231 let mut last = 0;
232
233 while last < COUNT {
234 let curr = DESTROYS.load(Ordering::Relaxed);
235 assert!(curr - last <= 1024);
236 last = curr;
237
238 let guard = &handle.pin();
239 collector.global.collect(guard);
240 }
241 assert!(DESTROYS.load(Ordering::Relaxed) == COUNT);
242 }
243
244 #[test]
245 fn buffering() {
246 const COUNT: usize = 10;
247 #[cfg(miri)]
248 const N: usize = 500;
249 #[cfg(not(miri))]
250 const N: usize = 100_000;
251 static DESTROYS: AtomicUsize = AtomicUsize::new(0);
252
253 let collector = Collector::new();
254 let handle = collector.register();
255
256 unsafe {
257 let guard = &handle.pin();
258 for _ in 0..COUNT {
259 let a = Owned::new(7i32).into_shared(guard);
260 guard.defer_unchecked(move || {
261 drop(a.into_owned());
262 DESTROYS.fetch_add(1, Ordering::Relaxed);
263 });
264 }
265 }
266
267 for _ in 0..N {
268 collector.global.collect(&handle.pin());
269 }
270 assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);
271
272 handle.pin().flush();
273
274 while DESTROYS.load(Ordering::Relaxed) < COUNT {
275 let guard = &handle.pin();
276 collector.global.collect(guard);
277 }
278 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
279 }
280
281 #[test]
282 fn count_drops() {
283 #[cfg(miri)]
284 const COUNT: usize = 500;
285 #[cfg(not(miri))]
286 const COUNT: usize = 100_000;
287 static DROPS: AtomicUsize = AtomicUsize::new(0);
288
289 struct Elem(i32);
290
291 impl Drop for Elem {
292 fn drop(&mut self) {
293 DROPS.fetch_add(1, Ordering::Relaxed);
294 }
295 }
296
297 let collector = Collector::new();
298 let handle = collector.register();
299
300 unsafe {
301 let guard = &handle.pin();
302
303 for _ in 0..COUNT {
304 let a = Owned::new(Elem(7i32)).into_shared(guard);
305 guard.defer_destroy(a);
306 }
307 guard.flush();
308 }
309
310 while DROPS.load(Ordering::Relaxed) < COUNT {
311 let guard = &handle.pin();
312 collector.global.collect(guard);
313 }
314 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
315 }
316
317 #[test]
318 fn count_destroy() {
319 #[cfg(miri)]
320 const COUNT: usize = 500;
321 #[cfg(not(miri))]
322 const COUNT: usize = 100_000;
323 static DESTROYS: AtomicUsize = AtomicUsize::new(0);
324
325 let collector = Collector::new();
326 let handle = collector.register();
327
328 unsafe {
329 let guard = &handle.pin();
330
331 for _ in 0..COUNT {
332 let a = Owned::new(7i32).into_shared(guard);
333 guard.defer_unchecked(move || {
334 drop(a.into_owned());
335 DESTROYS.fetch_add(1, Ordering::Relaxed);
336 });
337 }
338 guard.flush();
339 }
340
341 while DESTROYS.load(Ordering::Relaxed) < COUNT {
342 let guard = &handle.pin();
343 collector.global.collect(guard);
344 }
345 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
346 }
347
348 #[test]
349 fn drop_array() {
350 const COUNT: usize = 700;
351 static DROPS: AtomicUsize = AtomicUsize::new(0);
352
353 struct Elem(i32);
354
355 impl Drop for Elem {
356 fn drop(&mut self) {
357 DROPS.fetch_add(1, Ordering::Relaxed);
358 }
359 }
360
361 let collector = Collector::new();
362 let handle = collector.register();
363
364 let mut guard = handle.pin();
365
366 let mut v = Vec::with_capacity(COUNT);
367 for i in 0..COUNT {
368 v.push(Elem(i as i32));
369 }
370
371 {
372 let a = Owned::new(v).into_shared(&guard);
373 unsafe {
374 guard.defer_destroy(a);
375 }
376 guard.flush();
377 }
378
379 while DROPS.load(Ordering::Relaxed) < COUNT {
380 guard.repin();
381 collector.global.collect(&guard);
382 }
383 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
384 }
385
386 #[test]
387 fn destroy_array() {
388 #[cfg(miri)]
389 const COUNT: usize = 500;
390 #[cfg(not(miri))]
391 const COUNT: usize = 100_000;
392 static DESTROYS: AtomicUsize = AtomicUsize::new(0);
393
394 let collector = Collector::new();
395 let handle = collector.register();
396
397 unsafe {
398 let guard = &handle.pin();
399
400 let mut v = Vec::with_capacity(COUNT);
401 for i in 0..COUNT {
402 v.push(i as i32);
403 }
404
405 let len = v.len();
406 let ptr = ManuallyDrop::new(v).as_mut_ptr() as usize;
407 guard.defer_unchecked(move || {
408 drop(Vec::from_raw_parts(ptr as *const i32 as *mut i32, len, len));
409 DESTROYS.fetch_add(len, Ordering::Relaxed);
410 });
411 guard.flush();
412 }
413
414 while DESTROYS.load(Ordering::Relaxed) < COUNT {
415 let guard = &handle.pin();
416 collector.global.collect(guard);
417 }
418 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
419 }
420
421 #[test]
422 fn stress() {
423 const THREADS: usize = 8;
424 #[cfg(miri)]
425 const COUNT: usize = 500;
426 #[cfg(not(miri))]
427 const COUNT: usize = 100_000;
428 static DROPS: AtomicUsize = AtomicUsize::new(0);
429
430 struct Elem(i32);
431
432 impl Drop for Elem {
433 fn drop(&mut self) {
434 DROPS.fetch_add(1, Ordering::Relaxed);
435 }
436 }
437
438 let collector = Collector::new();
439
440 thread::scope(|scope| {
441 for _ in 0..THREADS {
442 scope.spawn(|_| {
443 let handle = collector.register();
444 for _ in 0..COUNT {
445 let guard = &handle.pin();
446 unsafe {
447 let a = Owned::new(Elem(7i32)).into_shared(guard);
448 guard.defer_destroy(a);
449 }
450 }
451 });
452 }
453 })
454 .unwrap();
455
456 let handle = collector.register();
457 while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
458 let guard = &handle.pin();
459 collector.global.collect(guard);
460 }
461 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
462 }
463}
464