use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{JoinError, Notified, RawTask, Schedule, Task};

use std::mem;
use std::mem::ManuallyDrop;
use std::panic;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};

/// Typed raw task handle.
pub(super) struct Harness<T: Future, S: 'static> {
    cell: NonNull<Cell<T, S>>,
}

impl<T, S> Harness<T, S>
where
    T: Future,
    S: 'static,
{
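    /// # Safety
    ///
    /// `ptr` must point to the `Header` of a live task `Cell<T, S>` whose
    /// future and scheduler types match `T` and `S`.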
    pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
        Harness {
            cell: ptr.cast::<Cell<T, S>>(),
        }
    }

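    /// Returns a pointer to the task's header. The `Header` is the first
    /// field of the `Cell`, so the cell pointer can be cast directly.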
    fn header_ptr(&self) -> NonNull<Header> {
        self.cell.cast()
    }

    fn header(&self) -> &Header {
        unsafe { &*self.header_ptr().as_ptr() }
    }

    fn state(&self) -> &State {
        &self.header().state
    }

    fn trailer(&self) -> &Trailer {
        unsafe { &self.cell.as_ref().trailer }
    }

    fn core(&self) -> &Core<T, S> {
        unsafe { &self.cell.as_ref().core }
    }
}

/// Task operations that can be implemented without being generic over the
/// scheduler or task. Only one version of these methods should exist in the
/// final binary.
impl RawTask {
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// This call consumes a ref-count and notifies the task. This will create a
    /// new Notified and submit it if necessary.
    ///
    /// The caller does not need to hold a ref-count besides the one that was
    /// passed to this call.
    pub(super) fn wake_by_val(&self) {
        use super::state::TransitionToNotifiedByVal;

        match self.state().transition_to_notified_by_val() {
            TransitionToNotifiedByVal::Submit => {
                // The caller has given us a ref-count, and the transition has
                // created a new ref-count, so we now hold two. We turn the new
                // ref-count into a `Notified` and pass it to the call to
                // `schedule`.
                //
                // The old ref-count is retained for now to ensure that the task
                // is not dropped during the call to `schedule` if the call
                // drops the task it was given.
                self.schedule();

                // Now that we have completed the call to schedule, we can
                // release our ref-count.
                self.drop_reference();
            }
            TransitionToNotifiedByVal::Dealloc => {
                self.dealloc();
            }
            TransitionToNotifiedByVal::DoNothing => {}
        }
    }

    /// This call notifies the task. It will not consume any ref-counts, but the
    /// caller should hold a ref-count. This will create a new Notified and
    /// submit it if necessary.
    pub(super) fn wake_by_ref(&self) {
        use super::state::TransitionToNotifiedByRef;

        match self.state().transition_to_notified_by_ref() {
            TransitionToNotifiedByRef::Submit => {
                // The transition above incremented the ref-count for a new task
                // and the caller also holds a ref-count. The caller's ref-count
                // ensures that the task is not destroyed even if the new task
                // is dropped before `schedule` returns.
                self.schedule();
            }
            TransitionToNotifiedByRef::DoNothing => {}
        }
    }

    /// Remotely aborts the task.
    ///
    /// The caller should hold a ref-count, but we do not consume it.
    ///
    /// This is similar to `shutdown` except that it asks the runtime to perform
    /// the shutdown. This is necessary to avoid the shutdown happening in the
    /// wrong thread for non-Send tasks.
    pub(super) fn remote_abort(&self) {
        if self.state().transition_to_notified_and_cancel() {
            // The transition has created a new ref-count, which we turn into
            // a Notified and pass to the task.
            //
            // Since the caller holds a ref-count, the task cannot be destroyed
            // before the call to `schedule` returns even if the call drops the
            // `Notified` internally.
            self.schedule();
        }
    }

    /// Tries to set the waker that is notified when the task completes. Returns
    /// `true` if the task has already completed. If this call returns `false`,
    /// then the waker will be notified when the task completes.
    pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
        can_read_output(self.header(), self.trailer(), waker)
    }
}

impl<T, S> Harness<T, S>
where
    T: Future,
    S: Schedule,
{
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// Polls the inner future. A ref-count is consumed.
    ///
    /// All necessary state checks and transitions are performed.
    /// Panics raised while polling the future are handled.
    pub(super) fn poll(self) {
        // We pass our ref-count to `poll_inner`.
        match self.poll_inner() {
            PollFuture::Notified => {
                // The `poll_inner` call has given us two ref-counts back.
                // We give one of them to a new task and call `yield_now`.
                self.core()
                    .scheduler
                    .yield_now(Notified(self.get_new_task()));

                // The remaining ref-count is now dropped. We kept the extra
                // ref-count until now to ensure that even if the `yield_now`
                // call drops the provided task, the task isn't deallocated
                // before `yield_now` returns.
                self.drop_reference();
            }
            PollFuture::Complete => {
                self.complete();
            }
            PollFuture::Dealloc => {
                self.dealloc();
            }
            PollFuture::Done => (),
        }
    }

    /// Polls the task and cancels it if necessary. This takes ownership of a
    /// ref-count.
    ///
    /// If the return value is Notified, the caller is given ownership of two
    /// ref-counts.
    ///
    /// If the return value is Complete, the caller is given ownership of a
    /// single ref-count, which should be passed on to `complete`.
    ///
    /// If the return value is Dealloc, then this call consumed the last
    /// ref-count and the caller should call `dealloc`.
    ///
    /// Otherwise the ref-count is consumed and the caller should not access
    /// `self` again.
    fn poll_inner(&self) -> PollFuture {
        use super::state::{TransitionToIdle, TransitionToRunning};

        match self.state().transition_to_running() {
            TransitionToRunning::Success => {
                let header_ptr = self.header_ptr();
                let waker_ref = waker_ref::<T, S>(&header_ptr);
                let cx = Context::from_waker(&waker_ref);
                let res = poll_future(self.core(), cx);

                if res == Poll::Ready(()) {
                    // The future completed. Move on to complete the task.
                    return PollFuture::Complete;
                }

                match self.state().transition_to_idle() {
                    TransitionToIdle::Ok => PollFuture::Done,
                    TransitionToIdle::OkNotified => PollFuture::Notified,
                    TransitionToIdle::OkDealloc => PollFuture::Dealloc,
                    TransitionToIdle::Cancelled => {
                        // The transition to idle failed because the task was
                        // cancelled during the poll.
                        cancel_task(self.core());
                        PollFuture::Complete
                    }
                }
            }
            TransitionToRunning::Cancelled => {
                cancel_task(self.core());
                PollFuture::Complete
            }
            TransitionToRunning::Failed => PollFuture::Done,
            TransitionToRunning::Dealloc => PollFuture::Dealloc,
        }
    }

    /// Forcibly shuts down the task.
    ///
    /// Attempt to transition to `Running` in order to forcibly shut down the
    /// task. If the task is currently running or in a state of completion, then
    /// there is nothing further to do. When the task completes running, it will
    /// notice the `CANCELLED` bit and finalize the task.
    pub(super) fn shutdown(self) {
        if !self.state().transition_to_shutdown() {
            // The task is concurrently running. No further work needed.
            self.drop_reference();
            return;
        }

        // By transitioning the lifecycle to `Running`, we have permission to
        // drop the future.
        cancel_task(self.core());
        self.complete();
    }

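    /// Releases the task's allocation. The caller must have observed the
    /// task's ref-count reach zero (see the safety comment below).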
    pub(super) fn dealloc(self) {
        // Release the join waker, if there is one.
        self.trailer().waker.with_mut(drop);

        // Check causality
        self.core().stage.with_mut(drop);

        // Safety: The caller of this method just transitioned our ref-count to
        // zero, so it is our responsibility to release the allocation.
        //
        // We don't hold any references into the allocation at this point, but
        // it is possible for another thread to still hold a `&State` into the
        // allocation if that other thread has decremented its last ref-count,
        // but has not yet returned from the relevant method on `State`.
        //
        // However, the `State` type consists of just an `AtomicUsize`, and an
        // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
        // As explained in the documentation for `UnsafeCell`, such references
        // are allowed to be dangling after their last use, even if the
        // reference has not yet gone out of scope.
        unsafe {
            drop(Box::from_raw(self.cell.as_ptr()));
        }
    }

    // ===== join handle =====

    /// Reads the task output into `dst` if the task has completed. Otherwise,
    /// `waker` is registered to be notified when the task completes.
    pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
        if can_read_output(self.header(), self.trailer(), waker) {
            *dst = Poll::Ready(self.core().take_output());
        }
    }

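    /// Releases the `JoinHandle`'s interest in the task. If the task has
    /// already completed, its unsent output is dropped here; the handle's
    /// ref-count is then released, possibly deallocating the task.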
    pub(super) fn drop_join_handle_slow(self) {
        // Try to unset `JOIN_INTEREST`. This must be done as a first step in
        // case the task concurrently completed.
        if self.state().unset_join_interested().is_err() {
            // It is our responsibility to drop the output. This is critical as
            // the task output may not be `Send` and as such must remain with
            // the scheduler or `JoinHandle`. i.e. if the output remains in the
            // task structure until the task is deallocated, it may be dropped
            // by a Waker on any arbitrary thread.
            //
            // Panics are delivered to the user via the `JoinHandle`. Given that
            // they are dropping the `JoinHandle`, we assume they are not
            // interested in the panic and swallow it.
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                self.core().drop_future_or_output();
            }));
        }

        // Drop the `JoinHandle` reference, possibly deallocating the task
        self.drop_reference();
    }

    // ====== internal ======

    /// Completes the task. This method assumes that the state is RUNNING.
    fn complete(self) {
        // The future has completed and its output has been written to the task
        // stage. We transition from running to complete.

        let snapshot = self.state().transition_to_complete();

        // We catch panics here in case dropping the future or waking the
        // JoinHandle panics.
        let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            if !snapshot.is_join_interested() {
                // The `JoinHandle` is not interested in the output of
                // this task. It is our responsibility to drop the
                // output.
                self.core().drop_future_or_output();
            } else if snapshot.is_join_waker_set() {
                // Notify the waker. Reading the waker field is safe per rule 4
                // in task/mod.rs, since the JOIN_WAKER bit is set and the call
                // to transition_to_complete() above set the COMPLETE bit.
                self.trailer().wake_join();
            }
        }));

        // The task has completed execution and will no longer be scheduled.
        let num_release = self.release();

        if self.state().transition_to_terminal(num_release) {
            self.dealloc();
        }
    }

    /// Releases the task from the scheduler. Returns the number of ref-counts
    /// that should be decremented.
    fn release(&self) -> usize {
        // We don't actually increment the ref-count here, but the new task is
        // never destroyed, so that's ok.
        let me = ManuallyDrop::new(self.get_new_task());

        if let Some(task) = self.core().scheduler.release(&me) {
            mem::forget(task);
            2
        } else {
            1
        }
    }

    /// Creates a new task that holds its own ref-count.
    ///
    /// # Safety
    ///
    /// Any use of `self` after this call must ensure that a ref-count to the
    /// task holds the task alive until after the use of `self`. Passing the
    /// returned Task to any method on `self` is unsound if dropping the Task
    /// could drop `self` before the call on `self` returned.
    fn get_new_task(&self) -> Task<S> {
        // safety: The header is at the beginning of the cell, so this cast is
        // safe.
        unsafe { Task::from_raw(self.cell.cast()) }
    }
}

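/// Returns `true` if the task has completed so its output can be read.
/// Otherwise attempts to store `waker` so it is notified when the task
/// completes and returns `false`; if the task completes concurrently while
/// the waker is being stored, `true` is returned instead.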
fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
    // Load a snapshot of the current task state
    let snapshot = header.state.load();

    debug_assert!(snapshot.is_join_interested());

    if !snapshot.is_complete() {
        // If the task is not complete, try storing the provided waker in the
        // task's waker field.

        let res = if snapshot.is_join_waker_set() {
            // If JOIN_WAKER is set, then JoinHandle has previously stored a
            // waker in the waker field per step (iii) of rule 5 in task/mod.rs.

            // Optimization: if the stored waker and the provided waker wake the
            // same task, then return without touching the waker field. (Reading
            // the waker field below is safe per rule 3 in task/mod.rs.)
            if unsafe { trailer.will_wake(waker) } {
                return false;
            }

            // Otherwise swap the stored waker with the provided waker by
            // following rule 5 in task/mod.rs.
            header
                .state
                .unset_waker()
                .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
        } else {
            // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
            // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
            // of rule 5 and try to store the provided waker in the waker field.
            set_join_waker(header, trailer, waker.clone(), snapshot)
        };

        match res {
            Ok(_) => return false,
            Err(snapshot) => {
                assert!(snapshot.is_complete());
            }
        }
    }
    true
}

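/// Stores `waker` in the task's trailer and attempts to set the `JOIN_WAKER`
/// bit. If the state can no longer be updated because the task completed
/// concurrently, the waker is cleared again and the current snapshot is
/// returned as an error.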
fn set_join_waker(
    header: &Header,
    trailer: &Trailer,
    waker: Waker,
    snapshot: Snapshot,
) -> Result<Snapshot, Snapshot> {
    assert!(snapshot.is_join_interested());
    assert!(!snapshot.is_join_waker_set());

    // Safety: Only the `JoinHandle` may set the `waker` field. When
    // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
    unsafe {
        trailer.set_waker(Some(waker));
    }

    // Update the `JoinWaker` state accordingly
    let res = header.state.set_join_waker();

    // If the state could not be updated, then clear the join waker
    if res.is_err() {
        unsafe {
            trailer.set_waker(None);
        }
    }

    res
}

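/// Outcome of `poll_inner`. Each variant tells the caller how many ref-counts
/// it now owns and what it must do next; see the documentation on
/// `poll_inner` for details.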
enum PollFuture {
    Complete,
    Notified,
    Done,
    Dealloc,
}

/// Cancels the task and stores the appropriate error in the stage field.
fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
    // Drop the future from a panic guard.
    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        core.drop_future_or_output();
    }));

    match res {
        Ok(()) => {
            core.store_output(Err(JoinError::cancelled(core.task_id)));
        }
        Err(panic) => {
            core.store_output(Err(JoinError::panic(core.task_id, panic)));
        }
    }
}

/// Polls the future. If the future completes, the output is written to the
/// stage field.
fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
    // Poll the future.
    let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        struct Guard<'a, T: Future, S: Schedule> {
            core: &'a Core<T, S>,
        }
        impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
            fn drop(&mut self) {
                // If the future panics on poll, we drop it inside the panic
                // guard.
                self.core.drop_future_or_output();
            }
        }
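        // Arm the guard so the future is dropped inside the `catch_unwind` if
        // `poll` panics, then disarm it once `poll` returns normally.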
        let guard = Guard { core };
        let res = guard.core.poll(cx);
        mem::forget(guard);
        res
    }));

    // Prepare output for being placed in the core stage.
    let output = match output {
        Ok(Poll::Pending) => return Poll::Pending,
        Ok(Poll::Ready(output)) => Ok(output),
        Err(panic) => {
            core.scheduler.unhandled_panic();
            Err(JoinError::panic(core.task_id, panic))
        }
    };

    // Catch and ignore panics if the future panics on drop.
    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        core.store_output(output);
    }));

    if res.is_err() {
        core.scheduler.unhandled_panic();
    }

    Poll::Ready(())
}