1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4 any::Any, cell::Cell, marker::PhantomData, mem, num::NonZeroU32, panic, pin::Pin, ptr, thread,
5};
6
7use futures_channel::oneshot;
8use futures_core::{
9 future::Future,
10 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
11};
12use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
13use futures_util::FutureExt;
14
15use crate::{
16 thread_guard::ThreadGuard, translate::*, MainContext, MainLoop, Priority, Source, SourceId,
17};
18
// Wrapper around Send Futures and non-Send Futures that will panic
// if the non-Send Future is polled/dropped from a different thread
// than where this was created.
//
// Both variants resolve to a type-erased boxed value; `Future for FutureWrapper`
// exposes it uniformly as `Box<dyn Any + 'static>`.
enum FutureWrapper {
    // Future whose boxed output keeps its `Send` bound.
    Send(FutureObj<'static, Box<dyn Any + Send + 'static>>),
    // Thread-bound future; the `ThreadGuard` is what enforces the
    // "same thread only" rule described above.
    NonSend(ThreadGuard<LocalFutureObj<'static, Box<dyn Any + 'static>>>),
}
26
// The `Send` variant holds a `Send` future and the `NonSend` variant is wrapped in a
// `ThreadGuard`, which (per the comment on `FutureWrapper`) panics instead of letting
// the future be polled or dropped on a different thread.
unsafe impl Send for FutureWrapper {}
// NOTE(review): `Sync` presumably relies on the wrapper only ever being polled through
// `Pin<&mut Self>`, i.e. with exclusive access — confirm.
unsafe impl Sync for FutureWrapper {}
29
30impl Future for FutureWrapper {
31 type Output = Box<dyn Any + 'static>;
32
33 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
34 match self.get_mut() {
35 FutureWrapper::Send(fut: &mut FutureObj<'_, Box>) => {
36 Pin::new(fut).poll(cx:ctx).map(|b: Box| b as Box<dyn Any + 'static>)
37 }
38 FutureWrapper::NonSend(fut: &mut ThreadGuard>) => Pin::new(fut.get_mut()).poll(cx:ctx),
39 }
40 }
41}
42
// The TaskSource and WakerSource are split up as the TaskSource
// must only be finalized on the thread that owns the main context,
// but the WakerSource is passed around to arbitrary threads for
// being able to wake up the TaskSource.
//
// The WakerSource is set up as a child source of the TaskSource, i.e.
// whenever it is ready also the TaskSource is ready.
//
// `#[repr(C)]` with `source` as the first field is what allows the
// `*mut ffi::GSource` <-> `*mut TaskSource` casts used by the callbacks.
#[repr(C)]
struct TaskSource {
    // The underlying `GSource` header; must stay the first field.
    source: ffi::GSource,
    // The future driven by this source.
    future: FutureWrapper,
    // Waker holding a reference to the child waker source.
    waker: Waker,
    // Where the future's output (or the payload of a caught panic) is sent; `None` when
    // nobody collects the result or once a result has been sent.
    return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
}
57
// Child source of a `TaskSource` that carries no data of its own: its `GSource`
// pointer is used as the `RawWaker` data pointer, and waking sets its ready time.
#[repr(C)]
struct WakerSource {
    // The underlying `GSource` header.
    source: ffi::GSource,
}
62
63impl TaskSource {
64 unsafe extern "C" fn dispatch(
65 source: *mut ffi::GSource,
66 callback: ffi::GSourceFunc,
67 _user_data: ffi::gpointer,
68 ) -> ffi::gboolean {
69 let source = &mut *(source as *mut Self);
70 debug_assert!(callback.is_none());
71
72 // Poll the TaskSource and ensure we're never called again if the
73 // contained Future resolved now.
74 if let Poll::Ready(()) = source.poll() {
75 ffi::G_SOURCE_REMOVE
76 } else {
77 ffi::G_SOURCE_CONTINUE
78 }
79 }
80
81 unsafe extern "C" fn finalize(source: *mut ffi::GSource) {
82 let source = source as *mut Self;
83
84 // This will panic if the future was a local future and is dropped from a different thread
85 // than where it was created so try to drop it from the main context if we're on another
86 // thread and the main context still exists.
87 //
88 // This can only really happen if the `Source` was manually retrieve from the context, but
89 // better safe than sorry.
90 match (*source).future {
91 FutureWrapper::Send(_) => {
92 ptr::drop_in_place(&mut (*source).future);
93 }
94 FutureWrapper::NonSend(ref mut future) if future.is_owner() => {
95 ptr::drop_in_place(&mut (*source).future);
96 }
97 FutureWrapper::NonSend(ref mut future) => {
98 let context = ffi::g_source_get_context(source as *mut ffi::GSource);
99 if !context.is_null() {
100 let future = ptr::read(future);
101 let context = MainContext::from_glib_none(context);
102 context.invoke(move || {
103 drop(future);
104 });
105 } else {
106 // This will panic
107 ptr::drop_in_place(&mut (*source).future);
108 }
109 }
110 }
111
112 // Drop the waker to unref the underlying GSource
113 ptr::drop_in_place(&mut (*source).waker);
114 }
115}
116
117impl WakerSource {
118 unsafe fn clone_raw(waker: *const ()) -> RawWaker {
119 static VTABLE: RawWakerVTable = RawWakerVTable::new(
120 WakerSource::clone_raw,
121 WakerSource::wake_raw,
122 WakerSource::wake_by_ref_raw,
123 WakerSource::drop_raw,
124 );
125
126 let waker = waker as *const ffi::GSource;
127 ffi::g_source_ref(mut_override(waker));
128 RawWaker::new(waker as *const (), &VTABLE)
129 }
130
131 unsafe fn wake_raw(waker: *const ()) {
132 Self::wake_by_ref_raw(waker);
133 Self::drop_raw(waker);
134 }
135
136 unsafe fn wake_by_ref_raw(waker: *const ()) {
137 let waker = waker as *const ffi::GSource;
138 ffi::g_source_set_ready_time(mut_override(waker), 0);
139 }
140
141 unsafe fn drop_raw(waker: *const ()) {
142 let waker = waker as *const ffi::GSource;
143 ffi::g_source_unref(mut_override(waker));
144 }
145
146 unsafe extern "C" fn dispatch(
147 source: *mut ffi::GSource,
148 _callback: ffi::GSourceFunc,
149 _user_data: ffi::gpointer,
150 ) -> ffi::gboolean {
151 // Set ready-time to -1 so that we're not called again before
152 // being woken up another time.
153 ffi::g_source_set_ready_time(mut_override(source), -1);
154 ffi::G_SOURCE_CONTINUE
155 }
156}
157
// NOTE(review): `TaskSource` contains a `FutureWrapper`, whose non-`Send` variant is
// protected by a `ThreadGuard`; these impls presumably rely on that guard plus the
// owner check in `TaskSource::poll` — confirm.
unsafe impl Send for TaskSource {}
unsafe impl Sync for TaskSource {}

