1 | use crate::loom::sync::atomic::AtomicUsize; |
2 | |
3 | use std::fmt; |
4 | use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; |
5 | use std::usize; |
6 | |
/// Atomic state word of a task.
///
/// A single `AtomicUsize` packs the task's flag bits (`RUNNING`, `COMPLETE`,
/// `NOTIFIED`, `JOIN_INTEREST`, `JOIN_WAKER`, `CANCELLED`) in its low bits
/// and the task's reference count in the remaining high bits.
pub(super) struct State {
    // The packed state word; see the flag constants in this file for the
    // bit layout.
    val: AtomicUsize,
}
10 | |
/// Current state value.
///
/// A plain copy of the packed state word. Reading or modifying a `Snapshot`
/// only touches this local copy, never the shared atomic.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);

/// Outcome of a conditional state update: `Ok` carries the snapshot that was
/// stored, `Err` carries the snapshot that was observed when the update was
/// declined.
type UpdateResult = Result<Snapshot, Snapshot>;
16 | |
/// The task is currently being run.
const RUNNING: usize = 0b0001;

/// The task is complete.
///
/// Once this bit is set, it is never unset.
const COMPLETE: usize = 0b0010;

/// Extracts the task's lifecycle value (the `RUNNING` and `COMPLETE` bits)
/// from the state.
const LIFECYCLE_MASK: usize = 0b11;

/// Flag tracking if the task has been pushed into a run queue.
const NOTIFIED: usize = 0b100;

/// The join handle is still around.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_INTEREST: usize = 0b1_000;

/// A join handle waker has been set.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_WAKER: usize = 0b10_000;

/// The task has been forcibly cancelled.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const CANCELLED: usize = 0b100_000;

/// All flag bits, i.e. every bit of the state that is not part of the ref
/// count.
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;

/// Bits used by the ref count portion of the state (everything above the
/// flag bits).
const REF_COUNT_MASK: usize = !STATE_MASK;

/// Number of positions to shift the ref count.
///
/// This equals the number of zero bits in `REF_COUNT_MASK`, which is the
/// number of flag bits.
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;

/// One ref count, expressed as an increment of the packed state word.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;

