1 | //! This module defines an `IdleNotifiedSet`, which is a collection of elements. |
2 | //! Each element is intended to correspond to a task, and the collection will |
3 | //! keep track of which tasks have had their waker notified, and which have not. |
4 | //! |
5 | //! Each entry in the set holds some user-specified value. The value's type is |
6 | //! specified using the `T` parameter. It will usually be a `JoinHandle` or |
7 | //! similar. |
8 | |
9 | use std::marker::PhantomPinned; |
10 | use std::mem::ManuallyDrop; |
11 | use std::ptr::NonNull; |
12 | use std::task::{Context, Waker}; |
13 | |
14 | use crate::loom::cell::UnsafeCell; |
15 | use crate::loom::sync::{Arc, Mutex}; |
16 | use crate::util::linked_list::{self, Link}; |
17 | use crate::util::{waker_ref, Wake}; |
18 | |
19 | type LinkedList<T> = |
20 | linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>; |
21 | |
22 | /// This is the main handle to the collection. |
23 | pub(crate) struct IdleNotifiedSet<T> { |
24 | lists: Arc<Lists<T>>, |
25 | length: usize, |
26 | } |
27 | |
28 | /// A handle to an entry that is guaranteed to be stored in the idle or notified |
29 | /// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet` |
30 | /// mutably to prevent the entry from being moved to the `Neither` list, which |
31 | /// only the `IdleNotifiedSet` may do. |
32 | /// |
33 | /// The main consequence of being stored in one of the lists is that the `value` |
34 | /// field has not yet been consumed. |
35 | /// |
36 | /// Note: This entry can be moved from the idle to the notified list while this |
37 | /// object exists by waking its waker. |
38 | pub(crate) struct EntryInOneOfTheLists<'a, T> { |
39 | entry: Arc<ListEntry<T>>, |
40 | set: &'a mut IdleNotifiedSet<T>, |
41 | } |
42 | |
43 | type Lists<T> = Mutex<ListsInner<T>>; |
44 | |
45 | /// The linked lists hold strong references to the ListEntry items, and the |
46 | /// ListEntry items also hold a strong reference back to the Lists object, but |
47 | /// the destructor of the `IdleNotifiedSet` will clear the two lists, so once |
48 | /// that object is destroyed, no ref-cycles will remain. |
49 | struct ListsInner<T> { |
50 | notified: LinkedList<T>, |
51 | idle: LinkedList<T>, |
52 | /// Whenever an element in the `notified` list is woken, this waker will be |
53 | /// notified and consumed, if it exists. |
54 | waker: Option<Waker>, |
55 | } |
56 | |
57 | /// Which of the two lists in the shared Lists object is this entry stored in? |
58 | /// |
59 | /// If the value is `Idle`, then an entry's waker may move it to the notified |
60 | /// list. Otherwise, only the `IdleNotifiedSet` may move it. |
61 | /// |
62 | /// If the value is `Neither`, then it is still possible that the entry is in |
63 | /// some third external list (this happens in `drain`). |
64 | #[derive(Copy, Clone, Eq, PartialEq)] |
65 | enum List { |
66 | Notified, |
67 | Idle, |
68 | Neither, |
69 | } |
70 | |
71 | /// An entry in the list. |
72 | /// |
73 | /// # Safety |
74 | /// |
75 | /// The `my_list` field must only be accessed while holding the mutex in |
76 | /// `parent`. It is an invariant that the value of `my_list` corresponds to |
77 | /// which linked list in the `parent` holds this entry. Once this field takes |
78 | /// the value `Neither`, then it may never be modified again. |
79 | /// |
80 | /// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field |
81 | /// must only be accessed while holding the mutex. If the value of `my_list` is |
82 | /// `Neither`, then the `pointers` field may be accessed by the |
83 | /// `IdleNotifiedSet` (this happens inside `drain`). |
84 | /// |
85 | /// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed |
86 | /// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to |
87 | /// `Neither` assumes ownership of the `value`, and it must either drop it or |
88 | /// move it out from this entry to prevent it from getting leaked. (Since the |
89 | /// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the |
90 | /// value should not be leaked.) |
91 | struct ListEntry<T> { |
92 | /// The linked list pointers of the list this entry is in. |
93 | pointers: linked_list::Pointers<ListEntry<T>>, |
94 | /// Pointer to the shared `Lists` struct. |
95 | parent: Arc<Lists<T>>, |
96 | /// The value stored in this entry. |
97 | value: UnsafeCell<ManuallyDrop<T>>, |
98 | /// Used to remember which list this entry is in. |
99 | my_list: UnsafeCell<List>, |
100 | /// Required by the `linked_list::Pointers` field. |
101 | _pin: PhantomPinned, |
102 | } |
103 | |
104 | generate_addr_of_methods! { |
105 | impl<T> ListEntry<T> { |
106 | unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> { |
107 | &self.pointers |
108 | } |
109 | } |
110 | } |
111 | |
112 | // With mutable access to the `IdleNotifiedSet`, you can get mutable access to |
113 | // the values. |
114 | unsafe impl<T: Send> Send for IdleNotifiedSet<T> {} |
115 | // With the current API we strictly speaking don't even need `T: Sync`, but we |
116 | // require it anyway to support adding &self APIs that access the values in the |
117 | // future. |
118 | unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {} |
119 | |
120 | // These impls control when it is safe to create a Waker. Since the waker does |
121 | // not allow access to the value in any way (including its destructor), it is |
122 | // not necessary for `T` to be Send or Sync. |
123 | unsafe impl<T> Send for ListEntry<T> {} |
124 | unsafe impl<T> Sync for ListEntry<T> {} |
125 | |
126 | impl<T> IdleNotifiedSet<T> { |
127 | /// Create a new `IdleNotifiedSet`. |
128 | pub(crate) fn new() -> Self { |
129 | let lists = Mutex::new(ListsInner { |
130 | notified: LinkedList::new(), |
131 | idle: LinkedList::new(), |
132 | waker: None, |
133 | }); |
134 | |
135 | IdleNotifiedSet { |
136 | lists: Arc::new(lists), |
137 | length: 0, |
138 | } |
139 | } |
140 | |
141 | pub(crate) fn len(&self) -> usize { |
142 | self.length |
143 | } |
144 | |
145 | pub(crate) fn is_empty(&self) -> bool { |
146 | self.length == 0 |
147 | } |
148 | |
149 | /// Insert the given value into the `idle` list. |
150 | pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> { |
151 | self.length += 1; |
152 | |
153 | let entry = Arc::new(ListEntry { |
154 | parent: self.lists.clone(), |
155 | value: UnsafeCell::new(ManuallyDrop::new(value)), |
156 | my_list: UnsafeCell::new(List::Idle), |
157 | pointers: linked_list::Pointers::new(), |
158 | _pin: PhantomPinned, |
159 | }); |
160 | |
161 | { |
162 | let mut lock = self.lists.lock(); |
163 | lock.idle.push_front(entry.clone()); |
164 | } |
165 | |
166 | // Safety: We just put the entry in the idle list, so it is in one of the lists. |
167 | EntryInOneOfTheLists { entry, set: self } |
168 | } |
169 | |
170 | /// Pop an entry from the notified list to poll it. The entry is moved to |
171 | /// the idle list atomically. |
172 | pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> { |
173 | // We don't decrement the length because this call moves the entry to |
174 | // the idle list rather than removing it. |
175 | if self.length == 0 { |
176 | // Fast path. |
177 | return None; |
178 | } |
179 | |
180 | let mut lock = self.lists.lock(); |
181 | |
182 | let should_update_waker = match lock.waker.as_mut() { |
183 | Some(cur_waker) => !waker.will_wake(cur_waker), |
184 | None => true, |
185 | }; |
186 | if should_update_waker { |
187 | lock.waker = Some(waker.clone()); |
188 | } |
189 | |
190 | // Pop the entry, returning None if empty. |
191 | let entry = lock.notified.pop_back()?; |
192 | |
193 | lock.idle.push_front(entry.clone()); |
194 | |
195 | // Safety: We are holding the lock. |
196 | entry.my_list.with_mut(|ptr| unsafe { |
197 | *ptr = List::Idle; |
198 | }); |
199 | |
200 | drop(lock); |
201 | |
202 | // Safety: We just put the entry in the idle list, so it is in one of the lists. |
203 | Some(EntryInOneOfTheLists { entry, set: self }) |
204 | } |
205 | |
206 | /// Call a function on every element in this list. |
207 | pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) { |
208 | fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) { |
209 | let mut node = list.last(); |
210 | |
211 | while let Some(entry) = node { |
212 | ptrs.push(entry.value.with_mut(|ptr| { |
213 | let ptr: *mut ManuallyDrop<T> = ptr; |
214 | let ptr: *mut T = ptr.cast(); |
215 | ptr |
216 | })); |
217 | |
218 | let prev = entry.pointers.get_prev(); |
219 | node = prev.map(|prev| unsafe { &*prev.as_ptr() }); |
220 | } |
221 | } |
222 | |
223 | // Atomically get a raw pointer to the value of every entry. |
224 | // |
225 | // Since this only locks the mutex once, it is not possible for a value |
226 | // to get moved from the idle list to the notified list during the |
227 | // operation, which would otherwise result in some value being listed |
228 | // twice. |
229 | let mut ptrs = Vec::with_capacity(self.len()); |
230 | { |
231 | let mut lock = self.lists.lock(); |
232 | |
233 | get_ptrs(&mut lock.idle, &mut ptrs); |
234 | get_ptrs(&mut lock.notified, &mut ptrs); |
235 | } |
236 | debug_assert_eq!(ptrs.len(), ptrs.capacity()); |
237 | |
238 | for ptr in ptrs { |
239 | // Safety: When we grabbed the pointers, the entries were in one of |
240 | // the two lists. This means that their value was valid at the time, |
241 | // and it must still be valid because we are the IdleNotifiedSet, |
242 | // and only we can remove an entry from the two lists. (It's |
243 | // possible that an entry is moved from one list to the other during |
244 | // this loop, but that is ok.) |
245 | func(unsafe { &mut *ptr }); |
246 | } |
247 | } |
248 | |
249 | /// Remove all entries in both lists, applying some function to each element. |
250 | /// |
251 | /// The closure is called on all elements even if it panics. Having it panic |
252 | /// twice is a double-panic, and will abort the application. |
253 | pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) { |
254 | if self.length == 0 { |
255 | // Fast path. |
256 | return; |
257 | } |
258 | self.length = 0; |
259 | |
260 | // The LinkedList is not cleared on panic, so we use a bomb to clear it. |
261 | // |
262 | // This value has the invariant that any entry in its `all_entries` list |
263 | // has `my_list` set to `Neither` and that the value has not yet been |
264 | // dropped. |
265 | struct AllEntries<T, F: FnMut(T)> { |
266 | all_entries: LinkedList<T>, |
267 | func: F, |
268 | } |
269 | |
270 | impl<T, F: FnMut(T)> AllEntries<T, F> { |
271 | fn pop_next(&mut self) -> bool { |
272 | if let Some(entry) = self.all_entries.pop_back() { |
273 | // Safety: We just took this value from the list, so we can |
274 | // destroy the value in the entry. |
275 | entry |
276 | .value |
277 | .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) }); |
278 | true |
279 | } else { |
280 | false |
281 | } |
282 | } |
283 | } |
284 | |
285 | impl<T, F: FnMut(T)> Drop for AllEntries<T, F> { |
286 | fn drop(&mut self) { |
287 | while self.pop_next() {} |
288 | } |
289 | } |
290 | |
291 | let mut all_entries = AllEntries { |
292 | all_entries: LinkedList::new(), |
293 | func, |
294 | }; |
295 | |
296 | // Atomically move all entries to the new linked list in the AllEntries |
297 | // object. |
298 | { |
299 | let mut lock = self.lists.lock(); |
300 | unsafe { |
301 | // Safety: We are holding the lock and `all_entries` is a new |
302 | // LinkedList. |
303 | move_to_new_list(&mut lock.idle, &mut all_entries.all_entries); |
304 | move_to_new_list(&mut lock.notified, &mut all_entries.all_entries); |
305 | } |
306 | } |
307 | |
308 | // Keep destroying entries in the list until it is empty. |
309 | // |
310 | // If the closure panics, then the destructor of the `AllEntries` bomb |
311 | // ensures that we keep running the destructor on the remaining values. |
312 | // A second panic will abort the program. |
313 | while all_entries.pop_next() {} |
314 | } |
315 | } |
316 | |
317 | /// # Safety |
318 | /// |
319 | /// The mutex for the entries must be held, and the target list must be such |
320 | /// that setting `my_list` to `Neither` is ok. |
321 | unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) { |
322 | while let Some(entry) = from.pop_back() { |
323 | entry.my_list.with_mut(|ptr| { |
324 | *ptr = List::Neither; |
325 | }); |
326 | to.push_front(entry); |
327 | } |
328 | } |
329 | |
330 | impl<'a, T> EntryInOneOfTheLists<'a, T> { |
331 | /// Remove this entry from the list it is in, returning the value associated |
332 | /// with the entry. |
333 | /// |
334 | /// This consumes the value, since it is no longer guaranteed to be in a |
335 | /// list. |
336 | pub(crate) fn remove(self) -> T { |
337 | self.set.length -= 1; |
338 | |
339 | { |
340 | let mut lock = self.set.lists.lock(); |
341 | |
342 | // Safety: We are holding the lock so there is no race, and we will |
343 | // remove the entry afterwards to uphold invariants. |
344 | let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe { |
345 | let old_my_list = *ptr; |
346 | *ptr = List::Neither; |
347 | old_my_list |
348 | }); |
349 | |
350 | let list = match old_my_list { |
351 | List::Idle => &mut lock.idle, |
352 | List::Notified => &mut lock.notified, |
353 | // An entry in one of the lists is in one of the lists. |
354 | List::Neither => unreachable!(), |
355 | }; |
356 | |
357 | unsafe { |
358 | // Safety: We just checked that the entry is in this particular |
359 | // list. |
360 | list.remove(ListEntry::as_raw(&self.entry)).unwrap(); |
361 | } |
362 | } |
363 | |
364 | // By setting `my_list` to `Neither`, we have taken ownership of the |
365 | // value. We return it to the caller. |
366 | // |
367 | // Safety: We have a mutable reference to the `IdleNotifiedSet` that |
368 | // owns this entry, so we can use its permission to access the value. |
369 | self.entry |
370 | .value |
371 | .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) }) |
372 | } |
373 | |
374 | /// Access the value in this entry together with a context for its waker. |
375 | pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U |
376 | where |
377 | F: FnOnce(&mut T, &mut Context<'_>) -> U, |
378 | T: 'static, |
379 | { |
380 | let waker = waker_ref(&self.entry); |
381 | |
382 | let mut context = Context::from_waker(&waker); |
383 | |
384 | // Safety: We have a mutable reference to the `IdleNotifiedSet` that |
385 | // owns this entry, so we can use its permission to access the value. |
386 | self.entry |
387 | .value |
388 | .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) }) |
389 | } |
390 | } |
391 | |
392 | impl<T> Drop for IdleNotifiedSet<T> { |
393 | fn drop(&mut self) { |
394 | // Clear both lists. |
395 | self.drain(drop); |
396 | |
397 | #[cfg (debug_assertions)] |
398 | if !std::thread::panicking() { |
399 | let lock = self.lists.lock(); |
400 | assert!(lock.idle.is_empty()); |
401 | assert!(lock.notified.is_empty()); |
402 | } |
403 | } |
404 | } |
405 | |
406 | impl<T: 'static> Wake for ListEntry<T> { |
407 | fn wake_by_ref(me: &Arc<Self>) { |
408 | let mut lock = me.parent.lock(); |
409 | |
410 | // Safety: We are holding the lock and we will update the lists to |
411 | // maintain invariants. |
412 | let old_my_list = me.my_list.with_mut(|ptr| unsafe { |
413 | let old_my_list = *ptr; |
414 | if old_my_list == List::Idle { |
415 | *ptr = List::Notified; |
416 | } |
417 | old_my_list |
418 | }); |
419 | |
420 | if old_my_list == List::Idle { |
421 | // We move ourself to the notified list. |
422 | let me = unsafe { |
423 | // Safety: We just checked that we are in this particular list. |
424 | lock.idle.remove(ListEntry::as_raw(me)).unwrap() |
425 | }; |
426 | lock.notified.push_front(me); |
427 | |
428 | if let Some(waker) = lock.waker.take() { |
429 | drop(lock); |
430 | waker.wake(); |
431 | } |
432 | } |
433 | } |
434 | |
435 | fn wake(me: Arc<Self>) { |
436 | Self::wake_by_ref(&me); |
437 | } |
438 | } |
439 | |
440 | /// # Safety |
441 | /// |
442 | /// `ListEntry` is forced to be !Unpin. |
443 | unsafe impl<T> linked_list::Link for ListEntry<T> { |
444 | type Handle = Arc<ListEntry<T>>; |
445 | type Target = ListEntry<T>; |
446 | |
447 | fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> { |
448 | let ptr: *const ListEntry<T> = Arc::as_ptr(handle); |
449 | // Safety: We can't get a null pointer from `Arc::as_ptr`. |
450 | unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) } |
451 | } |
452 | |
453 | unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> { |
454 | Arc::from_raw(ptr.as_ptr()) |
455 | } |
456 | |
457 | unsafe fn pointers( |
458 | target: NonNull<ListEntry<T>>, |
459 | ) -> NonNull<linked_list::Pointers<ListEntry<T>>> { |
460 | ListEntry::addr_of_pointers(target) |
461 | } |
462 | } |
463 | |
464 | #[cfg (all(test, not(loom)))] |
465 | mod tests { |
466 | use crate::runtime::Builder; |
467 | use crate::task::JoinSet; |
468 | |
469 | // A test that runs under miri. |
470 | // |
471 | // https://github.com/tokio-rs/tokio/pull/5693 |
472 | #[test] |
473 | fn join_set_test() { |
474 | let rt = Builder::new_current_thread().build().unwrap(); |
475 | |
476 | let mut set = JoinSet::new(); |
477 | set.spawn_on(futures::future::ready(()), rt.handle()); |
478 | |
479 | rt.block_on(set.join_next()).unwrap().unwrap(); |
480 | } |
481 | } |
482 | |