use std::{marker::PhantomData, mem::MaybeUninit, sync::Once};

use crate::Error;

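/// An RAII guard representing one unit of parallelism.
///
/// Dropping a `JobToken` releases the token back to the jobserver it came
/// from (either the inherited jobserver or the in-process fallback).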
pub(crate) struct JobToken(PhantomData<()>);

impl JobToken {
    fn new() -> Self {
        Self(PhantomData)
    }
}

impl Drop for JobToken {
    fn drop(&mut self) {
        match JobTokenServer::new() {
            JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(),
            JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(),
        }
    }
}

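/// The token server backing this process: either a jobserver inherited from
/// the environment (e.g. the one passed down by cargo or make), or a purely
/// in-process counter used as a fallback.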
enum JobTokenServer {
    Inherited(inherited_jobserver::JobServer),
    InProcess(inprocess_jobserver::JobServer),
}

impl JobTokenServer {
    /// This function returns a static reference to the jobserver because
    /// - creating a jobserver from the environment is a bit fd-unsafe (e.g. the
    ///   fd might be closed by other jobserver users in the process), so it is
    ///   better to do it once at the start of the program.
    /// - if a jobserver cannot be created from the environment (e.g. it is not
    ///   present), we create a global in-process-only jobserver instead, which
    ///   has to be static so that it is shared by all cc compilations.
    fn new() -> &'static Self {
        static INIT: Once = Once::new();
        static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();

        unsafe {
            INIT.call_once(|| {
                let server = inherited_jobserver::JobServer::from_env()
                    .map(Self::Inherited)
                    .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()));
                JOBSERVER = MaybeUninit::new(server);
            });
            // TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55.
            &*JOBSERVER.as_ptr()
        }
    }
}

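/// An activated token server from which `JobToken`s can be acquired.
///
/// In the inherited case this also owns the helper thread used to request
/// tokens from the parent jobserver.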
pub(crate) enum ActiveJobTokenServer {
    Inherited(inherited_jobserver::ActiveJobServer<'static>),
    InProcess(&'static inprocess_jobserver::JobServer),
}

impl ActiveJobTokenServer {
    pub(crate) fn new() -> Result<Self, Error> {
        match JobTokenServer::new() {
            JobTokenServer::Inherited(inherited_jobserver) => {
                inherited_jobserver.enter_active().map(Self::Inherited)
            }
            JobTokenServer::InProcess(inprocess_jobserver) => {
                Ok(Self::InProcess(inprocess_jobserver))
            }
        }
    }

    pub(crate) async fn acquire(&self) -> Result<JobToken, Error> {
        match &self {
            Self::Inherited(jobserver) => jobserver.acquire().await,
            Self::InProcess(jobserver) => Ok(jobserver.acquire().await),
        }
    }
}
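// A minimal usage sketch (the caller below is hypothetical, not part of this
// module): each parallel job holds a `JobToken` for as long as it runs.
//
//     let server = ActiveJobTokenServer::new()?;
//     let token = server.acquire().await?;
//     run_one_compilation(); // hypothetical work guarded by the token
//     drop(token); // hands the token back to the jobserver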

mod inherited_jobserver {
    use super::JobToken;

    use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};

    use std::{
        io, mem,
        sync::{mpsc, Mutex, MutexGuard, PoisonError},
    };

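    /// Wrapper around the `jobserver::Client` inherited from the environment,
    /// together with the implicit token this process was started with.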
    pub(super) struct JobServer {
        /// The implicit token for this process, which was obtained by and will be
        /// released by the parent. Since `JobToken`s only give back what they got,
        /// there should be at most one global implicit token in the wild.
        ///
        /// Since Rust does not run `Drop` for global variables, we cannot simply
        /// put it back into the jobserver and then re-acquire it at the end of
        /// the process.
        ///
        /// A `Mutex` is used to avoid a race between acquire and release.
        /// If an `AtomicBool` were used instead, the following interleaving would
        /// be possible:
        /// - `release_token_raw` tries to set `global_implicit_token` to `true`,
        ///   sees that it is already `true`, and proceeds to release the token to
        ///   the jobserver
        /// - `acquire` takes the global implicit token and sets
        ///   `global_implicit_token` to `false`
        /// - `release_token_raw` now writes the token back into the jobserver,
        ///   while `global_implicit_token` is `false`
        ///
        /// If the program exits at this point, cc has effectively increased the
        /// parallelism by one, which is incorrect; hence the `Mutex`.
        global_implicit_token: Mutex<bool>,
        inner: jobserver::Client,
    }

    impl JobServer {
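        /// # Safety
        ///
        /// The jobserver described in the environment (if any) must still be
        /// valid, i.e. the file descriptors it names must be open and not used
        /// for anything else in this process; see the safety requirements of
        /// `jobserver::Client::from_env`.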
        pub(super) unsafe fn from_env() -> Option<Self> {
            jobserver::Client::from_env().map(|inner| Self {
                inner,
                global_implicit_token: Mutex::new(true),
            })
        }

        fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> {
            self.global_implicit_token
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
        }

        /// All tokens except for the global implicit token are put back into the
        /// jobserver immediately; they cannot be cached, since Rust does not call
        /// `Drop::drop` on global variables.
        pub(super) fn release_token_raw(&self) {
            let mut global_implicit_token = self.get_global_implicit_token();

            if *global_implicit_token {
                // There is already a global implicit token, so this token must
                // be released back into the jobserver.
                //
                // `release_raw` should not block.
                let _ = self.inner.release_raw();
            } else {
                *global_implicit_token = true;
            }
        }

        pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> {
            ActiveJobServer::new(self)
        }
    }

    pub(crate) struct ActiveJobServer<'a> {
        jobserver: &'a JobServer,
        helper_thread: jobserver::HelperThread,
        /// When `rx` is dropped, all the tokens stored within it will be dropped.
        rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
    }

    impl<'a> ActiveJobServer<'a> {
        fn new(jobserver: &'a JobServer) -> Result<Self, Error> {
            let (tx, rx) = mpsc::channel();

            Ok(Self {
                rx,
                helper_thread: jobserver.inner.clone().into_helper_thread(move |res| {
                    let _ = tx.send(res);
                })?,
                jobserver,
            })
        }

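        /// Acquires a token: first the global implicit token, then any token
        /// already delivered by the helper thread; otherwise a token is
        /// requested and the future yields until one arrives.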
        pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
            let mut has_requested_token = false;

            loop {
                // Fast path
                if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) {
                    break Ok(JobToken::new());
                }

                // Cold path: no global implicit token, so obtain one
                match self.rx.try_recv() {
                    Ok(res) => {
                        let acquired = res?;
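                        // Do not let the `Acquired` release itself back to the
                        // jobserver on drop: ownership passes to the returned
                        // `JobToken`, whose `Drop` releases it via
                        // `release_token_raw`.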
                        acquired.drop_without_releasing();
                        break Ok(JobToken::new());
                    }
                    Err(mpsc::TryRecvError::Disconnected) => {
                        break Err(Error::new(
                            ErrorKind::JobserverHelpThreadError,
                            "jobserver helper thread has returned before ActiveJobServer is dropped",
                        ))
                    }
                    Err(mpsc::TryRecvError::Empty) => {
                        if !has_requested_token {
                            self.helper_thread.request_token();
                            has_requested_token = true;
                        }
                        YieldOnce::default().await
                    }
                }
            }
        }
    }
}

mod inprocess_jobserver {
    use super::JobToken;

    use crate::parallel::async_executor::YieldOnce;

    use std::{
        env::var,
        sync::atomic::{
            AtomicU32,
            Ordering::{AcqRel, Acquire},
        },
    };

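    /// A process-local fallback jobserver: a plain counter of available
    /// tokens, decremented on acquire and incremented on release.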
    pub(crate) struct JobServer(AtomicU32);

    impl JobServer {
        pub(super) fn new() -> Self {
            // Use `NUM_JOBS` if set (it is configured by Cargo); otherwise fall
            // back to a semi-reasonable number.
            //
            // Note that we could use `num_cpus` here, but it is an extra
            // dependency that would almost never be used, so it is generally
            // not worth it.
            let mut parallelism = 4;
            // TODO: Use std::thread::available_parallelism as an upper bound
            // when MSRV is bumped.
            if let Ok(amt) = var("NUM_JOBS") {
                if let Ok(amt) = amt.parse() {
                    parallelism = amt;
                }
            }

            Self(AtomicU32::new(parallelism))
        }

        pub(super) async fn acquire(&self) -> JobToken {
            loop {
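                // Try to atomically decrement the token counter; `checked_sub`
                // returns `None` (and the update fails) when no tokens are
                // left, in which case we yield and retry.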
                let res = self
                    .0
                    .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));

                if res.is_ok() {
                    break JobToken::new();
                }

                YieldOnce::default().await
            }
        }

        pub(super) fn release_token_raw(&self) {
            self.0.fetch_add(1, AcqRel);
        }
    }
}