1use crate::loom::sync::atomic::AtomicUsize;
2
3use std::fmt;
4use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5use std::usize;
6
/// Atomic state word of a task.
///
/// The low bits hold the lifecycle and bookkeeping flags (`RUNNING`,
/// `COMPLETE`, `NOTIFIED`, `JOIN_INTEREST`, `JOIN_WAKER`, `CANCELLED`); the
/// remaining high bits hold the task's reference count.
pub(super) struct State {
    /// The packed flags and reference count.
    val: AtomicUsize,
}
10
/// Current state value.
///
/// A plain copy of the packed state word, taken at some point in time. The
/// accessors and mutators on this type only touch the copy; they never touch
/// the atomic it was read from.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
14
/// Result of a conditional state update: `Ok` carries the snapshot that was
/// stored, `Err` carries the snapshot that was observed when the update was
/// declined.
type UpdateResult = Result<Snapshot, Snapshot>;
16
/// Set while the task is being run.
const RUNNING: usize = 0x01;

/// Set when the task is complete.
///
/// This bit is sticky: once set, it is never cleared again.
const COMPLETE: usize = 0x02;

/// Mask selecting the lifecycle bits (`RUNNING` and `COMPLETE`) of the state.
const LIFECYCLE_MASK: usize = RUNNING | COMPLETE;

/// Set when the task has been pushed into a run queue.
const NOTIFIED: usize = 0x04;

/// Set for as long as the join handle is still around.
const JOIN_INTEREST: usize = 0x08;

/// Set when a join handle waker has been stored.
const JOIN_WAKER: usize = 0x10;

/// Set when the task has been forcibly cancelled.
const CANCELLED: usize = 0x20;

/// Union of every flag bit.
const STATE_MASK: usize = CANCELLED | JOIN_WAKER | JOIN_INTEREST | NOTIFIED | LIFECYCLE_MASK;

/// Every bit that is not a flag; these bits store the ref count.
const REF_COUNT_MASK: usize = !STATE_MASK;

/// Number of bit positions the ref count is shifted by inside the state word.
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;

/// The state-word value of a single reference.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;

/// The state every task starts out with.
///
/// A fresh task holds three references:
///
/// * One that will be stored in an `OwnedTasks` or `LocalOwnedTasks`.
/// * One that will be sent to the scheduler as an ordinary notification.
/// * One for the `JoinHandle`.
///
/// Because the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
/// Because the task starts with a `Notified`, `NOTIFIED` is set.
const INITIAL_STATE: usize = (3 * REF_ONE) | NOTIFIED | JOIN_INTEREST;
63
/// Outcome of `State::transition_to_running`.
#[must_use]
pub(super) enum TransitionToRunning {
    /// The task was idle: `RUNNING` was set, `NOTIFIED` was cleared, and the
    /// `CANCELLED` flag was not set.
    Success,
    /// The task was idle: `RUNNING` was set and `NOTIFIED` was cleared, but
    /// the `CANCELLED` flag was set.
    Cancelled,
    /// The task was not idle. One ref-count was consumed and other references
    /// remain.
    Failed,
    /// The task was not idle. One ref-count was consumed and the count
    /// reached zero.
    Dealloc,
}
71
/// Outcome of `State::transition_to_idle`.
#[must_use]
pub(super) enum TransitionToIdle {
    /// `RUNNING` was cleared, `NOTIFIED` was not set, one ref-count was
    /// consumed and other references remain.
    Ok,
    /// `RUNNING` was cleared while `NOTIFIED` was set. A ref-count was added
    /// for the new notification.
    OkNotified,
    /// `RUNNING` was cleared, `NOTIFIED` was not set, and consuming one
    /// ref-count brought the count to zero.
    OkDealloc,
    /// The `CANCELLED` flag was set; the state was left unmodified.
    Cancelled,
}
79
/// Outcome of `State::transition_to_notified_by_val`.
#[must_use]
pub(super) enum TransitionToNotifiedByVal {
    /// Nothing needs to be submitted. One ref-count was consumed and other
    /// references remain.
    DoNothing,
    /// `NOTIFIED` was set and a ref-count was added for a new notification
    /// that can be submitted.
    Submit,
    /// Nothing needs to be submitted, and consuming one ref-count brought the
    /// count to zero.
    Dealloc,
}
86
/// Outcome of `State::transition_to_notified_by_ref`.
#[must_use]
pub(crate) enum TransitionToNotifiedByRef {
    /// Nothing needs to be submitted: the task was complete, already
    /// notified, or running (in which case `NOTIFIED` was set).
    DoNothing,
    /// The task was idle and not notified: `NOTIFIED` was set and a ref-count
    /// was added for the notification that should be submitted.
    Submit,
}
92
/// All transitions are performed via RMW operations. This establishes an
/// unambiguous modification order.
impl State {
    /// Returns a task's initial state.
    pub(super) fn new() -> State {
        // The raw task returned by this method has a ref-count of three. See
        // the comment on INITIAL_STATE for more.
        State {
            val: AtomicUsize::new(INITIAL_STATE),
        }
    }

