1#[cfg(not(feature = "tokio"))]
2use async_executor::Executor as AsyncExecutor;
3#[cfg(not(feature = "tokio"))]
4use async_task::Task as AsyncTask;
5#[cfg(not(feature = "tokio"))]
6use std::sync::Arc;
7#[cfg(feature = "tokio")]
8use std::{future::pending, marker::PhantomData};
9use std::{
10 future::Future,
11 pin::Pin,
12 task::{Context, Poll},
13};
14#[cfg(feature = "tokio")]
15use tokio::task::JoinHandle;
16
17/// A wrapper around the underlying runtime/executor.
18///
19/// This is used to run asynchronous tasks internally and allows integration with various runtimes.
20/// See [`crate::Connection::executor`] for an example of integration with external runtimes.
21///
22/// **Note:** You can (and should) completely ignore this type when building with `tokio` feature
23/// enabled.
24#[cfg(not(feature = "tokio"))]
25#[derive(Debug, Clone)]
26pub struct Executor<'a> {
27 executor: Arc<AsyncExecutor<'a>>,
28}
29#[cfg(feature = "tokio")]
30#[derive(Debug, Clone)]
31pub struct Executor<'a> {
32 phantom: PhantomData<&'a ()>,
33}
34
35impl<'a> Executor<'a> {
36 /// Spawns a task onto the executor.
37 #[doc(hidden)]
38 pub fn spawn<T: Send + 'static>(
39 &self,
40 future: impl Future<Output = T> + Send + 'static,
41 #[allow(unused)] name: &str,
42 ) -> Task<T> {
43 #[cfg(not(feature = "tokio"))]
44 {
45 Task(Some(self.executor.spawn(future)))
46 }
47
48 #[cfg(feature = "tokio")]
49 {
50 #[cfg(tokio_unstable)]
51 {
52 Task(Some(
53 tokio::task::Builder::new()
54 .name(name)
55 .spawn(future)
56 // SAFETY: Looking at the code, this call always returns an `Ok`.
57 .unwrap(),
58 ))
59 }
60 #[cfg(not(tokio_unstable))]
61 {
62 Task(Some(tokio::task::spawn(future)))
63 }
64 }
65 }
66
67 /// Returns `true` if there are no unfinished tasks.
68 ///
69 /// With `tokio` feature enabled, this always returns `true`.
70 pub fn is_empty(&self) -> bool {
71 #[cfg(not(feature = "tokio"))]
72 {
73 self.executor.is_empty()
74 }
75
76 #[cfg(feature = "tokio")]
77 true
78 }
79
80 /// Runs a single task.
81 ///
82 /// With `tokio` feature enabled, its a noop and never returns.
83 pub async fn tick(&self) {
84 #[cfg(not(feature = "tokio"))]
85 {
86 self.executor.tick().await
87 }
88
89 #[cfg(feature = "tokio")]
90 {
91 pending().await
92 }
93 }
94
95 /// Create a new `Executor`.
96 pub(crate) fn new() -> Self {
97 #[cfg(not(feature = "tokio"))]
98 {
99 Self {
100 executor: Arc::new(AsyncExecutor::new()),
101 }
102 }
103
104 #[cfg(feature = "tokio")]
105 {
106 Self {
107 phantom: PhantomData,
108 }
109 }
110 }
111
112 /// Runs the executor until the given future completes.
113 ///
114 /// With `tokio` feature enabled, it just awaits on the `future`.
115 pub(crate) async fn run<T>(&self, future: impl Future<Output = T>) -> T {
116 #[cfg(not(feature = "tokio"))]
117 {
118 self.executor.run(future).await
119 }
120 #[cfg(feature = "tokio")]
121 {
122 future.await
123 }
124 }
125}
126
127/// A wrapper around the task API of the underlying runtime/executor.
128///
129/// This follows the semantics of `async_task::Task` on drop:
130///
131/// * it will be cancelled, rather than detached. For detaching, use the `detach` method.
132/// * errors from the task cancellation will will be ignored. If you need to know about task errors,
133/// convert the task to a `FallibleTask` using the `fallible` method.
134#[cfg(not(feature = "tokio"))]
135#[doc(hidden)]
136#[derive(Debug)]
137pub struct Task<T>(Option<AsyncTask<T>>);
138#[cfg(feature = "tokio")]
139#[doc(hidden)]
140#[derive(Debug)]
141pub struct Task<T>(Option<JoinHandle<T>>);
142
143impl<T> Task<T> {
144 /// Detaches the task to let it keep running in the background.
145 #[allow(unused_mut)]
146 #[allow(unused)]
147 pub fn detach(mut self) {
148 #[cfg(not(feature = "tokio"))]
149 {
150 self.0.take().expect(msg:"async_task::Task is none").detach()
151 }
152
153 #[cfg(feature = "tokio")]
154 {
155 self.0.take().expect("tokio::task::JoinHandle is none");
156 }
157 }
158}
159
160impl<T> Task<T>
161where
162 T: Send + 'static,
163{
164 /// Launch the given blocking function in a task.
165 #[allow(unused)]
166 pub(crate) fn spawn_blocking<F>(f: F, #[allow(unused)] name: &str) -> Self
167 where
168 F: FnOnce() -> T + Send + 'static,
169 {
170 #[cfg(not(feature = "tokio"))]
171 {
172 Self(Some(blocking::unblock(f)))
173 }
174
175 #[cfg(feature = "tokio")]
176 {
177 #[cfg(tokio_unstable)]
178 {
179 Self(Some(
180 tokio::task::Builder::new()
181 .name(name)
182 .spawn_blocking(f)
183 // SAFETY: Looking at the code, this call always returns an `Ok`.
184 .unwrap(),
185 ))
186 }
187 #[cfg(not(tokio_unstable))]
188 {
189 Self(Some(tokio::task::spawn_blocking(f)))
190 }
191 }
192 }
193}
194
195impl<T> Drop for Task<T> {
196 fn drop(&mut self) {
197 #[cfg(feature = "tokio")]
198 {
199 if let Some(join_handle) = self.0.take() {
200 join_handle.abort();
201 }
202 }
203 }
204}
205
206impl<T> Future for Task<T> {
207 type Output = T;
208
209 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210 #[cfg(not(feature = "tokio"))]
211 {
212 Pin::new(&mut self.get_mut().0.as_mut().expect(msg:"async_task::Task is none")).poll(cx)
213 }
214
215 #[cfg(feature = "tokio")]
216 {
217 Pin::new(
218 &mut self
219 .get_mut()
220 .0
221 .as_mut()
222 .expect("tokio::task::JoinHandle is none"),
223 )
224 .poll(cx)
225 .map(|r| r.expect("tokio::task::JoinHandle error"))
226 }
227 }
228}
229