1use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem;
5use core::pin::Pin;
6use core::ptr::NonNull;
7use core::sync::atomic::Ordering;
8use core::task::{Context, Poll};
9
10use crate::header::Header;
11use crate::raw::Panic;
12use crate::runnable::ScheduleInfo;
13use crate::state::*;
14
/// A spawned task.
///
/// A [`Task`] can be awaited to retrieve the output of its future.
///
/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
/// method.
///
/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
/// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking
/// [`run()`][`super::Runnable::run()`].
///
/// # Examples
///
/// ```
/// use smol::{future, Executor};
/// use std::thread;
///
/// let ex = Executor::new();
///
/// // Spawn a future onto the executor.
/// let task = ex.spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task), 3);
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct Task<T, M = ()> {
    /// A raw task pointer.
    ///
    /// The methods of this type cast it to `*const Header<M>` in order to reach the task's
    /// state, vtable and metadata.
    pub(crate) ptr: NonNull<()>,

    /// A marker capturing generic types `T` and `M`.
    ///
    /// `T` is the output type produced when the task is awaited and `M` is the type of the
    /// metadata returned by `metadata()`; neither is stored inline in the handle.
    pub(crate) _marker: PhantomData<(T, M)>,
}
56
// NOTE(review): a handle moved to another thread can take the task's output (`T`) out of the
// task when polled, hence `T: Send`. The `M: Send + Sync` bound presumably exists because the
// metadata lives in the task allocation that other handles may reach from other threads —
// confirm against the raw task implementation.
unsafe impl<T: Send, M: Send + Sync> Send for Task<T, M> {}
// NOTE(review): as far as this file shows, a shared `&Task` only reaches the header
// (`is_finished()`, `metadata()`, `Debug`) and never the output, so no bound on `T` is required
// here, while `&M` is handed out by `metadata()`.
unsafe impl<T, M: Send + Sync> Sync for Task<T, M> {}

// The handle stores only a pointer and a `PhantomData`, so it is `Unpin` regardless of `T` and
// `M`.
impl<T, M> Unpin for Task<T, M> {}

// The unwind-safety markers are only available when the `std` feature is enabled.
#[cfg(feature = "std")]
impl<T, M> std::panic::UnwindSafe for Task<T, M> {}
#[cfg(feature = "std")]
impl<T, M> std::panic::RefUnwindSafe for Task<T, M> {}
66
impl<T, M> Task<T, M> {
    /// Detaches the task to let it keep running in the background.
    ///
    /// The handle is consumed without canceling the task. If the task has already completed,
    /// its output is taken out of the task and dropped.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{Executor, Timer};
    /// use std::time::Duration;
    ///
    /// let ex = Executor::new();
    ///
    /// // Spawn a daemon future.
    /// ex.spawn(async {
    ///     loop {
    ///         println!("I'm a daemon task looping forever.");
    ///         Timer::after(Duration::from_secs(1)).await;
    ///     }
    /// })
    /// .detach();
    /// ```
    pub fn detach(self) {
        let mut this = self;
        // `_out` is a named binding, so the output (if any) is dropped at the end of this
        // function, i.e. after the handle has been forgotten below.
        let _out = this.set_detached();
        // Skip `Task::drop()`, which would cancel the task.
        mem::forget(this);
    }

    /// Cancels the task and waits for it to stop running.
    ///
    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
    /// it didn't complete.
    ///
    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
    /// canceling because it also waits for the task to stop running.
    ///
    /// # Examples
    ///
    /// ```
    /// # if cfg!(miri) { return; } // Miri does not support epoll
    /// use smol::{future, Executor, Timer};
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// let ex = Executor::new();
    ///
    /// // Spawn a daemon future.
    /// let task = ex.spawn(async {
    ///     loop {
    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
    ///         Timer::after(Duration::from_secs(1)).await;
    ///     }
    /// });
    ///
    /// // Run an executor thread.
    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
    ///
    /// future::block_on(async {
    ///     Timer::after(Duration::from_secs(3)).await;
    ///     task.cancel().await;
    /// });
    /// ```
    pub async fn cancel(self) -> Option<T> {
        let mut this = self;
        // Request cancellation first, then await the handle as a `FallibleTask` so that a
        // closed task resolves to `None` instead of panicking.
        this.set_canceled();
        this.fallible().await
    }

