1 | use crate::future::Future; |
2 | use crate::runtime::task::core::{Cell, Core, Header, Trailer}; |
3 | use crate::runtime::task::state::{Snapshot, State}; |
4 | use crate::runtime::task::waker::waker_ref; |
5 | use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task}; |
6 | |
7 | use crate::runtime::TaskMeta; |
8 | use std::any::Any; |
9 | use std::mem; |
10 | use std::mem::ManuallyDrop; |
11 | use std::panic; |
12 | use std::ptr::NonNull; |
13 | use std::task::{Context, Poll, Waker}; |
14 | |
/// Typed raw task handle.
///
/// Holds a non-null pointer to the task's `Cell<T, S>`, where `T` is the
/// future type. The accessor methods on this type hand out references to the
/// header, trailer and core stored in that cell.
pub(super) struct Harness<T: Future, S: 'static> {
    // Pointer to the task cell; it is also cast to a `NonNull<Header>` when
    // only the header is needed.
    cell: NonNull<Cell<T, S>>,
}
19 | |
20 | impl<T, S> Harness<T, S> |
21 | where |
22 | T: Future, |
23 | S: 'static, |
24 | { |
25 | pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> { |
26 | Harness { |
27 | cell: ptr.cast::<Cell<T, S>>(), |
28 | } |
29 | } |
30 | |
31 | fn header_ptr(&self) -> NonNull<Header> { |
32 | self.cell.cast() |
33 | } |
34 | |
35 | fn header(&self) -> &Header { |
36 | unsafe { &*self.header_ptr().as_ptr() } |
37 | } |
38 | |
39 | fn state(&self) -> &State { |
40 | &self.header().state |
41 | } |
42 | |
43 | fn trailer(&self) -> &Trailer { |
44 | unsafe { &self.cell.as_ref().trailer } |
45 | } |
46 | |
47 | fn core(&self) -> &Core<T, S> { |
48 | unsafe { &self.cell.as_ref().core } |
49 | } |
50 | } |
51 | |
52 | /// Task operations that can be implemented without being generic over the |
53 | /// scheduler or task. Only one version of these methods should exist in the |
54 | /// final binary. |
55 | impl RawTask { |
56 | pub(super) fn drop_reference(self) { |
57 | if self.state().ref_dec() { |
58 | self.dealloc(); |
59 | } |
60 | } |
61 | |
62 | /// This call consumes a ref-count and notifies the task. This will create a |
63 | /// new Notified and submit it if necessary. |
64 | /// |
65 | /// The caller does not need to hold a ref-count besides the one that was |
66 | /// passed to this call. |
67 | pub(super) fn wake_by_val(&self) { |
68 | use super::state::TransitionToNotifiedByVal; |
69 | |
70 | match self.state().transition_to_notified_by_val() { |
71 | TransitionToNotifiedByVal::Submit => { |
72 | // The caller has given us a ref-count, and the transition has |
73 | // created a new ref-count, so we now hold two. We turn the new |
74 | // ref-count Notified and pass it to the call to `schedule`. |
75 | // |
76 | // The old ref-count is retained for now to ensure that the task |
77 | // is not dropped during the call to `schedule` if the call |
78 | // drops the task it was given. |
79 | self.schedule(); |
80 | |
81 | // Now that we have completed the call to schedule, we can |
82 | // release our ref-count. |
83 | self.drop_reference(); |
84 | } |
85 | TransitionToNotifiedByVal::Dealloc => { |
86 | self.dealloc(); |
87 | } |
88 | TransitionToNotifiedByVal::DoNothing => {} |
89 | } |
90 | } |
91 | |
92 | /// This call notifies the task. It will not consume any ref-counts, but the |
93 | /// caller should hold a ref-count. This will create a new Notified and |
94 | /// submit it if necessary. |
95 | pub(super) fn wake_by_ref(&self) { |
96 | use super::state::TransitionToNotifiedByRef; |
97 | |
98 | match self.state().transition_to_notified_by_ref() { |
99 | TransitionToNotifiedByRef::Submit => { |
100 | // The transition above incremented the ref-count for a new task |
101 | // and the caller also holds a ref-count. The caller's ref-count |
102 | // ensures that the task is not destroyed even if the new task |
103 | // is dropped before `schedule` returns. |
104 | self.schedule(); |
105 | } |
106 | TransitionToNotifiedByRef::DoNothing => {} |
107 | } |
108 | } |
109 | |
110 | /// Remotely aborts the task. |
111 | /// |
112 | /// The caller should hold a ref-count, but we do not consume it. |
113 | /// |
114 | /// This is similar to `shutdown` except that it asks the runtime to perform |
115 | /// the shutdown. This is necessary to avoid the shutdown happening in the |
116 | /// wrong thread for non-Send tasks. |
117 | pub(super) fn remote_abort(&self) { |
118 | if self.state().transition_to_notified_and_cancel() { |
119 | // The transition has created a new ref-count, which we turn into |
120 | // a Notified and pass to the task. |
121 | // |
122 | // Since the caller holds a ref-count, the task cannot be destroyed |
123 | // before the call to `schedule` returns even if the call drops the |
124 | // `Notified` internally. |
125 | self.schedule(); |
126 | } |
127 | } |
128 | |
129 | /// Try to set the waker notified when the task is complete. Returns true if |
130 | /// the task has already completed. If this call returns false, then the |
131 | /// waker will not be notified. |
132 | pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool { |
133 | can_read_output(self.header(), self.trailer(), waker) |
134 | } |
135 | } |
136 | |
impl<T, S> Harness<T, S>
where
    T: Future,
    S: Schedule,
{
    /// Releases the ref-count owned by this handle and deallocates the task
    /// when `ref_dec` reports that deallocation is required.
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// Polls the inner future. A ref-count is consumed.
    ///
    /// All necessary state checks and transitions are performed.
    /// Panics raised while polling the future are handled.
    pub(super) fn poll(self) {
        // We pass our ref-count to `poll_inner`.
        match self.poll_inner() {
            PollFuture::Notified => {
                // The `poll_inner` call has given us two ref-counts back.
                // We give one of them to a new task and call `yield_now`.
                self.core()
                    .scheduler
                    .yield_now(Notified(self.get_new_task()));

                // The remaining ref-count is now dropped. We kept the extra
                // ref-count until now to ensure that even if the `yield_now`
                // call drops the provided task, the task isn't deallocated
                // until after `yield_now` returns.
                self.drop_reference();
            }
            PollFuture::Complete => {
                self.complete();
            }
            PollFuture::Dealloc => {
                self.dealloc();
            }
            PollFuture::Done => (),
        }
    }

    /// Polls the task and cancel it if necessary. This takes ownership of a
    /// ref-count.
    ///
    /// If the return value is Notified, the caller is given ownership of two
    /// ref-counts.
    ///
    /// If the return value is Complete, the caller is given ownership of a
    /// single ref-count, which should be passed on to `complete`.
    ///
    /// If the return value is `Dealloc`, then this call consumed the last
    /// ref-count and the caller should call `dealloc`.
    ///
    /// Otherwise the ref-count is consumed and the caller should not access
    /// `self` again.
    fn poll_inner(&self) -> PollFuture {
        use super::state::{TransitionToIdle, TransitionToRunning};

        match self.state().transition_to_running() {
            TransitionToRunning::Success => {
                // Separated to reduce LLVM codegen
                fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture {
                    match result {
                        TransitionToIdle::Ok => PollFuture::Done,
                        TransitionToIdle::OkNotified => PollFuture::Notified,
                        TransitionToIdle::OkDealloc => PollFuture::Dealloc,
                        TransitionToIdle::Cancelled => PollFuture::Complete,
                    }
                }
                // Build a task context whose waker refers to this task's
                // header, then poll the future with it.
                let header_ptr = self.header_ptr();
                let waker_ref = waker_ref::<S>(&header_ptr);
                let cx = Context::from_waker(&waker_ref);
                let res = poll_future(self.core(), cx);

                if res == Poll::Ready(()) {
                    // The future completed. Move on to complete the task.
                    return PollFuture::Complete;
                }

                // The future is still pending; try to go back to idle.
                let transition_res = self.state().transition_to_idle();
                if let TransitionToIdle::Cancelled = transition_res {
                    // The transition to idle failed because the task was
                    // cancelled during the poll.
                    cancel_task(self.core());
                }
                transition_result_to_poll_future(transition_res)
            }
            TransitionToRunning::Cancelled => {
                cancel_task(self.core());
                PollFuture::Complete
            }
            TransitionToRunning::Failed => PollFuture::Done,
            TransitionToRunning::Dealloc => PollFuture::Dealloc,
        }
    }

    /// Forcibly shuts down the task.
    ///
    /// Attempt to transition to `Running` in order to forcibly shutdown the
    /// task. If the task is currently running or in a state of completion, then
    /// there is nothing further to do. When the task completes running, it will
    /// notice the `CANCELLED` bit and finalize the task.
    pub(super) fn shutdown(self) {
        if !self.state().transition_to_shutdown() {
            // The task is concurrently running. No further work needed.
            self.drop_reference();
            return;
        }

        // By transitioning the lifecycle to `Running`, we have permission to
        // drop the future.
        cancel_task(self.core());
        self.complete();
    }

    /// Frees the task allocation.
    ///
    /// As the safety note in the body states, this is expected to be called
    /// only by whoever just transitioned the ref-count to zero.
    pub(super) fn dealloc(self) {
        // Observe that we expect to have mutable access to these objects
        // because we are going to drop them. This only matters when running
        // under loom.
        self.trailer().waker.with_mut(|_| ());
        self.core().stage.with_mut(|_| ());

        // Safety: The caller of this method just transitioned our ref-count to
        // zero, so it is our responsibility to release the allocation.
        //
        // We don't hold any references into the allocation at this point, but
        // it is possible for another thread to still hold a `&State` into the
        // allocation if that other thread has decremented its last ref-count,
        // but has not yet returned from the relevant method on `State`.
        //
        // However, the `State` type consists of just an `AtomicUsize`, and an
        // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
        // As explained in the documentation for `UnsafeCell`, such references
        // are allowed to be dangling after their last use, even if the
        // reference has not yet gone out of scope.
        unsafe {
            drop(Box::from_raw(self.cell.as_ptr()));
        }
    }

    // ===== join handle =====

    /// Read the task output into `dst`.
    ///
    /// `dst` is overwritten with `Poll::Ready(..)` only when
    /// `can_read_output` returns true; otherwise it is left untouched.
    pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
        if can_read_output(self.header(), self.trailer(), waker) {
            *dst = Poll::Ready(self.core().take_output());
        }
    }

    /// Slow path for dropping the `JoinHandle`.
    ///
    /// Performs the join-handle-dropped state transition, drops the task
    /// output and/or the join waker when the transition reports that this is
    /// our job, and finally releases the `JoinHandle`'s ref-count.
    pub(super) fn drop_join_handle_slow(self) {
        // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in
        // case the task concurrently completed.
        let transition = self.state().transition_to_join_handle_dropped();

        if transition.drop_output {
            // It is our responsibility to drop the output. This is critical as
            // the task output may not be `Send` and as such must remain with
            // the scheduler or `JoinHandle`. i.e. if the output remains in the
            // task structure until the task is deallocated, it may be dropped
            // by a Waker on any arbitrary thread.
            //
            // Panics are delivered to the user via the `JoinHandle`. Given that
            // they are dropping the `JoinHandle`, we assume they are not
            // interested in the panic and swallow it.
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                self.core().drop_future_or_output();
            }));
        }

        if transition.drop_waker {
            // If the JOIN_WAKER flag is unset at this point, the task is either
            // already terminal or not complete so the `JoinHandle` is responsible
            // for dropping the waker.
            // Safety:
            // If the JOIN_WAKER bit is not set the join handle has exclusive
            // access to the waker as per rule 2 in task/mod.rs.
            // This can only be the case at this point in two scenarios:
            // 1. The task completed and the runtime unset `JOIN_WAKER` flag
            //    after accessing the waker during task completion. So the
            //    `JoinHandle` is the only one to access the join waker here.
            // 2. The task is not completed so the `JoinHandle` was able to unset
            //    `JOIN_WAKER` bit itself to get mutable access to the waker.
            //    The runtime will not access the waker when this flag is unset.
            unsafe { self.trailer().set_waker(None) };
        }

        // Drop the `JoinHandle` reference, possibly deallocating the task
        self.drop_reference();
    }

    // ====== internal ======

    /// Completes the task. This method assumes that the state is RUNNING.
    fn complete(self) {
        // The future has completed and its output has been written to the task
        // stage. We transition from running to complete.
        let snapshot = self.state().transition_to_complete();

        // We catch panics here in case dropping the future or waking the
        // JoinHandle panics.
        let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            if !snapshot.is_join_interested() {
                // The `JoinHandle` is not interested in the output of
                // this task. It is our responsibility to drop the
                // output. The join waker was already dropped by the
                // `JoinHandle` before.
                self.core().drop_future_or_output();
            } else if snapshot.is_join_waker_set() {
                // Notify the waker. Reading the waker field is safe per rule 4
                // in task/mod.rs, since the JOIN_WAKER bit is set and the call
                // to transition_to_complete() above set the COMPLETE bit.
                self.trailer().wake_join();

                // Inform the `JoinHandle` that we are done waking the waker by
                // unsetting the `JOIN_WAKER` bit. If the `JoinHandle` has
                // already been dropped and `JOIN_INTEREST` is unset, then we must
                // drop the waker ourselves.
                if !self
                    .state()
                    .unset_waker_after_complete()
                    .is_join_interested()
                {
                    // SAFETY: We have COMPLETE=1 and JOIN_INTEREST=0, so
                    // we have exclusive access to the waker.
                    unsafe { self.trailer().set_waker(None) };
                }
            }
        }));

        // We catch panics here in case invoking a hook panics.
        //
        // We call this in a separate block so that it runs after the task appears to have
        // completed and will still run if the destructor panics.
        if let Some(f) = self.trailer().hooks.task_terminate_callback.as_ref() {
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                f(&TaskMeta {
                    id: self.core().task_id,
                    _phantom: Default::default(),
                })
            }));
        }

        // The task has completed execution and will no longer be scheduled.
        let num_release = self.release();

        // Give back `num_release` ref-counts in one step; a `true` result
        // means the allocation must be freed here.
        if self.state().transition_to_terminal(num_release) {
            self.dealloc();
        }
    }

    /// Releases the task from the scheduler. Returns the number of ref-counts
    /// that should be decremented.
    fn release(&self) -> usize {
        // We don't actually increment the ref-count here, but the new task is
        // never destroyed, so that's ok.
        let me = ManuallyDrop::new(self.get_new_task());

        if let Some(task) = self.core().scheduler.release(&me) {
            // The scheduler handed a task back. It is forgotten rather than
            // dropped, and its ref-count is reported to the caller (hence 2)
            // so it can be released together with ours.
            mem::forget(task);
            2
        } else {
            1
        }
    }

    /// Creates a new task that holds its own ref-count.
    ///
    /// # Safety
    ///
    /// Any use of `self` after this call must ensure that a ref-count to the
    /// task holds the task alive until after the use of `self`. Passing the
    /// returned Task to any method on `self` is unsound if dropping the Task
    /// could drop `self` before the call on `self` returned.
    fn get_new_task(&self) -> Task<S> {
        // safety: The header is at the beginning of the cell, so this cast is
        // safe.
        unsafe { Task::from_raw(self.cell.cast()) }
    }
}
416 | |
417 | fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool { |
418 | // Load a snapshot of the current task state |
419 | let snapshot = header.state.load(); |
420 | |
421 | debug_assert!(snapshot.is_join_interested()); |
422 | |
423 | if !snapshot.is_complete() { |
424 | // If the task is not complete, try storing the provided waker in the |
425 | // task's waker field. |
426 | |
427 | let res = if snapshot.is_join_waker_set() { |
428 | // If JOIN_WAKER is set, then JoinHandle has previously stored a |
429 | // waker in the waker field per step (iii) of rule 5 in task/mod.rs. |
430 | |
431 | // Optimization: if the stored waker and the provided waker wake the |
432 | // same task, then return without touching the waker field. (Reading |
433 | // the waker field below is safe per rule 3 in task/mod.rs.) |
434 | if unsafe { trailer.will_wake(waker) } { |
435 | return false; |
436 | } |
437 | |
438 | // Otherwise swap the stored waker with the provided waker by |
439 | // following the rule 5 in task/mod.rs. |
440 | header |
441 | .state |
442 | .unset_waker() |
443 | .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot)) |
444 | } else { |
445 | // If JOIN_WAKER is unset, then JoinHandle has mutable access to the |
446 | // waker field per rule 2 in task/mod.rs; therefore, skip step (i) |
447 | // of rule 5 and try to store the provided waker in the waker field. |
448 | set_join_waker(header, trailer, waker.clone(), snapshot) |
449 | }; |
450 | |
451 | match res { |
452 | Ok(_) => return false, |
453 | Err(snapshot) => { |
454 | assert!(snapshot.is_complete()); |
455 | } |
456 | } |
457 | } |
458 | true |
459 | } |
460 | |
461 | fn set_join_waker( |
462 | header: &Header, |
463 | trailer: &Trailer, |
464 | waker: Waker, |
465 | snapshot: Snapshot, |
466 | ) -> Result<Snapshot, Snapshot> { |
467 | assert!(snapshot.is_join_interested()); |
468 | assert!(!snapshot.is_join_waker_set()); |
469 | |
470 | // Safety: Only the `JoinHandle` may set the `waker` field. When |
471 | // `JOIN_INTEREST` is **not** set, nothing else will touch the field. |
472 | unsafe { |
473 | trailer.set_waker(Some(waker)); |
474 | } |
475 | |
476 | // Update the `JoinWaker` state accordingly |
477 | let res: Result = header.state.set_join_waker(); |
478 | |
479 | // If the state could not be updated, then clear the join waker |
480 | if res.is_err() { |
481 | unsafe { |
482 | trailer.set_waker(None); |
483 | } |
484 | } |
485 | |
486 | res |
487 | } |
488 | |
/// Outcome of `Harness::poll_inner`, telling `Harness::poll` what to do next.
enum PollFuture {
    /// The task is finished (or was cancelled); the caller holds a single
    /// ref-count and should call `complete`.
    Complete,
    /// The caller holds two ref-counts; `poll` hands one to the scheduler via
    /// `yield_now` and then drops the other.
    Notified,
    /// The ref-count was consumed and nothing further needs to be done.
    Done,
    /// The last ref-count was consumed; the caller should call `dealloc`.
    Dealloc,
}
495 | |
496 | /// Cancels the task and store the appropriate error in the stage field. |
497 | fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) { |
498 | // Drop the future from a panic guard. |
499 | let res: Result<(), Box> = panic::catch_unwind(panic::AssertUnwindSafe(|| { |
500 | core.drop_future_or_output(); |
501 | })); |
502 | |
503 | core.store_output(Err(panic_result_to_join_error(core.task_id, res))); |
504 | } |
505 | |
506 | fn panic_result_to_join_error( |
507 | task_id: Id, |
508 | res: Result<(), Box<dyn Any + Send + 'static>>, |
509 | ) -> JoinError { |
510 | match res { |
511 | Ok(()) => JoinError::cancelled(task_id), |
512 | Err(panic: Box) => JoinError::panic(task_id, err:panic), |
513 | } |
514 | } |
515 | |
516 | /// Polls the future. If the future completes, the output is written to the |
517 | /// stage field. |
518 | fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> { |
519 | // Poll the future. |
520 | let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { |
521 | struct Guard<'a, T: Future, S: Schedule> { |
522 | core: &'a Core<T, S>, |
523 | } |
524 | impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> { |
525 | fn drop(&mut self) { |
526 | // If the future panics on poll, we drop it inside the panic |
527 | // guard. |
528 | self.core.drop_future_or_output(); |
529 | } |
530 | } |
531 | let guard = Guard { core }; |
532 | let res = guard.core.poll(cx); |
533 | mem::forget(guard); |
534 | res |
535 | })); |
536 | |
537 | // Prepare output for being placed in the core stage. |
538 | let output = match output { |
539 | Ok(Poll::Pending) => return Poll::Pending, |
540 | Ok(Poll::Ready(output)) => Ok(output), |
541 | Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)), |
542 | }; |
543 | |
544 | // Catch and ignore panics if the future panics on drop. |
545 | let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { |
546 | core.store_output(output); |
547 | })); |
548 | |
549 | if res.is_err() { |
550 | core.scheduler.unhandled_panic(); |
551 | } |
552 | |
553 | Poll::Ready(()) |
554 | } |
555 | |
556 | #[cold ] |
557 | fn panic_to_error<S: Schedule>( |
558 | scheduler: &S, |
559 | task_id: Id, |
560 | panic: Box<dyn Any + Send + 'static>, |
561 | ) -> JoinError { |
562 | scheduler.unhandled_panic(); |
563 | JoinError::panic(task_id, err:panic) |
564 | } |
565 | |