1 | use std::marker::PhantomData; |
2 | |
3 | use crate::Error; |
4 | |
5 | use super::once_lock::OnceLock; |
6 | |
7 | pub(crate) struct JobToken(PhantomData<()>); |
8 | |
9 | impl JobToken { |
10 | fn new() -> Self { |
11 | Self(PhantomData) |
12 | } |
13 | } |
14 | |
15 | impl Drop for JobToken { |
16 | fn drop(&mut self) { |
17 | match JobTokenServer::new() { |
18 | JobTokenServer::Inherited(jobserver: &JobServer) => jobserver.release_token_raw(), |
19 | JobTokenServer::InProcess(jobserver: &JobServer) => jobserver.release_token_raw(), |
20 | } |
21 | } |
22 | } |
23 | |
24 | enum JobTokenServer { |
25 | Inherited(inherited_jobserver::JobServer), |
26 | InProcess(inprocess_jobserver::JobServer), |
27 | } |
28 | |
29 | impl JobTokenServer { |
30 | /// This function returns a static reference to the jobserver because |
31 | /// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might |
32 | /// be closed by other jobserver users in the process) and better do it |
33 | /// at the start of the program. |
34 | /// - in case a jobserver cannot be created from env (e.g. it's not |
35 | /// present), we will create a global in-process only jobserver |
36 | /// that has to be static so that it will be shared by all cc |
37 | /// compilation. |
38 | fn new() -> &'static Self { |
39 | // TODO: Replace with a OnceLock once MSRV is 1.70 |
40 | static JOBSERVER: OnceLock<JobTokenServer> = OnceLock::new(); |
41 | |
42 | JOBSERVER.get_or_init(|| { |
43 | unsafeOption { inherited_jobserver::JobServer::from_env() } |
44 | .map(Self::Inherited) |
45 | .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())) |
46 | }) |
47 | } |
48 | } |
49 | |
50 | pub(crate) enum ActiveJobTokenServer { |
51 | Inherited(inherited_jobserver::ActiveJobServer<'static>), |
52 | InProcess(&'static inprocess_jobserver::JobServer), |
53 | } |
54 | |
55 | impl ActiveJobTokenServer { |
56 | pub(crate) fn new() -> Self { |
57 | match JobTokenServer::new() { |
58 | JobTokenServer::Inherited(inherited_jobserver: &JobServer) => { |
59 | Self::Inherited(inherited_jobserver.enter_active()) |
60 | } |
61 | JobTokenServer::InProcess(inprocess_jobserver: &JobServer) => Self::InProcess(inprocess_jobserver), |
62 | } |
63 | } |
64 | |
65 | pub(crate) async fn acquire(&mut self) -> Result<JobToken, Error> { |
66 | match self { |
67 | Self::Inherited(jobserver: &mut ActiveJobServer<'static>) => jobserver.acquire().await, |
68 | Self::InProcess(jobserver: &mut &'static JobServer) => Ok(jobserver.acquire().await), |
69 | } |
70 | } |
71 | } |
72 | |
73 | mod inherited_jobserver { |
74 | use super::JobToken; |
75 | |
76 | use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; |
77 | |
78 | use std::{ |
79 | io, mem, |
80 | sync::{mpsc, Mutex, MutexGuard, PoisonError}, |
81 | }; |
82 | |
83 | pub(super) struct JobServer { |
84 | /// Implicit token for this process which is obtained and will be |
85 | /// released in parent. Since JobTokens only give back what they got, |
86 | /// there should be at most one global implicit token in the wild. |
87 | /// |
88 | /// Since Rust does not execute any `Drop` for global variables, |
89 | /// we can't just put it back to jobserver and then re-acquire it at |
90 | /// the end of the process. |
91 | /// |
92 | /// Use `Mutex` to avoid race between acquire and release. |
93 | /// If an `AtomicBool` is used, then it's possible for: |
94 | /// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already |
95 | /// set to `true`, continue to release it to jobserver |
96 | /// - `acquire` takes the global implicit token, set `global_implicit_token` to false |
97 | /// - `release_token_raw` now writes the token back into the jobserver, while |
98 | /// `global_implicit_token` is `false` |
99 | /// |
100 | /// If the program exits here, then cc effectively increases parallelism by one, which is |
101 | /// incorrect, hence we use a `Mutex` here. |
102 | global_implicit_token: Mutex<bool>, |
103 | inner: jobserver::Client, |
104 | } |
105 | |
106 | impl JobServer { |
107 | pub(super) unsafe fn from_env() -> Option<Self> { |
108 | jobserver::Client::from_env().map(|inner| Self { |
109 | inner, |
110 | global_implicit_token: Mutex::new(true), |
111 | }) |
112 | } |
113 | |
114 | fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> { |
115 | self.global_implicit_token |
116 | .lock() |
117 | .unwrap_or_else(PoisonError::into_inner) |
118 | } |
119 | |
120 | /// All tokens except for the global implicit token will be put back into the jobserver |
121 | /// immediately and they cannot be cached, since Rust does not call `Drop::drop` on |
122 | /// global variables. |
123 | pub(super) fn release_token_raw(&self) { |
124 | let mut global_implicit_token = self.get_global_implicit_token(); |
125 | |
126 | if *global_implicit_token { |
127 | // There's already a global implicit token, so this token must |
128 | // be released back into jobserver. |
129 | // |
130 | // `release_raw` should not block |
131 | let _ = self.inner.release_raw(); |
132 | } else { |
133 | *global_implicit_token = true; |
134 | } |
135 | } |
136 | |
137 | pub(super) fn enter_active(&self) -> ActiveJobServer<'_> { |
138 | ActiveJobServer { |
139 | jobserver: self, |
140 | helper_thread: None, |
141 | } |
142 | } |
143 | } |
144 | |
145 | struct HelperThread { |
146 | inner: jobserver::HelperThread, |
147 | /// When rx is dropped, all the token stored within it will be dropped. |
148 | rx: mpsc::Receiver<io::Result<jobserver::Acquired>>, |
149 | } |
150 | |
151 | impl HelperThread { |
152 | fn new(jobserver: &JobServer) -> Result<Self, Error> { |
153 | let (tx, rx) = mpsc::channel(); |
154 | |
155 | Ok(Self { |
156 | rx, |
157 | inner: jobserver.inner.clone().into_helper_thread(move |res| { |
158 | let _ = tx.send(res); |
159 | })?, |
160 | }) |
161 | } |
162 | } |
163 | |
164 | pub(crate) struct ActiveJobServer<'a> { |
165 | jobserver: &'a JobServer, |
166 | helper_thread: Option<HelperThread>, |
167 | } |
168 | |
169 | impl<'a> ActiveJobServer<'a> { |
170 | pub(super) async fn acquire(&mut self) -> Result<JobToken, Error> { |
171 | let mut has_requested_token = false; |
172 | |
173 | loop { |
174 | // Fast path |
175 | if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) { |
176 | break Ok(JobToken::new()); |
177 | } |
178 | |
179 | match self.jobserver.inner.try_acquire() { |
180 | Ok(Some(acquired)) => { |
181 | acquired.drop_without_releasing(); |
182 | break Ok(JobToken::new()); |
183 | } |
184 | Ok(None) => YieldOnce::default().await, |
185 | Err(err) if err.kind() == io::ErrorKind::Unsupported => { |
186 | // Fallback to creating a help thread with blocking acquire |
187 | let helper_thread = if let Some(thread) = self.helper_thread.as_ref() { |
188 | thread |
189 | } else { |
190 | self.helper_thread |
191 | .insert(HelperThread::new(self.jobserver)?) |
192 | }; |
193 | |
194 | match helper_thread.rx.try_recv() { |
195 | Ok(res) => { |
196 | let acquired = res?; |
197 | acquired.drop_without_releasing(); |
198 | break Ok(JobToken::new()); |
199 | } |
200 | Err(mpsc::TryRecvError::Disconnected) => break Err(Error::new( |
201 | ErrorKind::JobserverHelpThreadError, |
202 | "jobserver help thread has returned before ActiveJobServer is dropped" , |
203 | )), |
204 | Err(mpsc::TryRecvError::Empty) => { |
205 | if !has_requested_token { |
206 | helper_thread.inner.request_token(); |
207 | has_requested_token = true; |
208 | } |
209 | YieldOnce::default().await |
210 | } |
211 | } |
212 | } |
213 | Err(err) => break Err(err.into()), |
214 | } |
215 | } |
216 | } |
217 | } |
218 | } |
219 | |
220 | mod inprocess_jobserver { |
221 | use super::JobToken; |
222 | |
223 | use crate::parallel::async_executor::YieldOnce; |
224 | |
225 | use std::{ |
226 | env::var, |
227 | sync::atomic::{ |
228 | AtomicU32, |
229 | Ordering::{AcqRel, Acquire}, |
230 | }, |
231 | }; |
232 | |
233 | pub(crate) struct JobServer(AtomicU32); |
234 | |
235 | impl JobServer { |
236 | pub(super) fn new() -> Self { |
237 | // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise |
238 | // just fall back to a semi-reasonable number. |
239 | // |
240 | // Note that we could use `num_cpus` here but it's an extra |
241 | // dependency that will almost never be used, so |
242 | // it's generally not too worth it. |
243 | let mut parallelism = 4; |
244 | // TODO: Use std::thread::available_parallelism as an upper bound |
245 | // when MSRV is bumped. |
246 | if let Ok(amt) = var("NUM_JOBS" ) { |
247 | if let Ok(amt) = amt.parse() { |
248 | parallelism = amt; |
249 | } |
250 | } |
251 | |
252 | Self(AtomicU32::new(parallelism)) |
253 | } |
254 | |
255 | pub(super) async fn acquire(&self) -> JobToken { |
256 | loop { |
257 | let res = self |
258 | .0 |
259 | .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); |
260 | |
261 | if res.is_ok() { |
262 | break JobToken::new(); |
263 | } |
264 | |
265 | YieldOnce::default().await |
266 | } |
267 | } |
268 | |
269 | pub(super) fn release_token_raw(&self) { |
270 | self.0.fetch_add(1, AcqRel); |
271 | } |
272 | } |
273 | } |
274 | |