    /// Converts this task into a [`FallibleTask`].
    ///
    /// Like [`Task`], a fallible task will poll the task's output until it is
    /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being
    /// dropped without being run. Resolves to the task's output when completed,
    /// or [`None`] if it didn't complete.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{future, Executor};
    /// use std::thread;
    ///
    /// let ex = Executor::new();
    ///
    /// // Spawn a future onto the executor.
    /// let task = ex.spawn(async {
    ///     println!("Hello from a task!");
    ///     1 + 2
    /// })
    /// .fallible();
    ///
    /// // Run an executor thread.
    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
    ///
    /// // Wait for the task's output.
    /// assert_eq!(future::block_on(task), Some(3));
    /// ```
    ///
    /// ```
    /// use smol::future;
    ///
    /// // Schedule function which drops the runnable without running it.
    /// let schedule = move |runnable| drop(runnable);
    ///
    /// // Create a task with the future and the schedule function.
    /// let (runnable, task) = async_task::spawn(async {
    ///     println!("Hello from a task!");
    ///     1 + 2
    /// }, schedule);
    /// runnable.schedule();
    ///
    /// // Wait for the task's output.
    /// assert_eq!(future::block_on(task.fallible()), None);
    /// ```
    pub fn fallible(self) -> FallibleTask<T, M> {
        FallibleTask { task: self }
    }

