1 | // Take a look at the license at the top of the repository in the LICENSE file. |
2 | |
3 | use std::{ |
4 | any::Any, cell::Cell, fmt, marker::PhantomData, mem, num::NonZeroU32, panic, pin::Pin, ptr, |
5 | thread, |
6 | }; |
7 | |
8 | use futures_channel::oneshot; |
9 | use futures_core::{ |
10 | future::Future, |
11 | task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, |
12 | }; |
13 | use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; |
14 | use futures_util::FutureExt; |
15 | |
16 | use crate::{ |
17 | ffi, thread_guard::ThreadGuard, translate::*, MainContext, MainLoop, Priority, Source, SourceId, |
18 | }; |
19 | |
20 | // Wrapper around Send Futures and non-Send Futures that will panic |
21 | // if the non-Send Future is polled/dropped from a different thread |
22 | // than where this was created. |
23 | enum FutureWrapper { |
24 | Send(FutureObj<'static, Box<dyn Any + Send + 'static>>), |
25 | NonSend(ThreadGuard<LocalFutureObj<'static, Box<dyn Any + 'static>>>), |
26 | } |
27 | |
28 | unsafe impl Send for FutureWrapper {} |
29 | unsafe impl Sync for FutureWrapper {} |
30 | |
31 | impl Future for FutureWrapper { |
32 | type Output = Box<dyn Any + 'static>; |
33 | |
34 | fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { |
35 | match self.get_mut() { |
36 | FutureWrapper::Send(fut: &mut FutureObj<'static, Box>) => { |
37 | Pin::new(fut).poll(cx:ctx).map(|b: Box| b as Box<dyn Any + 'static>) |
38 | } |
39 | FutureWrapper::NonSend(fut: &mut ThreadGuard>) => Pin::new(fut.get_mut()).poll(cx:ctx), |
40 | } |
41 | } |
42 | } |
43 | |
44 | // The TaskSource and WakerSource are split up as the TaskSource |
45 | // must only be finalized on the thread that owns the main context, |
46 | // but the WakerSource is passed around to arbitrary threads for |
47 | // being able to wake up the TaskSource. |
48 | // |
49 | // The WakerSource is set up as a child source of the TaskSource, i.e. |
50 | // whenever it is ready also the TaskSource is ready. |
51 | #[repr (C)] |
52 | struct TaskSource { |
53 | source: ffi::GSource, |
54 | future: FutureWrapper, |
55 | waker: Waker, |
56 | return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>, |
57 | } |
58 | |
59 | #[repr (C)] |
60 | struct WakerSource { |
61 | source: ffi::GSource, |
62 | } |
63 | |
64 | impl TaskSource { |
65 | unsafe extern "C" fn dispatch( |
66 | source: *mut ffi::GSource, |
67 | callback: ffi::GSourceFunc, |
68 | _user_data: ffi::gpointer, |
69 | ) -> ffi::gboolean { |
70 | let source = &mut *(source as *mut Self); |
71 | debug_assert!(callback.is_none()); |
72 | |
73 | // Poll the TaskSource and ensure we're never called again if the |
74 | // contained Future resolved now. |
75 | if let Poll::Ready(()) = source.poll() { |
76 | ffi::G_SOURCE_REMOVE |
77 | } else { |
78 | ffi::G_SOURCE_CONTINUE |
79 | } |
80 | } |
81 | |
82 | unsafe extern "C" fn finalize(source: *mut ffi::GSource) { |
83 | let source = source as *mut Self; |
84 | |
85 | // This will panic if the future was a local future and is dropped from a different thread |
86 | // than where it was created so try to drop it from the main context if we're on another |
87 | // thread and the main context still exists. |
88 | // |
89 | // This can only really happen if the `Source` was manually retrieve from the context, but |
90 | // better safe than sorry. |
91 | match (*source).future { |
92 | FutureWrapper::Send(_) => { |
93 | ptr::drop_in_place(&mut (*source).future); |
94 | } |
95 | FutureWrapper::NonSend(ref mut future) if future.is_owner() => { |
96 | ptr::drop_in_place(&mut (*source).future); |
97 | } |
98 | FutureWrapper::NonSend(ref mut future) => { |
99 | let context = ffi::g_source_get_context(source as *mut ffi::GSource); |
100 | if !context.is_null() { |
101 | let future = ptr::read(future); |
102 | let context = MainContext::from_glib_none(context); |
103 | context.invoke(move || { |
104 | drop(future); |
105 | }); |
106 | } else { |
107 | // This will panic |
108 | ptr::drop_in_place(&mut (*source).future); |
109 | } |
110 | } |
111 | } |
112 | |
113 | ptr::drop_in_place(&mut (*source).return_tx); |
114 | |
115 | // Drop the waker to unref the underlying GSource |
116 | ptr::drop_in_place(&mut (*source).waker); |
117 | } |
118 | } |
119 | |
120 | impl WakerSource { |
121 | unsafe fn clone_raw(waker: *const ()) -> RawWaker { |
122 | static VTABLE: RawWakerVTable = RawWakerVTable::new( |
123 | WakerSource::clone_raw, |
124 | WakerSource::wake_raw, |
125 | WakerSource::wake_by_ref_raw, |
126 | WakerSource::drop_raw, |
127 | ); |
128 | |
129 | let waker = waker as *const ffi::GSource; |
130 | ffi::g_source_ref(mut_override(waker)); |
131 | RawWaker::new(waker as *const (), &VTABLE) |
132 | } |
133 | |
134 | unsafe fn wake_raw(waker: *const ()) { |
135 | Self::wake_by_ref_raw(waker); |
136 | Self::drop_raw(waker); |
137 | } |
138 | |
139 | unsafe fn wake_by_ref_raw(waker: *const ()) { |
140 | let waker = waker as *const ffi::GSource; |
141 | ffi::g_source_set_ready_time(mut_override(waker), 0); |
142 | } |
143 | |
144 | unsafe fn drop_raw(waker: *const ()) { |
145 | let waker = waker as *const ffi::GSource; |
146 | ffi::g_source_unref(mut_override(waker)); |
147 | } |
148 | |
149 | unsafe extern "C" fn dispatch( |
150 | source: *mut ffi::GSource, |
151 | _callback: ffi::GSourceFunc, |
152 | _user_data: ffi::gpointer, |
153 | ) -> ffi::gboolean { |
154 | // Set ready-time to -1 so that we're not called again before |
155 | // being woken up another time. |
156 | ffi::g_source_set_ready_time(mut_override(source), -1); |
157 | ffi::G_SOURCE_CONTINUE |
158 | } |
159 | } |
160 | |
161 | unsafe impl Send for TaskSource {} |
162 | unsafe impl Sync for TaskSource {} |
163 | |
164 | unsafe impl Send for WakerSource {} |
165 | unsafe impl Sync for WakerSource {} |
166 | |
167 | impl TaskSource { |
168 | #[allow (clippy::new_ret_no_self)] |
169 | // checker-ignore-item |
170 | fn new( |
171 | priority: Priority, |
172 | future: FutureWrapper, |
173 | return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>, |
174 | ) -> Source { |
175 | unsafe { |
176 | static TASK_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs { |
177 | check: None, |
178 | prepare: None, |
179 | dispatch: Some(TaskSource::dispatch), |
180 | finalize: Some(TaskSource::finalize), |
181 | closure_callback: None, |
182 | closure_marshal: None, |
183 | }; |
184 | static WAKER_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs { |
185 | check: None, |
186 | prepare: None, |
187 | dispatch: Some(WakerSource::dispatch), |
188 | finalize: None, |
189 | closure_callback: None, |
190 | closure_marshal: None, |
191 | }; |
192 | |
193 | let source = ffi::g_source_new( |
194 | mut_override(&TASK_SOURCE_FUNCS), |
195 | mem::size_of::<Self>() as u32, |
196 | ); |
197 | |
198 | let waker_source = ffi::g_source_new( |
199 | mut_override(&WAKER_SOURCE_FUNCS), |
200 | mem::size_of::<WakerSource>() as u32, |
201 | ); |
202 | |
203 | ffi::g_source_set_priority(source, priority.into_glib()); |
204 | ffi::g_source_add_child_source(source, waker_source); |
205 | |
206 | { |
207 | let source = &mut *(source as *mut Self); |
208 | ptr::write(&mut source.future, future); |
209 | ptr::write(&mut source.return_tx, return_tx); |
210 | |
211 | // This creates a new reference to the waker source. |
212 | let waker = Waker::from_raw(WakerSource::clone_raw(waker_source as *const ())); |
213 | ptr::write(&mut source.waker, waker); |
214 | } |
215 | |
216 | // Set ready time to 0 so that the source is immediately dispatched |
217 | // for doing the initial polling. This will then either resolve the |
218 | // future or register the waker wherever necessary. |
219 | ffi::g_source_set_ready_time(waker_source, 0); |
220 | |
221 | // Unref the waker source, a strong reference to it is stored inside |
222 | // the task source directly and inside the task source as child source. |
223 | ffi::g_source_unref(waker_source); |
224 | |
225 | from_glib_full(source) |
226 | } |
227 | } |
228 | |
229 | fn poll(&mut self) -> Poll<()> { |
230 | let source = &self.source as *const _; |
231 | let executor: Borrowed<MainContext> = |
232 | unsafe { from_glib_borrow(ffi::g_source_get_context(mut_override(source))) }; |
233 | |
234 | assert!( |
235 | executor.is_owner(), |
236 | "Polling futures only allowed if the thread is owning the MainContext" |
237 | ); |
238 | |
239 | executor |
240 | .with_thread_default(|| { |
241 | let _enter = futures_executor::enter().unwrap(); |
242 | let mut context = Context::from_waker(&self.waker); |
243 | |
244 | // This will panic if the future was a local future and is called from |
245 | // a different thread than where it was created. |
246 | if let Some(tx) = self.return_tx.take() { |
247 | let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { |
248 | Pin::new(&mut self.future).poll(&mut context) |
249 | })); |
250 | match res { |
251 | Ok(Poll::Ready(res)) => { |
252 | let _ = tx.send(Ok(res)); |
253 | Poll::Ready(()) |
254 | } |
255 | Ok(Poll::Pending) => { |
256 | self.return_tx.replace(tx); |
257 | Poll::Pending |
258 | } |
259 | Err(e) => { |
260 | let _ = tx.send(Err(e)); |
261 | Poll::Ready(()) |
262 | } |
263 | } |
264 | } else { |
265 | Pin::new(&mut self.future).poll(&mut context).map(|_| ()) |
266 | } |
267 | }) |
268 | .expect("current thread is not owner of the main context" ) |
269 | } |
270 | } |
271 | |
272 | // rustdoc-stripper-ignore-next |
273 | /// A handle to a task running on a [`MainContext`]. |
274 | /// |
275 | /// Like [`std::thread::JoinHandle`] for a task rather than a thread. The return value from the |
276 | /// task can be retrieved by awaiting on this object. Dropping the handle "detaches" the task, |
277 | /// allowing it to complete but discarding the return value. |
278 | #[derive (Debug)] |
279 | pub struct JoinHandle<T> { |
280 | rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>, |
281 | source: Source, |
282 | id: Cell<Option<NonZeroU32>>, |
283 | phantom: PhantomData<oneshot::Receiver<std::thread::Result<T>>>, |
284 | } |
285 | |
286 | impl<T> JoinHandle<T> { |
287 | #[inline ] |
288 | fn new( |
289 | ctx: &MainContext, |
290 | source: Source, |
291 | rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>, |
292 | ) -> Self { |
293 | let id = source.attach(Some(ctx)); |
294 | let id = Cell::new(Some(unsafe { NonZeroU32::new_unchecked(id.as_raw()) })); |
295 | Self { |
296 | rx, |
297 | source, |
298 | id, |
299 | phantom: PhantomData, |
300 | } |
301 | } |
302 | // rustdoc-stripper-ignore-next |
303 | /// Returns the internal source ID. |
304 | /// |
305 | /// Returns `None` if the handle was aborted already. |
306 | #[inline ] |
307 | pub fn as_raw_source_id(&self) -> Option<u32> { |
308 | self.id.get().map(|i| i.get()) |
309 | } |
310 | // rustdoc-stripper-ignore-next |
311 | /// Aborts the task associated with the handle. |
312 | #[inline ] |
313 | pub fn abort(&self) { |
314 | self.source.destroy(); |
315 | self.id.replace(None); |
316 | } |
317 | // rustdoc-stripper-ignore-next |
318 | /// Returns the [`Source`] associated with this handle. |
319 | #[inline ] |
320 | pub fn source(&self) -> &Source { |
321 | &self.source |
322 | } |
323 | // rustdoc-stripper-ignore-next |
324 | /// Safely converts the handle into a [`SourceId`]. |
325 | /// |
326 | /// Can be used to discard the return value while still retaining the ability to abort the |
327 | /// underlying task. Returns `Err(self)` if the handle was aborted already. |
328 | pub fn into_source_id(self) -> Result<SourceId, Self> { |
329 | if let Some(id) = self.id.take() { |
330 | Ok(unsafe { SourceId::from_glib(id.get()) }) |
331 | } else { |
332 | Err(self) |
333 | } |
334 | } |
335 | } |
336 | |
337 | impl<T: 'static> Future for JoinHandle<T> { |
338 | type Output = Result<T, JoinError>; |
339 | #[inline ] |
340 | fn poll( |
341 | mut self: std::pin::Pin<&mut Self>, |
342 | cx: &mut std::task::Context<'_>, |
343 | ) -> std::task::Poll<Self::Output> { |
344 | std::pin::Pin::new(&mut self.rx).poll(cx).map(|r: Result, …>, …>| match r { |
345 | Err(_) => Err(JoinErrorInner::Cancelled.into()), |
346 | Ok(Err(e: Box)) => Err(JoinErrorInner::Panic(e).into()), |
347 | Ok(Ok(r: Box)) => Ok(*r.downcast().unwrap()), |
348 | }) |
349 | } |
350 | } |
351 | |
352 | impl<T: 'static> futures_core::FusedFuture for JoinHandle<T> { |
353 | #[inline ] |
354 | fn is_terminated(&self) -> bool { |
355 | self.rx.is_terminated() |
356 | } |
357 | } |
358 | |
359 | // Safety: We can't rely on the auto implementation because we are retrieving |
360 | // the result as a `Box<dyn Any + 'static>` from the [`Source`]. We need to |
361 | // rely on type erasure here, so we have to manually assert the Send bound too. |
362 | unsafe impl<T: Send> Send for JoinHandle<T> {} |
363 | |
364 | // rustdoc-stripper-ignore-next |
365 | /// Variant of [`JoinHandle`] that is returned from [`MainContext::spawn_from_within`]. |
366 | #[derive (Debug)] |
367 | pub struct SpawnWithinJoinHandle<T> { |
368 | rx: Option<oneshot::Receiver<JoinHandle<T>>>, |
369 | join_handle: Option<JoinHandle<T>>, |
370 | } |
371 | |
372 | impl<T> SpawnWithinJoinHandle<T> { |
373 | // rustdoc-stripper-ignore-next |
374 | /// Waits until the task is spawned and returns the [`JoinHandle`]. |
375 | pub async fn into_inner(self) -> Result<JoinHandle<T>, JoinError> { |
376 | if let Some(join_handle: JoinHandle) = self.join_handle { |
377 | return Ok(join_handle); |
378 | } |
379 | |
380 | if let Some(rx: Receiver>) = self.rx { |
381 | match rx.await { |
382 | Ok(join_handle: JoinHandle) => return Ok(join_handle), |
383 | Err(_) => return Err(JoinErrorInner::Cancelled.into()), |
384 | } |
385 | } |
386 | |
387 | Err(JoinErrorInner::Cancelled.into()) |
388 | } |
389 | } |
390 | |
391 | impl<T: 'static> Future for SpawnWithinJoinHandle<T> { |
392 | type Output = Result<T, JoinError>; |
393 | #[inline ] |
394 | fn poll( |
395 | mut self: std::pin::Pin<&mut Self>, |
396 | cx: &mut std::task::Context<'_>, |
397 | ) -> std::task::Poll<Self::Output> { |
398 | if let Some(ref mut rx) = self.rx { |
399 | match std::pin::Pin::new(rx).poll(cx) { |
400 | std::task::Poll::Pending => return std::task::Poll::Pending, |
401 | std::task::Poll::Ready(Err(_)) => { |
402 | self.rx = None; |
403 | return std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into())); |
404 | } |
405 | std::task::Poll::Ready(Ok(join_handle)) => { |
406 | self.rx = None; |
407 | self.join_handle = Some(join_handle); |
408 | } |
409 | } |
410 | } |
411 | |
412 | if let Some(ref mut join_handle) = self.join_handle { |
413 | match std::pin::Pin::new(join_handle).poll(cx) { |
414 | std::task::Poll::Pending => return std::task::Poll::Pending, |
415 | std::task::Poll::Ready(Err(e)) => { |
416 | self.join_handle = None; |
417 | return std::task::Poll::Ready(Err(e)); |
418 | } |
419 | std::task::Poll::Ready(Ok(r)) => { |
420 | self.join_handle = None; |
421 | return std::task::Poll::Ready(Ok(r)); |
422 | } |
423 | } |
424 | } |
425 | |
426 | std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into())) |
427 | } |
428 | } |
429 | |
430 | impl<T: 'static> futures_core::FusedFuture for SpawnWithinJoinHandle<T> { |
431 | #[inline ] |
432 | fn is_terminated(&self) -> bool { |
433 | if let Some(ref rx: &Receiver>) = self.rx { |
434 | rx.is_terminated() |
435 | } else if let Some(ref join_handle: &JoinHandle) = self.join_handle { |
436 | join_handle.is_terminated() |
437 | } else { |
438 | true |
439 | } |
440 | } |
441 | } |
442 | |
443 | // rustdoc-stripper-ignore-next |
444 | /// Task failure from awaiting a [`JoinHandle`]. |
445 | #[derive (Debug)] |
446 | pub struct JoinError(JoinErrorInner); |
447 | |
448 | impl JoinError { |
449 | // rustdoc-stripper-ignore-next |
450 | /// Returns `true` if the handle was cancelled. |
451 | #[inline ] |
452 | pub fn is_cancelled(&self) -> bool { |
453 | matches!(self.0, JoinErrorInner::Cancelled) |
454 | } |
455 | // rustdoc-stripper-ignore-next |
456 | /// Returns `true` if the task terminated with a panic. |
457 | #[inline ] |
458 | pub fn is_panic(&self) -> bool { |
459 | matches!(self.0, JoinErrorInner::Panic(_)) |
460 | } |
461 | // rustdoc-stripper-ignore-next |
462 | /// Converts the error into a panic result. |
463 | /// |
464 | /// # Panics |
465 | /// |
466 | /// Panics if the error is not a panic error. Use [`is_panic`](Self::is_panic) to check first |
467 | /// if the error represents a panic. |
468 | #[inline ] |
469 | pub fn into_panic(self) -> Box<dyn Any + Send + 'static> { |
470 | self.try_into_panic() |
471 | .expect("`JoinError` is not a panic error" ) |
472 | } |
473 | // rustdoc-stripper-ignore-next |
474 | /// Attempts to convert the error into a panic result. |
475 | /// |
476 | /// Returns `Err(self)` if the error is not a panic result. |
477 | #[inline ] |
478 | pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self> { |
479 | match self.0 { |
480 | JoinErrorInner::Panic(e) => Ok(e), |
481 | e => Err(Self(e)), |
482 | } |
483 | } |
484 | } |
485 | |
486 | impl std::error::Error for JoinError {} |
487 | |
488 | impl std::fmt::Display for JoinError { |
489 | #[inline ] |
490 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
491 | self.0.fmt(f) |
492 | } |
493 | } |
494 | |
// Private representation behind `JoinError`.
#[derive(Debug)]
enum JoinErrorInner {
    // The task went away without delivering a result.
    Cancelled,
    // The task panicked; carries the panic payload.
    Panic(Box<dyn Any + Send + 'static>),
}
500 | |
501 | impl From<JoinErrorInner> for JoinError { |
502 | #[inline ] |
503 | fn from(e: JoinErrorInner) -> Self { |
504 | Self(e) |
505 | } |
506 | } |
507 | |
508 | impl std::error::Error for JoinErrorInner {} |
509 | |
510 | impl fmt::Display for JoinErrorInner { |
511 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
512 | match self { |
513 | Self::Cancelled => fmt.write_str(data:"task cancelled" ), |
514 | Self::Panic(_) => fmt.write_str(data:"task panicked" ), |
515 | } |
516 | } |
517 | } |
518 | |
519 | impl MainContext { |
520 | // rustdoc-stripper-ignore-next |
521 | /// Spawn a new infallible `Future` on the main context. |
522 | /// |
523 | /// This can be called from any thread and will execute the future from the thread |
524 | /// where main context is running, e.g. via a `MainLoop`. |
525 | pub fn spawn<R: Send + 'static, F: Future<Output = R> + Send + 'static>( |
526 | &self, |
527 | f: F, |
528 | ) -> JoinHandle<R> { |
529 | self.spawn_with_priority(crate::Priority::default(), f) |
530 | } |
531 | |
532 | // rustdoc-stripper-ignore-next |
533 | /// Spawn a new infallible `Future` on the main context. |
534 | /// |
535 | /// The given `Future` does not have to be `Send`. |
536 | /// |
537 | /// This can be called only from the thread where the main context is running, e.g. |
538 | /// from any other `Future` that is executed on this main context, or after calling |
539 | /// `with_thread_default` or `acquire` on the main context. |
540 | pub fn spawn_local<R: 'static, F: Future<Output = R> + 'static>(&self, f: F) -> JoinHandle<R> { |
541 | self.spawn_local_with_priority(crate::Priority::default(), f) |
542 | } |
543 | |
544 | // rustdoc-stripper-ignore-next |
545 | /// Spawn a new infallible `Future` on the main context, with a non-default priority. |
546 | /// |
547 | /// This can be called from any thread and will execute the future from the thread |
548 | /// where main context is running, e.g. via a `MainLoop`. |
549 | pub fn spawn_with_priority<R: Send + 'static, F: Future<Output = R> + Send + 'static>( |
550 | &self, |
551 | priority: Priority, |
552 | f: F, |
553 | ) -> JoinHandle<R> { |
554 | let f = FutureObj::new(Box::new(async move { |
555 | Box::new(f.await) as Box<dyn Any + Send + 'static> |
556 | })); |
557 | let (tx, rx) = oneshot::channel(); |
558 | let source = TaskSource::new(priority, FutureWrapper::Send(f), Some(tx)); |
559 | JoinHandle::new(self, source, rx) |
560 | } |
561 | |
562 | // rustdoc-stripper-ignore-next |
563 | /// Spawn a new infallible `Future` on the main context, with a non-default priority. |
564 | /// |
565 | /// The given `Future` does not have to be `Send`. |
566 | /// |
567 | /// This can be called only from the thread where the main context is running, e.g. |
568 | /// from any other `Future` that is executed on this main context, or after calling |
569 | /// `with_thread_default` or `acquire` on the main context. |
570 | pub fn spawn_local_with_priority<R: 'static, F: Future<Output = R> + 'static>( |
571 | &self, |
572 | priority: Priority, |
573 | f: F, |
574 | ) -> JoinHandle<R> { |
575 | let _acquire = self |
576 | .acquire() |
577 | .expect("Spawning local futures only allowed on the thread owning the MainContext" ); |
578 | let f = LocalFutureObj::new(Box::new(async move { |
579 | Box::new(f.await) as Box<dyn Any + 'static> |
580 | })); |
581 | let (tx, rx) = oneshot::channel(); |
582 | let source = TaskSource::new( |
583 | priority, |
584 | FutureWrapper::NonSend(ThreadGuard::new(f)), |
585 | Some(tx), |
586 | ); |
587 | JoinHandle::new(self, source, rx) |
588 | } |
589 | |
590 | // rustdoc-stripper-ignore-next |
591 | /// Spawn a new infallible `Future` on the main context from inside the main context. |
592 | /// |
593 | /// The given `Future` does not have to be `Send` but the closure to spawn it has to be. |
594 | /// |
595 | /// This can be called only from any thread. |
596 | pub fn spawn_from_within<R: Send + 'static, F: Future<Output = R> + 'static>( |
597 | &self, |
598 | func: impl FnOnce() -> F + Send + 'static, |
599 | ) -> SpawnWithinJoinHandle<R> { |
600 | self.spawn_from_within_with_priority(crate::Priority::default(), func) |
601 | } |
602 | |
603 | // rustdoc-stripper-ignore-next |
604 | /// Spawn a new infallible `Future` on the main context from inside the main context. |
605 | /// |
606 | /// The given `Future` does not have to be `Send` but the closure to spawn it has to be. |
607 | /// |
608 | /// This can be called only from any thread. |
609 | pub fn spawn_from_within_with_priority<R: Send + 'static, F: Future<Output = R> + 'static>( |
610 | &self, |
611 | priority: Priority, |
612 | func: impl FnOnce() -> F + Send + 'static, |
613 | ) -> SpawnWithinJoinHandle<R> { |
614 | let ctx = self.clone(); |
615 | let (tx, rx) = oneshot::channel(); |
616 | self.invoke_with_priority(priority, move || { |
617 | let _ = tx.send(ctx.spawn_local(func())); |
618 | }); |
619 | |
620 | SpawnWithinJoinHandle { |
621 | rx: Some(rx), |
622 | join_handle: None, |
623 | } |
624 | } |
625 | |
626 | // rustdoc-stripper-ignore-next |
627 | /// Runs a new, infallible `Future` on the main context and block until it finished, returning |
628 | /// the result of the `Future`. |
629 | /// |
630 | /// The given `Future` does not have to be `Send` or `'static`. |
631 | /// |
632 | /// This must only be called if no `MainLoop` or anything else is running on this specific main |
633 | /// context. |
634 | #[allow (clippy::transmute_ptr_to_ptr)] |
635 | pub fn block_on<F: Future>(&self, f: F) -> F::Output { |
636 | let mut res = None; |
637 | let l = MainLoop::new(Some(self), false); |
638 | |
639 | let f = async { |
640 | res = Some(panic::AssertUnwindSafe(f).catch_unwind().await); |
641 | l.quit(); |
642 | }; |
643 | |
644 | let f = unsafe { |
645 | // Super-unsafe: We transmute here to get rid of the 'static lifetime |
646 | let f = LocalFutureObj::new(Box::new(async move { |
647 | f.await; |
648 | Box::new(()) as Box<dyn Any + 'static> |
649 | })); |
650 | let f: LocalFutureObj<'static, Box<dyn Any + 'static>> = mem::transmute(f); |
651 | f |
652 | }; |
653 | |
654 | let source = TaskSource::new( |
655 | crate::Priority::default(), |
656 | FutureWrapper::NonSend(ThreadGuard::new(f)), |
657 | None, |
658 | ); |
659 | source.attach(Some(self)); |
660 | |
661 | l.run(); |
662 | |
663 | match res.unwrap() { |
664 | Ok(v) => v, |
665 | Err(e) => panic::resume_unwind(e), |
666 | } |
667 | } |
668 | } |
669 | |
670 | impl Spawn for MainContext { |
671 | fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> { |
672 | let (tx: Sender, …>>, _) = oneshot::channel(); |
673 | let source: Source = TaskSource::new( |
674 | crate::Priority::default(), |
675 | future:FutureWrapper::Send(FutureObj::new(Box::new(async move { |
676 | f.await; |
677 | Box::new(()) as Box<dyn Any + Send + 'static> |
678 | }))), |
679 | return_tx:Some(tx), |
680 | ); |
681 | source.attach(context:Some(self)); |
682 | Ok(()) |
683 | } |
684 | } |
685 | |
686 | impl LocalSpawn for MainContext { |
687 | fn spawn_local_obj(&self, f: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { |
688 | let (tx: Sender, …>>, _) = oneshot::channel(); |
689 | let source: Source = TaskSource::new( |
690 | crate::Priority::default(), |
691 | future:FutureWrapper::NonSend(ThreadGuard::new(LocalFutureObj::new(Box::new( |
692 | async move { |
693 | f.await; |
694 | Box::new(()) as Box<dyn Any + 'static> |
695 | }, |
696 | )))), |
697 | return_tx:Some(tx), |
698 | ); |
699 | source.attach(context:Some(self)); |
700 | Ok(()) |
701 | } |
702 | } |
703 | |
#[cfg(test)]
mod tests {
    use std::{sync::mpsc, thread};

    use futures_channel::oneshot;
    use futures_util::future::{FutureExt, TryFutureExt};

    use super::*;

    // A `Send` future spawned from the test thread is executed by the thread
    // running the main loop.
    #[test]
    fn test_spawn() {
        let ctx = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&ctx), false);

        let (done_tx, done_rx) = mpsc::channel();
        let (start_tx, start_rx) = oneshot::channel();

        let loop_handle = main_loop.clone();
        ctx.spawn(
            start_rx
                .and_then(move |()| {
                    done_tx.send(()).unwrap();
                    loop_handle.quit();

                    futures_util::future::ok(())
                })
                .then(|res| {
                    assert!(res.is_ok());
                    futures_util::future::ready(())
                }),
        );

        let runner = thread::spawn(move || {
            main_loop.run();
        });

        start_tx.send(()).unwrap();

        done_rx.recv().unwrap();

        runner.join().unwrap();
    }

    // A local future can be spawned while the context is the thread default.
    #[test]
    fn test_spawn_local() {
        let ctx = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&ctx), false);

        ctx.with_thread_default(|| {
            let loop_handle = main_loop.clone();
            ctx.spawn_local(futures_util::future::lazy(move |_ctx| {
                loop_handle.quit();
            }));

            main_loop.run();
        })
        .unwrap();
    }

    // The spawning closure is sent from another thread, while the non-`Send`
    // future (it holds an `Rc`) is created and run inside the main context.
    #[test]
    fn test_spawn_from_within() {
        let ctx = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&ctx), false);

        let spawner = std::thread::spawn({
            let loop_handle = main_loop.clone();
            move || {
                ctx.spawn_from_within(move || async move {
                    let rc = std::rc::Rc::new(123);
                    futures_util::future::ready(()).await;
                    assert_eq!(std::rc::Rc::strong_count(&rc), 1);
                    loop_handle.quit();
                });
            }
        });

        main_loop.run();

        spawner.join().unwrap();
    }

    // `block_on` accepts a future borrowing from the stack and returns its output.
    #[test]
    fn test_block_on() {
        let ctx = MainContext::new();

        let mut slot = None;
        {
            let slot = &mut slot;

            let future = futures_util::future::lazy(|_ctx| {
                *slot = Some(123);
                Ok::<i32, ()>(123)
            });

            let res = ctx.block_on(future);
            assert_eq!(res, Ok(123));
        }

        assert_eq!(slot, Some(123));
    }

    // Awaiting a `JoinHandle` yields the task's return value.
    #[test]
    fn test_spawn_return() {
        let ctx = MainContext::new();
        ctx.block_on(async {
            let val = 1;
            let ret = ctx
                .spawn(async move { futures_util::future::ready(2).await + val })
                .await;
            assert_eq!(ret.unwrap(), 3);
        });
    }

    // A panic inside the task surfaces as a panic `JoinError` carrying the payload.
    #[test]
    fn test_spawn_panic() {
        let ctx = MainContext::new();
        ctx.block_on(async {
            let ret = ctx
                .spawn(async {
                    panic!("failed");
                })
                .await;
            assert_eq!(
                *ret.unwrap_err().into_panic().downcast::<&str>().unwrap(),
                "failed"
            );
        });
    }

    // Aborting a task that never completes must still free the inner future.
    #[test]
    fn test_spawn_abort() {
        let ctx = MainContext::new();
        let shared = std::sync::Arc::new(1);
        let shared_clone = shared.clone();
        let ctx_ref = &ctx;
        ctx.block_on(async move {
            let handle = ctx_ref.spawn(async move {
                let _keep_alive = shared_clone;
                let test: u128 = std::future::pending().await;
                println!("{test}");
                unreachable!();
            });

            handle.abort();
        });
        drop(ctx);

        // Make sure the inner future is actually freed.
        assert_eq!(std::sync::Arc::strong_count(&shared), 1);
    }
}
855 | |