1 | use crate::future::Future; |
2 | use crate::runtime::task::core::{Cell, Core, Header, Trailer}; |
3 | use crate::runtime::task::state::{Snapshot, State}; |
4 | use crate::runtime::task::waker::waker_ref; |
5 | use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task}; |
6 | |
7 | use std::any::Any; |
8 | use std::mem; |
9 | use std::mem::ManuallyDrop; |
10 | use std::panic; |
11 | use std::ptr::NonNull; |
12 | use std::task::{Context, Poll, Waker}; |
13 | |
14 | /// Typed raw task handle. |
15 | pub(super) struct Harness<T: Future, S: 'static> { |
16 | cell: NonNull<Cell<T, S>>, |
17 | } |
18 | |
19 | impl<T, S> Harness<T, S> |
20 | where |
21 | T: Future, |
22 | S: 'static, |
23 | { |
24 | pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> { |
25 | Harness { |
26 | cell: ptr.cast::<Cell<T, S>>(), |
27 | } |
28 | } |
29 | |
30 | fn header_ptr(&self) -> NonNull<Header> { |
31 | self.cell.cast() |
32 | } |
33 | |
34 | fn header(&self) -> &Header { |
35 | unsafe { &*self.header_ptr().as_ptr() } |
36 | } |
37 | |
38 | fn state(&self) -> &State { |
39 | &self.header().state |
40 | } |
41 | |
42 | fn trailer(&self) -> &Trailer { |
43 | unsafe { &self.cell.as_ref().trailer } |
44 | } |
45 | |
46 | fn core(&self) -> &Core<T, S> { |
47 | unsafe { &self.cell.as_ref().core } |
48 | } |
49 | } |
50 | |
/// Task operations that can be implemented without being generic over the
/// scheduler or task. Only one version of these methods should exist in the
/// final binary.
impl RawTask {
    /// Releases one ref-count on the task.
    ///
    /// When `State::ref_dec` returns `true` the task allocation is released
    /// through `dealloc` (a `true` result is taken to mean the count reached
    /// zero — confirm against `State::ref_dec`).
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// This call consumes a ref-count and notifies the task. This will create a
    /// new Notified and submit it if necessary.
    ///
    /// The caller does not need to hold a ref-count besides the one that was
    /// passed to this call.
    ///
    /// Note that the receiver is `&self`: the "consumed" ref-count is a
    /// logical hand-off, released either by the state transition itself or by
    /// the explicit `drop_reference` call in the `Submit` arm.
    pub(super) fn wake_by_val(&self) {
        use super::state::TransitionToNotifiedByVal;

        match self.state().transition_to_notified_by_val() {
            TransitionToNotifiedByVal::Submit => {
                // The caller has given us a ref-count, and the transition has
                // created a new ref-count, so we now hold two. We turn the new
                // ref-count Notified and pass it to the call to `schedule`.
                //
                // The old ref-count is retained for now to ensure that the task
                // is not dropped during the call to `schedule` if the call
                // drops the task it was given.
                self.schedule();

                // Now that we have completed the call to schedule, we can
                // release our ref-count.
                self.drop_reference();
            }
            TransitionToNotifiedByVal::Dealloc => {
                // NOTE(review): presumably the transition released the
                // caller's ref-count and it was the last one, leaving us
                // responsible for freeing the task — confirm in `State`.
                self.dealloc();
            }
            // NOTE(review): presumably the transition already released the
            // caller's ref-count and no submission is required — confirm in
            // `State`.
            TransitionToNotifiedByVal::DoNothing => {}
        }
    }

    /// This call notifies the task. It will not consume any ref-counts, but the
    /// caller should hold a ref-count. This will create a new Notified and
    /// submit it if necessary.
    pub(super) fn wake_by_ref(&self) {
        use super::state::TransitionToNotifiedByRef;

        match self.state().transition_to_notified_by_ref() {
            TransitionToNotifiedByRef::Submit => {
                // The transition above incremented the ref-count for a new task
                // and the caller also holds a ref-count. The caller's ref-count
                // ensures that the task is not destroyed even if the new task
                // is dropped before `schedule` returns.
                self.schedule();
            }
            // No submission is needed; the caller keeps its ref-count.
            TransitionToNotifiedByRef::DoNothing => {}
        }
    }

