use crate::loom::sync::atomic::AtomicUsize;

use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::usize;

pub(super) struct State {
    val: AtomicUsize,
}

/// Current state value.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);

type UpdateResult = Result<Snapshot, Snapshot>;

/// The task is currently being run.
const RUNNING: usize = 0b0001;

/// The task is complete.
///
/// Once this bit is set, it is never unset.
const COMPLETE: usize = 0b0010;

/// Extracts the task's lifecycle value from the state.
const LIFECYCLE_MASK: usize = 0b11;

/// Flag tracking if the task has been pushed into a run queue.
const NOTIFIED: usize = 0b100;

/// The join handle is still around.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_INTEREST: usize = 0b1_000;

/// A join handle waker has been set.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_WAKER: usize = 0b10_000;

/// The task has been forcibly cancelled.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const CANCELLED: usize = 0b100_000;

/// All bits.
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;

/// Bits used by the ref count portion of the state.
const REF_COUNT_MASK: usize = !STATE_MASK;

/// Number of positions to shift the ref count.
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;

/// One ref count.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;

/// State a task is initialized with.
///
/// A task is initialized with three references:
///
/// * A reference that will be stored in an `OwnedTasks` or `LocalOwnedTasks`.
/// * A reference that will be sent to the scheduler as an ordinary notification.
/// * A reference for the `JoinHandle`.
///
/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
/// As the task starts with a `Notified`, `NOTIFIED` is set.
const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;
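
// A worked illustration of the layout: the low six bits hold the lifecycle and
// flag bits, so `REF_COUNT_SHIFT == 6` and `REF_ONE == 0b100_0000`. The
// initial state therefore works out to
//
//     (3 << 6) | JOIN_INTEREST | NOTIFIED == 0b1100_1100
//
// i.e. a ref-count of three with `NOTIFIED` and `JOIN_INTEREST` set and the
// lifecycle idle.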

#[must_use]
pub(super) enum TransitionToRunning {
    Success,
    Cancelled,
    Failed,
    Dealloc,
}

#[must_use]
pub(super) enum TransitionToIdle {
    Ok,
    OkNotified,
    OkDealloc,
    Cancelled,
}

#[must_use]
pub(super) enum TransitionToNotifiedByVal {
    DoNothing,
    Submit,
    Dealloc,
}

#[must_use]
pub(crate) enum TransitionToNotifiedByRef {
    DoNothing,
    Submit,
}

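// Roughly how these variants are typically consumed: a task starts in
// `INITIAL_STATE` (idle, notified, three refs); the scheduler claims it with
// `transition_to_running` and polls the future; afterwards it calls
// `transition_to_idle`, or `transition_to_complete` followed by
// `transition_to_terminal`; wakers go through the `transition_to_notified_*`
// methods and cancellation through `transition_to_notified_and_cancel` or
// `transition_to_shutdown`.
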
/// All transitions are performed via RMW operations. This establishes an
/// unambiguous modification order.
impl State {
    /// Returns a task's initial state.
    pub(super) fn new() -> State {
        // The raw task returned by this method has a ref-count of three. See
        // the comment on INITIAL_STATE for more.
        State {
            val: AtomicUsize::new(INITIAL_STATE),
        }
    }

    /// Loads the current state, establishes `Acquire` ordering.
    pub(super) fn load(&self) -> Snapshot {
        Snapshot(self.val.load(Acquire))
    }

    /// Attempts to transition the lifecycle to `Running`. This sets the
    /// notified bit to false so notifications during the poll can be detected.
    pub(super) fn transition_to_running(&self) -> TransitionToRunning {
        self.fetch_update_action(|mut next| {
            let action;
            assert!(next.is_notified());

            if !next.is_idle() {
                // This happens if the task is either currently running or if it
                // has already completed, e.g. if it was cancelled during
                // shutdown. Consume the ref-count and return.
                next.ref_dec();
                if next.ref_count() == 0 {
                    action = TransitionToRunning::Dealloc;
                } else {
                    action = TransitionToRunning::Failed;
                }
            } else {
                // We are able to lock the RUNNING bit.
                next.set_running();
                next.unset_notified();

                if next.is_cancelled() {
                    action = TransitionToRunning::Cancelled;
                } else {
                    action = TransitionToRunning::Success;
                }
            }
            (action, Some(next))
        })
    }
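
    // Illustrative usage sketch (the `state` binding is hypothetical): callers
    // typically match on the returned variant, e.g.
    //
    //     match state.transition_to_running() {
    //         TransitionToRunning::Success => { /* poll the future */ }
    //         TransitionToRunning::Cancelled => { /* run the cancellation path */ }
    //         TransitionToRunning::Failed => { /* the ref-count was consumed; nothing to do */ }
    //         TransitionToRunning::Dealloc => { /* last reference; free the task */ }
    //     }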

    /// Transitions the task from `Running` -> `Idle`.
    ///
    /// Returns `TransitionToIdle::Cancelled` if the task has been flagged for
    /// cancellation, in which case the transition to `Idle` fails; otherwise
    /// returns one of the `Ok*` variants.
    pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
        self.fetch_update_action(|curr| {
            assert!(curr.is_running());

            if curr.is_cancelled() {
                return (TransitionToIdle::Cancelled, None);
            }

            let mut next = curr;
            let action;
            next.unset_running();

            if !next.is_notified() {
                // Polling the future consumes the ref-count of the Notified.
                next.ref_dec();
                if next.ref_count() == 0 {
                    action = TransitionToIdle::OkDealloc;
                } else {
                    action = TransitionToIdle::Ok;
                }
            } else {
                // The caller will schedule a new notification, so we create a
                // new ref-count for the notification. Our own ref-count is kept
                // for now, and the caller will drop it shortly.
                next.ref_inc();
                action = TransitionToIdle::OkNotified;
            }

            (action, Some(next))
        })
    }

    /// Transitions the task from `Running` -> `Complete`.
    pub(super) fn transition_to_complete(&self) -> Snapshot {
        const DELTA: usize = RUNNING | COMPLETE;

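        // XOR-ing with `DELTA` flips both lifecycle bits in a single RMW
        // operation: it clears `RUNNING` (asserted below to have been set) and
        // sets `COMPLETE` (asserted below to have been unset).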
        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
        assert!(prev.is_running());
        assert!(!prev.is_complete());

        Snapshot(prev.0 ^ DELTA)
    }

    /// Transitions from `Complete` -> `Terminal`, decrementing the reference
    /// count the specified number of times.
    ///
    /// Returns `true` if the task should be deallocated.
    pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
        let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
        assert!(
            prev.ref_count() >= count,
            "current: {}, sub: {}",
            prev.ref_count(),
            count
        );
        prev.ref_count() == count
    }

    /// Transitions the state to `NOTIFIED`.
    ///
    /// If no task needs to be submitted, a ref-count is consumed.
    ///
    /// If a task needs to be submitted, the ref-count is incremented for the
    /// new Notified.
    pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
        self.fetch_update_action(|mut snapshot| {
            let action;

            if snapshot.is_running() {
                // If the task is running, we mark it as notified, but we should
                // not submit anything as the thread currently running the
                // future is responsible for that.
                snapshot.set_notified();
                snapshot.ref_dec();

                // The thread that set the running bit also holds a ref-count.
                assert!(snapshot.ref_count() > 0);

                action = TransitionToNotifiedByVal::DoNothing;
            } else if snapshot.is_complete() || snapshot.is_notified() {
                // We do not need to submit any notifications, but we have to
                // decrement the ref-count.
                snapshot.ref_dec();

                if snapshot.ref_count() == 0 {
                    action = TransitionToNotifiedByVal::Dealloc;
                } else {
                    action = TransitionToNotifiedByVal::DoNothing;
                }
            } else {
                // We create a new notified that we can submit. The caller
                // retains ownership of the ref-count they passed in.
                snapshot.set_notified();
                snapshot.ref_inc();
                action = TransitionToNotifiedByVal::Submit;
            }

            (action, Some(snapshot))
        })
    }

    /// Transitions the state to `NOTIFIED`.
    pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_complete() || snapshot.is_notified() {
                // There is nothing to do in this case.
                (TransitionToNotifiedByRef::DoNothing, None)
            } else if snapshot.is_running() {
                // If the task is running, we mark it as notified, but we should
                // not submit as the thread currently running the future is
                // responsible for that.
                snapshot.set_notified();
                (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
            } else {
                // The task is idle and not notified. We should submit a
                // notification.
                snapshot.set_notified();
                snapshot.ref_inc();
                (TransitionToNotifiedByRef::Submit, Some(snapshot))
            }
        })
    }

    /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref
    /// count.
    ///
    /// Returns `true` if the notified bit was transitioned from `0` to `1`;
    /// otherwise `false`.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        feature = "rt",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_notified() {
                (false, None)
            } else {
                snapshot.set_notified();
                snapshot.ref_inc();
                (true, Some(snapshot))
            }
        })
    }

    /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
    ///
    /// Returns `true` if the task needs to be submitted to the pool for
    /// execution.
    pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_cancelled() || snapshot.is_complete() {
                // Aborting a completed or already-cancelled task is a no-op.
                (false, None)
            } else if snapshot.is_running() {
                // If the task is running, we mark it as cancelled. The thread
                // running the task will notice the cancelled bit when it
                // stops polling and it will kill the task.
                //
                // The set_notified() call is not strictly necessary but it will
                // in some cases let a wake_by_ref call return without having
                // to perform a compare_exchange.
                snapshot.set_notified();
                snapshot.set_cancelled();
                (false, Some(snapshot))
            } else {
                // The task is idle. We set the cancelled and notified bits and
                // submit a notification if the notified bit was not already
                // set.
                snapshot.set_cancelled();
                if !snapshot.is_notified() {
                    snapshot.set_notified();
                    snapshot.ref_inc();
                    (true, Some(snapshot))
                } else {
                    (false, Some(snapshot))
                }
            }
        })
    }

    /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
    ///
    /// Returns `true` if the transition to `Running` succeeded.
    pub(super) fn transition_to_shutdown(&self) -> bool {
        let mut prev = Snapshot(0);

        let _ = self.fetch_update(|mut snapshot| {
            prev = snapshot;

            if snapshot.is_idle() {
                snapshot.set_running();
            }

            // If the task was not idle, the thread currently running the task
            // will notice the cancelled bit and cancel it once the poll
            // completes.
            snapshot.set_cancelled();
            Some(snapshot)
        });

        prev.is_idle()
    }

    /// Optimistically tries to swap the state assuming the join handle is
    /// __immediately__ dropped on spawn.
    pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
        use std::sync::atomic::Ordering::Relaxed;

        // Relaxed is acceptable as if this function is called and succeeds,
        // then nothing has been done w/ the join handle.
        //
        // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
        // set, at which point the CAS will fail.
        //
        // Given this, there is no risk if this operation is reordered.
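        //
        // The swapped-in value works out as follows: starting from
        // `INITIAL_STATE = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED`,
        // subtracting `REF_ONE` drops the JoinHandle's reference and clearing
        // `JOIN_INTEREST` records that the handle is gone, leaving two
        // references with `NOTIFIED` still set.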
        self.val
            .compare_exchange_weak(
                INITIAL_STATE,
                (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
                Release,
                Relaxed,
            )
            .map(|_| ())
            .map_err(|_| ())
    }

    /// Tries to unset the `JOIN_INTEREST` flag.
    ///
    /// Returns `Ok` if the operation happens before the task transitions to a
    /// completed state, `Err` otherwise.
    pub(super) fn unset_join_interested(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_interested();

            Some(next)
        })
    }

    /// Sets the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
    /// the task has completed.
    pub(super) fn set_join_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(!curr.is_join_waker_set());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.set_join_waker();

            Some(next)
        })
    }

    /// Unsets the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit has been unset, `Err` otherwise. This operation
    /// fails if the task has completed.
    pub(super) fn unset_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(curr.is_join_waker_set());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_waker();

            Some(next)
        })
    }

    pub(super) fn ref_inc(&self) {
        use std::process;
        use std::sync::atomic::Ordering::Relaxed;

        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], increasing the
        // reference counter can always be done with memory_order_relaxed: new
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html
        let prev = self.val.fetch_add(REF_ONE, Relaxed);

        // If the reference count overflowed, abort.
        if prev > isize::MAX as usize {
            process::abort();
        }
    }

    /// Returns `true` if the task should be released.
    pub(super) fn ref_dec(&self) -> bool {
        let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
        assert!(prev.ref_count() >= 1);
        prev.ref_count() == 1
    }

    /// Returns `true` if the task should be released.
    pub(super) fn ref_dec_twice(&self) -> bool {
        let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
        assert!(prev.ref_count() >= 2);
        prev.ref_count() == 2
    }

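    /// Applies `f` to the current state in a compare-exchange retry loop.
    ///
    /// `f` returns a value to hand back to the caller plus an optional new
    /// state; returning `None` leaves the state untouched and ends the loop
    /// immediately.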
    fn fetch_update_action<F, T>(&self, mut f: F) -> T
    where
        F: FnMut(Snapshot) -> (T, Option<Snapshot>),
    {
        let mut curr = self.load();

        loop {
            let (output, next) = f(curr);
            let next = match next {
                Some(next) => next,
                None => return output,
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return output,
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }

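    /// Like `fetch_update_action`, but reports the outcome as a `Result`:
    /// `Ok` with the stored state when `f` returns `Some`, `Err` with the
    /// observed state when `f` returns `None`.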
    fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
    where
        F: FnMut(Snapshot) -> Option<Snapshot>,
    {
        let mut curr = self.load();

        loop {
            let next = match f(curr) {
                Some(next) => next,
                None => return Err(curr),
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return Ok(next),
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }
}

// ===== impl Snapshot =====

impl Snapshot {
    /// Returns `true` if the task is in an idle state.
    pub(super) fn is_idle(self) -> bool {
        self.0 & (RUNNING | COMPLETE) == 0
    }

    /// Returns `true` if the task has been flagged as notified.
    pub(super) fn is_notified(self) -> bool {
        self.0 & NOTIFIED == NOTIFIED
    }

    fn unset_notified(&mut self) {
        self.0 &= !NOTIFIED;
    }

    fn set_notified(&mut self) {
        self.0 |= NOTIFIED;
    }

    pub(super) fn is_running(self) -> bool {
        self.0 & RUNNING == RUNNING
    }

    fn set_running(&mut self) {
        self.0 |= RUNNING;
    }

    fn unset_running(&mut self) {
        self.0 &= !RUNNING;
    }

    pub(super) fn is_cancelled(self) -> bool {
        self.0 & CANCELLED == CANCELLED
    }

    fn set_cancelled(&mut self) {
        self.0 |= CANCELLED;
    }

    /// Returns `true` if the task's future has completed execution.
    pub(super) fn is_complete(self) -> bool {
        self.0 & COMPLETE == COMPLETE
    }

    pub(super) fn is_join_interested(self) -> bool {
        self.0 & JOIN_INTEREST == JOIN_INTEREST
    }

    fn unset_join_interested(&mut self) {
        self.0 &= !JOIN_INTEREST;
    }

    pub(super) fn is_join_waker_set(self) -> bool {
        self.0 & JOIN_WAKER == JOIN_WAKER
    }

    fn set_join_waker(&mut self) {
        self.0 |= JOIN_WAKER;
    }

    fn unset_join_waker(&mut self) {
        self.0 &= !JOIN_WAKER;
    }

    pub(super) fn ref_count(self) -> usize {
        (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
    }

    fn ref_inc(&mut self) {
        assert!(self.0 <= isize::MAX as usize);
        self.0 += REF_ONE;
    }

    pub(super) fn ref_dec(&mut self) {
        assert!(self.ref_count() > 0);
        self.0 -= REF_ONE;
    }
}

impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let snapshot = self.load();
        snapshot.fmt(fmt)
    }
}

impl fmt::Debug for Snapshot {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Snapshot")
            .field("is_running", &self.is_running())
            .field("is_complete", &self.is_complete())
            .field("is_notified", &self.is_notified())
            .field("is_cancelled", &self.is_cancelled())
            .field("is_join_interested", &self.is_join_interested())
            .field("is_join_waker_set", &self.is_join_waker_set())
            .field("ref_count", &self.ref_count())
            .finish()
    }
}
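
// A minimal test sketch, assuming a plain (non-loom) build where the loom
// atomics are the std re-exports, hence the `not(loom)` gate; the module and
// test names are illustrative.
#[cfg(all(test, not(loom)))]
mod state_sketch_tests {
    use super::*;

    #[test]
    fn initial_state_has_three_refs_and_is_notified() {
        let state = State::new();
        let snapshot = state.load();

        assert_eq!(snapshot.ref_count(), 3);
        assert!(snapshot.is_idle());
        assert!(snapshot.is_notified());
        assert!(snapshot.is_join_interested());
    }

    #[test]
    fn running_then_complete_sets_the_complete_bit() {
        let state = State::new();

        // Lock the RUNNING bit; the initial state is idle and notified, so
        // this is expected to succeed.
        match state.transition_to_running() {
            TransitionToRunning::Success => {}
            _ => panic!("expected the transition to succeed"),
        }

        let snapshot = state.transition_to_complete();
        assert!(snapshot.is_complete());
        assert!(!snapshot.is_running());
    }
}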