use std::marker::PhantomData;

use crate::Error;

use super::once_lock::OnceLock;

pub(crate) struct JobToken(PhantomData<()>);

impl JobToken {
    fn new() -> Self {
        Self(PhantomData)
    }
}

impl Drop for JobToken {
    fn drop(&mut self) {
        match JobTokenServer::new() {
            JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(),
            JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(),
        }
    }
}

enum JobTokenServer {
    Inherited(inherited_jobserver::JobServer),
    InProcess(inprocess_jobserver::JobServer),
}

impl JobTokenServer {
    /// This function returns a static reference to the jobserver because:
    /// - creating a jobserver from the environment is a bit fd-unsafe (e.g. the fd
    ///   might be closed by other jobserver users in the process), so it is better
    ///   to do it once, at the start of the program;
    /// - in case a jobserver cannot be created from the environment (e.g. it's not
    ///   present), we will create a global in-process-only jobserver that has to be
    ///   static so that it is shared by all cc compilations.
    fn new() -> &'static Self {
        // TODO: Replace with std::sync::OnceLock once MSRV is 1.70
        static JOBSERVER: OnceLock<JobTokenServer> = OnceLock::new();

        JOBSERVER.get_or_init(|| {
            unsafe { inherited_jobserver::JobServer::from_env() }
                .map(Self::Inherited)
                .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()))
        })
    }
}

pub(crate) enum ActiveJobTokenServer {
    Inherited(inherited_jobserver::ActiveJobServer<'static>),
    InProcess(&'static inprocess_jobserver::JobServer),
}

impl ActiveJobTokenServer {
    pub(crate) fn new() -> Self {
        match JobTokenServer::new() {
            JobTokenServer::Inherited(inherited_jobserver) => {
                Self::Inherited(inherited_jobserver.enter_active())
            }
            JobTokenServer::InProcess(inprocess_jobserver) => {
                Self::InProcess(inprocess_jobserver)
            }
        }
    }

    pub(crate) async fn acquire(&mut self) -> Result<JobToken, Error> {
        match self {
            Self::Inherited(jobserver) => jobserver.acquire().await,
            Self::InProcess(jobserver) => Ok(jobserver.acquire().await),
        }
    }
}
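
// Illustrative usage sketch (comment only, not part of this module's API): a caller
// is expected to create one `ActiveJobTokenServer` per parallel build, `acquire` a
// `JobToken` before starting each compilation, and release the token simply by
// dropping it. `objects`, `spawn` and `compile_object` below are hypothetical names
// used for the sake of the example, not items defined in this crate:
//
//     let mut server = ActiveJobTokenServer::new();
//     for obj in objects {
//         let token = server.acquire().await?;
//         spawn(async move {
//             compile_object(&obj).await;
//             // Dropping the token puts it back (or restores the implicit token).
//             drop(token);
//         });
//     }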

mod inherited_jobserver {
    use super::JobToken;

    use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};

    use std::{
        io, mem,
        sync::{mpsc, Mutex, MutexGuard, PoisonError},
    };

    pub(super) struct JobServer {
        /// Implicit token for this process, which is obtained and will be
        /// released by the parent. Since `JobToken`s only give back what they
        /// got, there should be at most one global implicit token in the wild.
        ///
        /// Since Rust does not execute any `Drop` for global variables,
        /// we can't just put it back into the jobserver and then re-acquire it
        /// at the end of the process.
        ///
        /// Use a `Mutex` to avoid a race between acquire and release.
        /// If an `AtomicBool` were used, the following interleaving would be
        /// possible:
        /// - `release_token_raw` tries to set `global_implicit_token` to `true`,
        ///   but it is already `true`, so it goes on to release the token to the
        ///   jobserver
        /// - `acquire` takes the global implicit token and sets
        ///   `global_implicit_token` to `false`
        /// - `release_token_raw` now writes the token back into the jobserver,
        ///   while `global_implicit_token` is `false`
        ///
        /// If the program exits here, then cc effectively increases parallelism
        /// by one, which is incorrect; hence we use a `Mutex` here.
        global_implicit_token: Mutex<bool>,
        inner: jobserver::Client,
    }

    impl JobServer {
        pub(super) unsafe fn from_env() -> Option<Self> {
            jobserver::Client::from_env().map(|inner| Self {
                inner,
                global_implicit_token: Mutex::new(true),
            })
        }

        fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> {
            self.global_implicit_token
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
        }

        /// All tokens except for the global implicit token will be put back into
        /// the jobserver immediately and they cannot be cached, since Rust does
        /// not call `Drop::drop` on global variables.
        pub(super) fn release_token_raw(&self) {
            let mut global_implicit_token = self.get_global_implicit_token();

            if *global_implicit_token {
                // There's already a global implicit token, so this token must
                // be released back into the jobserver.
                //
                // `release_raw` should not block.
                let _ = self.inner.release_raw();
            } else {
                *global_implicit_token = true;
            }
        }
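
        // Illustrative trace of the implicit-token handoff above (assuming, for
        // the sake of the example, that the inherited jobserver holds one extra
        // token "A" besides the implicit one):
        //
        //     acquire()    // takes the global implicit token (fast path)
        //     acquire()    // implicit slot empty -> gets token "A" from the jobserver
        //     drop(token)  // implicit slot empty -> refill it, nothing is sent back
        //     drop(token)  // implicit slot already full -> release_raw() to the jobserver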

        pub(super) fn enter_active(&self) -> ActiveJobServer<'_> {
            ActiveJobServer {
                jobserver: self,
                helper_thread: None,
            }
        }
    }

    struct HelperThread {
        inner: jobserver::HelperThread,
        /// When `rx` is dropped, all the tokens stored within it will be dropped.
        rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
    }

    impl HelperThread {
        fn new(jobserver: &JobServer) -> Result<Self, Error> {
            let (tx, rx) = mpsc::channel();

            Ok(Self {
                rx,
                inner: jobserver.inner.clone().into_helper_thread(move |res| {
                    let _ = tx.send(res);
                })?,
            })
        }
    }

    pub(crate) struct ActiveJobServer<'a> {
        jobserver: &'a JobServer,
        helper_thread: Option<HelperThread>,
    }

    impl<'a> ActiveJobServer<'a> {
        pub(super) async fn acquire(&mut self) -> Result<JobToken, Error> {
            let mut has_requested_token = false;

            loop {
                // Fast path
                if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) {
                    break Ok(JobToken::new());
                }

                match self.jobserver.inner.try_acquire() {
                    Ok(Some(acquired)) => {
                        acquired.drop_without_releasing();
                        break Ok(JobToken::new());
                    }
                    Ok(None) => YieldOnce::default().await,
                    Err(err) if err.kind() == io::ErrorKind::Unsupported => {
                        // Fall back to creating a helper thread that does a
                        // blocking acquire.
                        let helper_thread = if let Some(thread) = self.helper_thread.as_ref() {
                            thread
                        } else {
                            self.helper_thread
                                .insert(HelperThread::new(self.jobserver)?)
                        };

                        match helper_thread.rx.try_recv() {
                            Ok(res) => {
                                let acquired = res?;
                                acquired.drop_without_releasing();
                                break Ok(JobToken::new());
                            }
                            Err(mpsc::TryRecvError::Disconnected) => break Err(Error::new(
                                ErrorKind::JobserverHelpThreadError,
                                "jobserver helper thread has returned before ActiveJobServer is dropped",
                            )),
                            Err(mpsc::TryRecvError::Empty) => {
                                if !has_requested_token {
                                    helper_thread.inner.request_token();
                                    has_requested_token = true;
                                }
                                YieldOnce::default().await
                            }
                        }
                    }
                    Err(err) => break Err(err.into()),
                }
            }
        }
    }
}

mod inprocess_jobserver {
    use super::JobToken;

    use crate::parallel::async_executor::YieldOnce;

    use std::{
        env::var,
        sync::atomic::{
            AtomicU32,
            Ordering::{AcqRel, Acquire},
        },
    };

    pub(crate) struct JobServer(AtomicU32);

    impl JobServer {
        pub(super) fn new() -> Self {
            // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise
            // just fall back to a semi-reasonable number.
            //
            // Note that we could use `num_cpus` here, but it's an extra
            // dependency that will almost never be used, so it's generally
            // not worth it.
            let mut parallelism = 4;
            // TODO: Use std::thread::available_parallelism as an upper bound
            // when MSRV is bumped.
            if let Ok(amt) = var("NUM_JOBS") {
                if let Ok(amt) = amt.parse() {
                    parallelism = amt;
                }
            }

            Self(AtomicU32::new(parallelism))
        }
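
        // For example (illustrative only): with `NUM_JOBS=8` in the environment,
        // the in-process jobserver starts with 8 tokens, while an unset or
        // unparsable `NUM_JOBS` falls back to the default of 4.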

        pub(super) async fn acquire(&self) -> JobToken {
            loop {
                let res = self
                    .0
                    .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));

                if res.is_ok() {
                    break JobToken::new();
                }

                YieldOnce::default().await
            }
        }

        pub(super) fn release_token_raw(&self) {
            self.0.fetch_add(1, AcqRel);
        }
    }
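
    // Illustrative trace (comment only): the counter behaves like a counting
    // semaphore. Assuming it starts at 2 tokens:
    //
    //     acquire().await        // 2 -> 1
    //     acquire().await        // 1 -> 0
    //     acquire().await        // counter is 0, yields until a token comes back
    //     release_token_raw()    // 0 -> 1, the pending acquire can now complete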
}