1//! A thread pool for isolating blocking I/O in async programs.
2//!
3//! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async
4//! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible
5//! solutions, they're not always available or ideal.
6//!
7//! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread
8//! pool provided by this crate. The pool dynamically spawns and stops threads depending on the
9//! current number of running I/O jobs.
10//!
11//! Note that there is a limit on the number of active threads. Once that limit is hit, a running
12//! job has to finish before others get a chance to run. When a thread is idle, it waits for the
13//! next job or shuts down after a certain timeout.
14//!
//! The maximum number of threads (500 by default) can be changed by setting the
//! `BLOCKING_MAX_THREADS` environment variable to a value between 1 and 10000.
17//!
18//! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port
19//! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html
20//! [io_uring]: https://lwn.net/Articles/776703
21//!
22//! # Examples
23//!
24//! Read the contents of a file:
25//!
26//! ```no_run
27//! use blocking::unblock;
28//! use std::fs;
29//!
30//! # futures_lite::future::block_on(async {
31//! let contents = unblock(|| fs::read_to_string("file.txt")).await?;
32//! println!("{}", contents);
33//! # std::io::Result::Ok(()) });
34//! ```
35//!
36//! Read a file and pipe its contents to stdout:
37//!
38//! ```no_run
39//! use blocking::{unblock, Unblock};
40//! use futures_lite::io;
41//! use std::fs::File;
42//!
43//! # futures_lite::future::block_on(async {
44//! let input = unblock(|| File::open("file.txt")).await?;
45//! let input = Unblock::new(input);
46//! let mut output = Unblock::new(std::io::stdout());
47//!
48//! io::copy(input, &mut output).await?;
49//! # std::io::Result::Ok(()) });
50//! ```
51//!
52//! Iterate over the contents of a directory:
53//!
54//! ```no_run
55//! use blocking::Unblock;
56//! use futures_lite::prelude::*;
57//! use std::fs;
58//!
59//! # futures_lite::future::block_on(async {
60//! let mut dir = Unblock::new(fs::read_dir(".")?);
61//! while let Some(item) = dir.next().await {
62//! println!("{}", item?.file_name().to_string_lossy());
63//! }
64//! # std::io::Result::Ok(()) });
65//! ```
66//!
67//! Spawn a process:
68//!
69//! ```no_run
70//! use blocking::unblock;
71//! use std::process::Command;
72//!
73//! # futures_lite::future::block_on(async {
74//! let out = unblock(|| Command::new("dir").output()).await?;
75//! # std::io::Result::Ok(()) });
76//! ```
77
78#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
79#![forbid(unsafe_code)]
80#![doc(
81 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
82)]
83#![doc(
84 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
85)]
86
87use std::any::Any;
88use std::collections::VecDeque;
89use std::fmt;
90use std::io::{self, Read, Seek, SeekFrom, Write};
91use std::num::NonZeroUsize;
92use std::panic;
93use std::pin::Pin;
94use std::sync::atomic::{AtomicUsize, Ordering};
95use std::sync::{Condvar, Mutex, MutexGuard};
96use std::task::{Context, Poll};
97use std::thread;
98use std::time::Duration;
99
100#[cfg(not(target_family = "wasm"))]
101use std::env;
102
103use async_channel::{bounded, Receiver};
104use async_task::Runnable;
105use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
106use futures_lite::{future, prelude::*, ready};
107use piper::{pipe, Reader, Writer};
108
109#[doc(no_inline)]
110pub use async_task::Task;
111
/// Thread limit used when the override environment variable is unset or cannot be parsed.
#[cfg(not(target_family = "wasm"))]
const DEFAULT_MAX_THREADS: usize = 500;

/// Smallest thread limit that can be configured.
#[cfg(not(target_family = "wasm"))]
const MIN_MAX_THREADS: usize = 1;

/// Largest thread limit that can be configured.
#[cfg(not(target_family = "wasm"))]
const MAX_MAX_THREADS: usize = 10_000;

