1 | //! Core task module. |
2 | //! |
3 | //! # Safety |
4 | //! |
5 | //! The functions in this module are private to the `task` module. All of them |
6 | //! should be considered `unsafe` to use, but are not marked as such since it |
7 | //! would be too noisy. |
8 | //! |
9 | //! Make sure to consult the relevant safety section of each function before |
10 | //! use. |
11 | |
12 | use crate::future::Future; |
13 | use crate::loom::cell::UnsafeCell; |
14 | use crate::runtime::context; |
15 | use crate::runtime::task::raw::{self, Vtable}; |
16 | use crate::runtime::task::state::State; |
17 | use crate::runtime::task::{Id, Schedule}; |
18 | use crate::util::linked_list; |
19 | |
20 | use std::pin::Pin; |
21 | use std::ptr::NonNull; |
22 | use std::task::{Context, Poll, Waker}; |
23 | |
24 | /// The task cell. Contains the components of the task. |
25 | /// |
26 | /// It is critical for `Header` to be the first field as the task structure will |
27 | /// be referenced by both *mut Cell and *mut Header. |
28 | /// |
29 | /// Any changes to the layout of this struct _must_ also be reflected in the |
30 | /// const fns in raw.rs. |
31 | /// |
32 | // # This struct should be cache padded to avoid false sharing. The cache padding rules are copied |
33 | // from crossbeam-utils/src/cache_padded.rs |
34 | // |
35 | // Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache |
36 | // lines at a time, so we have to align to 128 bytes rather than 64. |
37 | // |
38 | // Sources: |
39 | // - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf |
40 | // - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 |
41 | // |
42 | // ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. |
43 | // |
44 | // Sources: |
45 | // - https://www.mono-project.com/news/2016/09/12/arm64-icache/ |
46 | // |
47 | // powerpc64 has 128-byte cache line size. |
48 | // |
49 | // Sources: |
50 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 |
51 | #[cfg_attr ( |
52 | any( |
53 | target_arch = "x86_64" , |
54 | target_arch = "aarch64" , |
55 | target_arch = "powerpc64" , |
56 | ), |
57 | repr(align(128)) |
58 | )] |
59 | // arm, mips, mips64, riscv64, sparc, and hexagon have 32-byte cache line size. |
60 | // |
61 | // Sources: |
62 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 |
63 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 |
64 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 |
65 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 |
66 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 |
67 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17 |
68 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12 |
69 | // |
70 | // riscv32 is assumed not to exceed the cache line size of riscv64. |
71 | #[cfg_attr ( |
72 | any( |
73 | target_arch = "arm" , |
74 | target_arch = "mips" , |
75 | target_arch = "mips64" , |
76 | target_arch = "riscv32" , |
77 | target_arch = "riscv64" , |
78 | target_arch = "sparc" , |
79 | target_arch = "hexagon" , |
80 | ), |
81 | repr(align(32)) |
82 | )] |
83 | // m68k has 16-byte cache line size. |
84 | // |
85 | // Sources: |
86 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9 |
87 | #[cfg_attr (target_arch = "m68k" , repr(align(16)))] |
88 | // s390x has 256-byte cache line size. |
89 | // |
90 | // Sources: |
91 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 |
92 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13 |
93 | #[cfg_attr (target_arch = "s390x" , repr(align(256)))] |
94 | // x86, wasm, and sparc64 have 64-byte cache line size. |
95 | // |
96 | // Sources: |
97 | // - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 |
98 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 |
99 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19 |
100 | // |
101 | // All others are assumed to have 64-byte cache line size. |
102 | #[cfg_attr ( |
103 | not(any( |
104 | target_arch = "x86_64" , |
105 | target_arch = "aarch64" , |
106 | target_arch = "powerpc64" , |
107 | target_arch = "arm" , |
108 | target_arch = "mips" , |
109 | target_arch = "mips64" , |
110 | target_arch = "riscv32" , |
111 | target_arch = "riscv64" , |
112 | target_arch = "sparc" , |
113 | target_arch = "hexagon" , |
114 | target_arch = "m68k" , |
115 | target_arch = "s390x" , |
116 | )), |
117 | repr(align(64)) |
118 | )] |
119 | #[repr (C)] |
120 | pub(super) struct Cell<T: Future, S> { |
121 | /// Hot task state data |
122 | pub(super) header: Header, |
123 | |
124 | /// Either the future or output, depending on the execution stage. |
125 | pub(super) core: Core<T, S>, |
126 | |
127 | /// Cold data |
128 | pub(super) trailer: Trailer, |
129 | } |
130 | |
131 | pub(super) struct CoreStage<T: Future> { |
132 | stage: UnsafeCell<Stage<T>>, |
133 | } |
134 | |
135 | /// The core of the task. |
136 | /// |
137 | /// Holds the future or output, depending on the stage of execution. |
138 | /// |
139 | /// Any changes to the layout of this struct _must_ also be reflected in the |
140 | /// const fns in raw.rs. |
141 | #[repr (C)] |
142 | pub(super) struct Core<T: Future, S> { |
143 | /// Scheduler used to drive this future. |
144 | pub(super) scheduler: S, |
145 | |
146 | /// The task's ID, used for populating `JoinError`s. |
147 | pub(super) task_id: Id, |
148 | |
149 | /// Either the future or the output. |
150 | pub(super) stage: CoreStage<T>, |
151 | } |
152 | |
153 | /// Crate public as this is also needed by the pool. |
154 | #[repr (C)] |
155 | pub(crate) struct Header { |
156 | /// Task state. |
157 | pub(super) state: State, |
158 | |
159 | /// Pointer to next task, used with the injection queue. |
160 | pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>, |
161 | |
162 | /// Table of function pointers for executing actions on the task. |
163 | pub(super) vtable: &'static Vtable, |
164 | |
165 | /// This integer contains the id of the OwnedTasks or LocalOwnedTasks that |
166 | /// this task is stored in. If the task is not in any list, should be the |
167 | /// id of the list that it was previously in, or zero if it has never been |
168 | /// in any list. |
169 | /// |
170 | /// Once a task has been bound to a list, it can never be bound to another |
171 | /// list, even if removed from the first list. |
172 | /// |
173 | /// The id is not unset when removed from a list because we want to be able |
174 | /// to read the id without synchronization, even if it is concurrently being |
175 | /// removed from the list. |
176 | pub(super) owner_id: UnsafeCell<u64>, |
177 | |
178 | /// The tracing ID for this instrumented task. |
179 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
180 | pub(super) tracing_id: Option<tracing::Id>, |
181 | } |
182 | |
183 | unsafe impl Send for Header {} |
184 | unsafe impl Sync for Header {} |
185 | |
186 | /// Cold data is stored after the future. Data is considered cold if it is only |
187 | /// used during creation or shutdown of the task. |
188 | pub(super) struct Trailer { |
189 | /// Pointers for the linked list in the `OwnedTasks` that owns this task. |
190 | pub(super) owned: linked_list::Pointers<Header>, |
191 | /// Consumer task waiting on completion of this task. |
192 | pub(super) waker: UnsafeCell<Option<Waker>>, |
193 | } |
194 | |
195 | generate_addr_of_methods! { |
196 | impl<> Trailer { |
197 | pub(super) unsafe fn addr_of_owned(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Header>> { |
198 | &self.owned |
199 | } |
200 | } |
201 | } |
202 | |
203 | /// Either the future or the output. |
204 | pub(super) enum Stage<T: Future> { |
205 | Running(T), |
206 | Finished(super::Result<T::Output>), |
207 | Consumed, |
208 | } |
209 | |
210 | impl<T: Future, S: Schedule> Cell<T, S> { |
211 | /// Allocates a new task cell, containing the header, trailer, and core |
212 | /// structures. |
213 | pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> { |
214 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
215 | let tracing_id = future.id(); |
216 | let result = Box::new(Cell { |
217 | header: Header { |
218 | state, |
219 | queue_next: UnsafeCell::new(None), |
220 | vtable: raw::vtable::<T, S>(), |
221 | owner_id: UnsafeCell::new(0), |
222 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
223 | tracing_id, |
224 | }, |
225 | core: Core { |
226 | scheduler, |
227 | stage: CoreStage { |
228 | stage: UnsafeCell::new(Stage::Running(future)), |
229 | }, |
230 | task_id, |
231 | }, |
232 | trailer: Trailer { |
233 | waker: UnsafeCell::new(None), |
234 | owned: linked_list::Pointers::new(), |
235 | }, |
236 | }); |
237 | |
238 | #[cfg (debug_assertions)] |
239 | { |
240 | let trailer_addr = (&result.trailer) as *const Trailer as usize; |
241 | let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(&result.header)) }; |
242 | assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize); |
243 | |
244 | let scheduler_addr = (&result.core.scheduler) as *const S as usize; |
245 | let scheduler_ptr = |
246 | unsafe { Header::get_scheduler::<S>(NonNull::from(&result.header)) }; |
247 | assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize); |
248 | |
249 | let id_addr = (&result.core.task_id) as *const Id as usize; |
250 | let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(&result.header)) }; |
251 | assert_eq!(id_addr, id_ptr.as_ptr() as usize); |
252 | } |
253 | |
254 | result |
255 | } |
256 | } |
257 | |
258 | impl<T: Future> CoreStage<T> { |
259 | pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R { |
260 | self.stage.with_mut(f) |
261 | } |
262 | } |
263 | |
264 | /// Set and clear the task id in the context when the future is executed or |
265 | /// dropped, or when the output produced by the future is dropped. |
266 | pub(crate) struct TaskIdGuard { |
267 | parent_task_id: Option<Id>, |
268 | } |
269 | |
270 | impl TaskIdGuard { |
271 | fn enter(id: Id) -> Self { |
272 | TaskIdGuard { |
273 | parent_task_id: context::set_current_task_id(Some(id)), |
274 | } |
275 | } |
276 | } |
277 | |
278 | impl Drop for TaskIdGuard { |
279 | fn drop(&mut self) { |
280 | context::set_current_task_id(self.parent_task_id); |
281 | } |
282 | } |
283 | |
284 | impl<T: Future, S: Schedule> Core<T, S> { |
285 | /// Polls the future. |
286 | /// |
287 | /// # Safety |
288 | /// |
289 | /// The caller must ensure it is safe to mutate the `state` field. This |
290 | /// requires ensuring mutual exclusion between any concurrent thread that |
291 | /// might modify the future or output field. |
292 | /// |
293 | /// The mutual exclusion is implemented by `Harness` and the `Lifecycle` |
294 | /// component of the task state. |
295 | /// |
296 | /// `self` must also be pinned. This is handled by storing the task on the |
297 | /// heap. |
298 | pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> { |
299 | let res = { |
300 | self.stage.stage.with_mut(|ptr| { |
301 | // Safety: The caller ensures mutual exclusion to the field. |
302 | let future = match unsafe { &mut *ptr } { |
303 | Stage::Running(future) => future, |
304 | _ => unreachable!("unexpected stage" ), |
305 | }; |
306 | |
307 | // Safety: The caller ensures the future is pinned. |
308 | let future = unsafe { Pin::new_unchecked(future) }; |
309 | |
310 | let _guard = TaskIdGuard::enter(self.task_id); |
311 | future.poll(&mut cx) |
312 | }) |
313 | }; |
314 | |
315 | if res.is_ready() { |
316 | self.drop_future_or_output(); |
317 | } |
318 | |
319 | res |
320 | } |
321 | |
322 | /// Drops the future. |
323 | /// |
324 | /// # Safety |
325 | /// |
326 | /// The caller must ensure it is safe to mutate the `stage` field. |
327 | pub(super) fn drop_future_or_output(&self) { |
328 | // Safety: the caller ensures mutual exclusion to the field. |
329 | unsafe { |
330 | self.set_stage(Stage::Consumed); |
331 | } |
332 | } |
333 | |
334 | /// Stores the task output. |
335 | /// |
336 | /// # Safety |
337 | /// |
338 | /// The caller must ensure it is safe to mutate the `stage` field. |
339 | pub(super) fn store_output(&self, output: super::Result<T::Output>) { |
340 | // Safety: the caller ensures mutual exclusion to the field. |
341 | unsafe { |
342 | self.set_stage(Stage::Finished(output)); |
343 | } |
344 | } |
345 | |
346 | /// Takes the task output. |
347 | /// |
348 | /// # Safety |
349 | /// |
350 | /// The caller must ensure it is safe to mutate the `stage` field. |
351 | pub(super) fn take_output(&self) -> super::Result<T::Output> { |
352 | use std::mem; |
353 | |
354 | self.stage.stage.with_mut(|ptr| { |
355 | // Safety:: the caller ensures mutual exclusion to the field. |
356 | match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) { |
357 | Stage::Finished(output) => output, |
358 | _ => panic!("JoinHandle polled after completion" ), |
359 | } |
360 | }) |
361 | } |
362 | |
363 | unsafe fn set_stage(&self, stage: Stage<T>) { |
364 | let _guard = TaskIdGuard::enter(self.task_id); |
365 | self.stage.stage.with_mut(|ptr| *ptr = stage) |
366 | } |
367 | } |
368 | |
369 | impl Header { |
370 | pub(super) unsafe fn set_next(&self, next: Option<NonNull<Header>>) { |
371 | self.queue_next.with_mut(|ptr| *ptr = next); |
372 | } |
373 | |
374 | // safety: The caller must guarantee exclusive access to this field, and |
375 | // must ensure that the id is either 0 or the id of the OwnedTasks |
376 | // containing this task. |
377 | pub(super) unsafe fn set_owner_id(&self, owner: u64) { |
378 | self.owner_id.with_mut(|ptr| *ptr = owner); |
379 | } |
380 | |
381 | pub(super) fn get_owner_id(&self) -> u64 { |
382 | // safety: If there are concurrent writes, then that write has violated |
383 | // the safety requirements on `set_owner_id`. |
384 | unsafe { self.owner_id.with(|ptr| *ptr) } |
385 | } |
386 | |
387 | /// Gets a pointer to the `Trailer` of the task containing this `Header`. |
388 | /// |
389 | /// # Safety |
390 | /// |
391 | /// The provided raw pointer must point at the header of a task. |
392 | pub(super) unsafe fn get_trailer(me: NonNull<Header>) -> NonNull<Trailer> { |
393 | let offset = me.as_ref().vtable.trailer_offset; |
394 | let trailer = me.as_ptr().cast::<u8>().add(offset).cast::<Trailer>(); |
395 | NonNull::new_unchecked(trailer) |
396 | } |
397 | |
398 | /// Gets a pointer to the scheduler of the task containing this `Header`. |
399 | /// |
400 | /// # Safety |
401 | /// |
402 | /// The provided raw pointer must point at the header of a task. |
403 | /// |
404 | /// The generic type S must be set to the correct scheduler type for this |
405 | /// task. |
406 | pub(super) unsafe fn get_scheduler<S>(me: NonNull<Header>) -> NonNull<S> { |
407 | let offset = me.as_ref().vtable.scheduler_offset; |
408 | let scheduler = me.as_ptr().cast::<u8>().add(offset).cast::<S>(); |
409 | NonNull::new_unchecked(scheduler) |
410 | } |
411 | |
412 | /// Gets a pointer to the id of the task containing this `Header`. |
413 | /// |
414 | /// # Safety |
415 | /// |
416 | /// The provided raw pointer must point at the header of a task. |
417 | pub(super) unsafe fn get_id_ptr(me: NonNull<Header>) -> NonNull<Id> { |
418 | let offset = me.as_ref().vtable.id_offset; |
419 | let id = me.as_ptr().cast::<u8>().add(offset).cast::<Id>(); |
420 | NonNull::new_unchecked(id) |
421 | } |
422 | |
423 | /// Gets the id of the task containing this `Header`. |
424 | /// |
425 | /// # Safety |
426 | /// |
427 | /// The provided raw pointer must point at the header of a task. |
428 | pub(super) unsafe fn get_id(me: NonNull<Header>) -> Id { |
429 | let ptr = Header::get_id_ptr(me).as_ptr(); |
430 | *ptr |
431 | } |
432 | |
433 | /// Gets the tracing id of the task containing this `Header`. |
434 | /// |
435 | /// # Safety |
436 | /// |
437 | /// The provided raw pointer must point at the header of a task. |
438 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
439 | pub(super) unsafe fn get_tracing_id(me: &NonNull<Header>) -> Option<&tracing::Id> { |
440 | me.as_ref().tracing_id.as_ref() |
441 | } |
442 | } |
443 | |
444 | impl Trailer { |
445 | pub(super) unsafe fn set_waker(&self, waker: Option<Waker>) { |
446 | self.waker.with_mut(|ptr: *mut Option| { |
447 | *ptr = waker; |
448 | }); |
449 | } |
450 | |
451 | pub(super) unsafe fn will_wake(&self, waker: &Waker) -> bool { |
452 | self.waker |
453 | .with(|ptr: *const Option| (*ptr).as_ref().unwrap().will_wake(waker)) |
454 | } |
455 | |
456 | pub(super) fn wake_join(&self) { |
457 | self.waker.with(|ptr: *const Option| match unsafe { &*ptr } { |
458 | Some(waker: &Waker) => waker.wake_by_ref(), |
459 | None => panic!("waker missing" ), |
460 | }); |
461 | } |
462 | } |
463 | |
464 | #[test ] |
465 | #[cfg (not(loom))] |
466 | fn header_lte_cache_line() { |
467 | use std::mem::size_of; |
468 | |
469 | assert!(size_of::<Header>() <= 8 * size_of::<*const ()>()); |
470 | } |
471 | |