use crate::loom::sync::atomic::AtomicUsize;

use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::usize;

pub(super) struct State {
    val: AtomicUsize,
}

/// Current state value.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);

type UpdateResult = Result<Snapshot, Snapshot>;

/// The task is currently being run.
const RUNNING: usize = 0b0001;

/// The task is complete.
///
/// Once this bit is set, it is never unset.
const COMPLETE: usize = 0b0010;

/// Extracts the task's lifecycle value from the state.
const LIFECYCLE_MASK: usize = 0b11;

/// Flag tracking if the task has been pushed into a run queue.
const NOTIFIED: usize = 0b100;

/// The join handle is still around.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_INTEREST: usize = 0b1_000;

/// A join handle waker has been set.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_WAKER: usize = 0b10_000;

/// The task has been forcibly cancelled.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const CANCELLED: usize = 0b100_000;

/// Mask of all state bits (everything below the ref-count bits).
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;

/// Bits used by the ref count portion of the state.
const REF_COUNT_MASK: usize = !STATE_MASK;

/// Number of positions to shift the ref count.
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;

/// One ref count.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
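
// A sketch of the resulting state-word layout (the constants above are the
// source of truth):
//
//   bit 0    RUNNING
//   bit 1    COMPLETE
//   bit 2    NOTIFIED
//   bit 3    JOIN_INTEREST
//   bit 4    JOIN_WAKER
//   bit 5    CANCELLED
//   bits 6.. reference count (so `REF_COUNT_SHIFT` is 6 and `REF_ONE` is 64)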

/// State a task is initialized with.
///
/// A task is initialized with three references:
///
/// * A reference that will be stored in an `OwnedTasks` or `LocalOwnedTasks`.
/// * A reference that will be sent to the scheduler as an ordinary notification.
/// * A reference for the `JoinHandle`.
///
/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
/// As the task starts with a `Notified`, `NOTIFIED` is set.
const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;
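
// For illustration, `INITIAL_STATE` works out to
// `(3 << 6) | JOIN_INTEREST | NOTIFIED == 0b1100_1100`: a ref count of three
// plus the two flag bits.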
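
/// Result of `State::transition_to_running`.
///
/// * `Success`: the `RUNNING` bit was acquired and the task should be polled.
/// * `Cancelled`: the `RUNNING` bit was acquired, but the task has been
///   flagged for cancellation and should be shut down instead of polled.
/// * `Failed`: the task is already running or has completed; the notification
///   ref-count was consumed and there is nothing to do.
/// * `Dealloc`: as `Failed`, but the consumed ref-count was the last one, so
///   the caller must deallocate the task.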
#[must_use]
pub(super) enum TransitionToRunning {
    Success,
    Cancelled,
    Failed,
    Dealloc,
}

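/// Result of `State::transition_to_idle`.
///
/// * `Ok`: the task transitioned back to idle.
/// * `OkNotified`: the task was notified while it was running; the caller
///   should submit a new notification.
/// * `OkDealloc`: the last ref-count was consumed; the caller must deallocate
///   the task.
/// * `Cancelled`: the task has been flagged for cancellation; the state is
///   left unchanged.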
#[must_use]
pub(super) enum TransitionToIdle {
    Ok,
    OkNotified,
    OkDealloc,
    Cancelled,
}

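/// Result of `State::transition_to_notified_by_val`.
///
/// * `Submit`: a ref-count was created for a new `Notified` and the task
///   should be pushed to a run queue.
/// * `DoNothing`: the notification was absorbed (the task is running,
///   complete, or already notified).
/// * `Dealloc`: the consumed ref-count was the last one; the caller must
///   deallocate the task.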
#[must_use]
pub(super) enum TransitionToNotifiedByVal {
    DoNothing,
    Submit,
    Dealloc,
}

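/// Result of `State::transition_to_notified_by_ref`.
///
/// * `Submit`: the `NOTIFIED` bit was set and a ref-count was created for a
///   new notification to push to a run queue.
/// * `DoNothing`: the task is already running, complete, or notified.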
#[must_use]
pub(crate) enum TransitionToNotifiedByRef {
    DoNothing,
    Submit,
}

/// All transitions are performed via RMW operations. This establishes an
/// unambiguous modification order.
impl State {
    /// Returns a task's initial state.
    pub(super) fn new() -> State {
        // The raw task returned by this method has a ref-count of three. See
        // the comment on INITIAL_STATE for more.
        State {
            val: AtomicUsize::new(INITIAL_STATE),
        }
    }

    /// Loads the current state, establishes `Acquire` ordering.
    pub(super) fn load(&self) -> Snapshot {
        Snapshot(self.val.load(Acquire))
    }

    /// Attempts to transition the lifecycle to `Running`. This sets the
    /// notified bit to false so notifications during the poll can be detected.
    pub(super) fn transition_to_running(&self) -> TransitionToRunning {
        self.fetch_update_action(|mut next| {
            let action;
            assert!(next.is_notified());

            if !next.is_idle() {
                // This happens if the task is either currently running or if it
                // has already completed, e.g. if it was cancelled during
                // shutdown. Consume the ref-count and return.
                next.ref_dec();
                if next.ref_count() == 0 {
                    action = TransitionToRunning::Dealloc;
                } else {
                    action = TransitionToRunning::Failed;
                }
            } else {
                // We are able to lock the RUNNING bit.
                next.set_running();
                next.unset_notified();

                if next.is_cancelled() {
                    action = TransitionToRunning::Cancelled;
                } else {
                    action = TransitionToRunning::Success;
                }
            }
            (action, Some(next))
        })
    }

    /// Transitions the task from `Running` -> `Idle`.
    ///
    /// Returns `TransitionToIdle::Cancelled` if the task has been flagged for
    /// cancellation, in which case the state is left unchanged. Otherwise the
    /// returned variant indicates whether the task was notified while it was
    /// running and whether the last ref-count was consumed.
    pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
        self.fetch_update_action(|curr| {
            assert!(curr.is_running());

            if curr.is_cancelled() {
                return (TransitionToIdle::Cancelled, None);
            }

            let mut next = curr;
            let action;
            next.unset_running();

            if !next.is_notified() {
                // Polling the future consumes the ref-count of the Notified.
                next.ref_dec();
                if next.ref_count() == 0 {
                    action = TransitionToIdle::OkDealloc;
                } else {
                    action = TransitionToIdle::Ok;
                }
            } else {
                // The caller will schedule a new notification, so we create a
                // new ref-count for the notification. Our own ref-count is kept
                // for now, and the caller will drop it shortly.
                next.ref_inc();
                action = TransitionToIdle::OkNotified;
            }

            (action, Some(next))
        })
    }

    /// Transitions the task from `Running` -> `Complete`.
    pub(super) fn transition_to_complete(&self) -> Snapshot {
        const DELTA: usize = RUNNING | COMPLETE;

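        // RUNNING is known to be set and COMPLETE known to be unset here (see
        // the assertions below), so a single `fetch_xor` clears RUNNING and
        // sets COMPLETE in one atomic RMW.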
        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
        assert!(prev.is_running());
        assert!(!prev.is_complete());

        Snapshot(prev.0 ^ DELTA)
    }

    /// Transitions from `Complete` -> `Terminal`, decrementing the reference
    /// count the specified number of times.
    ///
    /// Returns true if the task should be deallocated.
    pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
        let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
        assert!(
            prev.ref_count() >= count,
            "current: {}, sub: {}",
            prev.ref_count(),
            count
        );
        prev.ref_count() == count
    }

    /// Transitions the state to `NOTIFIED`.
    ///
    /// If no task needs to be submitted, a ref-count is consumed.
    ///
    /// If a task needs to be submitted, the ref-count is incremented for the
    /// new Notified.
    pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
        self.fetch_update_action(|mut snapshot| {
            let action;

            if snapshot.is_running() {
                // If the task is running, we mark it as notified, but we should
                // not submit anything as the thread currently running the
                // future is responsible for that.
                snapshot.set_notified();
                snapshot.ref_dec();

                // The thread that set the running bit also holds a ref-count.
                assert!(snapshot.ref_count() > 0);

                action = TransitionToNotifiedByVal::DoNothing;
            } else if snapshot.is_complete() || snapshot.is_notified() {
                // We do not need to submit any notifications, but we have to
                // decrement the ref-count.
                snapshot.ref_dec();

                if snapshot.ref_count() == 0 {
                    action = TransitionToNotifiedByVal::Dealloc;
                } else {
                    action = TransitionToNotifiedByVal::DoNothing;
                }
            } else {
                // We create a new notified that we can submit. The caller
                // retains ownership of the ref-count they passed in.
                snapshot.set_notified();
                snapshot.ref_inc();
                action = TransitionToNotifiedByVal::Submit;
            }

            (action, Some(snapshot))
        })
    }

    /// Transitions the state to `NOTIFIED`.
    pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_complete() || snapshot.is_notified() {
                // There is nothing to do in this case.
                (TransitionToNotifiedByRef::DoNothing, None)
            } else if snapshot.is_running() {
                // If the task is running, we mark it as notified, but we should
                // not submit as the thread currently running the future is
                // responsible for that.
                snapshot.set_notified();
                (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
            } else {
                // The task is idle and not notified. We should submit a
                // notification.
                snapshot.set_notified();
                snapshot.ref_inc();
                (TransitionToNotifiedByRef::Submit, Some(snapshot))
            }
        })
    }

    /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        feature = "rt",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(super) fn transition_to_notified_for_tracing(&self) {
        self.fetch_update_action(|mut snapshot| {
            snapshot.set_notified();
            snapshot.ref_inc();
            ((), Some(snapshot))
        });
    }

    /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
    ///
    /// Returns `true` if the task needs to be submitted to the pool for
    /// execution.
    pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
        self.fetch_update_action(|mut snapshot| {
            if snapshot.is_cancelled() || snapshot.is_complete() {
                // Aborting a task that is already complete or cancelled is a
                // no-op.
                (false, None)
            } else if snapshot.is_running() {
                // If the task is running, we mark it as cancelled. The thread
                // running the task will notice the cancelled bit when it
                // stops polling and it will kill the task.
                //
                // The set_notified() call is not strictly necessary but it will
                // in some cases let a wake_by_ref call return without having
                // to perform a compare_exchange.
                snapshot.set_notified();
                snapshot.set_cancelled();
                (false, Some(snapshot))
            } else {
                // The task is idle. We set the cancelled and notified bits and
                // submit a notification if the notified bit was not already
                // set.
                snapshot.set_cancelled();
                if !snapshot.is_notified() {
                    snapshot.set_notified();
                    snapshot.ref_inc();
                    (true, Some(snapshot))
                } else {
                    (false, Some(snapshot))
                }
            }
        })
    }

    /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
    ///
    /// Returns `true` if the transition to `Running` succeeded.
    pub(super) fn transition_to_shutdown(&self) -> bool {
        let mut prev = Snapshot(0);

        let _ = self.fetch_update(|mut snapshot| {
            prev = snapshot;

            if snapshot.is_idle() {
                snapshot.set_running();
            }

            // If the task was not idle, the thread currently running the task
            // will notice the cancelled bit and cancel it once the poll
            // completes.
            snapshot.set_cancelled();
            Some(snapshot)
        });

        prev.is_idle()
    }

    /// Optimistically tries to swap the state assuming the join handle is
    /// __immediately__ dropped on spawn.
    pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
        use std::sync::atomic::Ordering::Relaxed;

        // Relaxed is acceptable as if this function is called and succeeds,
        // then nothing has been done w/ the join handle.
        //
        // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
        // set, at which point the CAS will fail.
        //
        // Given this, there is no risk if this operation is reordered.
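        //
        // The replacement value drops the `JoinHandle`'s ref-count and clears
        // `JOIN_INTEREST` in the same swap.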
        self.val
            .compare_exchange_weak(
                INITIAL_STATE,
                (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
                Release,
                Relaxed,
            )
            .map(|_| ())
            .map_err(|_| ())
    }

    /// Tries to unset the `JOIN_INTEREST` flag.
    ///
    /// Returns `Ok` if the operation happens before the task transitions to a
    /// completed state, `Err` otherwise.
    pub(super) fn unset_join_interested(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_interested();

            Some(next)
        })
    }

    /// Sets the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
    /// the task has completed.
    pub(super) fn set_join_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(!curr.is_join_waker_set());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.set_join_waker();

            Some(next)
        })
    }

    /// Unsets the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit has been unset, `Err` otherwise. This operation
    /// fails if the task has completed.
    pub(super) fn unset_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(curr.is_join_waker_set());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_waker();

            Some(next)
        })
    }

    pub(super) fn ref_inc(&self) {
        use std::process;
        use std::sync::atomic::Ordering::Relaxed;

        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], Increasing the
        // reference counter can always be done with memory_order_relaxed: New
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        let prev = self.val.fetch_add(REF_ONE, Relaxed);

        // If the reference count overflowed, abort.
        if prev > isize::MAX as usize {
            process::abort();
        }
    }

    /// Returns `true` if the task should be released.
    pub(super) fn ref_dec(&self) -> bool {
        let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
        assert!(prev.ref_count() >= 1);
        prev.ref_count() == 1
    }

    /// Returns `true` if the task should be released.
    pub(super) fn ref_dec_twice(&self) -> bool {
        let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
        assert!(prev.ref_count() >= 2);
        prev.ref_count() == 2
    }

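    /// Applies `f` in a compare-exchange loop.
    ///
    /// `f` maps the current snapshot to an action value plus an optional new
    /// state. If the new state is `None`, nothing is stored and the action is
    /// returned immediately; otherwise the update is retried against the
    /// freshly observed state until the compare-exchange succeeds.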
    fn fetch_update_action<F, T>(&self, mut f: F) -> T
    where
        F: FnMut(Snapshot) -> (T, Option<Snapshot>),
    {
        let mut curr = self.load();

        loop {
            let (output, next) = f(curr);
            let next = match next {
                Some(next) => next,
                None => return output,
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return output,
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }

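    /// Applies `f` in a compare-exchange loop.
    ///
    /// Returns `Ok` with the stored snapshot if an update was committed, or
    /// `Err` with the last observed snapshot if `f` returned `None`.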
    fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
    where
        F: FnMut(Snapshot) -> Option<Snapshot>,
    {
        let mut curr = self.load();

        loop {
            let next = match f(curr) {
                Some(next) => next,
                None => return Err(curr),
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return Ok(next),
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }
}

// ===== impl Snapshot =====

impl Snapshot {
    /// Returns `true` if the task is in an idle state.
    pub(super) fn is_idle(self) -> bool {
        self.0 & (RUNNING | COMPLETE) == 0
    }

    /// Returns `true` if the task has been flagged as notified.
    pub(super) fn is_notified(self) -> bool {
        self.0 & NOTIFIED == NOTIFIED
    }

    fn unset_notified(&mut self) {
        self.0 &= !NOTIFIED
    }

    fn set_notified(&mut self) {
        self.0 |= NOTIFIED
    }

    pub(super) fn is_running(self) -> bool {
        self.0 & RUNNING == RUNNING
    }

    fn set_running(&mut self) {
        self.0 |= RUNNING;
    }

    fn unset_running(&mut self) {
        self.0 &= !RUNNING;
    }

    pub(super) fn is_cancelled(self) -> bool {
        self.0 & CANCELLED == CANCELLED
    }

    fn set_cancelled(&mut self) {
        self.0 |= CANCELLED;
    }

    /// Returns `true` if the task's future has completed execution.
    pub(super) fn is_complete(self) -> bool {
        self.0 & COMPLETE == COMPLETE
    }

    pub(super) fn is_join_interested(self) -> bool {
        self.0 & JOIN_INTEREST == JOIN_INTEREST
    }

    fn unset_join_interested(&mut self) {
        self.0 &= !JOIN_INTEREST
    }

    pub(super) fn is_join_waker_set(self) -> bool {
        self.0 & JOIN_WAKER == JOIN_WAKER
    }

    fn set_join_waker(&mut self) {
        self.0 |= JOIN_WAKER;
    }

    fn unset_join_waker(&mut self) {
        self.0 &= !JOIN_WAKER
    }

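    /// Returns the reference count, stored in the upper bits of the state
    /// word.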
    pub(super) fn ref_count(self) -> usize {
        (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
    }

    fn ref_inc(&mut self) {
        assert!(self.0 <= isize::MAX as usize);
        self.0 += REF_ONE;
    }

    pub(super) fn ref_dec(&mut self) {
        assert!(self.ref_count() > 0);
        self.0 -= REF_ONE
    }
}

impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let snapshot: Snapshot = self.load();
        snapshot.fmt(fmt)
    }
}

impl fmt::Debug for Snapshot {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Snapshot")
            .field("is_running", &self.is_running())
            .field("is_complete", &self.is_complete())
            .field("is_notified", &self.is_notified())
            .field("is_cancelled", &self.is_cancelled())
            .field("is_join_interested", &self.is_join_interested())
            .field("is_join_waker_set", &self.is_join_waker_set())
            .field("ref_count", &self.ref_count())
            .finish()
    }
}