/// Name of the environment variable that overrides the default thread limit.
#[cfg(not(target_family = "wasm"))]
const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";
127
128/// The blocking executor.
129struct Executor {
130 /// Inner state of the executor.
131 inner: Mutex<Inner>,
132
133 /// Used to put idle threads to sleep and wake them up when new work comes in.
134 cvar: Condvar,
135}
136
137/// Inner state of the blocking executor.
138struct Inner {
139 /// Number of idle threads in the pool.
140 ///
141 /// Idle threads are sleeping, waiting to get a task to run.
142 idle_count: usize,
143
144 /// Total number of threads in the pool.
145 ///
146 /// This is the number of idle threads + the number of active threads.
147 thread_count: usize,
148
149 /// The queue of blocking tasks.
150 queue: VecDeque<Runnable>,
151
152 /// Maximum number of threads in the pool
153 thread_limit: NonZeroUsize,
154}
155
156impl Executor {
157 #[cfg(not(target_family = "wasm"))]
158 fn max_threads() -> usize {
159 match env::var(MAX_THREADS_ENV) {
160 Ok(v) => v
161 .parse::<usize>()
162 .map(|v| v.max(MIN_MAX_THREADS).min(MAX_MAX_THREADS))
163 .unwrap_or(DEFAULT_MAX_THREADS),
164 Err(_) => DEFAULT_MAX_THREADS,
165 }
166 }
167
168 /// Get a reference to the global executor.
169 #[inline]
170 fn get() -> &'static Self {
171 #[cfg(not(target_family = "wasm"))]
172 {
173 use async_lock::OnceCell;
174
175 static EXECUTOR: OnceCell<Executor> = OnceCell::new();
176
177 return EXECUTOR.get_or_init_blocking(|| {
178 let thread_limit = Self::max_threads();
179 Executor {
180 inner: Mutex::new(Inner {
181 idle_count: 0,
182 thread_count: 0,
183 queue: VecDeque::new(),
184 thread_limit: NonZeroUsize::new(thread_limit).unwrap(),
185 }),
186 cvar: Condvar::new(),
187 }
188 });
189 }
190
191 #[cfg(target_family = "wasm")]
192 panic!("cannot spawn a blocking task on WASM")
193 }
194
195 /// Spawns a future onto this executor.
196 ///
197 /// Returns a [`Task`] handle for the spawned task.
198 fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
199 let (runnable, task) = async_task::spawn(future, |r| {
200 // Initialize the executor if we haven't already.
201 let executor = Self::get();
202
203 // Schedule the task on our executor.
204 executor.schedule(r)
205 });
206 runnable.schedule();
207 task
208 }
209
210 /// Runs the main loop on the current thread.
211 ///
212 /// This function runs blocking tasks until it becomes idle and times out.
213 fn main_loop(&'static self) {
214 let span = tracing::trace_span!("blocking::main_loop");
215 let _enter = span.enter();
216
217 let mut inner = self.inner.lock().unwrap();
218 loop {
219 // This thread is not idle anymore because it's going to run tasks.
220 inner.idle_count -= 1;
221
222 // Run tasks in the queue.
223 while let Some(runnable) = inner.queue.pop_front() {
224 // We have found a task - grow the pool if needed.
225 self.grow_pool(inner);
226
227 // Run the task.
228 panic::catch_unwind(|| runnable.run()).ok();
229
230 // Re-lock the inner state and continue.
231 inner = self.inner.lock().unwrap();
232 }
233
234 // This thread is now becoming idle.
235 inner.idle_count += 1;
236
237 // Put the thread to sleep until another task is scheduled.
238 let timeout = Duration::from_millis(500);
239 tracing::trace!(?timeout, "going to sleep");
240 let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap();
241 inner = lock;
242
243 // If there are no tasks after a while, stop this thread.
244 if res.timed_out() && inner.queue.is_empty() {
245 inner.idle_count -= 1;
246 inner.thread_count -= 1;
247 break;
248 }
249
250 tracing::trace!("notified");
251 }
252
253 tracing::trace!("shutting down due to lack of tasks");
254 }
255
256 /// Schedules a runnable task for execution.
257 fn schedule(&'static self, runnable: Runnable) {
258 let mut inner = self.inner.lock().unwrap();
259 inner.queue.push_back(runnable);
260
261 // Notify a sleeping thread and spawn more threads if needed.
262 self.cvar.notify_one();
263 self.grow_pool(inner);
264 }
265
266 /// Spawns more blocking threads if the pool is overloaded with work.
267 fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
268 let span = tracing::trace_span!(
269 "grow_pool",
270 queue_len = inner.queue.len(),
271 idle_count = inner.idle_count,
272 thread_count = inner.thread_count,
273 );
274 let _enter = span.enter();
275
276 // If runnable tasks greatly outnumber idle threads and there aren't too many threads
277 // already, then be aggressive: wake all idle threads and spawn one more thread.
278 while inner.queue.len() > inner.idle_count * 5
279 && inner.thread_count < inner.thread_limit.get()
280 {
281 tracing::trace!("spawning a new thread to handle blocking tasks");
282
283 // The new thread starts in idle state.
284 inner.idle_count += 1;
285 inner.thread_count += 1;
286
287 // Notify all existing idle threads because we need to hurry up.
288 self.cvar.notify_all();
289
290 // Generate a new thread ID.
291 static ID: AtomicUsize = AtomicUsize::new(1);
292 let id = ID.fetch_add(1, Ordering::Relaxed);
293
294 // Spawn the new thread.
295 if let Err(e) = thread::Builder::new()
296 .name(format!("blocking-{}", id))
297 .spawn(move || self.main_loop())
298 {
299 // We were unable to spawn the thread, so we need to undo the state changes.
300 tracing::error!("failed to spawn a blocking thread: {}", e);
301 inner.idle_count -= 1;
302 inner.thread_count -= 1;
303
304 // The current number of threads is likely to be the system's upper limit, so update
305 // thread_limit accordingly.
306 inner.thread_limit = {
307 let new_limit = inner.thread_count;
308
309 // If the limit is about to be set to zero, set it to one instead so that if,
310 // in the future, we are able to spawn more threads, we will be able to do so.
311 NonZeroUsize::new(new_limit).unwrap_or_else(|| {
312 tracing::warn!(
313 "attempted to lower thread_limit to zero; setting to one instead"
314 );
315 NonZeroUsize::new(1).unwrap()
316 })
317 };
318 }
319 }
320 }
321}
322
323/// Runs blocking code on a thread pool.
324///
325/// # Examples
326///
327/// Read the contents of a file:
328///
329/// ```no_run
330/// use blocking::unblock;
331/// use std::fs;
332///
333/// # futures_lite::future::block_on(async {
334/// let contents = unblock(|| fs::read_to_string("file.txt")).await?;
335/// # std::io::Result::Ok(()) });
336/// ```
337///
338/// Spawn a process:
339///
340/// ```no_run
341/// use blocking::unblock;
342/// use std::process::Command;
343///
344/// # futures_lite::future::block_on(async {
345/// let out = unblock(|| Command::new("dir").output()).await?;
346/// # std::io::Result::Ok(()) });
347/// ```
348pub fn unblock<T, F>(f: F) -> Task<T>
349where
350 F: FnOnce() -> T + Send + 'static,
351 T: Send + 'static,
352{
353 Executor::spawn(future:async move { f() })
354}
355
356/// Runs blocking I/O on a thread pool.
357///
358/// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a
359/// special thread pool while exposing a familiar async interface.
360///
361/// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the
362/// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively.
363///
364/// # Caveats
365///
366/// [`Unblock`] is a low-level primitive, and as such it comes with some caveats.
367///
368/// For higher-level primitives built on top of [`Unblock`], look into [`async-fs`] or
369/// [`async-process`] (on Windows).
370///
371/// [`async-fs`]: https://github.com/smol-rs/async-fs
372/// [`async-process`]: https://github.com/smol-rs/async-process
373///
374/// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an
375/// async read/write operation simply receives/sends some bytes from/into the pipe. When in reading
376/// mode, the thread pool reads bytes from the I/O handle and forwards them into the pipe until it
377/// becomes full. When in writing mode, the thread pool reads bytes from the pipe and forwards them
378/// into the I/O handle.
379///
380/// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe.
381///
382/// ### Reading
383///
384/// If you create an [`Unblock`]`<`[`Stdin`][`std::io::Stdin`]`>`, read some bytes from it,
385/// and then drop it, a blocked read operation may keep hanging on the thread pool. The next
386/// attempt to read from stdin will lose bytes read by the hanging operation. This is a difficult
387/// problem to solve, so make sure you only use a single stdin handle for the duration of the
388/// entire program.
389///
390/// ### Writing
391///
392/// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the
393/// [`Unblock`] handle or some buffered data might get lost.
394///
395/// ### Seeking
396///
397/// Because of buffering in the pipe, if [`Unblock`] wraps a [`File`][`std::fs::File`], a single
398/// read operation may move the file cursor farther than is the span of the operation. In fact,
399/// reading just keeps going in the background until the pipe gets full. Keep this mind when
400/// using [`AsyncSeek`] with [relative][`SeekFrom::Current`] offsets.
401///
402/// # Examples
403///
404/// ```
405/// use blocking::Unblock;
406/// use futures_lite::prelude::*;
407///
408/// # futures_lite::future::block_on(async {
409/// let mut stdout = Unblock::new(std::io::stdout());
410/// stdout.write_all(b"Hello world!").await?;
411/// stdout.flush().await?;
412/// # std::io::Result::Ok(()) });
413/// ```
414pub struct Unblock<T> {
415 state: State<T>,
416 cap: Option<usize>,
417}
418
419impl<T> Unblock<T> {
420 /// Wraps a blocking I/O handle into the async [`Unblock`] interface.
421 ///
422 /// # Examples
423 ///
424 /// ```no_run
425 /// use blocking::Unblock;
426 ///
427 /// let stdin = Unblock::new(std::io::stdin());
428 /// ```
429 pub fn new(io: T) -> Unblock<T> {
430 Unblock {
431 state: State::Idle(Some(Box::new(io))),
432 cap: None,
433 }
434 }
435
436 /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer
437 /// capacity.
438 ///
439 /// When communicating with the inner [`Stream`]/[`Read`]/[`Write`] type from async code, data
440 /// transferred between blocking and async code goes through a buffer of limited capacity. This
441 /// constructor configures that capacity.
442 ///
443 /// The default capacity is:
444 ///
445 /// * For [`Iterator`] types: 8192 items.
446 /// * For [`Read`]/[`Write`] types: 8 MB.
447 ///
448 /// # Examples
449 ///
450 /// ```no_run
451 /// use blocking::Unblock;
452 ///
453 /// let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout());
454 /// ```
455 pub fn with_capacity(cap: usize, io: T) -> Unblock<T> {
456 Unblock {
457 state: State::Idle(Some(Box::new(io))),
458 cap: Some(cap),
459 }
460 }
461
462 /// Gets a mutable reference to the blocking I/O handle.
463 ///
464 /// This is an async method because the I/O handle might be on the thread pool and needs to
465 /// be moved onto the current thread before we can get a reference to it.
466 ///
467 /// # Examples
468 ///
469 /// ```no_run
470 /// use blocking::{unblock, Unblock};
471 /// use std::fs::File;
472 ///
473 /// # futures_lite::future::block_on(async {
474 /// let file = unblock(|| File::create("file.txt")).await?;
475 /// let mut file = Unblock::new(file);
476 ///
477 /// let metadata = file.get_mut().await.metadata()?;
478 /// # std::io::Result::Ok(()) });
479 /// ```
480 pub async fn get_mut(&mut self) -> &mut T {
481 // Wait for the running task to stop and ignore I/O errors if there are any.
482 future::poll_fn(|cx| self.poll_stop(cx)).await.ok();
483
484 // Assume idle state and get a reference to the inner value.
485 match &mut self.state {
486 State::Idle(t) => t.as_mut().expect("inner value was taken out"),
487 State::WithMut(..)
488 | State::Streaming(..)
489 | State::Reading(..)
490 | State::Writing(..)
491 | State::Seeking(..) => {
492 unreachable!("when stopped, the state machine must be in idle state");
493 }
494 }
495 }
496
497 /// Performs a blocking operation on the I/O handle.
498 ///
499 /// # Examples
500 ///
501 /// ```no_run
502 /// use blocking::{unblock, Unblock};
503 /// use std::fs::File;
504 ///
505 /// # futures_lite::future::block_on(async {
506 /// let file = unblock(|| File::create("file.txt")).await?;
507 /// let mut file = Unblock::new(file);
508 ///
509 /// let metadata = file.with_mut(|f| f.metadata()).await?;
510 /// # std::io::Result::Ok(()) });
511 /// ```
512 pub async fn with_mut<R, F>(&mut self, op: F) -> R
513 where
514 F: FnOnce(&mut T) -> R + Send + 'static,
515 R: Send + 'static,
516 T: Send + 'static,
517 {
518 // Wait for the running task to stop and ignore I/O errors if there are any.
519 future::poll_fn(|cx| self.poll_stop(cx)).await.ok();
520
521 // Assume idle state and take out the inner value.
522 let mut t = match &mut self.state {
523 State::Idle(t) => t.take().expect("inner value was taken out"),
524 State::WithMut(..)
525 | State::Streaming(..)
526 | State::Reading(..)
527 | State::Writing(..)
528 | State::Seeking(..) => {
529 unreachable!("when stopped, the state machine must be in idle state");
530 }
531 };
532
533 let (sender, receiver) = bounded(1);
534 let task = Executor::spawn(async move {
535 sender.try_send(op(&mut t)).ok();
536 t
537 });
538 self.state = State::WithMut(task);
539
540 receiver
541 .recv()
542 .await
543 .expect("`Unblock::with_mut()` operation has panicked")
544 }
545
546 /// Extracts the inner blocking I/O handle.
547 ///
548 /// This is an async method because the I/O handle might be on the thread pool and needs to
549 /// be moved onto the current thread before we can extract it.
550 ///
551 /// # Examples
552 ///
553 /// ```no_run
554 /// use blocking::{unblock, Unblock};
555 /// use futures_lite::prelude::*;
556 /// use std::fs::File;
557 ///
558 /// # futures_lite::future::block_on(async {
559 /// let file = unblock(|| File::create("file.txt")).await?;
560 /// let file = Unblock::new(file);
561 ///
562 /// let file = file.into_inner().await;
563 /// # std::io::Result::Ok(()) });
564 /// ```
565 pub async fn into_inner(self) -> T {
566 // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
567 // bind `self` to a local mutable variable.
568 let mut this = self;
569
570 // Wait for the running task to stop and ignore I/O errors if there are any.
571 future::poll_fn(|cx| this.poll_stop(cx)).await.ok();
572
573 // Assume idle state and extract the inner value.
574 match &mut this.state {
575 State::Idle(t) => *t.take().expect("inner value was taken out"),
576 State::WithMut(..)
577 | State::Streaming(..)
578 | State::Reading(..)
579 | State::Writing(..)
580 | State::Seeking(..) => {
581 unreachable!("when stopped, the state machine must be in idle state");
582 }
583 }
584 }
585
586 /// Waits for the running task to stop.
587 ///
588 /// On success, the state machine is moved into the idle state.
589 fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
590 loop {
591 match &mut self.state {
592 State::Idle(_) => return Poll::Ready(Ok(())),
593
594 State::WithMut(task) => {
595 // Poll the task to wait for it to finish.
596 let io = ready!(Pin::new(task).poll(cx));
597 self.state = State::Idle(Some(io));
598 }
599
600 State::Streaming(any, task) => {
601 // Drop the receiver to close the channel. This stops the `send()` operation in
602 // the task, after which the task returns the iterator back.
603 any.take();
604
605 // Poll the task to retrieve the iterator.
606 let iter = ready!(Pin::new(task).poll(cx));
607 self.state = State::Idle(Some(iter));
608 }
609
610 State::Reading(reader, task) => {
611 // Drop the reader to close the pipe. This stops copying inside the task, after
612 // which the task returns the I/O handle back.
613 reader.take();
614
615 // Poll the task to retrieve the I/O handle.
616 let (res, io) = ready!(Pin::new(task).poll(cx));
617 // Make sure to move into the idle state before reporting errors.
618 self.state = State::Idle(Some(io));
619 res?;
620 }
621
622 State::Writing(writer, task) => {
623 // Drop the writer to close the pipe. This stops copying inside the task, after
624 // which the task flushes the I/O handle and
625 writer.take();
626
627 // Poll the task to retrieve the I/O handle.
628 let (res, io) = ready!(Pin::new(task).poll(cx));
629 // Make sure to move into the idle state before reporting errors.
630 self.state = State::Idle(Some(io));
631 res?;
632 }
633
634 State::Seeking(task) => {
635 // Poll the task to wait for it to finish.
636 let (_, res, io) = ready!(Pin::new(task).poll(cx));
637 // Make sure to move into the idle state before reporting errors.
638 self.state = State::Idle(Some(io));
639 res?;
640 }
641 }
642 }
643 }
644}
645
646impl<T: fmt::Debug> fmt::Debug for Unblock<T> {
647 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
648 struct Closed;
649 impl fmt::Debug for Closed {
650 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
651 f.write_str("<closed>")
652 }
653 }
654
655 struct Blocked;
656 impl fmt::Debug for Blocked {
657 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
658 f.write_str("<blocked>")
659 }
660 }
661
662 match &self.state {
663 State::Idle(None) => f.debug_struct("Unblock").field("io", &Closed).finish(),
664 State::Idle(Some(io)) => {
665 let io: &T = io;
666 f.debug_struct("Unblock").field("io", io).finish()
667 }
668 State::WithMut(..)
669 | State::Streaming(..)
670 | State::Reading(..)
671 | State::Writing(..)
672 | State::Seeking(..) => f.debug_struct("Unblock").field("io", &Blocked).finish(),
673 }
674 }
675}
676
677/// Current state of a blocking task.
678enum State<T> {
679 /// There is no blocking task.
680 ///
681 /// The inner value is readily available, unless it has already been extracted. The value is
682 /// extracted out by [`Unblock::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting
683 /// [`Unblock`].
684 Idle(Option<Box<T>>),
685
686 /// A [`Unblock::with_mut()`] closure was spawned and is still running.
687 WithMut(Task<Box<T>>),
688
689 /// The inner value is an [`Iterator`] currently iterating in a task.
690 ///
691 /// The `dyn Any` value here is a `Pin<Box<Receiver<<T as Iterator>::Item>>>`.
692 Streaming(Option<Box<dyn Any + Send + Sync>>, Task<Box<T>>),
693
694 /// The inner value is a [`Read`] currently reading in a task.
695 Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>),
696
697 /// The inner value is a [`Write`] currently writing in a task.
698 Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>),
699
700 /// The inner value is a [`Seek`] currently seeking in a task.
701 Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>),
702}
703
704impl<T: Iterator + Send + 'static> Stream for Unblock<T>
705where
706 T::Item: Send + 'static,
707{
708 type Item = T::Item;
709
710 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
711 loop {
712 match &mut self.state {
713 // If not in idle or active streaming state, stop the running task.
714 State::WithMut(..)
715 | State::Streaming(None, _)
716 | State::Reading(..)
717 | State::Writing(..)
718 | State::Seeking(..) => {
719 // Wait for the running task to stop.
720 ready!(self.poll_stop(cx)).ok();
721 }
722
723 // If idle, start a streaming task.
724 State::Idle(iter) => {
725 // Take the iterator out to run it on a blocking task.
726 let mut iter = iter.take().expect("inner iterator was taken out");
727
728 // This channel capacity seems to work well in practice. If it's too low, there
729 // will be too much synchronization between tasks. If too high, memory
730 // consumption increases.
731 let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items
732
733 // Spawn a blocking task that runs the iterator and returns it when done.
734 let task = Executor::spawn(async move {
735 for item in &mut iter {
736 if sender.send(item).await.is_err() {
737 break;
738 }
739 }
740 iter
741 });
742
743 // Move into the busy state and poll again.
744 self.state = State::Streaming(Some(Box::new(Box::pin(receiver))), task);
745 }
746
747 // If streaming, receive an item.
748 State::Streaming(Some(any), task) => {
749 let receiver = any.downcast_mut::<Pin<Box<Receiver<T::Item>>>>().unwrap();
750
751 // Poll the channel.
752 let opt = ready!(receiver.as_mut().poll_next(cx));
753
754 // If the channel is closed, retrieve the iterator back from the blocking task.
755 // This is not really a required step, but it's cleaner to drop the iterator on
756 // the same thread that created it.
757 if opt.is_none() {
758 // Poll the task to retrieve the iterator.
759 let iter = ready!(Pin::new(task).poll(cx));
760 self.state = State::Idle(Some(iter));
761 }
762
763 return Poll::Ready(opt);
764 }
765 }
766 }
767 }
768}
769
770impl<T: Read + Send + 'static> AsyncRead for Unblock<T> {
771 fn poll_read(
772 mut self: Pin<&mut Self>,
773 cx: &mut Context<'_>,
774 buf: &mut [u8],
775 ) -> Poll<io::Result<usize>> {
776 loop {
777 match &mut self.state {
778 // If not in idle or active reading state, stop the running task.
779 State::WithMut(..)
780 | State::Reading(None, _)
781 | State::Streaming(..)
782 | State::Writing(..)
783 | State::Seeking(..) => {
784 // Wait for the running task to stop.
785 ready!(self.poll_stop(cx))?;
786 }
787
788 // If idle, start a reading task.
789 State::Idle(io) => {
790 // Take the I/O handle out to read it on a blocking task.
791 let mut io = io.take().expect("inner value was taken out");
792
793 // This pipe capacity seems to work well in practice. If it's too low, there
794 // will be too much synchronization between tasks. If too high, memory
795 // consumption increases.
796 let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
797
798 // Spawn a blocking task that reads and returns the I/O handle when done.
799 let task = Executor::spawn(async move {
800 // Copy bytes from the I/O handle into the pipe until the pipe is closed or
801 // an error occurs.
802 loop {
803 match future::poll_fn(|cx| writer.poll_fill(cx, &mut io)).await {
804 Ok(0) => return (Ok(()), io),
805 Ok(_) => {}
806 Err(err) => return (Err(err), io),
807 }
808 }
809 });
810
811 // Move into the busy state and poll again.
812 self.state = State::Reading(Some(reader), task);
813 }
814
815 // If reading, read bytes from the pipe.
816 State::Reading(Some(reader), task) => {
817 // Poll the pipe.
818 let n = ready!(reader.poll_drain(cx, buf))?;
819
820 // If the pipe is closed, retrieve the I/O handle back from the blocking task.
821 // This is not really a required step, but it's cleaner to drop the handle on
822 // the same thread that created it.
823 if n == 0 {
824 // Poll the task to retrieve the I/O handle.
825 let (res, io) = ready!(Pin::new(task).poll(cx));
826 // Make sure to move into the idle state before reporting errors.
827 self.state = State::Idle(Some(io));
828 res?;
829 }
830
831 return Poll::Ready(Ok(n));
832 }
833 }
834 }
835 }
836}
837
838impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> {
839 fn poll_write(
840 mut self: Pin<&mut Self>,
841 cx: &mut Context<'_>,
842 buf: &[u8],
843 ) -> Poll<io::Result<usize>> {
844 loop {
845 match &mut self.state {
846 // If not in idle or active writing state, stop the running task.
847 State::WithMut(..)
848 | State::Writing(None, _)
849 | State::Streaming(..)
850 | State::Reading(..)
851 | State::Seeking(..) => {
852 // Wait for the running task to stop.
853 ready!(self.poll_stop(cx))?;
854 }
855
856 // If idle, start the writing task.
857 State::Idle(io) => {
858 // Take the I/O handle out to write on a blocking task.
859 let mut io = io.take().expect("inner value was taken out");
860
861 // This pipe capacity seems to work well in practice. If it's too low, there will
862 // be too much synchronization between tasks. If too high, memory consumption
863 // increases.
864 let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
865
866 // Spawn a blocking task that writes and returns the I/O handle when done.
867 let task = Executor::spawn(async move {
868 // Copy bytes from the pipe into the I/O handle until the pipe is closed or an
869 // error occurs. Flush the I/O handle at the end.
870 loop {
871 match future::poll_fn(|cx| reader.poll_drain(cx, &mut io)).await {
872 Ok(0) => return (io.flush(), io),
873 Ok(_) => {}
874 Err(err) => {
875 io.flush().ok();
876 return (Err(err), io);
877 }
878 }
879 }
880 });
881
882 // Move into the busy state and poll again.
883 self.state = State::Writing(Some(writer), task);
884 }
885
886 // If writing, write more bytes into the pipe.
887 State::Writing(Some(writer), _) => return writer.poll_fill(cx, buf),
888 }
889 }
890 }
891
892 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
893 loop {
894 match &mut self.state {
895 // If not in idle state, stop the running task.
896 State::WithMut(..)
897 | State::Streaming(..)
898 | State::Writing(..)
899 | State::Reading(..)
900 | State::Seeking(..) => {
901 // Wait for the running task to stop.
902 ready!(self.poll_stop(cx))?;
903 }
904
905 // Idle implies flushed.
906 State::Idle(_) => return Poll::Ready(Ok(())),
907 }
908 }
909 }
910
911 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
912 // First, make sure the I/O handle is flushed.
913 ready!(Pin::new(&mut self).poll_flush(cx))?;
914
915 // Then move into the idle state with no I/O handle, thus dropping it.
916 self.state = State::Idle(None);
917 Poll::Ready(Ok(()))
918 }
919}
920
921impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> {
922 fn poll_seek(
923 mut self: Pin<&mut Self>,
924 cx: &mut Context<'_>,
925 pos: SeekFrom,
926 ) -> Poll<io::Result<u64>> {
927 loop {
928 match &mut self.state {
929 // If not in idle state, stop the running task.
930 State::WithMut(..)
931 | State::Streaming(..)
932 | State::Reading(..)
933 | State::Writing(..) => {
934 // Wait for the running task to stop.
935 ready!(self.poll_stop(cx))?;
936 }
937
938 State::Idle(io) => {
939 // Take the I/O handle out to seek on a blocking task.
940 let mut io = io.take().expect("inner value was taken out");
941
942 let task = Executor::spawn(async move {
943 let res = io.seek(pos);
944 (pos, res, io)
945 });
946 self.state = State::Seeking(task);
947 }
948
949 State::Seeking(task) => {
950 // Poll the task to wait for it to finish.
951 let (original_pos, res, io) = ready!(Pin::new(task).poll(cx));
952 // Make sure to move into the idle state before reporting errors.
953 self.state = State::Idle(Some(io));
954 let current = res?;
955
956 // If the `pos` argument matches the original one, return the result.
957 if original_pos == pos {
958 return Poll::Ready(Ok(current));
959 }
960 }
961 }
962 }
963 }
964}
965
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use super::*;

    /// Checks how `Executor::max_threads()` interprets the override environment variable.
    ///
    /// All cases live in a single test because they mutate the same process-wide variable and
    /// must therefore run sequentially.
    #[test]
    fn test_max_threads() {
        // properly set env var
        env::set_var(MAX_THREADS_ENV, "100");
        assert_eq!(100, Executor::max_threads());

        // values exactly on the bounds are accepted as they are
        env::set_var(MAX_THREADS_ENV, "1");
        assert_eq!(1, Executor::max_threads());
        env::set_var(MAX_THREADS_ENV, "10000");
        assert_eq!(10000, Executor::max_threads());

        // passed value below minimum, so we set it to minimum
        env::set_var(MAX_THREADS_ENV, "0");
        assert_eq!(1, Executor::max_threads());

        // passed value above maximum, so we set to allowed maximum
        env::set_var(MAX_THREADS_ENV, "50000");
        assert_eq!(10000, Executor::max_threads());

        // empty value is not a number, use default
        env::set_var(MAX_THREADS_ENV, "");
        assert_eq!(500, Executor::max_threads());

        // not a number, use default
        env::set_var(MAX_THREADS_ENV, "NOTINT");
        assert_eq!(500, Executor::max_threads());

        // no env var at all, use default (this also leaves the environment clean)
        env::remove_var(MAX_THREADS_ENV);
        assert_eq!(500, Executor::max_threads());
    }
}
993