use crate::{
    cfg::{self, CfgPrivate},
    clear::Clear,
    page,
    sync::{
        alloc,
        atomic::{
            AtomicPtr, AtomicUsize,
            Ordering::{self, *},
        },
    },
    tid::Tid,
    Pack,
};

use std::{fmt, ptr, slice};

// ┌─────────────┐      ┌────────┐
// │ page 1      │      │        │
// ├─────────────┤ ┌───▶│  next──┼─┐
// │ page 2      │ │    ├────────┤ │
// │             │ │    │XXXXXXXX│ │
// │ local_free──┼─┘    ├────────┤ │
// │ global_free─┼─┐    │        │◀┘
// ├─────────────┤ └───▶│  next──┼─┐
// │   page 3    │      ├────────┤ │
// └─────────────┘      │XXXXXXXX│ │
//       ...            ├────────┤ │
// ┌─────────────┐      │XXXXXXXX│ │
// │   page n    │      ├────────┤ │
// └─────────────┘      │        │◀┘
//                      │  next──┼───▶
//                      ├────────┤
//                      │XXXXXXXX│
//                      └────────┘
//                           ...
pub(crate) struct Shard<T, C: cfg::Config> {
    /// The shard's parent thread ID.
    pub(crate) tid: usize,
    /// The local free list for each page.
    ///
    /// These are only ever accessed from this shard's thread, so they are
    /// stored separately from the shared state for the page that can be
    /// accessed concurrently, to minimize false sharing.
    local: Box<[page::Local]>,
    /// The shared state for each page in this shard.
    ///
    /// This consists of the page's metadata (size, previous size), remote free
    /// list, and a pointer to the actual array backing that page.
    shared: Box<[page::Shared<T, C>]>,
}
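
// A slab index packs three things: the owning shard's thread ID, a page
// index within that shard, and an address (slot) within that page. A
// minimal sketch of the decomposition, using only helpers that already
// appear in this module:
//
//     let tid = Tid::<C>::from_packed(idx).as_usize();  // which shard
//     let (addr, page_index) = page::indices::<C>(idx); // which page + slot
//
// The `local` half of each page is touched only by the shard's own thread;
// all other threads free slots through the page's shared ("remote") free
// list, which is why the two are stored separately (see the doc comment on
// `local` above).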

pub(crate) struct Array<T, C: cfg::Config> {
    shards: Box<[Ptr<T, C>]>,
    max: AtomicUsize,
}

#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);

#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);

// === impl Shard ===

impl<T, C> Shard<T, C>
where
    C: cfg::Config,
{
    #[inline(always)]
    pub(crate) fn with_slot<'a, U>(
        &'a self,
        idx: usize,
        f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> {:?}", addr);
        // Note: `>=`, since `page_index == len` would be out of bounds for
        // the indexing below.
        if page_index >= self.shared.len() {
            return None;
        }

        self.shared[page_index].with_slot(addr, f)
    }

    pub(crate) fn new(tid: usize) -> Self {
        let mut total_sz = 0;
        let shared = (0..C::MAX_PAGES)
            .map(|page_num| {
                let sz = C::page_size(page_num);
                let prev_sz = total_sz;
                total_sz += sz;
                page::Shared::new(sz, prev_sz)
            })
            .collect();
        let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
        Self { tid, local, shared }
    }
}

impl<T, C> Shard<Option<T>, C>
where
    C: cfg::Config,
{
    /// Remove an item on the shard's local thread.
    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_local {:?}", addr);

        self.shared
            .get(page_index)?
            .take(addr, C::unpack_gen(idx), self.local(page_index))
    }

    /// Remove an item from a thread other than the shard's local thread.
    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        debug_assert!(Tid::<C>::current().as_usize() != self.tid);

        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);

        let shared = self.shared.get(page_index)?;
        shared.take(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn remove_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
    }

    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.remove(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
        self.shared.iter()
    }
}
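
// Note the pattern shared by the `take_*`/`remove_*` pairs above (and the
// `mark_clear_*`/`clear_*` pairs below): the `*_local` variants pass
// `self.local(page_index)` as the free list, while the `*_remote` variants
// pass `shared.free_list()`. A sketch of how a caller dispatches between
// them, mirroring `clear_after_release` below:
//
//     if Tid::<C>::current().as_usize() == shard.tid {
//         shard.remove_local(idx);
//     } else {
//         shard.remove_remote(idx);
//     }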

impl<T, C> Shard<T, C>
where
    T: Clear + Default,
    C: cfg::Config,
{
    pub(crate) fn init_with<U>(
        &self,
        mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        // Can we fit the value into an existing page?
        for (page_idx, page) in self.shared.iter().enumerate() {
            let local = self.local(page_idx);

            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);

            if let Some(res) = page.init_with(local, &mut init) {
                return Some(res);
            }
        }

        None
    }

    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn clear_after_release(&self, idx: usize) {
        crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
        let tid = Tid::<C>::current().as_usize();
        test_println!(
            "-> clear_after_release; self.tid={:?}; current.tid={:?};",
            tid,
            self.tid
        );
        if tid == self.tid {
            self.clear_local(idx);
        } else {
            self.clear_remote(idx);
        }
    }

    fn clear_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

    fn clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.clear(addr, C::unpack_gen(idx), shared.free_list())
    }

    #[inline(always)]
    fn local(&self, i: usize) -> &page::Local {
        #[cfg(debug_assertions)]
        debug_assert_eq_in_drop!(
            Tid::<C>::current().as_usize(),
            self.tid,
            "tried to access local data from another thread!"
        );

        &self.local[i]
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_struct("Shard");

        #[cfg(debug_assertions)]
        d.field("tid", &self.tid);
        d.field("shared", &self.shared).finish()
    }
}

// === impl Array ===

impl<T, C> Array<T, C>
where
    C: cfg::Config,
{
    pub(crate) fn new() -> Self {
        let mut shards = Vec::with_capacity(C::MAX_SHARDS);
        for _ in 0..C::MAX_SHARDS {
            // XXX(eliza): T_T this could be avoided with maybeuninit or something...
            shards.push(Ptr::null());
        }
        Self {
            shards: shards.into(),
            max: AtomicUsize::new(0),
        }
    }
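
    // Note: the iterator-and-collect pattern used in `Shard::new` above would
    // also build this array (it still constructs a `Ptr::null()` per slot, so
    // it does not address the XXX above; shown only as an equivalent form):
    //
    //     let shards: Box<[Ptr<T, C>]> =
    //         (0..C::MAX_SHARDS).map(|_| Ptr::null()).collect();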

    #[inline]
    pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> {
        test_println!("-> get shard={}", idx);
        self.shards.get(idx)?.load(Acquire)
    }

    #[inline]
    pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) {
        let tid = Tid::<C>::current();
        test_println!("current: {:?}", tid);
        let idx = tid.as_usize();
        assert!(
            idx < self.shards.len(),
            "Thread count overflowed the configured max count. \
             Thread index = {}, max threads = {}.",
            idx,
            C::MAX_SHARDS,
        );
        // It's okay for this to be relaxed. The value is only ever stored by
        // the thread that corresponds to the index, and we are that thread.
        let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
            let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
            test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
            self.shards[idx].set(ptr);
            let mut max = self.max.load(Acquire);
            while max < idx {
                match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
                    Ok(_) => break,
                    Err(actual) => max = actual,
                }
            }
            test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
            unsafe {
                // Safety: we just put it there!
                &*ptr
            }
            .get_ref()
        });
        (tid, shard)
    }
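
    // The `max` update in `current` above is the standard lock-free
    // "monotonic maximum" idiom: retry the CAS until either it succeeds or
    // another thread has already published a value at least as large. In
    // isolation, with plain `std` atomics (illustrative sketch only, not
    // crate API):
    //
    //     use std::sync::atomic::{AtomicUsize, Ordering::*};
    //
    //     fn bump_max(max: &AtomicUsize, idx: usize) {
    //         let mut cur = max.load(Acquire);
    //         while cur < idx {
    //             match max.compare_exchange(cur, idx, AcqRel, Acquire) {
    //                 Ok(_) => return,
    //                 Err(actual) => cur = actual,
    //             }
    //         }
    //     }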

    pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
        test_println!("Array::iter_mut");
        let max = self.max.load(Acquire);
        test_println!("-> highest index={}", max);
        IterMut(self.shards[0..=max].iter_mut())
    }
}

impl<T, C: cfg::Config> Drop for Array<T, C> {
    fn drop(&mut self) {
        // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper
        // for std atomics to change `get_mut` to `with_mut`...
        let max = self.max.load(Acquire);
        for shard in &self.shards[0..=max] {
            let ptr = shard.0.load(Acquire);
            if ptr.is_null() {
                continue;
            }
            let shard = unsafe {
                // Safety: this is the only place where these boxes are
                // deallocated, and we have exclusive access to the shard array,
                // because...we are dropping it...
                Box::from_raw(ptr)
            };
            drop(shard)
        }
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let max = self.max.load(Acquire);
        let mut set = f.debug_map();
        for shard in &self.shards[0..=max] {
            let ptr = shard.0.load(Acquire);
            if let Some(shard) = ptr::NonNull::new(ptr) {
                set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
            } else {
                set.entry(&format_args!("{:p}", ptr), &());
            }
        }
        set.finish()
    }
}

// === impl Ptr ===

impl<T, C: cfg::Config> Ptr<T, C> {
    #[inline]
    fn null() -> Self {
        Self(AtomicPtr::new(ptr::null_mut()))
    }

    #[inline]
    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
        let ptr = self.0.load(order);
        test_println!("---> loaded={:p} (order={:?})", ptr, order);
        if ptr.is_null() {
            test_println!("---> null");
            return None;
        }
        let track = unsafe {
            // Safety: The returned reference will have the same lifetime as the
            // reference to the shard pointer, which (morally, if not actually)
            // owns the shard. The shard is only deallocated when the shard
            // array is dropped, and it won't be dropped while this pointer is
            // borrowed --- and the returned reference has the same lifetime.
            //
            // We know that the pointer is not null, because we just
            // null-checked it immediately prior.
            &*ptr
        };

        Some(track.get_ref())
    }

    #[inline]
    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
        self.0
            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
            .expect("a shard can only be inserted by the thread that owns it, this is a bug!");
    }
}
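
// `Ptr::set` relies on the "install exactly once" idiom: the CAS succeeds
// only while the slot still holds a null pointer, so a second store (which
// would leak or alias the first shard) panics instead of silently
// clobbering it. The same idiom with plain `std` atomics (illustrative
// sketch only):
//
//     use std::{ptr, sync::atomic::{AtomicPtr, Ordering::*}};
//
//     fn install_once<T>(slot: &AtomicPtr<T>, new: *mut T) -> bool {
//         slot.compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
//             .is_ok()
//     }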

// === Iterators ===

impl<'a, T, C> Iterator for IterMut<'a, T, C>
where
    T: 'a,
    C: cfg::Config + 'a,
{
    type Item = &'a Shard<T, C>;
    fn next(&mut self) -> Option<Self::Item> {
        test_println!("IterMut::next");
        loop {
            // Skip over empty indices if they are less than the highest
            // allocated shard. Some threads may have accessed the slab
            // (generating a thread ID) but never actually inserted data, so
            // they may have never allocated a shard.
            let next = self.0.next();
            test_println!("-> next.is_some={}", next.is_some());
            if let Some(shard) = next?.load(Acquire) {
                test_println!("-> done");
                return Some(shard);
            }
        }
    }
}
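
// Usage sketch (hypothetical caller): `iter_mut` visits every shard slot up
// to the highest index ever allocated, which is how a whole-slab operation
// such as a debug dump or teardown might walk the structure:
//
//     for shard in array.iter_mut() {
//         // `shard` is a `&Shard<T, C>`; e.g., for `Shard<Option<T>, C>`,
//         // iterate its pages via `shard.iter()`.
//     }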