1 | //! Unix pipe types. |
2 | |
3 | use crate::io::interest::Interest; |
4 | use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready}; |
5 | |
6 | use mio::unix::pipe as mio_pipe; |
7 | use std::fs::File; |
8 | use std::io::{self, Read, Write}; |
9 | use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; |
10 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; |
11 | use std::path::Path; |
12 | use std::pin::Pin; |
13 | use std::task::{Context, Poll}; |
14 | |
15 | cfg_io_util! { |
16 | use bytes::BufMut; |
17 | } |
18 | |
19 | /// Options and flags which can be used to configure how a FIFO file is opened. |
20 | /// |
21 | /// This builder allows configuring how to create a pipe end from a FIFO file. |
22 | /// Generally speaking, when using `OpenOptions`, you'll first call [`new`], |
23 | /// then chain calls to methods to set each option, then call either |
24 | /// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you |
/// are trying to open. This will give you an [`io::Result`] with a pipe end
26 | /// inside that you can further operate on. |
27 | /// |
28 | /// [`new`]: OpenOptions::new |
29 | /// [`open_receiver`]: OpenOptions::open_receiver |
30 | /// [`open_sender`]: OpenOptions::open_sender |
31 | /// |
32 | /// # Examples |
33 | /// |
34 | /// Opening a pair of pipe ends from a FIFO file: |
35 | /// |
36 | /// ```no_run |
37 | /// use tokio::net::unix::pipe; |
38 | /// # use std::error::Error; |
39 | /// |
40 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
41 | /// |
42 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
43 | /// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; |
44 | /// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?; |
45 | /// # Ok(()) |
46 | /// # } |
47 | /// ``` |
48 | /// |
49 | /// Opening a [`Sender`] on Linux when you are sure the file is a FIFO: |
50 | /// |
51 | /// ```ignore |
52 | /// use tokio::net::unix::pipe; |
53 | /// use nix::{unistd::mkfifo, sys::stat::Mode}; |
54 | /// # use std::error::Error; |
55 | /// |
56 | /// // Our program has exclusive access to this path. |
57 | /// const FIFO_NAME: &str = "path/to/a/new/fifo" ; |
58 | /// |
59 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
60 | /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; |
61 | /// let tx = pipe::OpenOptions::new() |
62 | /// .read_write(true) |
63 | /// .unchecked(true) |
64 | /// .open_sender(FIFO_NAME)?; |
65 | /// # Ok(()) |
66 | /// # } |
67 | /// ``` |
#[derive(Clone, Debug)]
pub struct OpenOptions {
    // Set through `read_write()`. When true, `open()` requests both read and
    // write access regardless of which pipe end is being opened. Linux only.
    #[cfg(target_os = "linux")]
    read_write: bool,
    // Set through `unchecked()`. When true, `open()` skips the `is_fifo`
    // check on the file it has just opened.
    unchecked: bool,
}
74 | |
75 | impl OpenOptions { |
76 | /// Creates a blank new set of options ready for configuration. |
77 | /// |
78 | /// All options are initially set to `false`. |
79 | pub fn new() -> OpenOptions { |
80 | OpenOptions { |
81 | #[cfg (target_os = "linux" )] |
82 | read_write: false, |
83 | unchecked: false, |
84 | } |
85 | } |
86 | |
87 | /// Sets the option for read-write access. |
88 | /// |
89 | /// This option, when true, will indicate that a FIFO file will be opened |
90 | /// in read-write access mode. This operation is not defined by the POSIX |
91 | /// standard and is only guaranteed to work on Linux. |
92 | /// |
93 | /// # Examples |
94 | /// |
95 | /// Opening a [`Sender`] even if there are no open reading ends: |
96 | /// |
97 | /// ```ignore |
98 | /// use tokio::net::unix::pipe; |
99 | /// |
100 | /// let tx = pipe::OpenOptions::new() |
101 | /// .read_write(true) |
102 | /// .open_sender("path/to/a/fifo" ); |
103 | /// ``` |
104 | /// |
105 | /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not |
106 | /// fail with [`UnexpectedEof`] during reading if all writing ends of the |
107 | /// pipe close the FIFO file. |
108 | /// |
109 | /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof |
110 | /// |
111 | /// ```ignore |
112 | /// use tokio::net::unix::pipe; |
113 | /// |
/// let rx = pipe::OpenOptions::new()
115 | /// .read_write(true) |
116 | /// .open_receiver("path/to/a/fifo" ); |
117 | /// ``` |
118 | #[cfg (target_os = "linux" )] |
119 | #[cfg_attr (docsrs, doc(cfg(target_os = "linux" )))] |
120 | pub fn read_write(&mut self, value: bool) -> &mut Self { |
121 | self.read_write = value; |
122 | self |
123 | } |
124 | |
125 | /// Sets the option to skip the check for FIFO file type. |
126 | /// |
127 | /// By default, [`open_receiver`] and [`open_sender`] functions will check |
128 | /// if the opened file is a FIFO file. Set this option to `true` if you are |
129 | /// sure the file is a FIFO file. |
130 | /// |
131 | /// [`open_receiver`]: OpenOptions::open_receiver |
132 | /// [`open_sender`]: OpenOptions::open_sender |
133 | /// |
134 | /// # Examples |
135 | /// |
136 | /// ```no_run |
137 | /// use tokio::net::unix::pipe; |
138 | /// use nix::{unistd::mkfifo, sys::stat::Mode}; |
139 | /// # use std::error::Error; |
140 | /// |
141 | /// // Our program has exclusive access to this path. |
142 | /// const FIFO_NAME: &str = "path/to/a/new/fifo" ; |
143 | /// |
144 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
145 | /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; |
146 | /// let rx = pipe::OpenOptions::new() |
147 | /// .unchecked(true) |
148 | /// .open_receiver(FIFO_NAME)?; |
149 | /// # Ok(()) |
150 | /// # } |
151 | /// ``` |
152 | pub fn unchecked(&mut self, value: bool) -> &mut Self { |
153 | self.unchecked = value; |
154 | self |
155 | } |
156 | |
157 | /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`. |
158 | /// |
159 | /// This function will open the FIFO file at the specified path, possibly |
160 | /// check if it is a pipe, and associate the pipe with the default event |
161 | /// loop for reading. |
162 | /// |
163 | /// # Errors |
164 | /// |
165 | /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. |
166 | /// This function may also fail with other standard OS errors. |
167 | /// |
168 | /// # Panics |
169 | /// |
170 | /// This function panics if it is not called from within a runtime with |
171 | /// IO enabled. |
172 | /// |
173 | /// The runtime is usually set implicitly when this function is called |
174 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
175 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
176 | pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> { |
177 | let file = self.open(path.as_ref(), PipeEnd::Receiver)?; |
178 | Receiver::from_file_unchecked(file) |
179 | } |
180 | |
181 | /// Creates a [`Sender`] from a FIFO file with the options specified by `self`. |
182 | /// |
183 | /// This function will open the FIFO file at the specified path, possibly |
184 | /// check if it is a pipe, and associate the pipe with the default event |
185 | /// loop for writing. |
186 | /// |
187 | /// # Errors |
188 | /// |
189 | /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. |
190 | /// If the file is not opened in read-write access mode and the file is not |
191 | /// currently open for reading, this function will fail with `ENXIO`. |
192 | /// This function may also fail with other standard OS errors. |
193 | /// |
194 | /// # Panics |
195 | /// |
196 | /// This function panics if it is not called from within a runtime with |
197 | /// IO enabled. |
198 | /// |
199 | /// The runtime is usually set implicitly when this function is called |
200 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
201 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
202 | pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> { |
203 | let file = self.open(path.as_ref(), PipeEnd::Sender)?; |
204 | Sender::from_file_unchecked(file) |
205 | } |
206 | |
207 | fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> { |
208 | let mut options = std::fs::OpenOptions::new(); |
209 | options |
210 | .read(pipe_end == PipeEnd::Receiver) |
211 | .write(pipe_end == PipeEnd::Sender) |
212 | .custom_flags(libc::O_NONBLOCK); |
213 | |
214 | #[cfg (target_os = "linux" )] |
215 | if self.read_write { |
216 | options.read(true).write(true); |
217 | } |
218 | |
219 | let file = options.open(path)?; |
220 | |
221 | if !self.unchecked && !is_fifo(&file)? { |
222 | return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe" )); |
223 | } |
224 | |
225 | Ok(file) |
226 | } |
227 | } |
228 | |
229 | impl Default for OpenOptions { |
230 | fn default() -> OpenOptions { |
231 | OpenOptions::new() |
232 | } |
233 | } |
234 | |
/// Selects which end of a pipe `OpenOptions::open` opens a FIFO file for.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
    /// The writing end; the file is opened with write access.
    Sender,
    /// The reading end; the file is opened with read access.
    Receiver,
}
240 | |
241 | /// Writing end of a Unix pipe. |
242 | /// |
243 | /// It can be constructed from a FIFO file with [`OpenOptions::open_sender`]. |
244 | /// |
245 | /// Opening a named pipe for writing involves a few steps. |
246 | /// Call to [`OpenOptions::open_sender`] might fail with an error indicating |
247 | /// different things: |
248 | /// |
249 | /// * [`io::ErrorKind::NotFound`] - There is no file at the specified path. |
250 | /// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO. |
251 | /// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading. |
252 | /// Sleep for a while and try again. |
253 | /// * Other OS errors not specific to opening FIFO files. |
254 | /// |
255 | /// Opening a `Sender` from a FIFO file should look like this: |
256 | /// |
257 | /// ```no_run |
258 | /// use tokio::net::unix::pipe; |
259 | /// use tokio::time::{self, Duration}; |
260 | /// |
261 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
262 | /// |
263 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
264 | /// // Wait for a reader to open the file. |
265 | /// let tx = loop { |
266 | /// match pipe::OpenOptions::new().open_sender(FIFO_NAME) { |
267 | /// Ok(tx) => break tx, |
268 | /// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}, |
269 | /// Err(e) => return Err(e.into()), |
270 | /// } |
271 | /// |
272 | /// time::sleep(Duration::from_millis(50)).await; |
273 | /// }; |
274 | /// # Ok(()) |
275 | /// # } |
276 | /// ``` |
277 | /// |
278 | /// On Linux, it is possible to create a `Sender` without waiting in a sleeping |
279 | /// loop. This is done by opening a named pipe in read-write access mode with |
280 | /// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold |
/// both a writing end and a reading end, and the latter allows opening a FIFO
/// without an [`ENXIO`] error since the pipe is open for reading as well.
283 | /// |
284 | /// `Sender` cannot be used to read from a pipe, so in practice the read access |
285 | /// is only used when a FIFO is opened. However, using a `Sender` in read-write |
286 | /// mode **may lead to lost data**, because written data will be dropped by the |
287 | /// system as soon as all pipe ends are closed. To avoid lost data you have to |
288 | /// make sure that a reading end has been opened before dropping a `Sender`. |
289 | /// |
290 | /// Note that using read-write access mode with FIFO files is not defined by |
291 | /// the POSIX standard and it is only guaranteed to work on Linux. |
292 | /// |
293 | /// ```ignore |
294 | /// use tokio::io::AsyncWriteExt; |
295 | /// use tokio::net::unix::pipe; |
296 | /// |
297 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
298 | /// |
299 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
300 | /// let mut tx = pipe::OpenOptions::new() |
301 | /// .read_write(true) |
302 | /// .open_sender(FIFO_NAME)?; |
303 | /// |
304 | /// // Asynchronously write to the pipe before a reader. |
305 | /// tx.write_all(b"hello world" ).await?; |
306 | /// # Ok(()) |
307 | /// # } |
308 | /// ``` |
309 | /// |
310 | /// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html |
#[derive(Debug)]
pub struct Sender {
    // The mio pipe writing end wrapped in `PollEvented`; `from_mio` creates it
    // with `Interest::WRITABLE`.
    io: PollEvented<mio_pipe::Sender>,
}
315 | |
316 | impl Sender { |
317 | fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> { |
318 | let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?; |
319 | Ok(Sender { io }) |
320 | } |
321 | |
322 | /// Creates a new `Sender` from a [`File`]. |
323 | /// |
324 | /// This function is intended to construct a pipe from a [`File`] representing |
325 | /// a special FIFO file. It will check if the file is a pipe and has write access, |
326 | /// set it in non-blocking mode and perform the conversion. |
327 | /// |
328 | /// # Errors |
329 | /// |
330 | /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it |
331 | /// does not have write access. Also fails with any standard OS error if it occurs. |
332 | /// |
333 | /// # Panics |
334 | /// |
335 | /// This function panics if it is not called from within a runtime with |
336 | /// IO enabled. |
337 | /// |
338 | /// The runtime is usually set implicitly when this function is called |
339 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
340 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
341 | pub fn from_file(mut file: File) -> io::Result<Sender> { |
342 | if !is_fifo(&file)? { |
343 | return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe" )); |
344 | } |
345 | |
346 | let flags = get_file_flags(&file)?; |
347 | if has_write_access(flags) { |
348 | set_nonblocking(&mut file, flags)?; |
349 | Sender::from_file_unchecked(file) |
350 | } else { |
351 | Err(io::Error::new( |
352 | io::ErrorKind::InvalidInput, |
353 | "not in O_WRONLY or O_RDWR access mode" , |
354 | )) |
355 | } |
356 | } |
357 | |
358 | /// Creates a new `Sender` from a [`File`] without checking pipe properties. |
359 | /// |
360 | /// This function is intended to construct a pipe from a File representing |
361 | /// a special FIFO file. The conversion assumes nothing about the underlying |
362 | /// file; it is left up to the user to make sure it is opened with write access, |
363 | /// represents a pipe and is set in non-blocking mode. |
364 | /// |
365 | /// # Examples |
366 | /// |
367 | /// ```no_run |
368 | /// use tokio::net::unix::pipe; |
369 | /// use std::fs::OpenOptions; |
370 | /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; |
371 | /// # use std::error::Error; |
372 | /// |
373 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
374 | /// |
375 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
376 | /// let file = OpenOptions::new() |
377 | /// .write(true) |
378 | /// .custom_flags(libc::O_NONBLOCK) |
379 | /// .open(FIFO_NAME)?; |
380 | /// if file.metadata()?.file_type().is_fifo() { |
381 | /// let tx = pipe::Sender::from_file_unchecked(file)?; |
382 | /// /* use the Sender */ |
383 | /// } |
384 | /// # Ok(()) |
385 | /// # } |
386 | /// ``` |
387 | /// |
388 | /// # Panics |
389 | /// |
390 | /// This function panics if it is not called from within a runtime with |
391 | /// IO enabled. |
392 | /// |
393 | /// The runtime is usually set implicitly when this function is called |
394 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
395 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
396 | pub fn from_file_unchecked(file: File) -> io::Result<Sender> { |
397 | let raw_fd = file.into_raw_fd(); |
398 | let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) }; |
399 | Sender::from_mio(mio_tx) |
400 | } |
401 | |
402 | /// Waits for any of the requested ready states. |
403 | /// |
404 | /// This function can be used instead of [`writable()`] to check the returned |
405 | /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events. |
406 | /// |
407 | /// The function may complete without the pipe being ready. This is a |
408 | /// false-positive and attempting an operation will return with |
409 | /// `io::ErrorKind::WouldBlock`. The function can also return with an empty |
410 | /// [`Ready`] set, so you should always check the returned value and possibly |
411 | /// wait again if the requested states are not set. |
412 | /// |
413 | /// [`writable()`]: Self::writable |
414 | /// |
415 | /// # Cancel safety |
416 | /// |
417 | /// This method is cancel safe. Once a readiness event occurs, the method |
418 | /// will continue to return immediately until the readiness event is |
419 | /// consumed by an attempt to write that fails with `WouldBlock` or |
420 | /// `Poll::Pending`. |
421 | pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { |
422 | let event = self.io.registration().readiness(interest).await?; |
423 | Ok(event.ready) |
424 | } |
425 | |
426 | /// Waits for the pipe to become writable. |
427 | /// |
428 | /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually |
429 | /// paired with [`try_write()`]. |
430 | /// |
431 | /// [`try_write()`]: Self::try_write |
432 | /// |
433 | /// # Examples |
434 | /// |
435 | /// ```no_run |
436 | /// use tokio::net::unix::pipe; |
437 | /// use std::io; |
438 | /// |
439 | /// #[tokio::main] |
440 | /// async fn main() -> io::Result<()> { |
441 | /// // Open a writing end of a fifo |
442 | /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo" )?; |
443 | /// |
444 | /// loop { |
445 | /// // Wait for the pipe to be writable |
446 | /// tx.writable().await?; |
447 | /// |
448 | /// // Try to write data, this may still fail with `WouldBlock` |
449 | /// // if the readiness event is a false positive. |
450 | /// match tx.try_write(b"hello world" ) { |
451 | /// Ok(n) => { |
452 | /// break; |
453 | /// } |
454 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
455 | /// continue; |
456 | /// } |
457 | /// Err(e) => { |
458 | /// return Err(e.into()); |
459 | /// } |
460 | /// } |
461 | /// } |
462 | /// |
463 | /// Ok(()) |
464 | /// } |
465 | /// ``` |
466 | pub async fn writable(&self) -> io::Result<()> { |
467 | self.ready(Interest::WRITABLE).await?; |
468 | Ok(()) |
469 | } |
470 | |
471 | /// Polls for write readiness. |
472 | /// |
473 | /// If the pipe is not currently ready for writing, this method will |
474 | /// store a clone of the `Waker` from the provided `Context`. When the pipe |
475 | /// becomes ready for writing, `Waker::wake` will be called on the waker. |
476 | /// |
477 | /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only |
478 | /// the `Waker` from the `Context` passed to the most recent call is |
479 | /// scheduled to receive a wakeup. |
480 | /// |
481 | /// This function is intended for cases where creating and pinning a future |
482 | /// via [`writable`] is not feasible. Where possible, using [`writable`] is |
483 | /// preferred, as this supports polling from multiple tasks at once. |
484 | /// |
485 | /// [`writable`]: Self::writable |
486 | /// |
487 | /// # Return value |
488 | /// |
489 | /// The function returns: |
490 | /// |
491 | /// * `Poll::Pending` if the pipe is not ready for writing. |
492 | /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing. |
493 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
494 | /// |
495 | /// # Errors |
496 | /// |
497 | /// This function may encounter any standard I/O error except `WouldBlock`. |
498 | pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
499 | self.io.registration().poll_write_ready(cx).map_ok(|_| ()) |
500 | } |
501 | |
502 | /// Tries to write a buffer to the pipe, returning how many bytes were |
503 | /// written. |
504 | /// |
505 | /// The function will attempt to write the entire contents of `buf`, but |
506 | /// only part of the buffer may be written. If the length of `buf` is not |
507 | /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the |
508 | /// write is guaranteed to be atomic, i.e. either the entire content of |
509 | /// `buf` will be written or this method will fail with `WouldBlock`. There |
510 | /// is no such guarantee if `buf` is larger than `PIPE_BUF`. |
511 | /// |
512 | /// This function is usually paired with [`writable`]. |
513 | /// |
514 | /// [`writable`]: Self::writable |
515 | /// |
516 | /// # Return |
517 | /// |
518 | /// If data is successfully written, `Ok(n)` is returned, where `n` is the |
519 | /// number of bytes written. If the pipe is not ready to write data, |
520 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
521 | /// |
522 | /// # Examples |
523 | /// |
524 | /// ```no_run |
525 | /// use tokio::net::unix::pipe; |
526 | /// use std::io; |
527 | /// |
528 | /// #[tokio::main] |
529 | /// async fn main() -> io::Result<()> { |
530 | /// // Open a writing end of a fifo |
531 | /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo" )?; |
532 | /// |
533 | /// loop { |
534 | /// // Wait for the pipe to be writable |
535 | /// tx.writable().await?; |
536 | /// |
537 | /// // Try to write data, this may still fail with `WouldBlock` |
538 | /// // if the readiness event is a false positive. |
539 | /// match tx.try_write(b"hello world" ) { |
540 | /// Ok(n) => { |
541 | /// break; |
542 | /// } |
543 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
544 | /// continue; |
545 | /// } |
546 | /// Err(e) => { |
547 | /// return Err(e.into()); |
548 | /// } |
549 | /// } |
550 | /// } |
551 | /// |
552 | /// Ok(()) |
553 | /// } |
554 | /// ``` |
555 | pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { |
556 | self.io |
557 | .registration() |
558 | .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) |
559 | } |
560 | |
561 | /// Tries to write several buffers to the pipe, returning how many bytes |
562 | /// were written. |
563 | /// |
/// Data is written from each buffer in order, with the final buffer read
/// from possibly being only partially consumed. This method behaves
566 | /// equivalently to a single call to [`try_write()`] with concatenated |
567 | /// buffers. |
568 | /// |
569 | /// If the total length of buffers is not greater than `PIPE_BUF` (an OS |
570 | /// constant, 4096 under Linux), then the write is guaranteed to be atomic, |
571 | /// i.e. either the entire contents of buffers will be written or this |
572 | /// method will fail with `WouldBlock`. There is no such guarantee if the |
573 | /// total length of buffers is greater than `PIPE_BUF`. |
574 | /// |
575 | /// This function is usually paired with [`writable`]. |
576 | /// |
577 | /// [`try_write()`]: Self::try_write() |
578 | /// [`writable`]: Self::writable |
579 | /// |
580 | /// # Return |
581 | /// |
582 | /// If data is successfully written, `Ok(n)` is returned, where `n` is the |
583 | /// number of bytes written. If the pipe is not ready to write data, |
584 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
585 | /// |
586 | /// # Examples |
587 | /// |
588 | /// ```no_run |
589 | /// use tokio::net::unix::pipe; |
590 | /// use std::io; |
591 | /// |
592 | /// #[tokio::main] |
593 | /// async fn main() -> io::Result<()> { |
594 | /// // Open a writing end of a fifo |
595 | /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo" )?; |
596 | /// |
597 | /// let bufs = [io::IoSlice::new(b"hello " ), io::IoSlice::new(b"world" )]; |
598 | /// |
599 | /// loop { |
600 | /// // Wait for the pipe to be writable |
601 | /// tx.writable().await?; |
602 | /// |
603 | /// // Try to write data, this may still fail with `WouldBlock` |
604 | /// // if the readiness event is a false positive. |
605 | /// match tx.try_write_vectored(&bufs) { |
606 | /// Ok(n) => { |
607 | /// break; |
608 | /// } |
609 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
610 | /// continue; |
611 | /// } |
612 | /// Err(e) => { |
613 | /// return Err(e.into()); |
614 | /// } |
615 | /// } |
616 | /// } |
617 | /// |
618 | /// Ok(()) |
619 | /// } |
620 | /// ``` |
621 | pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { |
622 | self.io |
623 | .registration() |
624 | .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) |
625 | } |
626 | } |
627 | |
628 | impl AsyncWrite for Sender { |
629 | fn poll_write( |
630 | self: Pin<&mut Self>, |
631 | cx: &mut Context<'_>, |
632 | buf: &[u8], |
633 | ) -> Poll<io::Result<usize>> { |
634 | self.io.poll_write(cx, buf) |
635 | } |
636 | |
637 | fn poll_write_vectored( |
638 | self: Pin<&mut Self>, |
639 | cx: &mut Context<'_>, |
640 | bufs: &[io::IoSlice<'_>], |
641 | ) -> Poll<io::Result<usize>> { |
642 | self.io.poll_write_vectored(cx, bufs) |
643 | } |
644 | |
645 | fn is_write_vectored(&self) -> bool { |
646 | true |
647 | } |
648 | |
649 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
650 | Poll::Ready(Ok(())) |
651 | } |
652 | |
653 | fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
654 | Poll::Ready(Ok(())) |
655 | } |
656 | } |
657 | |
658 | impl AsRawFd for Sender { |
659 | fn as_raw_fd(&self) -> RawFd { |
660 | self.io.as_raw_fd() |
661 | } |
662 | } |
663 | |
664 | impl AsFd for Sender { |
665 | fn as_fd(&self) -> BorrowedFd<'_> { |
666 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
667 | } |
668 | } |
669 | |
670 | /// Reading end of a Unix pipe. |
671 | /// |
672 | /// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`]. |
673 | /// |
674 | /// # Examples |
675 | /// |
676 | /// Receiving messages from a named pipe in a loop: |
677 | /// |
678 | /// ```no_run |
679 | /// use tokio::net::unix::pipe; |
680 | /// use tokio::io::{self, AsyncReadExt}; |
681 | /// |
682 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
683 | /// |
684 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
685 | /// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; |
686 | /// loop { |
687 | /// let mut msg = vec![0; 256]; |
688 | /// match rx.read_exact(&mut msg).await { |
689 | /// Ok(_) => { |
690 | /// /* handle the message */ |
691 | /// } |
692 | /// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { |
693 | /// // Writing end has been closed, we should reopen the pipe. |
694 | /// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; |
695 | /// } |
696 | /// Err(e) => return Err(e.into()), |
697 | /// } |
698 | /// } |
699 | /// # } |
700 | /// ``` |
701 | /// |
702 | /// On Linux, you can use a `Receiver` in read-write access mode to implement |
703 | /// resilient reading from a named pipe. Unlike `Receiver` opened in read-only |
704 | /// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof` |
705 | /// when the writing end is closed. This way, a `Receiver` can asynchronously |
706 | /// wait for the next writer to open the pipe. |
707 | /// |
708 | /// You should not use functions waiting for EOF such as [`read_to_end`] with |
709 | /// a `Receiver` in read-write access mode, since it **may wait forever**. |
710 | /// `Receiver` in this mode also holds an open writing end, which prevents |
711 | /// receiving EOF. |
712 | /// |
713 | /// To set the read-write access mode you can use `OpenOptions::read_write`. |
714 | /// Note that using read-write access mode with FIFO files is not defined by |
715 | /// the POSIX standard and it is only guaranteed to work on Linux. |
716 | /// |
717 | /// ```ignore |
718 | /// use tokio::net::unix::pipe; |
719 | /// use tokio::io::AsyncReadExt; |
720 | /// # use std::error::Error; |
721 | /// |
722 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
723 | /// |
724 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
725 | /// let mut rx = pipe::OpenOptions::new() |
726 | /// .read_write(true) |
727 | /// .open_receiver(FIFO_NAME)?; |
728 | /// loop { |
729 | /// let mut msg = vec![0; 256]; |
730 | /// rx.read_exact(&mut msg).await?; |
731 | /// /* handle the message */ |
732 | /// } |
733 | /// # } |
734 | /// ``` |
735 | /// |
736 | /// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end |
#[derive(Debug)]
pub struct Receiver {
    // The mio pipe reading end wrapped in `PollEvented`; `from_mio` creates it
    // with `Interest::READABLE`.
    io: PollEvented<mio_pipe::Receiver>,
}
741 | |
742 | impl Receiver { |
743 | fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> { |
744 | let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?; |
745 | Ok(Receiver { io }) |
746 | } |
747 | |
748 | /// Creates a new `Receiver` from a [`File`]. |
749 | /// |
750 | /// This function is intended to construct a pipe from a [`File`] representing |
751 | /// a special FIFO file. It will check if the file is a pipe and has read access, |
752 | /// set it in non-blocking mode and perform the conversion. |
753 | /// |
754 | /// # Errors |
755 | /// |
756 | /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it |
757 | /// does not have read access. Also fails with any standard OS error if it occurs. |
758 | /// |
759 | /// # Panics |
760 | /// |
761 | /// This function panics if it is not called from within a runtime with |
762 | /// IO enabled. |
763 | /// |
764 | /// The runtime is usually set implicitly when this function is called |
765 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
766 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
767 | pub fn from_file(mut file: File) -> io::Result<Receiver> { |
768 | if !is_fifo(&file)? { |
769 | return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe" )); |
770 | } |
771 | |
772 | let flags = get_file_flags(&file)?; |
773 | if has_read_access(flags) { |
774 | set_nonblocking(&mut file, flags)?; |
775 | Receiver::from_file_unchecked(file) |
776 | } else { |
777 | Err(io::Error::new( |
778 | io::ErrorKind::InvalidInput, |
779 | "not in O_RDONLY or O_RDWR access mode" , |
780 | )) |
781 | } |
782 | } |
783 | |
784 | /// Creates a new `Receiver` from a [`File`] without checking pipe properties. |
785 | /// |
786 | /// This function is intended to construct a pipe from a File representing |
787 | /// a special FIFO file. The conversion assumes nothing about the underlying |
788 | /// file; it is left up to the user to make sure it is opened with read access, |
789 | /// represents a pipe and is set in non-blocking mode. |
790 | /// |
791 | /// # Examples |
792 | /// |
793 | /// ```no_run |
794 | /// use tokio::net::unix::pipe; |
795 | /// use std::fs::OpenOptions; |
796 | /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; |
797 | /// # use std::error::Error; |
798 | /// |
799 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
800 | /// |
801 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
802 | /// let file = OpenOptions::new() |
803 | /// .read(true) |
804 | /// .custom_flags(libc::O_NONBLOCK) |
805 | /// .open(FIFO_NAME)?; |
806 | /// if file.metadata()?.file_type().is_fifo() { |
807 | /// let rx = pipe::Receiver::from_file_unchecked(file)?; |
808 | /// /* use the Receiver */ |
809 | /// } |
810 | /// # Ok(()) |
811 | /// # } |
812 | /// ``` |
813 | /// |
814 | /// # Panics |
815 | /// |
816 | /// This function panics if it is not called from within a runtime with |
817 | /// IO enabled. |
818 | /// |
819 | /// The runtime is usually set implicitly when this function is called |
820 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
821 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
822 | pub fn from_file_unchecked(file: File) -> io::Result<Receiver> { |
823 | let raw_fd = file.into_raw_fd(); |
824 | let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) }; |
825 | Receiver::from_mio(mio_rx) |
826 | } |
827 | |
828 | /// Waits for any of the requested ready states. |
829 | /// |
830 | /// This function can be used instead of [`readable()`] to check the returned |
831 | /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events. |
832 | /// |
833 | /// The function may complete without the pipe being ready. This is a |
834 | /// false-positive and attempting an operation will return with |
835 | /// `io::ErrorKind::WouldBlock`. The function can also return with an empty |
836 | /// [`Ready`] set, so you should always check the returned value and possibly |
837 | /// wait again if the requested states are not set. |
838 | /// |
839 | /// [`readable()`]: Self::readable |
840 | /// |
841 | /// # Cancel safety |
842 | /// |
843 | /// This method is cancel safe. Once a readiness event occurs, the method |
844 | /// will continue to return immediately until the readiness event is |
845 | /// consumed by an attempt to read that fails with `WouldBlock` or |
846 | /// `Poll::Pending`. |
847 | pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { |
848 | let event = self.io.registration().readiness(interest).await?; |
849 | Ok(event.ready) |
850 | } |
851 | |
852 | /// Waits for the pipe to become readable. |
853 | /// |
854 | /// This function is equivalent to `ready(Interest::READABLE)` and is usually |
855 | /// paired with [`try_read()`]. |
856 | /// |
857 | /// [`try_read()`]: Self::try_read() |
858 | /// |
859 | /// # Examples |
860 | /// |
861 | /// ```no_run |
862 | /// use tokio::net::unix::pipe; |
863 | /// use std::io; |
864 | /// |
865 | /// #[tokio::main] |
866 | /// async fn main() -> io::Result<()> { |
867 | /// // Open a reading end of a fifo |
868 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo" )?; |
869 | /// |
870 | /// let mut msg = vec![0; 1024]; |
871 | /// |
872 | /// loop { |
873 | /// // Wait for the pipe to be readable |
874 | /// rx.readable().await?; |
875 | /// |
876 | /// // Try to read data, this may still fail with `WouldBlock` |
877 | /// // if the readiness event is a false positive. |
878 | /// match rx.try_read(&mut msg) { |
879 | /// Ok(n) => { |
880 | /// msg.truncate(n); |
881 | /// break; |
882 | /// } |
883 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
884 | /// continue; |
885 | /// } |
886 | /// Err(e) => { |
887 | /// return Err(e.into()); |
888 | /// } |
889 | /// } |
890 | /// } |
891 | /// |
892 | /// println!("GOT = {:?}" , msg); |
893 | /// Ok(()) |
894 | /// } |
895 | /// ``` |
896 | pub async fn readable(&self) -> io::Result<()> { |
897 | self.ready(Interest::READABLE).await?; |
898 | Ok(()) |
899 | } |
900 | |
901 | /// Polls for read readiness. |
902 | /// |
903 | /// If the pipe is not currently ready for reading, this method will |
904 | /// store a clone of the `Waker` from the provided `Context`. When the pipe |
905 | /// becomes ready for reading, `Waker::wake` will be called on the waker. |
906 | /// |
907 | /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only |
908 | /// the `Waker` from the `Context` passed to the most recent call is |
909 | /// scheduled to receive a wakeup. |
910 | /// |
911 | /// This function is intended for cases where creating and pinning a future |
912 | /// via [`readable`] is not feasible. Where possible, using [`readable`] is |
913 | /// preferred, as this supports polling from multiple tasks at once. |
914 | /// |
915 | /// [`readable`]: Self::readable |
916 | /// |
917 | /// # Return value |
918 | /// |
919 | /// The function returns: |
920 | /// |
921 | /// * `Poll::Pending` if the pipe is not ready for reading. |
922 | /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading. |
923 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
924 | /// |
925 | /// # Errors |
926 | /// |
927 | /// This function may encounter any standard I/O error except `WouldBlock`. |
928 | pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
929 | self.io.registration().poll_read_ready(cx).map_ok(|_| ()) |
930 | } |
931 | |
932 | /// Tries to read data from the pipe into the provided buffer, returning how |
933 | /// many bytes were read. |
934 | /// |
935 | /// Reads any pending data from the pipe but does not wait for new data |
936 | /// to arrive. On success, returns the number of bytes read. Because |
937 | /// `try_read()` is non-blocking, the buffer does not have to be stored by |
938 | /// the async task and can exist entirely on the stack. |
939 | /// |
940 | /// Usually [`readable()`] is used with this function. |
941 | /// |
942 | /// [`readable()`]: Self::readable() |
943 | /// |
944 | /// # Return |
945 | /// |
946 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
947 | /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: |
948 | /// |
949 | /// 1. The pipe's writing end is closed and will no longer write data. |
950 | /// 2. The specified buffer was 0 bytes in length. |
951 | /// |
952 | /// If the pipe is not ready to read data, |
953 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
954 | /// |
955 | /// # Examples |
956 | /// |
957 | /// ```no_run |
958 | /// use tokio::net::unix::pipe; |
959 | /// use std::io; |
960 | /// |
961 | /// #[tokio::main] |
962 | /// async fn main() -> io::Result<()> { |
963 | /// // Open a reading end of a fifo |
964 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo" )?; |
965 | /// |
966 | /// let mut msg = vec![0; 1024]; |
967 | /// |
968 | /// loop { |
969 | /// // Wait for the pipe to be readable |
970 | /// rx.readable().await?; |
971 | /// |
972 | /// // Try to read data, this may still fail with `WouldBlock` |
973 | /// // if the readiness event is a false positive. |
974 | /// match rx.try_read(&mut msg) { |
975 | /// Ok(n) => { |
976 | /// msg.truncate(n); |
977 | /// break; |
978 | /// } |
979 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
980 | /// continue; |
981 | /// } |
982 | /// Err(e) => { |
983 | /// return Err(e.into()); |
984 | /// } |
985 | /// } |
986 | /// } |
987 | /// |
988 | /// println!("GOT = {:?}" , msg); |
989 | /// Ok(()) |
990 | /// } |
991 | /// ``` |
992 | pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { |
993 | self.io |
994 | .registration() |
995 | .try_io(Interest::READABLE, || (&*self.io).read(buf)) |
996 | } |
997 | |
998 | /// Tries to read data from the pipe into the provided buffers, returning |
999 | /// how many bytes were read. |
1000 | /// |
1001 | /// Data is copied to fill each buffer in order, with the final buffer |
1002 | /// written to possibly being only partially filled. This method behaves |
1003 | /// equivalently to a single call to [`try_read()`] with concatenated |
1004 | /// buffers. |
1005 | /// |
1006 | /// Reads any pending data from the pipe but does not wait for new data |
1007 | /// to arrive. On success, returns the number of bytes read. Because |
1008 | /// `try_read_vectored()` is non-blocking, the buffer does not have to be |
1009 | /// stored by the async task and can exist entirely on the stack. |
1010 | /// |
1011 | /// Usually, [`readable()`] is used with this function. |
1012 | /// |
1013 | /// [`try_read()`]: Self::try_read() |
1014 | /// [`readable()`]: Self::readable() |
1015 | /// |
1016 | /// # Return |
1017 | /// |
1018 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
1019 | /// number of bytes read. `Ok(0)` indicates the pipe's writing end is |
1020 | /// closed and will no longer write data. If the pipe is not ready to read |
1021 | /// data `Err(io::ErrorKind::WouldBlock)` is returned. |
1022 | /// |
1023 | /// # Examples |
1024 | /// |
1025 | /// ```no_run |
1026 | /// use tokio::net::unix::pipe; |
1027 | /// use std::io; |
1028 | /// |
1029 | /// #[tokio::main] |
1030 | /// async fn main() -> io::Result<()> { |
1031 | /// // Open a reading end of a fifo |
1032 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo" )?; |
1033 | /// |
1034 | /// loop { |
1035 | /// // Wait for the pipe to be readable |
1036 | /// rx.readable().await?; |
1037 | /// |
1038 | /// // Creating the buffer **after** the `await` prevents it from |
1039 | /// // being stored in the async task. |
1040 | /// let mut buf_a = [0; 512]; |
1041 | /// let mut buf_b = [0; 1024]; |
1042 | /// let mut bufs = [ |
1043 | /// io::IoSliceMut::new(&mut buf_a), |
1044 | /// io::IoSliceMut::new(&mut buf_b), |
1045 | /// ]; |
1046 | /// |
1047 | /// // Try to read data, this may still fail with `WouldBlock` |
1048 | /// // if the readiness event is a false positive. |
1049 | /// match rx.try_read_vectored(&mut bufs) { |
1050 | /// Ok(0) => break, |
1051 | /// Ok(n) => { |
1052 | /// println!("read {} bytes" , n); |
1053 | /// } |
1054 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
1055 | /// continue; |
1056 | /// } |
1057 | /// Err(e) => { |
1058 | /// return Err(e.into()); |
1059 | /// } |
1060 | /// } |
1061 | /// } |
1062 | /// |
1063 | /// Ok(()) |
1064 | /// } |
1065 | /// ``` |
1066 | pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { |
1067 | self.io |
1068 | .registration() |
1069 | .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) |
1070 | } |
1071 | |
1072 | cfg_io_util! { |
1073 | /// Tries to read data from the pipe into the provided buffer, advancing the |
1074 | /// buffer's internal cursor, returning how many bytes were read. |
1075 | /// |
1076 | /// Reads any pending data from the pipe but does not wait for new data |
1077 | /// to arrive. On success, returns the number of bytes read. Because |
1078 | /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by |
1079 | /// the async task and can exist entirely on the stack. |
1080 | /// |
1081 | /// Usually, [`readable()`] or [`ready()`] is used with this function. |
1082 | /// |
1083 | /// [`readable()`]: Self::readable |
1084 | /// [`ready()`]: Self::ready |
1085 | /// |
1086 | /// # Return |
1087 | /// |
1088 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
1089 | /// number of bytes read. `Ok(0)` indicates the pipe's writing end is |
1090 | /// closed and will no longer write data. If the pipe is not ready to read |
1091 | /// data `Err(io::ErrorKind::WouldBlock)` is returned. |
1092 | /// |
1093 | /// # Examples |
1094 | /// |
1095 | /// ```no_run |
1096 | /// use tokio::net::unix::pipe; |
1097 | /// use std::io; |
1098 | /// |
1099 | /// #[tokio::main] |
1100 | /// async fn main() -> io::Result<()> { |
1101 | /// // Open a reading end of a fifo |
1102 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; |
1103 | /// |
1104 | /// loop { |
1105 | /// // Wait for the pipe to be readable |
1106 | /// rx.readable().await?; |
1107 | /// |
1108 | /// let mut buf = Vec::with_capacity(4096); |
1109 | /// |
1110 | /// // Try to read data, this may still fail with `WouldBlock` |
1111 | /// // if the readiness event is a false positive. |
1112 | /// match rx.try_read_buf(&mut buf) { |
1113 | /// Ok(0) => break, |
1114 | /// Ok(n) => { |
1115 | /// println!("read {} bytes", n); |
1116 | /// } |
1117 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
1118 | /// continue; |
1119 | /// } |
1120 | /// Err(e) => { |
1121 | /// return Err(e.into()); |
1122 | /// } |
1123 | /// } |
1124 | /// } |
1125 | /// |
1126 | /// Ok(()) |
1127 | /// } |
1128 | /// ``` |
1129 | pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { |
1130 | self.io.registration().try_io(Interest::READABLE, || { |
1131 | use std::io::Read; |
1132 | |
1133 | let dst = buf.chunk_mut(); |
1134 | let dst = |
1135 | unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; |
1136 | |
1137 | // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, |
1138 | // which correctly handles reads into uninitialized memory. |
1139 | let n = (&*self.io).read(dst)?; |
1140 | |
1141 | unsafe { |
1142 | buf.advance_mut(n); |
1143 | } |
1144 | |
1145 | Ok(n) |
1146 | }) |
1147 | } |
1148 | } |
1149 | } |
1150 | |
1151 | impl AsyncRead for Receiver { |
1152 | fn poll_read( |
1153 | self: Pin<&mut Self>, |
1154 | cx: &mut Context<'_>, |
1155 | buf: &mut ReadBuf<'_>, |
1156 | ) -> Poll<io::Result<()>> { |
1157 | // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, |
1158 | // which correctly handles reads into uninitialized memory. |
1159 | unsafe { self.io.poll_read(cx, buf) } |
1160 | } |
1161 | } |
1162 | |
1163 | impl AsRawFd for Receiver { |
1164 | fn as_raw_fd(&self) -> RawFd { |
1165 | self.io.as_raw_fd() |
1166 | } |
1167 | } |
1168 | |
1169 | impl AsFd for Receiver { |
1170 | fn as_fd(&self) -> BorrowedFd<'_> { |
1171 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
1172 | } |
1173 | } |
1174 | |
/// Reports whether `file` is a FIFO, based on its metadata.
///
/// Returns the error from the metadata query if it fails.
fn is_fifo(file: &File) -> io::Result<bool> {
    file.metadata().map(|meta| meta.file_type().is_fifo())
}
1179 | |
1180 | /// Gets file descriptor's flags by fcntl. |
1181 | fn get_file_flags(file: &File) -> io::Result<libc::c_int> { |
1182 | let fd = file.as_raw_fd(); |
1183 | let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; |
1184 | if flags < 0 { |
1185 | Err(io::Error::last_os_error()) |
1186 | } else { |
1187 | Ok(flags) |
1188 | } |
1189 | } |
1190 | |
1191 | /// Checks for `O_RDONLY` or `O_RDWR` access mode. |
1192 | fn has_read_access(flags: libc::c_int) -> bool { |
1193 | let mode = flags & libc::O_ACCMODE; |
1194 | mode == libc::O_RDONLY || mode == libc::O_RDWR |
1195 | } |
1196 | |
1197 | /// Checks for `O_WRONLY` or `O_RDWR` access mode. |
1198 | fn has_write_access(flags: libc::c_int) -> bool { |
1199 | let mode = flags & libc::O_ACCMODE; |
1200 | mode == libc::O_WRONLY || mode == libc::O_RDWR |
1201 | } |
1202 | |
1203 | /// Sets file's flags with `O_NONBLOCK` by fcntl. |
1204 | fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> { |
1205 | let fd = file.as_raw_fd(); |
1206 | |
1207 | let flags = current_flags | libc::O_NONBLOCK; |
1208 | |
1209 | if flags != current_flags { |
1210 | let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) }; |
1211 | if ret < 0 { |
1212 | return Err(io::Error::last_os_error()); |
1213 | } |
1214 | } |
1215 | |
1216 | Ok(()) |
1217 | } |
1218 | |