| 1 | // Take a look at the license at the top of the repository in the LICENSE file. |
| 2 | |
| 3 | use std::{ |
| 4 | any::Any, cell::Cell, fmt, marker::PhantomData, mem, num::NonZeroU32, panic, pin::Pin, ptr, |
| 5 | thread, |
| 6 | }; |
| 7 | |
| 8 | use futures_channel::oneshot; |
| 9 | use futures_core::{ |
| 10 | future::Future, |
| 11 | task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, |
| 12 | }; |
| 13 | use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; |
| 14 | use futures_util::FutureExt; |
| 15 | |
| 16 | use crate::{ |
| 17 | ffi, thread_guard::ThreadGuard, translate::*, MainContext, MainLoop, Priority, Source, SourceId, |
| 18 | }; |
| 19 | |
// Type-erased future stored inside a `TaskSource`.
//
// Wrapper around Send Futures and non-Send Futures that will panic
// if the non-Send Future is polled/dropped from a different thread
// than where this was created.
enum FutureWrapper {
    // A future that is `Send`; its boxed output is `Send` as well.
    Send(FutureObj<'static, Box<dyn Any + Send + 'static>>),
    // A future that is not `Send`, kept behind a `ThreadGuard`.
    NonSend(ThreadGuard<LocalFutureObj<'static, Box<dyn Any + 'static>>>),
}
| 27 | |
// SAFETY (as far as this file shows): the `Send` variant only holds a future
// that is itself `Send`, and the `NonSend` variant is wrapped in a
// `ThreadGuard`, which is relied upon to panic when the future is polled or
// dropped on a thread other than the one that created it.
// NOTE(review): `Sync` is asserted too; confirm that the inner future is never
// reached through a shared `&FutureWrapper`.
unsafe impl Send for FutureWrapper {}
unsafe impl Sync for FutureWrapper {}
| 30 | |
| 31 | impl Future for FutureWrapper { |
| 32 | type Output = Box<dyn Any + 'static>; |
| 33 | |
| 34 | fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { |
| 35 | match self.get_mut() { |
| 36 | FutureWrapper::Send(fut: &mut FutureObj<'static, Box>) => { |
| 37 | Pin::new(fut).poll(cx:ctx).map(|b: Box| b as Box<dyn Any + 'static>) |
| 38 | } |
| 39 | FutureWrapper::NonSend(fut: &mut ThreadGuard>) => Pin::new(fut.get_mut()).poll(cx:ctx), |
| 40 | } |
| 41 | } |
| 42 | } |
| 43 | |
// The TaskSource and WakerSource are split up as the TaskSource
// must only be finalized on the thread that owns the main context,
// but the WakerSource is passed around to arbitrary threads for
// being able to wake up the TaskSource.
//
// The WakerSource is set up as a child source of the TaskSource, i.e.
// whenever it is ready also the TaskSource is ready.
#[repr(C)]
struct TaskSource {
    // The `dispatch`/`finalize` callbacks cast the `*mut GSource` they are
    // handed to `*mut TaskSource`, so this has to remain the first field.
    source: ffi::GSource,
    // The future driven by this source.
    future: FutureWrapper,
    // Waker handed to the future on every poll; it holds a reference to the
    // child waker source.
    waker: Waker,
    // Sender for the future's result (or a caught panic). `None` when no
    // result is collected.
    return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
}
| 58 | |
// Child source of a `TaskSource`. Waking the task is done by setting this
// source's ready time to 0.
#[repr(C)]
struct WakerSource {
    source: ffi::GSource,
}
| 63 | |
impl TaskSource {
    // `dispatch` callback installed in the task source's `GSourceFuncs`.
    //
    // Polls the contained future once and asks GLib to remove the source as
    // soon as the future has resolved. No callback is expected to be set on
    // the source (checked in debug builds only).
    unsafe extern "C" fn dispatch(
        source: *mut ffi::GSource,
        callback: ffi::GSourceFunc,
        _user_data: ffi::gpointer,
    ) -> ffi::gboolean {
        // The pointer is reinterpreted as the `TaskSource` embedding the `GSource`.
        let source = &mut *(source as *mut Self);
        debug_assert!(callback.is_none());

        // Poll the TaskSource and ensure we're never called again if the
        // contained Future resolved now.
        if let Poll::Ready(()) = source.poll() {
            ffi::G_SOURCE_REMOVE
        } else {
            ffi::G_SOURCE_CONTINUE
        }
    }

    // `finalize` callback installed in the task source's `GSourceFuncs`.
    //
    // Drops the Rust fields that live behind the `GSource` header: first the
    // future, then the result sender, then the waker.
    unsafe extern "C" fn finalize(source: *mut ffi::GSource) {
        let source = source as *mut Self;

        // This will panic if the future was a local future and is dropped from a different thread
        // than where it was created so try to drop it from the main context if we're on another
        // thread and the main context still exists.
        //
        // This can only really happen if the `Source` was manually retrieved from the context, but
        // better safe than sorry.
        match (*source).future {
            FutureWrapper::Send(_) => {
                ptr::drop_in_place(&mut (*source).future);
            }
            FutureWrapper::NonSend(ref mut future) if future.is_owner() => {
                ptr::drop_in_place(&mut (*source).future);
            }
            FutureWrapper::NonSend(ref mut future) => {
                let context = ffi::g_source_get_context(source as *mut ffi::GSource);
                if !context.is_null() {
                    // Move the guarded future out of the source and hand it to
                    // the main context so that the drop happens there.
                    let future = ptr::read(future);
                    let context = MainContext::from_glib_none(context);
                    context.invoke(move || {
                        drop(future);
                    });
                } else {
                    // This will panic
                    ptr::drop_in_place(&mut (*source).future);
                }
            }
        }

        ptr::drop_in_place(&mut (*source).return_tx);

        // Drop the waker to unref the underlying GSource
        ptr::drop_in_place(&mut (*source).waker);
    }
}
| 119 | |
// Raw waker implementation backed by a `GSource`: the waker's data pointer is
// the `GSource` pointer itself, and every waker holds one reference to it.
impl WakerSource {
    // Clones the waker by taking another reference on the `GSource`.
    unsafe fn clone_raw(waker: *const ()) -> RawWaker {
        // Vtable shared by every waker created through this function.
        static VTABLE: RawWakerVTable = RawWakerVTable::new(
            WakerSource::clone_raw,
            WakerSource::wake_raw,
            WakerSource::wake_by_ref_raw,
            WakerSource::drop_raw,
        );

        let waker = waker as *const ffi::GSource;
        ffi::g_source_ref(mut_override(waker));
        RawWaker::new(waker as *const (), &VTABLE)
    }

    // Consuming wake-up: wakes the source, then releases this waker's reference.
    unsafe fn wake_raw(waker: *const ()) {
        Self::wake_by_ref_raw(waker);
        Self::drop_raw(waker);
    }

    // Wakes the task by setting the source's ready time to 0.
    unsafe fn wake_by_ref_raw(waker: *const ()) {
        let waker = waker as *const ffi::GSource;
        ffi::g_source_set_ready_time(mut_override(waker), 0);
    }

    // Releases the reference held by the waker.
    unsafe fn drop_raw(waker: *const ()) {
        let waker = waker as *const ffi::GSource;
        ffi::g_source_unref(mut_override(waker));
    }

    // `dispatch` callback installed in the waker source's `GSourceFuncs`.
    // It never removes the source; it only resets the ready time.
    unsafe extern "C" fn dispatch(
        source: *mut ffi::GSource,
        _callback: ffi::GSourceFunc,
        _user_data: ffi::gpointer,
    ) -> ffi::gboolean {
        // Set ready-time to -1 so that we're not called again before
        // being woken up another time.
        ffi::g_source_set_ready_time(mut_override(source), -1);
        ffi::G_SOURCE_CONTINUE
    }
}
| 160 | |
// NOTE(review): these impls are asserted manually. For `TaskSource` the
// thread-sensitive part is the contained `FutureWrapper`, whose non-`Send`
// variant sits behind a `ThreadGuard`; `finalize` additionally tries to drop
// such a future on the main context's thread.
unsafe impl Send for TaskSource {}
unsafe impl Sync for TaskSource {}

// NOTE(review): `WakerSource` only wraps a `GSource` and is touched through
// `g_source_ref`/`g_source_unref`/`g_source_set_ready_time` in this file;
// presumably those are safe to call from any thread. Confirm against the GLib docs.
unsafe impl Send for WakerSource {}
unsafe impl Sync for WakerSource {}
| 166 | |
impl TaskSource {
    // Creates a new, not yet attached task source that drives `future` with
    // the given `priority`.
    //
    // `return_tx`, if given, receives the future's output or the payload of a
    // panic caught while polling it.
    //
    // The returned `Source` owns the task source; a waker source is created
    // alongside it and added as its child.
    #[allow(clippy::new_ret_no_self)]
    // checker-ignore-item
    fn new(
        priority: Priority,
        future: FutureWrapper,
        return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
    ) -> Source {
        unsafe {
            static TASK_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
                check: None,
                prepare: None,
                dispatch: Some(TaskSource::dispatch),
                finalize: Some(TaskSource::finalize),
                closure_callback: None,
                closure_marshal: None,
            };
            static WAKER_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
                check: None,
                prepare: None,
                dispatch: Some(WakerSource::dispatch),
                finalize: None,
                closure_callback: None,
                closure_marshal: None,
            };

            // Both sources are allocated by GLib with room for the Rust struct
            // that embeds the `GSource` header.
            let source = ffi::g_source_new(
                mut_override(&TASK_SOURCE_FUNCS),
                mem::size_of::<Self>() as u32,
            );

            let waker_source = ffi::g_source_new(
                mut_override(&WAKER_SOURCE_FUNCS),
                mem::size_of::<WakerSource>() as u32,
            );

            ffi::g_source_set_priority(source, priority.into_glib());
            ffi::g_source_add_child_source(source, waker_source);

            {
                // The fields behind the `GSource` header were never
                // initialised as Rust values, so they are filled in with
                // `ptr::write` instead of plain assignment (which would drop
                // the previous contents).
                let source = &mut *(source as *mut Self);
                ptr::write(&mut source.future, future);
                ptr::write(&mut source.return_tx, return_tx);

                // This creates a new reference to the waker source.
                let waker = Waker::from_raw(WakerSource::clone_raw(waker_source as *const ()));
                ptr::write(&mut source.waker, waker);
            }

            // Set ready time to 0 so that the source is immediately dispatched
            // for doing the initial polling. This will then either resolve the
            // future or register the waker wherever necessary.
            ffi::g_source_set_ready_time(waker_source, 0);

            // Unref the waker source, a strong reference to it is stored inside
            // the task source directly and inside the task source as child source.
            ffi::g_source_unref(waker_source);

            from_glib_full(source)
        }
    }

    // Polls the contained future once, with the source's main context set as
    // the thread-default context for the duration of the poll.
    //
    // Returns `Poll::Ready(())` once the future has resolved or panicked (the
    // latter only being caught when a result sender is present).
    //
    // Panics if the current thread does not own the main context the source
    // is attached to.
    fn poll(&mut self) -> Poll<()> {
        let source = &self.source as *const _;
        let executor: Borrowed<MainContext> =
            unsafe { from_glib_borrow(ffi::g_source_get_context(mut_override(source))) };

        assert!(
            executor.is_owner(),
            "Polling futures only allowed if the thread is owning the MainContext"
        );

        executor
            .with_thread_default(|| {
                let _enter = futures_executor::enter().unwrap();
                let mut context = Context::from_waker(&self.waker);

                // This will panic if the future was a local future and is called from
                // a different thread than where it was created.
                if let Some(tx) = self.return_tx.take() {
                    // With a result sender present, panics raised by the
                    // future are caught and forwarded through the channel.
                    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                        Pin::new(&mut self.future).poll(&mut context)
                    }));
                    match res {
                        Ok(Poll::Ready(res)) => {
                            let _ = tx.send(Ok(res));
                            Poll::Ready(())
                        }
                        Ok(Poll::Pending) => {
                            // Not finished yet: put the sender back for the next poll.
                            self.return_tx.replace(tx);
                            Poll::Pending
                        }
                        Err(e) => {
                            let _ = tx.send(Err(e));
                            Poll::Ready(())
                        }
                    }
                } else {
                    // No sender: the output is discarded and panics are not caught here.
                    Pin::new(&mut self.future).poll(&mut context).map(|_| ())
                }
            })
            .expect("current thread is not owner of the main context")
    }
}
| 271 | |
// rustdoc-stripper-ignore-next
/// A handle to a task running on a [`MainContext`].
///
/// Like [`std::thread::JoinHandle`] for a task rather than a thread. The return value from the
/// task can be retrieved by awaiting on this object. Dropping the handle "detaches" the task,
/// allowing it to complete but discarding the return value.
#[derive(Debug)]
pub struct JoinHandle<T> {
    // Receives the type-erased task output, or the payload of a caught panic.
    rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
    // The source that runs the task.
    source: Source,
    // ID of the attached source; cleared by `abort` and `into_source_id`.
    id: Cell<Option<NonZeroU32>>,
    // Records the real output type `T`, which the channel only carries as `dyn Any`.
    phantom: PhantomData<oneshot::Receiver<std::thread::Result<T>>>,
}
| 285 | |
| 286 | impl<T> JoinHandle<T> { |
| 287 | #[inline ] |
| 288 | fn new( |
| 289 | ctx: &MainContext, |
| 290 | source: Source, |
| 291 | rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>, |
| 292 | ) -> Self { |
| 293 | let id = source.attach(Some(ctx)); |
| 294 | let id = Cell::new(Some(unsafe { NonZeroU32::new_unchecked(id.as_raw()) })); |
| 295 | Self { |
| 296 | rx, |
| 297 | source, |
| 298 | id, |
| 299 | phantom: PhantomData, |
| 300 | } |
| 301 | } |
| 302 | // rustdoc-stripper-ignore-next |
| 303 | /// Returns the internal source ID. |
| 304 | /// |
| 305 | /// Returns `None` if the handle was aborted already. |
| 306 | #[inline ] |
| 307 | pub fn as_raw_source_id(&self) -> Option<u32> { |
| 308 | self.id.get().map(|i| i.get()) |
| 309 | } |
| 310 | // rustdoc-stripper-ignore-next |
| 311 | /// Aborts the task associated with the handle. |
| 312 | #[inline ] |
| 313 | pub fn abort(&self) { |
| 314 | self.source.destroy(); |
| 315 | self.id.replace(None); |
| 316 | } |
| 317 | // rustdoc-stripper-ignore-next |
| 318 | /// Returns the [`Source`] associated with this handle. |
| 319 | #[inline ] |
| 320 | pub fn source(&self) -> &Source { |
| 321 | &self.source |
| 322 | } |
| 323 | // rustdoc-stripper-ignore-next |
| 324 | /// Safely converts the handle into a [`SourceId`]. |
| 325 | /// |
| 326 | /// Can be used to discard the return value while still retaining the ability to abort the |
| 327 | /// underlying task. Returns `Err(self)` if the handle was aborted already. |
| 328 | pub fn into_source_id(self) -> Result<SourceId, Self> { |
| 329 | if let Some(id) = self.id.take() { |
| 330 | Ok(unsafe { SourceId::from_glib(id.get()) }) |
| 331 | } else { |
| 332 | Err(self) |
| 333 | } |
| 334 | } |
| 335 | } |
| 336 | |
| 337 | impl<T: 'static> Future for JoinHandle<T> { |
| 338 | type Output = Result<T, JoinError>; |
| 339 | #[inline ] |
| 340 | fn poll( |
| 341 | mut self: std::pin::Pin<&mut Self>, |
| 342 | cx: &mut std::task::Context<'_>, |
| 343 | ) -> std::task::Poll<Self::Output> { |
| 344 | std::pin::Pin::new(&mut self.rx).poll(cx).map(|r: Result, …>, …>| match r { |
| 345 | Err(_) => Err(JoinErrorInner::Cancelled.into()), |
| 346 | Ok(Err(e: Box)) => Err(JoinErrorInner::Panic(e).into()), |
| 347 | Ok(Ok(r: Box)) => Ok(*r.downcast().unwrap()), |
| 348 | }) |
| 349 | } |
| 350 | } |
| 351 | |
// The handle counts as terminated exactly when its result receiver reports
// itself as terminated.
impl<T: 'static> futures_core::FusedFuture for JoinHandle<T> {
    #[inline]
    fn is_terminated(&self) -> bool {
        self.rx.is_terminated()
    }
}
| 358 | |
// Safety: the auto implementation cannot be relied upon because the result is
// retrieved from the [`Source`] as a type-erased `Box<dyn Any + 'static>`,
// which carries no `Send` bound. Because of that type erasure the `Send`
// bound has to be asserted manually, and it is only asserted for `T: Send`.
unsafe impl<T: Send> Send for JoinHandle<T> {}
| 363 | |
// rustdoc-stripper-ignore-next
/// Variant of [`JoinHandle`] that is returned from [`MainContext::spawn_from_within`].
#[derive(Debug)]
pub struct SpawnWithinJoinHandle<T> {
    // Delivers the real `JoinHandle` once the task has been spawned; set to
    // `None` after it has resolved.
    rx: Option<oneshot::Receiver<JoinHandle<T>>>,
    // The `JoinHandle` received through `rx`, while it is still being awaited.
    join_handle: Option<JoinHandle<T>>,
}
| 371 | |
| 372 | impl<T> SpawnWithinJoinHandle<T> { |
| 373 | // rustdoc-stripper-ignore-next |
| 374 | /// Waits until the task is spawned and returns the [`JoinHandle`]. |
| 375 | pub async fn into_inner(self) -> Result<JoinHandle<T>, JoinError> { |
| 376 | if let Some(join_handle: JoinHandle) = self.join_handle { |
| 377 | return Ok(join_handle); |
| 378 | } |
| 379 | |
| 380 | if let Some(rx: Receiver>) = self.rx { |
| 381 | match rx.await { |
| 382 | Ok(join_handle: JoinHandle) => return Ok(join_handle), |
| 383 | Err(_) => return Err(JoinErrorInner::Cancelled.into()), |
| 384 | } |
| 385 | } |
| 386 | |
| 387 | Err(JoinErrorInner::Cancelled.into()) |
| 388 | } |
| 389 | } |
| 390 | |
| 391 | impl<T: 'static> Future for SpawnWithinJoinHandle<T> { |
| 392 | type Output = Result<T, JoinError>; |
| 393 | #[inline ] |
| 394 | fn poll( |
| 395 | mut self: std::pin::Pin<&mut Self>, |
| 396 | cx: &mut std::task::Context<'_>, |
| 397 | ) -> std::task::Poll<Self::Output> { |
| 398 | if let Some(ref mut rx) = self.rx { |
| 399 | match std::pin::Pin::new(rx).poll(cx) { |
| 400 | std::task::Poll::Pending => return std::task::Poll::Pending, |
| 401 | std::task::Poll::Ready(Err(_)) => { |
| 402 | self.rx = None; |
| 403 | return std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into())); |
| 404 | } |
| 405 | std::task::Poll::Ready(Ok(join_handle)) => { |
| 406 | self.rx = None; |
| 407 | self.join_handle = Some(join_handle); |
| 408 | } |
| 409 | } |
| 410 | } |
| 411 | |
| 412 | if let Some(ref mut join_handle) = self.join_handle { |
| 413 | match std::pin::Pin::new(join_handle).poll(cx) { |
| 414 | std::task::Poll::Pending => return std::task::Poll::Pending, |
| 415 | std::task::Poll::Ready(Err(e)) => { |
| 416 | self.join_handle = None; |
| 417 | return std::task::Poll::Ready(Err(e)); |
| 418 | } |
| 419 | std::task::Poll::Ready(Ok(r)) => { |
| 420 | self.join_handle = None; |
| 421 | return std::task::Poll::Ready(Ok(r)); |
| 422 | } |
| 423 | } |
| 424 | } |
| 425 | |
| 426 | std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into())) |
| 427 | } |
| 428 | } |
| 429 | |
| 430 | impl<T: 'static> futures_core::FusedFuture for SpawnWithinJoinHandle<T> { |
| 431 | #[inline ] |
| 432 | fn is_terminated(&self) -> bool { |
| 433 | if let Some(ref rx: &Receiver>) = self.rx { |
| 434 | rx.is_terminated() |
| 435 | } else if let Some(ref join_handle: &JoinHandle) = self.join_handle { |
| 436 | join_handle.is_terminated() |
| 437 | } else { |
| 438 | true |
| 439 | } |
| 440 | } |
| 441 | } |
| 442 | |
// rustdoc-stripper-ignore-next
/// Task failure from awaiting a [`JoinHandle`].
#[derive(Debug)]
pub struct JoinError(JoinErrorInner);

