1 | use libc::c_int;
|
2 |
|
3 | use crate::FromEnvErrorInner;
|
4 | use std::fs::{File, OpenOptions};
|
5 | use std::io::{self, Read, Write};
|
6 | use std::mem;
|
7 | use std::mem::MaybeUninit;
|
8 | use std::os::unix::prelude::*;
|
9 | use std::path::{Path, PathBuf};
|
10 | use std::process::Command;
|
11 | use std::ptr;
|
12 | use std::sync::{Arc, Once};
|
13 | use std::thread::{self, Builder, JoinHandle};
|
14 | use std::time::Duration;
|
15 |
|
16 | #[derive (Debug)]
|
17 | pub enum Client {
|
18 | /// `--jobserver-auth=R,W`
|
19 | Pipe { read: File, write: File },
|
20 | /// `--jobserver-auth=fifo:PATH`
|
21 | Fifo { file: File, path: PathBuf },
|
22 | }
|
23 |
|
24 | #[derive (Debug)]
|
25 | pub struct Acquired {
|
26 | byte: u8,
|
27 | }
|
28 |
|
29 | impl Client {
|
30 | pub fn new(mut limit: usize) -> io::Result<Client> {
|
31 | let client = unsafe { Client::mk()? };
|
32 |
|
33 | // I don't think the character written here matters, but I could be
|
34 | // wrong!
|
35 | const BUFFER: [u8; 128] = [b'|' ; 128];
|
36 |
|
37 | let mut write = client.write();
|
38 |
|
39 | set_nonblocking(write.as_raw_fd(), true)?;
|
40 |
|
41 | while limit > 0 {
|
42 | let n = limit.min(BUFFER.len());
|
43 |
|
44 | write.write_all(&BUFFER[..n])?;
|
45 | limit -= n;
|
46 | }
|
47 |
|
48 | set_nonblocking(write.as_raw_fd(), false)?;
|
49 |
|
50 | Ok(client)
|
51 | }
|
52 |
|
53 | unsafe fn mk() -> io::Result<Client> {
|
54 | let mut pipes = [0; 2];
|
55 |
|
56 | // Attempt atomically-create-with-cloexec if we can on Linux,
|
57 | // detected by using the `syscall` function in `libc` to try to work
|
58 | // with as many kernels/glibc implementations as possible.
|
59 | #[cfg (target_os = "linux" )]
|
60 | {
|
61 | use std::sync::atomic::{AtomicBool, Ordering};
|
62 |
|
63 | static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
|
64 | if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
|
65 | match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
|
66 | -1 => {
|
67 | let err = io::Error::last_os_error();
|
68 | if err.raw_os_error() == Some(libc::ENOSYS) {
|
69 | PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
|
70 | } else {
|
71 | return Err(err);
|
72 | }
|
73 | }
|
74 | _ => return Ok(Client::from_fds(pipes[0], pipes[1])),
|
75 | }
|
76 | }
|
77 | }
|
78 |
|
79 | cvt(libc::pipe(pipes.as_mut_ptr()))?;
|
80 | drop(set_cloexec(pipes[0], true));
|
81 | drop(set_cloexec(pipes[1], true));
|
82 | Ok(Client::from_fds(pipes[0], pipes[1]))
|
83 | }
|
84 |
|
85 | pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
|
86 | if let Some(client) = Self::from_fifo(s)? {
|
87 | return Ok(client);
|
88 | }
|
89 | if let Some(client) = Self::from_pipe(s, check_pipe)? {
|
90 | return Ok(client);
|
91 | }
|
92 | Err(FromEnvErrorInner::CannotParse(format!(
|
93 | "expected `fifo:PATH` or `R,W`, found ` {s}`"
|
94 | )))
|
95 | }
|
96 |
|
97 | /// `--jobserver-auth=fifo:PATH`
|
98 | fn from_fifo(s: &str) -> Result<Option<Client>, FromEnvErrorInner> {
|
99 | let mut parts = s.splitn(2, ':' );
|
100 | if parts.next().unwrap() != "fifo" {
|
101 | return Ok(None);
|
102 | }
|
103 | let path_str = parts.next().ok_or_else(|| {
|
104 | FromEnvErrorInner::CannotParse("expected a path after `fifo:`" .to_string())
|
105 | })?;
|
106 | let path = Path::new(path_str);
|
107 | let file = OpenOptions::new()
|
108 | .read(true)
|
109 | .write(true)
|
110 | .open(path)
|
111 | .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))?;
|
112 | Ok(Some(Client::Fifo {
|
113 | file,
|
114 | path: path.into(),
|
115 | }))
|
116 | }
|
117 |
|
118 | /// `--jobserver-auth=R,W`
|
119 | unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result<Option<Client>, FromEnvErrorInner> {
|
120 | let mut parts = s.splitn(2, ',' );
|
121 | let read = parts.next().unwrap();
|
122 | let write = match parts.next() {
|
123 | Some(w) => w,
|
124 | None => return Ok(None),
|
125 | };
|
126 | let read = read
|
127 | .parse()
|
128 | .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}" )))?;
|
129 | let write = write
|
130 | .parse()
|
131 | .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}" )))?;
|
132 |
|
133 | // If either or both of these file descriptors are negative,
|
134 | // it means the jobserver is disabled for this process.
|
135 | if read < 0 {
|
136 | return Err(FromEnvErrorInner::NegativeFd(read));
|
137 | }
|
138 | if write < 0 {
|
139 | return Err(FromEnvErrorInner::NegativeFd(write));
|
140 | }
|
141 |
|
142 | // Ok so we've got two integers that look like file descriptors, but
|
143 | // for extra sanity checking let's see if they actually look like
|
144 | // valid files and instances of a pipe if feature enabled before we
|
145 | // return the client.
|
146 | //
|
147 | // If we're called from `make` *without* the leading + on our rule
|
148 | // then we'll have `MAKEFLAGS` env vars but won't actually have
|
149 | // access to the file descriptors.
|
150 | //
|
151 | // `NotAPipe` is a worse error, return it if it's reported for any of the two fds.
|
152 | match (fd_check(read, check_pipe), fd_check(write, check_pipe)) {
|
153 | (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?,
|
154 | (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?,
|
155 | (read_err, write_err) => {
|
156 | read_err?;
|
157 | write_err?;
|
158 | }
|
159 | }
|
160 |
|
161 | drop(set_cloexec(read, true));
|
162 | drop(set_cloexec(write, true));
|
163 | Ok(Some(Client::from_fds(read, write)))
|
164 | }
|
165 |
|
166 | unsafe fn from_fds(read: c_int, write: c_int) -> Client {
|
167 | Client::Pipe {
|
168 | read: File::from_raw_fd(read),
|
169 | write: File::from_raw_fd(write),
|
170 | }
|
171 | }
|
172 |
|
173 | /// Gets the read end of our jobserver client.
|
174 | fn read(&self) -> &File {
|
175 | match self {
|
176 | Client::Pipe { read, .. } => read,
|
177 | Client::Fifo { file, .. } => file,
|
178 | }
|
179 | }
|
180 |
|
181 | /// Gets the write end of our jobserver client.
|
182 | fn write(&self) -> &File {
|
183 | match self {
|
184 | Client::Pipe { write, .. } => write,
|
185 | Client::Fifo { file, .. } => file,
|
186 | }
|
187 | }
|
188 |
|
189 | pub fn acquire(&self) -> io::Result<Acquired> {
|
190 | // Ignore interrupts and keep trying if that happens
|
191 | loop {
|
192 | if let Some(token) = self.acquire_allow_interrupts()? {
|
193 | return Ok(token);
|
194 | }
|
195 | }
|
196 | }
|
197 |
|
198 | /// Block waiting for a token, returning `None` if we're interrupted with
|
199 | /// EINTR.
|
200 | fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
|
201 | // We don't actually know if the file descriptor here is set in
|
202 | // blocking or nonblocking mode. AFAIK all released versions of
|
203 | // `make` use blocking fds for the jobserver, but the unreleased
|
204 | // version of `make` doesn't. In the unreleased version jobserver
|
205 | // fds are set to nonblocking and combined with `pselect`
|
206 | // internally.
|
207 | //
|
208 | // Here we try to be compatible with both strategies. We optimistically
|
209 | // try to read from the file descriptor which then may block, return
|
210 | // a token or indicate that polling is needed.
|
211 | // Blocking reads (if possible) allows the kernel to be more selective
|
212 | // about which readers to wake up when a token is written to the pipe.
|
213 | //
|
214 | // We use `poll` here to block this thread waiting for read
|
215 | // readiness, and then afterwards we perform the `read` itself. If
|
216 | // the `read` returns that it would block then we start over and try
|
217 | // again.
|
218 | //
|
219 | // Also note that we explicitly don't handle EINTR here. That's used
|
220 | // to shut us down, so we otherwise punt all errors upwards.
|
221 | unsafe {
|
222 | let mut fd: libc::pollfd = mem::zeroed();
|
223 | let mut read = self.read();
|
224 | fd.fd = read.as_raw_fd();
|
225 | fd.events = libc::POLLIN;
|
226 | loop {
|
227 | let mut buf = [0];
|
228 | match read.read(&mut buf) {
|
229 | Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
|
230 | Ok(_) => {
|
231 | return Err(io::Error::new(
|
232 | io::ErrorKind::Other,
|
233 | "early EOF on jobserver pipe" ,
|
234 | ));
|
235 | }
|
236 | Err(e) => match e.kind() {
|
237 | io::ErrorKind::WouldBlock => { /* fall through to polling */ }
|
238 | io::ErrorKind::Interrupted => return Ok(None),
|
239 | _ => return Err(e),
|
240 | },
|
241 | }
|
242 |
|
243 | loop {
|
244 | fd.revents = 0;
|
245 | if libc::poll(&mut fd, 1, -1) == -1 {
|
246 | let e = io::Error::last_os_error();
|
247 | return match e.kind() {
|
248 | io::ErrorKind::Interrupted => Ok(None),
|
249 | _ => Err(e),
|
250 | };
|
251 | }
|
252 | if fd.revents != 0 {
|
253 | break;
|
254 | }
|
255 | }
|
256 | }
|
257 | }
|
258 | }
|
259 |
|
260 | pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
|
261 | // Note that the fd may be nonblocking but we're going to go ahead
|
262 | // and assume that the writes here are always nonblocking (we can
|
263 | // always quickly release a token). If that turns out to not be the
|
264 | // case we'll get an error anyway!
|
265 | let byte = data.map(|d| d.byte).unwrap_or(b'+' );
|
266 | match self.write().write(&[byte])? {
|
267 | 1 => Ok(()),
|
268 | _ => Err(io::Error::new(
|
269 | io::ErrorKind::Other,
|
270 | "failed to write token back to jobserver" ,
|
271 | )),
|
272 | }
|
273 | }
|
274 |
|
275 | pub fn string_arg(&self) -> String {
|
276 | match self {
|
277 | Client::Pipe { read, write } => format!(" {}, {}" , read.as_raw_fd(), write.as_raw_fd()),
|
278 | Client::Fifo { path, .. } => format!("fifo: {}" , path.to_str().unwrap()),
|
279 | }
|
280 | }
|
281 |
|
282 | pub fn available(&self) -> io::Result<usize> {
|
283 | let mut len = MaybeUninit::<c_int>::uninit();
|
284 | cvt(unsafe { libc::ioctl(self.read().as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
|
285 | Ok(unsafe { len.assume_init() } as usize)
|
286 | }
|
287 |
|
288 | pub fn configure(&self, cmd: &mut Command) {
|
289 | match self {
|
290 | // We `File::open`ed it when inheriting from environment,
|
291 | // so no need to set cloexec for fifo.
|
292 | Client::Fifo { .. } => return,
|
293 | Client::Pipe { .. } => {}
|
294 | };
|
295 | // Here we basically just want to say that in the child process
|
296 | // we'll configure the read/write file descriptors to *not* be
|
297 | // cloexec, so they're inherited across the exec and specified as
|
298 | // integers through `string_arg` above.
|
299 | let read = self.read().as_raw_fd();
|
300 | let write = self.write().as_raw_fd();
|
301 | unsafe {
|
302 | cmd.pre_exec(move || {
|
303 | set_cloexec(read, false)?;
|
304 | set_cloexec(write, false)?;
|
305 | Ok(())
|
306 | });
|
307 | }
|
308 | }
|
309 | }
|
310 |
|
311 | #[derive (Debug)]
|
312 | pub struct Helper {
|
313 | thread: JoinHandle<()>,
|
314 | state: Arc<super::HelperState>,
|
315 | }
|
316 |
|
317 | pub(crate) fn spawn_helper(
|
318 | client: crate::Client,
|
319 | state: Arc<super::HelperState>,
|
320 | mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
|
321 | ) -> io::Result<Helper> {
|
322 | static USR1_INIT: Once = Once::new();
|
323 | let mut err = None;
|
324 | USR1_INIT.call_once(|| unsafe {
|
325 | let mut new: libc::sigaction = mem::zeroed();
|
326 | #[cfg (target_os = "aix" )]
|
327 | {
|
328 | new.sa_union.__su_sigaction = sigusr1_handler;
|
329 | }
|
330 | #[cfg (not(target_os = "aix" ))]
|
331 | {
|
332 | new.sa_sigaction = sigusr1_handler as usize;
|
333 | }
|
334 | new.sa_flags = libc::SA_SIGINFO as _;
|
335 | if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
|
336 | err = Some(io::Error::last_os_error());
|
337 | }
|
338 | });
|
339 |
|
340 | if let Some(e) = err.take() {
|
341 | return Err(e);
|
342 | }
|
343 |
|
344 | let state2 = state.clone();
|
345 | let thread = Builder::new().spawn(move || {
|
346 | state2.for_each_request(|helper| loop {
|
347 | match client.inner.acquire_allow_interrupts() {
|
348 | Ok(Some(data)) => {
|
349 | break f(Ok(crate::Acquired {
|
350 | client: client.inner.clone(),
|
351 | data,
|
352 | disabled: false,
|
353 | }));
|
354 | }
|
355 | Err(e) => break f(Err(e)),
|
356 | Ok(None) if helper.producer_done() => break,
|
357 | Ok(None) => {}
|
358 | }
|
359 | });
|
360 | })?;
|
361 |
|
362 | Ok(Helper { thread, state })
|
363 | }
|
364 |
|
365 | impl Helper {
|
366 | pub fn join(self) {
|
367 | let dur = Duration::from_millis(10);
|
368 | let mut state = self.state.lock();
|
369 | debug_assert!(state.producer_done);
|
370 |
|
371 | // We need to join our helper thread, and it could be blocked in one
|
372 | // of two locations. First is the wait for a request, but the
|
373 | // initial drop of `HelperState` will take care of that. Otherwise
|
374 | // it may be blocked in `client.acquire()`. We actually have no way
|
375 | // of interrupting that, so resort to `pthread_kill` as a fallback.
|
376 | // This signal should interrupt any blocking `read` call with
|
377 | // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit.
|
378 | //
|
379 | // Note that we don't do this forever though since there's a chance
|
380 | // of bugs, so only do this opportunistically to make a best effort
|
381 | // at clearing ourselves up.
|
382 | for _ in 0..100 {
|
383 | if state.consumer_done {
|
384 | break;
|
385 | }
|
386 | unsafe {
|
387 | // Ignore the return value here of `pthread_kill`,
|
388 | // apparently on OSX if you kill a dead thread it will
|
389 | // return an error, but on other platforms it may not. In
|
390 | // that sense we don't actually know if this will succeed or
|
391 | // not!
|
392 | libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
|
393 | }
|
394 | state = self
|
395 | .state
|
396 | .cvar
|
397 | .wait_timeout(state, dur)
|
398 | .unwrap_or_else(|e| e.into_inner())
|
399 | .0;
|
400 | thread::yield_now(); // we really want the other thread to run
|
401 | }
|
402 |
|
403 | // If we managed to actually see the consumer get done, then we can
|
404 | // definitely wait for the thread. Otherwise it's... off in the ether
|
405 | // I guess?
|
406 | if state.consumer_done {
|
407 | drop(self.thread.join());
|
408 | }
|
409 | }
|
410 | }
|
411 |
|
412 | unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> {
|
413 | match libc::fcntl(fd, cmd:libc::F_GETFD) {
|
414 | -1 => Err(FromEnvErrorInner::CannotOpenFd(
|
415 | fd,
|
416 | io::Error::last_os_error(),
|
417 | )),
|
418 | _ => Ok(()),
|
419 | }
|
420 | }
|
421 |
|
422 | unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> {
|
423 | if check_pipe {
|
424 | let mut stat: stat = mem::zeroed();
|
425 | if libc::fstat(fildes:fd, &mut stat) == -1 {
|
426 | let last_os_error: Error = io::Error::last_os_error();
|
427 | fcntl_check(fd)?;
|
428 | Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error)))
|
429 | } else {
|
430 | // On android arm and i686 mode_t is u16 and st_mode is u32,
|
431 | // this generates a type mismatch when S_IFIFO (declared as mode_t)
|
432 | // is used in operations with st_mode, so we use this workaround
|
433 | // to get the value of S_IFIFO with the same type of st_mode.
|
434 | #[allow (unused_assignments)]
|
435 | let mut s_ififo: u32 = stat.st_mode;
|
436 | s_ififo = libc::S_IFIFO as _;
|
437 | if stat.st_mode & s_ififo == s_ififo {
|
438 | return Ok(());
|
439 | }
|
440 | Err(FromEnvErrorInner::NotAPipe(fd, None))
|
441 | }
|
442 | } else {
|
443 | fcntl_check(fd)
|
444 | }
|
445 | }
|
446 |
|
447 | fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
|
448 | unsafe {
|
449 | let previous: i32 = cvt(libc::fcntl(fd, cmd:libc::F_GETFD))?;
|
450 | let new: i32 = if set {
|
451 | previous | libc::FD_CLOEXEC
|
452 | } else {
|
453 | previous & !libc::FD_CLOEXEC
|
454 | };
|
455 | if new != previous {
|
456 | cvt(libc::fcntl(fd, cmd:libc::F_SETFD, new))?;
|
457 | }
|
458 | Ok(())
|
459 | }
|
460 | }
|
461 |
|
462 | fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
|
463 | let status_flag: i32 = if set { libc::O_NONBLOCK } else { 0 };
|
464 |
|
465 | unsafe {
|
466 | cvt(libc::fcntl(fd, cmd:libc::F_SETFL, status_flag))?;
|
467 | }
|
468 |
|
469 | Ok(())
|
470 | }
|
471 |
|
472 | fn cvt(t: c_int) -> io::Result<c_int> {
|
473 | if t == -1 {
|
474 | Err(io::Error::last_os_error())
|
475 | } else {
|
476 | Ok(t)
|
477 | }
|
478 | }
|
479 |
|
480 | extern "C" fn sigusr1_handler(
|
481 | _signum: c_int,
|
482 | _info: *mut libc::siginfo_t,
|
483 | _ptr: *mut libc::c_void,
|
484 | ) {
|
485 | // nothing to do
|
486 | }
|
487 | |