// `WakerSource` only consists of the `GSource` header and is what wakers pass around
// to arbitrary threads (see the comment above `TaskSource`).
unsafe impl Send for WakerSource {}
unsafe impl Sync for WakerSource {}
163
164impl TaskSource {
165 #[allow(clippy::new_ret_no_self)]
166 // checker-ignore-item
167 fn new(
168 priority: Priority,
169 future: FutureWrapper,
170 return_tx: Option<oneshot::Sender<thread::Result<Box<dyn Any + 'static>>>>,
171 ) -> Source {
172 unsafe {
173 static TASK_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
174 check: None,
175 prepare: None,
176 dispatch: Some(TaskSource::dispatch),
177 finalize: Some(TaskSource::finalize),
178 closure_callback: None,
179 closure_marshal: None,
180 };
181 static WAKER_SOURCE_FUNCS: ffi::GSourceFuncs = ffi::GSourceFuncs {
182 check: None,
183 prepare: None,
184 dispatch: Some(WakerSource::dispatch),
185 finalize: None,
186 closure_callback: None,
187 closure_marshal: None,
188 };
189
190 let source = ffi::g_source_new(
191 mut_override(&TASK_SOURCE_FUNCS),
192 mem::size_of::<Self>() as u32,
193 );
194
195 let waker_source = ffi::g_source_new(
196 mut_override(&WAKER_SOURCE_FUNCS),
197 mem::size_of::<WakerSource>() as u32,
198 );
199
200 ffi::g_source_set_priority(source, priority.into_glib());
201 ffi::g_source_add_child_source(source, waker_source);
202
203 {
204 let source = &mut *(source as *mut Self);
205 ptr::write(&mut source.future, future);
206 ptr::write(&mut source.return_tx, return_tx);
207
208 // This creates a new reference to the waker source.
209 let waker = Waker::from_raw(WakerSource::clone_raw(waker_source as *const ()));
210 ptr::write(&mut source.waker, waker);
211 }
212
213 // Set ready time to 0 so that the source is immediately dispatched
214 // for doing the initial polling. This will then either resolve the
215 // future or register the waker wherever necessary.
216 ffi::g_source_set_ready_time(waker_source, 0);
217
218 // Unref the waker source, a strong reference to it is stored inside
219 // the task source directly and inside the task source as child source.
220 ffi::g_source_unref(waker_source);
221
222 from_glib_full(source)
223 }
224 }
225
226 fn poll(&mut self) -> Poll<()> {
227 let source = &self.source as *const _;
228 let executor: Borrowed<MainContext> =
229 unsafe { from_glib_borrow(ffi::g_source_get_context(mut_override(source))) };
230
231 assert!(
232 executor.is_owner(),
233 "Polling futures only allowed if the thread is owning the MainContext"
234 );
235
236 executor
237 .with_thread_default(|| {
238 let _enter = futures_executor::enter().unwrap();
239 let mut context = Context::from_waker(&self.waker);
240
241 // This will panic if the future was a local future and is called from
242 // a different thread than where it was created.
243 if let Some(tx) = self.return_tx.take() {
244 let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
245 Pin::new(&mut self.future).poll(&mut context)
246 }));
247 match res {
248 Ok(Poll::Ready(res)) => {
249 let _ = tx.send(Ok(res));
250 Poll::Ready(())
251 }
252 Ok(Poll::Pending) => {
253 self.return_tx.replace(tx);
254 Poll::Pending
255 }
256 Err(e) => {
257 let _ = tx.send(Err(e));
258 Poll::Ready(())
259 }
260 }
261 } else {
262 Pin::new(&mut self.future).poll(&mut context).map(|_| ())
263 }
264 })
265 .expect("current thread is not owner of the main context")
266 }
267}
268
// rustdoc-stripper-ignore-next
/// A handle to a task running on a [`MainContext`].
///
/// Like [`std::thread::JoinHandle`] for a task rather than a thread. The return value from the
/// task can be retrieved by awaiting on this object. Dropping the handle "detaches" the task,
/// allowing it to complete but discarding the return value.
#[derive(Debug)]
pub struct JoinHandle<T> {
    // Receives the type-erased task output, or the payload of a panic caught while polling.
    rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
    // The source driving the task; destroyed by `abort`.
    source: Source,
    // ID the source got when it was attached; `None` after `abort` or `into_source_id`.
    id: Cell<Option<NonZeroU32>>,
    // Records the output type `T`, which is only stored type-erased in `rx`.
    phantom: PhantomData<oneshot::Receiver<std::thread::Result<T>>>,
}
282
283impl<T> JoinHandle<T> {
284 #[inline]
285 fn new(
286 ctx: &MainContext,
287 source: Source,
288 rx: oneshot::Receiver<std::thread::Result<Box<dyn Any + 'static>>>,
289 ) -> Self {
290 let id = source.attach(Some(ctx));
291 let id = Cell::new(Some(unsafe { NonZeroU32::new_unchecked(id.as_raw()) }));
292 Self {
293 rx,
294 source,
295 id,
296 phantom: PhantomData,
297 }
298 }
299 // rustdoc-stripper-ignore-next
300 /// Returns the internal source ID.
301 ///
302 /// Returns `None` if the handle was aborted already.
303 #[inline]
304 pub fn as_raw_source_id(&self) -> Option<u32> {
305 self.id.get().map(|i| i.get())
306 }
307 // rustdoc-stripper-ignore-next
308 /// Aborts the task associated with the handle.
309 #[inline]
310 pub fn abort(&self) {
311 self.source.destroy();
312 self.id.replace(None);
313 }
314 // rustdoc-stripper-ignore-next
315 /// Returns the [`Source`] associated with this handle.
316 #[inline]
317 pub fn source(&self) -> &Source {
318 &self.source
319 }
320 // rustdoc-stripper-ignore-next
321 /// Safely converts the handle into a [`SourceId`].
322 ///
323 /// Can be used to discard the return value while still retaining the ability to abort the
324 /// underlying task. Returns `Err(self)` if the handle was aborted already.
325 pub fn into_source_id(self) -> Result<SourceId, Self> {
326 if let Some(id) = self.id.take() {
327 Ok(unsafe { SourceId::from_glib(id.get()) })
328 } else {
329 Err(self)
330 }
331 }
332}
333
334impl<T: 'static> Future for JoinHandle<T> {
335 type Output = Result<T, JoinError>;
336 #[inline]
337 fn poll(
338 mut self: std::pin::Pin<&mut Self>,
339 cx: &mut std::task::Context<'_>,
340 ) -> std::task::Poll<Self::Output> {
341 std::pin::Pin::new(&mut self.rx).poll(cx).map(|r: Result, …>, …>| match r {
342 Err(_) => Err(JoinErrorInner::Cancelled.into()),
343 Ok(Err(e: Box)) => Err(JoinErrorInner::Panic(e).into()),
344 Ok(Ok(r: Box)) => Ok(*r.downcast().unwrap()),
345 })
346 }
347}
348
349impl<T: 'static> futures_core::FusedFuture for JoinHandle<T> {
350 #[inline]
351 fn is_terminated(&self) -> bool {
352 self.rx.is_terminated()
353 }
354}
355
// NOTE(review): `Send` is implemented without requiring `T: Send`. `spawn_from_within`
// sends a `JoinHandle<R>` for a possibly non-`Send` `R` out of a `Send` closure through
// a oneshot channel, which presumably is why the bound is omitted — confirm the
// soundness argument with the maintainers.
unsafe impl<T> Send for JoinHandle<T> {}
357
// rustdoc-stripper-ignore-next
/// Variant of [`JoinHandle`] that is returned from [`MainContext::spawn_from_within`].
///
/// The task is spawned from a closure invoked on the main context, so the actual
/// [`JoinHandle`] first has to be received before the task's result can be awaited.
#[derive(Debug)]
pub struct SpawnWithinJoinHandle<T> {
    // Receives the `JoinHandle` once the task was spawned; cleared after it resolved.
    rx: Option<oneshot::Receiver<JoinHandle<T>>>,
    // The received `JoinHandle`; cleared again once it resolved.
    join_handle: Option<JoinHandle<T>>,
}
365
366impl<T> SpawnWithinJoinHandle<T> {
367 // rustdoc-stripper-ignore-next
368 /// Waits until the task is spawned and returns the [`JoinHandle`].
369 pub async fn into_inner(self) -> Result<JoinHandle<T>, JoinError> {
370 if let Some(join_handle: JoinHandle) = self.join_handle {
371 return Ok(join_handle);
372 }
373
374 if let Some(rx: Receiver>) = self.rx {
375 match rx.await {
376 Ok(join_handle: JoinHandle) => return Ok(join_handle),
377 Err(_) => return Err(JoinErrorInner::Cancelled.into()),
378 }
379 }
380
381 Err(JoinErrorInner::Cancelled.into())
382 }
383}
384
385impl<T: 'static> Future for SpawnWithinJoinHandle<T> {
386 type Output = Result<T, JoinError>;
387 #[inline]
388 fn poll(
389 mut self: std::pin::Pin<&mut Self>,
390 cx: &mut std::task::Context<'_>,
391 ) -> std::task::Poll<Self::Output> {
392 if let Some(ref mut rx) = self.rx {
393 match std::pin::Pin::new(rx).poll(cx) {
394 std::task::Poll::Pending => return std::task::Poll::Pending,
395 std::task::Poll::Ready(Err(_)) => {
396 self.rx = None;
397 return std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()));
398 }
399 std::task::Poll::Ready(Ok(join_handle)) => {
400 self.rx = None;
401 self.join_handle = Some(join_handle);
402 }
403 }
404 }
405
406 if let Some(ref mut join_handle) = self.join_handle {
407 match std::pin::Pin::new(join_handle).poll(cx) {
408 std::task::Poll::Pending => return std::task::Poll::Pending,
409 std::task::Poll::Ready(Err(e)) => {
410 self.join_handle = None;
411 return std::task::Poll::Ready(Err(e));
412 }
413 std::task::Poll::Ready(Ok(r)) => {
414 self.join_handle = None;
415 return std::task::Poll::Ready(Ok(r));
416 }
417 }
418 }
419
420 std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()))
421 }
422}
423
424impl<T: 'static> futures_core::FusedFuture for SpawnWithinJoinHandle<T> {
425 #[inline]
426 fn is_terminated(&self) -> bool {
427 if let Some(ref rx: &Receiver>) = self.rx {
428 rx.is_terminated()
429 } else if let Some(ref join_handle: &JoinHandle) = self.join_handle {
430 join_handle.is_terminated()
431 } else {
432 true
433 }
434 }
435}
436
// rustdoc-stripper-ignore-next
/// Task failure from awaiting a [`JoinHandle`].
///
/// The task was either cancelled or it panicked; see [`is_cancelled`](Self::is_cancelled)
/// and [`is_panic`](Self::is_panic).
#[derive(Debug)]
pub struct JoinError(JoinErrorInner);
441
442impl JoinError {
443 // rustdoc-stripper-ignore-next
444 /// Returns `true` if the handle was cancelled.
445 #[inline]
446 pub fn is_cancelled(&self) -> bool {
447 matches!(self.0, JoinErrorInner::Cancelled)
448 }
449 // rustdoc-stripper-ignore-next
450 /// Returns `true` if the task terminated with a panic.
451 #[inline]
452 pub fn is_panic(&self) -> bool {
453 matches!(self.0, JoinErrorInner::Panic(_))
454 }
455 // rustdoc-stripper-ignore-next
456 /// Converts the error into a panic result.
457 ///
458 /// # Panics
459 ///
460 /// Panics if the error is not a panic error. Use [`is_panic`](Self::is_panic) to check first
461 /// if the error represents a panic.
462 #[inline]
463 pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
464 self.try_into_panic()
465 .expect("`JoinError` is not a panic error")
466 }
467 // rustdoc-stripper-ignore-next
468 /// Attempts to convert the error into a panic result.
469 ///
470 /// Returns `Err(self)` if the error is not a panic result.
471 #[inline]
472 pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, Self> {
473 match self.0 {
474 JoinErrorInner::Panic(e) => Ok(e),
475 e => Err(Self(e)),
476 }
477 }
478}
479
480impl std::error::Error for JoinError {}
481
482impl std::fmt::Display for JoinError {
483 #[inline]
484 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
485 self.0.fmt(f)
486 }
487}
488
// Private representation of a `JoinError`; the `#[error]` attributes provide the
// `Display` messages.
#[derive(thiserror::Error, Debug)]
enum JoinErrorInner {
    // The result sender was dropped without ever sending a result.
    #[error("task cancelled")]
    Cancelled,
    // The task panicked; carries the panic payload.
    #[error("task panicked")]
    Panic(Box<dyn Any + Send + 'static>),
}
496
497impl From<JoinErrorInner> for JoinError {
498 #[inline]
499 fn from(e: JoinErrorInner) -> Self {
500 Self(e)
501 }
502}
503
504impl MainContext {
505 // rustdoc-stripper-ignore-next
506 /// Spawn a new infallible `Future` on the main context.
507 ///
508 /// This can be called from any thread and will execute the future from the thread
509 /// where main context is running, e.g. via a `MainLoop`.
510 pub fn spawn<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
511 &self,
512 f: F,
513 ) -> JoinHandle<R> {
514 self.spawn_with_priority(crate::Priority::default(), f)
515 }
516
517 // rustdoc-stripper-ignore-next
518 /// Spawn a new infallible `Future` on the main context.
519 ///
520 /// The given `Future` does not have to be `Send`.
521 ///
522 /// This can be called only from the thread where the main context is running, e.g.
523 /// from any other `Future` that is executed on this main context, or after calling
524 /// `with_thread_default` or `acquire` on the main context.
525 pub fn spawn_local<R: 'static, F: Future<Output = R> + 'static>(&self, f: F) -> JoinHandle<R> {
526 self.spawn_local_with_priority(crate::Priority::default(), f)
527 }
528
529 // rustdoc-stripper-ignore-next
530 /// Spawn a new infallible `Future` on the main context, with a non-default priority.
531 ///
532 /// This can be called from any thread and will execute the future from the thread
533 /// where main context is running, e.g. via a `MainLoop`.
534 pub fn spawn_with_priority<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
535 &self,
536 priority: Priority,
537 f: F,
538 ) -> JoinHandle<R> {
539 let f = FutureObj::new(Box::new(async move {
540 Box::new(f.await) as Box<dyn Any + Send + 'static>
541 }));
542 let (tx, rx) = oneshot::channel();
543 let source = TaskSource::new(priority, FutureWrapper::Send(f), Some(tx));
544 JoinHandle::new(self, source, rx)
545 }
546
547 // rustdoc-stripper-ignore-next
548 /// Spawn a new infallible `Future` on the main context, with a non-default priority.
549 ///
550 /// The given `Future` does not have to be `Send`.
551 ///
552 /// This can be called only from the thread where the main context is running, e.g.
553 /// from any other `Future` that is executed on this main context, or after calling
554 /// `with_thread_default` or `acquire` on the main context.
555 pub fn spawn_local_with_priority<R: 'static, F: Future<Output = R> + 'static>(
556 &self,
557 priority: Priority,
558 f: F,
559 ) -> JoinHandle<R> {
560 let _acquire = self
561 .acquire()
562 .expect("Spawning local futures only allowed on the thread owning the MainContext");
563 let f = LocalFutureObj::new(Box::new(async move {
564 Box::new(f.await) as Box<dyn Any + 'static>
565 }));
566 let (tx, rx) = oneshot::channel();
567 let source = TaskSource::new(
568 priority,
569 FutureWrapper::NonSend(ThreadGuard::new(f)),
570 Some(tx),
571 );
572 JoinHandle::new(self, source, rx)
573 }
574
575 // rustdoc-stripper-ignore-next
576 /// Spawn a new infallible `Future` on the main context from inside the main context.
577 ///
578 /// The given `Future` does not have to be `Send` but the closure to spawn it has to be.
579 ///
580 /// This can be called only from any thread.
581 pub fn spawn_from_within<R: 'static, F: Future<Output = R> + 'static>(
582 &self,
583 func: impl FnOnce() -> F + Send + 'static,
584 ) -> SpawnWithinJoinHandle<R> {
585 self.spawn_from_within_with_priority(crate::Priority::default(), func)
586 }
587
588 // rustdoc-stripper-ignore-next
589 /// Spawn a new infallible `Future` on the main context from inside the main context.
590 ///
591 /// The given `Future` does not have to be `Send` but the closure to spawn it has to be.
592 ///
593 /// This can be called only from any thread.
594 pub fn spawn_from_within_with_priority<R: 'static, F: Future<Output = R> + 'static>(
595 &self,
596 priority: Priority,
597 func: impl FnOnce() -> F + Send + 'static,
598 ) -> SpawnWithinJoinHandle<R> {
599 let ctx = self.clone();
600 let (tx, rx) = oneshot::channel();
601 self.invoke_with_priority(priority, move || {
602 let _ = tx.send(ctx.spawn_local(func()));
603 });
604
605 SpawnWithinJoinHandle {
606 rx: Some(rx),
607 join_handle: None,
608 }
609 }
610
611 // rustdoc-stripper-ignore-next
612 /// Runs a new, infallible `Future` on the main context and block until it finished, returning
613 /// the result of the `Future`.
614 ///
615 /// The given `Future` does not have to be `Send` or `'static`.
616 ///
617 /// This must only be called if no `MainLoop` or anything else is running on this specific main
618 /// context.
619 #[allow(clippy::transmute_ptr_to_ptr)]
620 pub fn block_on<F: Future>(&self, f: F) -> F::Output {
621 let mut res = None;
622 let l = MainLoop::new(Some(self), false);
623
624 let f = async {
625 res = Some(panic::AssertUnwindSafe(f).catch_unwind().await);
626 l.quit();
627 };
628
629 let f = unsafe {
630 // Super-unsafe: We transmute here to get rid of the 'static lifetime
631 let f = LocalFutureObj::new(Box::new(async move {
632 f.await;
633 Box::new(()) as Box<dyn Any + 'static>
634 }));
635 let f: LocalFutureObj<'static, Box<dyn Any + 'static>> = mem::transmute(f);
636 f
637 };
638
639 let source = TaskSource::new(
640 crate::Priority::default(),
641 FutureWrapper::NonSend(ThreadGuard::new(f)),
642 None,
643 );
644 source.attach(Some(self));
645
646 l.run();
647
648 match res.unwrap() {
649 Ok(v) => v,
650 Err(e) => panic::resume_unwind(e),
651 }
652 }
653}
654
655impl Spawn for MainContext {
656 fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
657 let (tx: Sender, …>>, _) = oneshot::channel();
658 let source: Source = TaskSource::new(
659 crate::Priority::default(),
660 future:FutureWrapper::Send(FutureObj::new(Box::new(async move {
661 f.await;
662 Box::new(()) as Box<dyn Any + Send + 'static>
663 }))),
664 return_tx:Some(tx),
665 );
666 source.attach(context:Some(self));
667 Ok(())
668 }
669}
670
671impl LocalSpawn for MainContext {
672 fn spawn_local_obj(&self, f: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
673 let (tx: Sender, …>>, _) = oneshot::channel();
674 let source: Source = TaskSource::new(
675 crate::Priority::default(),
676 future:FutureWrapper::NonSend(ThreadGuard::new(LocalFutureObj::new(Box::new(
677 async move {
678 f.await;
679 Box::new(()) as Box<dyn Any + 'static>
680 },
681 )))),
682 return_tx:Some(tx),
683 );
684 source.attach(context:Some(self));
685 Ok(())
686 }
687}
688
#[cfg(test)]
mod tests {
    use std::{sync::mpsc, thread};

