1 | //! Core task module. |
2 | //! |
3 | //! # Safety |
4 | //! |
5 | //! The functions in this module are private to the `task` module. All of them |
6 | //! should be considered `unsafe` to use, but are not marked as such since it |
7 | //! would be too noisy. |
8 | //! |
9 | //! Make sure to consult the relevant safety section of each function before |
10 | //! use. |
11 | |
12 | use crate::future::Future; |
13 | use crate::loom::cell::UnsafeCell; |
14 | use crate::runtime::context; |
15 | use crate::runtime::task::raw::{self, Vtable}; |
16 | use crate::runtime::task::state::State; |
17 | use crate::runtime::task::{Id, Schedule}; |
18 | use crate::util::linked_list; |
19 | |
20 | use std::num::NonZeroU64; |
21 | use std::pin::Pin; |
22 | use std::ptr::NonNull; |
23 | use std::task::{Context, Poll, Waker}; |
24 | |
/// The task cell. Contains the components of the task.
///
/// It is critical for `Header` to be the first field as the task structure will
/// be referenced by both *mut Cell and *mut Header.
///
/// Any changes to the layout of this struct _must_ also be reflected in the
/// `const` fns in raw.rs.
///
// This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
// `repr(C)`: field order is significant — `Header` must come first so a
// `*mut Cell` can be used as a `*mut Header` (see the struct docs above).
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
    /// Hot task state data.
    pub(super) header: Header,

    /// Either the future or output, depending on the execution stage.
    pub(super) core: Core<T, S>,

    /// Cold data, placed after the future.
    pub(super) trailer: Trailer,
}
125 | |
/// Wrapper around the cell holding the future or its output, so all mutable
/// access goes through `CoreStage::with_mut`.
pub(super) struct CoreStage<T: Future> {
    // Holds `Stage::Running(future)` until completion, then the output
    // (`Stage::Finished`) until it is taken (`Stage::Consumed`).
    stage: UnsafeCell<Stage<T>>,
}
129 | |
/// The core of the task.
///
/// Holds the future or output, depending on the stage of execution.
///
/// Any changes to the layout of this struct _must_ also be reflected in the
/// `const` fns in raw.rs.
#[repr(C)]
pub(super) struct Core<T: Future, S> {
    /// Scheduler used to drive this future.
    pub(super) scheduler: S,

    /// The task's ID, used for populating `JoinError`s.
    pub(super) task_id: Id,

    /// Either the future or the output, depending on the execution stage.
    pub(super) stage: CoreStage<T>,
}
147 | |
/// The task header: the hot, non-generic portion of the task.
///
/// This is the first field of `Cell` (see its layout docs), so a pointer to
/// the task allocation can be used as a `*mut Header`.
///
/// Crate public as this is also needed by the pool.
#[repr(C)]
pub(crate) struct Header {
    /// Task state.
    pub(super) state: State,

    /// Pointer to next task, used with the injection queue.
    pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>,

    /// Table of function pointers for executing actions on the task.
    pub(super) vtable: &'static Vtable,

    /// This integer contains the id of the `OwnedTasks` or `LocalOwnedTasks`
    /// that this task is stored in. If the task is not in any list, should be
    /// the id of the list that it was previously in, or `None` if it has never
    /// been in any list.
    ///
    /// Once a task has been bound to a list, it can never be bound to another
    /// list, even if removed from the first list.
    ///
    /// The id is not unset when removed from a list because we want to be able
    /// to read the id without synchronization, even if it is concurrently being
    /// removed from the list.
    pub(super) owner_id: UnsafeCell<Option<NonZeroU64>>,

