| 1 | use libc::c_int; |
| 2 | |
| 3 | use crate::FromEnvErrorInner; |
| 4 | use std::fs::{File, OpenOptions}; |
| 5 | use std::io::{self, Read, Write}; |
| 6 | use std::mem; |
| 7 | use std::mem::MaybeUninit; |
| 8 | use std::os::unix::prelude::*; |
| 9 | use std::path::Path; |
| 10 | use std::process::Command; |
| 11 | use std::ptr; |
| 12 | use std::sync::{ |
| 13 | atomic::{AtomicBool, Ordering}, |
| 14 | Arc, Once, |
| 15 | }; |
| 16 | use std::thread::{self, Builder, JoinHandle}; |
| 17 | use std::time::Duration; |
| 18 | |
| 19 | #[derive (Debug)] |
| 20 | /// This preserves the `--jobserver-auth` type at creation time, |
| 21 | /// so auth type will be passed down to and inherit from sub-Make processes correctly. |
| 22 | /// |
| 23 | /// See <https://github.com/rust-lang/jobserver-rs/issues/99> for details. |
| 24 | enum ClientCreationArg { |
| 25 | Fds { read: c_int, write: c_int }, |
| 26 | Fifo(Box<Path>), |
| 27 | } |
| 28 | |
| 29 | #[derive (Debug)] |
| 30 | pub struct Client { |
| 31 | read: File, |
| 32 | write: File, |
| 33 | creation_arg: ClientCreationArg, |
| 34 | /// It is set to `None` if the pipe is shared with other processes, so it |
| 35 | /// cannot support non-blocking mode. |
| 36 | /// |
| 37 | /// If it is set to `Some`, then it can only go from |
| 38 | /// `Some(false)` -> `Some(true)` but not the other way around, |
| 39 | /// since that could cause a race condition. |
| 40 | is_non_blocking: Option<AtomicBool>, |
| 41 | } |
| 42 | |
| 43 | #[derive (Debug)] |
| 44 | pub struct Acquired { |
| 45 | byte: u8, |
| 46 | } |
| 47 | |
| 48 | impl Client { |
| 49 | pub fn new(mut limit: usize) -> io::Result<Client> { |
| 50 | let client = unsafe { Client::mk()? }; |
| 51 | |
| 52 | // I don't think the character written here matters, but I could be |
| 53 | // wrong! |
| 54 | const BUFFER: [u8; 128] = [b'|' ; 128]; |
| 55 | |
| 56 | let mut write = &client.write; |
| 57 | |
| 58 | set_nonblocking(write.as_raw_fd(), true)?; |
| 59 | |
| 60 | while limit > 0 { |
| 61 | let n = limit.min(BUFFER.len()); |
| 62 | |
| 63 | write.write_all(&BUFFER[..n])?; |
| 64 | limit -= n; |
| 65 | } |
| 66 | |
| 67 | set_nonblocking(write.as_raw_fd(), false)?; |
| 68 | |
| 69 | Ok(client) |
| 70 | } |
| 71 | |
| 72 | unsafe fn mk() -> io::Result<Client> { |
| 73 | let mut pipes = [0; 2]; |
| 74 | |
| 75 | // Attempt atomically-create-with-cloexec if we can on Linux, |
| 76 | // detected by using the `syscall` function in `libc` to try to work |
| 77 | // with as many kernels/glibc implementations as possible. |
| 78 | #[cfg (target_os = "linux" )] |
| 79 | { |
| 80 | static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true); |
| 81 | if PIPE2_AVAILABLE.load(Ordering::SeqCst) { |
| 82 | match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) { |
| 83 | -1 => { |
| 84 | let err = io::Error::last_os_error(); |
| 85 | if err.raw_os_error() == Some(libc::ENOSYS) { |
| 86 | PIPE2_AVAILABLE.store(false, Ordering::SeqCst); |
| 87 | } else { |
| 88 | return Err(err); |
| 89 | } |
| 90 | } |
| 91 | _ => return Ok(Client::from_fds(pipes[0], pipes[1])), |
| 92 | } |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | cvt(libc::pipe(pipes.as_mut_ptr()))?; |
| 97 | drop(set_cloexec(pipes[0], true)); |
| 98 | drop(set_cloexec(pipes[1], true)); |
| 99 | Ok(Client::from_fds(pipes[0], pipes[1])) |
| 100 | } |
| 101 | |
| 102 | pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> { |
| 103 | if let Some(client) = Self::from_fifo(s)? { |
| 104 | return Ok(client); |
| 105 | } |
| 106 | if let Some(client) = Self::from_pipe(s, check_pipe)? { |
| 107 | return Ok(client); |
| 108 | } |
| 109 | Err(FromEnvErrorInner::CannotParse(format!( |
| 110 | "expected `fifo:PATH` or `R,W`, found ` {s}`" |
| 111 | ))) |
| 112 | } |
| 113 | |
| 114 | /// `--jobserver-auth=fifo:PATH` |
| 115 | fn from_fifo(s: &str) -> Result<Option<Client>, FromEnvErrorInner> { |
| 116 | let mut parts = s.splitn(2, ':' ); |
| 117 | if parts.next().unwrap() != "fifo" { |
| 118 | return Ok(None); |
| 119 | } |
| 120 | let path_str = parts.next().ok_or_else(|| { |
| 121 | FromEnvErrorInner::CannotParse("expected a path after `fifo:`" .to_string()) |
| 122 | })?; |
| 123 | let path = Path::new(path_str); |
| 124 | |
| 125 | let open_file = || { |
| 126 | // Opening with read write is necessary, since opening with |
| 127 | // read-only or write-only could block the thread until another |
| 128 | // thread opens it with write-only or read-only (or RDWR) |
| 129 | // correspondingly. |
| 130 | OpenOptions::new() |
| 131 | .read(true) |
| 132 | .write(true) |
| 133 | .open(path) |
| 134 | .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err)) |
| 135 | }; |
| 136 | |
| 137 | Ok(Some(Client { |
| 138 | read: open_file()?, |
| 139 | write: open_file()?, |
| 140 | creation_arg: ClientCreationArg::Fifo(path.into()), |
| 141 | is_non_blocking: Some(AtomicBool::new(false)), |
| 142 | })) |
| 143 | } |
| 144 | |
| 145 | /// `--jobserver-auth=R,W` |
| 146 | unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result<Option<Client>, FromEnvErrorInner> { |
| 147 | let mut parts = s.splitn(2, ',' ); |
| 148 | let read = parts.next().unwrap(); |
| 149 | let write = match parts.next() { |
| 150 | Some(w) => w, |
| 151 | None => return Ok(None), |
| 152 | }; |
| 153 | let read = read |
| 154 | .parse() |
| 155 | .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}" )))?; |
| 156 | let write = write |
| 157 | .parse() |
| 158 | .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}" )))?; |
| 159 | |
| 160 | // If either or both of these file descriptors are negative, |
| 161 | // it means the jobserver is disabled for this process. |
| 162 | if read < 0 { |
| 163 | return Err(FromEnvErrorInner::NegativeFd(read)); |
| 164 | } |
| 165 | if write < 0 { |
| 166 | return Err(FromEnvErrorInner::NegativeFd(write)); |
| 167 | } |
| 168 | |
| 169 | let creation_arg = ClientCreationArg::Fds { read, write }; |
| 170 | |
| 171 | // Ok so we've got two integers that look like file descriptors, but |
| 172 | // for extra sanity checking let's see if they actually look like |
| 173 | // valid files and instances of a pipe if feature enabled before we |
| 174 | // return the client. |
| 175 | // |
| 176 | // If we're called from `make` *without* the leading + on our rule |
| 177 | // then we'll have `MAKEFLAGS` env vars but won't actually have |
| 178 | // access to the file descriptors. |
| 179 | // |
| 180 | // `NotAPipe` is a worse error, return it if it's reported for any of the two fds. |
| 181 | match (fd_check(read, check_pipe), fd_check(write, check_pipe)) { |
| 182 | (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?, |
| 183 | (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?, |
| 184 | (read_err, write_err) => { |
| 185 | read_err?; |
| 186 | write_err?; |
| 187 | |
| 188 | // Optimization: Try converting it to a fifo by using /dev/fd |
| 189 | // |
| 190 | // On linux, opening `/dev/fd/$fd` returns a fd with a new file description, |
| 191 | // so we can set `O_NONBLOCK` on it without affecting other processes. |
| 192 | // |
| 193 | // On macOS, opening `/dev/fd/$fd` seems to be the same as `File::try_clone`. |
| 194 | // |
| 195 | // I tested this on macOS 14 and Linux 6.5.13 |
| 196 | #[cfg (target_os = "linux" )] |
| 197 | if let (Ok(read), Ok(write)) = ( |
| 198 | File::open(format!("/dev/fd/ {}" , read)), |
| 199 | OpenOptions::new() |
| 200 | .write(true) |
| 201 | .open(format!("/dev/fd/ {}" , write)), |
| 202 | ) { |
| 203 | return Ok(Some(Client { |
| 204 | read, |
| 205 | write, |
| 206 | creation_arg, |
| 207 | is_non_blocking: Some(AtomicBool::new(false)), |
| 208 | })); |
| 209 | } |
| 210 | } |
| 211 | } |
| 212 | |
| 213 | Ok(Some(Client { |
| 214 | read: clone_fd_and_set_cloexec(read)?, |
| 215 | write: clone_fd_and_set_cloexec(write)?, |
| 216 | creation_arg, |
| 217 | is_non_blocking: None, |
| 218 | })) |
| 219 | } |
| 220 | |
| 221 | unsafe fn from_fds(read: c_int, write: c_int) -> Client { |
| 222 | Client { |
| 223 | read: File::from_raw_fd(read), |
| 224 | write: File::from_raw_fd(write), |
| 225 | creation_arg: ClientCreationArg::Fds { read, write }, |
| 226 | is_non_blocking: None, |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | pub fn acquire(&self) -> io::Result<Acquired> { |
| 231 | // Ignore interrupts and keep trying if that happens |
| 232 | loop { |
| 233 | if let Some(token) = self.acquire_allow_interrupts()? { |
| 234 | return Ok(token); |
| 235 | } |
| 236 | } |
| 237 | } |
| 238 | |
| 239 | /// Block waiting for a token, returning `None` if we're interrupted with |
| 240 | /// EINTR. |
| 241 | fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> { |
| 242 | // We don't actually know if the file descriptor here is set in |
| 243 | // blocking or nonblocking mode. AFAIK all released versions of |
| 244 | // `make` use blocking fds for the jobserver, but the unreleased |
| 245 | // version of `make` doesn't. In the unreleased version jobserver |
| 246 | // fds are set to nonblocking and combined with `pselect` |
| 247 | // internally. |
| 248 | // |
| 249 | // Here we try to be compatible with both strategies. We optimistically |
| 250 | // try to read from the file descriptor which then may block, return |
| 251 | // a token or indicate that polling is needed. |
| 252 | // Blocking reads (if possible) allows the kernel to be more selective |
| 253 | // about which readers to wake up when a token is written to the pipe. |
| 254 | // |
| 255 | // We use `poll` here to block this thread waiting for read |
| 256 | // readiness, and then afterwards we perform the `read` itself. If |
| 257 | // the `read` returns that it would block then we start over and try |
| 258 | // again. |
| 259 | // |
| 260 | // Also note that we explicitly don't handle EINTR here. That's used |
| 261 | // to shut us down, so we otherwise punt all errors upwards. |
| 262 | unsafe { |
| 263 | let mut fd: libc::pollfd = mem::zeroed(); |
| 264 | let mut read = &self.read; |
| 265 | fd.fd = read.as_raw_fd(); |
| 266 | fd.events = libc::POLLIN; |
| 267 | loop { |
| 268 | let mut buf = [0]; |
| 269 | match read.read(&mut buf) { |
| 270 | Ok(1) => return Ok(Some(Acquired { byte: buf[0] })), |
| 271 | Ok(_) => { |
| 272 | return Err(io::Error::new( |
| 273 | io::ErrorKind::UnexpectedEof, |
| 274 | "early EOF on jobserver pipe" , |
| 275 | )); |
| 276 | } |
| 277 | Err(e) => match e.kind() { |
| 278 | io::ErrorKind::WouldBlock => { /* fall through to polling */ } |
| 279 | io::ErrorKind::Interrupted => return Ok(None), |
| 280 | _ => return Err(e), |
| 281 | }, |
| 282 | } |
| 283 | |
| 284 | loop { |
| 285 | fd.revents = 0; |
| 286 | if libc::poll(&mut fd, 1, -1) == -1 { |
| 287 | let e = io::Error::last_os_error(); |
| 288 | return match e.kind() { |
| 289 | io::ErrorKind::Interrupted => Ok(None), |
| 290 | _ => Err(e), |
| 291 | }; |
| 292 | } |
| 293 | if fd.revents != 0 { |
| 294 | break; |
| 295 | } |
| 296 | } |
| 297 | } |
| 298 | } |
| 299 | } |
| 300 | |
| 301 | pub fn try_acquire(&self) -> io::Result<Option<Acquired>> { |
| 302 | let mut buf = [0]; |
| 303 | let mut fifo = &self.read; |
| 304 | |
| 305 | if let Some(is_non_blocking) = self.is_non_blocking.as_ref() { |
| 306 | if !is_non_blocking.load(Ordering::Relaxed) { |
| 307 | set_nonblocking(fifo.as_raw_fd(), true)?; |
| 308 | is_non_blocking.store(true, Ordering::Relaxed); |
| 309 | } |
| 310 | } else { |
| 311 | return Err(io::ErrorKind::Unsupported.into()); |
| 312 | } |
| 313 | |
| 314 | loop { |
| 315 | match fifo.read(&mut buf) { |
| 316 | Ok(1) => break Ok(Some(Acquired { byte: buf[0] })), |
| 317 | Ok(_) => { |
| 318 | break Err(io::Error::new( |
| 319 | io::ErrorKind::UnexpectedEof, |
| 320 | "early EOF on jobserver pipe" , |
| 321 | )) |
| 322 | } |
| 323 | |
| 324 | Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(None), |
| 325 | Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, |
| 326 | |
| 327 | Err(err) => break Err(err), |
| 328 | } |
| 329 | } |
| 330 | } |
| 331 | |
| 332 | pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> { |
| 333 | // Note that the fd may be nonblocking but we're going to go ahead |
| 334 | // and assume that the writes here are always nonblocking (we can |
| 335 | // always quickly release a token). If that turns out to not be the |
| 336 | // case we'll get an error anyway! |
| 337 | let byte = data.map(|d| d.byte).unwrap_or(b'+' ); |
| 338 | match (&self.write).write(&[byte])? { |
| 339 | 1 => Ok(()), |
| 340 | _ => Err(io::Error::new( |
| 341 | io::ErrorKind::Other, |
| 342 | "failed to write token back to jobserver" , |
| 343 | )), |
| 344 | } |
| 345 | } |
| 346 | |
| 347 | pub fn string_arg(&self) -> String { |
| 348 | match &self.creation_arg { |
| 349 | ClientCreationArg::Fifo(path) => format!("fifo: {}" , path.display()), |
| 350 | ClientCreationArg::Fds { read, write } => format!(" {}, {}" , read, write), |
| 351 | } |
| 352 | } |
| 353 | |
| 354 | pub fn available(&self) -> io::Result<usize> { |
| 355 | let mut len = MaybeUninit::<c_int>::uninit(); |
| 356 | cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?; |
| 357 | Ok(unsafe { len.assume_init() } as usize) |
| 358 | } |
| 359 | |
| 360 | pub fn configure(&self, cmd: &mut Command) { |
| 361 | if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) { |
| 362 | // We `File::open`ed it when inheriting from environment, |
| 363 | // so no need to set cloexec for fifo. |
| 364 | return; |
| 365 | } |
| 366 | // Here we basically just want to say that in the child process |
| 367 | // we'll configure the read/write file descriptors to *not* be |
| 368 | // cloexec, so they're inherited across the exec and specified as |
| 369 | // integers through `string_arg` above. |
| 370 | let read = self.read.as_raw_fd(); |
| 371 | let write = self.write.as_raw_fd(); |
| 372 | unsafe { |
| 373 | cmd.pre_exec(move || { |
| 374 | set_cloexec(read, false)?; |
| 375 | set_cloexec(write, false)?; |
| 376 | Ok(()) |
| 377 | }); |
| 378 | } |
| 379 | } |
| 380 | } |
| 381 | |
| 382 | #[derive (Debug)] |
| 383 | pub struct Helper { |
| 384 | thread: JoinHandle<()>, |
| 385 | state: Arc<super::HelperState>, |
| 386 | } |
| 387 | |
| 388 | pub(crate) fn spawn_helper( |
| 389 | client: crate::Client, |
| 390 | state: Arc<super::HelperState>, |
| 391 | mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>, |
| 392 | ) -> io::Result<Helper> { |
| 393 | static USR1_INIT: Once = Once::new(); |
| 394 | let mut err = None; |
| 395 | USR1_INIT.call_once(|| unsafe { |
| 396 | let mut new: libc::sigaction = mem::zeroed(); |
| 397 | #[cfg (target_os = "aix" )] |
| 398 | { |
| 399 | new.sa_union.__su_sigaction = sigusr1_handler; |
| 400 | } |
| 401 | #[cfg (not(target_os = "aix" ))] |
| 402 | { |
| 403 | new.sa_sigaction = sigusr1_handler as usize; |
| 404 | } |
| 405 | new.sa_flags = libc::SA_SIGINFO as _; |
| 406 | if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 { |
| 407 | err = Some(io::Error::last_os_error()); |
| 408 | } |
| 409 | }); |
| 410 | |
| 411 | if let Some(e) = err.take() { |
| 412 | return Err(e); |
| 413 | } |
| 414 | |
| 415 | let state2 = state.clone(); |
| 416 | let thread = Builder::new().spawn(move || { |
| 417 | state2.for_each_request(|helper| loop { |
| 418 | match client.inner.acquire_allow_interrupts() { |
| 419 | Ok(Some(data)) => { |
| 420 | break f(Ok(crate::Acquired { |
| 421 | client: client.inner.clone(), |
| 422 | data, |
| 423 | disabled: false, |
| 424 | })); |
| 425 | } |
| 426 | Err(e) => break f(Err(e)), |
| 427 | Ok(None) if helper.lock().producer_done => break, |
| 428 | Ok(None) => {} |
| 429 | } |
| 430 | }); |
| 431 | })?; |
| 432 | |
| 433 | Ok(Helper { thread, state }) |
| 434 | } |
| 435 | |
| 436 | impl Helper { |
| 437 | pub fn join(self) { |
| 438 | let dur = Duration::from_millis(10); |
| 439 | let mut state = self.state.lock(); |
| 440 | debug_assert!(state.producer_done); |
| 441 | |
| 442 | // We need to join our helper thread, and it could be blocked in one |
| 443 | // of two locations. First is the wait for a request, but the |
| 444 | // initial drop of `HelperState` will take care of that. Otherwise |
| 445 | // it may be blocked in `client.acquire()`. We actually have no way |
| 446 | // of interrupting that, so resort to `pthread_kill` as a fallback. |
| 447 | // This signal should interrupt any blocking `read` call with |
| 448 | // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit. |
| 449 | // |
| 450 | // Note that we don't do this forever though since there's a chance |
| 451 | // of bugs, so only do this opportunistically to make a best effort |
| 452 | // at clearing ourselves up. |
| 453 | for _ in 0..100 { |
| 454 | if state.consumer_done { |
| 455 | break; |
| 456 | } |
| 457 | unsafe { |
| 458 | // Ignore the return value here of `pthread_kill`, |
| 459 | // apparently on OSX if you kill a dead thread it will |
| 460 | // return an error, but on other platforms it may not. In |
| 461 | // that sense we don't actually know if this will succeed or |
| 462 | // not! |
| 463 | libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1); |
| 464 | } |
| 465 | state = self |
| 466 | .state |
| 467 | .cvar |
| 468 | .wait_timeout(state, dur) |
| 469 | .unwrap_or_else(|e| e.into_inner()) |
| 470 | .0; |
| 471 | thread::yield_now(); // we really want the other thread to run |
| 472 | } |
| 473 | |
| 474 | // If we managed to actually see the consumer get done, then we can |
| 475 | // definitely wait for the thread. Otherwise it's... off in the ether |
| 476 | // I guess? |
| 477 | if state.consumer_done { |
| 478 | drop(self.thread.join()); |
| 479 | } |
| 480 | } |
| 481 | } |
| 482 | |
| 483 | unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> { |
| 484 | match libc::fcntl(fd, cmd:libc::F_GETFD) { |
| 485 | -1 => Err(FromEnvErrorInner::CannotOpenFd( |
| 486 | fd, |
| 487 | io::Error::last_os_error(), |
| 488 | )), |
| 489 | _ => Ok(()), |
| 490 | } |
| 491 | } |
| 492 | |
| 493 | unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> { |
| 494 | if check_pipe { |
| 495 | let mut stat: stat = mem::zeroed(); |
| 496 | if libc::fstat(fildes:fd, &mut stat) == -1 { |
| 497 | let last_os_error: Error = io::Error::last_os_error(); |
| 498 | fcntl_check(fd)?; |
| 499 | Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error))) |
| 500 | } else { |
| 501 | // On android arm and i686 mode_t is u16 and st_mode is u32, |
| 502 | // this generates a type mismatch when S_IFIFO (declared as mode_t) |
| 503 | // is used in operations with st_mode, so we use this workaround |
| 504 | // to get the value of S_IFIFO with the same type of st_mode. |
| 505 | #[allow (unused_assignments)] |
| 506 | let mut s_ififo: u32 = stat.st_mode; |
| 507 | s_ififo = libc::S_IFIFO as _; |
| 508 | if stat.st_mode & s_ififo == s_ififo { |
| 509 | return Ok(()); |
| 510 | } |
| 511 | Err(FromEnvErrorInner::NotAPipe(fd, None)) |
| 512 | } |
| 513 | } else { |
| 514 | fcntl_check(fd) |
| 515 | } |
| 516 | } |
| 517 | |
| 518 | fn clone_fd_and_set_cloexec(fd: c_int) -> Result<File, FromEnvErrorInner> { |
| 519 | // Safety: fd is a valid fd dand it remains open until returns |
| 520 | unsafe { BorrowedFd::borrow_raw(fd) } |
| 521 | .try_clone_to_owned() |
| 522 | .map(File::from) |
| 523 | .map_err(|err: Error| FromEnvErrorInner::CannotOpenFd(fd, err)) |
| 524 | } |
| 525 | |
| 526 | fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> { |
| 527 | unsafe { |
| 528 | let previous: i32 = cvt(libc::fcntl(fd, cmd:libc::F_GETFD))?; |
| 529 | let new: i32 = if set { |
| 530 | previous | libc::FD_CLOEXEC |
| 531 | } else { |
| 532 | previous & !libc::FD_CLOEXEC |
| 533 | }; |
| 534 | if new != previous { |
| 535 | cvt(libc::fcntl(fd, cmd:libc::F_SETFD, new))?; |
| 536 | } |
| 537 | Ok(()) |
| 538 | } |
| 539 | } |
| 540 | |
| 541 | fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> { |
| 542 | let status_flag: i32 = if set { libc::O_NONBLOCK } else { 0 }; |
| 543 | |
| 544 | unsafe { |
| 545 | cvt(libc::fcntl(fd, cmd:libc::F_SETFL, status_flag))?; |
| 546 | } |
| 547 | |
| 548 | Ok(()) |
| 549 | } |
| 550 | |
| 551 | fn cvt(t: c_int) -> io::Result<c_int> { |
| 552 | if t == -1 { |
| 553 | Err(io::Error::last_os_error()) |
| 554 | } else { |
| 555 | Ok(t) |
| 556 | } |
| 557 | } |
| 558 | |
| 559 | extern "C" fn sigusr1_handler( |
| 560 | _signum: c_int, |
| 561 | _info: *mut libc::siginfo_t, |
| 562 | _ptr: *mut libc::c_void, |
| 563 | ) { |
| 564 | // nothing to do |
| 565 | } |
| 566 | |
| 567 | #[cfg (test)] |
| 568 | mod test { |
| 569 | use super::Client as ClientImp; |
| 570 | |
| 571 | use crate::{test::run_named_fifo_try_acquire_tests, Client}; |
| 572 | |
| 573 | use std::{ |
| 574 | fs::File, |
| 575 | io::{self, Write}, |
| 576 | os::unix::io::AsRawFd, |
| 577 | sync::Arc, |
| 578 | }; |
| 579 | |
| 580 | fn from_imp_client(imp: ClientImp) -> Client { |
| 581 | Client { |
| 582 | inner: Arc::new(imp), |
| 583 | } |
| 584 | } |
| 585 | |
| 586 | fn new_client_from_fifo() -> (Client, String) { |
| 587 | let file = tempfile::NamedTempFile::new().unwrap(); |
| 588 | let fifo_path = file.path().to_owned(); |
| 589 | file.close().unwrap(); // Remove the NamedTempFile to create fifo |
| 590 | |
| 591 | nix::unistd::mkfifo(&fifo_path, nix::sys::stat::Mode::S_IRWXU).unwrap(); |
| 592 | |
| 593 | let arg = format!("fifo:{}" , fifo_path.to_str().unwrap()); |
| 594 | |
| 595 | ( |
| 596 | ClientImp::from_fifo(&arg) |
| 597 | .unwrap() |
| 598 | .map(from_imp_client) |
| 599 | .unwrap(), |
| 600 | arg, |
| 601 | ) |
| 602 | } |
| 603 | |
| 604 | fn new_client_from_pipe() -> (Client, String) { |
| 605 | let (read, write) = nix::unistd::pipe().unwrap(); |
| 606 | let read = File::from(read); |
| 607 | let mut write = File::from(write); |
| 608 | |
| 609 | write.write_all(b"1" ).unwrap(); |
| 610 | |
| 611 | let arg = format!("{},{}" , read.as_raw_fd(), write.as_raw_fd()); |
| 612 | |
| 613 | ( |
| 614 | unsafe { ClientImp::from_pipe(&arg, true) } |
| 615 | .unwrap() |
| 616 | .map(from_imp_client) |
| 617 | .unwrap(), |
| 618 | arg, |
| 619 | ) |
| 620 | } |
| 621 | |
| 622 | #[test ] |
| 623 | fn test_try_acquire_named_fifo() { |
| 624 | run_named_fifo_try_acquire_tests(&new_client_from_fifo().0); |
| 625 | } |
| 626 | |
| 627 | #[test ] |
| 628 | fn test_try_acquire_annoymous_pipe_linux_specific_optimization() { |
| 629 | #[cfg (not(target_os = "linux" ))] |
| 630 | assert_eq!( |
| 631 | new_client_from_pipe().0.try_acquire().unwrap_err().kind(), |
| 632 | io::ErrorKind::Unsupported |
| 633 | ); |
| 634 | |
| 635 | #[cfg (target_os = "linux" )] |
| 636 | { |
| 637 | let client = new_client_from_pipe().0; |
| 638 | client.acquire().unwrap().drop_without_releasing(); |
| 639 | run_named_fifo_try_acquire_tests(&client); |
| 640 | } |
| 641 | } |
| 642 | |
| 643 | #[test ] |
| 644 | fn test_string_arg() { |
| 645 | let (client, arg) = new_client_from_fifo(); |
| 646 | assert_eq!(client.inner.string_arg(), arg); |
| 647 | |
| 648 | let (client, arg) = new_client_from_pipe(); |
| 649 | assert_eq!(client.inner.string_arg(), arg); |
| 650 | } |
| 651 | } |
| 652 | |