    /// Loads the current state, establishes `Acquire` ordering.
    pub(super) fn load(&self) -> Snapshot {
        Snapshot(self.val.load(Acquire))
    }

    /// Attempts to transition the lifecycle to `Running`. This sets the
    /// notified bit to false so notifications during the poll can be detected.
    ///
    /// See `TransitionToRunning` for the possible outcomes.
    ///
    /// # Panics
    ///
    /// Panics if the `NOTIFIED` bit is not set in the observed state.
    pub(super) fn transition_to_running(&self) -> TransitionToRunning {
        self.fetch_update_action(|mut next| {
            let action;
            assert!(next.is_notified());

            if !next.is_idle() {
                // This happens if the task is either currently running or if it
                // has already completed, e.g. if it was cancelled during
                // shutdown. Consume the ref-count and return.
                next.ref_dec();
                if next.ref_count() == 0 {
                    action = TransitionToRunning::Dealloc;
                } else {
                    action = TransitionToRunning::Failed;
                }
            } else {
                // We are able to lock the RUNNING bit.
                next.set_running();
                next.unset_notified();

                if next.is_cancelled() {
                    action = TransitionToRunning::Cancelled;
                } else {
                    action = TransitionToRunning::Success;
                }
            }
            (action, Some(next))
        })
    }

