1 | //! A thread pool for isolating blocking I/O in async programs. |
2 | //! |
3 | //! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async |
4 | //! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible |
5 | //! solutions, they're not always available or ideal. |
6 | //! |
7 | //! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread |
8 | //! pool provided by this crate. The pool dynamically spawns and stops threads depending on the |
9 | //! current number of running I/O jobs. |
10 | //! |
11 | //! Note that there is a limit on the number of active threads. Once that limit is hit, a running |
12 | //! job has to finish before others get a chance to run. When a thread is idle, it waits for the |
13 | //! next job or shuts down after a certain timeout. |
14 | //! |
//! The default number of threads (currently 500) can be altered by setting the `BLOCKING_MAX_THREADS`
//! environment variable to a value between 1 and 10000.
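//!
//! A minimal, illustrative sketch of raising that limit. The variable is read lazily, the first
//! time the pool needs to grow, so it must be set before any blocking task is spawned (in
//! practice it is usually exported by whatever launches the process):
//!
//! ```no_run
//! use blocking::unblock;
//!
//! // Allow up to 1000 threads for blocking tasks in this process.
//! std::env::set_var("BLOCKING_MAX_THREADS", "1000");
//!
//! # futures_lite::future::block_on(async {
//! let contents = unblock(|| std::fs::read_to_string("file.txt")).await?;
//! # std::io::Result::Ok(()) });
//! ```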
17 | //! |
18 | //! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port |
19 | //! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html |
20 | //! [io_uring]: https://lwn.net/Articles/776703 |
21 | //! |
22 | //! # Examples |
23 | //! |
24 | //! Read the contents of a file: |
25 | //! |
26 | //! ```no_run |
27 | //! use blocking::unblock; |
28 | //! use std::fs; |
29 | //! |
30 | //! # futures_lite::future::block_on(async { |
//! let contents = unblock(|| fs::read_to_string("file.txt")).await?;
//! println!("{}", contents);
33 | //! # std::io::Result::Ok(()) }); |
34 | //! ``` |
35 | //! |
36 | //! Read a file and pipe its contents to stdout: |
37 | //! |
38 | //! ```no_run |
39 | //! use blocking::{unblock, Unblock}; |
40 | //! use futures_lite::io; |
41 | //! use std::fs::File; |
42 | //! |
43 | //! # futures_lite::future::block_on(async { |
//! let input = unblock(|| File::open("file.txt")).await?;
45 | //! let input = Unblock::new(input); |
46 | //! let mut output = Unblock::new(std::io::stdout()); |
47 | //! |
48 | //! io::copy(input, &mut output).await?; |
49 | //! # std::io::Result::Ok(()) }); |
50 | //! ``` |
51 | //! |
52 | //! Iterate over the contents of a directory: |
53 | //! |
54 | //! ```no_run |
55 | //! use blocking::Unblock; |
56 | //! use futures_lite::prelude::*; |
57 | //! use std::fs; |
58 | //! |
59 | //! # futures_lite::future::block_on(async { |
//! let mut dir = Unblock::new(fs::read_dir(".")?);
61 | //! while let Some(item) = dir.next().await { |
//!     println!("{}", item?.file_name().to_string_lossy());
63 | //! } |
64 | //! # std::io::Result::Ok(()) }); |
65 | //! ``` |
66 | //! |
67 | //! Spawn a process: |
68 | //! |
69 | //! ```no_run |
70 | //! use blocking::unblock; |
71 | //! use std::process::Command; |
72 | //! |
73 | //! # futures_lite::future::block_on(async { |
//! let out = unblock(|| Command::new("dir").output()).await?;
75 | //! # std::io::Result::Ok(()) }); |
76 | //! ``` |
77 | |
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![forbid(unsafe_code)]
#![doc(
81 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
82 | )] |
#![doc(
84 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
85 | )] |
86 | |
87 | use std::any::Any; |
88 | use std::collections::VecDeque; |
89 | use std::fmt; |
90 | use std::io::{self, Read, Seek, SeekFrom, Write}; |
91 | use std::num::NonZeroUsize; |
92 | use std::panic; |
93 | use std::pin::Pin; |
94 | use std::sync::atomic::{AtomicUsize, Ordering}; |
95 | use std::sync::{Condvar, Mutex, MutexGuard}; |
96 | use std::task::{Context, Poll}; |
97 | use std::thread; |
98 | use std::time::Duration; |
99 | |
#[cfg(not(target_family = "wasm"))]
101 | use std::env; |
102 | |
103 | use async_channel::{bounded, Receiver}; |
104 | use async_task::Runnable; |
105 | use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; |
106 | use futures_lite::{ |
107 | future::{self, Future}, |
108 | ready, |
109 | stream::Stream, |
110 | }; |
111 | use piper::{pipe, Reader, Writer}; |
112 | |
#[doc(no_inline)]
114 | pub use async_task::Task; |
115 | |
116 | /// Default value for max threads that Executor can grow to |
#[cfg(not(target_family = "wasm"))]
const DEFAULT_MAX_THREADS: NonZeroUsize = {
    if let Some(size) = NonZeroUsize::new(500) {
        size
    } else {
        panic!("DEFAULT_MAX_THREADS is non-zero");
    }
};
125 | |
126 | /// Minimum value for max threads config |
#[cfg(not(target_family = "wasm"))]
128 | const MIN_MAX_THREADS: usize = 1; |
129 | |
130 | /// Maximum value for max threads config |
#[cfg(not(target_family = "wasm"))]
132 | const MAX_MAX_THREADS: usize = 10000; |
133 | |
/// Env variable that allows overriding the default value for max threads.
#[cfg(not(target_family = "wasm"))]
const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";
137 | |
138 | /// The blocking executor. |
139 | struct Executor { |
140 | /// Inner state of the executor. |
141 | inner: Mutex<Inner>, |
142 | |
143 | /// Used to put idle threads to sleep and wake them up when new work comes in. |
144 | cvar: Condvar, |
145 | } |
146 | |
147 | /// Inner state of the blocking executor. |
148 | struct Inner { |
149 | /// Number of idle threads in the pool. |
150 | /// |
151 | /// Idle threads are sleeping, waiting to get a task to run. |
152 | idle_count: usize, |
153 | |
154 | /// Total number of threads in the pool. |
155 | /// |
156 | /// This is the number of idle threads + the number of active threads. |
157 | thread_count: usize, |
158 | |
    // TODO: The option is only used for const-initialization. This can be replaced with
    //       a normal VecDeque once the MSRV allows `VecDeque::new` in `const` contexts.
161 | /// The queue of blocking tasks. |
162 | queue: Option<VecDeque<Runnable>>, |
163 | |
164 | /// Maximum number of threads in the pool |
165 | thread_limit: Option<NonZeroUsize>, |
166 | } |
167 | |
168 | impl Inner { |
    #[inline]
170 | fn queue(&mut self) -> &mut VecDeque<Runnable> { |
171 | self.queue.get_or_insert_with(VecDeque::new) |
172 | } |
173 | } |
174 | |
175 | impl Executor { |
    #[cfg(not(target_family = "wasm"))]
177 | fn max_threads() -> NonZeroUsize { |
178 | match env::var(MAX_THREADS_ENV) { |
179 | Ok(v) => v |
180 | .parse::<usize>() |
181 | .ok() |
182 | .and_then(|v| NonZeroUsize::new(v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS))) |
183 | .unwrap_or(DEFAULT_MAX_THREADS), |
184 | Err(_) => DEFAULT_MAX_THREADS, |
185 | } |
186 | } |
187 | |
    #[cfg(target_family = "wasm")]
189 | fn max_threads() -> NonZeroUsize { |
190 | NonZeroUsize::new(1).unwrap() |
191 | } |
192 | |
193 | /// Get a reference to the global executor. |
    #[inline]
195 | fn get() -> &'static Self { |
        #[cfg(not(target_family = "wasm"))]
197 | { |
198 | static EXECUTOR: Executor = Executor { |
199 | inner: Mutex::new(Inner { |
200 | idle_count: 0, |
201 | thread_count: 0, |
202 | queue: None, |
203 | thread_limit: None, |
204 | }), |
205 | cvar: Condvar::new(), |
206 | }; |
207 | |
208 | &EXECUTOR |
209 | } |
210 | |
        #[cfg(target_family = "wasm")]
        panic!("cannot spawn a blocking task on WASM")
213 | } |
214 | |
215 | /// Spawns a future onto this executor. |
216 | /// |
217 | /// Returns a [`Task`] handle for the spawned task. |
218 | fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> { |
219 | let (runnable, task) = async_task::Builder::new().propagate_panic(true).spawn( |
220 | move |()| future, |
221 | |r| { |
222 | // Initialize the executor if we haven't already. |
223 | let executor = Self::get(); |
224 | |
225 | // Schedule the task on our executor. |
226 | executor.schedule(r) |
227 | }, |
228 | ); |
229 | runnable.schedule(); |
230 | task |
231 | } |
232 | |
233 | /// Runs the main loop on the current thread. |
234 | /// |
235 | /// This function runs blocking tasks until it becomes idle and times out. |
236 | fn main_loop(&'static self) { |
        #[cfg(feature = "tracing")]
        let _span = tracing::trace_span!("blocking::main_loop").entered();
239 | |
240 | let mut inner = self.inner.lock().unwrap(); |
241 | loop { |
242 | // This thread is not idle anymore because it's going to run tasks. |
243 | inner.idle_count -= 1; |
244 | |
245 | // Run tasks in the queue. |
246 | while let Some(runnable) = inner.queue().pop_front() { |
247 | // We have found a task - grow the pool if needed. |
248 | self.grow_pool(inner); |
249 | |
250 | // Run the task. |
251 | panic::catch_unwind(|| runnable.run()).ok(); |
252 | |
253 | // Re-lock the inner state and continue. |
254 | inner = self.inner.lock().unwrap(); |
255 | } |
256 | |
257 | // This thread is now becoming idle. |
258 | inner.idle_count += 1; |
259 | |
260 | // Put the thread to sleep until another task is scheduled. |
261 | let timeout = Duration::from_millis(500); |
            #[cfg(feature = "tracing")]
            tracing::trace!(?timeout, "going to sleep");
264 | let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap(); |
265 | inner = lock; |
266 | |
267 | // If there are no tasks after a while, stop this thread. |
268 | if res.timed_out() && inner.queue().is_empty() { |
269 | inner.idle_count -= 1; |
270 | inner.thread_count -= 1; |
271 | break; |
272 | } |
273 | |
            #[cfg(feature = "tracing")]
            tracing::trace!("notified");
276 | } |
277 | |
        #[cfg(feature = "tracing")]
        tracing::trace!("shutting down due to lack of tasks");
280 | } |
281 | |
282 | /// Schedules a runnable task for execution. |
283 | fn schedule(&'static self, runnable: Runnable) { |
284 | let mut inner = self.inner.lock().unwrap(); |
285 | inner.queue().push_back(runnable); |
286 | |
287 | // Notify a sleeping thread and spawn more threads if needed. |
288 | self.cvar.notify_one(); |
289 | self.grow_pool(inner); |
290 | } |
291 | |
292 | /// Spawns more blocking threads if the pool is overloaded with work. |
293 | fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) { |
        #[cfg(feature = "tracing")]
        let _span = tracing::trace_span!(
            "grow_pool",
297 | queue_len = inner.queue().len(), |
298 | idle_count = inner.idle_count, |
299 | thread_count = inner.thread_count, |
300 | ) |
301 | .entered(); |
302 | |
303 | let thread_limit = inner |
304 | .thread_limit |
305 | .get_or_insert_with(Self::max_threads) |
306 | .get(); |
307 | |
308 | // If runnable tasks greatly outnumber idle threads and there aren't too many threads |
309 | // already, then be aggressive: wake all idle threads and spawn one more thread. |
310 | while inner.queue().len() > inner.idle_count * 5 && inner.thread_count < thread_limit { |
            #[cfg(feature = "tracing")]
            tracing::trace!("spawning a new thread to handle blocking tasks");
313 | |
314 | // The new thread starts in idle state. |
315 | inner.idle_count += 1; |
316 | inner.thread_count += 1; |
317 | |
318 | // Notify all existing idle threads because we need to hurry up. |
319 | self.cvar.notify_all(); |
320 | |
321 | // Generate a new thread ID. |
322 | static ID: AtomicUsize = AtomicUsize::new(1); |
323 | let id = ID.fetch_add(1, Ordering::Relaxed); |
324 | |
325 | // Spawn the new thread. |
326 | if let Err(_e) = thread::Builder::new() |
                .name(format!("blocking-{}", id))
328 | .spawn(move || self.main_loop()) |
329 | { |
330 | // We were unable to spawn the thread, so we need to undo the state changes. |
                #[cfg(feature = "tracing")]
                tracing::error!("failed to spawn a blocking thread: {}", _e);
333 | inner.idle_count -= 1; |
334 | inner.thread_count -= 1; |
335 | |
336 | // The current number of threads is likely to be the system's upper limit, so update |
337 | // thread_limit accordingly. |
338 | inner.thread_limit = { |
339 | let new_limit = inner.thread_count; |
340 | |
341 | // If the limit is about to be set to zero, set it to one instead so that if, |
342 | // in the future, we are able to spawn more threads, we will be able to do so. |
343 | Some(NonZeroUsize::new(new_limit).unwrap_or_else(|| { |
                        #[cfg(feature = "tracing")]
345 | tracing::warn!( |
346 | "attempted to lower thread_limit to zero; setting to one instead" |
347 | ); |
348 | NonZeroUsize::new(1).unwrap() |
349 | })) |
350 | }; |
351 | } |
352 | } |
353 | } |
354 | } |
355 | |
356 | /// Runs blocking code on a thread pool. |
357 | /// |
358 | /// # Examples |
359 | /// |
360 | /// Read the contents of a file: |
361 | /// |
362 | /// ```no_run |
363 | /// use blocking::unblock; |
364 | /// use std::fs; |
365 | /// |
366 | /// # futures_lite::future::block_on(async { |
/// let contents = unblock(|| fs::read_to_string("file.txt")).await?;
368 | /// # std::io::Result::Ok(()) }); |
369 | /// ``` |
370 | /// |
371 | /// Spawn a process: |
372 | /// |
373 | /// ```no_run |
374 | /// use blocking::unblock; |
375 | /// use std::process::Command; |
376 | /// |
377 | /// # futures_lite::future::block_on(async { |
/// let out = unblock(|| Command::new("dir").output()).await?;
379 | /// # std::io::Result::Ok(()) }); |
380 | /// ``` |
381 | pub fn unblock<T, F>(f: F) -> Task<T> |
382 | where |
383 | F: FnOnce() -> T + Send + 'static, |
384 | T: Send + 'static, |
385 | { |
    Executor::spawn(async move { f() })
387 | } |
388 | |
389 | /// Runs blocking I/O on a thread pool. |
390 | /// |
391 | /// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a |
392 | /// special thread pool while exposing a familiar async interface. |
393 | /// |
394 | /// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the |
395 | /// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively. |
396 | /// |
397 | /// # Caveats |
398 | /// |
399 | /// [`Unblock`] is a low-level primitive, and as such it comes with some caveats. |
400 | /// |
401 | /// For higher-level primitives built on top of [`Unblock`], look into [`async-fs`] or |
402 | /// [`async-process`] (on Windows). |
403 | /// |
404 | /// [`async-fs`]: https://github.com/smol-rs/async-fs |
405 | /// [`async-process`]: https://github.com/smol-rs/async-process |
406 | /// |
407 | /// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an |
408 | /// async read/write operation simply receives/sends some bytes from/into the pipe. When in reading |
409 | /// mode, the thread pool reads bytes from the I/O handle and forwards them into the pipe until it |
410 | /// becomes full. When in writing mode, the thread pool reads bytes from the pipe and forwards them |
411 | /// into the I/O handle. |
412 | /// |
413 | /// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe. |
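///
/// As a small, illustrative sketch, a handle that only shuttles small amounts of data could use
/// a 64 KB pipe instead of the 8 MB default:
///
/// ```no_run
/// use blocking::Unblock;
///
/// // Reads from this handle go through a 64 KB pipe between async and blocking code.
/// let stdin = Unblock::with_capacity(64 * 1024, std::io::stdin());
/// ```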
414 | /// |
415 | /// ### Reading |
416 | /// |
417 | /// If you create an [`Unblock`]`<`[`Stdin`][`std::io::Stdin`]`>`, read some bytes from it, |
418 | /// and then drop it, a blocked read operation may keep hanging on the thread pool. The next |
419 | /// attempt to read from stdin will lose bytes read by the hanging operation. This is a difficult |
420 | /// problem to solve, so make sure you only use a single stdin handle for the duration of the |
421 | /// entire program. |
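///
/// A minimal sketch of the recommended pattern: wrap stdin once and keep reusing that handle for
/// the rest of the program instead of creating a new [`Unblock`] per read:
///
/// ```no_run
/// use blocking::Unblock;
/// use futures_lite::prelude::*;
///
/// # futures_lite::future::block_on(async {
/// // Created once; dropping it between reads could leave a blocked read on the thread pool
/// // holding bytes that a freshly wrapped stdin would never see.
/// let mut stdin = Unblock::new(std::io::stdin());
///
/// let mut buf = vec![0u8; 1024];
/// let n = stdin.read(&mut buf).await?;
/// println!("read {} bytes", n);
/// # std::io::Result::Ok(()) });
/// ```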
422 | /// |
423 | /// ### Writing |
424 | /// |
425 | /// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the |
426 | /// [`Unblock`] handle or some buffered data might get lost. |
427 | /// |
428 | /// ### Seeking |
429 | /// |
430 | /// Because of buffering in the pipe, if [`Unblock`] wraps a [`File`][`std::fs::File`], a single |
431 | /// read operation may move the file cursor farther than is the span of the operation. In fact, |
/// reading just keeps going in the background until the pipe gets full. Keep this in mind when
433 | /// using [`AsyncSeek`] with [relative][`SeekFrom::Current`] offsets. |
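///
/// A minimal sketch (assuming a readable `file.txt`) of re-synchronizing the cursor with an
/// absolute offset, which does not depend on how far the background reader has advanced it:
///
/// ```no_run
/// use blocking::{unblock, Unblock};
/// use futures_lite::prelude::*;
/// use std::fs::File;
/// use std::io::SeekFrom;
///
/// # futures_lite::future::block_on(async {
/// let file = unblock(|| File::open("file.txt")).await?;
/// let mut file = Unblock::new(file);
///
/// let mut buf = [0u8; 16];
/// file.read_exact(&mut buf).await?;
///
/// // The OS-level cursor may already be far past byte 16, so seek with an absolute
/// // offset rather than `SeekFrom::Current`.
/// file.seek(SeekFrom::Start(16)).await?;
/// # std::io::Result::Ok(()) });
/// ```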
434 | /// |
435 | /// # Examples |
436 | /// |
437 | /// ``` |
438 | /// use blocking::Unblock; |
439 | /// use futures_lite::prelude::*; |
440 | /// |
441 | /// # futures_lite::future::block_on(async { |
442 | /// let mut stdout = Unblock::new(std::io::stdout()); |
/// stdout.write_all(b"Hello world!").await?;
444 | /// stdout.flush().await?; |
445 | /// # std::io::Result::Ok(()) }); |
446 | /// ``` |
447 | pub struct Unblock<T> { |
448 | state: State<T>, |
449 | cap: Option<usize>, |
450 | } |
451 | |
452 | impl<T> Unblock<T> { |
453 | /// Wraps a blocking I/O handle into the async [`Unblock`] interface. |
454 | /// |
455 | /// # Examples |
456 | /// |
457 | /// ```no_run |
458 | /// use blocking::Unblock; |
459 | /// |
460 | /// let stdin = Unblock::new(std::io::stdin()); |
461 | /// ``` |
462 | pub fn new(io: T) -> Unblock<T> { |
463 | Unblock { |
464 | state: State::Idle(Some(Box::new(io))), |
465 | cap: None, |
466 | } |
467 | } |
468 | |
469 | /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer |
470 | /// capacity. |
471 | /// |
472 | /// When communicating with the inner [`Stream`]/[`Read`]/[`Write`] type from async code, data |
473 | /// transferred between blocking and async code goes through a buffer of limited capacity. This |
474 | /// constructor configures that capacity. |
475 | /// |
476 | /// The default capacity is: |
477 | /// |
478 | /// * For [`Iterator`] types: 8192 items. |
479 | /// * For [`Read`]/[`Write`] types: 8 MB. |
480 | /// |
481 | /// # Examples |
482 | /// |
483 | /// ```no_run |
484 | /// use blocking::Unblock; |
485 | /// |
486 | /// let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout()); |
487 | /// ``` |
488 | pub fn with_capacity(cap: usize, io: T) -> Unblock<T> { |
489 | Unblock { |
490 | state: State::Idle(Some(Box::new(io))), |
491 | cap: Some(cap), |
492 | } |
493 | } |
494 | |
495 | /// Gets a mutable reference to the blocking I/O handle. |
496 | /// |
497 | /// This is an async method because the I/O handle might be on the thread pool and needs to |
498 | /// be moved onto the current thread before we can get a reference to it. |
499 | /// |
500 | /// # Examples |
501 | /// |
502 | /// ```no_run |
503 | /// use blocking::{unblock, Unblock}; |
504 | /// use std::fs::File; |
505 | /// |
506 | /// # futures_lite::future::block_on(async { |
    /// let file = unblock(|| File::create("file.txt")).await?;
508 | /// let mut file = Unblock::new(file); |
509 | /// |
510 | /// let metadata = file.get_mut().await.metadata()?; |
511 | /// # std::io::Result::Ok(()) }); |
512 | /// ``` |
513 | pub async fn get_mut(&mut self) -> &mut T { |
514 | // Wait for the running task to stop and ignore I/O errors if there are any. |
515 | future::poll_fn(|cx| self.poll_stop(cx)).await.ok(); |
516 | |
517 | // Assume idle state and get a reference to the inner value. |
518 | match &mut self.state { |
            State::Idle(t) => t.as_mut().expect("inner value was taken out"),
520 | State::WithMut(..) |
521 | | State::Streaming(..) |
522 | | State::Reading(..) |
523 | | State::Writing(..) |
524 | | State::Seeking(..) => { |
                unreachable!("when stopped, the state machine must be in idle state");
526 | } |
527 | } |
528 | } |
529 | |
530 | /// Performs a blocking operation on the I/O handle. |
531 | /// |
532 | /// # Examples |
533 | /// |
534 | /// ```no_run |
535 | /// use blocking::{unblock, Unblock}; |
536 | /// use std::fs::File; |
537 | /// |
538 | /// # futures_lite::future::block_on(async { |
    /// let file = unblock(|| File::create("file.txt")).await?;
540 | /// let mut file = Unblock::new(file); |
541 | /// |
542 | /// let metadata = file.with_mut(|f| f.metadata()).await?; |
543 | /// # std::io::Result::Ok(()) }); |
544 | /// ``` |
545 | pub async fn with_mut<R, F>(&mut self, op: F) -> R |
546 | where |
547 | F: FnOnce(&mut T) -> R + Send + 'static, |
548 | R: Send + 'static, |
549 | T: Send + 'static, |
550 | { |
551 | // Wait for the running task to stop and ignore I/O errors if there are any. |
552 | future::poll_fn(|cx| self.poll_stop(cx)).await.ok(); |
553 | |
554 | // Assume idle state and take out the inner value. |
555 | let mut t = match &mut self.state { |
            State::Idle(t) => t.take().expect("inner value was taken out"),
557 | State::WithMut(..) |
558 | | State::Streaming(..) |
559 | | State::Reading(..) |
560 | | State::Writing(..) |
561 | | State::Seeking(..) => { |
                unreachable!("when stopped, the state machine must be in idle state");
563 | } |
564 | }; |
565 | |
566 | let (sender, receiver) = bounded(1); |
567 | let task = Executor::spawn(async move { |
568 | sender.try_send(op(&mut t)).ok(); |
569 | t |
570 | }); |
571 | self.state = State::WithMut(task); |
572 | |
573 | receiver |
574 | .recv() |
575 | .await |
            .expect("`Unblock::with_mut()` operation has panicked")
577 | } |
578 | |
579 | /// Extracts the inner blocking I/O handle. |
580 | /// |
581 | /// This is an async method because the I/O handle might be on the thread pool and needs to |
582 | /// be moved onto the current thread before we can extract it. |
583 | /// |
584 | /// # Examples |
585 | /// |
586 | /// ```no_run |
587 | /// use blocking::{unblock, Unblock}; |
588 | /// use futures_lite::prelude::*; |
589 | /// use std::fs::File; |
590 | /// |
591 | /// # futures_lite::future::block_on(async { |
    /// let file = unblock(|| File::create("file.txt")).await?;
593 | /// let file = Unblock::new(file); |
594 | /// |
595 | /// let file = file.into_inner().await; |
596 | /// # std::io::Result::Ok(()) }); |
597 | /// ``` |
598 | pub async fn into_inner(self) -> T { |
599 | // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just |
600 | // bind `self` to a local mutable variable. |
601 | let mut this = self; |
602 | |
603 | // Wait for the running task to stop and ignore I/O errors if there are any. |
604 | future::poll_fn(|cx| this.poll_stop(cx)).await.ok(); |
605 | |
606 | // Assume idle state and extract the inner value. |
607 | match &mut this.state { |
            State::Idle(t) => *t.take().expect("inner value was taken out"),
609 | State::WithMut(..) |
610 | | State::Streaming(..) |
611 | | State::Reading(..) |
612 | | State::Writing(..) |
613 | | State::Seeking(..) => { |
                unreachable!("when stopped, the state machine must be in idle state");
615 | } |
616 | } |
617 | } |
618 | |
619 | /// Waits for the running task to stop. |
620 | /// |
621 | /// On success, the state machine is moved into the idle state. |
622 | fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
623 | loop { |
624 | match &mut self.state { |
625 | State::Idle(_) => return Poll::Ready(Ok(())), |
626 | |
627 | State::WithMut(task) => { |
628 | // Poll the task to wait for it to finish. |
629 | let io = ready!(Pin::new(task).poll(cx)); |
630 | self.state = State::Idle(Some(io)); |
631 | } |
632 | |
633 | State::Streaming(any, task) => { |
634 | // Drop the receiver to close the channel. This stops the `send()` operation in |
635 | // the task, after which the task returns the iterator back. |
636 | any.take(); |
637 | |
638 | // Poll the task to retrieve the iterator. |
639 | let iter = ready!(Pin::new(task).poll(cx)); |
640 | self.state = State::Idle(Some(iter)); |
641 | } |
642 | |
643 | State::Reading(reader, task) => { |
644 | // Drop the reader to close the pipe. This stops copying inside the task, after |
645 | // which the task returns the I/O handle back. |
646 | reader.take(); |
647 | |
648 | // Poll the task to retrieve the I/O handle. |
649 | let (res, io) = ready!(Pin::new(task).poll(cx)); |
650 | // Make sure to move into the idle state before reporting errors. |
651 | self.state = State::Idle(Some(io)); |
652 | res?; |
653 | } |
654 | |
655 | State::Writing(writer, task) => { |
656 | // Drop the writer to close the pipe. This stops copying inside the task, after |
                    // which the task flushes the I/O handle and returns it.
658 | writer.take(); |
659 | |
660 | // Poll the task to retrieve the I/O handle. |
661 | let (res, io) = ready!(Pin::new(task).poll(cx)); |
662 | // Make sure to move into the idle state before reporting errors. |
663 | self.state = State::Idle(Some(io)); |
664 | res?; |
665 | } |
666 | |
667 | State::Seeking(task) => { |
668 | // Poll the task to wait for it to finish. |
669 | let (_, res, io) = ready!(Pin::new(task).poll(cx)); |
670 | // Make sure to move into the idle state before reporting errors. |
671 | self.state = State::Idle(Some(io)); |
672 | res?; |
673 | } |
674 | } |
675 | } |
676 | } |
677 | } |
678 | |
679 | impl<T: fmt::Debug> fmt::Debug for Unblock<T> { |
680 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
681 | struct Closed; |
682 | impl fmt::Debug for Closed { |
683 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
                f.write_str("<closed>")
685 | } |
686 | } |
687 | |
688 | struct Blocked; |
689 | impl fmt::Debug for Blocked { |
690 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
                f.write_str("<blocked>")
692 | } |
693 | } |
694 | |
695 | match &self.state { |
            State::Idle(None) => f.debug_struct("Unblock").field("io", &Closed).finish(),
697 | State::Idle(Some(io)) => { |
698 | let io: &T = io; |
                f.debug_struct("Unblock").field("io", io).finish()
700 | } |
701 | State::WithMut(..) |
702 | | State::Streaming(..) |
703 | | State::Reading(..) |
704 | | State::Writing(..) |
            | State::Seeking(..) => f.debug_struct("Unblock").field("io", &Blocked).finish(),
706 | } |
707 | } |
708 | } |
709 | |
710 | /// Current state of a blocking task. |
711 | enum State<T> { |
712 | /// There is no blocking task. |
713 | /// |
714 | /// The inner value is readily available, unless it has already been extracted. The value is |
715 | /// extracted out by [`Unblock::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting |
716 | /// [`Unblock`]. |
717 | Idle(Option<Box<T>>), |
718 | |
719 | /// A [`Unblock::with_mut()`] closure was spawned and is still running. |
720 | WithMut(Task<Box<T>>), |
721 | |
722 | /// The inner value is an [`Iterator`] currently iterating in a task. |
723 | /// |
724 | /// The `dyn Any` value here is a `Pin<Box<Receiver<<T as Iterator>::Item>>>`. |
725 | Streaming(Option<Box<dyn Any + Send + Sync>>, Task<Box<T>>), |
726 | |
727 | /// The inner value is a [`Read`] currently reading in a task. |
728 | Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>), |
729 | |
730 | /// The inner value is a [`Write`] currently writing in a task. |
731 | Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>), |
732 | |
733 | /// The inner value is a [`Seek`] currently seeking in a task. |
734 | Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>), |
735 | } |
736 | |
737 | impl<T: Iterator + Send + 'static> Stream for Unblock<T> |
738 | where |
739 | T::Item: Send + 'static, |
740 | { |
741 | type Item = T::Item; |
742 | |
743 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { |
744 | loop { |
745 | match &mut self.state { |
746 | // If not in idle or active streaming state, stop the running task. |
747 | State::WithMut(..) |
748 | | State::Streaming(None, _) |
749 | | State::Reading(..) |
750 | | State::Writing(..) |
751 | | State::Seeking(..) => { |
752 | // Wait for the running task to stop. |
753 | ready!(self.poll_stop(cx)).ok(); |
754 | } |
755 | |
756 | // If idle, start a streaming task. |
757 | State::Idle(iter) => { |
758 | // Take the iterator out to run it on a blocking task. |
                    let mut iter = iter.take().expect("inner iterator was taken out");
760 | |
761 | // This channel capacity seems to work well in practice. If it's too low, there |
762 | // will be too much synchronization between tasks. If too high, memory |
763 | // consumption increases. |
764 | let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items |
765 | |
766 | // Spawn a blocking task that runs the iterator and returns it when done. |
767 | let task = Executor::spawn(async move { |
768 | for item in &mut iter { |
769 | if sender.send(item).await.is_err() { |
770 | break; |
771 | } |
772 | } |
773 | iter |
774 | }); |
775 | |
776 | // Move into the busy state and poll again. |
777 | self.state = State::Streaming(Some(Box::new(Box::pin(receiver))), task); |
778 | } |
779 | |
780 | // If streaming, receive an item. |
781 | State::Streaming(Some(any), task) => { |
782 | let receiver = any.downcast_mut::<Pin<Box<Receiver<T::Item>>>>().unwrap(); |
783 | |
784 | // Poll the channel. |
785 | let opt = ready!(receiver.as_mut().poll_next(cx)); |
786 | |
787 | // If the channel is closed, retrieve the iterator back from the blocking task. |
788 | // This is not really a required step, but it's cleaner to drop the iterator on |
789 | // the same thread that created it. |
790 | if opt.is_none() { |
791 | // Poll the task to retrieve the iterator. |
792 | let iter = ready!(Pin::new(task).poll(cx)); |
793 | self.state = State::Idle(Some(iter)); |
794 | } |
795 | |
796 | return Poll::Ready(opt); |
797 | } |
798 | } |
799 | } |
800 | } |
801 | } |
802 | |
803 | impl<T: Read + Send + 'static> AsyncRead for Unblock<T> { |
804 | fn poll_read( |
805 | mut self: Pin<&mut Self>, |
806 | cx: &mut Context<'_>, |
807 | buf: &mut [u8], |
808 | ) -> Poll<io::Result<usize>> { |
809 | loop { |
810 | match &mut self.state { |
811 | // If not in idle or active reading state, stop the running task. |
812 | State::WithMut(..) |
813 | | State::Reading(None, _) |
814 | | State::Streaming(..) |
815 | | State::Writing(..) |
816 | | State::Seeking(..) => { |
817 | // Wait for the running task to stop. |
818 | ready!(self.poll_stop(cx))?; |
819 | } |
820 | |
821 | // If idle, start a reading task. |
822 | State::Idle(io) => { |
823 | // Take the I/O handle out to read it on a blocking task. |
                    let mut io = io.take().expect("inner value was taken out");
825 | |
826 | // This pipe capacity seems to work well in practice. If it's too low, there |
827 | // will be too much synchronization between tasks. If too high, memory |
828 | // consumption increases. |
829 | let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB |
830 | |
831 | // Spawn a blocking task that reads and returns the I/O handle when done. |
832 | let task = Executor::spawn(async move { |
833 | // Copy bytes from the I/O handle into the pipe until the pipe is closed or |
834 | // an error occurs. |
835 | loop { |
836 | match future::poll_fn(|cx| writer.poll_fill(cx, &mut io)).await { |
837 | Ok(0) => return (Ok(()), io), |
838 | Ok(_) => {} |
839 | Err(err) => return (Err(err), io), |
840 | } |
841 | } |
842 | }); |
843 | |
844 | // Move into the busy state and poll again. |
845 | self.state = State::Reading(Some(reader), task); |
846 | } |
847 | |
848 | // If reading, read bytes from the pipe. |
849 | State::Reading(Some(reader), task) => { |
850 | // Poll the pipe. |
851 | let n = ready!(reader.poll_drain(cx, buf))?; |
852 | |
853 | // If the pipe is closed, retrieve the I/O handle back from the blocking task. |
854 | // This is not really a required step, but it's cleaner to drop the handle on |
855 | // the same thread that created it. |
856 | if n == 0 { |
857 | // Poll the task to retrieve the I/O handle. |
858 | let (res, io) = ready!(Pin::new(task).poll(cx)); |
859 | // Make sure to move into the idle state before reporting errors. |
860 | self.state = State::Idle(Some(io)); |
861 | res?; |
862 | } |
863 | |
864 | return Poll::Ready(Ok(n)); |
865 | } |
866 | } |
867 | } |
868 | } |
869 | } |
870 | |
871 | impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> { |
872 | fn poll_write( |
873 | mut self: Pin<&mut Self>, |
874 | cx: &mut Context<'_>, |
875 | buf: &[u8], |
876 | ) -> Poll<io::Result<usize>> { |
877 | loop { |
878 | match &mut self.state { |
879 | // If not in idle or active writing state, stop the running task. |
880 | State::WithMut(..) |
881 | | State::Writing(None, _) |
882 | | State::Streaming(..) |
883 | | State::Reading(..) |
884 | | State::Seeking(..) => { |
885 | // Wait for the running task to stop. |
886 | ready!(self.poll_stop(cx))?; |
887 | } |
888 | |
889 | // If idle, start the writing task. |
890 | State::Idle(io) => { |
891 | // Take the I/O handle out to write on a blocking task. |
                    let mut io = io.take().expect("inner value was taken out");
893 | |
894 | // This pipe capacity seems to work well in practice. If it's too low, there will |
895 | // be too much synchronization between tasks. If too high, memory consumption |
896 | // increases. |
897 | let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB |
898 | |
899 | // Spawn a blocking task that writes and returns the I/O handle when done. |
900 | let task = Executor::spawn(async move { |
901 | // Copy bytes from the pipe into the I/O handle until the pipe is closed or an |
902 | // error occurs. Flush the I/O handle at the end. |
903 | loop { |
904 | match future::poll_fn(|cx| reader.poll_drain(cx, &mut io)).await { |
905 | Ok(0) => return (io.flush(), io), |
906 | Ok(_) => {} |
907 | Err(err) => { |
908 | io.flush().ok(); |
909 | return (Err(err), io); |
910 | } |
911 | } |
912 | } |
913 | }); |
914 | |
915 | // Move into the busy state and poll again. |
916 | self.state = State::Writing(Some(writer), task); |
917 | } |
918 | |
919 | // If writing, write more bytes into the pipe. |
920 | State::Writing(Some(writer), _) => return writer.poll_fill(cx, buf), |
921 | } |
922 | } |
923 | } |
924 | |
925 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
926 | loop { |
927 | match &mut self.state { |
928 | // If not in idle state, stop the running task. |
929 | State::WithMut(..) |
930 | | State::Streaming(..) |
931 | | State::Writing(..) |
932 | | State::Reading(..) |
933 | | State::Seeking(..) => { |
934 | // Wait for the running task to stop. |
935 | ready!(self.poll_stop(cx))?; |
936 | } |
937 | |
938 | // Idle implies flushed. |
939 | State::Idle(_) => return Poll::Ready(Ok(())), |
940 | } |
941 | } |
942 | } |
943 | |
944 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
945 | // First, make sure the I/O handle is flushed. |
946 | ready!(Pin::new(&mut self).poll_flush(cx))?; |
947 | |
948 | // Then move into the idle state with no I/O handle, thus dropping it. |
949 | self.state = State::Idle(None); |
950 | Poll::Ready(Ok(())) |
951 | } |
952 | } |
953 | |
954 | impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> { |
955 | fn poll_seek( |
956 | mut self: Pin<&mut Self>, |
957 | cx: &mut Context<'_>, |
958 | pos: SeekFrom, |
959 | ) -> Poll<io::Result<u64>> { |
960 | loop { |
961 | match &mut self.state { |
962 | // If not in idle state, stop the running task. |
963 | State::WithMut(..) |
964 | | State::Streaming(..) |
965 | | State::Reading(..) |
966 | | State::Writing(..) => { |
967 | // Wait for the running task to stop. |
968 | ready!(self.poll_stop(cx))?; |
969 | } |
970 | |
971 | State::Idle(io) => { |
972 | // Take the I/O handle out to seek on a blocking task. |
                    let mut io = io.take().expect("inner value was taken out");
974 | |
975 | let task = Executor::spawn(async move { |
976 | let res = io.seek(pos); |
977 | (pos, res, io) |
978 | }); |
979 | self.state = State::Seeking(task); |
980 | } |
981 | |
982 | State::Seeking(task) => { |
983 | // Poll the task to wait for it to finish. |
984 | let (original_pos, res, io) = ready!(Pin::new(task).poll(cx)); |
985 | // Make sure to move into the idle state before reporting errors. |
986 | self.state = State::Idle(Some(io)); |
987 | let current = res?; |
988 | |
989 | // If the `pos` argument matches the original one, return the result. |
990 | if original_pos == pos { |
991 | return Poll::Ready(Ok(current)); |
992 | } |
993 | } |
994 | } |
995 | } |
996 | } |
997 | } |
998 | |
999 | #[cfg (all(test, not(target_family = "wasm" )))] |
1000 | mod tests { |
1001 | use super::*; |
1002 | |
1003 | #[test ] |
1004 | fn test_max_threads() { |
1005 | // properly set env var |
        env::set_var(MAX_THREADS_ENV, "100");
1007 | assert_eq!(100, Executor::max_threads().get()); |
1008 | |
1009 | // passed value below minimum, so we set it to minimum |
        env::set_var(MAX_THREADS_ENV, "0");
1011 | assert_eq!(1, Executor::max_threads().get()); |
1012 | |
1013 | // passed value above maximum, so we set to allowed maximum |
        env::set_var(MAX_THREADS_ENV, "50000");
1015 | assert_eq!(10000, Executor::max_threads().get()); |
1016 | |
        // empty value cannot be parsed, so use default
        env::set_var(MAX_THREADS_ENV, "");
1019 | assert_eq!(500, Executor::max_threads().get()); |
1020 | |
1021 | // not a number, use default |
        env::set_var(MAX_THREADS_ENV, "NOTINT");
1023 | assert_eq!(500, Executor::max_threads().get()); |
1024 | } |
1025 | } |
1026 | |