    /// Puts the task in canceled state.
    ///
    /// Does nothing if the task is already completed or closed. Otherwise the `CLOSED` flag is
    /// set; if the task was neither scheduled nor running it is additionally marked as
    /// `SCHEDULED`, gains one reference and is passed to the schedule function so that the
    /// executor drops its future. If the `AWAITER` flag was set, the awaiter is notified.
    fn set_canceled(&mut self) {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header<M>;

        // SAFETY (review note): relies on `self.ptr` pointing to a live task allocation that
        // starts with a `Header<M>`; that invariant is established where the `Task` is
        // constructed, which is outside this file — confirm there.
        unsafe {
            let mut state = (*header).state.load(Ordering::Acquire);

            loop {
                // If the task has been completed or closed, it can't be canceled.
                if state & (COMPLETED | CLOSED) != 0 {
                    break;
                }

                // If the task is not scheduled nor running, we'll need to schedule it.
                // NOTE(review): the added `REFERENCE` presumably accounts for the runnable
                // handed to the schedule function below — confirm against the raw task code.
                let new = if state & (SCHEDULED | RUNNING) == 0 {
                    (state | SCHEDULED | CLOSED) + REFERENCE
                } else {
                    state | CLOSED
                };

                // Mark the task as closed.
                match (*header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not scheduled nor running, schedule it one more time so
                        // that its future gets dropped by the executor.
                        if state & (SCHEDULED | RUNNING) == 0 {
                            ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
                        }

                        // Notify the awaiter that the task has been closed.
                        if state & AWAITER != 0 {
                            (*header).notify(None);
                        }

                        break;
                    }
                    // The exchange failed (possibly spuriously); retry with the fresh state.
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Puts the task in detached state.
    ///
    /// Clears the `TASK` flag from the task's state. If the task is completed but not yet
    /// closed, it is closed first and its output is read out of the task and returned so that
    /// the caller can drop it; otherwise `None` is returned. If this handle was the last
    /// reference, the task is either scheduled one more time (when not closed) or destroyed
    /// (when closed).
    fn set_detached(&mut self) -> Option<Result<T, Panic>> {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header<M>;

        // SAFETY (review note): relies on `self.ptr` pointing to a live task allocation that
        // starts with a `Header<M>` and on `get_output` returning a pointer to a
        // `Result<T, Panic>` once the task is completed — confirm against the raw task code.
        unsafe {
            // A place where the output will be stored in case it needs to be dropped.
            let mut output = None;

            // Optimistically assume the `Task` is being detached just after creating the task.
            // This is a common case so if the `Task` is detached, the overhead of it is only one
            // compare-exchange operation.
            if let Err(mut state) = (*header).state.compare_exchange_weak(
                SCHEDULED | TASK | REFERENCE,
                SCHEDULED | REFERENCE,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                // Slow path: the state was something else (or the weak exchange failed
                // spuriously), so work from the observed state.
                loop {
                    // If the task has been completed but not yet closed, that means its output
                    // must be dropped.
                    if state & COMPLETED != 0 && state & CLOSED == 0 {
                        // Mark the task as closed in order to grab its output.
                        match (*header).state.compare_exchange_weak(
                            state,
                            state | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // Read the output.
                                output = Some(
                                    (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>)
                                        .read(),
                                );

                                // Update the state variable because we're continuing the loop.
                                state |= CLOSED;
                            }
                            Err(s) => state = s,
                        }
                    } else {
                        // If this is the last reference to the task and it's not closed, then
                        // close it and schedule one more time so that its future gets dropped by
                        // the executor.
                        // `!(REFERENCE - 1)` selects the bits from `REFERENCE` upwards, which
                        // are zero exactly when no other reference is left.
                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
                            SCHEDULED | CLOSED | REFERENCE
                        } else {
                            state & !TASK
                        };

                        // Unset the `TASK` flag.
                        match (*header).state.compare_exchange_weak(
                            state,
                            new,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // If this is the last reference to the task, we need to either
                                // schedule dropping its future or destroy it.
                                if state & !(REFERENCE - 1) == 0 {
                                    if state & CLOSED == 0 {
                                        ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
                                    } else {
                                        ((*header).vtable.destroy)(ptr);
                                    }
                                }

                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }

            output
        }
    }

    /// Polls the task to retrieve its output.
    ///
    /// Returns `Some` if the task has completed or `None` if it was closed.
    ///
    /// A task becomes closed in the following cases:
    ///
    /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
    /// 2. Its output gets awaited by the `Task`.
    /// 3. It panics while polling the future.
    /// 4. It is completed and the `Task` gets dropped.
    ///
    /// With the `std` feature enabled, a panic captured from the task is resumed on the
    /// caller's thread instead of being returned.
    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header<M>;

        // SAFETY (review note): relies on `self.ptr` pointing to a live task allocation that
        // starts with a `Header<M>` and on `get_output` returning a pointer to a
        // `Result<T, Panic>` once the task is completed — confirm against the raw task code.
        unsafe {
            let mut state = (*header).state.load(Ordering::Acquire);

            loop {
                // If the task has been closed, notify the awaiter and return `None`.
                if state & CLOSED != 0 {
                    // If the task is scheduled or running, we need to wait until its future is
                    // dropped.
                    if state & (SCHEDULED | RUNNING) != 0 {
                        // Replace the waker with one associated with the current task.
                        (*header).register(cx.waker());

                        // Reload the state after registering. It is possible changes occurred just
                        // before registration so we need to check for that.
                        state = (*header).state.load(Ordering::Acquire);

                        // If the task is still scheduled or running, we need to wait because its
                        // future is not dropped yet.
                        if state & (SCHEDULED | RUNNING) != 0 {
                            return Poll::Pending;
                        }
                    }

                    // Even though the awaiter is most likely the current task, it could also be
                    // another task.
                    (*header).notify(Some(cx.waker()));
                    return Poll::Ready(None);
                }

                // If the task is not completed, register the current task.
                if state & COMPLETED == 0 {
                    // Replace the waker with one associated with the current task.
                    (*header).register(cx.waker());

                    // Reload the state after registering. It is possible that the task became
                    // completed or closed just before registration so we need to check for that.
                    state = (*header).state.load(Ordering::Acquire);

                    // If the task has been closed, restart.
                    if state & CLOSED != 0 {
                        continue;
                    }

                    // If the task is still not completed, we're blocked on it.
                    if state & COMPLETED == 0 {
                        return Poll::Pending;
                    }
                }

                // Since the task is now completed, mark it as closed in order to grab its output.
                match (*header).state.compare_exchange(
                    state,
                    state | CLOSED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Notify the awaiter. Even though the awaiter is most likely the current
                        // task, it could also be another task.
                        if state & AWAITER != 0 {
                            (*header).notify(Some(cx.waker()));
                        }

                        // Take the output from the task.
                        let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>;
                        let output = output.read();

                        // Propagate the panic if the task panicked.
                        let output = match output {
                            Ok(output) => output,
                            Err(panic) => {
                                #[cfg(feature = "std")]
                                std::panic::resume_unwind(panic);

                                // An empty `match` only compiles for an uninhabited type, so
                                // without `std` this arm cannot be reached.
                                #[cfg(not(feature = "std"))]
                                match panic {}
                            }
                        };

                        return Poll::Ready(Some(output));
                    }
                    // The state changed in the meantime; re-evaluate it from the top.
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Returns a shared reference to the task's header.
    fn header(&self) -> &Header<M> {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header<M>;
        // SAFETY (review note): relies on `self.ptr` pointing to a live `Header<M>` for as long
        // as this handle exists — confirm where the `Task` is constructed.
        unsafe { &*header }
    }

    /// Returns `true` if the current task is finished.
    ///
    /// A task counts as finished once its state has the `CLOSED` or `COMPLETED` flag set.
    ///
    /// Note that in a multithreaded environment, this task can finish immediately after calling
    /// this function.
    pub fn is_finished(&self) -> bool {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header<M>;

        // SAFETY (review note): relies on `self.ptr` pointing to a live `Header<M>` for as long
        // as this handle exists — confirm where the `Task` is constructed.
        unsafe {
            let state = (*header).state.load(Ordering::Acquire);
            state & (CLOSED | COMPLETED) != 0
        }
    }

    /// Get the metadata associated with this task.
    ///
    /// Tasks can be created with a metadata object associated with them; by default, this
    /// is a `()` value. See the [`Builder::metadata()`] method for more information.
    pub fn metadata(&self) -> &M {
        &self.header().metadata
    }
}
439
440impl<T, M> Drop for Task<T, M> {
441 fn drop(&mut self) {
442 self.set_canceled();
443 self.set_detached();
444 }
445}
446
447impl<T, M> Future for Task<T, M> {
448 type Output = T;
449
450 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451 match self.poll_task(cx) {
452 Poll::Ready(t: Option) => Poll::Ready(t.expect(msg:"task has failed")),
453 Poll::Pending => Poll::Pending,
454 }
455 }
456}
457
458impl<T, M: fmt::Debug> fmt::Debug for Task<T, M> {
459 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
460 f&mut DebugStruct<'_, '_>.debug_struct("Task")
461 .field(name:"header", self.header())
462 .finish()
463 }
464}
465
/// A spawned task with a fallible response.
///
/// This type behaves like [`Task`], however it produces an `Option<T>` when
/// polled and will return `None` if the executor dropped its
/// [`Runnable`][`super::Runnable`] without being run.
///
/// This can be useful to avoid the panic produced when polling the `Task`
/// future if the executor dropped its `Runnable`.
///
/// Dropping a `FallibleTask` drops the wrapped [`Task`], which cancels the task.
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct FallibleTask<T, M = ()> {
    /// The wrapped task handle; every operation on this type is forwarded to it.
    task: Task<T, M>,
}
478
479impl<T, M> FallibleTask<T, M> {
480 /// Detaches the task to let it keep running in the background.
481 ///
482 /// # Examples
483 ///
484 /// ```
485 /// use smol::{Executor, Timer};
486 /// use std::time::Duration;
487 ///
488 /// let ex = Executor::new();
489 ///
490 /// // Spawn a deamon future.
491 /// ex.spawn(async {
492 /// loop {
493 /// println!("I'm a daemon task looping forever.");
494 /// Timer::after(Duration::from_secs(1)).await;
495 /// }
496 /// })
497 /// .fallible()
498 /// .detach();
499 /// ```
500 pub fn detach(self) {
501 self.task.detach()
502 }
503
504 /// Cancels the task and waits for it to stop running.
505 ///
506 /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
507 /// it didn't complete.
508 ///
509 /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
510 /// canceling because it also waits for the task to stop running.
511 ///
512 /// # Examples
513 ///
514 /// ```
515 /// # if cfg!(miri) { return; } // Miri does not support epoll
516 /// use smol::{future, Executor, Timer};
517 /// use std::thread;
518 /// use std::time::Duration;
519 ///
520 /// let ex = Executor::new();
521 ///
522 /// // Spawn a deamon future.
523 /// let task = ex.spawn(async {
524 /// loop {
525 /// println!("Even though I'm in an infinite loop, you can still cancel me!");
526 /// Timer::after(Duration::from_secs(1)).await;
527 /// }
528 /// })
529 /// .fallible();
530 ///
531 /// // Run an executor thread.
532 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
533 ///
534 /// future::block_on(async {
535 /// Timer::after(Duration::from_secs(3)).await;
536 /// task.cancel().await;
537 /// });
538 /// ```
539 pub async fn cancel(self) -> Option<T> {
540 self.task.cancel().await
541 }
542
543 /// Returns `true` if the current task is finished.
544 ///
545 /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
546 pub fn is_finished(&self) -> bool {
547 self.task.is_finished()
548 }
549}
550
551impl<T, M> Future for FallibleTask<T, M> {
552 type Output = Option<T>;
553
554 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
555 self.task.poll_task(cx)
556 }
557}
558
559impl<T, M: fmt::Debug> fmt::Debug for FallibleTask<T, M> {
560 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
561 f&mut DebugStruct<'_, '_>.debug_struct("FallibleTask")
562 .field(name:"header", self.task.header())
563 .finish()
564 }
565}
566