1//! This module defines an `IdleNotifiedSet`, which is a collection of elements.
2//! Each element is intended to correspond to a task, and the collection will
3//! keep track of which tasks have had their waker notified, and which have not.
4//!
5//! Each entry in the set holds some user-specified value. The value's type is
6//! specified using the `T` parameter. It will usually be a `JoinHandle` or
7//! similar.
8
9use std::marker::PhantomPinned;
10use std::mem::ManuallyDrop;
11use std::ptr::NonNull;
12use std::task::{Context, Waker};
13
14use crate::loom::cell::UnsafeCell;
15use crate::loom::sync::{Arc, Mutex};
16use crate::util::linked_list::{self, Link};
17use crate::util::{waker_ref, Wake};
18
19type LinkedList<T> =
20 linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>;
21
22/// This is the main handle to the collection.
23pub(crate) struct IdleNotifiedSet<T> {
24 lists: Arc<Lists<T>>,
25 length: usize,
26}
27
28/// A handle to an entry that is guaranteed to be stored in the idle or notified
29/// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet`
30/// mutably to prevent the entry from being moved to the `Neither` list, which
31/// only the `IdleNotifiedSet` may do.
32///
33/// The main consequence of being stored in one of the lists is that the `value`
34/// field has not yet been consumed.
35///
36/// Note: This entry can be moved from the idle to the notified list while this
37/// object exists by waking its waker.
38pub(crate) struct EntryInOneOfTheLists<'a, T> {
39 entry: Arc<ListEntry<T>>,
40 set: &'a mut IdleNotifiedSet<T>,
41}
42
43type Lists<T> = Mutex<ListsInner<T>>;
44
45/// The linked lists hold strong references to the ListEntry items, and the
46/// ListEntry items also hold a strong reference back to the Lists object, but
47/// the destructor of the `IdleNotifiedSet` will clear the two lists, so once
48/// that object is destroyed, no ref-cycles will remain.
49struct ListsInner<T> {
50 notified: LinkedList<T>,
51 idle: LinkedList<T>,
52 /// Whenever an element in the `notified` list is woken, this waker will be
53 /// notified and consumed, if it exists.
54 waker: Option<Waker>,
55}
56
/// Identifies which list of the shared `Lists` object currently holds an entry.
///
/// While the value is `Idle`, the entry's waker is allowed to move the entry to
/// the notified list. In every other state, only the `IdleNotifiedSet` may move
/// it.
///
/// `Neither` does not rule out that the entry is stored in some third, external
/// list; this is what happens during `drain`.
#[derive(Clone, Copy, PartialEq, Eq)]
enum List {
    Notified,
    Idle,
    Neither,
}
70
71/// An entry in the list.
72///
73/// # Safety
74///
75/// The `my_list` field must only be accessed while holding the mutex in
76/// `parent`. It is an invariant that the value of `my_list` corresponds to
77/// which linked list in the `parent` holds this entry. Once this field takes
78/// the value `Neither`, then it may never be modified again.
79///
80/// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field
81/// must only be accessed while holding the mutex. If the value of `my_list` is
82/// `Neither`, then the `pointers` field may be accessed by the
83/// `IdleNotifiedSet` (this happens inside `drain`).
84///
85/// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed
86/// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to
87/// `Neither` assumes ownership of the `value`, and it must either drop it or
88/// move it out from this entry to prevent it from getting leaked. (Since the
89/// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the
90/// value should not be leaked.)
91struct ListEntry<T> {
92 /// The linked list pointers of the list this entry is in.
93 pointers: linked_list::Pointers<ListEntry<T>>,
94 /// Pointer to the shared `Lists` struct.
95 parent: Arc<Lists<T>>,
96 /// The value stored in this entry.
97 value: UnsafeCell<ManuallyDrop<T>>,
98 /// Used to remember which list this entry is in.
99 my_list: UnsafeCell<List>,
100 /// Required by the `linked_list::Pointers` field.
101 _pin: PhantomPinned,
102}
103
// Generates `ListEntry::addr_of_pointers`, which the `Link::pointers`
// implementation for `ListEntry` calls to get from a raw
// `NonNull<ListEntry<T>>` to the entry's `pointers` field.
//
// NOTE(review): the macro is defined outside this file; presumably it projects
// to the field without creating an intermediate reference to the whole entry.
// Confirm against the macro definition.
generate_addr_of_methods! {
    impl<T> ListEntry<T> {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
            &self.pointers
        }
    }
}
111
112// With mutable access to the `IdleNotifiedSet`, you can get mutable access to
113// the values.
114unsafe impl<T: Send> Send for IdleNotifiedSet<T> {}
115// With the current API we strictly speaking don't even need `T: Sync`, but we
116// require it anyway to support adding &self APIs that access the values in the
117// future.
118unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {}
119
120// These impls control when it is safe to create a Waker. Since the waker does
121// not allow access to the value in any way (including its destructor), it is
122// not necessary for `T` to be Send or Sync.
123unsafe impl<T> Send for ListEntry<T> {}
124unsafe impl<T> Sync for ListEntry<T> {}
125
126impl<T> IdleNotifiedSet<T> {
127 /// Create a new `IdleNotifiedSet`.
128 pub(crate) fn new() -> Self {
129 let lists = Mutex::new(ListsInner {
130 notified: LinkedList::new(),
131 idle: LinkedList::new(),
132 waker: None,
133 });
134
135 IdleNotifiedSet {
136 lists: Arc::new(lists),
137 length: 0,
138 }
139 }
140
141 pub(crate) fn len(&self) -> usize {
142 self.length
143 }
144
145 pub(crate) fn is_empty(&self) -> bool {
146 self.length == 0
147 }
148
149 /// Insert the given value into the `idle` list.
150 pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> {
151 self.length += 1;
152
153 let entry = Arc::new(ListEntry {
154 parent: self.lists.clone(),
155 value: UnsafeCell::new(ManuallyDrop::new(value)),
156 my_list: UnsafeCell::new(List::Idle),
157 pointers: linked_list::Pointers::new(),
158 _pin: PhantomPinned,
159 });
160
161 {
162 let mut lock = self.lists.lock();
163 lock.idle.push_front(entry.clone());
164 }
165
166 // Safety: We just put the entry in the idle list, so it is in one of the lists.
167 EntryInOneOfTheLists { entry, set: self }
168 }
169
170 /// Pop an entry from the notified list to poll it. The entry is moved to
171 /// the idle list atomically.
172 pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> {
173 // We don't decrement the length because this call moves the entry to
174 // the idle list rather than removing it.
175 if self.length == 0 {
176 // Fast path.
177 return None;
178 }
179
180 let mut lock = self.lists.lock();
181
182 let should_update_waker = match lock.waker.as_mut() {
183 Some(cur_waker) => !waker.will_wake(cur_waker),
184 None => true,
185 };
186 if should_update_waker {
187 lock.waker = Some(waker.clone());
188 }
189
190 // Pop the entry, returning None if empty.
191 let entry = lock.notified.pop_back()?;
192
193 lock.idle.push_front(entry.clone());
194
195 // Safety: We are holding the lock.
196 entry.my_list.with_mut(|ptr| unsafe {
197 *ptr = List::Idle;
198 });
199
200 drop(lock);
201
202 // Safety: We just put the entry in the idle list, so it is in one of the lists.
203 Some(EntryInOneOfTheLists { entry, set: self })
204 }
205
206 /// Call a function on every element in this list.
207 pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) {
208 fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) {
209 let mut node = list.last();
210
211 while let Some(entry) = node {
212 ptrs.push(entry.value.with_mut(|ptr| {
213 let ptr: *mut ManuallyDrop<T> = ptr;
214 let ptr: *mut T = ptr.cast();
215 ptr
216 }));
217
218 let prev = entry.pointers.get_prev();
219 node = prev.map(|prev| unsafe { &*prev.as_ptr() });
220 }
221 }
222
223 // Atomically get a raw pointer to the value of every entry.
224 //
225 // Since this only locks the mutex once, it is not possible for a value
226 // to get moved from the idle list to the notified list during the
227 // operation, which would otherwise result in some value being listed
228 // twice.
229 let mut ptrs = Vec::with_capacity(self.len());
230 {
231 let mut lock = self.lists.lock();
232
233 get_ptrs(&mut lock.idle, &mut ptrs);
234 get_ptrs(&mut lock.notified, &mut ptrs);
235 }
236 debug_assert_eq!(ptrs.len(), ptrs.capacity());
237
238 for ptr in ptrs {
239 // Safety: When we grabbed the pointers, the entries were in one of
240 // the two lists. This means that their value was valid at the time,
241 // and it must still be valid because we are the IdleNotifiedSet,
242 // and only we can remove an entry from the two lists. (It's
243 // possible that an entry is moved from one list to the other during
244 // this loop, but that is ok.)
245 func(unsafe { &mut *ptr });
246 }
247 }
248
249 /// Remove all entries in both lists, applying some function to each element.
250 ///
251 /// The closure is called on all elements even if it panics. Having it panic
252 /// twice is a double-panic, and will abort the application.
253 pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) {
254 if self.length == 0 {
255 // Fast path.
256 return;
257 }
258 self.length = 0;
259
260 // The LinkedList is not cleared on panic, so we use a bomb to clear it.
261 //
262 // This value has the invariant that any entry in its `all_entries` list
263 // has `my_list` set to `Neither` and that the value has not yet been
264 // dropped.
265 struct AllEntries<T, F: FnMut(T)> {
266 all_entries: LinkedList<T>,
267 func: F,
268 }
269
270 impl<T, F: FnMut(T)> AllEntries<T, F> {
271 fn pop_next(&mut self) -> bool {
272 if let Some(entry) = self.all_entries.pop_back() {
273 // Safety: We just took this value from the list, so we can
274 // destroy the value in the entry.
275 entry
276 .value
277 .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) });
278 true
279 } else {
280 false
281 }
282 }
283 }
284
285 impl<T, F: FnMut(T)> Drop for AllEntries<T, F> {
286 fn drop(&mut self) {
287 while self.pop_next() {}
288 }
289 }
290
291 let mut all_entries = AllEntries {
292 all_entries: LinkedList::new(),
293 func,
294 };
295
296 // Atomically move all entries to the new linked list in the AllEntries
297 // object.
298 {
299 let mut lock = self.lists.lock();
300 unsafe {
301 // Safety: We are holding the lock and `all_entries` is a new
302 // LinkedList.
303 move_to_new_list(&mut lock.idle, &mut all_entries.all_entries);
304 move_to_new_list(&mut lock.notified, &mut all_entries.all_entries);
305 }
306 }
307
308 // Keep destroying entries in the list until it is empty.
309 //
310 // If the closure panics, then the destructor of the `AllEntries` bomb
311 // ensures that we keep running the destructor on the remaining values.
312 // A second panic will abort the program.
313 while all_entries.pop_next() {}
314 }
315}
316
317/// # Safety
318///
319/// The mutex for the entries must be held, and the target list must be such
320/// that setting `my_list` to `Neither` is ok.
321unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) {
322 while let Some(entry) = from.pop_back() {
323 entry.my_list.with_mut(|ptr| {
324 *ptr = List::Neither;
325 });
326 to.push_front(entry);
327 }
328}
329
330impl<'a, T> EntryInOneOfTheLists<'a, T> {
331 /// Remove this entry from the list it is in, returning the value associated
332 /// with the entry.
333 ///
334 /// This consumes the value, since it is no longer guaranteed to be in a
335 /// list.
336 pub(crate) fn remove(self) -> T {
337 self.set.length -= 1;
338
339 {
340 let mut lock = self.set.lists.lock();
341
342 // Safety: We are holding the lock so there is no race, and we will
343 // remove the entry afterwards to uphold invariants.
344 let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe {
345 let old_my_list = *ptr;
346 *ptr = List::Neither;
347 old_my_list
348 });
349
350 let list = match old_my_list {
351 List::Idle => &mut lock.idle,
352 List::Notified => &mut lock.notified,
353 // An entry in one of the lists is in one of the lists.
354 List::Neither => unreachable!(),
355 };
356
357 unsafe {
358 // Safety: We just checked that the entry is in this particular
359 // list.
360 list.remove(ListEntry::as_raw(&self.entry)).unwrap();
361 }
362 }
363
364 // By setting `my_list` to `Neither`, we have taken ownership of the
365 // value. We return it to the caller.
366 //
367 // Safety: We have a mutable reference to the `IdleNotifiedSet` that
368 // owns this entry, so we can use its permission to access the value.
369 self.entry
370 .value
371 .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) })
372 }
373
374 /// Access the value in this entry together with a context for its waker.
375 pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U
376 where
377 F: FnOnce(&mut T, &mut Context<'_>) -> U,
378 T: 'static,
379 {
380 let waker = waker_ref(&self.entry);
381
382 let mut context = Context::from_waker(&waker);
383
384 // Safety: We have a mutable reference to the `IdleNotifiedSet` that
385 // owns this entry, so we can use its permission to access the value.
386 self.entry
387 .value
388 .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) })
389 }
390}
391
392impl<T> Drop for IdleNotifiedSet<T> {
393 fn drop(&mut self) {
394 // Clear both lists.
395 self.drain(drop);
396
397 #[cfg(debug_assertions)]
398 if !std::thread::panicking() {
399 let lock = self.lists.lock();
400 assert!(lock.idle.is_empty());
401 assert!(lock.notified.is_empty());
402 }
403 }
404}
405
406impl<T: 'static> Wake for ListEntry<T> {
407 fn wake_by_ref(me: &Arc<Self>) {
408 let mut lock = me.parent.lock();
409
410 // Safety: We are holding the lock and we will update the lists to
411 // maintain invariants.
412 let old_my_list = me.my_list.with_mut(|ptr| unsafe {
413 let old_my_list = *ptr;
414 if old_my_list == List::Idle {
415 *ptr = List::Notified;
416 }
417 old_my_list
418 });
419
420 if old_my_list == List::Idle {
421 // We move ourself to the notified list.
422 let me = unsafe {
423 // Safety: We just checked that we are in this particular list.
424 lock.idle.remove(ListEntry::as_raw(me)).unwrap()
425 };
426 lock.notified.push_front(me);
427
428 if let Some(waker) = lock.waker.take() {
429 drop(lock);
430 waker.wake();
431 }
432 }
433 }
434
435 fn wake(me: Arc<Self>) {
436 Self::wake_by_ref(&me);
437 }
438}
439
440/// # Safety
441///
442/// `ListEntry` is forced to be !Unpin.
443unsafe impl<T> linked_list::Link for ListEntry<T> {
444 type Handle = Arc<ListEntry<T>>;
445 type Target = ListEntry<T>;
446
447 fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> {
448 let ptr: *const ListEntry<T> = Arc::as_ptr(handle);
449 // Safety: We can't get a null pointer from `Arc::as_ptr`.
450 unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) }
451 }
452
453 unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> {
454 Arc::from_raw(ptr.as_ptr())
455 }
456
457 unsafe fn pointers(
458 target: NonNull<ListEntry<T>>,
459 ) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
460 ListEntry::addr_of_pointers(target)
461 }
462}
463
#[cfg(all(test, not(loom)))]
mod tests {
    use crate::runtime::Builder;
    use crate::task::JoinSet;

    // This test is meant to be run under miri.
    //
    // https://github.com/tokio-rs/tokio/pull/5693
    #[test]
    fn join_set_test() {
        let runtime = Builder::new_current_thread().build().unwrap();

        // Spawn a single, immediately-ready task into the set.
        let mut join_set = JoinSet::new();
        join_set.spawn_on(futures::future::ready(()), runtime.handle());

        // The set must yield exactly that task, and the task must succeed.
        let joined = runtime.block_on(join_set.join_next());
        joined.unwrap().unwrap();
    }
}
482