| 1 | use core::fmt; |
| 2 | use core::future::Future; |
| 3 | use core::marker::PhantomData; |
| 4 | use core::mem; |
| 5 | use core::pin::Pin; |
| 6 | use core::ptr::NonNull; |
| 7 | use core::sync::atomic::Ordering; |
| 8 | use core::task::{Context, Poll}; |
| 9 | |
| 10 | use crate::header::Header; |
| 11 | use crate::raw::Panic; |
| 12 | use crate::runnable::ScheduleInfo; |
| 13 | use crate::state::*; |
| 14 | |
/// A handle to a spawned task.
///
/// Awaiting a [`Task`] yields the output of the future it was spawned with.
///
/// A [`Task`] that is dropped gets canceled, meaning its future will not be polled again. Use
/// [`detach()`][`Task::detach()`] to give up the handle while letting the task keep running, or
/// [`cancel()`][Task::cancel()] to cancel it gracefully and wait until it has been fully
/// destroyed.
///
/// Canceling does not destroy the task on the spot: the task is woken and rescheduled one final
/// time. After that, the executor destroys it either by dropping its
/// [`Runnable`][`super::Runnable`] or by calling [`run()`][`super::Runnable::run()`].
///
/// # Examples
///
/// ```
/// use smol::{future, Executor};
/// use std::thread;
///
/// let ex = Executor::new();
///
/// // Spawn a future onto the executor.
/// let task = ex.spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task), 3);
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct Task<T, M = ()> {
    /// Type-erased pointer to the raw task; the methods of this type reinterpret it as a
    /// pointer to the task's `Header<M>`.
    pub(crate) ptr: NonNull<()>,

    /// Ties the otherwise unused type parameters `T` (output) and `M` (metadata) to the handle.
    pub(crate) _marker: PhantomData<(T, M)>,
}
| 56 | |
| 57 | unsafe impl<T: Send, M: Send + Sync> Send for Task<T, M> {} |
| 58 | unsafe impl<T, M: Send + Sync> Sync for Task<T, M> {} |
| 59 | |
| 60 | impl<T, M> Unpin for Task<T, M> {} |
| 61 | |
| 62 | #[cfg (feature = "std" )] |
| 63 | impl<T, M> std::panic::UnwindSafe for Task<T, M> {} |
| 64 | #[cfg (feature = "std" )] |
| 65 | impl<T, M> std::panic::RefUnwindSafe for Task<T, M> {} |
| 66 | |
| 67 | impl<T, M> Task<T, M> { |
| 68 | /// Detaches the task to let it keep running in the background. |
| 69 | /// |
| 70 | /// # Examples |
| 71 | /// |
| 72 | /// ``` |
| 73 | /// use smol::{Executor, Timer}; |
| 74 | /// use std::time::Duration; |
| 75 | /// |
| 76 | /// let ex = Executor::new(); |
| 77 | /// |
| 78 | /// // Spawn a deamon future. |
| 79 | /// ex.spawn(async { |
| 80 | /// loop { |
| 81 | /// println!("I'm a daemon task looping forever." ); |
| 82 | /// Timer::after(Duration::from_secs(1)).await; |
| 83 | /// } |
| 84 | /// }) |
| 85 | /// .detach(); |
| 86 | /// ``` |
| 87 | pub fn detach(self) { |
| 88 | let mut this = self; |
| 89 | let _out = this.set_detached(); |
| 90 | mem::forget(this); |
| 91 | } |
| 92 | |
| 93 | /// Cancels the task and waits for it to stop running. |
| 94 | /// |
| 95 | /// Returns the task's output if it was completed just before it got canceled, or [`None`] if |
| 96 | /// it didn't complete. |
| 97 | /// |
| 98 | /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of |
| 99 | /// canceling because it also waits for the task to stop running. |
| 100 | /// |
| 101 | /// # Examples |
| 102 | /// |
| 103 | /// ``` |
| 104 | /// # if cfg!(miri) { return; } // Miri does not support epoll |
| 105 | /// use smol::{future, Executor, Timer}; |
| 106 | /// use std::thread; |
| 107 | /// use std::time::Duration; |
| 108 | /// |
| 109 | /// let ex = Executor::new(); |
| 110 | /// |
| 111 | /// // Spawn a deamon future. |
| 112 | /// let task = ex.spawn(async { |
| 113 | /// loop { |
| 114 | /// println!("Even though I'm in an infinite loop, you can still cancel me!" ); |
| 115 | /// Timer::after(Duration::from_secs(1)).await; |
| 116 | /// } |
| 117 | /// }); |
| 118 | /// |
| 119 | /// // Run an executor thread. |
| 120 | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
| 121 | /// |
| 122 | /// future::block_on(async { |
| 123 | /// Timer::after(Duration::from_secs(3)).await; |
| 124 | /// task.cancel().await; |
| 125 | /// }); |
| 126 | /// ``` |
| 127 | pub async fn cancel(self) -> Option<T> { |
| 128 | let mut this = self; |
| 129 | this.set_canceled(); |
| 130 | this.fallible().await |
| 131 | } |
| 132 | |
| 133 | /// Converts this task into a [`FallibleTask`]. |
| 134 | /// |
| 135 | /// Like [`Task`], a fallible task will poll the task's output until it is |
| 136 | /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being |
| 137 | /// dropped without being run. Resolves to the task's output when completed, |
| 138 | /// or [`None`] if it didn't complete. |
| 139 | /// |
| 140 | /// # Examples |
| 141 | /// |
| 142 | /// ``` |
| 143 | /// use smol::{future, Executor}; |
| 144 | /// use std::thread; |
| 145 | /// |
| 146 | /// let ex = Executor::new(); |
| 147 | /// |
| 148 | /// // Spawn a future onto the executor. |
| 149 | /// let task = ex.spawn(async { |
| 150 | /// println!("Hello from a task!" ); |
| 151 | /// 1 + 2 |
| 152 | /// }) |
| 153 | /// .fallible(); |
| 154 | /// |
| 155 | /// // Run an executor thread. |
| 156 | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
| 157 | /// |
| 158 | /// // Wait for the task's output. |
| 159 | /// assert_eq!(future::block_on(task), Some(3)); |
| 160 | /// ``` |
| 161 | /// |
| 162 | /// ``` |
| 163 | /// use smol::future; |
| 164 | /// |
| 165 | /// // Schedule function which drops the runnable without running it. |
| 166 | /// let schedule = move |runnable| drop(runnable); |
| 167 | /// |
| 168 | /// // Create a task with the future and the schedule function. |
| 169 | /// let (runnable, task) = async_task::spawn(async { |
| 170 | /// println!("Hello from a task!" ); |
| 171 | /// 1 + 2 |
| 172 | /// }, schedule); |
| 173 | /// runnable.schedule(); |
| 174 | /// |
| 175 | /// // Wait for the task's output. |
| 176 | /// assert_eq!(future::block_on(task.fallible()), None); |
| 177 | /// ``` |
| 178 | pub fn fallible(self) -> FallibleTask<T, M> { |
| 179 | FallibleTask { task: self } |
| 180 | } |
| 181 | |
| 182 | /// Puts the task in canceled state. |
| 183 | fn set_canceled(&mut self) { |
| 184 | let ptr = self.ptr.as_ptr(); |
| 185 | let header = ptr as *const Header<M>; |
| 186 | |
| 187 | unsafe { |
| 188 | let mut state = (*header).state.load(Ordering::Acquire); |
| 189 | |
| 190 | loop { |
| 191 | // If the task has been completed or closed, it can't be canceled. |
| 192 | if state & (COMPLETED | CLOSED) != 0 { |
| 193 | break; |
| 194 | } |
| 195 | |
| 196 | // If the task is not scheduled nor running, we'll need to schedule it. |
| 197 | let new = if state & (SCHEDULED | RUNNING) == 0 { |
| 198 | (state | SCHEDULED | CLOSED) + REFERENCE |
| 199 | } else { |
| 200 | state | CLOSED |
| 201 | }; |
| 202 | |
| 203 | // Mark the task as closed. |
| 204 | match (*header).state.compare_exchange_weak( |
| 205 | state, |
| 206 | new, |
| 207 | Ordering::AcqRel, |
| 208 | Ordering::Acquire, |
| 209 | ) { |
| 210 | Ok(_) => { |
| 211 | // If the task is not scheduled nor running, schedule it one more time so |
| 212 | // that its future gets dropped by the executor. |
| 213 | if state & (SCHEDULED | RUNNING) == 0 { |
| 214 | ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); |
| 215 | } |
| 216 | |
| 217 | // Notify the awaiter that the task has been closed. |
| 218 | if state & AWAITER != 0 { |
| 219 | (*header).notify(None); |
| 220 | } |
| 221 | |
| 222 | break; |
| 223 | } |
| 224 | Err(s) => state = s, |
| 225 | } |
| 226 | } |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | /// Puts the task in detached state. |
| 231 | fn set_detached(&mut self) -> Option<Result<T, Panic>> { |
| 232 | let ptr = self.ptr.as_ptr(); |
| 233 | let header = ptr as *const Header<M>; |
| 234 | |
| 235 | unsafe { |
| 236 | // A place where the output will be stored in case it needs to be dropped. |
| 237 | let mut output = None; |
| 238 | |
| 239 | // Optimistically assume the `Task` is being detached just after creating the task. |
| 240 | // This is a common case so if the `Task` is datached, the overhead of it is only one |
| 241 | // compare-exchange operation. |
| 242 | if let Err(mut state) = (*header).state.compare_exchange_weak( |
| 243 | SCHEDULED | TASK | REFERENCE, |
| 244 | SCHEDULED | REFERENCE, |
| 245 | Ordering::AcqRel, |
| 246 | Ordering::Acquire, |
| 247 | ) { |
| 248 | loop { |
| 249 | // If the task has been completed but not yet closed, that means its output |
| 250 | // must be dropped. |
| 251 | if state & COMPLETED != 0 && state & CLOSED == 0 { |
| 252 | // Mark the task as closed in order to grab its output. |
| 253 | match (*header).state.compare_exchange_weak( |
| 254 | state, |
| 255 | state | CLOSED, |
| 256 | Ordering::AcqRel, |
| 257 | Ordering::Acquire, |
| 258 | ) { |
| 259 | Ok(_) => { |
| 260 | // Read the output. |
| 261 | output = Some( |
| 262 | (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>) |
| 263 | .read(), |
| 264 | ); |
| 265 | |
| 266 | // Update the state variable because we're continuing the loop. |
| 267 | state |= CLOSED; |
| 268 | } |
| 269 | Err(s) => state = s, |
| 270 | } |
| 271 | } else { |
| 272 | // If this is the last reference to the task and it's not closed, then |
| 273 | // close it and schedule one more time so that its future gets dropped by |
| 274 | // the executor. |
| 275 | let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { |
| 276 | SCHEDULED | CLOSED | REFERENCE |
| 277 | } else { |
| 278 | state & !TASK |
| 279 | }; |
| 280 | |
| 281 | // Unset the `TASK` flag. |
| 282 | match (*header).state.compare_exchange_weak( |
| 283 | state, |
| 284 | new, |
| 285 | Ordering::AcqRel, |
| 286 | Ordering::Acquire, |
| 287 | ) { |
| 288 | Ok(_) => { |
| 289 | // If this is the last reference to the task, we need to either |
| 290 | // schedule dropping its future or destroy it. |
| 291 | if state & !(REFERENCE - 1) == 0 { |
| 292 | if state & CLOSED == 0 { |
| 293 | ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); |
| 294 | } else { |
| 295 | ((*header).vtable.destroy)(ptr); |
| 296 | } |
| 297 | } |
| 298 | |
| 299 | break; |
| 300 | } |
| 301 | Err(s) => state = s, |
| 302 | } |
| 303 | } |
| 304 | } |
| 305 | } |
| 306 | |
| 307 | output |
| 308 | } |
| 309 | } |
| 310 | |
| 311 | /// Polls the task to retrieve its output. |
| 312 | /// |
| 313 | /// Returns `Some` if the task has completed or `None` if it was closed. |
| 314 | /// |
| 315 | /// A task becomes closed in the following cases: |
| 316 | /// |
| 317 | /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. |
| 318 | /// 2. Its output gets awaited by the `Task`. |
| 319 | /// 3. It panics while polling the future. |
| 320 | /// 4. It is completed and the `Task` gets dropped. |
| 321 | fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { |
| 322 | let ptr = self.ptr.as_ptr(); |
| 323 | let header = ptr as *const Header<M>; |
| 324 | |
| 325 | unsafe { |
| 326 | let mut state = (*header).state.load(Ordering::Acquire); |
| 327 | |
| 328 | loop { |
| 329 | // If the task has been closed, notify the awaiter and return `None`. |
| 330 | if state & CLOSED != 0 { |
| 331 | // If the task is scheduled or running, we need to wait until its future is |
| 332 | // dropped. |
| 333 | if state & (SCHEDULED | RUNNING) != 0 { |
| 334 | // Replace the waker with one associated with the current task. |
| 335 | (*header).register(cx.waker()); |
| 336 | |
| 337 | // Reload the state after registering. It is possible changes occurred just |
| 338 | // before registration so we need to check for that. |
| 339 | state = (*header).state.load(Ordering::Acquire); |
| 340 | |
| 341 | // If the task is still scheduled or running, we need to wait because its |
| 342 | // future is not dropped yet. |
| 343 | if state & (SCHEDULED | RUNNING) != 0 { |
| 344 | return Poll::Pending; |
| 345 | } |
| 346 | } |
| 347 | |
| 348 | // Even though the awaiter is most likely the current task, it could also be |
| 349 | // another task. |
| 350 | (*header).notify(Some(cx.waker())); |
| 351 | return Poll::Ready(None); |
| 352 | } |
| 353 | |
| 354 | // If the task is not completed, register the current task. |
| 355 | if state & COMPLETED == 0 { |
| 356 | // Replace the waker with one associated with the current task. |
| 357 | (*header).register(cx.waker()); |
| 358 | |
| 359 | // Reload the state after registering. It is possible that the task became |
| 360 | // completed or closed just before registration so we need to check for that. |
| 361 | state = (*header).state.load(Ordering::Acquire); |
| 362 | |
| 363 | // If the task has been closed, restart. |
| 364 | if state & CLOSED != 0 { |
| 365 | continue; |
| 366 | } |
| 367 | |
| 368 | // If the task is still not completed, we're blocked on it. |
| 369 | if state & COMPLETED == 0 { |
| 370 | return Poll::Pending; |
| 371 | } |
| 372 | } |
| 373 | |
| 374 | // Since the task is now completed, mark it as closed in order to grab its output. |
| 375 | match (*header).state.compare_exchange( |
| 376 | state, |
| 377 | state | CLOSED, |
| 378 | Ordering::AcqRel, |
| 379 | Ordering::Acquire, |
| 380 | ) { |
| 381 | Ok(_) => { |
| 382 | // Notify the awaiter. Even though the awaiter is most likely the current |
| 383 | // task, it could also be another task. |
| 384 | if state & AWAITER != 0 { |
| 385 | (*header).notify(Some(cx.waker())); |
| 386 | } |
| 387 | |
| 388 | // Take the output from the task. |
| 389 | let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>; |
| 390 | let output = output.read(); |
| 391 | |
| 392 | // Propagate the panic if the task panicked. |
| 393 | let output = match output { |
| 394 | Ok(output) => output, |
| 395 | Err(panic) => { |
| 396 | #[cfg (feature = "std" )] |
| 397 | std::panic::resume_unwind(panic); |
| 398 | |
| 399 | #[cfg (not(feature = "std" ))] |
| 400 | match panic {} |
| 401 | } |
| 402 | }; |
| 403 | |
| 404 | return Poll::Ready(Some(output)); |
| 405 | } |
| 406 | Err(s) => state = s, |
| 407 | } |
| 408 | } |
| 409 | } |
| 410 | } |
| 411 | |
| 412 | fn header(&self) -> &Header<M> { |
| 413 | let ptr = self.ptr.as_ptr(); |
| 414 | let header = ptr as *const Header<M>; |
| 415 | unsafe { &*header } |
| 416 | } |
| 417 | |
| 418 | /// Returns `true` if the current task is finished. |
| 419 | /// |
| 420 | /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. |
| 421 | pub fn is_finished(&self) -> bool { |
| 422 | let ptr = self.ptr.as_ptr(); |
| 423 | let header = ptr as *const Header<M>; |
| 424 | |
| 425 | unsafe { |
| 426 | let state = (*header).state.load(Ordering::Acquire); |
| 427 | state & (CLOSED | COMPLETED) != 0 |
| 428 | } |
| 429 | } |
| 430 | |
| 431 | /// Get the metadata associated with this task. |
| 432 | /// |
| 433 | /// Tasks can be created with a metadata object associated with them; by default, this |
| 434 | /// is a `()` value. See the [`Builder::metadata()`] method for more information. |
| 435 | pub fn metadata(&self) -> &M { |
| 436 | &self.header().metadata |
| 437 | } |
| 438 | } |
| 439 | |
| 440 | impl<T, M> Drop for Task<T, M> { |
| 441 | fn drop(&mut self) { |
| 442 | self.set_canceled(); |
| 443 | self.set_detached(); |
| 444 | } |
| 445 | } |
| 446 | |
| 447 | impl<T, M> Future for Task<T, M> { |
| 448 | type Output = T; |
| 449 | |
| 450 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 451 | match self.poll_task(cx) { |
| 452 | Poll::Ready(t: Option) => Poll::Ready(t.expect(msg:"Task polled after completion" )), |
| 453 | Poll::Pending => Poll::Pending, |
| 454 | } |
| 455 | } |
| 456 | } |
| 457 | |
| 458 | impl<T, M: fmt::Debug> fmt::Debug for Task<T, M> { |
| 459 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 460 | f&mut DebugStruct<'_, '_>.debug_struct("Task" ) |
| 461 | .field(name:"header" , self.header()) |
| 462 | .finish() |
| 463 | } |
| 464 | } |
| 465 | |
| 466 | /// A spawned task with a fallible response. |
| 467 | /// |
| 468 | /// This type behaves like [`Task`], however it produces an `Option<T>` when |
| 469 | /// polled and will return `None` if the executor dropped its |
| 470 | /// [`Runnable`][`super::Runnable`] without being run. |
| 471 | /// |
| 472 | /// This can be useful to avoid the panic produced when polling the `Task` |
| 473 | /// future if the executor dropped its `Runnable`. |
| 474 | #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background" ] |
| 475 | pub struct FallibleTask<T, M = ()> { |
| 476 | task: Task<T, M>, |
| 477 | } |
| 478 | |
| 479 | impl<T, M> FallibleTask<T, M> { |
| 480 | /// Detaches the task to let it keep running in the background. |
| 481 | /// |
| 482 | /// # Examples |
| 483 | /// |
| 484 | /// ``` |
| 485 | /// use smol::{Executor, Timer}; |
| 486 | /// use std::time::Duration; |
| 487 | /// |
| 488 | /// let ex = Executor::new(); |
| 489 | /// |
| 490 | /// // Spawn a deamon future. |
| 491 | /// ex.spawn(async { |
| 492 | /// loop { |
| 493 | /// println!("I'm a daemon task looping forever." ); |
| 494 | /// Timer::after(Duration::from_secs(1)).await; |
| 495 | /// } |
| 496 | /// }) |
| 497 | /// .fallible() |
| 498 | /// .detach(); |
| 499 | /// ``` |
| 500 | pub fn detach(self) { |
| 501 | self.task.detach() |
| 502 | } |
| 503 | |
| 504 | /// Cancels the task and waits for it to stop running. |
| 505 | /// |
| 506 | /// Returns the task's output if it was completed just before it got canceled, or [`None`] if |
| 507 | /// it didn't complete. |
| 508 | /// |
| 509 | /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of |
| 510 | /// canceling because it also waits for the task to stop running. |
| 511 | /// |
| 512 | /// # Examples |
| 513 | /// |
| 514 | /// ``` |
| 515 | /// # if cfg!(miri) { return; } // Miri does not support epoll |
| 516 | /// use smol::{future, Executor, Timer}; |
| 517 | /// use std::thread; |
| 518 | /// use std::time::Duration; |
| 519 | /// |
| 520 | /// let ex = Executor::new(); |
| 521 | /// |
| 522 | /// // Spawn a deamon future. |
| 523 | /// let task = ex.spawn(async { |
| 524 | /// loop { |
| 525 | /// println!("Even though I'm in an infinite loop, you can still cancel me!" ); |
| 526 | /// Timer::after(Duration::from_secs(1)).await; |
| 527 | /// } |
| 528 | /// }) |
| 529 | /// .fallible(); |
| 530 | /// |
| 531 | /// // Run an executor thread. |
| 532 | /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); |
| 533 | /// |
| 534 | /// future::block_on(async { |
| 535 | /// Timer::after(Duration::from_secs(3)).await; |
| 536 | /// task.cancel().await; |
| 537 | /// }); |
| 538 | /// ``` |
| 539 | pub async fn cancel(self) -> Option<T> { |
| 540 | self.task.cancel().await |
| 541 | } |
| 542 | |
| 543 | /// Returns `true` if the current task is finished. |
| 544 | /// |
| 545 | /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. |
| 546 | pub fn is_finished(&self) -> bool { |
| 547 | self.task.is_finished() |
| 548 | } |
| 549 | } |
| 550 | |
| 551 | impl<T, M> Future for FallibleTask<T, M> { |
| 552 | type Output = Option<T>; |
| 553 | |
| 554 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 555 | self.task.poll_task(cx) |
| 556 | } |
| 557 | } |
| 558 | |
| 559 | impl<T, M: fmt::Debug> fmt::Debug for FallibleTask<T, M> { |
| 560 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 561 | f&mut DebugStruct<'_, '_>.debug_struct("FallibleTask" ) |
| 562 | .field(name:"header" , self.task.header()) |
| 563 | .finish() |
| 564 | } |
| 565 | } |
| 566 | |