use crate::loom::sync::atomic::AtomicUsize;

use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::usize;

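/// The current state of the task, packing the lifecycle flags and the
/// reference count into a single atomic word.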
pub(super) struct State {
    val: AtomicUsize,
}

/// Current state value.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);

type UpdateResult = Result<Snapshot, Snapshot>;

/// The task is currently being run.
const RUNNING: usize = 0b0001;

/// The task is complete.
///
/// Once this bit is set, it is never unset.
const COMPLETE: usize = 0b0010;

/// Extracts the task's lifecycle value from the state.
const LIFECYCLE_MASK: usize = 0b11;

/// Flag tracking if the task has been pushed into a run queue.
const NOTIFIED: usize = 0b100;

/// The join handle is still around.
const JOIN_INTEREST: usize = 0b1_000;

/// A join handle waker has been set.
const JOIN_WAKER: usize = 0b10_000;

/// The task has been forcibly cancelled.
const CANCELLED: usize = 0b100_000;

/// All bits.
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;

/// Bits used by the ref count portion of the state.
const REF_COUNT_MASK: usize = !STATE_MASK;

/// Number of positions to shift the ref count.
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;

/// One ref count.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
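
// Quick reference for how the state word is laid out, derived from the
// constants above:
//
//   bit 0      RUNNING
//   bit 1      COMPLETE
//   bit 2      NOTIFIED
//   bit 3      JOIN_INTEREST
//   bit 4      JOIN_WAKER
//   bit 5      CANCELLED
//   bits 6..   reference count
//
// Because the six flag bits always occupy the low bits, `REF_COUNT_SHIFT` is
// 6 and `REF_ONE` is `1 << 6 == 64` regardless of the width of `usize`.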

/// State a task is initialized with.
///
/// A task is initialized with three references:
///
/// * A reference that will be stored in an `OwnedTasks` or `LocalOwnedTasks`.
/// * A reference that will be sent to the scheduler as an ordinary notification.
/// * A reference for the `JoinHandle`.
///
/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
/// As the task starts with a `Notified`, `NOTIFIED` is set.
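///
/// With `REF_ONE == 64`, the initial value works out to
/// `(REF_ONE * 3) | JOIN_INTEREST | NOTIFIED == 0b1100_1100 == 204`.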
const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;

#[must_use]
pub(super) enum TransitionToRunning {
    Success,
    Cancelled,
    Failed,
    Dealloc,
}

#[must_use]
pub(super) enum TransitionToIdle {
    Ok,
    OkNotified,
    OkDealloc,
    Cancelled,
}

#[must_use]
pub(super) enum TransitionToNotifiedByVal {
    DoNothing,
    Submit,
    Dealloc,
}

#[must_use]
pub(crate) enum TransitionToNotifiedByRef {
    DoNothing,
    Submit,
}

/// All transitions are performed via RMW operations. This establishes an
/// unambiguous modification order.
impl State {
    /// Returns a task's initial state.
    pub(super) fn new() -> State {
        // The raw task returned by this method has a ref-count of three. See
        // the comment on INITIAL_STATE for more.
        State {
            val: AtomicUsize::new(INITIAL_STATE),
        }
    }

    /// Loads the current state, establishes `Acquire` ordering.
    pub(super) fn load(&self) -> Snapshot {
        Snapshot(self.val.load(Acquire))
    }

    /// Attempts to transition the lifecycle to `Running`. This sets the
    /// notified bit to false so notifications during the poll can be detected.
    pub(super) fn transition_to_running(&self) -> TransitionToRunning {
        self.fetch_update_action(|mut next| {
            let action;
            assert!(next.is_notified());

            if !next.is_idle() {
                // This happens if the task is either currently running or if it
                // has already completed, e.g. if it was cancelled during
                // shutdown. Consume the ref-count and return.
                next.ref_dec();
                if next.ref_count() == 0 {
                    action = TransitionToRunning::Dealloc;
                } else {
                    action = TransitionToRunning::Failed;
                }
            } else {
                // We are able to lock the RUNNING bit.
                next.set_running();
                next.unset_notified();

                if next.is_cancelled() {
                    action = TransitionToRunning::Cancelled;
                } else {
                    action = TransitionToRunning::Success;
                }
            }
            (action, Some(next))
        })
    }

    /// Transitions the task from `Running` -> `Idle`.
    ///
    /// The transition fails (returning `TransitionToIdle::Cancelled`) if the
    /// task has been flagged to be cancelled; otherwise the returned variant
    /// indicates whether the task went idle, was notified again, or should be
    /// deallocated.
    pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
        self.fetch_update_action(|curr| {
            assert!(curr.is_running());

            if curr.is_cancelled() {
                return (TransitionToIdle::Cancelled, None);
            }

            let mut next = curr;
            let action;
            next.unset_running();

            if !next.is_notified() {
                // Polling the future consumes the ref-count of the Notified.
                next.ref_dec();
                if next.ref_count() == 0 {
                    action = TransitionToIdle::OkDealloc;
                } else {
                    action = TransitionToIdle::Ok;
                }
            } else {
                // The caller will schedule a new notification, so we create a
                // new ref-count for the notification. Our own ref-count is kept
                // for now, and the caller will drop it shortly.
                next.ref_inc();
                action = TransitionToIdle::OkNotified;
            }

            (action, Some(next))
        })
    }

    /// Transitions the task from `Running` -> `Complete`.
    pub(super) fn transition_to_complete(&self) -> Snapshot {
        const DELTA: usize = RUNNING | COMPLETE;

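        // XOR-ing with `RUNNING | COMPLETE` flips both bits in a single RMW:
        // the assertions below guarantee that `RUNNING` was set and `COMPLETE`
        // was not, so the operation clears `RUNNING` and sets `COMPLETE`.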
        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
        assert!(prev.is_running());
        assert!(!prev.is_complete());

        Snapshot(prev.0 ^ DELTA)
    }

    /// Transitions from `Complete` -> `Terminal`, decrementing the reference
    /// count the specified number of times.
    ///
    /// Returns `true` if the task should be deallocated.
    pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
        let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
        assert!(
            prev.ref_count() >= count,
            "current: {}, sub: {}",
            prev.ref_count(),
            count
        );
        prev.ref_count() == count
    }

    /// Transitions the state to `NOTIFIED`.
    ///
    /// If no task needs to be submitted, a ref-count is consumed.
    ///
    /// If a task needs to be submitted, the ref-count is incremented for the
    /// new Notified.
    pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
        self.fetch_update_action(|mut snapshot| {
            let action;

            if snapshot.is_running() {
                // If the task is running, we mark it as notified, but we should
                // not submit anything as the thread currently running the
                // future is responsible for that.
                snapshot.set_notified();
                snapshot.ref_dec();

                // The thread that set the running bit also holds a ref-count.
                assert!(snapshot.ref_count() > 0);

                action = TransitionToNotifiedByVal::DoNothing;
            } else if snapshot.is_complete() || snapshot.is_notified() {
                // We do not need to submit any notifications, but we have to
                // decrement the ref-count.
                snapshot.ref_dec();

                if snapshot.ref_count() == 0 {
                    action = TransitionToNotifiedByVal::Dealloc;
                } else {
                    action = TransitionToNotifiedByVal::DoNothing;
                }
            } else {
                // We create a new notified that we can submit. The caller
                // retains ownership of the ref-count they passed in.
                snapshot.set_notified();
                snapshot.ref_inc();
                action = TransitionToNotifiedByVal::Submit;
            }

            (action, Some(snapshot))
        })
    }

    /// Transitions the state to `NOTIFIED`.
    pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_complete() || snapshot.is_notified() {
                // There is nothing to do in this case.
                (TransitionToNotifiedByRef::DoNothing, None)
            } else if snapshot.is_running() {
                // If the task is running, we mark it as notified, but we should
                // not submit as the thread currently running the future is
                // responsible for that.
                snapshot.set_notified();
                (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
            } else {
                // The task is idle and not notified. We should submit a
                // notification.
                snapshot.set_notified();
                snapshot.ref_inc();
                (TransitionToNotifiedByRef::Submit, Some(snapshot))
            }
        })
    }

    /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref
    /// count.
    ///
    /// Returns `true` if the notified bit was transitioned from `0` to `1`;
    /// otherwise `false`.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        feature = "rt",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_notified() {
                (false, None)
            } else {
                snapshot.set_notified();
                snapshot.ref_inc();
                (true, Some(snapshot))
            }
        })
    }

    /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
    ///
    /// Returns `true` if the task needs to be submitted to the pool for
    /// execution.
    pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_cancelled() || snapshot.is_complete() {
                // Aborts to completed or cancelled tasks are no-ops.
                (false, None)
            } else if snapshot.is_running() {
                // If the task is running, we mark it as cancelled. The thread
                // running the task will notice the cancelled bit when it
                // stops polling and it will kill the task.
                //
                // The set_notified() call is not strictly necessary but it will
                // in some cases let a wake_by_ref call return without having
                // to perform a compare_exchange.
                snapshot.set_notified();
                snapshot.set_cancelled();
                (false, Some(snapshot))
            } else {
                // The task is idle. We set the cancelled and notified bits and
                // submit a notification if the notified bit was not already
                // set.
                snapshot.set_cancelled();
                if !snapshot.is_notified() {
                    snapshot.set_notified();
                    snapshot.ref_inc();
                    (true, Some(snapshot))
                } else {
                    (false, Some(snapshot))
                }
            }
        })
    }

    /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
    ///
    /// Returns `true` if the transition to `Running` succeeded.
    pub(super) fn transition_to_shutdown(&self) -> bool {
        let mut prev = Snapshot(0);

        let _ = self.fetch_update(|mut snapshot| {
            prev = snapshot;

            if snapshot.is_idle() {
                snapshot.set_running();
            }

            // If the task was not idle, the thread currently running the task
            // will notice the cancelled bit and cancel it once the poll
            // completes.
            snapshot.set_cancelled();
            Some(snapshot)
        });

        prev.is_idle()
    }

    /// Optimistically tries to swap the state assuming the join handle is
    /// __immediately__ dropped on spawn.
    pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
        use std::sync::atomic::Ordering::Relaxed;

        // Relaxed is acceptable as if this function is called and succeeds,
        // then nothing has been done w/ the join handle.
        //
        // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
        // set, at which point the CAS will fail.
        //
        // Given this, there is no risk if this operation is reordered.
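        //
        // The target value drops the `JoinHandle`'s reference and clears
        // `JOIN_INTEREST`, leaving the two remaining references and the
        // `NOTIFIED` bit from the initial state.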
        self.val
            .compare_exchange_weak(
                INITIAL_STATE,
                (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
                Release,
                Relaxed,
            )
            .map(|_| ())
            .map_err(|_| ())
    }

    /// Tries to unset the `JOIN_INTEREST` flag.
    ///
    /// Returns `Ok` if the operation happens before the task transitions to a
    /// completed state, `Err` otherwise.
    pub(super) fn unset_join_interested(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_interested();

            Some(next)
        })
    }

    /// Sets the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
    /// the task has completed.
    pub(super) fn set_join_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(!curr.is_join_waker_set());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.set_join_waker();

            Some(next)
        })
    }

    /// Unsets the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit has been unset, `Err` otherwise. This operation
    /// fails if the task has completed.
    pub(super) fn unset_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(curr.is_join_waker_set());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_waker();

            Some(next)
        })
    }

    pub(super) fn ref_inc(&self) {
        use std::process;
        use std::sync::atomic::Ordering::Relaxed;

        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], Increasing the
        // reference counter can always be done with memory_order_relaxed: New
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        let prev = self.val.fetch_add(REF_ONE, Relaxed);

        // If the reference count overflowed, abort.
        if prev > isize::MAX as usize {
            process::abort();
        }
    }

    /// Returns `true` if the task should be released.
    pub(super) fn ref_dec(&self) -> bool {
        let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
        assert!(prev.ref_count() >= 1);
        prev.ref_count() == 1
    }

    /// Returns `true` if the task should be released.
    pub(super) fn ref_dec_twice(&self) -> bool {
        let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
        assert!(prev.ref_count() >= 2);
        prev.ref_count() == 2
    }

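    /// Applies `f` in a compare-exchange loop.
    ///
    /// The closure returns its output together with an optional next state.
    /// If the next state is `None`, no store is attempted and the output is
    /// returned immediately; otherwise the loop retries with the freshly
    /// observed state until the compare-exchange succeeds.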
    fn fetch_update_action<F, T>(&self, mut f: F) -> T
    where
        F: FnMut(Snapshot) -> (T, Option<Snapshot>),
    {
        let mut curr = self.load();

        loop {
            let (output, next) = f(curr);
            let next = match next {
                Some(next) => next,
                None => return output,
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return output,
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }

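    /// Applies `f` in a compare-exchange loop.
    ///
    /// Returns `Ok` with the stored state once an update produced by `f` has
    /// been applied, or `Err` with the last observed state if `f` returns
    /// `None`.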
    fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
    where
        F: FnMut(Snapshot) -> Option<Snapshot>,
    {
        let mut curr = self.load();

        loop {
            let next = match f(curr) {
                Some(next) => next,
                None => return Err(curr),
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return Ok(next),
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }
}

// ===== impl Snapshot =====

impl Snapshot {
    /// Returns `true` if the task is in an idle state.
    pub(super) fn is_idle(self) -> bool {
        self.0 & (RUNNING | COMPLETE) == 0
    }

    /// Returns `true` if the task has been flagged as notified.
    pub(super) fn is_notified(self) -> bool {
        self.0 & NOTIFIED == NOTIFIED
    }

    fn unset_notified(&mut self) {
        self.0 &= !NOTIFIED;
    }

    fn set_notified(&mut self) {
        self.0 |= NOTIFIED;
    }

    pub(super) fn is_running(self) -> bool {
        self.0 & RUNNING == RUNNING
    }

    fn set_running(&mut self) {
        self.0 |= RUNNING;
    }

    fn unset_running(&mut self) {
        self.0 &= !RUNNING;
    }

    pub(super) fn is_cancelled(self) -> bool {
        self.0 & CANCELLED == CANCELLED
    }

    fn set_cancelled(&mut self) {
        self.0 |= CANCELLED;
    }

    /// Returns `true` if the task's future has completed execution.
    pub(super) fn is_complete(self) -> bool {
        self.0 & COMPLETE == COMPLETE
    }

    pub(super) fn is_join_interested(self) -> bool {
        self.0 & JOIN_INTEREST == JOIN_INTEREST
    }

    fn unset_join_interested(&mut self) {
        self.0 &= !JOIN_INTEREST;
    }

    pub(super) fn is_join_waker_set(self) -> bool {
        self.0 & JOIN_WAKER == JOIN_WAKER
    }

    fn set_join_waker(&mut self) {
        self.0 |= JOIN_WAKER;
    }

    fn unset_join_waker(&mut self) {
        self.0 &= !JOIN_WAKER;
    }

    pub(super) fn ref_count(self) -> usize {
        (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
    }

    fn ref_inc(&mut self) {
        assert!(self.0 <= isize::MAX as usize);
        self.0 += REF_ONE;
    }

    pub(super) fn ref_dec(&mut self) {
        assert!(self.ref_count() > 0);
        self.0 -= REF_ONE;
    }
}

impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let snapshot: Snapshot = self.load();
        snapshot.fmt(fmt)
    }
}

impl fmt::Debug for Snapshot {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Snapshot")
            .field("is_running", &self.is_running())
            .field("is_complete", &self.is_complete())
            .field("is_notified", &self.is_notified())
            .field("is_cancelled", &self.is_cancelled())
            .field("is_join_interested", &self.is_join_interested())
            .field("is_join_waker_set", &self.is_join_waker_set())
            .field("ref_count", &self.ref_count())
            .finish()
    }
}