    /// Remotely aborts the task.
    ///
    /// The caller should hold a ref-count, but we do not consume it.
    ///
    /// This is similar to `shutdown` except that it asks the runtime to perform
    /// the shutdown. This is necessary to avoid the shutdown happening in the
    /// wrong thread for non-Send tasks.
    pub(super) fn remote_abort(&self) {
        if self.state().transition_to_notified_and_cancel() {
            // The transition has created a new ref-count, which we turn into
            // a Notified and pass to the task.
            //
            // Since the caller holds a ref-count, the task cannot be destroyed
            // before the call to `schedule` returns even if the call drops the
            // `Notified` internally.
            self.schedule();
        }
    }

    /// Tries to register `waker` to be notified when the task completes.
    ///
    /// Returns `true` if the task has already completed; in that case the
    /// provided waker is not registered and will not be notified. Returns
    /// `false` if the task has not completed and a waker that wakes the same
    /// task as `waker` is stored in the trailer (either it was already there
    /// or it was stored by this call).
    ///
    /// This simply forwards to `can_read_output`, which also debug-asserts
    /// that the join-interest flag is set.
    pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
        can_read_output(self.header(), self.trailer(), waker)
    }
}
135 | |
impl<T, S> Harness<T, S>
where
    T: Future,
    S: Schedule,
{
    /// Releases one ref-count on the task.
    ///
    /// When `State::ref_dec` returns `true` the allocation is freed through
    /// `dealloc` (a `true` result is taken to mean the count reached zero,
    /// which matches the safety comment in `dealloc`).
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// Polls the inner future. A ref-count is consumed.
    ///
    /// All necessary state checks and transitions are performed.
    /// Panics raised while polling the future are handled.
    pub(super) fn poll(self) {
        // We pass our ref-count to `poll_inner`.
        match self.poll_inner() {
            PollFuture::Notified => {
                // The `poll_inner` call has given us two ref-counts back.
                // We give one of them to a new task and call `yield_now`.
                self.core()
                    .scheduler
                    .yield_now(Notified(self.get_new_task()));

                // The remaining ref-count is now dropped. We kept the extra
                // ref-count until now to ensure that even if the `yield_now`
                // call drops the provided task, the task isn't deallocated
                // before `yield_now` returns.
                self.drop_reference();
            }
            PollFuture::Complete => {
                // We still own one ref-count; `complete` takes it over.
                self.complete();
            }
            PollFuture::Dealloc => {
                // `poll_inner` consumed the last ref-count, so the allocation
                // must be released here.
                self.dealloc();
            }
            // The ref-count was consumed by `poll_inner`; `self` must not be
            // used any further.
            PollFuture::Done => (),
        }
    }

    /// Polls the task and cancel it if necessary. This takes ownership of a
    /// ref-count.
    ///
    /// If the return value is Notified, the caller is given ownership of two
    /// ref-counts.
    ///
    /// If the return value is Complete, the caller is given ownership of a
    /// single ref-count, which should be passed on to `complete`.
    ///
    /// If the return value is Dealloc, then this call consumed the last
    /// ref-count and the caller should call `dealloc`.
    ///
    /// Otherwise the ref-count is consumed and the caller should not access
    /// `self` again.
    fn poll_inner(&self) -> PollFuture {
        use super::state::{TransitionToIdle, TransitionToRunning};

        match self.state().transition_to_running() {
            TransitionToRunning::Success => {
                // Separated to reduce LLVM codegen
                fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture {
                    match result {
                        TransitionToIdle::Ok => PollFuture::Done,
                        TransitionToIdle::OkNotified => PollFuture::Notified,
                        TransitionToIdle::OkDealloc => PollFuture::Dealloc,
                        TransitionToIdle::Cancelled => PollFuture::Complete,
                    }
                }
                // Build a `Context` whose waker refers to this task's header.
                let header_ptr = self.header_ptr();
                let waker_ref = waker_ref::<S>(&header_ptr);
                let cx = Context::from_waker(&waker_ref);
                let res = poll_future(self.core(), cx);

                if res == Poll::Ready(()) {
                    // The future completed. Move on to complete the task.
                    return PollFuture::Complete;
                }

                // The future is still pending: leave the running state.
                let transition_res = self.state().transition_to_idle();
                if let TransitionToIdle::Cancelled = transition_res {
                    // The transition to idle failed because the task was
                    // cancelled during the poll.
                    cancel_task(self.core());
                }
                transition_result_to_poll_future(transition_res)
            }
            TransitionToRunning::Cancelled => {
                // Cancelled before it could run: drop the future, store the
                // cancellation error and let the caller complete the task.
                cancel_task(self.core());
                PollFuture::Complete
            }
            // NOTE(review): presumably the task is already running or already
            // complete, so there is nothing to poll — confirm in `State`.
            TransitionToRunning::Failed => PollFuture::Done,
            TransitionToRunning::Dealloc => PollFuture::Dealloc,
        }
    }

    /// Forcibly shuts down the task.
    ///
    /// Attempt to transition to `Running` in order to forcibly shutdown the
    /// task. If the task is currently running or in a state of completion, then
    /// there is nothing further to do. When the task completes running, it will
    /// notice the `CANCELLED` bit and finalize the task.
    pub(super) fn shutdown(self) {
        if !self.state().transition_to_shutdown() {
            // The task is concurrently running. No further work needed.
            self.drop_reference();
            return;
        }

        // By transitioning the lifecycle to `Running`, we have permission to
        // drop the future.
        cancel_task(self.core());
        self.complete();
    }

    /// Frees the task allocation.
    ///
    /// Must only be called by whoever brought the ref-count to zero (see the
    /// safety comment below); the cell is reconstructed as a `Box` and
    /// dropped.
    pub(super) fn dealloc(self) {
        // Release the join waker, if there is one.
        self.trailer().waker.with_mut(drop);

        // Check causality
        self.core().stage.with_mut(drop);

        // Safety: The caller of this method just transitioned our ref-count to
        // zero, so it is our responsibility to release the allocation.
        //
        // We don't hold any references into the allocation at this point, but
        // it is possible for another thread to still hold a `&State` into the
        // allocation if that other thread has decremented its last ref-count,
        // but has not yet returned from the relevant method on `State`.
        //
        // However, the `State` type consists of just an `AtomicUsize`, and an
        // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
        // As explained in the documentation for `UnsafeCell`, such references
        // are allowed to be dangling after their last use, even if the
        // reference has not yet gone out of scope.
        unsafe {
            drop(Box::from_raw(self.cell.as_ptr()));
        }
    }

    // ===== join handle =====

    /// Read the task output into `dst`.
    ///
    /// If `can_read_output` returns `true`, the output is taken out of the
    /// core and written to `dst` as `Poll::Ready`. Otherwise `dst` is left
    /// untouched; `can_read_output` has then either stored `waker` or found an
    /// equivalent waker already stored.
    pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
        if can_read_output(self.header(), self.trailer(), waker) {
            *dst = Poll::Ready(self.core().take_output());
        }
    }

    /// Handles dropping of the `JoinHandle`: clears join interest, drops the
    /// output if that has become this call's responsibility, then releases the
    /// `JoinHandle`'s ref-count.
    pub(super) fn drop_join_handle_slow(self) {
        // Try to unset `JOIN_INTEREST`. This must be done as a first step in
        // case the task concurrently completed.
        if self.state().unset_join_interested().is_err() {
            // It is our responsibility to drop the output. This is critical as
            // the task output may not be `Send` and as such must remain with
            // the scheduler or `JoinHandle`. i.e. if the output remains in the
            // task structure until the task is deallocated, it may be dropped
            // by a Waker on any arbitrary thread.
            //
            // Panics are delivered to the user via the `JoinHandle`. Given that
            // they are dropping the `JoinHandle`, we assume they are not
            // interested in the panic and swallow it.
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                self.core().drop_future_or_output();
            }));
        }

        // Drop the `JoinHandle` reference, possibly deallocating the task
        self.drop_reference();
    }

    // ====== internal ======

    /// Completes the task. This method assumes that the state is RUNNING.
    ///
    /// Consumes the ref-count owned by the caller; the allocation is freed
    /// here if `transition_to_terminal` reports that it should be.
    fn complete(self) {
        // The future has completed and its output has been written to the task
        // stage. We transition from running to complete.

        let snapshot = self.state().transition_to_complete();

        // We catch panics here in case dropping the future or waking the
        // JoinHandle panics.
        let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            if !snapshot.is_join_interested() {
                // The `JoinHandle` is not interested in the output of
                // this task. It is our responsibility to drop the
                // output.
                self.core().drop_future_or_output();
            } else if snapshot.is_join_waker_set() {
                // Notify the waker. Reading the waker field is safe per rule 4
                // in task/mod.rs, since the JOIN_WAKER bit is set and the call
                // to transition_to_complete() above set the COMPLETE bit.
                self.trailer().wake_join();
            }
        }));

        // The task has completed execution and will no longer be scheduled.
        let num_release = self.release();

        // Release `num_release` ref-counts in one step.
        if self.state().transition_to_terminal(num_release) {
            self.dealloc();
        }
    }

    /// Releases the task from the scheduler. Returns the number of ref-counts
    /// that should be decremented.
    ///
    /// The result is 2 when the scheduler hands a task back from `release`
    /// (that task is forgotten here, so its ref-count has to be accounted for
    /// by the caller) and 1 otherwise.
    fn release(&self) -> usize {
        // We don't actually increment the ref-count here, but the new task is
        // never destroyed, so that's ok.
        let me = ManuallyDrop::new(self.get_new_task());

        if let Some(task) = self.core().scheduler.release(&me) {
            // Forget the returned task instead of dropping it; its ref-count
            // is released by the caller as part of the returned total.
            mem::forget(task);
            2
        } else {
            1
        }
    }

    /// Creates a new `Task` handle for this task.
    ///
    /// This only casts the cell pointer; as noted in `release`, no ref-count
    /// is incremented here. The returned `Task` therefore takes over a
    /// ref-count that the caller must already own, or must be prevented from
    /// being dropped.
    ///
    /// # Safety
    ///
    /// Any use of `self` after this call must ensure that a ref-count to the
    /// task holds the task alive until after the use of `self`. Passing the
    /// returned Task to any method on `self` is unsound if dropping the Task
    /// could drop `self` before the call on `self` returned.
    fn get_new_task(&self) -> Task<S> {
        // safety: The header is at the beginning of the cell, so this cast is
        // safe.
        unsafe { Task::from_raw(self.cell.cast()) }
    }
}
369 | |
370 | fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool { |
371 | // Load a snapshot of the current task state |
372 | let snapshot = header.state.load(); |
373 | |
374 | debug_assert!(snapshot.is_join_interested()); |
375 | |
376 | if !snapshot.is_complete() { |
377 | // If the task is not complete, try storing the provided waker in the |
378 | // task's waker field. |
379 | |
380 | let res = if snapshot.is_join_waker_set() { |
381 | // If JOIN_WAKER is set, then JoinHandle has previously stored a |
382 | // waker in the waker field per step (iii) of rule 5 in task/mod.rs. |
383 | |
384 | // Optimization: if the stored waker and the provided waker wake the |
385 | // same task, then return without touching the waker field. (Reading |
386 | // the waker field below is safe per rule 3 in task/mod.rs.) |
387 | if unsafe { trailer.will_wake(waker) } { |
388 | return false; |
389 | } |
390 | |
391 | // Otherwise swap the stored waker with the provided waker by |
392 | // following the rule 5 in task/mod.rs. |
393 | header |
394 | .state |
395 | .unset_waker() |
396 | .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot)) |
397 | } else { |
398 | // If JOIN_WAKER is unset, then JoinHandle has mutable access to the |
399 | // waker field per rule 2 in task/mod.rs; therefore, skip step (i) |
400 | // of rule 5 and try to store the provided waker in the waker field. |
401 | set_join_waker(header, trailer, waker.clone(), snapshot) |
402 | }; |
403 | |
404 | match res { |
405 | Ok(_) => return false, |
406 | Err(snapshot) => { |
407 | assert!(snapshot.is_complete()); |
408 | } |
409 | } |
410 | } |
411 | true |
412 | } |
413 | |
414 | fn set_join_waker( |
415 | header: &Header, |
416 | trailer: &Trailer, |
417 | waker: Waker, |
418 | snapshot: Snapshot, |
419 | ) -> Result<Snapshot, Snapshot> { |
420 | assert!(snapshot.is_join_interested()); |
421 | assert!(!snapshot.is_join_waker_set()); |
422 | |
423 | // Safety: Only the `JoinHandle` may set the `waker` field. When |
424 | // `JOIN_INTEREST` is **not** set, nothing else will touch the field. |
425 | unsafe { |
426 | trailer.set_waker(Some(waker)); |
427 | } |
428 | |
429 | // Update the `JoinWaker` state accordingly |
430 | let res = header.state.set_join_waker(); |
431 | |
432 | // If the state could not be updated, then clear the join waker |
433 | if res.is_err() { |
434 | unsafe { |
435 | trailer.set_waker(None); |
436 | } |
437 | } |
438 | |
439 | res |
440 | } |
441 | |
/// Outcome of `Harness::poll_inner`, telling `Harness::poll` which follow-up
/// action to take.
enum PollFuture {
    /// The future finished or the task was cancelled; `poll` calls
    /// `complete`.
    Complete,
    /// Produced when `transition_to_idle` returns `OkNotified`; `poll` passes
    /// a new `Notified` to the scheduler's `yield_now` and then drops a
    /// reference.
    Notified,
    /// Nothing further to do; `poll` takes no action.
    Done,
    /// The last ref-count was consumed; `poll` calls `dealloc`.
    Dealloc,
}
448 | |
449 | /// Cancels the task and store the appropriate error in the stage field. |
450 | fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) { |
451 | // Drop the future from a panic guard. |
452 | let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { |
453 | core.drop_future_or_output(); |
454 | })); |
455 | |
456 | core.store_output(Err(panic_result_to_join_error(core.task_id, res))); |
457 | } |
458 | |
459 | fn panic_result_to_join_error( |
460 | task_id: Id, |
461 | res: Result<(), Box<dyn Any + Send + 'static>>, |
462 | ) -> JoinError { |
463 | match res { |
464 | Ok(()) => JoinError::cancelled(task_id), |
465 | Err(panic) => JoinError::panic(task_id, panic), |
466 | } |
467 | } |
468 | |
469 | /// Polls the future. If the future completes, the output is written to the |
470 | /// stage field. |
471 | fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> { |
472 | // Poll the future. |
473 | let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { |
474 | struct Guard<'a, T: Future, S: Schedule> { |
475 | core: &'a Core<T, S>, |
476 | } |
477 | impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> { |
478 | fn drop(&mut self) { |
479 | // If the future panics on poll, we drop it inside the panic |
480 | // guard. |
481 | self.core.drop_future_or_output(); |
482 | } |
483 | } |
484 | let guard = Guard { core }; |
485 | let res = guard.core.poll(cx); |
486 | mem::forget(guard); |
487 | res |
488 | })); |
489 | |
490 | // Prepare output for being placed in the core stage. |
491 | let output = match output { |
492 | Ok(Poll::Pending) => return Poll::Pending, |
493 | Ok(Poll::Ready(output)) => Ok(output), |
494 | Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)), |
495 | }; |
496 | |
497 | // Catch and ignore panics if the future panics on drop. |
498 | let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { |
499 | core.store_output(output); |
500 | })); |
501 | |
502 | if res.is_err() { |
503 | core.scheduler.unhandled_panic(); |
504 | } |
505 | |
506 | Poll::Ready(()) |
507 | } |
508 | |
509 | #[cold ] |
510 | fn panic_to_error<S: Schedule>( |
511 | scheduler: &S, |
512 | task_id: Id, |
513 | panic: Box<dyn Any + Send + 'static>, |
514 | ) -> JoinError { |
515 | scheduler.unhandled_panic(); |
516 | JoinError::panic(task_id, panic) |
517 | } |
518 | |