1 | use std::future::Future; |
2 | use std::pin::Pin; |
3 | use std::sync::Arc; |
4 | use std::task::{Context, Poll}; |
5 | |
6 | use pin_project_lite::pin_project; |
7 | |
8 | use crate::io; |
9 | use crate::task::{JoinHandle, Task, TaskLocalsWrapper}; |
10 | |
11 | /// Task builder that configures the settings of a new task. |
12 | #[derive (Debug, Default)] |
13 | pub struct Builder { |
14 | pub(crate) name: Option<String>, |
15 | } |
16 | |
17 | impl Builder { |
18 | /// Creates a new builder. |
19 | #[inline ] |
20 | pub fn new() -> Builder { |
21 | Builder { name: None } |
22 | } |
23 | |
24 | /// Configures the name of the task. |
25 | #[inline ] |
26 | pub fn name(mut self, name: String) -> Builder { |
27 | self.name = Some(name); |
28 | self |
29 | } |
30 | |
31 | fn build<F, T>(self, future: F) -> SupportTaskLocals<F> |
32 | where |
33 | F: Future<Output = T>, |
34 | { |
35 | let name = self.name.map(Arc::new); |
36 | |
37 | // Create a new task handle. |
38 | let task = Task::new(name); |
39 | |
40 | #[cfg (not(target_os = "unknown" ))] |
41 | once_cell::sync::Lazy::force(&crate::rt::RUNTIME); |
42 | |
43 | let tag = TaskLocalsWrapper::new(task); |
44 | |
45 | SupportTaskLocals { tag, future } |
46 | } |
47 | |
48 | /// Spawns a task with the configured settings. |
49 | #[cfg (not(target_os = "unknown" ))] |
50 | pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
51 | where |
52 | F: Future<Output = T> + Send + 'static, |
53 | T: Send + 'static, |
54 | { |
55 | let wrapped = self.build(future); |
56 | |
57 | kv_log_macro::trace!("spawn" , { |
58 | task_id: wrapped.tag.id().0, |
59 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
60 | }); |
61 | |
62 | let task = wrapped.tag.task().clone(); |
63 | let handle = async_global_executor::spawn(wrapped); |
64 | |
65 | Ok(JoinHandle::new(handle, task)) |
66 | } |
67 | |
68 | /// Spawns a task locally with the configured settings. |
69 | #[cfg (all(not(target_os = "unknown" ), feature = "unstable" ))] |
70 | pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
71 | where |
72 | F: Future<Output = T> + 'static, |
73 | T: 'static, |
74 | { |
75 | let wrapped = self.build(future); |
76 | |
77 | kv_log_macro::trace!("spawn_local" , { |
78 | task_id: wrapped.tag.id().0, |
79 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
80 | }); |
81 | |
82 | let task = wrapped.tag.task().clone(); |
83 | let handle = async_global_executor::spawn_local(wrapped); |
84 | |
85 | Ok(JoinHandle::new(handle, task)) |
86 | } |
87 | |
88 | /// Spawns a task locally with the configured settings. |
89 | #[cfg (all(target_arch = "wasm32" , feature = "unstable" ))] |
90 | pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
91 | where |
92 | F: Future<Output = T> + 'static, |
93 | T: 'static, |
94 | { |
95 | use futures_channel::oneshot::channel; |
96 | let (sender, receiver) = channel(); |
97 | |
98 | let wrapped = self.build(async move { |
99 | let res = future.await; |
100 | let _ = sender.send(res); |
101 | }); |
102 | kv_log_macro::trace!("spawn_local" , { |
103 | task_id: wrapped.tag.id().0, |
104 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
105 | }); |
106 | |
107 | let task = wrapped.tag.task().clone(); |
108 | wasm_bindgen_futures::spawn_local(wrapped); |
109 | |
110 | Ok(JoinHandle::new(receiver, task)) |
111 | } |
112 | |
113 | /// Spawns a task locally with the configured settings. |
114 | #[cfg (all(target_arch = "wasm32" , not(feature = "unstable" )))] |
115 | pub(crate) fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
116 | where |
117 | F: Future<Output = T> + 'static, |
118 | T: 'static, |
119 | { |
120 | use futures_channel::oneshot::channel; |
121 | let (sender, receiver) = channel(); |
122 | |
123 | let wrapped = self.build(async move { |
124 | let res = future.await; |
125 | let _ = sender.send(res); |
126 | }); |
127 | |
128 | kv_log_macro::trace!("spawn_local" , { |
129 | task_id: wrapped.tag.id().0, |
130 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
131 | }); |
132 | |
133 | let task = wrapped.tag.task().clone(); |
134 | wasm_bindgen_futures::spawn_local(wrapped); |
135 | |
136 | Ok(JoinHandle::new(receiver, task)) |
137 | } |
138 | |
139 | /// Spawns a task with the configured settings, blocking on its execution. |
140 | #[cfg (not(target_os = "unknown" ))] |
141 | pub fn blocking<F, T>(self, future: F) -> T |
142 | where |
143 | F: Future<Output = T>, |
144 | { |
145 | use std::cell::Cell; |
146 | |
147 | let wrapped = self.build(future); |
148 | |
149 | // Log this `block_on` operation. |
150 | kv_log_macro::trace!("block_on" , { |
151 | task_id: wrapped.tag.id().0, |
152 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
153 | }); |
154 | |
155 | thread_local! { |
156 | /// Tracks the number of nested block_on calls. |
157 | static NUM_NESTED_BLOCKING: Cell<usize> = Cell::new(0); |
158 | } |
159 | |
160 | // Run the future as a task. |
161 | NUM_NESTED_BLOCKING.with(|num_nested_blocking| { |
162 | let count = num_nested_blocking.get(); |
163 | let should_run = count == 0; |
164 | // increase the count |
165 | num_nested_blocking.replace(count + 1); |
166 | |
167 | unsafe { |
168 | TaskLocalsWrapper::set_current(&wrapped.tag, || { |
169 | let res = if should_run { |
170 | // The first call should run the executor |
171 | async_global_executor::block_on(wrapped) |
172 | } else { |
173 | futures_lite::future::block_on(wrapped) |
174 | }; |
175 | num_nested_blocking.replace(num_nested_blocking.get() - 1); |
176 | res |
177 | }) |
178 | } |
179 | }) |
180 | } |
181 | } |
182 | |
183 | pin_project! { |
184 | /// Wrapper to add support for task locals. |
185 | struct SupportTaskLocals<F> { |
186 | tag: TaskLocalsWrapper, |
187 | #[pin] |
188 | future: F, |
189 | } |
190 | } |
191 | |
192 | impl<F: Future> Future for SupportTaskLocals<F> { |
193 | type Output = F::Output; |
194 | |
195 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
196 | unsafe { |
197 | TaskLocalsWrapper::set_current(&self.tag, || { |
198 | let this: Projection<'_, F> = self.project(); |
199 | this.future.poll(cx) |
200 | }) |
201 | } |
202 | } |
203 | } |
204 | |