1 | use core::fmt; |
2 | use core::future::Future; |
3 | use core::marker::PhantomData; |
4 | use core::mem; |
5 | use core::pin::Pin; |
6 | use core::ptr::NonNull; |
7 | use core::sync::atomic::Ordering; |
8 | use core::task::{Context, Poll}; |
9 | |
10 | use crate::header::Header; |
11 | use crate::raw::Panic; |
12 | use crate::runnable::ScheduleInfo; |
13 | use crate::state::*; |
14 | |
/// A handle to a spawned task.
///
/// Awaiting a [`Task`] yields the output of the future it was spawned with.
///
/// Dropping the handle cancels the task: its future will not be polled again. Use
/// [`detach()`][`Task::detach()`] to let go of the handle while keeping the task alive, or
/// [`cancel()`][Task::cancel()] to cancel gracefully and wait until the task is fully destroyed.
///
/// Canceling wakes the task and reschedules it one last time. The executor then destroys the
/// task either by dropping its [`Runnable`][`super::Runnable`] or by invoking
/// [`run()`][`super::Runnable::run()`].
///
/// # Examples
///
/// ```
/// use smol::{future, Executor};
/// use std::thread;
///
/// let ex = Executor::new();
///
/// // Spawn a future onto the executor.
/// let task = ex.spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task), 3);
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct Task<T, M = ()> {
    /// The raw, type-erased task pointer. The methods of this type cast it to
    /// `*const Header<M>` to reach the task's state, vtable and metadata.
    pub(crate) ptr: NonNull<()>,

    /// Zero-sized marker that makes the handle generic over the output type `T` and the
    /// metadata type `M` without storing either of them inline.
    pub(crate) _marker: PhantomData<(T, M)>,
}
56 | |
57 | unsafe impl<T: Send, M: Send + Sync> Send for Task<T, M> {} |
58 | unsafe impl<T, M: Send + Sync> Sync for Task<T, M> {} |
59 | |
60 | impl<T, M> Unpin for Task<T, M> {} |
61 | |
62 | #[cfg (feature = "std" )] |
63 | impl<T, M> std::panic::UnwindSafe for Task<T, M> {} |
64 | #[cfg (feature = "std" )] |
65 | impl<T, M> std::panic::RefUnwindSafe for Task<T, M> {} |
66 | |
67 | impl<T, M> Task<T, M> { |
68 | /// Detaches the task to let it keep running in the background. |
69 | /// |
70 | /// # Examples |
71 | /// |
72 | /// ``` |
73 | /// use smol::{Executor, Timer}; |
74 | /// use std::time::Duration; |
75 | /// |
76 | /// let ex = Executor::new(); |
77 | /// |
78 | /// // Spawn a deamon future. |
79 | /// ex.spawn(async { |
80 | /// loop { |
81 | /// println!("I'm a daemon task looping forever." ); |
82 | /// Timer::after(Duration::from_secs(1)).await; |
83 | /// } |
84 | /// }) |
85 | /// .detach(); |
86 | /// ``` |
87 | pub fn detach(self) { |
88 | let mut this = self; |
89 | let _out = this.set_detached(); |
90 | mem::forget(this); |
91 | } |
92 | |
93 | /// Cancels the task and waits for it to stop running. |
94 | /// |
95 | /// Returns the task's output if it was completed just before it got canceled, or [`None`] if |
96 | /// it didn't complete. |
97 | /// |
98 | /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of |
99 | /// canceling because it also waits for the task to stop running. |
100 | /// |
101 | /// # Examples |
102 | /// |
103 | /// ``` |
104 | /// # if cfg!(miri) { return; } // Miri does not support epoll |
105 | /// use smol::{future, Executor, Timer}; |
106 | /// use std::thread; |
107 | /// use std::time::Duration; |
108 | /// |
109 | /// let ex = Executor::new(); |
110 | /// |
111 | /// // Spawn a deamon future. |
112 | /// let task = ex.spawn(async { |
113 | /// loop { |
114 | /// println!("Even though I'm in an infinite loop, you can still cancel me!" ); |
115 | /// Timer::after(Duration::from_secs(1)).await; |
116 | /// } |
117 | /// }); |
118 | /// |
119 | /// // Run an executor thread. |
120 | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
121 | /// |
122 | /// future::block_on(async { |
123 | /// Timer::after(Duration::from_secs(3)).await; |
124 | /// task.cancel().await; |
125 | /// }); |
126 | /// ``` |
127 | pub async fn cancel(self) -> Option<T> { |
128 | let mut this = self; |
129 | this.set_canceled(); |
130 | this.fallible().await |
131 | } |
132 | |
133 | /// Converts this task into a [`FallibleTask`]. |
134 | /// |
135 | /// Like [`Task`], a fallible task will poll the task's output until it is |
136 | /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being |
137 | /// dropped without being run. Resolves to the task's output when completed, |
138 | /// or [`None`] if it didn't complete. |
139 | /// |
140 | /// # Examples |
141 | /// |
142 | /// ``` |
143 | /// use smol::{future, Executor}; |
144 | /// use std::thread; |
145 | /// |
146 | /// let ex = Executor::new(); |
147 | /// |
148 | /// // Spawn a future onto the executor. |
149 | /// let task = ex.spawn(async { |
150 | /// println!("Hello from a task!" ); |
151 | /// 1 + 2 |
152 | /// }) |
153 | /// .fallible(); |
154 | /// |
155 | /// // Run an executor thread. |
156 | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
157 | /// |
158 | /// // Wait for the task's output. |
159 | /// assert_eq!(future::block_on(task), Some(3)); |
160 | /// ``` |
161 | /// |
162 | /// ``` |
163 | /// use smol::future; |
164 | /// |
165 | /// // Schedule function which drops the runnable without running it. |
166 | /// let schedule = move |runnable| drop(runnable); |
167 | /// |
168 | /// // Create a task with the future and the schedule function. |
169 | /// let (runnable, task) = async_task::spawn(async { |
170 | /// println!("Hello from a task!" ); |
171 | /// 1 + 2 |
172 | /// }, schedule); |
173 | /// runnable.schedule(); |
174 | /// |
175 | /// // Wait for the task's output. |
176 | /// assert_eq!(future::block_on(task.fallible()), None); |
177 | /// ``` |
178 | pub fn fallible(self) -> FallibleTask<T, M> { |
179 | FallibleTask { task: self } |
180 | } |
181 | |
182 | /// Puts the task in canceled state. |
183 | fn set_canceled(&mut self) { |
184 | let ptr = self.ptr.as_ptr(); |
185 | let header = ptr as *const Header<M>; |
186 | |
187 | unsafe { |
188 | let mut state = (*header).state.load(Ordering::Acquire); |
189 | |
190 | loop { |
191 | // If the task has been completed or closed, it can't be canceled. |
192 | if state & (COMPLETED | CLOSED) != 0 { |
193 | break; |
194 | } |
195 | |
196 | // If the task is not scheduled nor running, we'll need to schedule it. |
197 | let new = if state & (SCHEDULED | RUNNING) == 0 { |
198 | (state | SCHEDULED | CLOSED) + REFERENCE |
199 | } else { |
200 | state | CLOSED |
201 | }; |
202 | |
203 | // Mark the task as closed. |
204 | match (*header).state.compare_exchange_weak( |
205 | state, |
206 | new, |
207 | Ordering::AcqRel, |
208 | Ordering::Acquire, |
209 | ) { |
210 | Ok(_) => { |
211 | // If the task is not scheduled nor running, schedule it one more time so |
212 | // that its future gets dropped by the executor. |
213 | if state & (SCHEDULED | RUNNING) == 0 { |
214 | ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); |
215 | } |
216 | |
217 | // Notify the awaiter that the task has been closed. |
218 | if state & AWAITER != 0 { |
219 | (*header).notify(None); |
220 | } |
221 | |
222 | break; |
223 | } |
224 | Err(s) => state = s, |
225 | } |
226 | } |
227 | } |
228 | } |
229 | |
/// Puts the task in detached state by clearing its `TASK` flag.
///
/// Returns `Some` with the stored result if the task had completed but had not been closed yet,
/// i.e. nobody had taken the output; the caller then becomes responsible for dropping it.
/// Returns `None` in every other case.
///
/// If this handle turns out to be the last reference, the task is either scheduled one final
/// time (when it is not closed, so that the executor drops its future) or destroyed right away
/// (when it is already closed).
fn set_detached(&mut self) -> Option<Result<T, Panic>> {
    let ptr = self.ptr.as_ptr();
    let header = ptr as *const Header<M>;

    unsafe {
        // A place where the output will be stored in case it needs to be dropped.
        let mut output = None;

        // Optimistically assume the `Task` is being detached just after creating the task.
        // This is a common case so if the `Task` is detached, the overhead of it is only one
        // compare-exchange operation.
        //
        // The weak compare-exchange may also fail spuriously; that is fine because the failure
        // path below re-examines the actual state it got back.
        if let Err(mut state) = (*header).state.compare_exchange_weak(
            SCHEDULED | TASK | REFERENCE,
            SCHEDULED | REFERENCE,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            loop {
                // If the task has been completed but not yet closed, that means its output
                // must be dropped.
                if state & COMPLETED != 0 && state & CLOSED == 0 {
                    // Mark the task as closed in order to grab its output.
                    match (*header).state.compare_exchange_weak(
                        state,
                        state | CLOSED,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // Read the output. Having set `CLOSED` ourselves, this branch
                            // cannot be entered a second time for the same task.
                            output = Some(
                                (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>)
                                    .read(),
                            );

                            // Update the state variable because we're continuing the loop:
                            // the next iteration takes the `else` branch to clear `TASK`.
                            state |= CLOSED;
                        }
                        Err(s) => state = s,
                    }
                } else {
                    // If this is the last reference to the task and it's not closed, then
                    // close it and schedule one more time so that its future gets dropped by
                    // the executor.
                    //
                    // `!(REFERENCE - 1)` selects the bits at and above `REFERENCE`; a zero
                    // result means no references are left.
                    let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
                        SCHEDULED | CLOSED | REFERENCE
                    } else {
                        state & !TASK
                    };

                    // Unset the `TASK` flag.
                    match (*header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If this is the last reference to the task, we need to either
                            // schedule dropping its future or destroy it.
                            if state & !(REFERENCE - 1) == 0 {
                                if state & CLOSED == 0 {
                                    ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
                                } else {
                                    ((*header).vtable.destroy)(ptr);
                                }
                            }

                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        output
    }
}
310 | |
/// Polls the task to retrieve its output.
///
/// Returns `Poll::Ready(Some(output))` if the task has completed, `Poll::Ready(None)` if it was
/// closed without an output being available, and `Poll::Pending` (after registering the waker
/// from `cx`) if the caller has to wait.
///
/// A task becomes closed in the following cases:
///
/// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
/// 2. Its output gets awaited by the `Task`.
/// 3. It panics while polling the future.
/// 4. It is completed and the `Task` gets dropped.
///
/// # Panics
///
/// With the `std` feature enabled, a panic captured from the task's future is resumed on the
/// caller via `std::panic::resume_unwind`.
fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
    let ptr = self.ptr.as_ptr();
    let header = ptr as *const Header<M>;

    unsafe {
        let mut state = (*header).state.load(Ordering::Acquire);

        loop {
            // If the task has been closed, notify the awaiter and return `None`.
            if state & CLOSED != 0 {
                // If the task is scheduled or running, we need to wait until its future is
                // dropped.
                if state & (SCHEDULED | RUNNING) != 0 {
                    // Replace the waker with one associated with the current task.
                    (*header).register(cx.waker());

                    // Reload the state after registering. It is possible changes occurred just
                    // before registration so we need to check for that.
                    state = (*header).state.load(Ordering::Acquire);

                    // If the task is still scheduled or running, we need to wait because its
                    // future is not dropped yet.
                    if state & (SCHEDULED | RUNNING) != 0 {
                        return Poll::Pending;
                    }
                }

                // Even though the awaiter is most likely the current task, it could also be
                // another task.
                (*header).notify(Some(cx.waker()));
                return Poll::Ready(None);
            }

            // If the task is not completed, register the current task.
            if state & COMPLETED == 0 {
                // Replace the waker with one associated with the current task.
                (*header).register(cx.waker());

                // Reload the state after registering. It is possible that the task became
                // completed or closed just before registration so we need to check for that.
                state = (*header).state.load(Ordering::Acquire);

                // If the task has been closed, restart so that the closed branch above handles
                // it.
                if state & CLOSED != 0 {
                    continue;
                }

                // If the task is still not completed, we're blocked on it.
                if state & COMPLETED == 0 {
                    return Poll::Pending;
                }
            }

            // Since the task is now completed, mark it as closed in order to grab its output.
            // On failure the loop re-evaluates everything against the newly observed state.
            match (*header).state.compare_exchange(
                state,
                state | CLOSED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Notify the awaiter. Even though the awaiter is most likely the current
                    // task, it could also be another task.
                    if state & AWAITER != 0 {
                        (*header).notify(Some(cx.waker()));
                    }

                    // Take the output from the task. Having set `CLOSED` ourselves, this is
                    // the only place that reads it out.
                    let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>;
                    let output = output.read();

                    // Propagate the panic if the task panicked.
                    let output = match output {
                        Ok(output) => output,
                        Err(panic) => {
                            #[cfg(feature = "std")]
                            std::panic::resume_unwind(panic);

                            // NOTE(review): an empty match only type-checks for an uninhabited
                            // type, so without `std` `Panic` presumably has no values and this
                            // arm is unreachable — confirm against `raw.rs`.
                            #[cfg(not(feature = "std"))]
                            match panic {}
                        }
                    };

                    return Poll::Ready(Some(output));
                }
                Err(s) => state = s,
            }
        }
    }
}
411 | |
412 | fn header(&self) -> &Header<M> { |
413 | let ptr = self.ptr.as_ptr(); |
414 | let header = ptr as *const Header<M>; |
415 | unsafe { &*header } |
416 | } |
417 | |
418 | /// Returns `true` if the current task is finished. |
419 | /// |
420 | /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. |
421 | pub fn is_finished(&self) -> bool { |
422 | let ptr = self.ptr.as_ptr(); |
423 | let header = ptr as *const Header<M>; |
424 | |
425 | unsafe { |
426 | let state = (*header).state.load(Ordering::Acquire); |
427 | state & (CLOSED | COMPLETED) != 0 |
428 | } |
429 | } |
430 | |
431 | /// Get the metadata associated with this task. |
432 | /// |
433 | /// Tasks can be created with a metadata object associated with them; by default, this |
434 | /// is a `()` value. See the [`Builder::metadata()`] method for more information. |
435 | pub fn metadata(&self) -> &M { |
436 | &self.header().metadata |
437 | } |
438 | } |
439 | |
440 | impl<T, M> Drop for Task<T, M> { |
441 | fn drop(&mut self) { |
442 | self.set_canceled(); |
443 | self.set_detached(); |
444 | } |
445 | } |
446 | |
447 | impl<T, M> Future for Task<T, M> { |
448 | type Output = T; |
449 | |
450 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
451 | match self.poll_task(cx) { |
452 | Poll::Ready(t: Option) => Poll::Ready(t.expect(msg:"task has failed" )), |
453 | Poll::Pending => Poll::Pending, |
454 | } |
455 | } |
456 | } |
457 | |
458 | impl<T, M: fmt::Debug> fmt::Debug for Task<T, M> { |
459 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
460 | f&mut DebugStruct<'_, '_>.debug_struct("Task" ) |
461 | .field(name:"header" , self.header()) |
462 | .finish() |
463 | } |
464 | } |
465 | |
466 | /// A spawned task with a fallible response. |
467 | /// |
468 | /// This type behaves like [`Task`], however it produces an `Option<T>` when |
469 | /// polled and will return `None` if the executor dropped its |
470 | /// [`Runnable`][`super::Runnable`] without being run. |
471 | /// |
472 | /// This can be useful to avoid the panic produced when polling the `Task` |
473 | /// future if the executor dropped its `Runnable`. |
474 | #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background" ] |
475 | pub struct FallibleTask<T, M = ()> { |
476 | task: Task<T, M>, |
477 | } |
478 | |
479 | impl<T, M> FallibleTask<T, M> { |
480 | /// Detaches the task to let it keep running in the background. |
481 | /// |
482 | /// # Examples |
483 | /// |
484 | /// ``` |
485 | /// use smol::{Executor, Timer}; |
486 | /// use std::time::Duration; |
487 | /// |
488 | /// let ex = Executor::new(); |
489 | /// |
490 | /// // Spawn a deamon future. |
491 | /// ex.spawn(async { |
492 | /// loop { |
493 | /// println!("I'm a daemon task looping forever." ); |
494 | /// Timer::after(Duration::from_secs(1)).await; |
495 | /// } |
496 | /// }) |
497 | /// .fallible() |
498 | /// .detach(); |
499 | /// ``` |
500 | pub fn detach(self) { |
501 | self.task.detach() |
502 | } |
503 | |
504 | /// Cancels the task and waits for it to stop running. |
505 | /// |
506 | /// Returns the task's output if it was completed just before it got canceled, or [`None`] if |
507 | /// it didn't complete. |
508 | /// |
509 | /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of |
510 | /// canceling because it also waits for the task to stop running. |
511 | /// |
512 | /// # Examples |
513 | /// |
514 | /// ``` |
515 | /// # if cfg!(miri) { return; } // Miri does not support epoll |
516 | /// use smol::{future, Executor, Timer}; |
517 | /// use std::thread; |
518 | /// use std::time::Duration; |
519 | /// |
520 | /// let ex = Executor::new(); |
521 | /// |
522 | /// // Spawn a deamon future. |
523 | /// let task = ex.spawn(async { |
524 | /// loop { |
525 | /// println!("Even though I'm in an infinite loop, you can still cancel me!" ); |
526 | /// Timer::after(Duration::from_secs(1)).await; |
527 | /// } |
528 | /// }) |
529 | /// .fallible(); |
530 | /// |
531 | /// // Run an executor thread. |
532 | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
533 | /// |
534 | /// future::block_on(async { |
535 | /// Timer::after(Duration::from_secs(3)).await; |
536 | /// task.cancel().await; |
537 | /// }); |
538 | /// ``` |
539 | pub async fn cancel(self) -> Option<T> { |
540 | self.task.cancel().await |
541 | } |
542 | |
543 | /// Returns `true` if the current task is finished. |
544 | /// |
545 | /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. |
546 | pub fn is_finished(&self) -> bool { |
547 | self.task.is_finished() |
548 | } |
549 | } |
550 | |
551 | impl<T, M> Future for FallibleTask<T, M> { |
552 | type Output = Option<T>; |
553 | |
554 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
555 | self.task.poll_task(cx) |
556 | } |
557 | } |
558 | |
559 | impl<T, M: fmt::Debug> fmt::Debug for FallibleTask<T, M> { |
560 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
561 | f&mut DebugStruct<'_, '_>.debug_struct("FallibleTask" ) |
562 | .field(name:"header" , self.task.header()) |
563 | .finish() |
564 | } |
565 | } |
566 | |