| 1 | //! This module defines an `IdleNotifiedSet`, which is a collection of elements. |
| 2 | //! Each element is intended to correspond to a task, and the collection will |
| 3 | //! keep track of which tasks have had their waker notified, and which have not. |
| 4 | //! |
| 5 | //! Each entry in the set holds some user-specified value. The value's type is |
| 6 | //! specified using the `T` parameter. It will usually be a `JoinHandle` or |
| 7 | //! similar. |
| 8 | |
| 9 | use std::marker::PhantomPinned; |
| 10 | use std::mem::ManuallyDrop; |
| 11 | use std::ptr::NonNull; |
| 12 | use std::task::{Context, Waker}; |
| 13 | |
| 14 | use crate::loom::cell::UnsafeCell; |
| 15 | use crate::loom::sync::{Arc, Mutex}; |
| 16 | use crate::util::linked_list::{self, Link}; |
| 17 | use crate::util::{waker_ref, Wake}; |
| 18 | |
| 19 | type LinkedList<T> = |
| 20 | linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>; |
| 21 | |
| 22 | /// This is the main handle to the collection. |
| 23 | pub(crate) struct IdleNotifiedSet<T> { |
| 24 | lists: Arc<Lists<T>>, |
| 25 | length: usize, |
| 26 | } |
| 27 | |
| 28 | /// A handle to an entry that is guaranteed to be stored in the idle or notified |
| 29 | /// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet` |
| 30 | /// mutably to prevent the entry from being moved to the `Neither` list, which |
| 31 | /// only the `IdleNotifiedSet` may do. |
| 32 | /// |
| 33 | /// The main consequence of being stored in one of the lists is that the `value` |
| 34 | /// field has not yet been consumed. |
| 35 | /// |
| 36 | /// Note: This entry can be moved from the idle to the notified list while this |
| 37 | /// object exists by waking its waker. |
| 38 | pub(crate) struct EntryInOneOfTheLists<'a, T> { |
| 39 | entry: Arc<ListEntry<T>>, |
| 40 | set: &'a mut IdleNotifiedSet<T>, |
| 41 | } |
| 42 | |
| 43 | type Lists<T> = Mutex<ListsInner<T>>; |
| 44 | |
| 45 | /// The linked lists hold strong references to the `ListEntry` items, and the |
| 46 | /// `ListEntry` items also hold a strong reference back to the Lists object, but |
| 47 | /// the destructor of the `IdleNotifiedSet` will clear the two lists, so once |
| 48 | /// that object is destroyed, no ref-cycles will remain. |
| 49 | struct ListsInner<T> { |
| 50 | notified: LinkedList<T>, |
| 51 | idle: LinkedList<T>, |
| 52 | /// Whenever an element in the `notified` list is woken, this waker will be |
| 53 | /// notified and consumed, if it exists. |
| 54 | waker: Option<Waker>, |
| 55 | } |
| 56 | |
| 57 | /// Which of the two lists in the shared Lists object is this entry stored in? |
| 58 | /// |
| 59 | /// If the value is `Idle`, then an entry's waker may move it to the notified |
| 60 | /// list. Otherwise, only the `IdleNotifiedSet` may move it. |
| 61 | /// |
| 62 | /// If the value is `Neither`, then it is still possible that the entry is in |
| 63 | /// some third external list (this happens in `drain`). |
| 64 | #[derive (Copy, Clone, Eq, PartialEq)] |
| 65 | enum List { |
| 66 | Notified, |
| 67 | Idle, |
| 68 | Neither, |
| 69 | } |
| 70 | |
| 71 | /// An entry in the list. |
| 72 | /// |
| 73 | /// # Safety |
| 74 | /// |
| 75 | /// The `my_list` field must only be accessed while holding the mutex in |
| 76 | /// `parent`. It is an invariant that the value of `my_list` corresponds to |
| 77 | /// which linked list in the `parent` holds this entry. Once this field takes |
| 78 | /// the value `Neither`, then it may never be modified again. |
| 79 | /// |
| 80 | /// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field |
| 81 | /// must only be accessed while holding the mutex. If the value of `my_list` is |
| 82 | /// `Neither`, then the `pointers` field may be accessed by the |
| 83 | /// `IdleNotifiedSet` (this happens inside `drain`). |
| 84 | /// |
| 85 | /// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed |
| 86 | /// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to |
| 87 | /// `Neither` assumes ownership of the `value`, and it must either drop it or |
| 88 | /// move it out from this entry to prevent it from getting leaked. (Since the |
| 89 | /// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the |
| 90 | /// value should not be leaked.) |
| 91 | struct ListEntry<T> { |
| 92 | /// The linked list pointers of the list this entry is in. |
| 93 | pointers: linked_list::Pointers<ListEntry<T>>, |
| 94 | /// Pointer to the shared `Lists` struct. |
| 95 | parent: Arc<Lists<T>>, |
| 96 | /// The value stored in this entry. |
| 97 | value: UnsafeCell<ManuallyDrop<T>>, |
| 98 | /// Used to remember which list this entry is in. |
| 99 | my_list: UnsafeCell<List>, |
| 100 | /// Required by the `linked_list::Pointers` field. |
| 101 | _pin: PhantomPinned, |
| 102 | } |
| 103 | |
| 104 | generate_addr_of_methods! { |
| 105 | impl<T> ListEntry<T> { |
| 106 | unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> { |
| 107 | &self.pointers |
| 108 | } |
| 109 | } |
| 110 | } |
| 111 | |
| 112 | // With mutable access to the `IdleNotifiedSet`, you can get mutable access to |
| 113 | // the values. |
| 114 | unsafe impl<T: Send> Send for IdleNotifiedSet<T> {} |
| 115 | // With the current API we strictly speaking don't even need `T: Sync`, but we |
| 116 | // require it anyway to support adding &self APIs that access the values in the |
| 117 | // future. |
| 118 | unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {} |
| 119 | |
| 120 | // These impls control when it is safe to create a Waker. Since the waker does |
| 121 | // not allow access to the value in any way (including its destructor), it is |
| 122 | // not necessary for `T` to be Send or Sync. |
| 123 | unsafe impl<T> Send for ListEntry<T> {} |
| 124 | unsafe impl<T> Sync for ListEntry<T> {} |
| 125 | |
| 126 | impl<T> IdleNotifiedSet<T> { |
| 127 | /// Create a new `IdleNotifiedSet`. |
| 128 | pub(crate) fn new() -> Self { |
| 129 | let lists = Mutex::new(ListsInner { |
| 130 | notified: LinkedList::new(), |
| 131 | idle: LinkedList::new(), |
| 132 | waker: None, |
| 133 | }); |
| 134 | |
| 135 | IdleNotifiedSet { |
| 136 | lists: Arc::new(lists), |
| 137 | length: 0, |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | pub(crate) fn len(&self) -> usize { |
| 142 | self.length |
| 143 | } |
| 144 | |
| 145 | pub(crate) fn is_empty(&self) -> bool { |
| 146 | self.length == 0 |
| 147 | } |
| 148 | |
| 149 | /// Insert the given value into the `idle` list. |
| 150 | pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> { |
| 151 | self.length += 1; |
| 152 | |
| 153 | let entry = Arc::new(ListEntry { |
| 154 | parent: self.lists.clone(), |
| 155 | value: UnsafeCell::new(ManuallyDrop::new(value)), |
| 156 | my_list: UnsafeCell::new(List::Idle), |
| 157 | pointers: linked_list::Pointers::new(), |
| 158 | _pin: PhantomPinned, |
| 159 | }); |
| 160 | |
| 161 | { |
| 162 | let mut lock = self.lists.lock(); |
| 163 | lock.idle.push_front(entry.clone()); |
| 164 | } |
| 165 | |
| 166 | // Safety: We just put the entry in the idle list, so it is in one of the lists. |
| 167 | EntryInOneOfTheLists { entry, set: self } |
| 168 | } |
| 169 | |
| 170 | /// Pop an entry from the notified list to poll it. The entry is moved to |
| 171 | /// the idle list atomically. |
| 172 | pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> { |
| 173 | // We don't decrement the length because this call moves the entry to |
| 174 | // the idle list rather than removing it. |
| 175 | if self.length == 0 { |
| 176 | // Fast path. |
| 177 | return None; |
| 178 | } |
| 179 | |
| 180 | let mut lock = self.lists.lock(); |
| 181 | |
| 182 | let should_update_waker = match lock.waker.as_mut() { |
| 183 | Some(cur_waker) => !waker.will_wake(cur_waker), |
| 184 | None => true, |
| 185 | }; |
| 186 | if should_update_waker { |
| 187 | lock.waker = Some(waker.clone()); |
| 188 | } |
| 189 | |
| 190 | // Pop the entry, returning None if empty. |
| 191 | let entry = lock.notified.pop_back()?; |
| 192 | |
| 193 | lock.idle.push_front(entry.clone()); |
| 194 | |
| 195 | // Safety: We are holding the lock. |
| 196 | entry.my_list.with_mut(|ptr| unsafe { |
| 197 | *ptr = List::Idle; |
| 198 | }); |
| 199 | |
| 200 | drop(lock); |
| 201 | |
| 202 | // Safety: We just put the entry in the idle list, so it is in one of the lists. |
| 203 | Some(EntryInOneOfTheLists { entry, set: self }) |
| 204 | } |
| 205 | |
| 206 | /// Tries to pop an entry from the notified list to poll it. The entry is moved to |
| 207 | /// the idle list atomically. |
| 208 | pub(crate) fn try_pop_notified(&mut self) -> Option<EntryInOneOfTheLists<'_, T>> { |
| 209 | // We don't decrement the length because this call moves the entry to |
| 210 | // the idle list rather than removing it. |
| 211 | if self.length == 0 { |
| 212 | // Fast path. |
| 213 | return None; |
| 214 | } |
| 215 | |
| 216 | let mut lock = self.lists.lock(); |
| 217 | |
| 218 | // Pop the entry, returning None if empty. |
| 219 | let entry = lock.notified.pop_back()?; |
| 220 | |
| 221 | lock.idle.push_front(entry.clone()); |
| 222 | |
| 223 | // Safety: We are holding the lock. |
| 224 | entry.my_list.with_mut(|ptr| unsafe { |
| 225 | *ptr = List::Idle; |
| 226 | }); |
| 227 | |
| 228 | drop(lock); |
| 229 | |
| 230 | // Safety: We just put the entry in the idle list, so it is in one of the lists. |
| 231 | Some(EntryInOneOfTheLists { entry, set: self }) |
| 232 | } |
| 233 | |
| 234 | /// Call a function on every element in this list. |
| 235 | pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) { |
| 236 | fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) { |
| 237 | let mut node = list.last(); |
| 238 | |
| 239 | while let Some(entry) = node { |
| 240 | ptrs.push(entry.value.with_mut(|ptr| { |
| 241 | let ptr: *mut ManuallyDrop<T> = ptr; |
| 242 | let ptr: *mut T = ptr.cast(); |
| 243 | ptr |
| 244 | })); |
| 245 | |
| 246 | let prev = entry.pointers.get_prev(); |
| 247 | node = prev.map(|prev| unsafe { &*prev.as_ptr() }); |
| 248 | } |
| 249 | } |
| 250 | |
| 251 | // Atomically get a raw pointer to the value of every entry. |
| 252 | // |
| 253 | // Since this only locks the mutex once, it is not possible for a value |
| 254 | // to get moved from the idle list to the notified list during the |
| 255 | // operation, which would otherwise result in some value being listed |
| 256 | // twice. |
| 257 | let mut ptrs = Vec::with_capacity(self.len()); |
| 258 | { |
| 259 | let mut lock = self.lists.lock(); |
| 260 | |
| 261 | get_ptrs(&mut lock.idle, &mut ptrs); |
| 262 | get_ptrs(&mut lock.notified, &mut ptrs); |
| 263 | } |
| 264 | debug_assert_eq!(ptrs.len(), ptrs.capacity()); |
| 265 | |
| 266 | for ptr in ptrs { |
| 267 | // Safety: When we grabbed the pointers, the entries were in one of |
| 268 | // the two lists. This means that their value was valid at the time, |
| 269 | // and it must still be valid because we are the IdleNotifiedSet, |
| 270 | // and only we can remove an entry from the two lists. (It's |
| 271 | // possible that an entry is moved from one list to the other during |
| 272 | // this loop, but that is ok.) |
| 273 | func(unsafe { &mut *ptr }); |
| 274 | } |
| 275 | } |
| 276 | |
| 277 | /// Remove all entries in both lists, applying some function to each element. |
| 278 | /// |
| 279 | /// The closure is called on all elements even if it panics. Having it panic |
| 280 | /// twice is a double-panic, and will abort the application. |
| 281 | pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) { |
| 282 | if self.length == 0 { |
| 283 | // Fast path. |
| 284 | return; |
| 285 | } |
| 286 | self.length = 0; |
| 287 | |
| 288 | // The LinkedList is not cleared on panic, so we use a bomb to clear it. |
| 289 | // |
| 290 | // This value has the invariant that any entry in its `all_entries` list |
| 291 | // has `my_list` set to `Neither` and that the value has not yet been |
| 292 | // dropped. |
| 293 | struct AllEntries<T, F: FnMut(T)> { |
| 294 | all_entries: LinkedList<T>, |
| 295 | func: F, |
| 296 | } |
| 297 | |
| 298 | impl<T, F: FnMut(T)> AllEntries<T, F> { |
| 299 | fn pop_next(&mut self) -> bool { |
| 300 | if let Some(entry) = self.all_entries.pop_back() { |
| 301 | // Safety: We just took this value from the list, so we can |
| 302 | // destroy the value in the entry. |
| 303 | entry |
| 304 | .value |
| 305 | .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) }); |
| 306 | true |
| 307 | } else { |
| 308 | false |
| 309 | } |
| 310 | } |
| 311 | } |
| 312 | |
| 313 | impl<T, F: FnMut(T)> Drop for AllEntries<T, F> { |
| 314 | fn drop(&mut self) { |
| 315 | while self.pop_next() {} |
| 316 | } |
| 317 | } |
| 318 | |
| 319 | let mut all_entries = AllEntries { |
| 320 | all_entries: LinkedList::new(), |
| 321 | func, |
| 322 | }; |
| 323 | |
| 324 | // Atomically move all entries to the new linked list in the AllEntries |
| 325 | // object. |
| 326 | { |
| 327 | let mut lock = self.lists.lock(); |
| 328 | unsafe { |
| 329 | // Safety: We are holding the lock and `all_entries` is a new |
| 330 | // LinkedList. |
| 331 | move_to_new_list(&mut lock.idle, &mut all_entries.all_entries); |
| 332 | move_to_new_list(&mut lock.notified, &mut all_entries.all_entries); |
| 333 | } |
| 334 | } |
| 335 | |
| 336 | // Keep destroying entries in the list until it is empty. |
| 337 | // |
| 338 | // If the closure panics, then the destructor of the `AllEntries` bomb |
| 339 | // ensures that we keep running the destructor on the remaining values. |
| 340 | // A second panic will abort the program. |
| 341 | while all_entries.pop_next() {} |
| 342 | } |
| 343 | } |
| 344 | |
| 345 | /// # Safety |
| 346 | /// |
| 347 | /// The mutex for the entries must be held, and the target list must be such |
| 348 | /// that setting `my_list` to `Neither` is ok. |
| 349 | unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) { |
| 350 | while let Some(entry: Arc>) = from.pop_back() { |
| 351 | entry.my_list.with_mut(|ptr: *mut List| { |
| 352 | *ptr = List::Neither; |
| 353 | }); |
| 354 | to.push_front(val:entry); |
| 355 | } |
| 356 | } |
| 357 | |
| 358 | impl<'a, T> EntryInOneOfTheLists<'a, T> { |
| 359 | /// Remove this entry from the list it is in, returning the value associated |
| 360 | /// with the entry. |
| 361 | /// |
| 362 | /// This consumes the value, since it is no longer guaranteed to be in a |
| 363 | /// list. |
| 364 | pub(crate) fn remove(self) -> T { |
| 365 | self.set.length -= 1; |
| 366 | |
| 367 | { |
| 368 | let mut lock = self.set.lists.lock(); |
| 369 | |
| 370 | // Safety: We are holding the lock so there is no race, and we will |
| 371 | // remove the entry afterwards to uphold invariants. |
| 372 | let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe { |
| 373 | let old_my_list = *ptr; |
| 374 | *ptr = List::Neither; |
| 375 | old_my_list |
| 376 | }); |
| 377 | |
| 378 | let list = match old_my_list { |
| 379 | List::Idle => &mut lock.idle, |
| 380 | List::Notified => &mut lock.notified, |
| 381 | // An entry in one of the lists is in one of the lists. |
| 382 | List::Neither => unreachable!(), |
| 383 | }; |
| 384 | |
| 385 | unsafe { |
| 386 | // Safety: We just checked that the entry is in this particular |
| 387 | // list. |
| 388 | list.remove(ListEntry::as_raw(&self.entry)).unwrap(); |
| 389 | } |
| 390 | } |
| 391 | |
| 392 | // By setting `my_list` to `Neither`, we have taken ownership of the |
| 393 | // value. We return it to the caller. |
| 394 | // |
| 395 | // Safety: We have a mutable reference to the `IdleNotifiedSet` that |
| 396 | // owns this entry, so we can use its permission to access the value. |
| 397 | self.entry |
| 398 | .value |
| 399 | .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) }) |
| 400 | } |
| 401 | |
| 402 | /// Access the value in this entry together with a context for its waker. |
| 403 | pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U |
| 404 | where |
| 405 | F: FnOnce(&mut T, &mut Context<'_>) -> U, |
| 406 | T: 'static, |
| 407 | { |
| 408 | let waker = waker_ref(&self.entry); |
| 409 | |
| 410 | let mut context = Context::from_waker(&waker); |
| 411 | |
| 412 | // Safety: We have a mutable reference to the `IdleNotifiedSet` that |
| 413 | // owns this entry, so we can use its permission to access the value. |
| 414 | self.entry |
| 415 | .value |
| 416 | .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) }) |
| 417 | } |
| 418 | } |
| 419 | |
| 420 | impl<T> Drop for IdleNotifiedSet<T> { |
| 421 | fn drop(&mut self) { |
| 422 | // Clear both lists. |
| 423 | self.drain(func:drop); |
| 424 | |
| 425 | #[cfg (debug_assertions)] |
| 426 | if !std::thread::panicking() { |
| 427 | let lock = self.lists.lock(); |
| 428 | assert!(lock.idle.is_empty()); |
| 429 | assert!(lock.notified.is_empty()); |
| 430 | } |
| 431 | } |
| 432 | } |
| 433 | |
| 434 | impl<T: 'static> Wake for ListEntry<T> { |
| 435 | fn wake_by_ref(me: &Arc<Self>) { |
| 436 | let mut lock = me.parent.lock(); |
| 437 | |
| 438 | // Safety: We are holding the lock and we will update the lists to |
| 439 | // maintain invariants. |
| 440 | let old_my_list = me.my_list.with_mut(|ptr| unsafe { |
| 441 | let old_my_list = *ptr; |
| 442 | if old_my_list == List::Idle { |
| 443 | *ptr = List::Notified; |
| 444 | } |
| 445 | old_my_list |
| 446 | }); |
| 447 | |
| 448 | if old_my_list == List::Idle { |
| 449 | // We move ourself to the notified list. |
| 450 | let me = unsafe { |
| 451 | // Safety: We just checked that we are in this particular list. |
| 452 | lock.idle.remove(ListEntry::as_raw(me)).unwrap() |
| 453 | }; |
| 454 | lock.notified.push_front(me); |
| 455 | |
| 456 | if let Some(waker) = lock.waker.take() { |
| 457 | drop(lock); |
| 458 | waker.wake(); |
| 459 | } |
| 460 | } |
| 461 | } |
| 462 | |
| 463 | fn wake(me: Arc<Self>) { |
| 464 | Self::wake_by_ref(&me); |
| 465 | } |
| 466 | } |
| 467 | |
| 468 | /// # Safety |
| 469 | /// |
| 470 | /// `ListEntry` is forced to be !Unpin. |
| 471 | unsafe impl<T> linked_list::Link for ListEntry<T> { |
| 472 | type Handle = Arc<ListEntry<T>>; |
| 473 | type Target = ListEntry<T>; |
| 474 | |
| 475 | fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> { |
| 476 | let ptr: *const ListEntry<T> = Arc::as_ptr(this:handle); |
| 477 | // Safety: We can't get a null pointer from `Arc::as_ptr`. |
| 478 | unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) } |
| 479 | } |
| 480 | |
| 481 | unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> { |
| 482 | Arc::from_raw(ptr.as_ptr()) |
| 483 | } |
| 484 | |
| 485 | unsafe fn pointers( |
| 486 | target: NonNull<ListEntry<T>>, |
| 487 | ) -> NonNull<linked_list::Pointers<ListEntry<T>>> { |
| 488 | ListEntry::addr_of_pointers(me:target) |
| 489 | } |
| 490 | } |
| 491 | |
| 492 | #[cfg (all(test, not(loom)))] |
| 493 | mod tests { |
| 494 | use crate::runtime::Builder; |
| 495 | use crate::task::JoinSet; |
| 496 | |
| 497 | // A test that runs under miri. |
| 498 | // |
| 499 | // https://github.com/tokio-rs/tokio/pull/5693 |
| 500 | #[test ] |
| 501 | fn join_set_test() { |
| 502 | let rt = Builder::new_current_thread().build().unwrap(); |
| 503 | |
| 504 | let mut set = JoinSet::new(); |
| 505 | set.spawn_on(futures::future::ready(()), rt.handle()); |
| 506 | |
| 507 | rt.block_on(set.join_next()).unwrap().unwrap(); |
| 508 | } |
| 509 | } |
| 510 | |