1 | use std::{marker::PhantomData, mem::MaybeUninit, sync::Once}; |
2 | |
3 | use crate::Error; |
4 | |
5 | pub(crate) struct JobToken(PhantomData<()>); |
6 | |
7 | impl JobToken { |
8 | fn new() -> Self { |
9 | Self(PhantomData) |
10 | } |
11 | } |
12 | |
13 | impl Drop for JobToken { |
14 | fn drop(&mut self) { |
15 | match JobTokenServer::new() { |
16 | JobTokenServer::Inherited(jobserver: &JobServer) => jobserver.release_token_raw(), |
17 | JobTokenServer::InProcess(jobserver: &JobServer) => jobserver.release_token_raw(), |
18 | } |
19 | } |
20 | } |
21 | |
22 | enum JobTokenServer { |
23 | Inherited(inherited_jobserver::JobServer), |
24 | InProcess(inprocess_jobserver::JobServer), |
25 | } |
26 | |
27 | impl JobTokenServer { |
28 | /// This function returns a static reference to the jobserver because |
29 | /// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might |
30 | /// be closed by other jobserver users in the process) and better do it |
31 | /// at the start of the program. |
32 | /// - in case a jobserver cannot be created from env (e.g. it's not |
33 | /// present), we will create a global in-process only jobserver |
34 | /// that has to be static so that it will be shared by all cc |
35 | /// compilation. |
36 | fn new() -> &'static Self { |
37 | static INIT: Once = Once::new(); |
38 | static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit(); |
39 | |
40 | unsafe { |
41 | INIT.call_once(|| { |
42 | let server = inherited_jobserver::JobServer::from_env() |
43 | .map(Self::Inherited) |
44 | .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())); |
45 | JOBSERVER = MaybeUninit::new(server); |
46 | }); |
47 | // TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55. |
48 | &*JOBSERVER.as_ptr() |
49 | } |
50 | } |
51 | } |
52 | |
53 | pub(crate) enum ActiveJobTokenServer { |
54 | Inherited(inherited_jobserver::ActiveJobServer<'static>), |
55 | InProcess(&'static inprocess_jobserver::JobServer), |
56 | } |
57 | |
58 | impl ActiveJobTokenServer { |
59 | pub(crate) fn new() -> Result<Self, Error> { |
60 | match JobTokenServer::new() { |
61 | JobTokenServer::Inherited(inherited_jobserver: &JobServer) => { |
62 | inherited_jobserver.enter_active().map(Self::Inherited) |
63 | } |
64 | JobTokenServer::InProcess(inprocess_jobserver: &JobServer) => { |
65 | Ok(Self::InProcess(inprocess_jobserver)) |
66 | } |
67 | } |
68 | } |
69 | |
70 | pub(crate) async fn acquire(&self) -> Result<JobToken, Error> { |
71 | match &self { |
72 | Self::Inherited(jobserver: &ActiveJobServer<'_>) => jobserver.acquire().await, |
73 | Self::InProcess(jobserver: &&JobServer) => Ok(jobserver.acquire().await), |
74 | } |
75 | } |
76 | } |
77 | |
78 | mod inherited_jobserver { |
79 | use super::JobToken; |
80 | |
81 | use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; |
82 | |
83 | use std::{ |
84 | io, mem, |
85 | sync::{mpsc, Mutex, MutexGuard, PoisonError}, |
86 | }; |
87 | |
88 | pub(super) struct JobServer { |
89 | /// Implicit token for this process which is obtained and will be |
90 | /// released in parent. Since JobTokens only give back what they got, |
91 | /// there should be at most one global implicit token in the wild. |
92 | /// |
93 | /// Since Rust does not execute any `Drop` for global variables, |
94 | /// we can't just put it back to jobserver and then re-acquire it at |
95 | /// the end of the process. |
96 | /// |
97 | /// Use `Mutex` to avoid race between acquire and release. |
98 | /// If an `AtomicBool` is used, then it's possible for: |
99 | /// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already |
100 | /// set to `true`, continue to release it to jobserver |
101 | /// - `acquire` takes the global implicit token, set `global_implicit_token` to false |
102 | /// - `release_token_raw` now writes the token back into the jobserver, while |
103 | /// `global_implicit_token` is `false` |
104 | /// |
105 | /// If the program exits here, then cc effectively increases parallelism by one, which is |
106 | /// incorrect, hence we use a `Mutex` here. |
107 | global_implicit_token: Mutex<bool>, |
108 | inner: jobserver::Client, |
109 | } |
110 | |
111 | impl JobServer { |
112 | pub(super) unsafe fn from_env() -> Option<Self> { |
113 | jobserver::Client::from_env().map(|inner| Self { |
114 | inner, |
115 | global_implicit_token: Mutex::new(true), |
116 | }) |
117 | } |
118 | |
119 | fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> { |
120 | self.global_implicit_token |
121 | .lock() |
122 | .unwrap_or_else(PoisonError::into_inner) |
123 | } |
124 | |
125 | /// All tokens except for the global implicit token will be put back into the jobserver |
126 | /// immediately and they cannot be cached, since Rust does not call `Drop::drop` on |
127 | /// global variables. |
128 | pub(super) fn release_token_raw(&self) { |
129 | let mut global_implicit_token = self.get_global_implicit_token(); |
130 | |
131 | if *global_implicit_token { |
132 | // There's already a global implicit token, so this token must |
133 | // be released back into jobserver. |
134 | // |
135 | // `release_raw` should not block |
136 | let _ = self.inner.release_raw(); |
137 | } else { |
138 | *global_implicit_token = true; |
139 | } |
140 | } |
141 | |
142 | pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> { |
143 | ActiveJobServer::new(self) |
144 | } |
145 | } |
146 | |
147 | pub(crate) struct ActiveJobServer<'a> { |
148 | jobserver: &'a JobServer, |
149 | helper_thread: jobserver::HelperThread, |
150 | /// When rx is dropped, all the token stored within it will be dropped. |
151 | rx: mpsc::Receiver<io::Result<jobserver::Acquired>>, |
152 | } |
153 | |
154 | impl<'a> ActiveJobServer<'a> { |
155 | fn new(jobserver: &'a JobServer) -> Result<Self, Error> { |
156 | let (tx, rx) = mpsc::channel(); |
157 | |
158 | Ok(Self { |
159 | rx, |
160 | helper_thread: jobserver.inner.clone().into_helper_thread(move |res| { |
161 | let _ = tx.send(res); |
162 | })?, |
163 | jobserver, |
164 | }) |
165 | } |
166 | |
167 | pub(super) async fn acquire(&self) -> Result<JobToken, Error> { |
168 | let mut has_requested_token = false; |
169 | |
170 | loop { |
171 | // Fast path |
172 | if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) { |
173 | break Ok(JobToken::new()); |
174 | } |
175 | |
176 | // Cold path, no global implicit token, obtain one |
177 | match self.rx.try_recv() { |
178 | Ok(res) => { |
179 | let acquired = res?; |
180 | acquired.drop_without_releasing(); |
181 | break Ok(JobToken::new()); |
182 | } |
183 | Err(mpsc::TryRecvError::Disconnected) => { |
184 | break Err(Error::new( |
185 | ErrorKind::JobserverHelpThreadError, |
186 | "jobserver help thread has returned before ActiveJobServer is dropped" , |
187 | )) |
188 | } |
189 | Err(mpsc::TryRecvError::Empty) => { |
190 | if !has_requested_token { |
191 | self.helper_thread.request_token(); |
192 | has_requested_token = true; |
193 | } |
194 | YieldOnce::default().await |
195 | } |
196 | } |
197 | } |
198 | } |
199 | } |
200 | } |
201 | |
202 | mod inprocess_jobserver { |
203 | use super::JobToken; |
204 | |
205 | use crate::parallel::async_executor::YieldOnce; |
206 | |
207 | use std::{ |
208 | env::var, |
209 | sync::atomic::{ |
210 | AtomicU32, |
211 | Ordering::{AcqRel, Acquire}, |
212 | }, |
213 | }; |
214 | |
215 | pub(crate) struct JobServer(AtomicU32); |
216 | |
217 | impl JobServer { |
218 | pub(super) fn new() -> Self { |
219 | // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise |
220 | // just fall back to a semi-reasonable number. |
221 | // |
222 | // Note that we could use `num_cpus` here but it's an extra |
223 | // dependency that will almost never be used, so |
224 | // it's generally not too worth it. |
225 | let mut parallelism = 4; |
226 | // TODO: Use std::thread::available_parallelism as an upper bound |
227 | // when MSRV is bumped. |
228 | if let Ok(amt) = var("NUM_JOBS" ) { |
229 | if let Ok(amt) = amt.parse() { |
230 | parallelism = amt; |
231 | } |
232 | } |
233 | |
234 | Self(AtomicU32::new(parallelism)) |
235 | } |
236 | |
237 | pub(super) async fn acquire(&self) -> JobToken { |
238 | loop { |
239 | let res = self |
240 | .0 |
241 | .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); |
242 | |
243 | if res.is_ok() { |
244 | break JobToken::new(); |
245 | } |
246 | |
247 | YieldOnce::default().await |
248 | } |
249 | } |
250 | |
251 | pub(super) fn release_token_raw(&self) { |
252 | self.0.fetch_add(1, AcqRel); |
253 | } |
254 | } |
255 | } |
256 | |