    /// The tracing ID for this instrumented task.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    pub(super) tracing_id: Option<tracing::Id>,
}
177 | |
// SAFETY: `Header` is shared between threads by the runtime. Its interior
// mutability is confined to `UnsafeCell` fields whose accessors document
// caller-enforced synchronization requirements (see `set_next`,
// `set_owner_id`, and the module-level safety notes).
unsafe impl Send for Header {}
unsafe impl Sync for Header {}
180 | |
/// Cold data is stored after the future. Data is considered cold if it is only
/// used during creation or shutdown of the task.
///
/// Reached from a `Header` via the vtable's trailer offset
/// (`Header::get_trailer`).
pub(super) struct Trailer {
    /// Pointers for the linked list in the `OwnedTasks` that owns this task.
    pub(super) owned: linked_list::Pointers<Header>,
    /// Consumer task waiting on completion of this task.
    pub(super) waker: UnsafeCell<Option<Waker>>,
}
189 | |
// Generated raw-address accessor for the `owned` field; see the
// `generate_addr_of_methods!` macro definition for the exact expansion.
generate_addr_of_methods! {
    impl<> Trailer {
        /// Returns the address of the `owned` linked-list pointers within
        /// this trailer.
        pub(super) unsafe fn addr_of_owned(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Header>> {
            &self.owned
        }
    }
}
197 | |
/// Either the future or the output.
pub(super) enum Stage<T: Future> {
    /// The future is still being polled.
    Running(T),
    /// The future completed; the output has not yet been taken.
    Finished(super::Result<T::Output>),
    /// The future or output has been dropped or taken.
    Consumed,
}
204 | |
205 | impl<T: Future, S: Schedule> Cell<T, S> { |
206 | /// Allocates a new task cell, containing the header, trailer, and core |
207 | /// structures. |
208 | pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> { |
209 | // Separated into a non-generic function to reduce LLVM codegen |
210 | fn new_header( |
211 | state: State, |
212 | vtable: &'static Vtable, |
213 | #[cfg (all(tokio_unstable, feature = "tracing" ))] tracing_id: Option<tracing::Id>, |
214 | ) -> Header { |
215 | Header { |
216 | state, |
217 | queue_next: UnsafeCell::new(None), |
218 | vtable, |
219 | owner_id: UnsafeCell::new(None), |
220 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
221 | tracing_id, |
222 | } |
223 | } |
224 | |
225 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
226 | let tracing_id = future.id(); |
227 | let vtable = raw::vtable::<T, S>(); |
228 | let result = Box::new(Cell { |
229 | header: new_header( |
230 | state, |
231 | vtable, |
232 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
233 | tracing_id, |
234 | ), |
235 | core: Core { |
236 | scheduler, |
237 | stage: CoreStage { |
238 | stage: UnsafeCell::new(Stage::Running(future)), |
239 | }, |
240 | task_id, |
241 | }, |
242 | trailer: Trailer::new(), |
243 | }); |
244 | |
245 | #[cfg (debug_assertions)] |
246 | { |
247 | // Using a separate function for this code avoids instantiating it separately for every `T`. |
248 | unsafe fn check<S>(header: &Header, trailer: &Trailer, scheduler: &S, task_id: &Id) { |
249 | let trailer_addr = trailer as *const Trailer as usize; |
250 | let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(header)) }; |
251 | assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize); |
252 | |
253 | let scheduler_addr = scheduler as *const S as usize; |
254 | let scheduler_ptr = unsafe { Header::get_scheduler::<S>(NonNull::from(header)) }; |
255 | assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize); |
256 | |
257 | let id_addr = task_id as *const Id as usize; |
258 | let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(header)) }; |
259 | assert_eq!(id_addr, id_ptr.as_ptr() as usize); |
260 | } |
261 | unsafe { |
262 | check( |
263 | &result.header, |
264 | &result.trailer, |
265 | &result.core.scheduler, |
266 | &result.core.task_id, |
267 | ); |
268 | } |
269 | } |
270 | |
271 | result |
272 | } |
273 | } |
274 | |
impl<T: Future> CoreStage<T> {
    /// Runs `f` with a raw mutable pointer to the stored `Stage`.
    ///
    /// Not marked `unsafe` (see the module docs), but callers must guarantee
    /// exclusive access to the stage for the duration of `f`.
    pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
        self.stage.with_mut(f)
    }
}
280 | |
/// Set and clear the task id in the context when the future is executed or
/// dropped, or when the output produced by the future is dropped.
pub(crate) struct TaskIdGuard {
    // The task id that was current before this guard was entered; restored
    // when the guard is dropped.
    parent_task_id: Option<Id>,
}
286 | |
287 | impl TaskIdGuard { |
288 | fn enter(id: Id) -> Self { |
289 | TaskIdGuard { |
290 | parent_task_id: context::set_current_task_id(Some(id)), |
291 | } |
292 | } |
293 | } |
294 | |
impl Drop for TaskIdGuard {
    fn drop(&mut self) {
        // Restore the task id that was current before `enter` was called.
        context::set_current_task_id(self.parent_task_id);
    }
}
300 | |
impl<T: Future, S: Schedule> Core<T, S> {
    /// Polls the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `state` field. This
    /// requires ensuring mutual exclusion between any concurrent thread that
    /// might modify the future or output field.
    ///
    /// The mutual exclusion is implemented by `Harness` and the `Lifecycle`
    /// component of the task state.
    ///
    /// `self` must also be pinned. This is handled by storing the task on the
    /// heap.
    pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
        let res = {
            self.stage.stage.with_mut(|ptr| {
                // Safety: The caller ensures mutual exclusion to the field.
                let future = match unsafe { &mut *ptr } {
                    Stage::Running(future) => future,
                    _ => unreachable!("unexpected stage"),
                };

                // Safety: The caller ensures the future is pinned.
                let future = unsafe { Pin::new_unchecked(future) };

                // Expose this task's id via the runtime context while the
                // future is polled; the parent id is restored when `_guard`
                // drops at the end of the closure.
                let _guard = TaskIdGuard::enter(self.task_id);
                future.poll(&mut cx)
            })
        };