impl JoinError {
    // rustdoc-stripper-ignore-next
    /// Returns `true` if the handle was cancelled.
    #[inline]
    pub fn is_cancelled(&self) -> bool {
        matches!(self.0, JoinErrorInner::Cancelled)
    }

    // rustdoc-stripper-ignore-next
    /// Returns `true` if the task terminated with a panic.
    #[inline]
    pub fn is_panic(&self) -> bool {
        matches!(self.0, JoinErrorInner::Panic(_))
    }

    // rustdoc-stripper-ignore-next
    /// Converts the error into a panic result.
    ///
    /// # Panics
    ///
    /// Panics if the error is not a panic error. Use [`is_panic`](Self::is_panic) to check first
    /// if the error represents a panic.
    #[inline]
    pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
        self.try_into_panic()
            .expect("`JoinError` is not a panic error")
    }

    // rustdoc-stripper-ignore-next
    /// Attempts to convert the error into a panic result.
    ///
    /// Returns `Err(self)` if the error is not a panic result.
    #[inline]
    pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self> {
        match self.0 {
            JoinErrorInner::Panic(e) => Ok(e),
            e => Err(Self(e)),
        }
    }
}

impl std::error::Error for JoinError {}

// The user-facing message is the one of the wrapped `JoinErrorInner`.
impl std::fmt::Display for JoinError {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Spelled out so that the `Display` (not `Debug`) impl is used.
        fmt::Display::fmt(&self.0, f)
    }
}