/// State a task is initialized with.
///
/// A task is initialized with three references:
///
///  * A reference that will be stored in an OwnedTasks or LocalOwnedTasks.
///  * A reference that will be sent to the scheduler as an ordinary notification.
///  * A reference for the JoinHandle.
///
/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
/// As the task starts with a `Notified`, `NOTIFIED` is set.
const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;
66 | |
/// Outcome of `State::transition_to_running`.
#[must_use]
pub(super) enum TransitionToRunning {
    /// The task was idle and not cancelled: `RUNNING` was set and `NOTIFIED`
    /// was cleared.
    Success,
    /// The task was idle, so `RUNNING` was set and `NOTIFIED` was cleared,
    /// but the `CANCELLED` bit is set.
    Cancelled,
    /// The task was not idle (already running or complete). One ref-count
    /// was consumed and other references remain.
    Failed,
    /// The task was not idle and the consumed ref-count was the last one.
    Dealloc,
}
74 | |
/// Outcome of `State::transition_to_idle`.
#[must_use]
pub(super) enum TransitionToIdle {
    /// `RUNNING` was cleared, the task was not notified in the meantime, and
    /// one ref-count was consumed while other references remain.
    Ok,
    /// `RUNNING` was cleared while `NOTIFIED` was set. A ref-count was added
    /// for the new notification.
    OkNotified,
    /// `RUNNING` was cleared and the consumed ref-count was the last one.
    OkDealloc,
    /// The `CANCELLED` bit was set; the state was left unchanged.
    Cancelled,
}
82 | |
/// Outcome of `State::transition_to_notified_by_val`.
#[must_use]
pub(super) enum TransitionToNotifiedByVal {
    /// Nothing has to be submitted. One ref-count was consumed and other
    /// references remain.
    DoNothing,
    /// The task was idle and not yet notified: `NOTIFIED` was set and a
    /// ref-count was added for the new notification.
    Submit,
    /// Nothing has to be submitted and the consumed ref-count was the last
    /// one.
    Dealloc,
}
89 | |
/// Outcome of `State::transition_to_notified_by_ref`.
#[must_use]
pub(crate) enum TransitionToNotifiedByRef {
    /// Nothing has to be submitted: the task is complete, already notified,
    /// or currently running (in which case `NOTIFIED` was set).
    DoNothing,
    /// The task was idle and not yet notified: `NOTIFIED` was set and a
    /// ref-count was added for the new notification.
    Submit,
}
95 | |
96 | /// All transitions are performed via RMW operations. This establishes an |
97 | /// unambiguous modification order. |
98 | impl State { |
99 | /// Returns a task's initial state. |
100 | pub(super) fn new() -> State { |
101 | // The raw task returned by this method has a ref-count of three. See |
102 | // the comment on INITIAL_STATE for more. |
103 | State { |
104 | val: AtomicUsize::new(INITIAL_STATE), |
105 | } |
106 | } |
107 | |
108 | /// Loads the current state, establishes `Acquire` ordering. |
109 | pub(super) fn load(&self) -> Snapshot { |
110 | Snapshot(self.val.load(Acquire)) |
111 | } |
112 | |
113 | /// Attempts to transition the lifecycle to `Running`. This sets the |
114 | /// notified bit to false so notifications during the poll can be detected. |
115 | pub(super) fn transition_to_running(&self) -> TransitionToRunning { |
116 | self.fetch_update_action(|mut next| { |
117 | let action; |
118 | assert!(next.is_notified()); |
119 | |
120 | if !next.is_idle() { |
121 | // This happens if the task is either currently running or if it |
122 | // has already completed, e.g. if it was cancelled during |
123 | // shutdown. Consume the ref-count and return. |
124 | next.ref_dec(); |
125 | if next.ref_count() == 0 { |
126 | action = TransitionToRunning::Dealloc; |
127 | } else { |
128 | action = TransitionToRunning::Failed; |
129 | } |
130 | } else { |
131 | // We are able to lock the RUNNING bit. |
132 | next.set_running(); |
133 | next.unset_notified(); |
134 | |
135 | if next.is_cancelled() { |
136 | action = TransitionToRunning::Cancelled; |
137 | } else { |
138 | action = TransitionToRunning::Success; |
139 | } |
140 | } |
141 | (action, Some(next)) |
142 | }) |
143 | } |
144 | |
145 | /// Transitions the task from `Running` -> `Idle`. |
146 | /// |
147 | /// Returns `true` if the transition to `Idle` is successful, `false` otherwise. |
148 | /// The transition to `Idle` fails if the task has been flagged to be |
149 | /// cancelled. |
150 | pub(super) fn transition_to_idle(&self) -> TransitionToIdle { |
151 | self.fetch_update_action(|curr| { |
152 | assert!(curr.is_running()); |
153 | |
154 | if curr.is_cancelled() { |
155 | return (TransitionToIdle::Cancelled, None); |
156 | } |
157 | |
158 | let mut next = curr; |
159 | let action; |
160 | next.unset_running(); |
161 | |
162 | if !next.is_notified() { |
163 | // Polling the future consumes the ref-count of the Notified. |
164 | next.ref_dec(); |
165 | if next.ref_count() == 0 { |
166 | action = TransitionToIdle::OkDealloc; |
167 | } else { |
168 | action = TransitionToIdle::Ok; |
169 | } |
170 | } else { |
171 | // The caller will schedule a new notification, so we create a |
172 | // new ref-count for the notification. Our own ref-count is kept |
173 | // for now, and the caller will drop it shortly. |
174 | next.ref_inc(); |
175 | action = TransitionToIdle::OkNotified; |
176 | } |
177 | |
178 | (action, Some(next)) |
179 | }) |
180 | } |
181 | |
182 | /// Transitions the task from `Running` -> `Complete`. |
183 | pub(super) fn transition_to_complete(&self) -> Snapshot { |
184 | const DELTA: usize = RUNNING | COMPLETE; |
185 | |
186 | let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel)); |
187 | assert!(prev.is_running()); |
188 | assert!(!prev.is_complete()); |
189 | |
190 | Snapshot(prev.0 ^ DELTA) |
191 | } |
192 | |
193 | /// Transitions from `Complete` -> `Terminal`, decrementing the reference |
194 | /// count the specified number of times. |
195 | /// |
196 | /// Returns true if the task should be deallocated. |
197 | pub(super) fn transition_to_terminal(&self, count: usize) -> bool { |
198 | let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel)); |
199 | assert!( |
200 | prev.ref_count() >= count, |
201 | "current: {}, sub: {}" , |
202 | prev.ref_count(), |
203 | count |
204 | ); |
205 | prev.ref_count() == count |
206 | } |
207 | |
208 | /// Transitions the state to `NOTIFIED`. |
209 | /// |
210 | /// If no task needs to be submitted, a ref-count is consumed. |
211 | /// |
212 | /// If a task needs to be submitted, the ref-count is incremented for the |
213 | /// new Notified. |
214 | pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal { |
215 | self.fetch_update_action(|mut snapshot| { |
216 | let action; |
217 | |
218 | if snapshot.is_running() { |
219 | // If the task is running, we mark it as notified, but we should |
220 | // not submit anything as the thread currently running the |
221 | // future is responsible for that. |
222 | snapshot.set_notified(); |
223 | snapshot.ref_dec(); |
224 | |
225 | // The thread that set the running bit also holds a ref-count. |
226 | assert!(snapshot.ref_count() > 0); |
227 | |
228 | action = TransitionToNotifiedByVal::DoNothing; |
229 | } else if snapshot.is_complete() || snapshot.is_notified() { |
230 | // We do not need to submit any notifications, but we have to |
231 | // decrement the ref-count. |
232 | snapshot.ref_dec(); |
233 | |
234 | if snapshot.ref_count() == 0 { |
235 | action = TransitionToNotifiedByVal::Dealloc; |
236 | } else { |
237 | action = TransitionToNotifiedByVal::DoNothing; |
238 | } |
239 | } else { |
240 | // We create a new notified that we can submit. The caller |
241 | // retains ownership of the ref-count they passed in. |
242 | snapshot.set_notified(); |
243 | snapshot.ref_inc(); |
244 | action = TransitionToNotifiedByVal::Submit; |
245 | } |
246 | |
247 | (action, Some(snapshot)) |
248 | }) |
249 | } |
250 | |
251 | /// Transitions the state to `NOTIFIED`. |
252 | pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef { |
253 | self.fetch_update_action(|mut snapshot| { |
254 | if snapshot.is_complete() || snapshot.is_notified() { |
255 | // There is nothing to do in this case. |
256 | (TransitionToNotifiedByRef::DoNothing, None) |
257 | } else if snapshot.is_running() { |
258 | // If the task is running, we mark it as notified, but we should |
259 | // not submit as the thread currently running the future is |
260 | // responsible for that. |
261 | snapshot.set_notified(); |
262 | (TransitionToNotifiedByRef::DoNothing, Some(snapshot)) |
263 | } else { |
264 | // The task is idle and not notified. We should submit a |
265 | // notification. |
266 | snapshot.set_notified(); |
267 | snapshot.ref_inc(); |
268 | (TransitionToNotifiedByRef::Submit, Some(snapshot)) |
269 | } |
270 | }) |
271 | } |
272 | |
/// Transitions the state to `NOTIFIED` for tracing purposes.
///
/// If the notified bit is already set, the state is left untouched and the
/// ref count is NOT changed. Otherwise the bit is set and the ref count is
/// incremented by one.
///
/// Returns `true` if the notified bit was transitioned from `0` to `1`;
/// otherwise `false`.
#[cfg(all(
    tokio_unstable,
    tokio_taskdump,
    feature = "rt",
    target_os = "linux",
    any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
    self.fetch_update_action(|mut snapshot| {
        if snapshot.is_notified() {
            // Already notified: report `false` without attempting a store.
            return (false, None);
        }

        snapshot.set_notified();
        snapshot.ref_inc();
        (true, Some(snapshot))
    })
}
296 | |
297 | /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle. |
298 | /// |
299 | /// Returns `true` if the task needs to be submitted to the pool for |
300 | /// execution. |
301 | pub(super) fn transition_to_notified_and_cancel(&self) -> bool { |
302 | self.fetch_update_action(|mut snapshot| { |
303 | if snapshot.is_cancelled() || snapshot.is_complete() { |
304 | // Aborts to completed or cancelled tasks are no-ops. |
305 | (false, None) |
306 | } else if snapshot.is_running() { |
307 | // If the task is running, we mark it as cancelled. The thread |
308 | // running the task will notice the cancelled bit when it |
309 | // stops polling and it will kill the task. |
310 | // |
311 | // The set_notified() call is not strictly necessary but it will |
312 | // in some cases let a wake_by_ref call return without having |
313 | // to perform a compare_exchange. |
314 | snapshot.set_notified(); |
315 | snapshot.set_cancelled(); |
316 | (false, Some(snapshot)) |
317 | } else { |
318 | // The task is idle. We set the cancelled and notified bits and |
319 | // submit a notification if the notified bit was not already |
320 | // set. |
321 | snapshot.set_cancelled(); |
322 | if !snapshot.is_notified() { |
323 | snapshot.set_notified(); |
324 | snapshot.ref_inc(); |
325 | (true, Some(snapshot)) |
326 | } else { |
327 | (false, Some(snapshot)) |
328 | } |
329 | } |
330 | }) |
331 | } |
332 | |
333 | /// Sets the `CANCELLED` bit and attempts to transition to `Running`. |
334 | /// |
335 | /// Returns `true` if the transition to `Running` succeeded. |
336 | pub(super) fn transition_to_shutdown(&self) -> bool { |
337 | let mut prev = Snapshot(0); |
338 | |
339 | let _ = self.fetch_update(|mut snapshot| { |
340 | prev = snapshot; |
341 | |
342 | if snapshot.is_idle() { |
343 | snapshot.set_running(); |
344 | } |
345 | |
346 | // If the task was not idle, the thread currently running the task |
347 | // will notice the cancelled bit and cancel it once the poll |
348 | // completes. |
349 | snapshot.set_cancelled(); |
350 | Some(snapshot) |
351 | }); |
352 | |
353 | prev.is_idle() |
354 | } |
355 | |
356 | /// Optimistically tries to swap the state assuming the join handle is |
357 | /// __immediately__ dropped on spawn. |
358 | pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> { |
359 | use std::sync::atomic::Ordering::Relaxed; |
360 | |
361 | // Relaxed is acceptable as if this function is called and succeeds, |
362 | // then nothing has been done w/ the join handle. |
363 | // |
364 | // The moment the join handle is used (polled), the `JOIN_WAKER` flag is |
365 | // set, at which point the CAS will fail. |
366 | // |
367 | // Given this, there is no risk if this operation is reordered. |
368 | self.val |
369 | .compare_exchange_weak( |
370 | INITIAL_STATE, |
371 | (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST, |
372 | Release, |
373 | Relaxed, |
374 | ) |
375 | .map(|_| ()) |
376 | .map_err(|_| ()) |
377 | } |
378 | |
379 | /// Tries to unset the `JOIN_INTEREST` flag. |
380 | /// |
381 | /// Returns `Ok` if the operation happens before the task transitions to a |
382 | /// completed state, `Err` otherwise. |
383 | pub(super) fn unset_join_interested(&self) -> UpdateResult { |
384 | self.fetch_update(|curr| { |
385 | assert!(curr.is_join_interested()); |
386 | |
387 | if curr.is_complete() { |
388 | return None; |
389 | } |
390 | |
391 | let mut next = curr; |
392 | next.unset_join_interested(); |
393 | |
394 | Some(next) |
395 | }) |
396 | } |
397 | |
398 | /// Sets the `JOIN_WAKER` bit. |
399 | /// |
400 | /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if |
401 | /// the task has completed. |
402 | pub(super) fn set_join_waker(&self) -> UpdateResult { |
403 | self.fetch_update(|curr| { |
404 | assert!(curr.is_join_interested()); |
405 | assert!(!curr.is_join_waker_set()); |
406 | |
407 | if curr.is_complete() { |
408 | return None; |
409 | } |
410 | |
411 | let mut next = curr; |
412 | next.set_join_waker(); |
413 | |
414 | Some(next) |
415 | }) |
416 | } |
417 | |
418 | /// Unsets the `JOIN_WAKER` bit. |
419 | /// |
420 | /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if |
421 | /// the task has completed. |
422 | pub(super) fn unset_waker(&self) -> UpdateResult { |
423 | self.fetch_update(|curr| { |
424 | assert!(curr.is_join_interested()); |
425 | assert!(curr.is_join_waker_set()); |
426 | |
427 | if curr.is_complete() { |
428 | return None; |
429 | } |
430 | |
431 | let mut next = curr; |
432 | next.unset_join_waker(); |
433 | |
434 | Some(next) |
435 | }) |
436 | } |
437 | |
438 | pub(super) fn ref_inc(&self) { |
439 | use std::process; |
440 | use std::sync::atomic::Ordering::Relaxed; |
441 | |
442 | // Using a relaxed ordering is alright here, as knowledge of the |
443 | // original reference prevents other threads from erroneously deleting |
444 | // the object. |
445 | // |
446 | // As explained in the [Boost documentation][1], Increasing the |
447 | // reference counter can always be done with memory_order_relaxed: New |
448 | // references to an object can only be formed from an existing |
449 | // reference, and passing an existing reference from one thread to |
450 | // another must already provide any required synchronization. |
451 | // |
452 | // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) |
453 | let prev = self.val.fetch_add(REF_ONE, Relaxed); |
454 | |
455 | // If the reference count overflowed, abort. |
456 | if prev > isize::MAX as usize { |
457 | process::abort(); |
458 | } |
459 | } |
460 | |
461 | /// Returns `true` if the task should be released. |
462 | pub(super) fn ref_dec(&self) -> bool { |
463 | let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel)); |
464 | assert!(prev.ref_count() >= 1); |
465 | prev.ref_count() == 1 |
466 | } |
467 | |
468 | /// Returns `true` if the task should be released. |
469 | pub(super) fn ref_dec_twice(&self) -> bool { |
470 | let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel)); |
471 | assert!(prev.ref_count() >= 2); |
472 | prev.ref_count() == 2 |
473 | } |
474 | |
475 | fn fetch_update_action<F, T>(&self, mut f: F) -> T |
476 | where |
477 | F: FnMut(Snapshot) -> (T, Option<Snapshot>), |
478 | { |
479 | let mut curr = self.load(); |
480 | |
481 | loop { |
482 | let (output, next) = f(curr); |
483 | let next = match next { |
484 | Some(next) => next, |
485 | None => return output, |
486 | }; |
487 | |
488 | let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire); |
489 | |
490 | match res { |
491 | Ok(_) => return output, |
492 | Err(actual) => curr = Snapshot(actual), |
493 | } |
494 | } |
495 | } |
496 | |
497 | fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> |
498 | where |
499 | F: FnMut(Snapshot) -> Option<Snapshot>, |
500 | { |
501 | let mut curr = self.load(); |
502 | |
503 | loop { |
504 | let next = match f(curr) { |
505 | Some(next) => next, |
506 | None => return Err(curr), |
507 | }; |
508 | |
509 | let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire); |
510 | |
511 | match res { |
512 | Ok(_) => return Ok(next), |
513 | Err(actual) => curr = Snapshot(actual), |
514 | } |
515 | } |
516 | } |
517 | } |
518 | |
519 | // ===== impl Snapshot ===== |
520 | |
521 | impl Snapshot { |
522 | /// Returns `true` if the task is in an idle state. |
523 | pub(super) fn is_idle(self) -> bool { |
524 | self.0 & (RUNNING | COMPLETE) == 0 |
525 | } |
526 | |
527 | /// Returns `true` if the task has been flagged as notified. |
528 | pub(super) fn is_notified(self) -> bool { |
529 | self.0 & NOTIFIED == NOTIFIED |
530 | } |
531 | |
532 | fn unset_notified(&mut self) { |
533 | self.0 &= !NOTIFIED; |
534 | } |
535 | |
536 | fn set_notified(&mut self) { |
537 | self.0 |= NOTIFIED; |
538 | } |
539 | |
540 | pub(super) fn is_running(self) -> bool { |
541 | self.0 & RUNNING == RUNNING |
542 | } |
543 | |
544 | fn set_running(&mut self) { |
545 | self.0 |= RUNNING; |
546 | } |
547 | |
548 | fn unset_running(&mut self) { |
549 | self.0 &= !RUNNING; |
550 | } |
551 | |
552 | pub(super) fn is_cancelled(self) -> bool { |
553 | self.0 & CANCELLED == CANCELLED |
554 | } |
555 | |
556 | fn set_cancelled(&mut self) { |
557 | self.0 |= CANCELLED; |
558 | } |
559 | |
560 | /// Returns `true` if the task's future has completed execution. |
561 | pub(super) fn is_complete(self) -> bool { |
562 | self.0 & COMPLETE == COMPLETE |
563 | } |
564 | |
565 | pub(super) fn is_join_interested(self) -> bool { |
566 | self.0 & JOIN_INTEREST == JOIN_INTEREST |
567 | } |
568 | |
569 | fn unset_join_interested(&mut self) { |
570 | self.0 &= !JOIN_INTEREST; |
571 | } |
572 | |
573 | pub(super) fn is_join_waker_set(self) -> bool { |
574 | self.0 & JOIN_WAKER == JOIN_WAKER |
575 | } |
576 | |
577 | fn set_join_waker(&mut self) { |
578 | self.0 |= JOIN_WAKER; |
579 | } |
580 | |
581 | fn unset_join_waker(&mut self) { |
582 | self.0 &= !JOIN_WAKER; |
583 | } |
584 | |
585 | pub(super) fn ref_count(self) -> usize { |
586 | (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT |
587 | } |
588 | |
589 | fn ref_inc(&mut self) { |
590 | assert!(self.0 <= isize::MAX as usize); |
591 | self.0 += REF_ONE; |
592 | } |
593 | |
594 | pub(super) fn ref_dec(&mut self) { |
595 | assert!(self.ref_count() > 0); |
596 | self.0 -= REF_ONE; |
597 | } |
598 | } |
599 | |
600 | impl fmt::Debug for State { |
601 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
602 | let snapshot = self.load(); |
603 | snapshot.fmt(fmt) |
604 | } |
605 | } |
606 | |
607 | impl fmt::Debug for Snapshot { |
608 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
609 | fmt.debug_struct("Snapshot" ) |
610 | .field("is_running" , &self.is_running()) |
611 | .field("is_complete" , &self.is_complete()) |
612 | .field("is_notified" , &self.is_notified()) |
613 | .field("is_cancelled" , &self.is_cancelled()) |
614 | .field("is_join_interested" , &self.is_join_interested()) |
615 | .field("is_join_waker_set" , &self.is_join_waker_set()) |
616 | .field("ref_count" , &self.ref_count()) |
617 | .finish() |
618 | } |
619 | } |
620 | |