1 | use crate::{ |
2 | cfg::{self, CfgPrivate}, |
3 | clear::Clear, |
4 | page, |
5 | sync::{ |
6 | alloc, |
7 | atomic::{ |
8 | AtomicPtr, AtomicUsize, |
9 | Ordering::{self, *}, |
10 | }, |
11 | }, |
12 | tid::Tid, |
13 | Pack, |
14 | }; |
15 | |
16 | use std::{fmt, ptr, slice}; |
17 | |
// ┌─────────────┐      ┌────────┐
// │ page 1      │      │        │
// ├─────────────┤ ┌───▶│  next──┼─┐
// │ page 2      │ │    ├────────┤ │
// │             │ │    │XXXXXXXX│ │
// │ local_free──┼─┘    ├────────┤ │
// │ global_free─┼─┐    │        │◀┘
// ├─────────────┤ └───▶│  next──┼─┐
// │ page 3      │      ├────────┤ │
// └─────────────┘      │XXXXXXXX│ │
//       ...            ├────────┤ │
// ┌─────────────┐      │XXXXXXXX│ │
// │ page n      │      ├────────┤ │
// └─────────────┘      │        │◀┘
//                      │  next──┼───▶
//                      ├────────┤
//                      │XXXXXXXX│
//                      └────────┘
//                         ...
/// The set of pages belonging to a single thread.
///
/// Per-page state is split across two parallel slices, `local` and `shared`,
/// which are both indexed by page number and both hold `C::MAX_PAGES` entries.
pub(crate) struct Shard<T, C: cfg::Config> {
    /// The shard's parent thread ID.
    pub(crate) tid: usize,
    /// The local free list for each page.
    ///
    /// These are only ever accessed from this shard's thread, so they are
    /// stored separately from the shared state for the page that can be
    /// accessed concurrently, to minimize false sharing.
    local: Box<[page::Local]>,
    /// The shared state for each page in this shard.
    ///
    /// This consists of the page's metadata (size, previous size), remote free
    /// list, and a pointer to the actual array backing that page.
    shared: Box<[page::Shared<T, C>]>,
}
52 | |
/// The table of all shards, holding one slot per possible thread index.
pub(crate) struct Array<T, C: cfg::Config> {
    /// One slot per thread index (`C::MAX_SHARDS` in total). Every slot starts
    /// out null and is filled in the first time `current` runs for that index.
    shards: Box<[Ptr<T, C>]>,
    /// The highest thread index for which a shard has been allocated so far
    /// (0 if none has). Iteration, `Drop`, and `Debug` only visit slots
    /// `0..=max`.
    max: AtomicUsize,
}
57 | |
/// An atomic, possibly-null pointer to a heap-allocated shard.
///
/// The pointer starts out null (`Ptr::null`) and may be replaced by a non-null
/// value exactly once (`Ptr::set` panics if the slot is already occupied).
#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);
60 | |
/// Iterator over the allocated shards of an `Array`.
///
/// Wraps a mutable slice iterator over the shard slots; slots whose pointer is
/// still null are skipped, and each yielded item is a shared `&Shard`.
#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);
63 | |
64 | // === impl Shard === |
65 | |
66 | impl<T, C> Shard<T, C> |
67 | where |
68 | C: cfg::Config, |
69 | { |
70 | #[inline (always)] |
71 | pub(crate) fn with_slot<'a, U>( |
72 | &'a self, |
73 | idx: usize, |
74 | f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>, |
75 | ) -> Option<U> { |
76 | debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); |
77 | let (addr, page_index) = page::indices::<C>(idx); |
78 | |
79 | test_println!("-> {:?}" , addr); |
80 | if page_index > self.shared.len() { |
81 | return None; |
82 | } |
83 | |
84 | self.shared[page_index].with_slot(addr, f) |
85 | } |
86 | |
87 | pub(crate) fn new(tid: usize) -> Self { |
88 | let mut total_sz = 0; |
89 | let shared = (0..C::MAX_PAGES) |
90 | .map(|page_num| { |
91 | let sz = C::page_size(page_num); |
92 | let prev_sz = total_sz; |
93 | total_sz += sz; |
94 | page::Shared::new(sz, prev_sz) |
95 | }) |
96 | .collect(); |
97 | let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect(); |
98 | Self { tid, local, shared } |
99 | } |
100 | } |
101 | |
102 | impl<T, C> Shard<Option<T>, C> |
103 | where |
104 | C: cfg::Config, |
105 | { |
106 | /// Remove an item on the shard's local thread. |
107 | pub(crate) fn take_local(&self, idx: usize) -> Option<T> { |
108 | debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); |
109 | let (addr, page_index) = page::indices::<C>(idx); |
110 | |
111 | test_println!("-> remove_local {:?}" , addr); |
112 | |
113 | self.shared |
114 | .get(page_index)? |
115 | .take(addr, C::unpack_gen(idx), self.local(page_index)) |
116 | } |
117 | |
118 | /// Remove an item, while on a different thread from the shard's local thread. |
119 | pub(crate) fn take_remote(&self, idx: usize) -> Option<T> { |
120 | debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); |
121 | debug_assert!(Tid::<C>::current().as_usize() != self.tid); |
122 | |
123 | let (addr, page_index) = page::indices::<C>(idx); |
124 | |
125 | test_println!("-> take_remote {:?}; page {:?}" , addr, page_index); |
126 | |
127 | let shared = self.shared.get(page_index)?; |
128 | shared.take(addr, C::unpack_gen(idx), shared.free_list()) |
129 | } |
130 | |
131 | pub(crate) fn remove_local(&self, idx: usize) -> bool { |
132 | debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); |
133 | let (addr, page_index) = page::indices::<C>(idx); |
134 | |
135 | if page_index > self.shared.len() { |
136 | return false; |
137 | } |
138 | |
139 | self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index)) |
140 | } |
141 | |
142 | pub(crate) fn remove_remote(&self, idx: usize) -> bool { |
143 | debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); |
144 | let (addr, page_index) = page::indices::<C>(idx); |
145 | |
146 | if page_index > self.shared.len() { |
147 | return false; |
148 | } |
149 | |
150 | let shared = &self.shared[page_index]; |
151 | shared.remove(addr, C::unpack_gen(idx), shared.free_list()) |
152 | } |
153 | |
154 | pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> { |
155 | self.shared.iter() |
156 | } |
157 | } |
158 | |
159 | impl<T, C> Shard<T, C> |
160 | where |
161 | T: Clear + Default, |
162 | C: cfg::Config, |
163 | { |
164 | pub(crate) fn init_with<U>( |
165 | &self, |
166 | mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>, |
167 | ) -> Option<U> { |
168 | // Can we fit the value into an exist`ing page? |
169 | for (page_idx, page) in self.shared.iter().enumerate() { |
170 | let local = self.local(page_idx); |
171 | |
172 | test_println!("-> page {}; {:?}; {:?}" , page_idx, local, page); |
173 | |
174 | if let Some(res) = page.init_with(local, &mut init) { |
175 | return Some(res); |
176 | } |
177 | } |
178 | |
179 | None |
180 | } |
181 | |
182 | pub(crate) fn mark_clear_local(&self, idx: usize) -> bool { |
183 | debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); |
184 | let (addr, page_index) = page::indices::<C>(idx); |
185 | |
186 | if page_index > self.shared.len() { |
187 | return false; |
188 | } |
189 | |
190 | self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index)) |
191 | } |
192 | |
193 | pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool { |
194 | debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); |
195 | let (addr, page_index) = page::indices::<C>(idx); |
196 | |
197 | if page_index > self.shared.len() { |
198 | return false; |
199 | } |
200 | |
201 | let shared = &self.shared[page_index]; |
202 | shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list()) |
203 | } |
204 | |
205 | pub(crate) fn clear_after_release(&self, idx: usize) { |
206 | crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire); |
207 | let tid = Tid::<C>::current().as_usize(); |
208 | test_println!( |
209 | "-> clear_after_release; self.tid= {:?}; current.tid= {:?};" , |
210 | tid, |
211 | self.tid |
212 | ); |
213 | if tid == self.tid { |
214 | self.clear_local(idx); |
215 | } else { |
216 | self.clear_remote(idx); |
217 | } |
218 | } |
219 | |
220 | fn clear_local(&self, idx: usize) -> bool { |
221 | debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); |
222 | let (addr, page_index) = page::indices::<C>(idx); |
223 | |
224 | if page_index > self.shared.len() { |
225 | return false; |
226 | } |
227 | |
228 | self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index)) |
229 | } |
230 | |
231 | fn clear_remote(&self, idx: usize) -> bool { |
232 | debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); |
233 | let (addr, page_index) = page::indices::<C>(idx); |
234 | |
235 | if page_index > self.shared.len() { |
236 | return false; |
237 | } |
238 | |
239 | let shared = &self.shared[page_index]; |
240 | shared.clear(addr, C::unpack_gen(idx), shared.free_list()) |
241 | } |
242 | |
243 | #[inline (always)] |
244 | fn local(&self, i: usize) -> &page::Local { |
245 | #[cfg (debug_assertions)] |
246 | debug_assert_eq_in_drop!( |
247 | Tid::<C>::current().as_usize(), |
248 | self.tid, |
249 | "tried to access local data from another thread!" |
250 | ); |
251 | |
252 | &self.local[i] |
253 | } |
254 | } |
255 | |
256 | impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> { |
257 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
258 | let mut d: DebugStruct<'_, '_> = f.debug_struct(name:"Shard" ); |
259 | |
260 | #[cfg (debug_assertions)] |
261 | d.field(name:"tid" , &self.tid); |
262 | d.field(name:"shared" , &self.shared).finish() |
263 | } |
264 | } |
265 | |
266 | // === impl Array === |
267 | |
268 | impl<T, C> Array<T, C> |
269 | where |
270 | C: cfg::Config, |
271 | { |
272 | pub(crate) fn new() -> Self { |
273 | let mut shards = Vec::with_capacity(C::MAX_SHARDS); |
274 | for _ in 0..C::MAX_SHARDS { |
275 | // XXX(eliza): T_T this could be avoided with maybeuninit or something... |
276 | shards.push(Ptr::null()); |
277 | } |
278 | Self { |
279 | shards: shards.into(), |
280 | max: AtomicUsize::new(0), |
281 | } |
282 | } |
283 | |
284 | #[inline ] |
285 | pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> { |
286 | test_println!("-> get shard= {}" , idx); |
287 | self.shards.get(idx)?.load(Acquire) |
288 | } |
289 | |
290 | #[inline ] |
291 | pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) { |
292 | let tid = Tid::<C>::current(); |
293 | test_println!("current: {:?}" , tid); |
294 | let idx = tid.as_usize(); |
295 | assert!( |
296 | idx < self.shards.len(), |
297 | "Thread count overflowed the configured max count. \ |
298 | Thread index = {}, max threads = {}." , |
299 | idx, |
300 | C::MAX_SHARDS, |
301 | ); |
302 | // It's okay for this to be relaxed. The value is only ever stored by |
303 | // the thread that corresponds to the index, and we are that thread. |
304 | let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| { |
305 | let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx)))); |
306 | test_println!("-> allocated new shard for index {} at {:p}" , idx, ptr); |
307 | self.shards[idx].set(ptr); |
308 | let mut max = self.max.load(Acquire); |
309 | while max < idx { |
310 | match self.max.compare_exchange(max, idx, AcqRel, Acquire) { |
311 | Ok(_) => break, |
312 | Err(actual) => max = actual, |
313 | } |
314 | } |
315 | test_println!("-> highest index= {}, prev= {}" , std::cmp::max(max, idx), max); |
316 | unsafe { |
317 | // Safety: we just put it there! |
318 | &*ptr |
319 | } |
320 | .get_ref() |
321 | }); |
322 | (tid, shard) |
323 | } |
324 | |
325 | pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> { |
326 | test_println!("Array::iter_mut" ); |
327 | let max = self.max.load(Acquire); |
328 | test_println!("-> highest index= {}" , max); |
329 | IterMut(self.shards[0..=max].iter_mut()) |
330 | } |
331 | } |
332 | |
333 | impl<T, C: cfg::Config> Drop for Array<T, C> { |
334 | fn drop(&mut self) { |
335 | // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`... |
336 | let max: usize = self.max.load(order:Acquire); |
337 | for shard: &Ptr in &self.shards[0..=max] { |
338 | // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`... |
339 | let ptr: *mut Track> = shard.0.load(order:Acquire); |
340 | if ptr.is_null() { |
341 | continue; |
342 | } |
343 | let shard: Box = unsafe { |
344 | // Safety: this is the only place where these boxes are |
345 | // deallocated, and we have exclusive access to the shard array, |
346 | // because...we are dropping it... |
347 | Box::from_raw(ptr) |
348 | }; |
349 | drop(shard) |
350 | } |
351 | } |
352 | } |
353 | |
354 | impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> { |
355 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
356 | let max: usize = self.max.load(order:Acquire); |
357 | let mut set: DebugMap<'_, '_> = f.debug_map(); |
358 | for shard: &Ptr in &self.shards[0..=max] { |
359 | let ptr: *mut Track> = shard.0.load(order:Acquire); |
360 | if let Some(shard: NonNull) = ptr::NonNull::new(ptr) { |
361 | set.entry(&format_args!(" {:p}" , ptr), value:unsafe { shard.as_ref() }); |
362 | } else { |
363 | set.entry(&format_args!(" {:p}" , ptr), &()); |
364 | } |
365 | } |
366 | set.finish() |
367 | } |
368 | } |
369 | |
370 | // === impl Ptr === |
371 | |
372 | impl<T, C: cfg::Config> Ptr<T, C> { |
373 | #[inline ] |
374 | fn null() -> Self { |
375 | Self(AtomicPtr::new(ptr::null_mut())) |
376 | } |
377 | |
378 | #[inline ] |
379 | fn load(&self, order: Ordering) -> Option<&Shard<T, C>> { |
380 | let ptr = self.0.load(order); |
381 | test_println!("---> loaded= {:p} (order= {:?})" , ptr, order); |
382 | if ptr.is_null() { |
383 | test_println!("---> null" ); |
384 | return None; |
385 | } |
386 | let track = unsafe { |
387 | // Safety: The returned reference will have the same lifetime as the |
388 | // reference to the shard pointer, which (morally, if not actually) |
389 | // owns the shard. The shard is only deallocated when the shard |
390 | // array is dropped, and it won't be dropped while this pointer is |
391 | // borrowed --- and the returned reference has the same lifetime. |
392 | // |
393 | // We know that the pointer is not null, because we just |
394 | // null-checked it immediately prior. |
395 | &*ptr |
396 | }; |
397 | |
398 | Some(track.get_ref()) |
399 | } |
400 | |
401 | #[inline ] |
402 | fn set(&self, new: *mut alloc::Track<Shard<T, C>>) { |
403 | self.0 |
404 | .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire) |
405 | .expect("a shard can only be inserted by the thread that owns it, this is a bug!" ); |
406 | } |
407 | } |
408 | |
409 | // === Iterators === |
410 | |
411 | impl<'a, T, C> Iterator for IterMut<'a, T, C> |
412 | where |
413 | T: 'a, |
414 | C: cfg::Config + 'a, |
415 | { |
416 | type Item = &'a Shard<T, C>; |
417 | fn next(&mut self) -> Option<Self::Item> { |
418 | test_println!("IterMut::next" ); |
419 | loop { |
420 | // Skip over empty indices if they are less than the highest |
421 | // allocated shard. Some threads may have accessed the slab |
422 | // (generating a thread ID) but never actually inserted data, so |
423 | // they may have never allocated a shard. |
424 | let next: Option<&mut Ptr> = self.0.next(); |
425 | test_println!("-> next.is_some= {}" , next.is_some()); |
426 | if let Some(shard: &Shard) = next?.load(order:Acquire) { |
427 | test_println!("-> done" ); |
428 | return Some(shard); |
429 | } |
430 | } |
431 | } |
432 | } |
433 | |