use crate::{
    cfg::{self, CfgPrivate},
    clear::Clear,
    page,
    sync::{
        alloc,
        atomic::{
            AtomicPtr, AtomicUsize,
            Ordering::{self, *},
        },
    },
    tid::Tid,
    Pack,
};

use std::{fmt, ptr, slice};

// ┌─────────────┐      ┌────────┐
// │ page 1      │      │        │
// ├─────────────┤ ┌───▶│  next──┼─┐
// │ page 2      │ │    ├────────┤ │
// │             │ │    │XXXXXXXX│ │
// │ local_free──┼─┘    ├────────┤ │
// │ global_free─┼─┐    │        │◀┘
// ├─────────────┤ └───▶│  next──┼─┐
// │ page 3      │      ├────────┤ │
// └─────────────┘      │XXXXXXXX│ │
//       ...            ├────────┤ │
// ┌─────────────┐      │XXXXXXXX│ │
// │ page n      │      ├────────┤ │
// └─────────────┘      │        │◀┘
//                      │  next──┼───▶
//                      ├────────┤
//                      │XXXXXXXX│
//                      └────────┘
//                          ...
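//
// A shard holds a set of pages owned by one thread. Each page tracks its
// free slots in two lists, both threaded through the slots' `next`
// indices (shown above): `local_free`, which is only ever accessed by
// the thread that owns the shard, and `global_free`, onto which other
// threads push slots that they free remotely. Splitting the two lists
// means the common same-thread free path needs no synchronization.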
pub(crate) struct Shard<T, C: cfg::Config> {
    /// The shard's parent thread ID.
    pub(crate) tid: usize,
    /// The local free list for each page.
    ///
    /// These are only ever accessed from this shard's thread, so they are
    /// stored separately from the shared state for the page that can be
    /// accessed concurrently, to minimize false sharing.
    local: Box<[page::Local]>,
    /// The shared state for each page in this shard.
    ///
    /// This consists of the page's metadata (size, previous size), remote free
    /// list, and a pointer to the actual array backing that page.
    shared: Box<[page::Shared<T, C>]>,
}

pub(crate) struct Array<T, C: cfg::Config> {
    shards: Box<[Ptr<T, C>]>,
    max: AtomicUsize,
}

#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);

#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);
// === impl Shard ===

impl<T, C> Shard<T, C>
where
    C: cfg::Config,
{
    #[inline(always)]
    pub(crate) fn with_slot<'a, U>(
        &'a self,
        idx: usize,
        f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> {:?}", addr);
        if page_index >= self.shared.len() {
            return None;
        }

        self.shared[page_index].with_slot(addr, f)
    }

    pub(crate) fn new(tid: usize) -> Self {
        let mut total_sz = 0;
        let shared = (0..C::MAX_PAGES)
            .map(|page_num| {
                let sz = C::page_size(page_num);
                let prev_sz = total_sz;
                total_sz += sz;
                page::Shared::new(sz, prev_sz)
            })
            .collect();
        let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
        Self { tid, local, shared }
    }
}
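
// A sketch of the sizing math in `new`, assuming a doubling `page_size`
// with an initial page size of 32 (the actual growth policy comes from
// the `Config` implementation):
//
//   page 0: sz = 32,  prev_sz = 0
//   page 1: sz = 64,  prev_sz = 32
//   page 2: sz = 128, prev_sz = 96
//
// `prev_sz` records the combined capacity of all preceding pages, which
// lets a page translate a shard-wide slot index into an offset into its
// own backing array.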

impl<T, C> Shard<Option<T>, C>
where
    C: cfg::Config,
{
    /// Remove an item on the shard's local thread.
    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_local {:?}", addr);

        self.shared
            .get(page_index)?
            .take(addr, C::unpack_gen(idx), self.local(page_index))
    }

    /// Remove an item, while on a different thread from the shard's local thread.
    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        debug_assert!(Tid::<C>::current().as_usize() != self.tid);

        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);

        let shared = self.shared.get(page_index)?;
        shared.take(addr, C::unpack_gen(idx), shared.free_list())
    }
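
    // Note: `take_local` (above) hands the page its *local* free list, so
    // the freed slot can be relinked without synchronization, while
    // `take_remote` hands it the page's *shared* free list, which other
    // threads push onto atomically so the owning thread can reclaim the
    // slot later. The same local/remote split applies to the `remove_*`,
    // `mark_clear_*`, and `clear_*` pairs below.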

    pub(crate) fn remove_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
    }

    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.remove(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
        self.shared.iter()
    }
}

impl<T, C> Shard<T, C>
where
    T: Clear + Default,
    C: cfg::Config,
{
    pub(crate) fn init_with<U>(
        &self,
        mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        // Can we fit the value into an existing page?
        for (page_idx, page) in self.shared.iter().enumerate() {
            let local = self.local(page_idx);

            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);

            if let Some(res) = page.init_with(local, &mut init) {
                return Some(res);
            }
        }

        None
    }
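
    // This is a first-fit scan: pages are tried in order, and a page's
    // `init_with` is expected to return `None` when it has no free slots,
    // so falling out of the loop means every page in the shard is at
    // capacity.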

    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn clear_after_release(&self, idx: usize) {
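        // The acquire fence is assumed to pair with the `Release` operation
        // that dropped the slot's last reference (the caller is expected to
        // invoke this only after the slot has been released), making any
        // writes to the slot by other threads visible before it is cleared.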
        crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
        let tid = Tid::<C>::current().as_usize();
        test_println!(
            "-> clear_after_release; self.tid={:?}; current.tid={:?};",
            self.tid,
            tid
        );
        if tid == self.tid {
            self.clear_local(idx);
        } else {
            self.clear_remote(idx);
        }
    }

    fn clear_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

    fn clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.clear(addr, C::unpack_gen(idx), shared.free_list())
    }

    #[inline(always)]
    fn local(&self, i: usize) -> &page::Local {
        #[cfg(debug_assertions)]
        debug_assert_eq_in_drop!(
            Tid::<C>::current().as_usize(),
            self.tid,
            "tried to access local data from another thread!"
        );

        &self.local[i]
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_struct("Shard");

        #[cfg(debug_assertions)]
        d.field("tid", &self.tid);
        d.field("shared", &self.shared).finish()
    }
}

// === impl Array ===

impl<T, C> Array<T, C>
where
    C: cfg::Config,
{
    pub(crate) fn new() -> Self {
        let mut shards = Vec::with_capacity(C::MAX_SHARDS);
        for _ in 0..C::MAX_SHARDS {
            // XXX(eliza): T_T this could be avoided with maybeuninit or something...
            shards.push(Ptr::null());
        }
        Self {
            shards: shards.into(),
            max: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> {
        test_println!("-> get shard={}", idx);
        self.shards.get(idx)?.load(Acquire)
    }

    #[inline]
    pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) {
        let tid = Tid::<C>::current();
        test_println!("current: {:?}", tid);
        let idx = tid.as_usize();
        assert!(
            idx < self.shards.len(),
            "Thread count overflowed the configured max count. \
             Thread index = {}, max threads = {}.",
            idx,
            C::MAX_SHARDS,
        );
        // It's okay for this to be relaxed. The value is only ever stored by
        // the thread that corresponds to the index, and we are that thread.
        let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
            let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
            test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
            self.shards[idx].set(ptr);
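            // Lock-free update of the highest allocated shard index: retry
            // the CAS until we either store `idx`, or observe that another
            // thread has already stored an index at least as large.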
            let mut max = self.max.load(Acquire);
            while max < idx {
                match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
                    Ok(_) => break,
                    Err(actual) => max = actual,
                }
            }
            test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
            unsafe {
                // Safety: we just put it there!
                &*ptr
            }
            .get_ref()
        });
        (tid, shard)
    }

    pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
        test_println!("Array::iter_mut");
        let max = self.max.load(Acquire);
        test_println!("-> highest index={}", max);
        IterMut(self.shards[0..=max].iter_mut())
    }
}

impl<T, C: cfg::Config> Drop for Array<T, C> {
    fn drop(&mut self) {
        // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for
        // std atomics to change `get_mut` to `with_mut`...
        let max = self.max.load(Acquire);
        for shard in &self.shards[0..=max] {
            // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for
            // std atomics to change `get_mut` to `with_mut`...
            let ptr = shard.0.load(Acquire);
            if ptr.is_null() {
                continue;
            }
            let shard = unsafe {
                // Safety: this is the only place where these boxes are
                // deallocated, and we have exclusive access to the shard array,
                // because...we are dropping it...
                Box::from_raw(ptr)
            };
            drop(shard)
        }
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let max = self.max.load(Acquire);
        let mut set = f.debug_map();
        for shard in &self.shards[0..=max] {
            let ptr = shard.0.load(Acquire);
            if let Some(shard) = ptr::NonNull::new(ptr) {
                set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
            } else {
                set.entry(&format_args!("{:p}", ptr), &());
            }
        }
        set.finish()
    }
}

// === impl Ptr ===

impl<T, C: cfg::Config> Ptr<T, C> {
    #[inline]
    fn null() -> Self {
        Self(AtomicPtr::new(ptr::null_mut()))
    }

    #[inline]
    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
        let ptr = self.0.load(order);
        test_println!("---> loaded={:p} (order={:?})", ptr, order);
        if ptr.is_null() {
            test_println!("---> null");
            return None;
        }
        let track = unsafe {
            // Safety: The returned reference will have the same lifetime as the
            // reference to the shard pointer, which (morally, if not actually)
            // owns the shard. The shard is only deallocated when the shard
            // array is dropped, and it won't be dropped while this pointer is
            // borrowed --- and the returned reference has the same lifetime.
            //
            // We know that the pointer is not null, because we just
            // null-checked it immediately prior.
            &*ptr
        };

        Some(track.get_ref())
    }

    #[inline]
    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
        self.0
            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
            .expect("a shard can only be inserted by the thread that owns it, this is a bug!");
    }
}

// === Iterators ===

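// Although `IterMut` is constructed from an exclusive borrow of the shard
// array (see `Array::iter_mut`), it yields *shared* references to shards.
// Holding the exclusive borrow for the iterator's lifetime ensures that no
// new shards can be installed while iteration is in progress.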
impl<'a, T, C> Iterator for IterMut<'a, T, C>
where
    T: 'a,
    C: cfg::Config + 'a,
{
    type Item = &'a Shard<T, C>;
    fn next(&mut self) -> Option<Self::Item> {
        test_println!("IterMut::next");
        loop {
            // Skip over empty indices if they are less than the highest
            // allocated shard. Some threads may have accessed the slab
            // (generating a thread ID) but never actually inserted data, so
            // they may have never allocated a shard.
            let next = self.0.next();
            test_println!("-> next.is_some={}", next.is_some());
            if let Some(shard) = next?.load(Acquire) {
                test_println!("-> done");
                return Some(shard);
            }
        }
    }
}