1use libc::c_int;
2
3use crate::FromEnvErrorInner;
4use std::fs::{File, OpenOptions};
5use std::io::{self, Read, Write};
6use std::mem;
7use std::mem::MaybeUninit;
8use std::os::unix::prelude::*;
9use std::path::{Path, PathBuf};
10use std::process::Command;
11use std::ptr;
12use std::sync::{Arc, Once};
13use std::thread::{self, Builder, JoinHandle};
14use std::time::Duration;
15
/// Handle to a jobserver, in one of the two forms that can be advertised
/// through `--jobserver-auth`.
#[derive(Debug)]
pub enum Client {
    /// `--jobserver-auth=R,W`: two separate descriptors, one per direction.
    Pipe {
        /// Descriptor tokens are read from.
        read: File,
        /// Descriptor tokens are written back to.
        write: File,
    },
    /// `--jobserver-auth=fifo:PATH`: a single file opened from `path` and
    /// used for both reading and writing.
    Fifo {
        /// The opened fifo.
        file: File,
        /// The path the fifo was opened from.
        path: PathBuf,
    },
}
23
/// A single token taken from the jobserver.
#[derive(Debug)]
pub struct Acquired {
    /// The exact byte that was read, kept so the same byte can be written
    /// back on release.
    byte: u8,
}
28
29impl Client {
30 pub fn new(mut limit: usize) -> io::Result<Client> {
31 let client = unsafe { Client::mk()? };
32
33 // I don't think the character written here matters, but I could be
34 // wrong!
35 const BUFFER: [u8; 128] = [b'|'; 128];
36
37 let mut write = client.write();
38
39 set_nonblocking(write.as_raw_fd(), true)?;
40
41 while limit > 0 {
42 let n = limit.min(BUFFER.len());
43
44 write.write_all(&BUFFER[..n])?;
45 limit -= n;
46 }
47
48 set_nonblocking(write.as_raw_fd(), false)?;
49
50 Ok(client)
51 }
52
53 unsafe fn mk() -> io::Result<Client> {
54 let mut pipes = [0; 2];
55
56 // Attempt atomically-create-with-cloexec if we can on Linux,
57 // detected by using the `syscall` function in `libc` to try to work
58 // with as many kernels/glibc implementations as possible.
59 #[cfg(target_os = "linux")]
60 {
61 use std::sync::atomic::{AtomicBool, Ordering};
62
63 static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
64 if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
65 match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
66 -1 => {
67 let err = io::Error::last_os_error();
68 if err.raw_os_error() == Some(libc::ENOSYS) {
69 PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
70 } else {
71 return Err(err);
72 }
73 }
74 _ => return Ok(Client::from_fds(pipes[0], pipes[1])),
75 }
76 }
77 }
78
79 cvt(libc::pipe(pipes.as_mut_ptr()))?;
80 drop(set_cloexec(pipes[0], true));
81 drop(set_cloexec(pipes[1], true));
82 Ok(Client::from_fds(pipes[0], pipes[1]))
83 }
84
85 pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
86 if let Some(client) = Self::from_fifo(s)? {
87 return Ok(client);
88 }
89 if let Some(client) = Self::from_pipe(s, check_pipe)? {
90 return Ok(client);
91 }
92 Err(FromEnvErrorInner::CannotParse(format!(
93 "expected `fifo:PATH` or `R,W`, found `{s}`"
94 )))
95 }
96
97 /// `--jobserver-auth=fifo:PATH`
98 fn from_fifo(s: &str) -> Result<Option<Client>, FromEnvErrorInner> {
99 let mut parts = s.splitn(2, ':');
100 if parts.next().unwrap() != "fifo" {
101 return Ok(None);
102 }
103 let path_str = parts.next().ok_or_else(|| {
104 FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string())
105 })?;
106 let path = Path::new(path_str);
107 let file = OpenOptions::new()
108 .read(true)
109 .write(true)
110 .open(path)
111 .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))?;
112 Ok(Some(Client::Fifo {
113 file,
114 path: path.into(),
115 }))
116 }
117
118 /// `--jobserver-auth=R,W`
119 unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result<Option<Client>, FromEnvErrorInner> {
120 let mut parts = s.splitn(2, ',');
121 let read = parts.next().unwrap();
122 let write = match parts.next() {
123 Some(w) => w,
124 None => return Ok(None),
125 };
126 let read = read
127 .parse()
128 .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}")))?;
129 let write = write
130 .parse()
131 .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}")))?;
132
133 // If either or both of these file descriptors are negative,
134 // it means the jobserver is disabled for this process.
135 if read < 0 {
136 return Err(FromEnvErrorInner::NegativeFd(read));
137 }
138 if write < 0 {
139 return Err(FromEnvErrorInner::NegativeFd(write));
140 }
141
142 // Ok so we've got two integers that look like file descriptors, but
143 // for extra sanity checking let's see if they actually look like
144 // valid files and instances of a pipe if feature enabled before we
145 // return the client.
146 //
147 // If we're called from `make` *without* the leading + on our rule
148 // then we'll have `MAKEFLAGS` env vars but won't actually have
149 // access to the file descriptors.
150 //
151 // `NotAPipe` is a worse error, return it if it's reported for any of the two fds.
152 match (fd_check(read, check_pipe), fd_check(write, check_pipe)) {
153 (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?,
154 (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?,
155 (read_err, write_err) => {
156 read_err?;
157 write_err?;
158 }
159 }
160
161 drop(set_cloexec(read, true));
162 drop(set_cloexec(write, true));
163 Ok(Some(Client::from_fds(read, write)))
164 }
165
166 unsafe fn from_fds(read: c_int, write: c_int) -> Client {
167 Client::Pipe {
168 read: File::from_raw_fd(read),
169 write: File::from_raw_fd(write),
170 }
171 }
172
173 /// Gets the read end of our jobserver client.
174 fn read(&self) -> &File {
175 match self {
176 Client::Pipe { read, .. } => read,
177 Client::Fifo { file, .. } => file,
178 }
179 }
180
181 /// Gets the write end of our jobserver client.
182 fn write(&self) -> &File {
183 match self {
184 Client::Pipe { write, .. } => write,
185 Client::Fifo { file, .. } => file,
186 }
187 }
188
189 pub fn acquire(&self) -> io::Result<Acquired> {
190 // Ignore interrupts and keep trying if that happens
191 loop {
192 if let Some(token) = self.acquire_allow_interrupts()? {
193 return Ok(token);
194 }
195 }
196 }
197
198 /// Block waiting for a token, returning `None` if we're interrupted with
199 /// EINTR.
200 fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
201 // We don't actually know if the file descriptor here is set in
202 // blocking or nonblocking mode. AFAIK all released versions of
203 // `make` use blocking fds for the jobserver, but the unreleased
204 // version of `make` doesn't. In the unreleased version jobserver
205 // fds are set to nonblocking and combined with `pselect`
206 // internally.
207 //
208 // Here we try to be compatible with both strategies. We optimistically
209 // try to read from the file descriptor which then may block, return
210 // a token or indicate that polling is needed.
211 // Blocking reads (if possible) allows the kernel to be more selective
212 // about which readers to wake up when a token is written to the pipe.
213 //
214 // We use `poll` here to block this thread waiting for read
215 // readiness, and then afterwards we perform the `read` itself. If
216 // the `read` returns that it would block then we start over and try
217 // again.
218 //
219 // Also note that we explicitly don't handle EINTR here. That's used
220 // to shut us down, so we otherwise punt all errors upwards.
221 unsafe {
222 let mut fd: libc::pollfd = mem::zeroed();
223 let mut read = self.read();
224 fd.fd = read.as_raw_fd();
225 fd.events = libc::POLLIN;
226 loop {
227 let mut buf = [0];
228 match read.read(&mut buf) {
229 Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
230 Ok(_) => {
231 return Err(io::Error::new(
232 io::ErrorKind::Other,
233 "early EOF on jobserver pipe",
234 ));
235 }
236 Err(e) => match e.kind() {
237 io::ErrorKind::WouldBlock => { /* fall through to polling */ }
238 io::ErrorKind::Interrupted => return Ok(None),
239 _ => return Err(e),
240 },
241 }
242
243 loop {
244 fd.revents = 0;
245 if libc::poll(&mut fd, 1, -1) == -1 {
246 let e = io::Error::last_os_error();
247 return match e.kind() {
248 io::ErrorKind::Interrupted => Ok(None),
249 _ => Err(e),
250 };
251 }
252 if fd.revents != 0 {
253 break;
254 }
255 }
256 }
257 }
258 }
259
260 pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
261 // Note that the fd may be nonblocking but we're going to go ahead
262 // and assume that the writes here are always nonblocking (we can
263 // always quickly release a token). If that turns out to not be the
264 // case we'll get an error anyway!
265 let byte = data.map(|d| d.byte).unwrap_or(b'+');
266 match self.write().write(&[byte])? {
267 1 => Ok(()),
268 _ => Err(io::Error::new(
269 io::ErrorKind::Other,
270 "failed to write token back to jobserver",
271 )),
272 }
273 }
274
275 pub fn string_arg(&self) -> String {
276 match self {
277 Client::Pipe { read, write } => format!("{},{}", read.as_raw_fd(), write.as_raw_fd()),
278 Client::Fifo { path, .. } => format!("fifo:{}", path.to_str().unwrap()),
279 }
280 }
281
282 pub fn available(&self) -> io::Result<usize> {
283 let mut len = MaybeUninit::<c_int>::uninit();
284 cvt(unsafe { libc::ioctl(self.read().as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
285 Ok(unsafe { len.assume_init() } as usize)
286 }
287
288 pub fn configure(&self, cmd: &mut Command) {
289 match self {
290 // We `File::open`ed it when inheriting from environment,
291 // so no need to set cloexec for fifo.
292 Client::Fifo { .. } => return,
293 Client::Pipe { .. } => {}
294 };
295 // Here we basically just want to say that in the child process
296 // we'll configure the read/write file descriptors to *not* be
297 // cloexec, so they're inherited across the exec and specified as
298 // integers through `string_arg` above.
299 let read = self.read().as_raw_fd();
300 let write = self.write().as_raw_fd();
301 unsafe {
302 cmd.pre_exec(move || {
303 set_cloexec(read, false)?;
304 set_cloexec(write, false)?;
305 Ok(())
306 });
307 }
308 }
309}
310
311#[derive(Debug)]
312pub struct Helper {
313 thread: JoinHandle<()>,
314 state: Arc<super::HelperState>,
315}
316
317pub(crate) fn spawn_helper(
318 client: crate::Client,
319 state: Arc<super::HelperState>,
320 mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
321) -> io::Result<Helper> {
322 static USR1_INIT: Once = Once::new();
323 let mut err = None;
324 USR1_INIT.call_once(|| unsafe {
325 let mut new: libc::sigaction = mem::zeroed();
326 #[cfg(target_os = "aix")]
327 {
328 new.sa_union.__su_sigaction = sigusr1_handler;
329 }
330 #[cfg(not(target_os = "aix"))]
331 {
332 new.sa_sigaction = sigusr1_handler as usize;
333 }
334 new.sa_flags = libc::SA_SIGINFO as _;
335 if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
336 err = Some(io::Error::last_os_error());
337 }
338 });
339
340 if let Some(e) = err.take() {
341 return Err(e);
342 }
343
344 let state2 = state.clone();
345 let thread = Builder::new().spawn(move || {
346 state2.for_each_request(|helper| loop {
347 match client.inner.acquire_allow_interrupts() {
348 Ok(Some(data)) => {
349 break f(Ok(crate::Acquired {
350 client: client.inner.clone(),
351 data,
352 disabled: false,
353 }));
354 }
355 Err(e) => break f(Err(e)),
356 Ok(None) if helper.producer_done() => break,
357 Ok(None) => {}
358 }
359 });
360 })?;
361
362 Ok(Helper { thread, state })
363}
364
365impl Helper {
366 pub fn join(self) {
367 let dur = Duration::from_millis(10);
368 let mut state = self.state.lock();
369 debug_assert!(state.producer_done);
370
371 // We need to join our helper thread, and it could be blocked in one
372 // of two locations. First is the wait for a request, but the
373 // initial drop of `HelperState` will take care of that. Otherwise
374 // it may be blocked in `client.acquire()`. We actually have no way
375 // of interrupting that, so resort to `pthread_kill` as a fallback.
376 // This signal should interrupt any blocking `read` call with
377 // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit.
378 //
379 // Note that we don't do this forever though since there's a chance
380 // of bugs, so only do this opportunistically to make a best effort
381 // at clearing ourselves up.
382 for _ in 0..100 {
383 if state.consumer_done {
384 break;
385 }
386 unsafe {
387 // Ignore the return value here of `pthread_kill`,
388 // apparently on OSX if you kill a dead thread it will
389 // return an error, but on other platforms it may not. In
390 // that sense we don't actually know if this will succeed or
391 // not!
392 libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
393 }
394 state = self
395 .state
396 .cvar
397 .wait_timeout(state, dur)
398 .unwrap_or_else(|e| e.into_inner())
399 .0;
400 thread::yield_now(); // we really want the other thread to run
401 }
402
403 // If we managed to actually see the consumer get done, then we can
404 // definitely wait for the thread. Otherwise it's... off in the ether
405 // I guess?
406 if state.consumer_done {
407 drop(self.thread.join());
408 }
409 }
410}
411
412unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> {
413 match libc::fcntl(fd, cmd:libc::F_GETFD) {
414 -1 => Err(FromEnvErrorInner::CannotOpenFd(
415 fd,
416 io::Error::last_os_error(),
417 )),
418 _ => Ok(()),
419 }
420}
421
422unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> {
423 if check_pipe {
424 let mut stat: stat = mem::zeroed();
425 if libc::fstat(fildes:fd, &mut stat) == -1 {
426 let last_os_error: Error = io::Error::last_os_error();
427 fcntl_check(fd)?;
428 Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error)))
429 } else {
430 // On android arm and i686 mode_t is u16 and st_mode is u32,
431 // this generates a type mismatch when S_IFIFO (declared as mode_t)
432 // is used in operations with st_mode, so we use this workaround
433 // to get the value of S_IFIFO with the same type of st_mode.
434 #[allow(unused_assignments)]
435 let mut s_ififo: u32 = stat.st_mode;
436 s_ififo = libc::S_IFIFO as _;
437 if stat.st_mode & s_ififo == s_ififo {
438 return Ok(());
439 }
440 Err(FromEnvErrorInner::NotAPipe(fd, None))
441 }
442 } else {
443 fcntl_check(fd)
444 }
445}
446
447fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
448 unsafe {
449 let previous: i32 = cvt(libc::fcntl(fd, cmd:libc::F_GETFD))?;
450 let new: i32 = if set {
451 previous | libc::FD_CLOEXEC
452 } else {
453 previous & !libc::FD_CLOEXEC
454 };
455 if new != previous {
456 cvt(libc::fcntl(fd, cmd:libc::F_SETFD, new))?;
457 }
458 Ok(())
459 }
460}
461
462fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
463 let status_flag: i32 = if set { libc::O_NONBLOCK } else { 0 };
464
465 unsafe {
466 cvt(libc::fcntl(fd, cmd:libc::F_SETFL, status_flag))?;
467 }
468
469 Ok(())
470}
471
/// Converts a C-style return code into an `io::Result`.
///
/// A value of `-1` becomes the current OS error (`errno`); every other
/// value is passed through unchanged.
fn cvt(t: c_int) -> io::Result<c_int> {
    match t {
        -1 => Err(io::Error::last_os_error()),
        code => Ok(code),
    }
}
479
480extern "C" fn sigusr1_handler(
481 _signum: c_int,
482 _info: *mut libc::siginfo_t,
483 _ptr: *mut libc::c_void,
484) {
485 // nothing to do
486}
487