1 | // Take a look at the license at the top of the repository in the LICENSE file. |
2 | |
3 | use std::{ |
4 | any::Any, cell::Cell, marker::PhantomData, mem, num::NonZeroU32, panic, pin::Pin, ptr, thread, |
5 | }; |
6 | |
7 | use futures_channel::oneshot; |
8 | use futures_core::{ |
9 | future::Future, |
10 | task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, |
11 | }; |
12 | use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; |
13 | use futures_util::FutureExt; |
14 | |
15 | use crate::{ |
16 | thread_guard::ThreadGuard, translate::*, MainContext, MainLoop, Priority, Source, SourceId, |
17 | }; |
18 | |
// Wrapper around Send Futures and non-Send Futures that will panic
// if the non-Send Future is polled/dropped from a different thread
// than where this was created.
//
// Both variants resolve to a type-erased `Box<dyn Any>`; the concrete result
// type is recovered later by downcasting (see `JoinHandle`'s `Future` impl).
enum FutureWrapper {
    // A future that is itself `Send`.
    Send(FutureObj<'static, Box<dyn Any + Send + 'static>>),
    // A thread-bound future, only reachable through its `ThreadGuard`.
    NonSend(ThreadGuard<LocalFutureObj<'static, Box<dyn Any + 'static>>>),
}

