//! Core task module.
//!
//! # Safety
//!
//! The functions in this module are private to the `task` module. All of them
//! should be considered `unsafe` to use, but are not marked as such since it
//! would be too noisy.
//!
//! Make sure to consult the relevant safety section of each function before
//! use.

use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::context;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Id, Schedule};
use crate::util::linked_list;

use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};
/// The task cell. Contains the components of the task.
///
/// It is critical for `Header` to be the first field as the task structure will
/// be referenced by both *mut Cell and *mut Header.
///
/// Any changes to the layout of this struct _must_ also be reflected in the
/// const fns in raw.rs.
///
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, riscv64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
//
// riscv32 is assumed not to exceed the cache line size of riscv64.
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
    /// Hot task state data
    pub(super) header: Header,

    /// Either the future or output, depending on the execution stage.
    pub(super) core: Core<T, S>,

    /// Cold data
    pub(super) trailer: Trailer,
}
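
// Illustration of why the alignment above matters (a minimal sketch, not part
// of this module): when two independently updated values share a cache line,
// each write by one thread invalidates the line for the other. Aligning the
// values to the cache line size gives each one its own line:
//
//     use std::sync::atomic::AtomicU64;
//
//     #[repr(align(128))]
//     struct Padded(AtomicU64);
//
//     // Array elements are laid out a full 128 bytes apart, so two threads
//     // hammering on `pair[0]` and `pair[1]` no longer share a cache line.
//     let pair = [Padded(AtomicU64::new(0)), Padded(AtomicU64::new(0))];
//     assert_eq!(std::mem::align_of::<Padded>(), 128);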

pub(super) struct CoreStage<T: Future> {
    stage: UnsafeCell<Stage<T>>,
}
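
// Note on the access pattern used throughout this file:
// `crate::loom::cell::UnsafeCell` exposes closure-based `with`/`with_mut`
// accessors (handing out `*const T` / `*mut T`) rather than a bare `get()`,
// so that the loom model checker can observe every access. A minimal sketch:
//
//     let cell = UnsafeCell::new(0u32);
//     // Safety: illustrative only; real callers must guarantee exclusive
//     // access, as documented on each method below.
//     cell.with_mut(|ptr| unsafe { *ptr += 1 });
//     let value = cell.with(|ptr| unsafe { *ptr });
//     assert_eq!(value, 1);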

/// The core of the task.
///
/// Holds the future or output, depending on the stage of execution.
///
/// Any changes to the layout of this struct _must_ also be reflected in the
/// const fns in raw.rs.
#[repr(C)]
pub(super) struct Core<T: Future, S> {
    /// Scheduler used to drive this future.
    pub(super) scheduler: S,

    /// The task's ID, used for populating `JoinError`s.
    pub(super) task_id: Id,

    /// Either the future or the output.
    pub(super) stage: CoreStage<T>,
}

/// Crate public as this is also needed by the pool.
#[repr(C)]
pub(crate) struct Header {
    /// Task state.
    pub(super) state: State,

    /// Pointer to next task, used with the injection queue.
    pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>,

    /// Table of function pointers for executing actions on the task.
    pub(super) vtable: &'static Vtable,

    /// This integer contains the id of the `OwnedTasks` or `LocalOwnedTasks`
    /// that this task is stored in. If the task is not in any list, this is
    /// the id of the list that it was previously in, or zero if it has never
    /// been in any list.
    ///
    /// Once a task has been bound to a list, it can never be bound to another
    /// list, even if removed from the first list.
    ///
    /// The id is not unset when removed from a list because we want to be
    /// able to read the id without synchronization, even if the task is
    /// concurrently being removed from the list.
    pub(super) owner_id: UnsafeCell<u64>,

    /// The tracing ID for this instrumented task.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    pub(super) tracing_id: Option<tracing::Id>,
}

unsafe impl Send for Header {}
unsafe impl Sync for Header {}

/// Cold data is stored after the future. Data is considered cold if it is only
/// used during creation or shutdown of the task.
pub(super) struct Trailer {
    /// Pointers for the linked list in the `OwnedTasks` that owns this task.
    pub(super) owned: linked_list::Pointers<Header>,
    /// Consumer task waiting on completion of this task.
    pub(super) waker: UnsafeCell<Option<Waker>>,
}

generate_addr_of_methods! {
    impl<> Trailer {
        pub(super) unsafe fn addr_of_owned(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Header>> {
            &self.owned
        }
    }
}
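
// `generate_addr_of_methods!` rewrites the `&self.owned` body into a raw
// pointer projection, so that taking the field address never materializes an
// intermediate `&Trailer` reference. Roughly, the generated method looks like
// this (a sketch; see the macro definition in the macros module for the real
// expansion):
//
//     pub(super) unsafe fn addr_of_owned(
//         self: NonNull<Self>,
//     ) -> NonNull<linked_list::Pointers<Header>> {
//         let me = self.as_ptr();
//         let field = std::ptr::addr_of_mut!((*me).owned);
//         NonNull::new_unchecked(field)
//     }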

/// Either the future or the output.
pub(super) enum Stage<T: Future> {
    Running(T),
    Finished(super::Result<T::Output>),
    Consumed,
}
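
// Roughly, the stage moves in one direction only. In terms of the `Core`
// methods defined below (a sketch of the happy path; cancellation jumps
// straight from `Running` to `Consumed`):
//
//     Running(future)  -- poll() returns Ready; the future is dropped -->
//     Consumed         -- store_output(..) parks the result           -->
//     Finished(output) -- take_output() hands it to the join handle   -->
//     Consumed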

impl<T: Future, S: Schedule> Cell<T, S> {
    /// Allocates a new task cell, containing the header, trailer, and core
    /// structures.
    pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let tracing_id = future.id();
        let result = Box::new(Cell {
            header: Header {
                state,
                queue_next: UnsafeCell::new(None),
                vtable: raw::vtable::<T, S>(),
                owner_id: UnsafeCell::new(0),
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                tracing_id,
            },
            core: Core {
                scheduler,
                stage: CoreStage {
                    stage: UnsafeCell::new(Stage::Running(future)),
                },
                task_id,
            },
            trailer: Trailer {
                waker: UnsafeCell::new(None),
                owned: linked_list::Pointers::new(),
            },
        });

        #[cfg(debug_assertions)]
        {
            let trailer_addr = (&result.trailer) as *const Trailer as usize;
            let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(&result.header)) };
            assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize);

            let scheduler_addr = (&result.core.scheduler) as *const S as usize;
            let scheduler_ptr =
                unsafe { Header::get_scheduler::<S>(NonNull::from(&result.header)) };
            assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize);

            let id_addr = (&result.core.task_id) as *const Id as usize;
            let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(&result.header)) };
            assert_eq!(id_addr, id_ptr.as_ptr() as usize);
        }

        result
    }
}

impl<T: Future> CoreStage<T> {
    pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
        self.stage.with_mut(f)
    }
}

/// Set and clear the task id in the context when the future is executed or
/// dropped, or when the output produced by the future is dropped.
pub(crate) struct TaskIdGuard {
    parent_task_id: Option<Id>,
}

impl TaskIdGuard {
    fn enter(id: Id) -> Self {
        TaskIdGuard {
            parent_task_id: context::set_current_task_id(Some(id)),
        }
    }
}

impl Drop for TaskIdGuard {
    fn drop(&mut self) {
        context::set_current_task_id(self.parent_task_id);
    }
}
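
// `TaskIdGuard` is an RAII guard: `enter` swaps the given id into the runtime
// context and remembers the previous one, and `drop` swaps it back, so nested
// scopes unwind correctly. A minimal sketch of the intended behavior
// (`parent_id` and `child_id` are hypothetical):
//
//     // context holds Some(parent_id)
//     let guard = TaskIdGuard::enter(child_id);
//     // context now holds Some(child_id)
//     drop(guard);
//     // context holds Some(parent_id) again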

impl<T: Future, S: Schedule> Core<T, S> {
    /// Polls the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `state` field. This
    /// requires ensuring mutual exclusion between any concurrent thread that
    /// might modify the future or output field.
    ///
    /// The mutual exclusion is implemented by `Harness` and the `Lifecycle`
    /// component of the task state.
    ///
    /// `self` must also be pinned. This is handled by storing the task on the
    /// heap.
    pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
        let res = {
            self.stage.stage.with_mut(|ptr| {
                // Safety: The caller ensures mutual exclusion to the field.
                let future = match unsafe { &mut *ptr } {
                    Stage::Running(future) => future,
                    _ => unreachable!("unexpected stage"),
                };

                // Safety: The caller ensures the future is pinned.
                let future = unsafe { Pin::new_unchecked(future) };

                let _guard = TaskIdGuard::enter(self.task_id);
                future.poll(&mut cx)
            })
        };

        if res.is_ready() {
            self.drop_future_or_output();
        }

        res
    }

    /// Drops the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn drop_future_or_output(&self) {
        // Safety: the caller ensures mutual exclusion to the field.
        unsafe {
            self.set_stage(Stage::Consumed);
        }
    }

    /// Stores the task output.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn store_output(&self, output: super::Result<T::Output>) {
        // Safety: the caller ensures mutual exclusion to the field.
        unsafe {
            self.set_stage(Stage::Finished(output));
        }
    }

    /// Takes the task output.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn take_output(&self) -> super::Result<T::Output> {
        use std::mem;

        self.stage.stage.with_mut(|ptr| {
            // Safety: the caller ensures mutual exclusion to the field.
            match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
                Stage::Finished(output) => output,
                _ => panic!("JoinHandle polled after completion"),
            }
        })
    }

    unsafe fn set_stage(&self, stage: Stage<T>) {
        let _guard = TaskIdGuard::enter(self.task_id);
        self.stage.stage.with_mut(|ptr| *ptr = stage)
    }
}

impl Header {
    pub(super) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
        self.queue_next.with_mut(|ptr| *ptr = next);
    }

    // Safety: The caller must guarantee exclusive access to this field, and
    // must ensure that the id is either 0 or the id of the OwnedTasks
    // containing this task.
    pub(super) unsafe fn set_owner_id(&self, owner: u64) {
        self.owner_id.with_mut(|ptr| *ptr = owner);
    }

    pub(super) fn get_owner_id(&self) -> u64 {
        // Safety: If there are concurrent writes, then that write has violated
        // the safety requirements on `set_owner_id`.
        unsafe { self.owner_id.with(|ptr| *ptr) }
    }
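
    // A sketch of the owner-id lifecycle implied by the invariants documented
    // on the `owner_id` field (illustrative, not a real call sequence from
    // this crate; `list_id` is hypothetical):
    //
    //     unsafe { header.set_owner_id(list_id) };  // bound once, on insertion
    //     assert_eq!(header.get_owner_id(), list_id);
    //     /* ... task removed from its list ... */
    //     assert_eq!(header.get_owner_id(), list_id); // never reset to zero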

    /// Gets a pointer to the `Trailer` of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    pub(super) unsafe fn get_trailer(me: NonNull<Header>) -> NonNull<Trailer> {
        let offset = me.as_ref().vtable.trailer_offset;
        let trailer = me.as_ptr().cast::<u8>().add(offset).cast::<Trailer>();
        NonNull::new_unchecked(trailer)
    }

    /// Gets a pointer to the scheduler of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    ///
    /// The generic type S must be set to the correct scheduler type for this
    /// task.
    pub(super) unsafe fn get_scheduler<S>(me: NonNull<Header>) -> NonNull<S> {
        let offset = me.as_ref().vtable.scheduler_offset;
        let scheduler = me.as_ptr().cast::<u8>().add(offset).cast::<S>();
        NonNull::new_unchecked(scheduler)
    }

    /// Gets a pointer to the id of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    pub(super) unsafe fn get_id_ptr(me: NonNull<Header>) -> NonNull<Id> {
        let offset = me.as_ref().vtable.id_offset;
        let id = me.as_ptr().cast::<u8>().add(offset).cast::<Id>();
        NonNull::new_unchecked(id)
    }

    /// Gets the id of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    pub(super) unsafe fn get_id(me: NonNull<Header>) -> Id {
        let ptr = Header::get_id_ptr(me).as_ptr();
        *ptr
    }

    /// Gets the tracing id of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    pub(super) unsafe fn get_tracing_id(me: &NonNull<Header>) -> Option<&tracing::Id> {
        me.as_ref().tracing_id.as_ref()
    }
}
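
// The `*_offset` values read above are computed by const fns in raw.rs that
// mirror the `#[repr(C)]` layout of `Cell`: each field begins at the end of
// the previous one, rounded up to the field's alignment. A simplified sketch
// of that round-up step (the real computation lives in raw.rs; `align` is
// always a power of two, as Rust alignments are):
//
//     const fn align_up(offset: usize, align: usize) -> usize {
//         (offset + align - 1) & !(align - 1)
//     }
//
//     assert_eq!(align_up(13, 8), 16);
//     assert_eq!(align_up(16, 8), 16);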

impl Trailer {
    pub(super) unsafe fn set_waker(&self, waker: Option<Waker>) {
        self.waker.with_mut(|ptr| {
            *ptr = waker;
        });
    }

    pub(super) unsafe fn will_wake(&self, waker: &Waker) -> bool {
        self.waker
            .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
    }

    pub(super) fn wake_join(&self) {
        self.waker.with(|ptr| match unsafe { &*ptr } {
            Some(waker) => waker.wake_by_ref(),
            None => panic!("waker missing"),
        });
    }
}
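
// How these methods fit together: the consumer half (the join handle) stores
// its waker before the task completes, and the completion path fires it. A
// sketch of the protocol, with the state synchronization that `Harness`
// layers on top elided:
//
//     // Join-handle poll path, simplified: register interest, skipping the
//     // clone when the stored waker would already wake this task.
//     if !trailer.will_wake(cx.waker()) {
//         trailer.set_waker(Some(cx.waker().clone()));
//     }
//
//     // Completion path, simplified:
//     trailer.wake_join();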

#[test]
#[cfg(not(loom))]
fn header_lte_cache_line() {
    use std::mem::size_of;

    assert!(size_of::<Header>() <= 8 * size_of::<*const ()>());
}