    use futures_channel::oneshot;
    use futures_util::future::{FutureExt, TryFutureExt};

    use super::*;

    // Spawns a `Send` future from this thread while the main loop runs on another
    // thread; the future is completed by sending on a oneshot channel.
    #[test]
    fn test_spawn() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        let (sender, receiver) = mpsc::channel();
        let (o_sender, o_receiver) = oneshot::channel();

        let l_clone = l.clone();
        c.spawn(
            o_receiver
                .and_then(move |()| {
                    sender.send(()).unwrap();
                    l_clone.quit();

                    futures_util::future::ok(())
                })
                .then(|res| {
                    assert!(res.is_ok());
                    futures_util::future::ready(())
                }),
        );

        thread::spawn(move || {
            l.run();
        });

        // Unblock the spawned future ...
        o_sender.send(()).unwrap();

        // ... and wait until it signalled that it ran.
        receiver.recv().unwrap();
    }

    // Spawns a local future while the context is the thread default and runs the
    // loop until the future quits it.
    #[test]
    fn test_spawn_local() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        c.with_thread_default(|| {
            let l_clone = l.clone();
            c.spawn_local(futures_util::future::lazy(move |_ctx| {
                l_clone.quit();
            }));

            l.run();
        })
        .unwrap();
    }

    // Calls `spawn_from_within` from another thread while this thread runs the loop;
    // the spawned future quits the loop.
    #[test]
    fn test_spawn_from_within() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        std::thread::spawn({
            let l_clone = l.clone();
            move || {
                c.spawn_from_within(move || async move {
                    l_clone.quit();
                });
            }
        });

        l.run();
    }

    // `block_on` accepts a future borrowing from the stack and returns its output.
    #[test]
    fn test_block_on() {
        let c = MainContext::new();

        let mut v = None;
        {
            let v = &mut v;

            let future = futures_util::future::lazy(|_ctx| {
                *v = Some(123);
                Ok::<i32, ()>(123)
            });

            let res = c.block_on(future);
            assert_eq!(res, Ok(123));
        }

        assert_eq!(v, Some(123));
    }

    // Awaiting the `JoinHandle` yields the output of the spawned future.
    #[test]
    fn test_spawn_return() {
        let c = MainContext::new();
        c.block_on(async {
            let val = 1;
            let ret = c
                .spawn(async move { futures_util::future::ready(2).await + val })
                .await;
            assert_eq!(ret.unwrap(), 3);
        });
    }

    // A panic inside the spawned future is reported as a `JoinError` that carries
    // the panic payload.
    #[test]
    fn test_spawn_panic() {
        let c = MainContext::new();
        c.block_on(async {
            let ret = c
                .spawn(async {
                    panic!("failed");
                })
                .await;
            assert_eq!(
                *ret.unwrap_err().into_panic().downcast::<&str>().unwrap(),
                "failed"
            );
        });
    }
}
811