1// Copyright 2024 The AccessKit Authors. All rights reserved.
2// Licensed under the Apache License, Version 2.0 (found in
3// the LICENSE-APACHE file) or the MIT license (found in
4// the LICENSE-MIT file), at your option.
5
6// Derived from zbus.
7// Copyright 2024 Zeeshan Ali Khan.
8// Licensed under the MIT license (found in the LICENSE-MIT file).
9
10#[cfg(not(feature = "tokio"))]
11use async_executor::Executor as AsyncExecutor;
12#[cfg(not(feature = "tokio"))]
13use async_task::Task as AsyncTask;
14#[cfg(feature = "tokio")]
15use std::marker::PhantomData;
16#[cfg(not(feature = "tokio"))]
17use std::sync::Arc;
18use std::{
19 future::Future,
20 pin::Pin,
21 task::{Context, Poll},
22};
23#[cfg(feature = "tokio")]
24use tokio::task::JoinHandle;
25
26/// A wrapper around the underlying runtime/executor.
27///
28/// This is used to run asynchronous tasks internally and allows integration with various runtimes.
29/// See [`crate::Connection::executor`] for an example of integration with external runtimes.
30///
31/// **Note:** You can (and should) completely ignore this type when building with `tokio` feature
32/// enabled.
33#[cfg(not(feature = "tokio"))]
34#[derive(Debug, Clone)]
35pub(crate) struct Executor<'a> {
36 executor: Arc<AsyncExecutor<'a>>,
37}
38#[cfg(feature = "tokio")]
39#[derive(Debug, Clone)]
40pub(crate) struct Executor<'a> {
41 phantom: PhantomData<&'a ()>,
42}
43
44impl<'a> Executor<'a> {
45 /// Spawns a task onto the executor.
46 pub(crate) fn spawn<T: Send + 'static>(
47 &self,
48 future: impl Future<Output = T> + Send + 'static,
49 #[allow(unused)] name: &str,
50 ) -> Task<T> {
51 #[cfg(not(feature = "tokio"))]
52 {
53 Task(Some(self.executor.spawn(future)))
54 }
55
56 #[cfg(feature = "tokio")]
57 {
58 #[cfg(tokio_unstable)]
59 {
60 Task(Some(
61 tokio::task::Builder::new()
62 .name(name)
63 .spawn(future)
64 // SAFETY: Looking at the code, this call always returns an `Ok`.
65 .unwrap(),
66 ))
67 }
68 #[cfg(not(tokio_unstable))]
69 {
70 Task(Some(tokio::task::spawn(future)))
71 }
72 }
73 }
74
75 /// Create a new `Executor`.
76 pub(crate) fn new() -> Self {
77 #[cfg(not(feature = "tokio"))]
78 {
79 Self {
80 executor: Arc::new(AsyncExecutor::new()),
81 }
82 }
83
84 #[cfg(feature = "tokio")]
85 {
86 Self {
87 phantom: PhantomData,
88 }
89 }
90 }
91
92 /// Runs the executor until the given future completes.
93 ///
94 /// With `tokio` feature enabled, it just awaits on the `future`.
95 pub(crate) async fn run<T>(&self, future: impl Future<Output = T>) -> T {
96 #[cfg(not(feature = "tokio"))]
97 {
98 self.executor.run(future).await
99 }
100 #[cfg(feature = "tokio")]
101 {
102 future.await
103 }
104 }
105}
106
107/// A wrapper around the task API of the underlying runtime/executor.
108///
109/// This follows the semantics of `async_task::Task` on drop:
110///
111/// * it will be cancelled, rather than detached. For detaching, use the `detach` method.
112/// * errors from the task cancellation will will be ignored. If you need to know about task errors,
113/// convert the task to a `FallibleTask` using the `fallible` method.
114#[cfg(not(feature = "tokio"))]
115#[derive(Debug)]
116pub(crate) struct Task<T>(Option<AsyncTask<T>>);
117#[cfg(feature = "tokio")]
118#[derive(Debug)]
119pub(crate) struct Task<T>(Option<JoinHandle<T>>);
120
121impl<T> Task<T> {
122 /// Detaches the task to let it keep running in the background.
123 #[allow(unused_mut)]
124 #[allow(unused)]
125 pub(crate) fn detach(mut self) {
126 #[cfg(not(feature = "tokio"))]
127 {
128 self.0.take().expect(msg:"async_task::Task is none").detach()
129 }
130
131 #[cfg(feature = "tokio")]
132 {
133 self.0.take().expect("tokio::task::JoinHandle is none");
134 }
135 }
136}
137
138impl<T> Drop for Task<T> {
139 fn drop(&mut self) {
140 #[cfg(feature = "tokio")]
141 {
142 if let Some(join_handle) = self.0.take() {
143 join_handle.abort();
144 }
145 }
146 }
147}
148
149impl<T> Future for Task<T> {
150 type Output = T;
151
152 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
153 #[cfg(not(feature = "tokio"))]
154 {
155 Pin::new(&mut self.get_mut().0.as_mut().expect(msg:"async_task::Task is none")).poll(cx)
156 }
157
158 #[cfg(feature = "tokio")]
159 {
160 Pin::new(
161 &mut self
162 .get_mut()
163 .0
164 .as_mut()
165 .expect("tokio::task::JoinHandle is none"),
166 )
167 .poll(cx)
168 .map(|r| r.expect("tokio::task::JoinHandle error"))
169 }
170 }
171}
172