1 | //! This module defines an `IdleNotifiedSet`, which is a collection of elements. |
2 | //! Each element is intended to correspond to a task, and the collection will |
3 | //! keep track of which tasks have had their waker notified, and which have not. |
4 | //! |
5 | //! Each entry in the set holds some user-specified value. The value's type is |
6 | //! specified using the `T` parameter. It will usually be a `JoinHandle` or |
7 | //! similar. |
8 | |
9 | use std::marker::PhantomPinned; |
10 | use std::mem::ManuallyDrop; |
11 | use std::ptr::NonNull; |
12 | use std::task::{Context, Waker}; |
13 | |
14 | use crate::loom::cell::UnsafeCell; |
15 | use crate::loom::sync::{Arc, Mutex}; |
16 | use crate::util::linked_list::{self, Link}; |
17 | use crate::util::{waker_ref, Wake}; |
18 | |
19 | type LinkedList<T> = |
20 | linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>; |
21 | |
22 | /// This is the main handle to the collection. |
23 | pub(crate) struct IdleNotifiedSet<T> { |
24 | lists: Arc<Lists<T>>, |
25 | length: usize, |
26 | } |
27 | |
28 | /// A handle to an entry that is guaranteed to be stored in the idle or notified |
29 | /// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet` |
30 | /// mutably to prevent the entry from being moved to the `Neither` list, which |
31 | /// only the `IdleNotifiedSet` may do. |
32 | /// |
33 | /// The main consequence of being stored in one of the lists is that the `value` |
34 | /// field has not yet been consumed. |
35 | /// |
36 | /// Note: This entry can be moved from the idle to the notified list while this |
37 | /// object exists by waking its waker. |
38 | pub(crate) struct EntryInOneOfTheLists<'a, T> { |
39 | entry: Arc<ListEntry<T>>, |
40 | set: &'a mut IdleNotifiedSet<T>, |
41 | } |
42 | |
/// The shared state of a set: both linked lists and the optional waker, all
/// guarded by a single mutex.
type Lists<T> = Mutex<ListsInner<T>>;
44 | |
45 | /// The linked lists hold strong references to the `ListEntry` items, and the |
46 | /// `ListEntry` items also hold a strong reference back to the Lists object, but |
47 | /// the destructor of the `IdleNotifiedSet` will clear the two lists, so once |
48 | /// that object is destroyed, no ref-cycles will remain. |
49 | struct ListsInner<T> { |
50 | notified: LinkedList<T>, |
51 | idle: LinkedList<T>, |
52 | /// Whenever an element in the `notified` list is woken, this waker will be |
53 | /// notified and consumed, if it exists. |
54 | waker: Option<Waker>, |
55 | } |
56 | |
/// Records which of the two lists in the shared `Lists` object currently holds
/// an entry.
///
/// While the value is `Idle`, the entry's own waker is allowed to move it to
/// the notified list. In every other state only the `IdleNotifiedSet` may move
/// the entry.
///
/// `Neither` does not rule out that the entry is linked into some third,
/// external list (this is what happens in `drain`).
#[derive(Copy, Clone, Eq, PartialEq)]
enum List {
    /// The entry is in the `notified` list.
    Notified,
    /// The entry is in the `idle` list.
    Idle,
    /// The entry is in neither of the two shared lists.
    Neither,
}
70 | |
71 | /// An entry in the list. |
72 | /// |
73 | /// # Safety |
74 | /// |
75 | /// The `my_list` field must only be accessed while holding the mutex in |
76 | /// `parent`. It is an invariant that the value of `my_list` corresponds to |
77 | /// which linked list in the `parent` holds this entry. Once this field takes |
78 | /// the value `Neither`, then it may never be modified again. |
79 | /// |
80 | /// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field |
81 | /// must only be accessed while holding the mutex. If the value of `my_list` is |
82 | /// `Neither`, then the `pointers` field may be accessed by the |
83 | /// `IdleNotifiedSet` (this happens inside `drain`). |
84 | /// |
85 | /// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed |
86 | /// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to |
87 | /// `Neither` assumes ownership of the `value`, and it must either drop it or |
88 | /// move it out from this entry to prevent it from getting leaked. (Since the |
89 | /// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the |
90 | /// value should not be leaked.) |
91 | struct ListEntry<T> { |
92 | /// The linked list pointers of the list this entry is in. |
93 | pointers: linked_list::Pointers<ListEntry<T>>, |
94 | /// Pointer to the shared `Lists` struct. |
95 | parent: Arc<Lists<T>>, |
96 | /// The value stored in this entry. |
97 | value: UnsafeCell<ManuallyDrop<T>>, |
98 | /// Used to remember which list this entry is in. |
99 | my_list: UnsafeCell<List>, |
100 | /// Required by the `linked_list::Pointers` field. |
101 | _pin: PhantomPinned, |
102 | } |
103 | |
// Generates `ListEntry::addr_of_pointers`, which turns a pointer to an entry
// into a pointer to that entry's `pointers` field. It is used by the
// `linked_list::Link` implementation below.
//
// NOTE(review): how the projection is performed is decided by the
// `generate_addr_of_methods!` macro, which is defined elsewhere — check its
// definition before changing the shape of this item.
generate_addr_of_methods! {
    impl<T> ListEntry<T> {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
            &self.pointers
        }
    }
}
111 | |
112 | // With mutable access to the `IdleNotifiedSet`, you can get mutable access to |
113 | // the values. |
114 | unsafe impl<T: Send> Send for IdleNotifiedSet<T> {} |
115 | // With the current API we strictly speaking don't even need `T: Sync`, but we |
116 | // require it anyway to support adding &self APIs that access the values in the |
117 | // future. |
118 | unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {} |
119 | |
// These impls control when it is safe to create a `Waker` from an entry. The
// waker never gets access to the value in any way — not even to run its
// destructor, since the value is wrapped in `ManuallyDrop` and is owned by the
// `IdleNotifiedSet` — so `T` does not have to be `Send` or `Sync`.
unsafe impl<T> Send for ListEntry<T> {}
unsafe impl<T> Sync for ListEntry<T> {}
125 | |
126 | impl<T> IdleNotifiedSet<T> { |
127 | /// Create a new `IdleNotifiedSet`. |
128 | pub(crate) fn new() -> Self { |
129 | let lists = Mutex::new(ListsInner { |
130 | notified: LinkedList::new(), |
131 | idle: LinkedList::new(), |
132 | waker: None, |
133 | }); |
134 | |
135 | IdleNotifiedSet { |
136 | lists: Arc::new(lists), |
137 | length: 0, |
138 | } |
139 | } |
140 | |
/// Returns the number of entries stored in the idle and notified lists
/// combined. This reads the cached `length` field and does not take the lock.
pub(crate) fn len(&self) -> usize {
    self.length
}
144 | |
145 | pub(crate) fn is_empty(&self) -> bool { |
146 | self.length == 0 |
147 | } |
148 | |
149 | /// Insert the given value into the `idle` list. |
150 | pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> { |
151 | self.length += 1; |
152 | |
153 | let entry = Arc::new(ListEntry { |
154 | parent: self.lists.clone(), |
155 | value: UnsafeCell::new(ManuallyDrop::new(value)), |
156 | my_list: UnsafeCell::new(List::Idle), |
157 | pointers: linked_list::Pointers::new(), |
158 | _pin: PhantomPinned, |
159 | }); |
160 | |
161 | { |
162 | let mut lock = self.lists.lock(); |
163 | lock.idle.push_front(entry.clone()); |
164 | } |
165 | |
166 | // Safety: We just put the entry in the idle list, so it is in one of the lists. |
167 | EntryInOneOfTheLists { entry, set: self } |
168 | } |
169 | |
170 | /// Pop an entry from the notified list to poll it. The entry is moved to |
171 | /// the idle list atomically. |
172 | pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> { |
173 | // We don't decrement the length because this call moves the entry to |
174 | // the idle list rather than removing it. |
175 | if self.length == 0 { |
176 | // Fast path. |
177 | return None; |
178 | } |
179 | |
180 | let mut lock = self.lists.lock(); |
181 | |
182 | let should_update_waker = match lock.waker.as_mut() { |
183 | Some(cur_waker) => !waker.will_wake(cur_waker), |
184 | None => true, |
185 | }; |
186 | if should_update_waker { |
187 | lock.waker = Some(waker.clone()); |
188 | } |
189 | |
190 | // Pop the entry, returning None if empty. |
191 | let entry = lock.notified.pop_back()?; |
192 | |
193 | lock.idle.push_front(entry.clone()); |
194 | |
195 | // Safety: We are holding the lock. |
196 | entry.my_list.with_mut(|ptr| unsafe { |
197 | *ptr = List::Idle; |
198 | }); |
199 | |
200 | drop(lock); |
201 | |
202 | // Safety: We just put the entry in the idle list, so it is in one of the lists. |
203 | Some(EntryInOneOfTheLists { entry, set: self }) |
204 | } |
205 | |
206 | /// Tries to pop an entry from the notified list to poll it. The entry is moved to |
207 | /// the idle list atomically. |
208 | pub(crate) fn try_pop_notified(&mut self) -> Option<EntryInOneOfTheLists<'_, T>> { |
209 | // We don't decrement the length because this call moves the entry to |
210 | // the idle list rather than removing it. |
211 | if self.length == 0 { |
212 | // Fast path. |
213 | return None; |
214 | } |
215 | |
216 | let mut lock = self.lists.lock(); |
217 | |
218 | // Pop the entry, returning None if empty. |
219 | let entry = lock.notified.pop_back()?; |
220 | |
221 | lock.idle.push_front(entry.clone()); |
222 | |
223 | // Safety: We are holding the lock. |
224 | entry.my_list.with_mut(|ptr| unsafe { |
225 | *ptr = List::Idle; |
226 | }); |
227 | |
228 | drop(lock); |
229 | |
230 | // Safety: We just put the entry in the idle list, so it is in one of the lists. |
231 | Some(EntryInOneOfTheLists { entry, set: self }) |
232 | } |
233 | |
234 | /// Call a function on every element in this list. |
235 | pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) { |
236 | fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) { |
237 | let mut node = list.last(); |
238 | |
239 | while let Some(entry) = node { |
240 | ptrs.push(entry.value.with_mut(|ptr| { |
241 | let ptr: *mut ManuallyDrop<T> = ptr; |
242 | let ptr: *mut T = ptr.cast(); |
243 | ptr |
244 | })); |
245 | |
246 | let prev = entry.pointers.get_prev(); |
247 | node = prev.map(|prev| unsafe { &*prev.as_ptr() }); |
248 | } |
249 | } |
250 | |
251 | // Atomically get a raw pointer to the value of every entry. |
252 | // |
253 | // Since this only locks the mutex once, it is not possible for a value |
254 | // to get moved from the idle list to the notified list during the |
255 | // operation, which would otherwise result in some value being listed |
256 | // twice. |
257 | let mut ptrs = Vec::with_capacity(self.len()); |
258 | { |
259 | let mut lock = self.lists.lock(); |
260 | |
261 | get_ptrs(&mut lock.idle, &mut ptrs); |
262 | get_ptrs(&mut lock.notified, &mut ptrs); |
263 | } |
264 | debug_assert_eq!(ptrs.len(), ptrs.capacity()); |
265 | |
266 | for ptr in ptrs { |
267 | // Safety: When we grabbed the pointers, the entries were in one of |
268 | // the two lists. This means that their value was valid at the time, |
269 | // and it must still be valid because we are the IdleNotifiedSet, |
270 | // and only we can remove an entry from the two lists. (It's |
271 | // possible that an entry is moved from one list to the other during |
272 | // this loop, but that is ok.) |
273 | func(unsafe { &mut *ptr }); |
274 | } |
275 | } |
276 | |
277 | /// Remove all entries in both lists, applying some function to each element. |
278 | /// |
279 | /// The closure is called on all elements even if it panics. Having it panic |
280 | /// twice is a double-panic, and will abort the application. |
281 | pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) { |
282 | if self.length == 0 { |
283 | // Fast path. |
284 | return; |
285 | } |
286 | self.length = 0; |
287 | |
288 | // The LinkedList is not cleared on panic, so we use a bomb to clear it. |
289 | // |
290 | // This value has the invariant that any entry in its `all_entries` list |
291 | // has `my_list` set to `Neither` and that the value has not yet been |
292 | // dropped. |
293 | struct AllEntries<T, F: FnMut(T)> { |
294 | all_entries: LinkedList<T>, |
295 | func: F, |
296 | } |
297 | |
298 | impl<T, F: FnMut(T)> AllEntries<T, F> { |
299 | fn pop_next(&mut self) -> bool { |
300 | if let Some(entry) = self.all_entries.pop_back() { |
301 | // Safety: We just took this value from the list, so we can |
302 | // destroy the value in the entry. |
303 | entry |
304 | .value |
305 | .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) }); |
306 | true |
307 | } else { |
308 | false |
309 | } |
310 | } |
311 | } |
312 | |
313 | impl<T, F: FnMut(T)> Drop for AllEntries<T, F> { |
314 | fn drop(&mut self) { |
315 | while self.pop_next() {} |
316 | } |
317 | } |
318 | |
319 | let mut all_entries = AllEntries { |
320 | all_entries: LinkedList::new(), |
321 | func, |
322 | }; |
323 | |
324 | // Atomically move all entries to the new linked list in the AllEntries |
325 | // object. |
326 | { |
327 | let mut lock = self.lists.lock(); |
328 | unsafe { |
329 | // Safety: We are holding the lock and `all_entries` is a new |
330 | // LinkedList. |
331 | move_to_new_list(&mut lock.idle, &mut all_entries.all_entries); |
332 | move_to_new_list(&mut lock.notified, &mut all_entries.all_entries); |
333 | } |
334 | } |
335 | |
336 | // Keep destroying entries in the list until it is empty. |
337 | // |
338 | // If the closure panics, then the destructor of the `AllEntries` bomb |
339 | // ensures that we keep running the destructor on the remaining values. |
340 | // A second panic will abort the program. |
341 | while all_entries.pop_next() {} |
342 | } |
343 | } |
344 | |
345 | /// # Safety |
346 | /// |
347 | /// The mutex for the entries must be held, and the target list must be such |
348 | /// that setting `my_list` to `Neither` is ok. |
349 | unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) { |
350 | while let Some(entry: Arc>) = from.pop_back() { |
351 | entry.my_list.with_mut(|ptr: *mut List| { |
352 | *ptr = List::Neither; |
353 | }); |
354 | to.push_front(val:entry); |
355 | } |
356 | } |
357 | |
358 | impl<'a, T> EntryInOneOfTheLists<'a, T> { |
359 | /// Remove this entry from the list it is in, returning the value associated |
360 | /// with the entry. |
361 | /// |
362 | /// This consumes the value, since it is no longer guaranteed to be in a |
363 | /// list. |
364 | pub(crate) fn remove(self) -> T { |
365 | self.set.length -= 1; |
366 | |
367 | { |
368 | let mut lock = self.set.lists.lock(); |
369 | |
370 | // Safety: We are holding the lock so there is no race, and we will |
371 | // remove the entry afterwards to uphold invariants. |
372 | let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe { |
373 | let old_my_list = *ptr; |
374 | *ptr = List::Neither; |
375 | old_my_list |
376 | }); |
377 | |
378 | let list = match old_my_list { |
379 | List::Idle => &mut lock.idle, |
380 | List::Notified => &mut lock.notified, |
381 | // An entry in one of the lists is in one of the lists. |
382 | List::Neither => unreachable!(), |
383 | }; |
384 | |
385 | unsafe { |
386 | // Safety: We just checked that the entry is in this particular |
387 | // list. |
388 | list.remove(ListEntry::as_raw(&self.entry)).unwrap(); |
389 | } |
390 | } |
391 | |
392 | // By setting `my_list` to `Neither`, we have taken ownership of the |
393 | // value. We return it to the caller. |
394 | // |
395 | // Safety: We have a mutable reference to the `IdleNotifiedSet` that |
396 | // owns this entry, so we can use its permission to access the value. |
397 | self.entry |
398 | .value |
399 | .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) }) |
400 | } |
401 | |
402 | /// Access the value in this entry together with a context for its waker. |
403 | pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U |
404 | where |
405 | F: FnOnce(&mut T, &mut Context<'_>) -> U, |
406 | T: 'static, |
407 | { |
408 | let waker = waker_ref(&self.entry); |
409 | |
410 | let mut context = Context::from_waker(&waker); |
411 | |
412 | // Safety: We have a mutable reference to the `IdleNotifiedSet` that |
413 | // owns this entry, so we can use its permission to access the value. |
414 | self.entry |
415 | .value |
416 | .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) }) |
417 | } |
418 | } |
419 | |
420 | impl<T> Drop for IdleNotifiedSet<T> { |
421 | fn drop(&mut self) { |
422 | // Clear both lists. |
423 | self.drain(func:drop); |
424 | |
425 | #[cfg (debug_assertions)] |
426 | if !std::thread::panicking() { |
427 | let lock: MutexGuard<'_, ListsInner<…>> = self.lists.lock(); |
428 | assert!(lock.idle.is_empty()); |
429 | assert!(lock.notified.is_empty()); |
430 | } |
431 | } |
432 | } |
433 | |
434 | impl<T: 'static> Wake for ListEntry<T> { |
435 | fn wake_by_ref(me: &Arc<Self>) { |
436 | let mut lock = me.parent.lock(); |
437 | |
438 | // Safety: We are holding the lock and we will update the lists to |
439 | // maintain invariants. |
440 | let old_my_list = me.my_list.with_mut(|ptr| unsafe { |
441 | let old_my_list = *ptr; |
442 | if old_my_list == List::Idle { |
443 | *ptr = List::Notified; |
444 | } |
445 | old_my_list |
446 | }); |
447 | |
448 | if old_my_list == List::Idle { |
449 | // We move ourself to the notified list. |
450 | let me = unsafe { |
451 | // Safety: We just checked that we are in this particular list. |
452 | lock.idle.remove(ListEntry::as_raw(me)).unwrap() |
453 | }; |
454 | lock.notified.push_front(me); |
455 | |
456 | if let Some(waker) = lock.waker.take() { |
457 | drop(lock); |
458 | waker.wake(); |
459 | } |
460 | } |
461 | } |
462 | |
463 | fn wake(me: Arc<Self>) { |
464 | Self::wake_by_ref(&me); |
465 | } |
466 | } |
467 | |
468 | /// # Safety |
469 | /// |
470 | /// `ListEntry` is forced to be !Unpin. |
471 | unsafe impl<T> linked_list::Link for ListEntry<T> { |
472 | type Handle = Arc<ListEntry<T>>; |
473 | type Target = ListEntry<T>; |
474 | |
475 | fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> { |
476 | let ptr: *const ListEntry<T> = Arc::as_ptr(this:handle); |
477 | // Safety: We can't get a null pointer from `Arc::as_ptr`. |
478 | unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) } |
479 | } |
480 | |
481 | unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> { |
482 | Arc::from_raw(ptr.as_ptr()) |
483 | } |
484 | |
485 | unsafe fn pointers( |
486 | target: NonNull<ListEntry<T>>, |
487 | ) -> NonNull<linked_list::Pointers<ListEntry<T>>> { |
488 | ListEntry::addr_of_pointers(me:target) |
489 | } |
490 | } |
491 | |
#[cfg(all(test, not(loom)))]
mod tests {
    use crate::runtime::Builder;
    use crate::task::JoinSet;

    // Spawns one ready future on a `JoinSet` and joins it. This test is meant
    // to be run under miri.
    //
    // https://github.com/tokio-rs/tokio/pull/5693
    #[test]
    fn join_set_test() {
        let runtime = Builder::new_current_thread().build().unwrap();

        let mut join_set = JoinSet::new();
        join_set.spawn_on(futures::future::ready(()), runtime.handle());

        let joined = runtime.block_on(join_set.join_next());
        joined.unwrap().unwrap();
    }
}
510 | |