| 1 | use std::future::Future; |
| 2 | use std::pin::Pin; |
| 3 | use std::sync::Arc; |
| 4 | use std::task::{Context, Poll}; |
| 5 | |
| 6 | use pin_project_lite::pin_project; |
| 7 | |
| 8 | use crate::io; |
| 9 | use crate::task::{JoinHandle, Task, TaskLocalsWrapper}; |
| 10 | |
/// Configuration for a task that has not been created yet.
///
/// A builder starts out empty (see [`Builder::new`] or `Default`) and is
/// refined through its chained, by-value setters before being consumed by
/// one of the spawning methods.
#[derive(Default, Debug)]
pub struct Builder {
    /// Name to give the task, or `None` to leave it unnamed.
    pub(crate) name: Option<String>,
}
| 16 | |
| 17 | impl Builder { |
| 18 | /// Creates a new builder. |
| 19 | #[inline ] |
| 20 | pub fn new() -> Builder { |
| 21 | Builder { name: None } |
| 22 | } |
| 23 | |
| 24 | /// Configures the name of the task. |
| 25 | #[inline ] |
| 26 | pub fn name(mut self, name: String) -> Builder { |
| 27 | self.name = Some(name); |
| 28 | self |
| 29 | } |
| 30 | |
| 31 | fn build<F, T>(self, future: F) -> SupportTaskLocals<F> |
| 32 | where |
| 33 | F: Future<Output = T>, |
| 34 | { |
| 35 | let name = self.name.map(Arc::new); |
| 36 | |
| 37 | // Create a new task handle. |
| 38 | let task = Task::new(name); |
| 39 | |
| 40 | #[cfg (not(target_os = "unknown" ))] |
| 41 | once_cell::sync::Lazy::force(&crate::rt::RUNTIME); |
| 42 | |
| 43 | let tag = TaskLocalsWrapper::new(task); |
| 44 | |
| 45 | SupportTaskLocals { tag, future } |
| 46 | } |
| 47 | |
| 48 | /// Spawns a task with the configured settings. |
| 49 | #[cfg (not(target_os = "unknown" ))] |
| 50 | pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
| 51 | where |
| 52 | F: Future<Output = T> + Send + 'static, |
| 53 | T: Send + 'static, |
| 54 | { |
| 55 | let wrapped = self.build(future); |
| 56 | |
| 57 | kv_log_macro::trace!("spawn" , { |
| 58 | task_id: wrapped.tag.id().0, |
| 59 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| 60 | }); |
| 61 | |
| 62 | let task = wrapped.tag.task().clone(); |
| 63 | let handle = async_global_executor::spawn(wrapped); |
| 64 | |
| 65 | Ok(JoinHandle::new(handle, task)) |
| 66 | } |
| 67 | |
| 68 | /// Spawns a task locally with the configured settings. |
| 69 | #[cfg (all(not(target_os = "unknown" ), feature = "unstable" ))] |
| 70 | pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
| 71 | where |
| 72 | F: Future<Output = T> + 'static, |
| 73 | T: 'static, |
| 74 | { |
| 75 | let wrapped = self.build(future); |
| 76 | |
| 77 | kv_log_macro::trace!("spawn_local" , { |
| 78 | task_id: wrapped.tag.id().0, |
| 79 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| 80 | }); |
| 81 | |
| 82 | let task = wrapped.tag.task().clone(); |
| 83 | let handle = async_global_executor::spawn_local(wrapped); |
| 84 | |
| 85 | Ok(JoinHandle::new(handle, task)) |
| 86 | } |
| 87 | |
| 88 | /// Spawns a task locally with the configured settings. |
| 89 | #[cfg (all(target_arch = "wasm32" , feature = "unstable" ))] |
| 90 | pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
| 91 | where |
| 92 | F: Future<Output = T> + 'static, |
| 93 | T: 'static, |
| 94 | { |
| 95 | use futures_channel::oneshot::channel; |
| 96 | let (sender, receiver) = channel(); |
| 97 | |
| 98 | let wrapped = self.build(async move { |
| 99 | let res = future.await; |
| 100 | let _ = sender.send(res); |
| 101 | }); |
| 102 | kv_log_macro::trace!("spawn_local" , { |
| 103 | task_id: wrapped.tag.id().0, |
| 104 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| 105 | }); |
| 106 | |
| 107 | let task = wrapped.tag.task().clone(); |
| 108 | wasm_bindgen_futures::spawn_local(wrapped); |
| 109 | |
| 110 | Ok(JoinHandle::new(receiver, task)) |
| 111 | } |
| 112 | |
| 113 | /// Spawns a task locally with the configured settings. |
| 114 | #[cfg (all(target_arch = "wasm32" , not(feature = "unstable" )))] |
| 115 | pub(crate) fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
| 116 | where |
| 117 | F: Future<Output = T> + 'static, |
| 118 | T: 'static, |
| 119 | { |
| 120 | use futures_channel::oneshot::channel; |
| 121 | let (sender, receiver) = channel(); |
| 122 | |
| 123 | let wrapped = self.build(async move { |
| 124 | let res = future.await; |
| 125 | let _ = sender.send(res); |
| 126 | }); |
| 127 | |
| 128 | kv_log_macro::trace!("spawn_local" , { |
| 129 | task_id: wrapped.tag.id().0, |
| 130 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| 131 | }); |
| 132 | |
| 133 | let task = wrapped.tag.task().clone(); |
| 134 | wasm_bindgen_futures::spawn_local(wrapped); |
| 135 | |
| 136 | Ok(JoinHandle::new(receiver, task)) |
| 137 | } |
| 138 | |
| 139 | /// Spawns a task with the configured settings, blocking on its execution. |
| 140 | #[cfg (not(target_os = "unknown" ))] |
| 141 | pub fn blocking<F, T>(self, future: F) -> T |
| 142 | where |
| 143 | F: Future<Output = T>, |
| 144 | { |
| 145 | use std::cell::Cell; |
| 146 | |
| 147 | let wrapped = self.build(future); |
| 148 | |
| 149 | // Log this `block_on` operation. |
| 150 | kv_log_macro::trace!("block_on" , { |
| 151 | task_id: wrapped.tag.id().0, |
| 152 | parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| 153 | }); |
| 154 | |
| 155 | thread_local! { |
| 156 | /// Tracks the number of nested block_on calls. |
| 157 | static NUM_NESTED_BLOCKING: Cell<usize> = Cell::new(0); |
| 158 | } |
| 159 | |
| 160 | // Run the future as a task. |
| 161 | NUM_NESTED_BLOCKING.with(|num_nested_blocking| { |
| 162 | let count = num_nested_blocking.get(); |
| 163 | let should_run = count == 0; |
| 164 | // increase the count |
| 165 | num_nested_blocking.replace(count + 1); |
| 166 | |
| 167 | unsafe { |
| 168 | TaskLocalsWrapper::set_current(&wrapped.tag, || { |
| 169 | let res = if should_run { |
| 170 | // The first call should run the executor |
| 171 | async_global_executor::block_on(wrapped) |
| 172 | } else { |
| 173 | futures_lite::future::block_on(wrapped) |
| 174 | }; |
| 175 | num_nested_blocking.replace(num_nested_blocking.get() - 1); |
| 176 | res |
| 177 | }) |
| 178 | } |
| 179 | }) |
| 180 | } |
| 181 | } |
| 182 | |
| 183 | pin_project! { |
| 184 | /// Wrapper to add support for task locals. |
| 185 | struct SupportTaskLocals<F> { |
| 186 | tag: TaskLocalsWrapper, |
| 187 | #[pin] |
| 188 | future: F, |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | impl<F: Future> Future for SupportTaskLocals<F> { |
| 193 | type Output = F::Output; |
| 194 | |
| 195 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 196 | unsafe { |
| 197 | TaskLocalsWrapper::set_current(&self.tag, || { |
| 198 | let this: Projection<'_, F> = self.project(); |
| 199 | this.future.poll(cx) |
| 200 | }) |
| 201 | } |
| 202 | } |
| 203 | } |
| 204 | |