//! This module defines an `IdleNotifiedSet`, which is a collection of elements.
//! Each element is intended to correspond to a task, and the collection will
//! keep track of which tasks have had their waker notified, and which have not.
//!
//! Each entry in the set holds some user-specified value. The value's type is
//! specified using the `T` parameter. It will usually be a `JoinHandle` or
//! similar.
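//!
//! A rough sketch of how crate-internal code is intended to drive this
//! collection. The loop below is illustrative only: `join_handle`,
//! `outer_waker`, and the closure body are placeholders, not items defined in
//! this module.
//!
//! ```ignore
//! let mut set = IdleNotifiedSet::new();
//!
//! // Newly inserted values start out in the idle list.
//! set.insert_idle(join_handle);
//!
//! // When an entry's waker fires, the entry moves to the notified list.
//! // The driver polls only the entries that have been notified:
//! while let Some(mut entry) = set.pop_notified(outer_waker) {
//!     let is_done = entry.with_value_and_context(|value, cx| {
//!         // ... poll `value` using `cx`, returning whether it finished ...
//!         true
//!     });
//!     if is_done {
//!         // Removing the entry takes the value back out of the set.
//!         let _value = entry.remove();
//!     }
//! }
//! ```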

use std::marker::PhantomPinned;
use std::mem::ManuallyDrop;
use std::ptr::NonNull;
use std::task::{Context, Waker};

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::{Arc, Mutex};
use crate::util::linked_list::{self, Link};
use crate::util::{waker_ref, Wake};

type LinkedList<T> =
    linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>;

/// This is the main handle to the collection.
pub(crate) struct IdleNotifiedSet<T> {
    lists: Arc<Lists<T>>,
    length: usize,
}

/// A handle to an entry that is guaranteed to be stored in the idle or notified
/// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet`
/// mutably to prevent the entry from being moved to the `Neither` list, which
/// only the `IdleNotifiedSet` may do.
///
/// The main consequence of being stored in one of the lists is that the `value`
/// field has not yet been consumed.
///
/// Note: This entry can be moved from the idle to the notified list while this
/// object exists by waking its waker.
pub(crate) struct EntryInOneOfTheLists<'a, T> {
    entry: Arc<ListEntry<T>>,
    set: &'a mut IdleNotifiedSet<T>,
}

type Lists<T> = Mutex<ListsInner<T>>;

/// The linked lists hold strong references to the `ListEntry` items, and the
/// `ListEntry` items also hold a strong reference back to the Lists object, but
/// the destructor of the `IdleNotifiedSet` will clear the two lists, so once
/// that object is destroyed, no ref-cycles will remain.
struct ListsInner<T> {
    notified: LinkedList<T>,
    idle: LinkedList<T>,
    /// Whenever an element in the `notified` list is woken, this waker will be
    /// notified and consumed, if it exists.
    waker: Option<Waker>,
}

/// Which of the two lists in the shared Lists object is this entry stored in?
///
/// If the value is `Idle`, then an entry's waker may move it to the notified
/// list. Otherwise, only the `IdleNotifiedSet` may move it.
///
/// If the value is `Neither`, then it is still possible that the entry is in
/// some third external list (this happens in `drain`).
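///
/// In summary, the transitions performed elsewhere in this module are: an
/// entry's waker may move it from `Idle` to `Notified`, `pop_notified` and
/// `try_pop_notified` move it back from `Notified` to `Idle`, and `remove` or
/// `drain` move it from either list to `Neither`, which is terminal.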
#[derive(Copy, Clone, Eq, PartialEq)]
enum List {
    Notified,
    Idle,
    Neither,
}

/// An entry in the list.
///
/// # Safety
///
/// The `my_list` field must only be accessed while holding the mutex in
/// `parent`. It is an invariant that the value of `my_list` corresponds to
/// which linked list in the `parent` holds this entry. Once this field takes
/// the value `Neither`, then it may never be modified again.
///
/// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field
/// must only be accessed while holding the mutex. If the value of `my_list` is
/// `Neither`, then the `pointers` field may be accessed by the
/// `IdleNotifiedSet` (this happens inside `drain`).
///
/// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed
/// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to
/// `Neither` assumes ownership of the `value`, and it must either drop it or
/// move it out from this entry to prevent it from getting leaked. (Since the
/// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the
/// value should not be leaked.)
struct ListEntry<T> {
    /// The linked list pointers of the list this entry is in.
    pointers: linked_list::Pointers<ListEntry<T>>,
    /// Pointer to the shared `Lists` struct.
    parent: Arc<Lists<T>>,
    /// The value stored in this entry.
    value: UnsafeCell<ManuallyDrop<T>>,
    /// Used to remember which list this entry is in.
    my_list: UnsafeCell<List>,
    /// Required by the `linked_list::Pointers` field.
    _pin: PhantomPinned,
}

generate_addr_of_methods! {
    impl<T> ListEntry<T> {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
            &self.pointers
        }
    }
}

// With mutable access to the `IdleNotifiedSet`, you can get mutable access to
// the values.
unsafe impl<T: Send> Send for IdleNotifiedSet<T> {}
// With the current API we strictly speaking don't even need `T: Sync`, but we
// require it anyway to support adding &self APIs that access the values in the
// future.
unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {}

// These impls control when it is safe to create a Waker. Since the waker does
// not allow access to the value in any way (including its destructor), it is
// not necessary for `T` to be Send or Sync.
unsafe impl<T> Send for ListEntry<T> {}
unsafe impl<T> Sync for ListEntry<T> {}

impl<T> IdleNotifiedSet<T> {
    /// Create a new `IdleNotifiedSet`.
    pub(crate) fn new() -> Self {
        let lists = Mutex::new(ListsInner {
            notified: LinkedList::new(),
            idle: LinkedList::new(),
            waker: None,
        });

        IdleNotifiedSet {
            lists: Arc::new(lists),
            length: 0,
        }
    }

    pub(crate) fn len(&self) -> usize {
        self.length
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Insert the given value into the `idle` list.
    pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> {
        self.length += 1;

        let entry = Arc::new(ListEntry {
            parent: self.lists.clone(),
            value: UnsafeCell::new(ManuallyDrop::new(value)),
            my_list: UnsafeCell::new(List::Idle),
            pointers: linked_list::Pointers::new(),
            _pin: PhantomPinned,
        });

        {
            let mut lock = self.lists.lock();
            lock.idle.push_front(entry.clone());
        }

        // Safety: We just put the entry in the idle list, so it is in one of the lists.
        EntryInOneOfTheLists { entry, set: self }
    }

    /// Pop an entry from the notified list to poll it. The entry is moved to
    /// the idle list atomically.
    pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> {
        // We don't decrement the length because this call moves the entry to
        // the idle list rather than removing it.
        if self.length == 0 {
            // Fast path.
            return None;
        }

        let mut lock = self.lists.lock();

        let should_update_waker = match lock.waker.as_mut() {
            Some(cur_waker) => !waker.will_wake(cur_waker),
            None => true,
        };
        if should_update_waker {
            lock.waker = Some(waker.clone());
        }

        // Pop the entry, returning None if empty.
        let entry = lock.notified.pop_back()?;

        lock.idle.push_front(entry.clone());

        // Safety: We are holding the lock.
        entry.my_list.with_mut(|ptr| unsafe {
            *ptr = List::Idle;
        });

        drop(lock);

        // Safety: We just put the entry in the idle list, so it is in one of the lists.
        Some(EntryInOneOfTheLists { entry, set: self })
    }

    /// Tries to pop an entry from the notified list to poll it. The entry is moved to
    /// the idle list atomically.
    pub(crate) fn try_pop_notified(&mut self) -> Option<EntryInOneOfTheLists<'_, T>> {
        // We don't decrement the length because this call moves the entry to
        // the idle list rather than removing it.
        if self.length == 0 {
            // Fast path.
            return None;
        }

        let mut lock = self.lists.lock();

        // Pop the entry, returning None if empty.
        let entry = lock.notified.pop_back()?;

        lock.idle.push_front(entry.clone());

        // Safety: We are holding the lock.
        entry.my_list.with_mut(|ptr| unsafe {
            *ptr = List::Idle;
        });

        drop(lock);

        // Safety: We just put the entry in the idle list, so it is in one of the lists.
        Some(EntryInOneOfTheLists { entry, set: self })
    }

    /// Call a function on every element in this list.
    pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) {
        fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) {
            let mut node = list.last();

            while let Some(entry) = node {
                ptrs.push(entry.value.with_mut(|ptr| {
                    let ptr: *mut ManuallyDrop<T> = ptr;
                    let ptr: *mut T = ptr.cast();
                    ptr
                }));

                let prev = entry.pointers.get_prev();
                node = prev.map(|prev| unsafe { &*prev.as_ptr() });
            }
        }

        // Atomically get a raw pointer to the value of every entry.
        //
        // Since this only locks the mutex once, it is not possible for a value
        // to get moved from the idle list to the notified list during the
        // operation, which would otherwise result in some value being listed
        // twice.
        let mut ptrs = Vec::with_capacity(self.len());
        {
            let mut lock = self.lists.lock();

            get_ptrs(&mut lock.idle, &mut ptrs);
            get_ptrs(&mut lock.notified, &mut ptrs);
        }
        debug_assert_eq!(ptrs.len(), ptrs.capacity());

        for ptr in ptrs {
            // Safety: When we grabbed the pointers, the entries were in one of
            // the two lists. This means that their value was valid at the time,
            // and it must still be valid because we are the IdleNotifiedSet,
            // and only we can remove an entry from the two lists. (It's
            // possible that an entry is moved from one list to the other during
            // this loop, but that is ok.)
            func(unsafe { &mut *ptr });
        }
    }

    /// Remove all entries in both lists, applying some function to each element.
    ///
    /// The closure is called on all elements even if it panics. Having it panic
    /// twice is a double-panic, and will abort the application.
    pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) {
        if self.length == 0 {
            // Fast path.
            return;
        }
        self.length = 0;

        // The LinkedList is not cleared on panic, so we use a bomb to clear it.
        //
        // This value has the invariant that any entry in its `all_entries` list
        // has `my_list` set to `Neither` and that the value has not yet been
        // dropped.
        struct AllEntries<T, F: FnMut(T)> {
            all_entries: LinkedList<T>,
            func: F,
        }

        impl<T, F: FnMut(T)> AllEntries<T, F> {
            fn pop_next(&mut self) -> bool {
                if let Some(entry) = self.all_entries.pop_back() {
                    // Safety: We just took this value from the list, so we can
                    // destroy the value in the entry.
                    entry
                        .value
                        .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) });
                    true
                } else {
                    false
                }
            }
        }

        impl<T, F: FnMut(T)> Drop for AllEntries<T, F> {
            fn drop(&mut self) {
                while self.pop_next() {}
            }
        }

        let mut all_entries = AllEntries {
            all_entries: LinkedList::new(),
            func,
        };

        // Atomically move all entries to the new linked list in the AllEntries
        // object.
        {
            let mut lock = self.lists.lock();
            unsafe {
                // Safety: We are holding the lock and `all_entries` is a new
                // LinkedList.
                move_to_new_list(&mut lock.idle, &mut all_entries.all_entries);
                move_to_new_list(&mut lock.notified, &mut all_entries.all_entries);
            }
        }

        // Keep destroying entries in the list until it is empty.
        //
        // If the closure panics, then the destructor of the `AllEntries` bomb
        // ensures that we keep running the destructor on the remaining values.
        // A second panic will abort the program.
        while all_entries.pop_next() {}
    }
}

/// # Safety
///
/// The mutex for the entries must be held, and the target list must be such
/// that setting `my_list` to `Neither` is ok.
unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) {
    while let Some(entry) = from.pop_back() {
        entry.my_list.with_mut(|ptr| {
            *ptr = List::Neither;
        });
        to.push_front(entry);
    }
}

impl<'a, T> EntryInOneOfTheLists<'a, T> {
    /// Remove this entry from the list it is in, returning the value associated
    /// with the entry.
    ///
    /// This consumes the value, since it is no longer guaranteed to be in a
    /// list.
    pub(crate) fn remove(self) -> T {
        self.set.length -= 1;

        {
            let mut lock = self.set.lists.lock();

            // Safety: We are holding the lock so there is no race, and we will
            // remove the entry afterwards to uphold invariants.
            let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe {
                let old_my_list = *ptr;
                *ptr = List::Neither;
                old_my_list
            });

            let list = match old_my_list {
                List::Idle => &mut lock.idle,
                List::Notified => &mut lock.notified,
                // An entry in one of the lists is in one of the lists.
                List::Neither => unreachable!(),
            };

            unsafe {
                // Safety: We just checked that the entry is in this particular
                // list.
                list.remove(ListEntry::as_raw(&self.entry)).unwrap();
            }
        }

        // By setting `my_list` to `Neither`, we have taken ownership of the
        // value. We return it to the caller.
        //
        // Safety: We have a mutable reference to the `IdleNotifiedSet` that
        // owns this entry, so we can use its permission to access the value.
        self.entry
            .value
            .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) })
    }

    /// Access the value in this entry together with a context for its waker.
    pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U
    where
        F: FnOnce(&mut T, &mut Context<'_>) -> U,
        T: 'static,
    {
        let waker = waker_ref(&self.entry);

        let mut context = Context::from_waker(&waker);

        // Safety: We have a mutable reference to the `IdleNotifiedSet` that
        // owns this entry, so we can use its permission to access the value.
        self.entry
            .value
            .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) })
    }
}

impl<T> Drop for IdleNotifiedSet<T> {
    fn drop(&mut self) {
        // Clear both lists.
        self.drain(drop);

        #[cfg(debug_assertions)]
        if !std::thread::panicking() {
            let lock = self.lists.lock();
            assert!(lock.idle.is_empty());
            assert!(lock.notified.is_empty());
        }
    }
}

impl<T: 'static> Wake for ListEntry<T> {
    fn wake_by_ref(me: &Arc<Self>) {
        let mut lock = me.parent.lock();

        // Safety: We are holding the lock and we will update the lists to
        // maintain invariants.
        let old_my_list = me.my_list.with_mut(|ptr| unsafe {
            let old_my_list = *ptr;
            if old_my_list == List::Idle {
                *ptr = List::Notified;
            }
            old_my_list
        });

        if old_my_list == List::Idle {
            // We move ourself to the notified list.
            let me = unsafe {
                // Safety: We just checked that we are in this particular list.
                lock.idle.remove(ListEntry::as_raw(me)).unwrap()
            };
            lock.notified.push_front(me);

            if let Some(waker) = lock.waker.take() {
                drop(lock);
                waker.wake();
            }
        }
    }

    fn wake(me: Arc<Self>) {
        Self::wake_by_ref(&me);
    }
}

/// # Safety
///
/// `ListEntry` is forced to be !Unpin.
unsafe impl<T> linked_list::Link for ListEntry<T> {
    type Handle = Arc<ListEntry<T>>;
    type Target = ListEntry<T>;

    fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> {
        let ptr: *const ListEntry<T> = Arc::as_ptr(handle);
        // Safety: We can't get a null pointer from `Arc::as_ptr`.
        unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) }
    }

    unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> {
        Arc::from_raw(ptr.as_ptr())
    }

    unsafe fn pointers(
        target: NonNull<ListEntry<T>>,
    ) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
        ListEntry::addr_of_pointers(target)
    }
}

#[cfg(all(test, not(loom)))]
mod tests {
    use crate::runtime::Builder;
    use crate::task::JoinSet;

    // A test that runs under miri.
    //
    // https://github.com/tokio-rs/tokio/pull/5693
    #[test]
    fn join_set_test() {
        let rt = Builder::new_current_thread().build().unwrap();

        let mut set = JoinSet::new();
        set.spawn_on(futures::future::ready(()), rt.handle());

        rt.block_on(set.join_next()).unwrap().unwrap();
    }
}