// NOTE(review): these impls are asserted manually; they presumably rely on
// `ThreadGuard` rejecting access from foreign threads for the `NonSend`
// variant -- confirm against `thread_guard::ThreadGuard`.
unsafe impl Send for FutureWrapper {}
unsafe impl Sync for FutureWrapper {}
29 | |
30 | impl Future for FutureWrapper { |
31 | type Output = Box<dyn Any + 'static>; |
32 | |
33 | fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { |
34 | match self.get_mut() { |
35 | FutureWrapper::Send(fut: &mut FutureObj<'_, Box>) => { |
36 | Pin::new(fut).poll(cx:ctx).map(|b: Box| b as Box<dyn Any + 'static>) |
37 | } |
38 | FutureWrapper::NonSend(fut: &mut ThreadGuard>) => Pin::new(fut.get_mut()).poll(cx:ctx), |
39 | } |
40 | } |
41 | } |
42 | |
// The TaskSource and WakerSource are split up as the TaskSource
// must only be finalized on the thread that owns the main context,
// but the WakerSource is passed around to arbitrary threads for
// being able to wake up the TaskSource.
//
// The WakerSource is set up as a child source of the TaskSource, i.e.
// whenever it is ready also the TaskSource is ready.
//
// `#[repr(C)]` with `source` as the first field is what allows the
// `*mut ffi::GSource` handed to the callbacks to be cast to `*mut TaskSource`.
#[repr (C)]
struct TaskSource {
    // The embedded GSource header; must stay the first field.
    source: ffi::GSource,
    // The future driven by this source.
    future: FutureWrapper,
    // Waker that holds a reference to the child WakerSource.
    waker: Waker,
    // Channel used to report the future's result (or panic payload); `None`
    // when nobody is interested in the result, or once the result was sent.
    return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
}
57 | |
// Child source of a `TaskSource` whose only job is to become ready when the
// task's `Waker` is invoked. It carries no Rust state beyond the GSource
// header itself.
#[repr (C)]
struct WakerSource {
    source: ffi::GSource,
}
62 | |
63 | impl TaskSource { |
64 | unsafe extern "C" fn dispatch( |
65 | source: *mut ffi::GSource, |
66 | callback: ffi::GSourceFunc, |
67 | _user_data: ffi::gpointer, |
68 | ) -> ffi::gboolean { |
69 | let source = &mut *(source as *mut Self); |
70 | debug_assert!(callback.is_none()); |
71 | |
72 | // Poll the TaskSource and ensure we're never called again if the |
73 | // contained Future resolved now. |
74 | if let Poll::Ready(()) = source.poll() { |
75 | ffi::G_SOURCE_REMOVE |
76 | } else { |
77 | ffi::G_SOURCE_CONTINUE |
78 | } |
79 | } |
80 | |
81 | unsafe extern "C" fn finalize(source: *mut ffi::GSource) { |
82 | let source = source as *mut Self; |
83 | |
84 | // This will panic if the future was a local future and is dropped from a different thread |
85 | // than where it was created so try to drop it from the main context if we're on another |
86 | // thread and the main context still exists. |
87 | // |
88 | // This can only really happen if the `Source` was manually retrieve from the context, but |
89 | // better safe than sorry. |
90 | match (*source).future { |
91 | FutureWrapper::Send(_) => { |
92 | ptr::drop_in_place(&mut (*source).future); |
93 | } |
94 | FutureWrapper::NonSend(ref mut future) if future.is_owner() => { |
95 | ptr::drop_in_place(&mut (*source).future); |
96 | } |
97 | FutureWrapper::NonSend(ref mut future) => { |
98 | let context = ffi::g_source_get_context(source as *mut ffi::GSource); |
99 | if !context.is_null() { |
100 | let future = ptr::read(future); |
101 | let context = MainContext::from_glib_none(context); |
102 | context.invoke(move || { |
103 | drop(future); |
104 | }); |
105 | } else { |
106 | // This will panic |
107 | ptr::drop_in_place(&mut (*source).future); |
108 | } |
109 | } |
110 | } |
111 | |
112 | // Drop the waker to unref the underlying GSource |
113 | ptr::drop_in_place(&mut (*source).waker); |
114 | } |
115 | } |
116 | |
117 | impl WakerSource { |
118 | unsafe fn clone_raw(waker: *const ()) -> RawWaker { |
119 | static VTABLE: RawWakerVTable = RawWakerVTable::new( |
120 | WakerSource::clone_raw, |
121 | WakerSource::wake_raw, |
122 | WakerSource::wake_by_ref_raw, |
123 | WakerSource::drop_raw, |
124 | ); |
125 | |
126 | let waker = waker as *const ffi::GSource; |
127 | ffi::g_source_ref(mut_override(waker)); |
128 | RawWaker::new(waker as *const (), &VTABLE) |
129 | } |
130 | |
131 | unsafe fn wake_raw(waker: *const ()) { |
132 | Self::wake_by_ref_raw(waker); |
133 | Self::drop_raw(waker); |
134 | } |
135 | |
136 | unsafe fn wake_by_ref_raw(waker: *const ()) { |
137 | let waker = waker as *const ffi::GSource; |
138 | ffi::g_source_set_ready_time(mut_override(waker), 0); |
139 | } |
140 | |
141 | unsafe fn drop_raw(waker: *const ()) { |
142 | let waker = waker as *const ffi::GSource; |
143 | ffi::g_source_unref(mut_override(waker)); |
144 | } |
145 | |
146 | unsafe extern "C" fn dispatch( |
147 | source: *mut ffi::GSource, |
148 | _callback: ffi::GSourceFunc, |
149 | _user_data: ffi::gpointer, |
150 | ) -> ffi::gboolean { |
151 | // Set ready-time to -1 so that we're not called again before |
152 | // being woken up another time. |
153 | ffi::g_source_set_ready_time(mut_override(source), -1); |
154 | ffi::G_SOURCE_CONTINUE |
155 | } |
156 | } |
157 | |
// NOTE(review): manually asserted. The `TaskSource` holds a `FutureWrapper`,
// which is itself manually marked Send/Sync; thread confinement of local
// futures is presumably enforced at runtime by `ThreadGuard` -- confirm.
unsafe impl Send for TaskSource {}
unsafe impl Sync for TaskSource {}

// The `WakerSource` only contains the GSource header and is the part that is
// handed to arbitrary threads through the `Waker`.
unsafe impl Send for WakerSource {}
unsafe impl Sync for WakerSource {}
163 | |
impl TaskSource {
    // Creates a new, not yet attached `Source` that drives `future`.
    //
    // * `priority`: priority set on the task source.
    // * `future`: the (type-erased) future to poll on dispatch.
    // * `return_tx`: optional channel on which the result, or the payload of
    //   a caught panic, is sent once the future finishes.
    //
    // The GSource is allocated with room for the whole `TaskSource`, and the
    // Rust fields are initialised in place with `ptr::write`.
    #[allow (clippy::new_ret_no_self)]
    // checker-ignore-item
    fn new(
        priority: Priority,
        future: FutureWrapper,
        return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
    ) -> Source {
        unsafe {
            static TASK_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
                check: None,
                prepare: None,
                dispatch: Some(TaskSource::dispatch),
                finalize: Some(TaskSource::finalize),
                closure_callback: None,
                closure_marshal: None,
            };
            static WAKER_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
                check: None,
                prepare: None,
                dispatch: Some(WakerSource::dispatch),
                finalize: None,
                closure_callback: None,
                closure_marshal: None,
            };

            let source = ffi::g_source_new(
                mut_override(&TASK_SOURCE_FUNCS),
                mem::size_of::<Self>() as u32,
            );

            let waker_source = ffi::g_source_new(
                mut_override(&WAKER_SOURCE_FUNCS),
                mem::size_of::<WakerSource>() as u32,
            );

            ffi::g_source_set_priority(source, priority.into_glib());
            ffi::g_source_add_child_source(source, waker_source);

            {
                // The fields are uninitialised at this point, so they must be
                // written without dropping the previous contents.
                let source = &mut *(source as *mut Self);
                ptr::write(&mut source.future, future);
                ptr::write(&mut source.return_tx, return_tx);

                // This creates a new reference to the waker source.
                let waker = Waker::from_raw(WakerSource::clone_raw(waker_source as *const ()));
                ptr::write(&mut source.waker, waker);
            }

            // Set ready time to 0 so that the source is immediately dispatched
            // for doing the initial polling. This will then either resolve the
            // future or register the waker wherever necessary.
            ffi::g_source_set_ready_time(waker_source, 0);

            // Unref the waker source: references to it are held by the `Waker`
            // stored in the task source and by the task source itself, which
            // has it as a child source.
            ffi::g_source_unref(waker_source);

            from_glib_full(source)
        }
    }

    // Polls the contained future once, with the source's main context pushed
    // as the thread-default context.
    //
    // Returns `Poll::Ready(())` when the future resolved or panicked (with a
    // result channel present), `Poll::Pending` otherwise.
    //
    // Panics if the current thread does not own the main context.
    fn poll(&mut self) -> Poll<()> {
        let source = &self.source as *const _;
        let executor: Borrowed<MainContext> =
            unsafe { from_glib_borrow(ffi::g_source_get_context(mut_override(source))) };

        assert!(
            executor.is_owner(),
            "Polling futures only allowed if the thread is owning the MainContext"
        );

        executor
            .with_thread_default(|| {
                let _enter = futures_executor::enter().unwrap();
                let mut context = Context::from_waker(&self.waker);

                // This will panic if the future was a local future and is called from
                // a different thread than where it was created.
                if let Some(tx) = self.return_tx.take() {
                    // With a result channel, panics are caught and forwarded
                    // as `Err` instead of unwinding out of this function.
                    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                        Pin::new(&mut self.future).poll(&mut context)
                    }));
                    match res {
                        Ok(Poll::Ready(res)) => {
                            let _ = tx.send(Ok(res));
                            Poll::Ready(())
                        }
                        Ok(Poll::Pending) => {
                            // Not done yet: put the sender back for the next poll.
                            self.return_tx.replace(tx);
                            Poll::Pending
                        }
                        Err(e) => {
                            let _ = tx.send(Err(e));
                            Poll::Ready(())
                        }
                    }
                } else {
                    // No result channel: the output is discarded and panics
                    // are not caught here.
                    Pin::new(&mut self.future).poll(&mut context).map(|_| ())
                }
            })
            .expect("current thread is not owner of the main context" )
    }
}
268 | |
// rustdoc-stripper-ignore-next
/// A handle to a task running on a [`MainContext`].
///
/// Like [`std::thread::JoinHandle`] for a task rather than a thread. The return value from the
/// task can be retrieved by awaiting on this object. Dropping the handle "detaches" the task,
/// allowing it to complete but discarding the return value.
#[derive (Debug)]
pub struct JoinHandle<T> {
    // Receives the type-erased task result or the payload of a caught panic.
    rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
    // The source driving the task.
    source: Source,
    // Raw source ID; `None` after `abort()` or once taken by `into_source_id()`.
    id: Cell<Option<NonZeroU32>>,
    // Ties the handle to the result type `T` without storing a `T`.
    phantom: PhantomData<oneshot::Receiver<std::thread::Result<T>>>,
}
282 | |
283 | impl<T> JoinHandle<T> { |
284 | #[inline ] |
285 | fn new( |
286 | ctx: &MainContext, |
287 | source: Source, |
288 | rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>, |
289 | ) -> Self { |
290 | let id = source.attach(Some(ctx)); |
291 | let id = Cell::new(Some(unsafe { NonZeroU32::new_unchecked(id.as_raw()) })); |
292 | Self { |
293 | rx, |
294 | source, |
295 | id, |
296 | phantom: PhantomData, |
297 | } |
298 | } |
299 | // rustdoc-stripper-ignore-next |
300 | /// Returns the internal source ID. |
301 | /// |
302 | /// Returns `None` if the handle was aborted already. |
303 | #[inline ] |
304 | pub fn as_raw_source_id(&self) -> Option<u32> { |
305 | self.id.get().map(|i| i.get()) |
306 | } |
307 | // rustdoc-stripper-ignore-next |
308 | /// Aborts the task associated with the handle. |
309 | #[inline ] |
310 | pub fn abort(&self) { |
311 | self.source.destroy(); |
312 | self.id.replace(None); |
313 | } |
314 | // rustdoc-stripper-ignore-next |
315 | /// Returns the [`Source`] associated with this handle. |
316 | #[inline ] |
317 | pub fn source(&self) -> &Source { |
318 | &self.source |
319 | } |
320 | // rustdoc-stripper-ignore-next |
321 | /// Safely converts the handle into a [`SourceId`]. |
322 | /// |
323 | /// Can be used to discard the return value while still retaining the ability to abort the |
324 | /// underlying task. Returns `Err(self)` if the handle was aborted already. |
325 | pub fn into_source_id(self) -> Result<SourceId, Self> { |
326 | if let Some(id) = self.id.take() { |
327 | Ok(unsafe { SourceId::from_glib(id.get()) }) |
328 | } else { |
329 | Err(self) |
330 | } |
331 | } |
332 | } |
333 | |
334 | impl<T: 'static> Future for JoinHandle<T> { |
335 | type Output = Result<T, JoinError>; |
336 | #[inline ] |
337 | fn poll( |
338 | mut self: std::pin::Pin<&mut Self>, |
339 | cx: &mut std::task::Context<'_>, |
340 | ) -> std::task::Poll<Self::Output> { |
341 | std::pin::Pin::new(&mut self.rx).poll(cx).map(|r: Result, …>, …>| match r { |
342 | Err(_) => Err(JoinErrorInner::Cancelled.into()), |
343 | Ok(Err(e: Box)) => Err(JoinErrorInner::Panic(e).into()), |
344 | Ok(Ok(r: Box)) => Ok(*r.downcast().unwrap()), |
345 | }) |
346 | } |
347 | } |
348 | |
impl<T: 'static> futures_core::FusedFuture for JoinHandle<T> {
    // The handle is terminated exactly when its result receiver is.
    #[inline ]
    fn is_terminated(&self) -> bool {
        self.rx.is_terminated()
    }
}
355 | |
// NOTE(review): asserted manually because the result travels type-erased as
// `Box<dyn Any>`. There is no `T: Send` bound, so a handle for a non-`Send`
// result can be moved to, and awaited on, another thread -- this looks
// unsound. Tightening it to `T: Send` would also require `R: Send` on
// `spawn_from_within*`, which sends a `JoinHandle<R>` across threads.
unsafe impl<T> Send for JoinHandle<T> {}
357 | |
// rustdoc-stripper-ignore-next
/// Variant of [`JoinHandle`] that is returned from [`MainContext::spawn_from_within`].
#[derive (Debug)]
pub struct SpawnWithinJoinHandle<T> {
    // Receives the actual `JoinHandle` once the task was spawned; cleared
    // after it resolved.
    rx: Option<oneshot::Receiver<JoinHandle<T>>>,
    // The received `JoinHandle`; cleared once it resolved.
    join_handle: Option<JoinHandle<T>>,
}
365 | |
366 | impl<T> SpawnWithinJoinHandle<T> { |
367 | // rustdoc-stripper-ignore-next |
368 | /// Waits until the task is spawned and returns the [`JoinHandle`]. |
369 | pub async fn into_inner(self) -> Result<JoinHandle<T>, JoinError> { |
370 | if let Some(join_handle: JoinHandle) = self.join_handle { |
371 | return Ok(join_handle); |
372 | } |
373 | |
374 | if let Some(rx: Receiver>) = self.rx { |
375 | match rx.await { |
376 | Ok(join_handle: JoinHandle) => return Ok(join_handle), |
377 | Err(_) => return Err(JoinErrorInner::Cancelled.into()), |
378 | } |
379 | } |
380 | |
381 | Err(JoinErrorInner::Cancelled.into()) |
382 | } |
383 | } |
384 | |
385 | impl<T: 'static> Future for SpawnWithinJoinHandle<T> { |
386 | type Output = Result<T, JoinError>; |
387 | #[inline ] |
388 | fn poll( |
389 | mut self: std::pin::Pin<&mut Self>, |
390 | cx: &mut std::task::Context<'_>, |
391 | ) -> std::task::Poll<Self::Output> { |
392 | if let Some(ref mut rx) = self.rx { |
393 | match std::pin::Pin::new(rx).poll(cx) { |
394 | std::task::Poll::Pending => return std::task::Poll::Pending, |
395 | std::task::Poll::Ready(Err(_)) => { |
396 | self.rx = None; |
397 | return std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into())); |
398 | } |
399 | std::task::Poll::Ready(Ok(join_handle)) => { |
400 | self.rx = None; |
401 | self.join_handle = Some(join_handle); |
402 | } |
403 | } |
404 | } |
405 | |
406 | if let Some(ref mut join_handle) = self.join_handle { |
407 | match std::pin::Pin::new(join_handle).poll(cx) { |
408 | std::task::Poll::Pending => return std::task::Poll::Pending, |
409 | std::task::Poll::Ready(Err(e)) => { |
410 | self.join_handle = None; |
411 | return std::task::Poll::Ready(Err(e)); |
412 | } |
413 | std::task::Poll::Ready(Ok(r)) => { |
414 | self.join_handle = None; |
415 | return std::task::Poll::Ready(Ok(r)); |
416 | } |
417 | } |
418 | } |
419 | |
420 | std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into())) |
421 | } |
422 | } |
423 | |
424 | impl<T: 'static> futures_core::FusedFuture for SpawnWithinJoinHandle<T> { |
425 | #[inline ] |
426 | fn is_terminated(&self) -> bool { |
427 | if let Some(ref rx: &Receiver>) = self.rx { |
428 | rx.is_terminated() |
429 | } else if let Some(ref join_handle: &JoinHandle) = self.join_handle { |
430 | join_handle.is_terminated() |
431 | } else { |
432 | true |
433 | } |
434 | } |
435 | } |
436 | |
// rustdoc-stripper-ignore-next
/// Task failure from awaiting a [`JoinHandle`].
///
/// The failure is either a cancellation or a panic of the task; see
/// [`is_cancelled`](Self::is_cancelled) and [`is_panic`](Self::is_panic).
#[derive (Debug)]
pub struct JoinError(JoinErrorInner);
441 | |
442 | impl JoinError { |
443 | // rustdoc-stripper-ignore-next |
444 | /// Returns `true` if the handle was cancelled. |
445 | #[inline ] |
446 | pub fn is_cancelled(&self) -> bool { |
447 | matches!(self.0, JoinErrorInner::Cancelled) |
448 | } |
449 | // rustdoc-stripper-ignore-next |
450 | /// Returns `true` if the task terminated with a panic. |
451 | #[inline ] |
452 | pub fn is_panic(&self) -> bool { |
453 | matches!(self.0, JoinErrorInner::Panic(_)) |
454 | } |
455 | // rustdoc-stripper-ignore-next |
456 | /// Converts the error into a panic result. |
457 | /// |
458 | /// # Panics |
459 | /// |
460 | /// Panics if the error is not a panic error. Use [`is_panic`](Self::is_panic) to check first |
461 | /// if the error represents a panic. |
462 | #[inline ] |
463 | pub fn into_panic(self) -> Box<dyn Any + Send + 'static> { |
464 | self.try_into_panic() |
465 | .expect("`JoinError` is not a panic error" ) |
466 | } |
467 | // rustdoc-stripper-ignore-next |
468 | /// Attempts to convert the error into a panic result. |
469 | /// |
470 | /// Returns `Err(self)` if the error is not a panic result. |
471 | #[inline ] |
472 | pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self> { |
473 | match self.0 { |
474 | JoinErrorInner::Panic(e) => Ok(e), |
475 | e => Err(Self(e)), |
476 | } |
477 | } |
478 | } |
479 | |
// Uses the default `Error` methods; no source error is exposed.
impl std::error::Error for JoinError {}
481 | |
impl std::fmt::Display for JoinError {
    // Forwards formatting to the wrapped `JoinErrorInner`.
    #[inline ]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}
488 | |
// Private representation of a `JoinError`.
#[derive (thiserror::Error, Debug)]
enum JoinErrorInner {
    // The task's result sender was dropped before a result was sent.
    #[error("task cancelled" )]
    Cancelled,
    // The task panicked; carries the payload of the caught panic.
    #[error("task panicked" )]
    Panic(Box<dyn Any + Send + 'static>),
}
496 | |
impl From<JoinErrorInner> for JoinError {
    // Wraps the private error kind into the public error type.
    #[inline ]
    fn from(e: JoinErrorInner) -> Self {
        Self(e)
    }
}
503 | |
504 | impl MainContext { |
505 | // rustdoc-stripper-ignore-next |
506 | /// Spawn a new infallible `Future` on the main context. |
507 | /// |
508 | /// This can be called from any thread and will execute the future from the thread |
509 | /// where main context is running, e.g. via a `MainLoop`. |
510 | pub fn spawn<R: Send + 'static, F: Future<Output = R> + Send + 'static>( |
511 | &self, |
512 | f: F, |
513 | ) -> JoinHandle<R> { |
514 | self.spawn_with_priority(crate::Priority::default(), f) |
515 | } |
516 | |
517 | // rustdoc-stripper-ignore-next |
518 | /// Spawn a new infallible `Future` on the main context. |
519 | /// |
520 | /// The given `Future` does not have to be `Send`. |
521 | /// |
522 | /// This can be called only from the thread where the main context is running, e.g. |
523 | /// from any other `Future` that is executed on this main context, or after calling |
524 | /// `with_thread_default` or `acquire` on the main context. |
525 | pub fn spawn_local<R: 'static, F: Future<Output = R> + 'static>(&self, f: F) -> JoinHandle<R> { |
526 | self.spawn_local_with_priority(crate::Priority::default(), f) |
527 | } |
528 | |
529 | // rustdoc-stripper-ignore-next |
530 | /// Spawn a new infallible `Future` on the main context, with a non-default priority. |
531 | /// |
532 | /// This can be called from any thread and will execute the future from the thread |
533 | /// where main context is running, e.g. via a `MainLoop`. |
534 | pub fn spawn_with_priority<R: Send + 'static, F: Future<Output = R> + Send + 'static>( |
535 | &self, |
536 | priority: Priority, |
537 | f: F, |
538 | ) -> JoinHandle<R> { |
539 | let f = FutureObj::new(Box::new(async move { |
540 | Box::new(f.await) as Box<dyn Any + Send + 'static> |
541 | })); |
542 | let (tx, rx) = oneshot::channel(); |
543 | let source = TaskSource::new(priority, FutureWrapper::Send(f), Some(tx)); |
544 | JoinHandle::new(self, source, rx) |
545 | } |
546 | |
547 | // rustdoc-stripper-ignore-next |
548 | /// Spawn a new infallible `Future` on the main context, with a non-default priority. |
549 | /// |
550 | /// The given `Future` does not have to be `Send`. |
551 | /// |
552 | /// This can be called only from the thread where the main context is running, e.g. |
553 | /// from any other `Future` that is executed on this main context, or after calling |
554 | /// `with_thread_default` or `acquire` on the main context. |
555 | pub fn spawn_local_with_priority<R: 'static, F: Future<Output = R> + 'static>( |
556 | &self, |
557 | priority: Priority, |
558 | f: F, |
559 | ) -> JoinHandle<R> { |
560 | let _acquire = self |
561 | .acquire() |
562 | .expect("Spawning local futures only allowed on the thread owning the MainContext" ); |
563 | let f = LocalFutureObj::new(Box::new(async move { |
564 | Box::new(f.await) as Box<dyn Any + 'static> |
565 | })); |
566 | let (tx, rx) = oneshot::channel(); |
567 | let source = TaskSource::new( |
568 | priority, |
569 | FutureWrapper::NonSend(ThreadGuard::new(f)), |
570 | Some(tx), |
571 | ); |
572 | JoinHandle::new(self, source, rx) |
573 | } |
574 | |
575 | // rustdoc-stripper-ignore-next |
576 | /// Spawn a new infallible `Future` on the main context from inside the main context. |
577 | /// |
578 | /// The given `Future` does not have to be `Send` but the closure to spawn it has to be. |
579 | /// |
580 | /// This can be called only from any thread. |
581 | pub fn spawn_from_within<R: 'static, F: Future<Output = R> + 'static>( |
582 | &self, |
583 | func: impl FnOnce() -> F + Send + 'static, |
584 | ) -> SpawnWithinJoinHandle<R> { |
585 | self.spawn_from_within_with_priority(crate::Priority::default(), func) |
586 | } |
587 | |
588 | // rustdoc-stripper-ignore-next |
589 | /// Spawn a new infallible `Future` on the main context from inside the main context. |
590 | /// |
591 | /// The given `Future` does not have to be `Send` but the closure to spawn it has to be. |
592 | /// |
593 | /// This can be called only from any thread. |
594 | pub fn spawn_from_within_with_priority<R: 'static, F: Future<Output = R> + 'static>( |
595 | &self, |
596 | priority: Priority, |
597 | func: impl FnOnce() -> F + Send + 'static, |
598 | ) -> SpawnWithinJoinHandle<R> { |
599 | let ctx = self.clone(); |
600 | let (tx, rx) = oneshot::channel(); |
601 | self.invoke_with_priority(priority, move || { |
602 | let _ = tx.send(ctx.spawn_local(func())); |
603 | }); |
604 | |
605 | SpawnWithinJoinHandle { |
606 | rx: Some(rx), |
607 | join_handle: None, |
608 | } |
609 | } |
610 | |
611 | // rustdoc-stripper-ignore-next |
612 | /// Runs a new, infallible `Future` on the main context and block until it finished, returning |
613 | /// the result of the `Future`. |
614 | /// |
615 | /// The given `Future` does not have to be `Send` or `'static`. |
616 | /// |
617 | /// This must only be called if no `MainLoop` or anything else is running on this specific main |
618 | /// context. |
619 | #[allow (clippy::transmute_ptr_to_ptr)] |
620 | pub fn block_on<F: Future>(&self, f: F) -> F::Output { |
621 | let mut res = None; |
622 | let l = MainLoop::new(Some(self), false); |
623 | |
624 | let f = async { |
625 | res = Some(panic::AssertUnwindSafe(f).catch_unwind().await); |
626 | l.quit(); |
627 | }; |
628 | |
629 | let f = unsafe { |
630 | // Super-unsafe: We transmute here to get rid of the 'static lifetime |
631 | let f = LocalFutureObj::new(Box::new(async move { |
632 | f.await; |
633 | Box::new(()) as Box<dyn Any + 'static> |
634 | })); |
635 | let f: LocalFutureObj<'static, Box<dyn Any + 'static>> = mem::transmute(f); |
636 | f |
637 | }; |
638 | |
639 | let source = TaskSource::new( |
640 | crate::Priority::default(), |
641 | FutureWrapper::NonSend(ThreadGuard::new(f)), |
642 | None, |
643 | ); |
644 | source.attach(Some(self)); |
645 | |
646 | l.run(); |
647 | |
648 | match res.unwrap() { |
649 | Ok(v) => v, |
650 | Err(e) => panic::resume_unwind(e), |
651 | } |
652 | } |
653 | } |
654 | |
655 | impl Spawn for MainContext { |
656 | fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> { |
657 | let (tx: Sender, …>>, _) = oneshot::channel(); |
658 | let source: Source = TaskSource::new( |
659 | crate::Priority::default(), |
660 | future:FutureWrapper::Send(FutureObj::new(Box::new(async move { |
661 | f.await; |
662 | Box::new(()) as Box<dyn Any + Send + 'static> |
663 | }))), |
664 | return_tx:Some(tx), |
665 | ); |
666 | source.attach(context:Some(self)); |
667 | Ok(()) |
668 | } |
669 | } |
670 | |
671 | impl LocalSpawn for MainContext { |
672 | fn spawn_local_obj(&self, f: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { |
673 | let (tx: Sender, …>>, _) = oneshot::channel(); |
674 | let source: Source = TaskSource::new( |
675 | crate::Priority::default(), |
676 | future:FutureWrapper::NonSend(ThreadGuard::new(LocalFutureObj::new(Box::new( |
677 | async move { |
678 | f.await; |
679 | Box::new(()) as Box<dyn Any + 'static> |
680 | }, |
681 | )))), |
682 | return_tx:Some(tx), |
683 | ); |
684 | source.attach(context:Some(self)); |
685 | Ok(()) |
686 | } |
687 | } |
688 | |
#[cfg (test)]
mod tests {
    use std::{sync::mpsc, thread};

    use futures_channel::oneshot;
    use futures_util::future::{FutureExt, TryFutureExt};

    use super::*;

    // Spawns a `Send` future from the test thread while the main loop runs on
    // a second thread; the future completes once the oneshot value arrives.
    #[test ]
    fn test_spawn() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        let (sender, receiver) = mpsc::channel();
        let (o_sender, o_receiver) = oneshot::channel();

        let l_clone = l.clone();
        c.spawn(
            o_receiver
                .and_then(move |()| {
                    sender.send(()).unwrap();
                    l_clone.quit();

                    futures_util::future::ok(())
                })
                .then(|res| {
                    assert!(res.is_ok());
                    futures_util::future::ready(())
                }),
        );

        thread::spawn(move || {
            l.run();
        });

        o_sender.send(()).unwrap();

        // Blocks until the spawned future has run.
        receiver.recv().unwrap();
    }

    // Spawns a local future with the context pushed as thread default and
    // runs the loop until the future quits it.
    #[test ]
    fn test_spawn_local() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        c.with_thread_default(|| {
            let l_clone = l.clone();
            c.spawn_local(futures_util::future::lazy(move |_ctx| {
                l_clone.quit();
            }));

            l.run();
        })
        .unwrap();
    }