    /// Transitions the task from `Running` -> `Idle`.
    ///
    /// The transition to `Idle` fails if the task has been flagged to be
    /// cancelled: in that case `TransitionToIdle::Cancelled` is returned and
    /// the state is left unmodified. Otherwise the `RUNNING` bit is cleared and
    /// one of the `Ok*` variants is returned; see `TransitionToIdle`.
    ///
    /// # Panics
    ///
    /// Panics if the `RUNNING` bit is not set in the observed state.
    pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
        self.fetch_update_action(|curr| {
            assert!(curr.is_running());

            if curr.is_cancelled() {
                // Returning `None` leaves the state word untouched.
                return (TransitionToIdle::Cancelled, None);
            }

            let mut next = curr;
            let action;
            next.unset_running();

            if !next.is_notified() {
                // Polling the future consumes the ref-count of the Notified.
                next.ref_dec();
                if next.ref_count() == 0 {
                    action = TransitionToIdle::OkDealloc;
                } else {
                    action = TransitionToIdle::Ok;
                }
            } else {
                // The caller will schedule a new notification, so we create a
                // new ref-count for the notification. Our own ref-count is kept
                // for now, and the caller will drop it shortly.
                next.ref_inc();
                action = TransitionToIdle::OkNotified;
            }

            (action, Some(next))
        })
    }

    /// Transitions the task from `Running` -> `Complete`.
    ///
    /// Returns the snapshot of the state after the transition.
    ///
    /// # Panics
    ///
    /// Panics if, before the transition, the `RUNNING` bit was not set or the
    /// `COMPLETE` bit was already set. The checks run after the atomic update
    /// has been applied.
    pub(super) fn transition_to_complete(&self) -> Snapshot {
        // XOR-ing with both bits flips RUNNING off and COMPLETE on in a single
        // RMW, given the preconditions asserted below.
        const DELTA: usize = RUNNING | COMPLETE;

        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
        assert!(prev.is_running());
        assert!(!prev.is_complete());

        // Recompute the new value locally rather than loading it again.
        Snapshot(prev.0 ^ DELTA)
    }

    /// Transitions from `Complete` -> `Terminal`, decrementing the reference
    /// count the specified number of times.
    ///
    /// Returns true if the task should be deallocated, i.e. if the reference
    /// count before the decrement was exactly `count`.
    ///
    /// # Panics
    ///
    /// Panics if the reference count before the decrement was smaller than
    /// `count`. The check runs after the subtraction has been applied.
    pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
        let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
        assert!(
            prev.ref_count() >= count,
            "current: {}, sub: {}",
            prev.ref_count(),
            count
        );
        prev.ref_count() == count
    }

    /// Transitions the state to `NOTIFIED`.
    ///
    /// If no task needs to be submitted, a ref-count is consumed.
    ///
    /// If a task needs to be submitted, the ref-count is incremented for the
    /// new Notified.
    ///
    /// See `TransitionToNotifiedByVal` for the possible outcomes.
    pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
        self.fetch_update_action(|mut snapshot| {
            let action;

            if snapshot.is_running() {
                // If the task is running, we mark it as notified, but we should
                // not submit anything as the thread currently running the
                // future is responsible for that.
                snapshot.set_notified();
                snapshot.ref_dec();

                // The thread that set the running bit also holds a ref-count.
                assert!(snapshot.ref_count() > 0);

                action = TransitionToNotifiedByVal::DoNothing;
            } else if snapshot.is_complete() || snapshot.is_notified() {
                // We do not need to submit any notifications, but we have to
                // decrement the ref-count.
                snapshot.ref_dec();

                if snapshot.ref_count() == 0 {
                    action = TransitionToNotifiedByVal::Dealloc;
                } else {
                    action = TransitionToNotifiedByVal::DoNothing;
                }
            } else {
                // We create a new notified that we can submit. The caller
                // retains ownership of the ref-count they passed in.
                snapshot.set_notified();
                snapshot.ref_inc();
                action = TransitionToNotifiedByVal::Submit;
            }

            (action, Some(snapshot))
        })
    }

    /// Transitions the state to `NOTIFIED`.
    ///
    /// Unlike `transition_to_notified_by_val`, no ref-count is consumed. A
    /// ref-count is added only when `Submit` is returned.
    pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_complete() || snapshot.is_notified() {
                // There is nothing to do in this case.
                (TransitionToNotifiedByRef::DoNothing, None)
            } else if snapshot.is_running() {
                // If the task is running, we mark it as notified, but we should
                // not submit as the thread currently running the future is
                // responsible for that.
                snapshot.set_notified();
                (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
            } else {
                // The task is idle and not notified. We should submit a
                // notification.
                snapshot.set_notified();
                snapshot.ref_inc();
                (TransitionToNotifiedByRef::Submit, Some(snapshot))
            }
        })
    }

    /// Transitions the state to `NOTIFIED`, increasing the ref count whenever
    /// the notified bit is newly set.
    ///
    /// Returns `true` if the notified bit was transitioned from `0` to `1`;
    /// otherwise `false`, in which case the state is left unmodified.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        feature = "rt",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_notified() {
                (false, None)
            } else {
                snapshot.set_notified();
                snapshot.ref_inc();
                (true, Some(snapshot))
            }
        })
    }

    /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
    ///
    /// Returns `true` if the task needs to be submitted to the pool for
    /// execution.
    pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_cancelled() || snapshot.is_complete() {
                // Aborts to completed or cancelled tasks are no-ops.
                (false, None)
            } else if snapshot.is_running() {
                // If the task is running, we mark it as cancelled. The thread
                // running the task will notice the cancelled bit when it
                // stops polling and it will kill the task.
                //
                // The set_notified() call is not strictly necessary but it will
                // in some cases let a wake_by_ref call return without having
                // to perform a compare_exchange.
                snapshot.set_notified();
                snapshot.set_cancelled();
                (false, Some(snapshot))
            } else {
                // The task is idle. We set the cancelled and notified bits and
                // submit a notification if the notified bit was not already
                // set.
                snapshot.set_cancelled();
                if !snapshot.is_notified() {
                    snapshot.set_notified();
                    snapshot.ref_inc();
                    (true, Some(snapshot))
                } else {
                    (false, Some(snapshot))
                }
            }
        })
    }

    /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
    ///
    /// Returns `true` if the transition to `Running` succeeded, i.e. if the
    /// task was idle when the update was applied.
    pub(super) fn transition_to_shutdown(&self) -> bool {
        // Placeholder; overwritten on every invocation of the closure below.
        let mut prev = Snapshot(0);

        let _ = self.fetch_update(|mut snapshot| {
            // The closure may run several times if the CAS is retried; `prev`
            // ends up holding the value observed by the final, successful
            // attempt.
            prev = snapshot;

            if snapshot.is_idle() {
                snapshot.set_running();
            }

            // If the task was not idle, the thread currently running the task
            // will notice the cancelled bit and cancel it once the poll
            // completes.
            snapshot.set_cancelled();
            Some(snapshot)
        });

        prev.is_idle()
    }

    /// Optimistically tries to swap the state assuming the join handle is
    /// __immediately__ dropped on spawn.
    ///
    /// Returns `Ok(())` if the state was exactly `INITIAL_STATE` and has been
    /// replaced by the same state with one reference fewer and `JOIN_INTEREST`
    /// cleared. Returns `Err(())` otherwise; because a weak compare-exchange is
    /// used, this includes spurious failures.
    pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
        use std::sync::atomic::Ordering::Relaxed;

        // Relaxed is acceptable as if this function is called and succeeds,
        // then nothing has been done w/ the join handle.
        //
        // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
        // set, at which point the CAS will fail.
        //
        // Given this, there is no risk if this operation is reordered.
        //
        // NOTE(review): the orderings actually passed below are `Release` on
        // success and `Relaxed` on failure, so the text above only matches the
        // failure ordering -- confirm which one is intended.
        self.val
            .compare_exchange_weak(
                INITIAL_STATE,
                (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
                Release,
                Relaxed,
            )
            .map(|_| ())
            .map_err(|_| ())
    }

    /// Tries to unset the `JOIN_INTEREST` flag.
    ///
    /// Returns `Ok` if the operation happens before the task transitions to a
    /// completed state, `Err` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `JOIN_INTEREST` is not set in the observed state.
    pub(super) fn unset_join_interested(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_interested();

            Some(next)
        })
    }

    /// Sets the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
    /// the task has completed.
    ///
    /// # Panics
    ///
    /// Panics if `JOIN_INTEREST` is not set or `JOIN_WAKER` is already set in
    /// the observed state.
    pub(super) fn set_join_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(!curr.is_join_waker_set());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.set_join_waker();

            Some(next)
        })
    }

    /// Unsets the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit has been unset, `Err` otherwise. This operation
    /// fails if the task has completed.
    ///
    /// # Panics
    ///
    /// Panics if `JOIN_INTEREST` or `JOIN_WAKER` is not set in the observed
    /// state.
    pub(super) fn unset_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(curr.is_join_waker_set());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_waker();

            Some(next)
        })
    }

    /// Increments the reference count by one.
    ///
    /// Aborts the process if the state word observed before the increment
    /// exceeds `isize::MAX`.
    pub(super) fn ref_inc(&self) {
        use std::process;
        use std::sync::atomic::Ordering::Relaxed;

        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], Increasing the
        // reference counter can always be done with memory_order_relaxed: New
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        let prev = self.val.fetch_add(REF_ONE, Relaxed);

        // If the reference count overflowed, abort.
        if prev > isize::MAX as usize {
            process::abort();
        }
    }

    /// Decrements the reference count by one.
    ///
    /// Returns `true` if the task should be released, i.e. if this was the
    /// last reference.
    ///
    /// # Panics
    ///
    /// Panics if the reference count before the decrement was zero.
    pub(super) fn ref_dec(&self) -> bool {
        let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
        assert!(prev.ref_count() >= 1);
        prev.ref_count() == 1
    }

    /// Decrements the reference count by two.
    ///
    /// Returns `true` if the task should be released, i.e. if these were the
    /// last two references.
    ///
    /// # Panics
    ///
    /// Panics if the reference count before the decrement was less than two.
    pub(super) fn ref_dec_twice(&self) -> bool {
        let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
        assert!(prev.ref_count() >= 2);
        prev.ref_count() == 2
    }

    /// Runs a compare-and-swap loop driven by `f` and returns the value `f`
    /// produced.
    ///
    /// `f` receives the currently observed snapshot and returns an output
    /// together with an optional new state. If the new state is `None`, the
    /// output is returned immediately and nothing is stored. Otherwise the new
    /// state is stored with a compare-exchange; if the state changed in the
    /// meantime, `f` is invoked again with the freshly observed value. `f` may
    /// therefore run more than once.
    fn fetch_update_action<F, T>(&self, mut f: F) -> T
    where
        F: FnMut(Snapshot) -> (T, Option<Snapshot>),
    {
        let mut curr = self.load();

        loop {
            let (output, next) = f(curr);
            let next = match next {
                Some(next) => next,
                None => return output,
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return output,
                // Another update won the race; retry against the value it left.
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }

    /// Runs a compare-and-swap loop driven by `f`.
    ///
    /// `f` receives the currently observed snapshot and returns the new state
    /// to store, or `None` to decline the update. Returns `Ok` with the stored
    /// snapshot on success, or `Err` with the observed snapshot if `f`
    /// declined. If the state changed before the store, `f` is invoked again
    /// with the freshly observed value, so it may run more than once.
    fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
    where
        F: FnMut(Snapshot) -> Option<Snapshot>,
    {
        let mut curr = self.load();

        loop {
            let next = match f(curr) {
                Some(next) => next,
                None => return Err(curr),
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return Ok(next),
                // Another update won the race; retry against the value it left.
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }
}
515
516// ===== impl Snapshot =====
517
518impl Snapshot {
519 /// Returns `true` if the task is in an idle state.
520 pub(super) fn is_idle(self) -> bool {
521 self.0 & (RUNNING | COMPLETE) == 0
522 }
523
524 /// Returns `true` if the task has been flagged as notified.
525 pub(super) fn is_notified(self) -> bool {
526 self.0 & NOTIFIED == NOTIFIED
527 }
528
529 fn unset_notified(&mut self) {
530 self.0 &= !NOTIFIED;
531 }
532
533 fn set_notified(&mut self) {
534 self.0 |= NOTIFIED;
535 }
536
537 pub(super) fn is_running(self) -> bool {
538 self.0 & RUNNING == RUNNING
539 }
540
541 fn set_running(&mut self) {
542 self.0 |= RUNNING;
543 }
544
545 fn unset_running(&mut self) {
546 self.0 &= !RUNNING;
547 }
548
549 pub(super) fn is_cancelled(self) -> bool {
550 self.0 & CANCELLED == CANCELLED
551 }
552
553 fn set_cancelled(&mut self) {
554 self.0 |= CANCELLED;
555 }
556
557 /// Returns `true` if the task's future has completed execution.
558 pub(super) fn is_complete(self) -> bool {
559 self.0 & COMPLETE == COMPLETE
560 }
561
562 pub(super) fn is_join_interested(self) -> bool {
563 self.0 & JOIN_INTEREST == JOIN_INTEREST
564 }
565
566 fn unset_join_interested(&mut self) {
567 self.0 &= !JOIN_INTEREST;
568 }
569
570 pub(super) fn is_join_waker_set(self) -> bool {
571 self.0 & JOIN_WAKER == JOIN_WAKER
572 }
573
574 fn set_join_waker(&mut self) {
575 self.0 |= JOIN_WAKER;
576 }
577
578 fn unset_join_waker(&mut self) {
579 self.0 &= !JOIN_WAKER;
580 }
581
582 pub(super) fn ref_count(self) -> usize {
583 (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
584 }
585
586 fn ref_inc(&mut self) {
587 assert!(self.0 <= isize::MAX as usize);
588 self.0 += REF_ONE;
589 }
590
591 pub(super) fn ref_dec(&mut self) {
592 assert!(self.ref_count() > 0);
593 self.0 -= REF_ONE;
594 }
595}
596
597impl fmt::Debug for State {
598 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
599 let snapshot: Snapshot = self.load();
600 snapshot.fmt(fmt)
601 }
602}
603
604impl fmt::Debug for Snapshot {
605 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
606 fmt&mut DebugStruct<'_, '_>.debug_struct("Snapshot")
607 .field("is_running", &self.is_running())
608 .field("is_complete", &self.is_complete())
609 .field("is_notified", &self.is_notified())
610 .field("is_cancelled", &self.is_cancelled())
611 .field("is_join_interested", &self.is_join_interested())
612 .field("is_join_waker_set", &self.is_join_waker_set())
613 .field(name:"ref_count", &self.ref_count())
614 .finish()
615 }
616}
617