| 1 | //! A thread pool for isolating blocking I/O in async programs. |
| 2 | //! |
| 3 | //! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async |
| 4 | //! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible |
| 5 | //! solutions, they're not always available or ideal. |
| 6 | //! |
| 7 | //! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread |
| 8 | //! pool provided by this crate. The pool dynamically spawns and stops threads depending on the |
| 9 | //! current number of running I/O jobs. |
| 10 | //! |
| 11 | //! Note that there is a limit on the number of active threads. Once that limit is hit, a running |
| 12 | //! job has to finish before others get a chance to run. When a thread is idle, it waits for the |
| 13 | //! next job or shuts down after a certain timeout. |
| 14 | //! |
//! The maximum number of threads (500 by default) can be changed by setting the
//! `BLOCKING_MAX_THREADS` environment variable to a value between 1 and 10000.
| 17 | //! |
| 18 | //! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port |
| 19 | //! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html |
| 20 | //! [io_uring]: https://lwn.net/Articles/776703 |
| 21 | //! |
| 22 | //! # Examples |
| 23 | //! |
| 24 | //! Read the contents of a file: |
| 25 | //! |
| 26 | //! ```no_run |
| 27 | //! use blocking::unblock; |
| 28 | //! use std::fs; |
| 29 | //! |
| 30 | //! # futures_lite::future::block_on(async { |
| 31 | //! let contents = unblock(|| fs::read_to_string("file.txt" )).await?; |
| 32 | //! println!("{}" , contents); |
| 33 | //! # std::io::Result::Ok(()) }); |
| 34 | //! ``` |
| 35 | //! |
| 36 | //! Read a file and pipe its contents to stdout: |
| 37 | //! |
| 38 | //! ```no_run |
| 39 | //! use blocking::{unblock, Unblock}; |
| 40 | //! use futures_lite::io; |
| 41 | //! use std::fs::File; |
| 42 | //! |
| 43 | //! # futures_lite::future::block_on(async { |
| 44 | //! let input = unblock(|| File::open("file.txt" )).await?; |
| 45 | //! let input = Unblock::new(input); |
| 46 | //! let mut output = Unblock::new(std::io::stdout()); |
| 47 | //! |
| 48 | //! io::copy(input, &mut output).await?; |
| 49 | //! # std::io::Result::Ok(()) }); |
| 50 | //! ``` |
| 51 | //! |
| 52 | //! Iterate over the contents of a directory: |
| 53 | //! |
| 54 | //! ```no_run |
| 55 | //! use blocking::Unblock; |
| 56 | //! use futures_lite::prelude::*; |
| 57 | //! use std::fs; |
| 58 | //! |
| 59 | //! # futures_lite::future::block_on(async { |
| 60 | //! let mut dir = Unblock::new(fs::read_dir("." )?); |
| 61 | //! while let Some(item) = dir.next().await { |
| 62 | //! println!("{}" , item?.file_name().to_string_lossy()); |
| 63 | //! } |
| 64 | //! # std::io::Result::Ok(()) }); |
| 65 | //! ``` |
| 66 | //! |
| 67 | //! Spawn a process: |
| 68 | //! |
| 69 | //! ```no_run |
| 70 | //! use blocking::unblock; |
| 71 | //! use std::process::Command; |
| 72 | //! |
| 73 | //! # futures_lite::future::block_on(async { |
| 74 | //! let out = unblock(|| Command::new("dir" ).output()).await?; |
| 75 | //! # std::io::Result::Ok(()) }); |
| 76 | //! ``` |
| 77 | |
| 78 | #![warn (missing_docs, missing_debug_implementations, rust_2018_idioms)] |
| 79 | #![forbid (unsafe_code)] |
| 80 | #![doc ( |
| 81 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
| 82 | )] |
| 83 | #![doc ( |
| 84 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
| 85 | )] |
| 86 | |
| 87 | use std::any::Any; |
| 88 | use std::collections::VecDeque; |
| 89 | use std::fmt; |
| 90 | use std::io::{self, Read, Seek, SeekFrom, Write}; |
| 91 | use std::num::NonZeroUsize; |
| 92 | use std::panic; |
| 93 | use std::pin::Pin; |
| 94 | use std::sync::atomic::{AtomicUsize, Ordering}; |
| 95 | use std::sync::{Condvar, Mutex, MutexGuard}; |
| 96 | use std::task::{Context, Poll}; |
| 97 | use std::thread; |
| 98 | use std::time::Duration; |
| 99 | |
| 100 | #[cfg (not(target_family = "wasm" ))] |
| 101 | use std::env; |
| 102 | |
| 103 | use async_channel::{bounded, Receiver}; |
| 104 | use async_task::Runnable; |
| 105 | use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; |
| 106 | use futures_lite::{ |
| 107 | future::{self, Future}, |
| 108 | ready, |
| 109 | stream::Stream, |
| 110 | }; |
| 111 | use piper::{pipe, Reader, Writer}; |
| 112 | |
| 113 | #[doc (no_inline)] |
| 114 | pub use async_task::Task; |
| 115 | |
/// Default upper bound on the number of threads the executor may grow to.
///
/// Used by `Executor::max_threads` when the `BLOCKING_MAX_THREADS` environment variable is
/// unset or does not parse as a number.
#[cfg(not(target_family = "wasm"))]
const DEFAULT_MAX_THREADS: NonZeroUsize = {
    // Matched by hand so the value is checked at compile time; 500 is non-zero, so the `else`
    // branch can never be taken.
    if let Some(size) = NonZeroUsize::new(500) {
        size
    } else {
        panic!("DEFAULT_MAX_THREADS is non-zero");
    }
};
| 125 | |
/// Smallest thread limit that can be configured through the environment.
#[cfg(not(target_family = "wasm"))]
const MIN_MAX_THREADS: usize = 1;