    // Requests the spawn from another thread; the future itself runs on the
    // thread running the main loop and quits it.
    #[test ]
    fn test_spawn_from_within() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        std::thread::spawn({
            let l_clone = l.clone();
            move || {
                c.spawn_from_within(move || async move {
                    l_clone.quit();
                });
            }
        });

        l.run();
    }

    // `block_on` accepts a future borrowing from the stack and returns its output.
    #[test ]
    fn test_block_on() {
        let c = MainContext::new();

        let mut v = None;
        {
            let v = &mut v;

            let future = futures_util::future::lazy(|_ctx| {
                *v = Some(123);
                Ok::<i32, ()>(123)
            });

            let res = c.block_on(future);
            assert_eq!(res, Ok(123));
        }

        assert_eq!(v, Some(123));
    }

    // Awaiting a `JoinHandle` yields the task's return value.
    #[test ]
    fn test_spawn_return() {
        let c = MainContext::new();
        c.block_on(async {
            let val = 1;
            let ret = c
                .spawn(async move { futures_util::future::ready(2).await + val })
                .await;
            assert_eq!(ret.unwrap(), 3);
        });
    }

    // A panic inside a spawned task is reported through the `JoinHandle`.
    #[test ]
    fn test_spawn_panic() {
        let c = MainContext::new();
        c.block_on(async {
            let ret = c
                .spawn(async {
                    panic!("failed" );
                })
                .await;
            assert_eq!(
                *ret.unwrap_err().into_panic().downcast::<&str>().unwrap(),
                "failed"
            );
        });
    }
}
811 | |