1 | //! A thread pool for isolating blocking I/O in async programs. |
2 | //! |
3 | //! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async |
4 | //! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible |
5 | //! solutions, they're not always available or ideal. |
6 | //! |
7 | //! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread |
8 | //! pool provided by this crate. The pool dynamically spawns and stops threads depending on the |
9 | //! current number of running I/O jobs. |
10 | //! |
11 | //! Note that there is a limit on the number of active threads. Once that limit is hit, a running |
12 | //! job has to finish before others get a chance to run. When a thread is idle, it waits for the |
13 | //! next job or shuts down after a certain timeout. |
14 | //! |
//! The default number of threads (set to 500) can be altered by setting the `BLOCKING_MAX_THREADS`
//! environment variable to a value between 1 and 10000.
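//!
//! For example, the limit can be lowered from application code before the pool is first used
//! (an illustrative sketch; the value 64 is arbitrary):
//!
//! ```no_run
//! // Must run before the first blocking task is spawned, because the pool reads the
//! // variable only once, when it initializes.
//! std::env::set_var("BLOCKING_MAX_THREADS", "64");
//! ```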
17 | //! |
18 | //! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port |
19 | //! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html |
20 | //! [io_uring]: https://lwn.net/Articles/776703 |
21 | //! |
22 | //! # Examples |
23 | //! |
24 | //! Read the contents of a file: |
25 | //! |
26 | //! ```no_run |
27 | //! use blocking::unblock; |
28 | //! use std::fs; |
29 | //! |
30 | //! # futures_lite::future::block_on(async { |
//! let contents = unblock(|| fs::read_to_string("file.txt")).await?;
//! println!("{}", contents);
33 | //! # std::io::Result::Ok(()) }); |
34 | //! ``` |
35 | //! |
36 | //! Read a file and pipe its contents to stdout: |
37 | //! |
38 | //! ```no_run |
39 | //! use blocking::{unblock, Unblock}; |
40 | //! use futures_lite::io; |
41 | //! use std::fs::File; |
42 | //! |
43 | //! # futures_lite::future::block_on(async { |
//! let input = unblock(|| File::open("file.txt")).await?;
45 | //! let input = Unblock::new(input); |
46 | //! let mut output = Unblock::new(std::io::stdout()); |
47 | //! |
48 | //! io::copy(input, &mut output).await?; |
49 | //! # std::io::Result::Ok(()) }); |
50 | //! ``` |
51 | //! |
52 | //! Iterate over the contents of a directory: |
53 | //! |
54 | //! ```no_run |
55 | //! use blocking::Unblock; |
56 | //! use futures_lite::prelude::*; |
57 | //! use std::fs; |
58 | //! |
59 | //! # futures_lite::future::block_on(async { |
//! let mut dir = Unblock::new(fs::read_dir(".")?);
//! while let Some(item) = dir.next().await {
//!     println!("{}", item?.file_name().to_string_lossy());
63 | //! } |
64 | //! # std::io::Result::Ok(()) }); |
65 | //! ``` |
66 | //! |
67 | //! Spawn a process: |
68 | //! |
69 | //! ```no_run |
70 | //! use blocking::unblock; |
71 | //! use std::process::Command; |
72 | //! |
73 | //! # futures_lite::future::block_on(async { |
//! let out = unblock(|| Command::new("dir").output()).await?;
75 | //! # std::io::Result::Ok(()) }); |
76 | //! ``` |
77 | |
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![forbid(unsafe_code)]
#![doc(
    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
86 | |
87 | use std::any::Any; |
88 | use std::collections::VecDeque; |
89 | use std::fmt; |
90 | use std::io::{self, Read, Seek, SeekFrom, Write}; |
91 | use std::num::NonZeroUsize; |
92 | use std::panic; |
93 | use std::pin::Pin; |
94 | use std::sync::atomic::{AtomicUsize, Ordering}; |
95 | use std::sync::{Condvar, Mutex, MutexGuard}; |
96 | use std::task::{Context, Poll}; |
97 | use std::thread; |
98 | use std::time::Duration; |
99 | |
#[cfg(not(target_family = "wasm"))]
101 | use std::env; |
102 | |
103 | use async_channel::{bounded, Receiver}; |
104 | use async_task::Runnable; |
105 | use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; |
106 | use futures_lite::{future, prelude::*, ready}; |
107 | use piper::{pipe, Reader, Writer}; |
108 | |
#[doc(no_inline)]
110 | pub use async_task::Task; |
111 | |
112 | /// Default value for max threads that Executor can grow to |
#[cfg(not(target_family = "wasm"))]
114 | const DEFAULT_MAX_THREADS: usize = 500; |
115 | |
116 | /// Minimum value for max threads config |
#[cfg(not(target_family = "wasm"))]
118 | const MIN_MAX_THREADS: usize = 1; |
119 | |
120 | /// Maximum value for max threads config |
#[cfg(not(target_family = "wasm"))]
122 | const MAX_MAX_THREADS: usize = 10000; |
123 | |
/// Env variable that allows overriding the default value for max threads.
#[cfg(not(target_family = "wasm"))]
const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";
127 | |
128 | /// The blocking executor. |
129 | struct Executor { |
130 | /// Inner state of the executor. |
131 | inner: Mutex<Inner>, |
132 | |
133 | /// Used to put idle threads to sleep and wake them up when new work comes in. |
134 | cvar: Condvar, |
135 | } |
136 | |
137 | /// Inner state of the blocking executor. |
138 | struct Inner { |
139 | /// Number of idle threads in the pool. |
140 | /// |
141 | /// Idle threads are sleeping, waiting to get a task to run. |
142 | idle_count: usize, |
143 | |
144 | /// Total number of threads in the pool. |
145 | /// |
146 | /// This is the number of idle threads + the number of active threads. |
147 | thread_count: usize, |
148 | |
149 | /// The queue of blocking tasks. |
150 | queue: VecDeque<Runnable>, |
151 | |
152 | /// Maximum number of threads in the pool |
153 | thread_limit: NonZeroUsize, |
154 | } |
155 | |
156 | impl Executor { |
    #[cfg(not(target_family = "wasm"))]
158 | fn max_threads() -> usize { |
159 | match env::var(MAX_THREADS_ENV) { |
160 | Ok(v) => v |
161 | .parse::<usize>() |
162 | .map(|v| v.max(MIN_MAX_THREADS).min(MAX_MAX_THREADS)) |
163 | .unwrap_or(DEFAULT_MAX_THREADS), |
164 | Err(_) => DEFAULT_MAX_THREADS, |
165 | } |
166 | } |
167 | |
168 | /// Get a reference to the global executor. |
    #[inline]
170 | fn get() -> &'static Self { |
        #[cfg(not(target_family = "wasm"))]
172 | { |
173 | use async_lock::OnceCell; |
174 | |
175 | static EXECUTOR: OnceCell<Executor> = OnceCell::new(); |
176 | |
177 | return EXECUTOR.get_or_init_blocking(|| { |
178 | let thread_limit = Self::max_threads(); |
179 | Executor { |
180 | inner: Mutex::new(Inner { |
181 | idle_count: 0, |
182 | thread_count: 0, |
183 | queue: VecDeque::new(), |
184 | thread_limit: NonZeroUsize::new(thread_limit).unwrap(), |
185 | }), |
186 | cvar: Condvar::new(), |
187 | } |
188 | }); |
189 | } |
190 | |
        #[cfg(target_family = "wasm")]
        panic!("cannot spawn a blocking task on WASM")
193 | } |
194 | |
195 | /// Spawns a future onto this executor. |
196 | /// |
197 | /// Returns a [`Task`] handle for the spawned task. |
198 | fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> { |
199 | let (runnable, task) = async_task::spawn(future, |r| { |
200 | // Initialize the executor if we haven't already. |
201 | let executor = Self::get(); |
202 | |
203 | // Schedule the task on our executor. |
204 | executor.schedule(r) |
205 | }); |
206 | runnable.schedule(); |
207 | task |
208 | } |
209 | |
210 | /// Runs the main loop on the current thread. |
211 | /// |
212 | /// This function runs blocking tasks until it becomes idle and times out. |
213 | fn main_loop(&'static self) { |
        let span = tracing::trace_span!("blocking::main_loop");
215 | let _enter = span.enter(); |
216 | |
217 | let mut inner = self.inner.lock().unwrap(); |
218 | loop { |
219 | // This thread is not idle anymore because it's going to run tasks. |
220 | inner.idle_count -= 1; |
221 | |
222 | // Run tasks in the queue. |
223 | while let Some(runnable) = inner.queue.pop_front() { |
224 | // We have found a task - grow the pool if needed. |
225 | self.grow_pool(inner); |
226 | |
227 | // Run the task. |
228 | panic::catch_unwind(|| runnable.run()).ok(); |
229 | |
230 | // Re-lock the inner state and continue. |
231 | inner = self.inner.lock().unwrap(); |
232 | } |
233 | |
234 | // This thread is now becoming idle. |
235 | inner.idle_count += 1; |
236 | |
237 | // Put the thread to sleep until another task is scheduled. |
238 | let timeout = Duration::from_millis(500); |
            tracing::trace!(?timeout, "going to sleep");
240 | let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap(); |
241 | inner = lock; |
242 | |
243 | // If there are no tasks after a while, stop this thread. |
244 | if res.timed_out() && inner.queue.is_empty() { |
245 | inner.idle_count -= 1; |
246 | inner.thread_count -= 1; |
247 | break; |
248 | } |
249 | |
250 | tracing::trace!("notified" ); |
251 | } |
252 | |
253 | tracing::trace!("shutting down due to lack of tasks" ); |
254 | } |
255 | |
256 | /// Schedules a runnable task for execution. |
257 | fn schedule(&'static self, runnable: Runnable) { |
258 | let mut inner = self.inner.lock().unwrap(); |
259 | inner.queue.push_back(runnable); |
260 | |
261 | // Notify a sleeping thread and spawn more threads if needed. |
262 | self.cvar.notify_one(); |
263 | self.grow_pool(inner); |
264 | } |
265 | |
266 | /// Spawns more blocking threads if the pool is overloaded with work. |
267 | fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) { |
268 | let span = tracing::trace_span!( |
269 | "grow_pool" , |
270 | queue_len = inner.queue.len(), |
271 | idle_count = inner.idle_count, |
272 | thread_count = inner.thread_count, |
273 | ); |
274 | let _enter = span.enter(); |
275 | |
276 | // If runnable tasks greatly outnumber idle threads and there aren't too many threads |
277 | // already, then be aggressive: wake all idle threads and spawn one more thread. |
278 | while inner.queue.len() > inner.idle_count * 5 |
279 | && inner.thread_count < inner.thread_limit.get() |
280 | { |
281 | tracing::trace!("spawning a new thread to handle blocking tasks" ); |
282 | |
283 | // The new thread starts in idle state. |
284 | inner.idle_count += 1; |
285 | inner.thread_count += 1; |
286 | |
287 | // Notify all existing idle threads because we need to hurry up. |
288 | self.cvar.notify_all(); |
289 | |
290 | // Generate a new thread ID. |
291 | static ID: AtomicUsize = AtomicUsize::new(1); |
292 | let id = ID.fetch_add(1, Ordering::Relaxed); |
293 | |
294 | // Spawn the new thread. |
295 | if let Err(e) = thread::Builder::new() |
296 | .name(format!("blocking- {}" , id)) |
297 | .spawn(move || self.main_loop()) |
298 | { |
299 | // We were unable to spawn the thread, so we need to undo the state changes. |
300 | tracing::error!("failed to spawn a blocking thread: {}" , e); |
301 | inner.idle_count -= 1; |
302 | inner.thread_count -= 1; |
303 | |
304 | // The current number of threads is likely to be the system's upper limit, so update |
305 | // thread_limit accordingly. |
306 | inner.thread_limit = { |
307 | let new_limit = inner.thread_count; |
308 | |
309 | // If the limit is about to be set to zero, set it to one instead so that if, |
310 | // in the future, we are able to spawn more threads, we will be able to do so. |
311 | NonZeroUsize::new(new_limit).unwrap_or_else(|| { |
312 | tracing::warn!( |
313 | "attempted to lower thread_limit to zero; setting to one instead" |
314 | ); |
315 | NonZeroUsize::new(1).unwrap() |
316 | }) |
317 | }; |
318 | } |
319 | } |
320 | } |
321 | } |
322 | |
323 | /// Runs blocking code on a thread pool. |
324 | /// |
325 | /// # Examples |
326 | /// |
327 | /// Read the contents of a file: |
328 | /// |
329 | /// ```no_run |
330 | /// use blocking::unblock; |
331 | /// use std::fs; |
332 | /// |
333 | /// # futures_lite::future::block_on(async { |
/// let contents = unblock(|| fs::read_to_string("file.txt")).await?;
335 | /// # std::io::Result::Ok(()) }); |
336 | /// ``` |
337 | /// |
338 | /// Spawn a process: |
339 | /// |
340 | /// ```no_run |
341 | /// use blocking::unblock; |
342 | /// use std::process::Command; |
343 | /// |
344 | /// # futures_lite::future::block_on(async { |
/// let out = unblock(|| Command::new("dir").output()).await?;
346 | /// # std::io::Result::Ok(()) }); |
347 | /// ``` |
348 | pub fn unblock<T, F>(f: F) -> Task<T> |
349 | where |
350 | F: FnOnce() -> T + Send + 'static, |
351 | T: Send + 'static, |
352 | { |
    Executor::spawn(async move { f() })
354 | } |
355 | |
356 | /// Runs blocking I/O on a thread pool. |
357 | /// |
358 | /// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a |
359 | /// special thread pool while exposing a familiar async interface. |
360 | /// |
361 | /// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the |
362 | /// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively. |
363 | /// |
364 | /// # Caveats |
365 | /// |
366 | /// [`Unblock`] is a low-level primitive, and as such it comes with some caveats. |
367 | /// |
368 | /// For higher-level primitives built on top of [`Unblock`], look into [`async-fs`] or |
369 | /// [`async-process`] (on Windows). |
370 | /// |
371 | /// [`async-fs`]: https://github.com/smol-rs/async-fs |
372 | /// [`async-process`]: https://github.com/smol-rs/async-process |
373 | /// |
374 | /// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an |
375 | /// async read/write operation simply receives/sends some bytes from/into the pipe. When in reading |
376 | /// mode, the thread pool reads bytes from the I/O handle and forwards them into the pipe until it |
377 | /// becomes full. When in writing mode, the thread pool reads bytes from the pipe and forwards them |
378 | /// into the I/O handle. |
379 | /// |
380 | /// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe. |
381 | /// |
382 | /// ### Reading |
383 | /// |
384 | /// If you create an [`Unblock`]`<`[`Stdin`][`std::io::Stdin`]`>`, read some bytes from it, |
385 | /// and then drop it, a blocked read operation may keep hanging on the thread pool. The next |
386 | /// attempt to read from stdin will lose bytes read by the hanging operation. This is a difficult |
387 | /// problem to solve, so make sure you only use a single stdin handle for the duration of the |
388 | /// entire program. |
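///
/// A minimal sketch of the recommended pattern, creating a single stdin handle and reusing it:
///
/// ```no_run
/// use blocking::Unblock;
/// use futures_lite::prelude::*;
///
/// # futures_lite::future::block_on(async {
/// // Create the handle once and keep reusing it; don't wrap `std::io::stdin()` repeatedly.
/// let mut stdin = Unblock::new(std::io::stdin());
///
/// let mut buf = [0u8; 64];
/// let n = stdin.read(&mut buf).await?;
/// println!("read {} bytes", n);
/// # std::io::Result::Ok(()) });
/// ```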
389 | /// |
390 | /// ### Writing |
391 | /// |
392 | /// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the |
393 | /// [`Unblock`] handle or some buffered data might get lost. |
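///
/// A minimal sketch of that pattern, writing to an arbitrary `file.txt`:
///
/// ```no_run
/// use blocking::{unblock, Unblock};
/// use futures_lite::prelude::*;
/// use std::fs::File;
///
/// # futures_lite::future::block_on(async {
/// let file = unblock(|| File::create("file.txt")).await?;
/// let mut file = Unblock::new(file);
///
/// file.write_all(b"hello").await?;
///
/// // Flush before the handle is dropped, or buffered bytes may be lost.
/// file.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```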
394 | /// |
395 | /// ### Seeking |
396 | /// |
397 | /// Because of buffering in the pipe, if [`Unblock`] wraps a [`File`][`std::fs::File`], a single |
398 | /// read operation may move the file cursor farther than is the span of the operation. In fact, |
/// reading just keeps going in the background until the pipe gets full. Keep this in mind when
400 | /// using [`AsyncSeek`] with [relative][`SeekFrom::Current`] offsets. |
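///
/// A minimal sketch of the safer pattern, seeking to an absolute offset after a buffered read
/// (assuming an arbitrary `file.txt`):
///
/// ```no_run
/// use blocking::Unblock;
/// use futures_lite::prelude::*;
/// use std::fs::File;
/// use std::io::SeekFrom;
///
/// # futures_lite::future::block_on(async {
/// let mut file = Unblock::new(File::open("file.txt")?);
///
/// let mut buf = [0u8; 16];
/// file.read(&mut buf).await?;
///
/// // Background read-ahead may have moved the file cursor past the 16 bytes read above,
/// // so seek to an absolute position rather than a relative one.
/// file.seek(SeekFrom::Start(16)).await?;
/// # std::io::Result::Ok(()) });
/// ```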
401 | /// |
402 | /// # Examples |
403 | /// |
404 | /// ``` |
405 | /// use blocking::Unblock; |
406 | /// use futures_lite::prelude::*; |
407 | /// |
408 | /// # futures_lite::future::block_on(async { |
409 | /// let mut stdout = Unblock::new(std::io::stdout()); |
410 | /// stdout.write_all(b"Hello world!" ).await?; |
411 | /// stdout.flush().await?; |
412 | /// # std::io::Result::Ok(()) }); |
413 | /// ``` |
414 | pub struct Unblock<T> { |
415 | state: State<T>, |
416 | cap: Option<usize>, |
417 | } |
418 | |
419 | impl<T> Unblock<T> { |
420 | /// Wraps a blocking I/O handle into the async [`Unblock`] interface. |
421 | /// |
422 | /// # Examples |
423 | /// |
424 | /// ```no_run |
425 | /// use blocking::Unblock; |
426 | /// |
427 | /// let stdin = Unblock::new(std::io::stdin()); |
428 | /// ``` |
429 | pub fn new(io: T) -> Unblock<T> { |
430 | Unblock { |
431 | state: State::Idle(Some(Box::new(io))), |
432 | cap: None, |
433 | } |
434 | } |
435 | |
436 | /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer |
437 | /// capacity. |
438 | /// |
439 | /// When communicating with the inner [`Stream`]/[`Read`]/[`Write`] type from async code, data |
440 | /// transferred between blocking and async code goes through a buffer of limited capacity. This |
441 | /// constructor configures that capacity. |
442 | /// |
443 | /// The default capacity is: |
444 | /// |
445 | /// * For [`Iterator`] types: 8192 items. |
446 | /// * For [`Read`]/[`Write`] types: 8 MB. |
447 | /// |
448 | /// # Examples |
449 | /// |
450 | /// ```no_run |
451 | /// use blocking::Unblock; |
452 | /// |
453 | /// let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout()); |
454 | /// ``` |
455 | pub fn with_capacity(cap: usize, io: T) -> Unblock<T> { |
456 | Unblock { |
457 | state: State::Idle(Some(Box::new(io))), |
458 | cap: Some(cap), |
459 | } |
460 | } |
461 | |
462 | /// Gets a mutable reference to the blocking I/O handle. |
463 | /// |
464 | /// This is an async method because the I/O handle might be on the thread pool and needs to |
465 | /// be moved onto the current thread before we can get a reference to it. |
466 | /// |
467 | /// # Examples |
468 | /// |
469 | /// ```no_run |
470 | /// use blocking::{unblock, Unblock}; |
471 | /// use std::fs::File; |
472 | /// |
473 | /// # futures_lite::future::block_on(async { |
/// let file = unblock(|| File::create("file.txt")).await?;
475 | /// let mut file = Unblock::new(file); |
476 | /// |
477 | /// let metadata = file.get_mut().await.metadata()?; |
478 | /// # std::io::Result::Ok(()) }); |
479 | /// ``` |
480 | pub async fn get_mut(&mut self) -> &mut T { |
481 | // Wait for the running task to stop and ignore I/O errors if there are any. |
482 | future::poll_fn(|cx| self.poll_stop(cx)).await.ok(); |
483 | |
484 | // Assume idle state and get a reference to the inner value. |
485 | match &mut self.state { |
            State::Idle(t) => t.as_mut().expect("inner value was taken out"),
487 | State::WithMut(..) |
488 | | State::Streaming(..) |
489 | | State::Reading(..) |
490 | | State::Writing(..) |
491 | | State::Seeking(..) => { |
492 | unreachable!("when stopped, the state machine must be in idle state" ); |
493 | } |
494 | } |
495 | } |
496 | |
497 | /// Performs a blocking operation on the I/O handle. |
498 | /// |
499 | /// # Examples |
500 | /// |
501 | /// ```no_run |
502 | /// use blocking::{unblock, Unblock}; |
503 | /// use std::fs::File; |
504 | /// |
505 | /// # futures_lite::future::block_on(async { |
/// let file = unblock(|| File::create("file.txt")).await?;
507 | /// let mut file = Unblock::new(file); |
508 | /// |
509 | /// let metadata = file.with_mut(|f| f.metadata()).await?; |
510 | /// # std::io::Result::Ok(()) }); |
511 | /// ``` |
512 | pub async fn with_mut<R, F>(&mut self, op: F) -> R |
513 | where |
514 | F: FnOnce(&mut T) -> R + Send + 'static, |
515 | R: Send + 'static, |
516 | T: Send + 'static, |
517 | { |
518 | // Wait for the running task to stop and ignore I/O errors if there are any. |
519 | future::poll_fn(|cx| self.poll_stop(cx)).await.ok(); |
520 | |
521 | // Assume idle state and take out the inner value. |
522 | let mut t = match &mut self.state { |
            State::Idle(t) => t.take().expect("inner value was taken out"),
524 | State::WithMut(..) |
525 | | State::Streaming(..) |
526 | | State::Reading(..) |
527 | | State::Writing(..) |
528 | | State::Seeking(..) => { |
529 | unreachable!("when stopped, the state machine must be in idle state" ); |
530 | } |
531 | }; |
532 | |
533 | let (sender, receiver) = bounded(1); |
534 | let task = Executor::spawn(async move { |
535 | sender.try_send(op(&mut t)).ok(); |
536 | t |
537 | }); |
538 | self.state = State::WithMut(task); |
539 | |
540 | receiver |
541 | .recv() |
542 | .await |
543 | .expect("`Unblock::with_mut()` operation has panicked" ) |
544 | } |
545 | |
546 | /// Extracts the inner blocking I/O handle. |
547 | /// |
548 | /// This is an async method because the I/O handle might be on the thread pool and needs to |
549 | /// be moved onto the current thread before we can extract it. |
550 | /// |
551 | /// # Examples |
552 | /// |
553 | /// ```no_run |
554 | /// use blocking::{unblock, Unblock}; |
555 | /// use futures_lite::prelude::*; |
556 | /// use std::fs::File; |
557 | /// |
558 | /// # futures_lite::future::block_on(async { |
/// let file = unblock(|| File::create("file.txt")).await?;
560 | /// let file = Unblock::new(file); |
561 | /// |
562 | /// let file = file.into_inner().await; |
563 | /// # std::io::Result::Ok(()) }); |
564 | /// ``` |
565 | pub async fn into_inner(self) -> T { |
566 | // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just |
567 | // bind `self` to a local mutable variable. |
568 | let mut this = self; |
569 | |
570 | // Wait for the running task to stop and ignore I/O errors if there are any. |
571 | future::poll_fn(|cx| this.poll_stop(cx)).await.ok(); |
572 | |
573 | // Assume idle state and extract the inner value. |
574 | match &mut this.state { |
            State::Idle(t) => *t.take().expect("inner value was taken out"),
576 | State::WithMut(..) |
577 | | State::Streaming(..) |
578 | | State::Reading(..) |
579 | | State::Writing(..) |
580 | | State::Seeking(..) => { |
581 | unreachable!("when stopped, the state machine must be in idle state" ); |
582 | } |
583 | } |
584 | } |
585 | |
586 | /// Waits for the running task to stop. |
587 | /// |
588 | /// On success, the state machine is moved into the idle state. |
589 | fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
590 | loop { |
591 | match &mut self.state { |
592 | State::Idle(_) => return Poll::Ready(Ok(())), |
593 | |
594 | State::WithMut(task) => { |
595 | // Poll the task to wait for it to finish. |
596 | let io = ready!(Pin::new(task).poll(cx)); |
597 | self.state = State::Idle(Some(io)); |
598 | } |
599 | |
600 | State::Streaming(any, task) => { |
601 | // Drop the receiver to close the channel. This stops the `send()` operation in |
602 | // the task, after which the task returns the iterator back. |
603 | any.take(); |
604 | |
605 | // Poll the task to retrieve the iterator. |
606 | let iter = ready!(Pin::new(task).poll(cx)); |
607 | self.state = State::Idle(Some(iter)); |
608 | } |
609 | |
610 | State::Reading(reader, task) => { |
611 | // Drop the reader to close the pipe. This stops copying inside the task, after |
612 | // which the task returns the I/O handle back. |
613 | reader.take(); |
614 | |
615 | // Poll the task to retrieve the I/O handle. |
616 | let (res, io) = ready!(Pin::new(task).poll(cx)); |
617 | // Make sure to move into the idle state before reporting errors. |
618 | self.state = State::Idle(Some(io)); |
619 | res?; |
620 | } |
621 | |
622 | State::Writing(writer, task) => { |
623 | // Drop the writer to close the pipe. This stops copying inside the task, after |
                    // which the task flushes the I/O handle and returns it back.
625 | writer.take(); |
626 | |
627 | // Poll the task to retrieve the I/O handle. |
628 | let (res, io) = ready!(Pin::new(task).poll(cx)); |
629 | // Make sure to move into the idle state before reporting errors. |
630 | self.state = State::Idle(Some(io)); |
631 | res?; |
632 | } |
633 | |
634 | State::Seeking(task) => { |
635 | // Poll the task to wait for it to finish. |
636 | let (_, res, io) = ready!(Pin::new(task).poll(cx)); |
637 | // Make sure to move into the idle state before reporting errors. |
638 | self.state = State::Idle(Some(io)); |
639 | res?; |
640 | } |
641 | } |
642 | } |
643 | } |
644 | } |
645 | |
646 | impl<T: fmt::Debug> fmt::Debug for Unblock<T> { |
647 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
648 | struct Closed; |
649 | impl fmt::Debug for Closed { |
650 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
651 | f.write_str("<closed>" ) |
652 | } |
653 | } |
654 | |
655 | struct Blocked; |
656 | impl fmt::Debug for Blocked { |
657 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
658 | f.write_str("<blocked>" ) |
659 | } |
660 | } |
661 | |
662 | match &self.state { |
            State::Idle(None) => f.debug_struct("Unblock").field("io", &Closed).finish(),
            State::Idle(Some(io)) => {
                let io: &T = io;
                f.debug_struct("Unblock").field("io", io).finish()
            }
            State::WithMut(..)
            | State::Streaming(..)
            | State::Reading(..)
            | State::Writing(..)
            | State::Seeking(..) => f.debug_struct("Unblock").field("io", &Blocked).finish(),
673 | } |
674 | } |
675 | } |
676 | |
677 | /// Current state of a blocking task. |
678 | enum State<T> { |
679 | /// There is no blocking task. |
680 | /// |
681 | /// The inner value is readily available, unless it has already been extracted. The value is |
682 | /// extracted out by [`Unblock::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting |
683 | /// [`Unblock`]. |
684 | Idle(Option<Box<T>>), |
685 | |
686 | /// A [`Unblock::with_mut()`] closure was spawned and is still running. |
687 | WithMut(Task<Box<T>>), |
688 | |
689 | /// The inner value is an [`Iterator`] currently iterating in a task. |
690 | /// |
691 | /// The `dyn Any` value here is a `Pin<Box<Receiver<<T as Iterator>::Item>>>`. |
692 | Streaming(Option<Box<dyn Any + Send + Sync>>, Task<Box<T>>), |
693 | |
694 | /// The inner value is a [`Read`] currently reading in a task. |
695 | Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>), |
696 | |
697 | /// The inner value is a [`Write`] currently writing in a task. |
698 | Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>), |
699 | |
700 | /// The inner value is a [`Seek`] currently seeking in a task. |
701 | Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>), |
702 | } |
703 | |
704 | impl<T: Iterator + Send + 'static> Stream for Unblock<T> |
705 | where |
706 | T::Item: Send + 'static, |
707 | { |
708 | type Item = T::Item; |
709 | |
710 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { |
711 | loop { |
712 | match &mut self.state { |
713 | // If not in idle or active streaming state, stop the running task. |
714 | State::WithMut(..) |
715 | | State::Streaming(None, _) |
716 | | State::Reading(..) |
717 | | State::Writing(..) |
718 | | State::Seeking(..) => { |
719 | // Wait for the running task to stop. |
720 | ready!(self.poll_stop(cx)).ok(); |
721 | } |
722 | |
723 | // If idle, start a streaming task. |
724 | State::Idle(iter) => { |
725 | // Take the iterator out to run it on a blocking task. |
                    let mut iter = iter.take().expect("inner iterator was taken out");
727 | |
728 | // This channel capacity seems to work well in practice. If it's too low, there |
729 | // will be too much synchronization between tasks. If too high, memory |
730 | // consumption increases. |
731 | let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items |
732 | |
733 | // Spawn a blocking task that runs the iterator and returns it when done. |
734 | let task = Executor::spawn(async move { |
735 | for item in &mut iter { |
736 | if sender.send(item).await.is_err() { |
737 | break; |
738 | } |
739 | } |
740 | iter |
741 | }); |
742 | |
743 | // Move into the busy state and poll again. |
744 | self.state = State::Streaming(Some(Box::new(Box::pin(receiver))), task); |
745 | } |
746 | |
747 | // If streaming, receive an item. |
748 | State::Streaming(Some(any), task) => { |
749 | let receiver = any.downcast_mut::<Pin<Box<Receiver<T::Item>>>>().unwrap(); |
750 | |
751 | // Poll the channel. |
752 | let opt = ready!(receiver.as_mut().poll_next(cx)); |
753 | |
754 | // If the channel is closed, retrieve the iterator back from the blocking task. |
755 | // This is not really a required step, but it's cleaner to drop the iterator on |
756 | // the same thread that created it. |
757 | if opt.is_none() { |
758 | // Poll the task to retrieve the iterator. |
759 | let iter = ready!(Pin::new(task).poll(cx)); |
760 | self.state = State::Idle(Some(iter)); |
761 | } |
762 | |
763 | return Poll::Ready(opt); |
764 | } |
765 | } |
766 | } |
767 | } |
768 | } |
769 | |
770 | impl<T: Read + Send + 'static> AsyncRead for Unblock<T> { |
771 | fn poll_read( |
772 | mut self: Pin<&mut Self>, |
773 | cx: &mut Context<'_>, |
774 | buf: &mut [u8], |
775 | ) -> Poll<io::Result<usize>> { |
776 | loop { |
777 | match &mut self.state { |
778 | // If not in idle or active reading state, stop the running task. |
779 | State::WithMut(..) |
780 | | State::Reading(None, _) |
781 | | State::Streaming(..) |
782 | | State::Writing(..) |
783 | | State::Seeking(..) => { |
784 | // Wait for the running task to stop. |
785 | ready!(self.poll_stop(cx))?; |
786 | } |
787 | |
788 | // If idle, start a reading task. |
789 | State::Idle(io) => { |
790 | // Take the I/O handle out to read it on a blocking task. |
                    let mut io = io.take().expect("inner value was taken out");
792 | |
793 | // This pipe capacity seems to work well in practice. If it's too low, there |
794 | // will be too much synchronization between tasks. If too high, memory |
795 | // consumption increases. |
796 | let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB |
797 | |
798 | // Spawn a blocking task that reads and returns the I/O handle when done. |
799 | let task = Executor::spawn(async move { |
800 | // Copy bytes from the I/O handle into the pipe until the pipe is closed or |
801 | // an error occurs. |
802 | loop { |
803 | match future::poll_fn(|cx| writer.poll_fill(cx, &mut io)).await { |
804 | Ok(0) => return (Ok(()), io), |
805 | Ok(_) => {} |
806 | Err(err) => return (Err(err), io), |
807 | } |
808 | } |
809 | }); |
810 | |
811 | // Move into the busy state and poll again. |
812 | self.state = State::Reading(Some(reader), task); |
813 | } |
814 | |
815 | // If reading, read bytes from the pipe. |
816 | State::Reading(Some(reader), task) => { |
817 | // Poll the pipe. |
818 | let n = ready!(reader.poll_drain(cx, buf))?; |
819 | |
820 | // If the pipe is closed, retrieve the I/O handle back from the blocking task. |
821 | // This is not really a required step, but it's cleaner to drop the handle on |
822 | // the same thread that created it. |
823 | if n == 0 { |
824 | // Poll the task to retrieve the I/O handle. |
825 | let (res, io) = ready!(Pin::new(task).poll(cx)); |
826 | // Make sure to move into the idle state before reporting errors. |
827 | self.state = State::Idle(Some(io)); |
828 | res?; |
829 | } |
830 | |
831 | return Poll::Ready(Ok(n)); |
832 | } |
833 | } |
834 | } |
835 | } |
836 | } |
837 | |
838 | impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> { |
839 | fn poll_write( |
840 | mut self: Pin<&mut Self>, |
841 | cx: &mut Context<'_>, |
842 | buf: &[u8], |
843 | ) -> Poll<io::Result<usize>> { |
844 | loop { |
845 | match &mut self.state { |
846 | // If not in idle or active writing state, stop the running task. |
847 | State::WithMut(..) |
848 | | State::Writing(None, _) |
849 | | State::Streaming(..) |
850 | | State::Reading(..) |
851 | | State::Seeking(..) => { |
852 | // Wait for the running task to stop. |
853 | ready!(self.poll_stop(cx))?; |
854 | } |
855 | |
856 | // If idle, start the writing task. |
857 | State::Idle(io) => { |
858 | // Take the I/O handle out to write on a blocking task. |
                    let mut io = io.take().expect("inner value was taken out");
860 | |
861 | // This pipe capacity seems to work well in practice. If it's too low, there will |
862 | // be too much synchronization between tasks. If too high, memory consumption |
863 | // increases. |
864 | let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB |
865 | |
866 | // Spawn a blocking task that writes and returns the I/O handle when done. |
867 | let task = Executor::spawn(async move { |
868 | // Copy bytes from the pipe into the I/O handle until the pipe is closed or an |
869 | // error occurs. Flush the I/O handle at the end. |
870 | loop { |
871 | match future::poll_fn(|cx| reader.poll_drain(cx, &mut io)).await { |
872 | Ok(0) => return (io.flush(), io), |
873 | Ok(_) => {} |
874 | Err(err) => { |
875 | io.flush().ok(); |
876 | return (Err(err), io); |
877 | } |
878 | } |
879 | } |
880 | }); |
881 | |
882 | // Move into the busy state and poll again. |
883 | self.state = State::Writing(Some(writer), task); |
884 | } |
885 | |
886 | // If writing, write more bytes into the pipe. |
887 | State::Writing(Some(writer), _) => return writer.poll_fill(cx, buf), |
888 | } |
889 | } |
890 | } |
891 | |
892 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
893 | loop { |
894 | match &mut self.state { |
895 | // If not in idle state, stop the running task. |
896 | State::WithMut(..) |
897 | | State::Streaming(..) |
898 | | State::Writing(..) |
899 | | State::Reading(..) |
900 | | State::Seeking(..) => { |
901 | // Wait for the running task to stop. |
902 | ready!(self.poll_stop(cx))?; |
903 | } |
904 | |
905 | // Idle implies flushed. |
906 | State::Idle(_) => return Poll::Ready(Ok(())), |
907 | } |
908 | } |
909 | } |
910 | |
911 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
912 | // First, make sure the I/O handle is flushed. |
913 | ready!(Pin::new(&mut self).poll_flush(cx))?; |
914 | |
915 | // Then move into the idle state with no I/O handle, thus dropping it. |
916 | self.state = State::Idle(None); |
917 | Poll::Ready(Ok(())) |
918 | } |
919 | } |
920 | |
921 | impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> { |
922 | fn poll_seek( |
923 | mut self: Pin<&mut Self>, |
924 | cx: &mut Context<'_>, |
925 | pos: SeekFrom, |
926 | ) -> Poll<io::Result<u64>> { |
927 | loop { |
928 | match &mut self.state { |
929 | // If not in idle state, stop the running task. |
930 | State::WithMut(..) |
931 | | State::Streaming(..) |
932 | | State::Reading(..) |
933 | | State::Writing(..) => { |
934 | // Wait for the running task to stop. |
935 | ready!(self.poll_stop(cx))?; |
936 | } |
937 | |
938 | State::Idle(io) => { |
939 | // Take the I/O handle out to seek on a blocking task. |
                    let mut io = io.take().expect("inner value was taken out");
941 | |
942 | let task = Executor::spawn(async move { |
943 | let res = io.seek(pos); |
944 | (pos, res, io) |
945 | }); |
946 | self.state = State::Seeking(task); |
947 | } |
948 | |
949 | State::Seeking(task) => { |
950 | // Poll the task to wait for it to finish. |
951 | let (original_pos, res, io) = ready!(Pin::new(task).poll(cx)); |
952 | // Make sure to move into the idle state before reporting errors. |
953 | self.state = State::Idle(Some(io)); |
954 | let current = res?; |
955 | |
956 | // If the `pos` argument matches the original one, return the result. |
957 | if original_pos == pos { |
958 | return Poll::Ready(Ok(current)); |
959 | } |
960 | } |
961 | } |
962 | } |
963 | } |
964 | } |
965 | |
#[cfg(all(test, not(target_family = "wasm")))]
967 | mod tests { |
968 | use super::*; |
969 | |
    #[test]
971 | fn test_max_threads() { |
        // properly set env var
        env::set_var(MAX_THREADS_ENV, "100");
        assert_eq!(100, Executor::max_threads());

        // passed value below minimum, so we set it to minimum
        env::set_var(MAX_THREADS_ENV, "0");
        assert_eq!(1, Executor::max_threads());

        // passed value above maximum, so we set to allowed maximum
        env::set_var(MAX_THREADS_ENV, "50000");
        assert_eq!(10000, Executor::max_threads());

        // no env var, use default
        env::set_var(MAX_THREADS_ENV, "");
        assert_eq!(500, Executor::max_threads());

        // not a number, use default
        env::set_var(MAX_THREADS_ENV, "NOTINT");
        assert_eq!(500, Executor::max_threads());
991 | } |
992 | } |
993 | |