/// Largest thread limit that can be configured through the environment.
#[cfg(not(target_family = "wasm"))]
const MAX_MAX_THREADS: usize = 10_000;

/// Name of the environment variable that overrides the default thread limit.
#[cfg(not(target_family = "wasm"))]
const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";
| 137 | |
| 138 | /// The blocking executor. |
| 139 | struct Executor { |
| 140 | /// Inner state of the executor. |
| 141 | inner: Mutex<Inner>, |
| 142 | |
| 143 | /// Used to put idle threads to sleep and wake them up when new work comes in. |
| 144 | cvar: Condvar, |
| 145 | } |
| 146 | |
| 147 | /// Inner state of the blocking executor. |
| 148 | struct Inner { |
| 149 | /// Number of idle threads in the pool. |
| 150 | /// |
| 151 | /// Idle threads are sleeping, waiting to get a task to run. |
| 152 | idle_count: usize, |
| 153 | |
| 154 | /// Total number of threads in the pool. |
| 155 | /// |
| 156 | /// This is the number of idle threads + the number of active threads. |
| 157 | thread_count: usize, |
| 158 | |
| 159 | // TODO: The option is only used for const-initialization. This can be replaced with |
| 160 | // a normal VecDeque when the MSRV can be bumped passed |
| 161 | /// The queue of blocking tasks. |
| 162 | queue: Option<VecDeque<Runnable>>, |
| 163 | |
| 164 | /// Maximum number of threads in the pool |
| 165 | thread_limit: Option<NonZeroUsize>, |
| 166 | } |
| 167 | |
| 168 | impl Inner { |
| 169 | #[inline ] |
| 170 | fn queue(&mut self) -> &mut VecDeque<Runnable> { |
| 171 | self.queue.get_or_insert_with(VecDeque::new) |
| 172 | } |
| 173 | } |
| 174 | |
| 175 | impl Executor { |
| 176 | #[cfg (not(target_family = "wasm" ))] |
| 177 | fn max_threads() -> NonZeroUsize { |
| 178 | match env::var(MAX_THREADS_ENV) { |
| 179 | Ok(v) => v |
| 180 | .parse::<usize>() |
| 181 | .ok() |
| 182 | .and_then(|v| NonZeroUsize::new(v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS))) |
| 183 | .unwrap_or(DEFAULT_MAX_THREADS), |
| 184 | Err(_) => DEFAULT_MAX_THREADS, |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | #[cfg (target_family = "wasm" )] |
| 189 | fn max_threads() -> NonZeroUsize { |
| 190 | NonZeroUsize::new(1).unwrap() |
| 191 | } |
| 192 | |
| 193 | /// Get a reference to the global executor. |
| 194 | #[inline ] |
| 195 | fn get() -> &'static Self { |
| 196 | #[cfg (not(target_family = "wasm" ))] |
| 197 | { |
| 198 | static EXECUTOR: Executor = Executor { |
| 199 | inner: Mutex::new(Inner { |
| 200 | idle_count: 0, |
| 201 | thread_count: 0, |
| 202 | queue: None, |
| 203 | thread_limit: None, |
| 204 | }), |
| 205 | cvar: Condvar::new(), |
| 206 | }; |
| 207 | |
| 208 | &EXECUTOR |
| 209 | } |
| 210 | |
| 211 | #[cfg (target_family = "wasm" )] |
| 212 | panic!("cannot spawn a blocking task on WASM" ) |
| 213 | } |
| 214 | |
| 215 | /// Spawns a future onto this executor. |
| 216 | /// |
| 217 | /// Returns a [`Task`] handle for the spawned task. |
| 218 | fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> { |
| 219 | let (runnable, task) = async_task::Builder::new().propagate_panic(true).spawn( |
| 220 | move |()| future, |
| 221 | |r| { |
| 222 | // Initialize the executor if we haven't already. |
| 223 | let executor = Self::get(); |
| 224 | |
| 225 | // Schedule the task on our executor. |
| 226 | executor.schedule(r) |
| 227 | }, |
| 228 | ); |
| 229 | runnable.schedule(); |
| 230 | task |
| 231 | } |
| 232 | |
| 233 | /// Runs the main loop on the current thread. |
| 234 | /// |
| 235 | /// This function runs blocking tasks until it becomes idle and times out. |
| 236 | fn main_loop(&'static self) { |
| 237 | #[cfg (feature = "tracing" )] |
| 238 | let _span = tracing::trace_span!("blocking::main_loop" ).entered(); |
| 239 | |
| 240 | let mut inner = self.inner.lock().unwrap(); |
| 241 | loop { |
| 242 | // This thread is not idle anymore because it's going to run tasks. |
| 243 | inner.idle_count -= 1; |
| 244 | |
| 245 | // Run tasks in the queue. |
| 246 | while let Some(runnable) = inner.queue().pop_front() { |
| 247 | // We have found a task - grow the pool if needed. |
| 248 | self.grow_pool(inner); |
| 249 | |
| 250 | // Run the task. |
| 251 | panic::catch_unwind(|| runnable.run()).ok(); |
| 252 | |
| 253 | // Re-lock the inner state and continue. |
| 254 | inner = self.inner.lock().unwrap(); |
| 255 | } |
| 256 | |
| 257 | // This thread is now becoming idle. |
| 258 | inner.idle_count += 1; |
| 259 | |
| 260 | // Put the thread to sleep until another task is scheduled. |
| 261 | let timeout = Duration::from_millis(500); |
| 262 | #[cfg (feature = "tracing" )] |
| 263 | tracing::trace!(?timeout, "going to sleep" ); |
| 264 | let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap(); |
| 265 | inner = lock; |
| 266 | |
| 267 | // If there are no tasks after a while, stop this thread. |
| 268 | if res.timed_out() && inner.queue().is_empty() { |
| 269 | inner.idle_count -= 1; |
| 270 | inner.thread_count -= 1; |
| 271 | break; |
| 272 | } |
| 273 | |
| 274 | #[cfg (feature = "tracing" )] |
| 275 | tracing::trace!("notified" ); |
| 276 | } |
| 277 | |
| 278 | #[cfg (feature = "tracing" )] |
| 279 | tracing::trace!("shutting down due to lack of tasks" ); |
| 280 | } |
| 281 | |
| 282 | /// Schedules a runnable task for execution. |
| 283 | fn schedule(&'static self, runnable: Runnable) { |
| 284 | let mut inner = self.inner.lock().unwrap(); |
| 285 | inner.queue().push_back(runnable); |
| 286 | |
| 287 | // Notify a sleeping thread and spawn more threads if needed. |
| 288 | self.cvar.notify_one(); |
| 289 | self.grow_pool(inner); |
| 290 | } |
| 291 | |
| 292 | /// Spawns more blocking threads if the pool is overloaded with work. |
| 293 | fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) { |
| 294 | #[cfg (feature = "tracing" )] |
| 295 | let _span = tracing::trace_span!( |
| 296 | "grow_pool" , |
| 297 | queue_len = inner.queue().len(), |
| 298 | idle_count = inner.idle_count, |
| 299 | thread_count = inner.thread_count, |
| 300 | ) |
| 301 | .entered(); |
| 302 | |
| 303 | let thread_limit = inner |
| 304 | .thread_limit |
| 305 | .get_or_insert_with(Self::max_threads) |
| 306 | .get(); |
| 307 | |
| 308 | // If runnable tasks greatly outnumber idle threads and there aren't too many threads |
| 309 | // already, then be aggressive: wake all idle threads and spawn one more thread. |
| 310 | while inner.queue().len() > inner.idle_count * 5 && inner.thread_count < thread_limit { |
| 311 | #[cfg (feature = "tracing" )] |
| 312 | tracing::trace!("spawning a new thread to handle blocking tasks" ); |
| 313 | |
| 314 | // The new thread starts in idle state. |
| 315 | inner.idle_count += 1; |
| 316 | inner.thread_count += 1; |
| 317 | |
| 318 | // Notify all existing idle threads because we need to hurry up. |
| 319 | self.cvar.notify_all(); |
| 320 | |
| 321 | // Generate a new thread ID. |
| 322 | static ID: AtomicUsize = AtomicUsize::new(1); |
| 323 | let id = ID.fetch_add(1, Ordering::Relaxed); |
| 324 | |
| 325 | // Spawn the new thread. |
| 326 | if let Err(_e) = thread::Builder::new() |
| 327 | .name(format!("blocking- {}" , id)) |
| 328 | .spawn(move || self.main_loop()) |
| 329 | { |
| 330 | // We were unable to spawn the thread, so we need to undo the state changes. |
| 331 | #[cfg (feature = "tracing" )] |
| 332 | tracing::error!("failed to spawn a blocking thread: {}" , _e); |
| 333 | inner.idle_count -= 1; |
| 334 | inner.thread_count -= 1; |
| 335 | |
| 336 | // The current number of threads is likely to be the system's upper limit, so update |
| 337 | // thread_limit accordingly. |
| 338 | inner.thread_limit = { |
| 339 | let new_limit = inner.thread_count; |
| 340 | |
| 341 | // If the limit is about to be set to zero, set it to one instead so that if, |
| 342 | // in the future, we are able to spawn more threads, we will be able to do so. |
| 343 | Some(NonZeroUsize::new(new_limit).unwrap_or_else(|| { |
| 344 | #[cfg (feature = "tracing" )] |
| 345 | tracing::warn!( |
| 346 | "attempted to lower thread_limit to zero; setting to one instead" |
| 347 | ); |
| 348 | NonZeroUsize::new(1).unwrap() |
| 349 | })) |
| 350 | }; |
| 351 | } |
| 352 | } |
| 353 | } |
| 354 | } |
| 355 | |
| 356 | /// Runs blocking code on a thread pool. |
| 357 | /// |
| 358 | /// # Examples |
| 359 | /// |
| 360 | /// Read the contents of a file: |
| 361 | /// |
| 362 | /// ```no_run |
| 363 | /// use blocking::unblock; |
| 364 | /// use std::fs; |
| 365 | /// |
| 366 | /// # futures_lite::future::block_on(async { |
| 367 | /// let contents = unblock(|| fs::read_to_string("file.txt" )).await?; |
| 368 | /// # std::io::Result::Ok(()) }); |
| 369 | /// ``` |
| 370 | /// |
| 371 | /// Spawn a process: |
| 372 | /// |
| 373 | /// ```no_run |
| 374 | /// use blocking::unblock; |
| 375 | /// use std::process::Command; |
| 376 | /// |
| 377 | /// # futures_lite::future::block_on(async { |
| 378 | /// let out = unblock(|| Command::new("dir" ).output()).await?; |
| 379 | /// # std::io::Result::Ok(()) }); |
| 380 | /// ``` |
| 381 | pub fn unblock<T, F>(f: F) -> Task<T> |
| 382 | where |
| 383 | F: FnOnce() -> T + Send + 'static, |
| 384 | T: Send + 'static, |
| 385 | { |
| 386 | Executor::spawn(future:async move { f() }) |
| 387 | } |
| 388 | |
| 389 | /// Runs blocking I/O on a thread pool. |
| 390 | /// |
| 391 | /// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a |
| 392 | /// special thread pool while exposing a familiar async interface. |
| 393 | /// |
| 394 | /// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the |
| 395 | /// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively. |
| 396 | /// |
| 397 | /// # Caveats |
| 398 | /// |
| 399 | /// [`Unblock`] is a low-level primitive, and as such it comes with some caveats. |
| 400 | /// |
| 401 | /// For higher-level primitives built on top of [`Unblock`], look into [`async-fs`] or |
| 402 | /// [`async-process`] (on Windows). |
| 403 | /// |
| 404 | /// [`async-fs`]: https://github.com/smol-rs/async-fs |
| 405 | /// [`async-process`]: https://github.com/smol-rs/async-process |
| 406 | /// |
| 407 | /// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an |
| 408 | /// async read/write operation simply receives/sends some bytes from/into the pipe. When in reading |
| 409 | /// mode, the thread pool reads bytes from the I/O handle and forwards them into the pipe until it |
| 410 | /// becomes full. When in writing mode, the thread pool reads bytes from the pipe and forwards them |
| 411 | /// into the I/O handle. |
| 412 | /// |
| 413 | /// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe. |
| 414 | /// |
| 415 | /// ### Reading |
| 416 | /// |
| 417 | /// If you create an [`Unblock`]`<`[`Stdin`][`std::io::Stdin`]`>`, read some bytes from it, |
| 418 | /// and then drop it, a blocked read operation may keep hanging on the thread pool. The next |
| 419 | /// attempt to read from stdin will lose bytes read by the hanging operation. This is a difficult |
| 420 | /// problem to solve, so make sure you only use a single stdin handle for the duration of the |
| 421 | /// entire program. |
| 422 | /// |
| 423 | /// ### Writing |
| 424 | /// |
| 425 | /// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the |
| 426 | /// [`Unblock`] handle or some buffered data might get lost. |
| 427 | /// |
| 428 | /// ### Seeking |
| 429 | /// |
| 430 | /// Because of buffering in the pipe, if [`Unblock`] wraps a [`File`][`std::fs::File`], a single |
| 431 | /// read operation may move the file cursor farther than is the span of the operation. In fact, |
/// reading just keeps going in the background until the pipe gets full. Keep this in mind when
| 433 | /// using [`AsyncSeek`] with [relative][`SeekFrom::Current`] offsets. |
| 434 | /// |
| 435 | /// # Examples |
| 436 | /// |
| 437 | /// ``` |
| 438 | /// use blocking::Unblock; |
| 439 | /// use futures_lite::prelude::*; |
| 440 | /// |
| 441 | /// # futures_lite::future::block_on(async { |
| 442 | /// let mut stdout = Unblock::new(std::io::stdout()); |
| 443 | /// stdout.write_all(b"Hello world!" ).await?; |
| 444 | /// stdout.flush().await?; |
| 445 | /// # std::io::Result::Ok(()) }); |
| 446 | /// ``` |
| 447 | pub struct Unblock<T> { |
| 448 | state: State<T>, |
| 449 | cap: Option<usize>, |
| 450 | } |
| 451 | |
| 452 | impl<T> Unblock<T> { |
| 453 | /// Wraps a blocking I/O handle into the async [`Unblock`] interface. |
| 454 | /// |
| 455 | /// # Examples |
| 456 | /// |
| 457 | /// ```no_run |
| 458 | /// use blocking::Unblock; |
| 459 | /// |
| 460 | /// let stdin = Unblock::new(std::io::stdin()); |
| 461 | /// ``` |
| 462 | pub fn new(io: T) -> Unblock<T> { |
| 463 | Unblock { |
| 464 | state: State::Idle(Some(Box::new(io))), |
| 465 | cap: None, |
| 466 | } |
| 467 | } |
| 468 | |
| 469 | /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer |
| 470 | /// capacity. |
| 471 | /// |
| 472 | /// When communicating with the inner [`Stream`]/[`Read`]/[`Write`] type from async code, data |
| 473 | /// transferred between blocking and async code goes through a buffer of limited capacity. This |
| 474 | /// constructor configures that capacity. |
| 475 | /// |
| 476 | /// The default capacity is: |
| 477 | /// |
| 478 | /// * For [`Iterator`] types: 8192 items. |
| 479 | /// * For [`Read`]/[`Write`] types: 8 MB. |
| 480 | /// |
| 481 | /// # Examples |
| 482 | /// |
| 483 | /// ```no_run |
| 484 | /// use blocking::Unblock; |
| 485 | /// |
| 486 | /// let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout()); |
| 487 | /// ``` |
| 488 | pub fn with_capacity(cap: usize, io: T) -> Unblock<T> { |
| 489 | Unblock { |
| 490 | state: State::Idle(Some(Box::new(io))), |
| 491 | cap: Some(cap), |
| 492 | } |
| 493 | } |
| 494 | |
| 495 | /// Gets a mutable reference to the blocking I/O handle. |
| 496 | /// |
| 497 | /// This is an async method because the I/O handle might be on the thread pool and needs to |
| 498 | /// be moved onto the current thread before we can get a reference to it. |
| 499 | /// |
| 500 | /// # Examples |
| 501 | /// |
| 502 | /// ```no_run |
| 503 | /// use blocking::{unblock, Unblock}; |
| 504 | /// use std::fs::File; |
| 505 | /// |
| 506 | /// # futures_lite::future::block_on(async { |
| 507 | /// let file = unblock(|| File::create("file.txt" )).await?; |
| 508 | /// let mut file = Unblock::new(file); |
| 509 | /// |
| 510 | /// let metadata = file.get_mut().await.metadata()?; |
| 511 | /// # std::io::Result::Ok(()) }); |
| 512 | /// ``` |
| 513 | pub async fn get_mut(&mut self) -> &mut T { |
| 514 | // Wait for the running task to stop and ignore I/O errors if there are any. |
| 515 | future::poll_fn(|cx| self.poll_stop(cx)).await.ok(); |
| 516 | |
| 517 | // Assume idle state and get a reference to the inner value. |
| 518 | match &mut self.state { |
| 519 | State::Idle(t) => t.as_mut().expect("inner value was taken out" ), |
| 520 | State::WithMut(..) |
| 521 | | State::Streaming(..) |
| 522 | | State::Reading(..) |
| 523 | | State::Writing(..) |
| 524 | | State::Seeking(..) => { |
| 525 | unreachable!("when stopped, the state machine must be in idle state" ); |
| 526 | } |
| 527 | } |
| 528 | } |
| 529 | |
| 530 | /// Performs a blocking operation on the I/O handle. |
| 531 | /// |
| 532 | /// # Examples |
| 533 | /// |
| 534 | /// ```no_run |
| 535 | /// use blocking::{unblock, Unblock}; |
| 536 | /// use std::fs::File; |
| 537 | /// |
| 538 | /// # futures_lite::future::block_on(async { |
| 539 | /// let file = unblock(|| File::create("file.txt" )).await?; |
| 540 | /// let mut file = Unblock::new(file); |
| 541 | /// |
| 542 | /// let metadata = file.with_mut(|f| f.metadata()).await?; |
| 543 | /// # std::io::Result::Ok(()) }); |
| 544 | /// ``` |
| 545 | pub async fn with_mut<R, F>(&mut self, op: F) -> R |
| 546 | where |
| 547 | F: FnOnce(&mut T) -> R + Send + 'static, |
| 548 | R: Send + 'static, |
| 549 | T: Send + 'static, |
| 550 | { |
| 551 | // Wait for the running task to stop and ignore I/O errors if there are any. |
| 552 | future::poll_fn(|cx| self.poll_stop(cx)).await.ok(); |
| 553 | |
| 554 | // Assume idle state and take out the inner value. |
| 555 | let mut t = match &mut self.state { |
| 556 | State::Idle(t) => t.take().expect("inner value was taken out" ), |
| 557 | State::WithMut(..) |
| 558 | | State::Streaming(..) |
| 559 | | State::Reading(..) |
| 560 | | State::Writing(..) |
| 561 | | State::Seeking(..) => { |
| 562 | unreachable!("when stopped, the state machine must be in idle state" ); |
| 563 | } |
| 564 | }; |
| 565 | |
| 566 | let (sender, receiver) = bounded(1); |
| 567 | let task = Executor::spawn(async move { |
| 568 | sender.try_send(op(&mut t)).ok(); |
| 569 | t |
| 570 | }); |
| 571 | self.state = State::WithMut(task); |
| 572 | |
| 573 | receiver |
| 574 | .recv() |
| 575 | .await |
| 576 | .expect("`Unblock::with_mut()` operation has panicked" ) |
| 577 | } |
| 578 | |
| 579 | /// Extracts the inner blocking I/O handle. |
| 580 | /// |
| 581 | /// This is an async method because the I/O handle might be on the thread pool and needs to |
| 582 | /// be moved onto the current thread before we can extract it. |
| 583 | /// |
| 584 | /// # Examples |
| 585 | /// |
| 586 | /// ```no_run |
| 587 | /// use blocking::{unblock, Unblock}; |
| 588 | /// use futures_lite::prelude::*; |
| 589 | /// use std::fs::File; |
| 590 | /// |
| 591 | /// # futures_lite::future::block_on(async { |
| 592 | /// let file = unblock(|| File::create("file.txt" )).await?; |
| 593 | /// let file = Unblock::new(file); |
| 594 | /// |
| 595 | /// let file = file.into_inner().await; |
| 596 | /// # std::io::Result::Ok(()) }); |
| 597 | /// ``` |
| 598 | pub async fn into_inner(self) -> T { |
| 599 | // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just |
| 600 | // bind `self` to a local mutable variable. |
| 601 | let mut this = self; |
| 602 | |
| 603 | // Wait for the running task to stop and ignore I/O errors if there are any. |
| 604 | future::poll_fn(|cx| this.poll_stop(cx)).await.ok(); |
| 605 | |
| 606 | // Assume idle state and extract the inner value. |
| 607 | match &mut this.state { |
| 608 | State::Idle(t) => *t.take().expect("inner value was taken out" ), |
| 609 | State::WithMut(..) |
| 610 | | State::Streaming(..) |
| 611 | | State::Reading(..) |
| 612 | | State::Writing(..) |
| 613 | | State::Seeking(..) => { |
| 614 | unreachable!("when stopped, the state machine must be in idle state" ); |
| 615 | } |
| 616 | } |
| 617 | } |
| 618 | |
| 619 | /// Waits for the running task to stop. |
| 620 | /// |
| 621 | /// On success, the state machine is moved into the idle state. |
| 622 | fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 623 | loop { |
| 624 | match &mut self.state { |
| 625 | State::Idle(_) => return Poll::Ready(Ok(())), |
| 626 | |
| 627 | State::WithMut(task) => { |
| 628 | // Poll the task to wait for it to finish. |
| 629 | let io = ready!(Pin::new(task).poll(cx)); |
| 630 | self.state = State::Idle(Some(io)); |
| 631 | } |
| 632 | |
| 633 | State::Streaming(any, task) => { |
| 634 | // Drop the receiver to close the channel. This stops the `send()` operation in |
| 635 | // the task, after which the task returns the iterator back. |
| 636 | any.take(); |
| 637 | |
| 638 | // Poll the task to retrieve the iterator. |
| 639 | let iter = ready!(Pin::new(task).poll(cx)); |
| 640 | self.state = State::Idle(Some(iter)); |
| 641 | } |
| 642 | |
| 643 | State::Reading(reader, task) => { |
| 644 | // Drop the reader to close the pipe. This stops copying inside the task, after |
| 645 | // which the task returns the I/O handle back. |
| 646 | reader.take(); |
| 647 | |
| 648 | // Poll the task to retrieve the I/O handle. |
| 649 | let (res, io) = ready!(Pin::new(task).poll(cx)); |
| 650 | // Make sure to move into the idle state before reporting errors. |
| 651 | self.state = State::Idle(Some(io)); |
| 652 | res?; |
| 653 | } |
| 654 | |
| 655 | State::Writing(writer, task) => { |
| 656 | // Drop the writer to close the pipe. This stops copying inside the task, after |
| 657 | // which the task flushes the I/O handle and |
| 658 | writer.take(); |
| 659 | |
| 660 | // Poll the task to retrieve the I/O handle. |
| 661 | let (res, io) = ready!(Pin::new(task).poll(cx)); |
| 662 | // Make sure to move into the idle state before reporting errors. |
| 663 | self.state = State::Idle(Some(io)); |
| 664 | res?; |
| 665 | } |
| 666 | |
| 667 | State::Seeking(task) => { |
| 668 | // Poll the task to wait for it to finish. |
| 669 | let (_, res, io) = ready!(Pin::new(task).poll(cx)); |
| 670 | // Make sure to move into the idle state before reporting errors. |
| 671 | self.state = State::Idle(Some(io)); |
| 672 | res?; |
| 673 | } |
| 674 | } |
| 675 | } |
| 676 | } |
| 677 | } |
| 678 | |
| 679 | impl<T: fmt::Debug> fmt::Debug for Unblock<T> { |
| 680 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 681 | struct Closed; |
| 682 | impl fmt::Debug for Closed { |
| 683 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 684 | f.write_str("<closed>" ) |
| 685 | } |
| 686 | } |
| 687 | |
| 688 | struct Blocked; |
| 689 | impl fmt::Debug for Blocked { |
| 690 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 691 | f.write_str("<blocked>" ) |
| 692 | } |
| 693 | } |
| 694 | |
| 695 | match &self.state { |
| 696 | State::Idle(None) => f.debug_struct("Unblock" ).field("io" , &Closed).finish(), |
| 697 | State::Idle(Some(io)) => { |
| 698 | let io: &T = io; |
| 699 | f.debug_struct("Unblock" ).field("io" , io).finish() |
| 700 | } |
| 701 | State::WithMut(..) |
| 702 | | State::Streaming(..) |
| 703 | | State::Reading(..) |
| 704 | | State::Writing(..) |
| 705 | | State::Seeking(..) => f.debug_struct("Unblock" ).field("io" , &Blocked).finish(), |
| 706 | } |
| 707 | } |
| 708 | } |
| 709 | |
| 710 | /// Current state of a blocking task. |
| 711 | enum State<T> { |
| 712 | /// There is no blocking task. |
| 713 | /// |
| 714 | /// The inner value is readily available, unless it has already been extracted. The value is |
| 715 | /// extracted out by [`Unblock::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting |
| 716 | /// [`Unblock`]. |
| 717 | Idle(Option<Box<T>>), |
| 718 | |
| 719 | /// A [`Unblock::with_mut()`] closure was spawned and is still running. |
| 720 | WithMut(Task<Box<T>>), |
| 721 | |
| 722 | /// The inner value is an [`Iterator`] currently iterating in a task. |
| 723 | /// |
| 724 | /// The `dyn Any` value here is a `Pin<Box<Receiver<<T as Iterator>::Item>>>`. |
| 725 | Streaming(Option<Box<dyn Any + Send + Sync>>, Task<Box<T>>), |
| 726 | |
| 727 | /// The inner value is a [`Read`] currently reading in a task. |
| 728 | Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>), |
| 729 | |
| 730 | /// The inner value is a [`Write`] currently writing in a task. |
| 731 | Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>), |
| 732 | |
| 733 | /// The inner value is a [`Seek`] currently seeking in a task. |
| 734 | Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>), |
| 735 | } |
| 736 | |
| 737 | impl<T: Iterator + Send + 'static> Stream for Unblock<T> |
| 738 | where |
| 739 | T::Item: Send + 'static, |
| 740 | { |
| 741 | type Item = T::Item; |
| 742 | |
| 743 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { |
| 744 | loop { |
| 745 | match &mut self.state { |
| 746 | // If not in idle or active streaming state, stop the running task. |
| 747 | State::WithMut(..) |
| 748 | | State::Streaming(None, _) |
| 749 | | State::Reading(..) |
| 750 | | State::Writing(..) |
| 751 | | State::Seeking(..) => { |
| 752 | // Wait for the running task to stop. |
| 753 | ready!(self.poll_stop(cx)).ok(); |
| 754 | } |
| 755 | |
| 756 | // If idle, start a streaming task. |
| 757 | State::Idle(iter) => { |
| 758 | // Take the iterator out to run it on a blocking task. |
| 759 | let mut iter = iter.take().expect("inner iterator was taken out" ); |
| 760 | |
| 761 | // This channel capacity seems to work well in practice. If it's too low, there |
| 762 | // will be too much synchronization between tasks. If too high, memory |
| 763 | // consumption increases. |
| 764 | let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items |
| 765 | |
| 766 | // Spawn a blocking task that runs the iterator and returns it when done. |
| 767 | let task = Executor::spawn(async move { |
| 768 | for item in &mut iter { |
| 769 | if sender.send(item).await.is_err() { |
| 770 | break; |
| 771 | } |
| 772 | } |
| 773 | iter |
| 774 | }); |
| 775 | |
| 776 | // Move into the busy state and poll again. |
| 777 | self.state = State::Streaming(Some(Box::new(Box::pin(receiver))), task); |
| 778 | } |
| 779 | |
| 780 | // If streaming, receive an item. |
| 781 | State::Streaming(Some(any), task) => { |
| 782 | let receiver = any.downcast_mut::<Pin<Box<Receiver<T::Item>>>>().unwrap(); |
| 783 | |
| 784 | // Poll the channel. |
| 785 | let opt = ready!(receiver.as_mut().poll_next(cx)); |
| 786 | |
| 787 | // If the channel is closed, retrieve the iterator back from the blocking task. |
| 788 | // This is not really a required step, but it's cleaner to drop the iterator on |
| 789 | // the same thread that created it. |
| 790 | if opt.is_none() { |
| 791 | // Poll the task to retrieve the iterator. |
| 792 | let iter = ready!(Pin::new(task).poll(cx)); |
| 793 | self.state = State::Idle(Some(iter)); |
| 794 | } |
| 795 | |
| 796 | return Poll::Ready(opt); |
| 797 | } |
| 798 | } |
| 799 | } |
| 800 | } |
| 801 | } |
| 802 | |
| 803 | impl<T: Read + Send + 'static> AsyncRead for Unblock<T> { |
| 804 | fn poll_read( |
| 805 | mut self: Pin<&mut Self>, |
| 806 | cx: &mut Context<'_>, |
| 807 | buf: &mut [u8], |
| 808 | ) -> Poll<io::Result<usize>> { |
| 809 | loop { |
| 810 | match &mut self.state { |
| 811 | // If not in idle or active reading state, stop the running task. |
| 812 | State::WithMut(..) |
| 813 | | State::Reading(None, _) |
| 814 | | State::Streaming(..) |
| 815 | | State::Writing(..) |
| 816 | | State::Seeking(..) => { |
| 817 | // Wait for the running task to stop. |
| 818 | ready!(self.poll_stop(cx))?; |
| 819 | } |
| 820 | |
| 821 | // If idle, start a reading task. |
| 822 | State::Idle(io) => { |
| 823 | // Take the I/O handle out to read it on a blocking task. |
| 824 | let mut io = io.take().expect("inner value was taken out" ); |
| 825 | |
| 826 | // This pipe capacity seems to work well in practice. If it's too low, there |
| 827 | // will be too much synchronization between tasks. If too high, memory |
| 828 | // consumption increases. |
| 829 | let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB |
| 830 | |
| 831 | // Spawn a blocking task that reads and returns the I/O handle when done. |
| 832 | let task = Executor::spawn(async move { |
| 833 | // Copy bytes from the I/O handle into the pipe until the pipe is closed or |
| 834 | // an error occurs. |
| 835 | loop { |
| 836 | match future::poll_fn(|cx| writer.poll_fill(cx, &mut io)).await { |
| 837 | Ok(0) => return (Ok(()), io), |
| 838 | Ok(_) => {} |
| 839 | Err(err) => return (Err(err), io), |
| 840 | } |
| 841 | } |
| 842 | }); |
| 843 | |
| 844 | // Move into the busy state and poll again. |
| 845 | self.state = State::Reading(Some(reader), task); |
| 846 | } |
| 847 | |
| 848 | // If reading, read bytes from the pipe. |
| 849 | State::Reading(Some(reader), task) => { |
| 850 | // Poll the pipe. |
| 851 | let n = ready!(reader.poll_drain(cx, buf))?; |
| 852 | |
| 853 | // If the pipe is closed, retrieve the I/O handle back from the blocking task. |
| 854 | // This is not really a required step, but it's cleaner to drop the handle on |
| 855 | // the same thread that created it. |
| 856 | if n == 0 { |
| 857 | // Poll the task to retrieve the I/O handle. |
| 858 | let (res, io) = ready!(Pin::new(task).poll(cx)); |
| 859 | // Make sure to move into the idle state before reporting errors. |
| 860 | self.state = State::Idle(Some(io)); |
| 861 | res?; |
| 862 | } |
| 863 | |
| 864 | return Poll::Ready(Ok(n)); |
| 865 | } |
| 866 | } |
| 867 | } |
| 868 | } |
| 869 | } |
| 870 | |
| 871 | impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> { |
| 872 | fn poll_write( |
| 873 | mut self: Pin<&mut Self>, |
| 874 | cx: &mut Context<'_>, |
| 875 | buf: &[u8], |
| 876 | ) -> Poll<io::Result<usize>> { |
| 877 | loop { |
| 878 | match &mut self.state { |
| 879 | // If not in idle or active writing state, stop the running task. |
| 880 | State::WithMut(..) |
| 881 | | State::Writing(None, _) |
| 882 | | State::Streaming(..) |
| 883 | | State::Reading(..) |
| 884 | | State::Seeking(..) => { |
| 885 | // Wait for the running task to stop. |
| 886 | ready!(self.poll_stop(cx))?; |
| 887 | } |
| 888 | |
| 889 | // If idle, start the writing task. |
| 890 | State::Idle(io) => { |
| 891 | // Take the I/O handle out to write on a blocking task. |
| 892 | let mut io = io.take().expect("inner value was taken out" ); |
| 893 | |
| 894 | // This pipe capacity seems to work well in practice. If it's too low, there will |
| 895 | // be too much synchronization between tasks. If too high, memory consumption |
| 896 | // increases. |
| 897 | let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB |
| 898 | |
| 899 | // Spawn a blocking task that writes and returns the I/O handle when done. |
| 900 | let task = Executor::spawn(async move { |
| 901 | // Copy bytes from the pipe into the I/O handle until the pipe is closed or an |
| 902 | // error occurs. Flush the I/O handle at the end. |
| 903 | loop { |
| 904 | match future::poll_fn(|cx| reader.poll_drain(cx, &mut io)).await { |
| 905 | Ok(0) => return (io.flush(), io), |
| 906 | Ok(_) => {} |
| 907 | Err(err) => { |
| 908 | io.flush().ok(); |
| 909 | return (Err(err), io); |
| 910 | } |
| 911 | } |
| 912 | } |
| 913 | }); |
| 914 | |
| 915 | // Move into the busy state and poll again. |
| 916 | self.state = State::Writing(Some(writer), task); |
| 917 | } |
| 918 | |
| 919 | // If writing, write more bytes into the pipe. |
| 920 | State::Writing(Some(writer), _) => return writer.poll_fill(cx, buf), |
| 921 | } |
| 922 | } |
| 923 | } |
| 924 | |
| 925 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 926 | loop { |
| 927 | match &mut self.state { |
| 928 | // If not in idle state, stop the running task. |
| 929 | State::WithMut(..) |
| 930 | | State::Streaming(..) |
| 931 | | State::Writing(..) |
| 932 | | State::Reading(..) |
| 933 | | State::Seeking(..) => { |
| 934 | // Wait for the running task to stop. |
| 935 | ready!(self.poll_stop(cx))?; |
| 936 | } |
| 937 | |
| 938 | // Idle implies flushed. |
| 939 | State::Idle(_) => return Poll::Ready(Ok(())), |
| 940 | } |
| 941 | } |
| 942 | } |
| 943 | |
| 944 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 945 | // First, make sure the I/O handle is flushed. |
| 946 | ready!(Pin::new(&mut self).poll_flush(cx))?; |
| 947 | |
| 948 | // Then move into the idle state with no I/O handle, thus dropping it. |
| 949 | self.state = State::Idle(None); |
| 950 | Poll::Ready(Ok(())) |
| 951 | } |
| 952 | } |
| 953 | |
| 954 | impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> { |
| 955 | fn poll_seek( |
| 956 | mut self: Pin<&mut Self>, |
| 957 | cx: &mut Context<'_>, |
| 958 | pos: SeekFrom, |
| 959 | ) -> Poll<io::Result<u64>> { |
| 960 | loop { |
| 961 | match &mut self.state { |
| 962 | // If not in idle state, stop the running task. |
| 963 | State::WithMut(..) |
| 964 | | State::Streaming(..) |
| 965 | | State::Reading(..) |
| 966 | | State::Writing(..) => { |
| 967 | // Wait for the running task to stop. |
| 968 | ready!(self.poll_stop(cx))?; |
| 969 | } |
| 970 | |
| 971 | State::Idle(io) => { |
| 972 | // Take the I/O handle out to seek on a blocking task. |
| 973 | let mut io = io.take().expect("inner value was taken out" ); |
| 974 | |
| 975 | let task = Executor::spawn(async move { |
| 976 | let res = io.seek(pos); |
| 977 | (pos, res, io) |
| 978 | }); |
| 979 | self.state = State::Seeking(task); |
| 980 | } |
| 981 | |
| 982 | State::Seeking(task) => { |
| 983 | // Poll the task to wait for it to finish. |
| 984 | let (original_pos, res, io) = ready!(Pin::new(task).poll(cx)); |
| 985 | // Make sure to move into the idle state before reporting errors. |
| 986 | self.state = State::Idle(Some(io)); |
| 987 | let current = res?; |
| 988 | |
| 989 | // If the `pos` argument matches the original one, return the result. |
| 990 | if original_pos == pos { |
| 991 | return Poll::Ready(Ok(current)); |
| 992 | } |
| 993 | } |
| 994 | } |
| 995 | } |
| 996 | } |
| 997 | } |
| 998 | |
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use super::*;

    /// Checks how `Executor::max_threads` interprets the `MAX_THREADS_ENV` variable.
    #[test]
    fn test_max_threads() {
        // Each entry pairs a raw environment value with the thread limit expected for it.
        // The entries are applied in order, each overwriting the previous value.
        let cases = [
            // A well-formed value inside the allowed range is used as-is.
            ("100", 100),
            // A value below the minimum is raised to the minimum.
            ("0", 1),
            // A value above the maximum is lowered to the maximum.
            ("50000", 10000),
            // An empty value falls back to the default.
            ("", 500),
            // A value that is not a number falls back to the default.
            ("NOTINT", 500),
        ];

        for &(raw, expected) in &cases {
            env::set_var(MAX_THREADS_ENV, raw);
            assert_eq!(
                expected,
                Executor::max_threads().get(),
                "env value {:?}",
                raw
            );
        }
    }
}
| 1026 | |