1use crate::Task;
2use async_channel::{Receiver, Sender};
3use async_lock::Mutex;
4use futures_lite::future;
5use once_cell::sync::OnceCell;
6use std::{io, thread};
7
8// The current number of threads (some might be shutting down and not in the pool anymore)
9static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(data:0);
10// The expected number of threads (excluding the one that are shutting down)
11static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(data:0);
12
13thread_local! {
14 // Used to shutdown a thread when we receive a message from the Sender.
15 // We send an ack using to the Receiver once we're finished shutting down.
16 static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = OnceCell::new();
17}
18
19/// Spawn more executor threads, up to configured max value.
20///
21/// Returns how many threads we spawned.
22///
23/// # Examples
24///
25/// ```
26/// async_global_executor::spawn_more_threads(2);
27/// ```
28pub async fn spawn_more_threads(count: usize) -> io::Result<usize> {
29 // Get the current configuration, or initialize the thread pool.
30 let config: &Config = crateOption<&Config>::config::GLOBAL_EXECUTOR_CONFIG
31 .get()
32 .unwrap_or_else(|| {
33 crate::init();
34 crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap()
35 });
36 // How many threads do we have (including shutting down)
37 let mut threads_number: MutexGuard<'_, usize> = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await;
38 // How many threads are we supposed to have (when all shutdowns are complete)
39 let mut expected_threads_number: MutexGuard<'_, usize> = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
40 // Ensure we don't exceed configured max threads (including shutting down)
41 let count: usize = count.min(config.max_threads - *threads_number);
42 for _ in 0..count {
43 threadBuilder::Builder::new()
44 .name((config.thread_name_fn)())
45 .spawn(thread_main_loop)?;
46 *threads_number += 1;
47 *expected_threads_number += 1;
48 }
49 Ok(count)
50}
51
52/// Stop one of the executor threads, down to configured min value
53///
54/// Returns whether a thread has been stopped.
55///
56/// # Examples
57///
58/// ```
59/// async_global_executor::stop_thread();
60/// ```
61pub fn stop_thread() -> Task<bool> {
62 crate::spawn(future:stop_current_executor_thread())
63}
64
65/// Stop the current executor thread, if we exceed the configured min value
66///
67/// Returns whether the thread has been stopped.
68///
69/// # Examples
70///
71/// ```
72/// async_global_executor::stop_current_thread();
73/// ```
74pub fn stop_current_thread() -> Task<bool> {
75 crate::spawn_local(future:stop_current_executor_thread())
76}
77
78fn thread_main_loop() {
79 // This will be used to ask for shutdown.
80 let (s, r) = async_channel::bounded(1);
81 // This wil be used to ack once shutdown is complete.
82 let (s_ack, r_ack) = async_channel::bounded(1);
83 THREAD_SHUTDOWN.with(|thread_shutdown| drop(thread_shutdown.set((s, r_ack))));
84
85 // Main loop
86 loop {
87 #[allow(clippy::blocks_in_if_conditions)]
88 if std::panic::catch_unwind(|| {
89 crate::executor::LOCAL_EXECUTOR.with(|executor| {
90 let local = executor.run(async {
91 // Wait until we're asked to shutdown.
92 let _ = r.recv().await;
93 });
94 let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>());
95 crate::reactor::block_on(future::or(local, global));
96 });
97 })
98 .is_ok()
99 {
100 break;
101 }
102 }
103
104 wait_for_local_executor_completion();
105
106 // Ack that we're done shutting down.
107 crate::reactor::block_on(async {
108 let _ = s_ack.send(()).await;
109 });
110}
111
112fn wait_for_local_executor_completion() {
113 loop {
114 #[allow(clippy::blocks_in_if_conditions)]
115 if stdResult<(), Box>::panic::catch_unwind(|| {
116 crate::executor::LOCAL_EXECUTOR.with(|executor: &LocalExecutor<'static>| {
117 crate::reactor::block_on(future:async {
118 // Wait for spawned tasks completion
119 while !executor.is_empty() {
120 executor.tick().await;
121 }
122 });
123 });
124 })
125 .is_ok()
126 {
127 break;
128 }
129 }
130}
131
132async fn stop_current_executor_thread() -> bool {
133 // How many threads are we supposed to have (when all shutdowns are complete)
134 let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
135 // Ensure we don't go below the configured min_threads (ignoring shutting down)
136 if *expected_threads_number
137 > crate::config::GLOBAL_EXECUTOR_CONFIG
138 .get()
139 .unwrap()
140 .min_threads
141 {
142 let (s, r_ack) =
143 THREAD_SHUTDOWN.with(|thread_shutdown| thread_shutdown.get().unwrap().clone());
144 let _ = s.send(()).await;
145 // We now expect to have one less thread (this one is shutting down)
146 *expected_threads_number -= 1;
147 // Unlock the Mutex
148 drop(expected_threads_number);
149 let _ = r_ack.recv().await;
150 // This thread is done shutting down
151 *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1;
152 true
153 } else {
154 false
155 }
156}
157