// Private representation of a `JoinError`.
#[derive(Debug)]
enum JoinErrorInner {
    // The task went away without producing a result.
    Cancelled,
    // The task panicked; holds the panic payload.
    Panic(Box<dyn Any + Send + 'static>),
}

impl From<JoinErrorInner> for JoinError {
    #[inline]
    fn from(e: JoinErrorInner) -> Self {
        Self(e)
    }
}

impl std::error::Error for JoinErrorInner {}

impl fmt::Display for JoinErrorInner {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Cancelled => fmt.write_str("task cancelled"),
            Self::Panic(_) => fmt.write_str("task panicked"),
        }
    }
}
| 518 | |
| 519 | impl MainContext { |
| 520 | // rustdoc-stripper-ignore-next |
| 521 | /// Spawn a new infallible `Future` on the main context. |
| 522 | /// |
| 523 | /// This can be called from any thread and will execute the future from the thread |
| 524 | /// where main context is running, e.g. via a `MainLoop`. |
| 525 | pub fn spawn<R: Send + 'static, F: Future<Output = R> + Send + 'static>( |
| 526 | &self, |
| 527 | f: F, |
| 528 | ) -> JoinHandle<R> { |
| 529 | self.spawn_with_priority(crate::Priority::default(), f) |
| 530 | } |
| 531 | |
| 532 | // rustdoc-stripper-ignore-next |
| 533 | /// Spawn a new infallible `Future` on the main context. |
| 534 | /// |
| 535 | /// The given `Future` does not have to be `Send`. |
| 536 | /// |
| 537 | /// This can be called only from the thread where the main context is running, e.g. |
| 538 | /// from any other `Future` that is executed on this main context, or after calling |
| 539 | /// `with_thread_default` or `acquire` on the main context. |
| 540 | pub fn spawn_local<R: 'static, F: Future<Output = R> + 'static>(&self, f: F) -> JoinHandle<R> { |
| 541 | self.spawn_local_with_priority(crate::Priority::default(), f) |
| 542 | } |
| 543 | |
| 544 | // rustdoc-stripper-ignore-next |
| 545 | /// Spawn a new infallible `Future` on the main context, with a non-default priority. |
| 546 | /// |
| 547 | /// This can be called from any thread and will execute the future from the thread |
| 548 | /// where main context is running, e.g. via a `MainLoop`. |
| 549 | pub fn spawn_with_priority<R: Send + 'static, F: Future<Output = R> + Send + 'static>( |
| 550 | &self, |
| 551 | priority: Priority, |
| 552 | f: F, |
| 553 | ) -> JoinHandle<R> { |
| 554 | let f = FutureObj::new(Box::new(async move { |
| 555 | Box::new(f.await) as Box<dyn Any + Send + 'static> |
| 556 | })); |
| 557 | let (tx, rx) = oneshot::channel(); |
| 558 | let source = TaskSource::new(priority, FutureWrapper::Send(f), Some(tx)); |
| 559 | JoinHandle::new(self, source, rx) |
| 560 | } |
| 561 | |
| 562 | // rustdoc-stripper-ignore-next |
| 563 | /// Spawn a new infallible `Future` on the main context, with a non-default priority. |
| 564 | /// |
| 565 | /// The given `Future` does not have to be `Send`. |
| 566 | /// |
| 567 | /// This can be called only from the thread where the main context is running, e.g. |
| 568 | /// from any other `Future` that is executed on this main context, or after calling |
| 569 | /// `with_thread_default` or `acquire` on the main context. |
| 570 | pub fn spawn_local_with_priority<R: 'static, F: Future<Output = R> + 'static>( |
| 571 | &self, |
| 572 | priority: Priority, |
| 573 | f: F, |
| 574 | ) -> JoinHandle<R> { |
| 575 | let _acquire = self |
| 576 | .acquire() |
| 577 | .expect("Spawning local futures only allowed on the thread owning the MainContext" ); |
| 578 | let f = LocalFutureObj::new(Box::new(async move { |
| 579 | Box::new(f.await) as Box<dyn Any + 'static> |
| 580 | })); |
| 581 | let (tx, rx) = oneshot::channel(); |
| 582 | let source = TaskSource::new( |
| 583 | priority, |
| 584 | FutureWrapper::NonSend(ThreadGuard::new(f)), |
| 585 | Some(tx), |
| 586 | ); |
| 587 | JoinHandle::new(self, source, rx) |
| 588 | } |
| 589 | |
| 590 | // rustdoc-stripper-ignore-next |
| 591 | /// Spawn a new infallible `Future` on the main context from inside the main context. |
| 592 | /// |
| 593 | /// The given `Future` does not have to be `Send` but the closure to spawn it has to be. |
| 594 | /// |
| 595 | /// This can be called only from any thread. |
| 596 | pub fn spawn_from_within<R: Send + 'static, F: Future<Output = R> + 'static>( |
| 597 | &self, |
| 598 | func: impl FnOnce() -> F + Send + 'static, |
| 599 | ) -> SpawnWithinJoinHandle<R> { |
| 600 | self.spawn_from_within_with_priority(crate::Priority::default(), func) |
| 601 | } |
| 602 | |
| 603 | // rustdoc-stripper-ignore-next |
| 604 | /// Spawn a new infallible `Future` on the main context from inside the main context. |
| 605 | /// |
| 606 | /// The given `Future` does not have to be `Send` but the closure to spawn it has to be. |
| 607 | /// |
| 608 | /// This can be called only from any thread. |
| 609 | pub fn spawn_from_within_with_priority<R: Send + 'static, F: Future<Output = R> + 'static>( |
| 610 | &self, |
| 611 | priority: Priority, |
| 612 | func: impl FnOnce() -> F + Send + 'static, |
| 613 | ) -> SpawnWithinJoinHandle<R> { |
| 614 | let ctx = self.clone(); |
| 615 | let (tx, rx) = oneshot::channel(); |
| 616 | self.invoke_with_priority(priority, move || { |
| 617 | let _ = tx.send(ctx.spawn_local(func())); |
| 618 | }); |
| 619 | |
| 620 | SpawnWithinJoinHandle { |
| 621 | rx: Some(rx), |
| 622 | join_handle: None, |
| 623 | } |
| 624 | } |
| 625 | |
    // rustdoc-stripper-ignore-next
    /// Runs a new, infallible `Future` on the main context and block until it finished, returning
    /// the result of the `Future`.
    ///
    /// The given `Future` does not have to be `Send` or `'static`.
    ///
    /// This must only be called if no `MainLoop` or anything else is running on this specific main
    /// context.
    ///
    /// # Panics
    ///
    /// A panic raised by the `Future` is caught while it runs and resumed on the caller once
    /// the main loop has stopped.
    #[allow(clippy::transmute_ptr_to_ptr)]
    pub fn block_on<F: Future>(&self, f: F) -> F::Output {
        // Filled in by the wrapped future right before it quits the loop.
        let mut res = None;
        let l = MainLoop::new(Some(self), false);

        // Wrap the future so that its outcome (value or panic) is stored in
        // `res` and the main loop is stopped afterwards.
        let f = async {
            res = Some(panic::AssertUnwindSafe(f).catch_unwind().await);
            l.quit();
        };

        let f = unsafe {
            // Super-unsafe: We transmute here to pretend the future has the 'static lifetime.
            // The future borrows `res` and `l`, which live until the end of this function.
            let f = LocalFutureObj::new(Box::new(async move {
                f.await;
                Box::new(()) as Box<dyn Any + 'static>
            }));
            let f: LocalFutureObj<'static, Box<dyn Any + 'static>> = mem::transmute(f);
            f
        };

        // No result sender: the outcome is delivered through `res` instead.
        let source = TaskSource::new(
            crate::Priority::default(),
            FutureWrapper::NonSend(ThreadGuard::new(f)),
            None,
        );
        source.attach(Some(self));

        // Blocks until the wrapped future calls `l.quit()`.
        l.run();

        match res.unwrap() {
            Ok(v) => v,
            Err(e) => panic::resume_unwind(e),
        }
    }
| 668 | } |
| 669 | |
| 670 | impl Spawn for MainContext { |
| 671 | fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> { |
| 672 | let (tx: Sender, …>>, _) = oneshot::channel(); |
| 673 | let source: Source = TaskSource::new( |
| 674 | crate::Priority::default(), |
| 675 | future:FutureWrapper::Send(FutureObj::new(Box::new(async move { |
| 676 | f.await; |
| 677 | Box::new(()) as Box<dyn Any + Send + 'static> |
| 678 | }))), |
| 679 | return_tx:Some(tx), |
| 680 | ); |
| 681 | source.attach(context:Some(self)); |
| 682 | Ok(()) |
| 683 | } |
| 684 | } |
| 685 | |
| 686 | impl LocalSpawn for MainContext { |
| 687 | fn spawn_local_obj(&self, f: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { |
| 688 | let (tx: Sender, …>>, _) = oneshot::channel(); |
| 689 | let source: Source = TaskSource::new( |
| 690 | crate::Priority::default(), |
| 691 | future:FutureWrapper::NonSend(ThreadGuard::new(LocalFutureObj::new(Box::new( |
| 692 | async move { |
| 693 | f.await; |
| 694 | Box::new(()) as Box<dyn Any + 'static> |
| 695 | }, |
| 696 | )))), |
| 697 | return_tx:Some(tx), |
| 698 | ); |
| 699 | source.attach(context:Some(self)); |
| 700 | Ok(()) |
| 701 | } |
| 702 | } |
| 703 | |
#[cfg(test)]
mod tests {
    use std::{sync::mpsc, thread};

    use futures_channel::oneshot;
    use futures_util::future::{FutureExt, TryFutureExt};

    use super::*;

    // A `Send` future spawned from the test thread runs on the thread that
    // runs the main loop and is resumed once the oneshot fires.
    #[test]
    fn test_spawn() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        let (sender, receiver) = mpsc::channel();
        let (o_sender, o_receiver) = oneshot::channel();

        let l_clone = l.clone();
        c.spawn(
            o_receiver
                .and_then(move |()| {
                    sender.send(()).unwrap();
                    l_clone.quit();

                    futures_util::future::ok(())
                })
                .then(|res| {
                    assert!(res.is_ok());
                    futures_util::future::ready(())
                }),
        );

        let join_handle = thread::spawn(move || {
            l.run();
        });

        o_sender.send(()).unwrap();

        receiver.recv().unwrap();

        join_handle.join().unwrap();
    }

    // A local future can be spawned while the context is the thread default.
    #[test]
    fn test_spawn_local() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        c.with_thread_default(|| {
            let l_clone = l.clone();
            c.spawn_local(futures_util::future::lazy(move |_ctx| {
                l_clone.quit();
            }));

            l.run();
        })
        .unwrap();
    }

    // A non-`Send` future (it holds an `Rc`) is created and run on the main
    // loop's thread although it is requested from another thread.
    #[test]
    fn test_spawn_from_within() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        let join_handle = std::thread::spawn({
            let l_clone = l.clone();
            move || {
                c.spawn_from_within(move || async move {
                    let rc = std::rc::Rc::new(123);
                    futures_util::future::ready(()).await;
                    assert_eq!(std::rc::Rc::strong_count(&rc), 1);
                    l_clone.quit();
                });
            }
        });

        l.run();

        join_handle.join().unwrap();
    }

    // `block_on` accepts a future that borrows from the caller's stack and
    // returns the future's output.
    #[test]
    fn test_block_on() {
        let c = MainContext::new();

        let mut v = None;
        {
            let v = &mut v;

            let future = futures_util::future::lazy(|_ctx| {
                *v = Some(123);
                Ok::<i32, ()>(123)
            });

            let res = c.block_on(future);
            assert_eq!(res, Ok(123));
        }

        assert_eq!(v, Some(123));
    }

    // Awaiting a `JoinHandle` yields the spawned future's output.
    #[test]
    fn test_spawn_return() {
        let c = MainContext::new();
        c.block_on(async {
            let val = 1;
            let ret = c
                .spawn(async move { futures_util::future::ready(2).await + val })
                .await;
            assert_eq!(ret.unwrap(), 3);
        });
    }

    // A panic inside a spawned future surfaces as a `JoinError` that carries
    // the panic payload.
    #[test]
    fn test_spawn_panic() {
        let c = MainContext::new();
        c.block_on(async {
            let ret = c
                .spawn(async {
                    panic!("failed");
                })
                .await;
            assert_eq!(
                *ret.unwrap_err().into_panic().downcast::<&str>().unwrap(),
                "failed"
            );
        });
    }

    // Aborting a task releases everything the never-finishing future owns.
    #[test]
    fn test_spawn_abort() {
        let c = MainContext::new();
        let v = std::sync::Arc::new(1);
        let v_clone = v.clone();
        let c_ref = &c;
        c.block_on(async move {
            let handle = c_ref.spawn(async move {
                let _v = v_clone;
                let test: u128 = std::future::pending().await;
                println!("{test}");
                unreachable!();
            });

            handle.abort();
        });
        drop(c);

        // Make sure the inner future is actually freed.
        assert_eq!(std::sync::Arc::strong_count(&v), 1);
    }
}
| 855 | |