        // Drop the future eagerly once it completes, rather than keeping it
        // alive until the task allocation is released.
        if res.is_ready() {
            self.drop_future_or_output();
        }

        res
    }

    /// Drops the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn drop_future_or_output(&self) {
        // Safety: the caller ensures mutual exclusion to the field.
        unsafe {
            self.set_stage(Stage::Consumed);
        }
    }

    /// Stores the task output.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn store_output(&self, output: super::Result<T::Output>) {
        // Safety: the caller ensures mutual exclusion to the field.
        unsafe {
            self.set_stage(Stage::Finished(output));
        }
    }

    /// Takes the task output, leaving the stage `Consumed`.
    ///
    /// Panics unless the stage currently holds a `Finished` output.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn take_output(&self) -> super::Result<T::Output> {
        use std::mem;

        self.stage.stage.with_mut(|ptr| {
            // Safety: the caller ensures mutual exclusion to the field.
            match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
                Stage::Finished(output) => output,
                _ => panic!("JoinHandle polled after completion"),
            }
        })
    }

    // Replaces the stage, dropping its previous contents (the future or the
    // output). The task id guard makes this task's id current for the
    // duration, covering any code that runs while the old value is dropped.
    unsafe fn set_stage(&self, stage: Stage<T>) {
        let _guard = TaskIdGuard::enter(self.task_id);
        self.stage.stage.with_mut(|ptr| *ptr = stage);
    }
}
385 | |
impl Header {
    /// Sets the `queue_next` pointer (used by the injection queue).
    ///
    /// # Safety
    ///
    /// The caller must guarantee exclusive access to the `queue_next` field.
    pub(super) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
        self.queue_next.with_mut(|ptr| *ptr = next);
    }

    // safety: The caller must guarantee exclusive access to this field, and
    // must ensure that the id is either `None` or the id of the OwnedTasks
    // containing this task.
    pub(super) unsafe fn set_owner_id(&self, owner: NonZeroU64) {
        self.owner_id.with_mut(|ptr| *ptr = Some(owner));
    }

    /// Returns the id of the list this task is (or was last) stored in, or
    /// `None` if it has never been in a list. See the `owner_id` field docs.
    pub(super) fn get_owner_id(&self) -> Option<NonZeroU64> {
        // safety: If there are concurrent writes, then that write has violated
        // the safety requirements on `set_owner_id`.
        unsafe { self.owner_id.with(|ptr| *ptr) }
    }

    /// Gets a pointer to the `Trailer` of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    pub(super) unsafe fn get_trailer(me: NonNull<Header>) -> NonNull<Trailer> {
        // The vtable records the trailer's byte offset from the start of the
        // task allocation, which begins with the header.
        let offset = me.as_ref().vtable.trailer_offset;
        let trailer = me.as_ptr().cast::<u8>().add(offset).cast::<Trailer>();
        // safety: the offset stays within the same (non-null) task
        // allocation, so the result cannot be null.
        NonNull::new_unchecked(trailer)
    }

    /// Gets a pointer to the scheduler of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    ///
    /// The generic type S must be set to the correct scheduler type for this
    /// task.
    pub(super) unsafe fn get_scheduler<S>(me: NonNull<Header>) -> NonNull<S> {
        let offset = me.as_ref().vtable.scheduler_offset;
        let scheduler = me.as_ptr().cast::<u8>().add(offset).cast::<S>();
        // safety: the offset stays within the same (non-null) task
        // allocation, so the result cannot be null.
        NonNull::new_unchecked(scheduler)
    }

    /// Gets a pointer to the id of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    pub(super) unsafe fn get_id_ptr(me: NonNull<Header>) -> NonNull<Id> {
        let offset = me.as_ref().vtable.id_offset;
        let id = me.as_ptr().cast::<u8>().add(offset).cast::<Id>();
        // safety: the offset stays within the same (non-null) task
        // allocation, so the result cannot be null.
        NonNull::new_unchecked(id)
    }

    /// Gets the id of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    pub(super) unsafe fn get_id(me: NonNull<Header>) -> Id {
        let ptr = Header::get_id_ptr(me).as_ptr();
        *ptr
    }

    /// Gets the tracing id of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    pub(super) unsafe fn get_tracing_id(me: &NonNull<Header>) -> Option<&tracing::Id> {
        me.as_ref().tracing_id.as_ref()
    }
}
460 | |
impl Trailer {
    /// Creates a trailer with no join waker and unlinked list pointers.
    fn new() -> Self {
        Trailer {
            waker: UnsafeCell::new(None),
            owned: linked_list::Pointers::new(),
        }
    }

    /// Stores (or clears) the waker notified when the task completes.
    ///
    /// # Safety
    ///
    /// The caller must guarantee exclusive access to the `waker` field.
    pub(super) unsafe fn set_waker(&self, waker: Option<Waker>) {
        self.waker.with_mut(|ptr| {
            *ptr = waker;
        });
    }

    /// Returns whether the stored waker would wake the same task as `waker`.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that a waker is currently set (this unwraps
    /// it) and that there is no concurrent mutation of the `waker` field.
    pub(super) unsafe fn will_wake(&self, waker: &Waker) -> bool {
        self.waker
            .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
    }

    /// Wakes the consumer task waiting on completion of this task.
    ///
    /// Panics if no waker has been stored.
    pub(super) fn wake_join(&self) {
        self.waker.with(|ptr| match unsafe { &*ptr } {
            Some(waker) => waker.wake_by_ref(),
            None => panic!("waker missing"),
        });
    }
}
487 | |
#[test]
#[cfg(not(loom))]
fn header_lte_cache_line() {
    // The header is the hot portion of the task; keep it within eight machine
    // words so it stays small relative to a cache line.
    let word = std::mem::size_of::<*const ()>();
    assert!(std::mem::size_of::<Header>() <= 8 * word);
}
495 | |