1 | //! Unix pipe types. |
2 | |
3 | use crate::io::interest::Interest; |
4 | use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready}; |
5 | |
6 | use mio::unix::pipe as mio_pipe; |
7 | use std::fs::File; |
8 | use std::io::{self, Read, Write}; |
9 | use std::os::unix::fs::OpenOptionsExt; |
10 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; |
11 | use std::path::Path; |
12 | use std::pin::Pin; |
13 | use std::task::{Context, Poll}; |
14 | |
15 | cfg_io_util! { |
16 | use bytes::BufMut; |
17 | } |
18 | |
19 | /// Creates a new anonymous Unix pipe. |
20 | /// |
21 | /// This function will open a new pipe and associate both pipe ends with the default |
22 | /// event loop. |
23 | /// |
24 | /// If you need to create a pipe for communication with a spawned process, you can |
25 | /// use [`Stdio::piped()`] instead. |
26 | /// |
27 | /// [`Stdio::piped()`]: std::process::Stdio::piped |
28 | /// |
29 | /// # Errors |
30 | /// |
31 | /// If creating a pipe fails, this function will return with the related OS error. |
32 | /// |
33 | /// # Examples |
34 | /// |
35 | /// Create a pipe and pass the writing end to a spawned process. |
36 | /// |
37 | /// ```no_run |
38 | /// use tokio::net::unix::pipe; |
39 | /// use tokio::process::Command; |
40 | /// # use tokio::io::AsyncReadExt; |
41 | /// # use std::error::Error; |
42 | /// |
43 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
44 | /// let (tx, mut rx) = pipe::pipe()?; |
45 | /// let mut buffer = String::new(); |
46 | /// |
47 | /// let status = Command::new("echo" ) |
48 | /// .arg("Hello, world!" ) |
49 | /// .stdout(tx.into_blocking_fd()?) |
50 | /// .status(); |
51 | /// rx.read_to_string(&mut buffer).await?; |
52 | /// |
53 | /// assert!(status.await?.success()); |
54 | /// assert_eq!(buffer, "Hello, world! \n" ); |
55 | /// # Ok(()) |
56 | /// # } |
57 | /// ``` |
58 | /// |
59 | /// # Panics |
60 | /// |
61 | /// This function panics if it is not called from within a runtime with |
62 | /// IO enabled. |
63 | /// |
64 | /// The runtime is usually set implicitly when this function is called |
65 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
66 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
67 | pub fn pipe() -> io::Result<(Sender, Receiver)> { |
68 | let (tx: Sender, rx: Receiver) = mio_pipe::new()?; |
69 | Ok((Sender::from_mio(mio_tx:tx)?, Receiver::from_mio(mio_rx:rx)?)) |
70 | } |
71 | |
72 | /// Options and flags which can be used to configure how a FIFO file is opened. |
73 | /// |
74 | /// This builder allows configuring how to create a pipe end from a FIFO file. |
75 | /// Generally speaking, when using `OpenOptions`, you'll first call [`new`], |
76 | /// then chain calls to methods to set each option, then call either |
77 | /// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you |
78 | /// are trying to open. This will give you a [`io::Result`] with a pipe end |
79 | /// inside that you can further operate on. |
80 | /// |
81 | /// [`new`]: OpenOptions::new |
82 | /// [`open_receiver`]: OpenOptions::open_receiver |
83 | /// [`open_sender`]: OpenOptions::open_sender |
84 | /// |
85 | /// # Examples |
86 | /// |
87 | /// Opening a pair of pipe ends from a FIFO file: |
88 | /// |
89 | /// ```no_run |
90 | /// use tokio::net::unix::pipe; |
91 | /// # use std::error::Error; |
92 | /// |
93 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
94 | /// |
95 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
96 | /// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; |
97 | /// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?; |
98 | /// # Ok(()) |
99 | /// # } |
100 | /// ``` |
101 | /// |
102 | /// Opening a [`Sender`] on Linux when you are sure the file is a FIFO: |
103 | /// |
104 | /// ```ignore |
105 | /// use tokio::net::unix::pipe; |
106 | /// use nix::{unistd::mkfifo, sys::stat::Mode}; |
107 | /// # use std::error::Error; |
108 | /// |
109 | /// // Our program has exclusive access to this path. |
110 | /// const FIFO_NAME: &str = "path/to/a/new/fifo" ; |
111 | /// |
112 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
113 | /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; |
114 | /// let tx = pipe::OpenOptions::new() |
115 | /// .read_write(true) |
116 | /// .unchecked(true) |
117 | /// .open_sender(FIFO_NAME)?; |
118 | /// # Ok(()) |
119 | /// # } |
120 | /// ``` |
121 | #[derive (Clone, Debug)] |
122 | pub struct OpenOptions { |
123 | #[cfg (target_os = "linux" )] |
124 | read_write: bool, |
125 | unchecked: bool, |
126 | } |
127 | |
128 | impl OpenOptions { |
129 | /// Creates a blank new set of options ready for configuration. |
130 | /// |
131 | /// All options are initially set to `false`. |
132 | pub fn new() -> OpenOptions { |
133 | OpenOptions { |
134 | #[cfg (target_os = "linux" )] |
135 | read_write: false, |
136 | unchecked: false, |
137 | } |
138 | } |
139 | |
140 | /// Sets the option for read-write access. |
141 | /// |
142 | /// This option, when true, will indicate that a FIFO file will be opened |
143 | /// in read-write access mode. This operation is not defined by the POSIX |
144 | /// standard and is only guaranteed to work on Linux. |
145 | /// |
146 | /// # Examples |
147 | /// |
148 | /// Opening a [`Sender`] even if there are no open reading ends: |
149 | /// |
150 | /// ```ignore |
151 | /// use tokio::net::unix::pipe; |
152 | /// |
153 | /// let tx = pipe::OpenOptions::new() |
154 | /// .read_write(true) |
155 | /// .open_sender("path/to/a/fifo" ); |
156 | /// ``` |
157 | /// |
158 | /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not |
159 | /// fail with [`UnexpectedEof`] during reading if all writing ends of the |
160 | /// pipe close the FIFO file. |
161 | /// |
162 | /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof |
163 | /// |
164 | /// ```ignore |
165 | /// use tokio::net::unix::pipe; |
166 | /// |
167 | /// let tx = pipe::OpenOptions::new() |
168 | /// .read_write(true) |
169 | /// .open_receiver("path/to/a/fifo" ); |
170 | /// ``` |
171 | #[cfg (target_os = "linux" )] |
172 | #[cfg_attr (docsrs, doc(cfg(target_os = "linux" )))] |
173 | pub fn read_write(&mut self, value: bool) -> &mut Self { |
174 | self.read_write = value; |
175 | self |
176 | } |
177 | |
178 | /// Sets the option to skip the check for FIFO file type. |
179 | /// |
180 | /// By default, [`open_receiver`] and [`open_sender`] functions will check |
181 | /// if the opened file is a FIFO file. Set this option to `true` if you are |
182 | /// sure the file is a FIFO file. |
183 | /// |
184 | /// [`open_receiver`]: OpenOptions::open_receiver |
185 | /// [`open_sender`]: OpenOptions::open_sender |
186 | /// |
187 | /// # Examples |
188 | /// |
189 | /// ```no_run |
190 | /// use tokio::net::unix::pipe; |
191 | /// use nix::{unistd::mkfifo, sys::stat::Mode}; |
192 | /// # use std::error::Error; |
193 | /// |
194 | /// // Our program has exclusive access to this path. |
195 | /// const FIFO_NAME: &str = "path/to/a/new/fifo" ; |
196 | /// |
197 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
198 | /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; |
199 | /// let rx = pipe::OpenOptions::new() |
200 | /// .unchecked(true) |
201 | /// .open_receiver(FIFO_NAME)?; |
202 | /// # Ok(()) |
203 | /// # } |
204 | /// ``` |
205 | pub fn unchecked(&mut self, value: bool) -> &mut Self { |
206 | self.unchecked = value; |
207 | self |
208 | } |
209 | |
210 | /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`. |
211 | /// |
212 | /// This function will open the FIFO file at the specified path, possibly |
213 | /// check if it is a pipe, and associate the pipe with the default event |
214 | /// loop for reading. |
215 | /// |
216 | /// # Errors |
217 | /// |
218 | /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. |
219 | /// This function may also fail with other standard OS errors. |
220 | /// |
221 | /// # Panics |
222 | /// |
223 | /// This function panics if it is not called from within a runtime with |
224 | /// IO enabled. |
225 | /// |
226 | /// The runtime is usually set implicitly when this function is called |
227 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
228 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
229 | pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> { |
230 | let file = self.open(path.as_ref(), PipeEnd::Receiver)?; |
231 | Receiver::from_file_unchecked(file) |
232 | } |
233 | |
234 | /// Creates a [`Sender`] from a FIFO file with the options specified by `self`. |
235 | /// |
236 | /// This function will open the FIFO file at the specified path, possibly |
237 | /// check if it is a pipe, and associate the pipe with the default event |
238 | /// loop for writing. |
239 | /// |
240 | /// # Errors |
241 | /// |
242 | /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. |
243 | /// If the file is not opened in read-write access mode and the file is not |
244 | /// currently open for reading, this function will fail with `ENXIO`. |
245 | /// This function may also fail with other standard OS errors. |
246 | /// |
247 | /// # Panics |
248 | /// |
249 | /// This function panics if it is not called from within a runtime with |
250 | /// IO enabled. |
251 | /// |
252 | /// The runtime is usually set implicitly when this function is called |
253 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
254 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
255 | pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> { |
256 | let file = self.open(path.as_ref(), PipeEnd::Sender)?; |
257 | Sender::from_file_unchecked(file) |
258 | } |
259 | |
260 | fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> { |
261 | let mut options = std::fs::OpenOptions::new(); |
262 | options |
263 | .read(pipe_end == PipeEnd::Receiver) |
264 | .write(pipe_end == PipeEnd::Sender) |
265 | .custom_flags(libc::O_NONBLOCK); |
266 | |
267 | #[cfg (target_os = "linux" )] |
268 | if self.read_write { |
269 | options.read(true).write(true); |
270 | } |
271 | |
272 | let file = options.open(path)?; |
273 | |
274 | if !self.unchecked && !is_pipe(file.as_fd())? { |
275 | return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe" )); |
276 | } |
277 | |
278 | Ok(file) |
279 | } |
280 | } |
281 | |
282 | impl Default for OpenOptions { |
283 | fn default() -> OpenOptions { |
284 | OpenOptions::new() |
285 | } |
286 | } |
287 | |
288 | #[derive (Clone, Copy, PartialEq, Eq, Debug)] |
289 | enum PipeEnd { |
290 | Sender, |
291 | Receiver, |
292 | } |
293 | |
294 | /// Writing end of a Unix pipe. |
295 | /// |
296 | /// It can be constructed from a FIFO file with [`OpenOptions::open_sender`]. |
297 | /// |
298 | /// Opening a named pipe for writing involves a few steps. |
299 | /// Call to [`OpenOptions::open_sender`] might fail with an error indicating |
300 | /// different things: |
301 | /// |
302 | /// * [`io::ErrorKind::NotFound`] - There is no file at the specified path. |
303 | /// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO. |
304 | /// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading. |
305 | /// Sleep for a while and try again. |
306 | /// * Other OS errors not specific to opening FIFO files. |
307 | /// |
308 | /// Opening a `Sender` from a FIFO file should look like this: |
309 | /// |
310 | /// ```no_run |
311 | /// use tokio::net::unix::pipe; |
312 | /// use tokio::time::{self, Duration}; |
313 | /// |
314 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
315 | /// |
316 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
317 | /// // Wait for a reader to open the file. |
318 | /// let tx = loop { |
319 | /// match pipe::OpenOptions::new().open_sender(FIFO_NAME) { |
320 | /// Ok(tx) => break tx, |
321 | /// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}, |
322 | /// Err(e) => return Err(e.into()), |
323 | /// } |
324 | /// |
325 | /// time::sleep(Duration::from_millis(50)).await; |
326 | /// }; |
327 | /// # Ok(()) |
328 | /// # } |
329 | /// ``` |
330 | /// |
331 | /// On Linux, it is possible to create a `Sender` without waiting in a sleeping |
332 | /// loop. This is done by opening a named pipe in read-write access mode with |
333 | /// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold |
334 | /// both a writing end and a reading end, and the latter allows to open a FIFO |
335 | /// without [`ENXIO`] error since the pipe is open for reading as well. |
336 | /// |
337 | /// `Sender` cannot be used to read from a pipe, so in practice the read access |
338 | /// is only used when a FIFO is opened. However, using a `Sender` in read-write |
339 | /// mode **may lead to lost data**, because written data will be dropped by the |
340 | /// system as soon as all pipe ends are closed. To avoid lost data you have to |
341 | /// make sure that a reading end has been opened before dropping a `Sender`. |
342 | /// |
343 | /// Note that using read-write access mode with FIFO files is not defined by |
344 | /// the POSIX standard and it is only guaranteed to work on Linux. |
345 | /// |
346 | /// ```ignore |
347 | /// use tokio::io::AsyncWriteExt; |
348 | /// use tokio::net::unix::pipe; |
349 | /// |
350 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
351 | /// |
352 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
353 | /// let mut tx = pipe::OpenOptions::new() |
354 | /// .read_write(true) |
355 | /// .open_sender(FIFO_NAME)?; |
356 | /// |
357 | /// // Asynchronously write to the pipe before a reader. |
358 | /// tx.write_all(b"hello world" ).await?; |
359 | /// # Ok(()) |
360 | /// # } |
361 | /// ``` |
362 | /// |
363 | /// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html |
364 | #[derive (Debug)] |
365 | pub struct Sender { |
366 | io: PollEvented<mio_pipe::Sender>, |
367 | } |
368 | |
369 | impl Sender { |
370 | fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> { |
371 | let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?; |
372 | Ok(Sender { io }) |
373 | } |
374 | |
375 | /// Creates a new `Sender` from a [`File`]. |
376 | /// |
377 | /// This function is intended to construct a pipe from a [`File`] representing |
378 | /// a special FIFO file. It will check if the file is a pipe and has write access, |
379 | /// set it in non-blocking mode and perform the conversion. |
380 | /// |
381 | /// # Errors |
382 | /// |
383 | /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it |
384 | /// does not have write access. Also fails with any standard OS error if it occurs. |
385 | /// |
386 | /// # Panics |
387 | /// |
388 | /// This function panics if it is not called from within a runtime with |
389 | /// IO enabled. |
390 | /// |
391 | /// The runtime is usually set implicitly when this function is called |
392 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
393 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
394 | pub fn from_file(file: File) -> io::Result<Sender> { |
395 | Sender::from_owned_fd(file.into()) |
396 | } |
397 | |
398 | /// Creates a new `Sender` from an [`OwnedFd`]. |
399 | /// |
400 | /// This function is intended to construct a pipe from an [`OwnedFd`] representing |
401 | /// an anonymous pipe or a special FIFO file. It will check if the file descriptor |
402 | /// is a pipe and has write access, set it in non-blocking mode and perform the |
403 | /// conversion. |
404 | /// |
405 | /// # Errors |
406 | /// |
407 | /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe |
408 | /// or it does not have write access. Also fails with any standard OS error if it |
409 | /// occurs. |
410 | /// |
411 | /// # Panics |
412 | /// |
413 | /// This function panics if it is not called from within a runtime with |
414 | /// IO enabled. |
415 | /// |
416 | /// The runtime is usually set implicitly when this function is called |
417 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
418 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
419 | pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender> { |
420 | if !is_pipe(owned_fd.as_fd())? { |
421 | return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe" )); |
422 | } |
423 | |
424 | let flags = get_file_flags(owned_fd.as_fd())?; |
425 | if has_write_access(flags) { |
426 | set_nonblocking(owned_fd.as_fd(), flags)?; |
427 | Sender::from_owned_fd_unchecked(owned_fd) |
428 | } else { |
429 | Err(io::Error::new( |
430 | io::ErrorKind::InvalidInput, |
431 | "not in O_WRONLY or O_RDWR access mode" , |
432 | )) |
433 | } |
434 | } |
435 | |
436 | /// Creates a new `Sender` from a [`File`] without checking pipe properties. |
437 | /// |
438 | /// This function is intended to construct a pipe from a File representing |
439 | /// a special FIFO file. The conversion assumes nothing about the underlying |
440 | /// file; it is left up to the user to make sure it is opened with write access, |
441 | /// represents a pipe and is set in non-blocking mode. |
442 | /// |
443 | /// # Examples |
444 | /// |
445 | /// ```no_run |
446 | /// use tokio::net::unix::pipe; |
447 | /// use std::fs::OpenOptions; |
448 | /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; |
449 | /// # use std::error::Error; |
450 | /// |
451 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
452 | /// |
453 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
454 | /// let file = OpenOptions::new() |
455 | /// .write(true) |
456 | /// .custom_flags(libc::O_NONBLOCK) |
457 | /// .open(FIFO_NAME)?; |
458 | /// if file.metadata()?.file_type().is_fifo() { |
459 | /// let tx = pipe::Sender::from_file_unchecked(file)?; |
460 | /// /* use the Sender */ |
461 | /// } |
462 | /// # Ok(()) |
463 | /// # } |
464 | /// ``` |
465 | /// |
466 | /// # Panics |
467 | /// |
468 | /// This function panics if it is not called from within a runtime with |
469 | /// IO enabled. |
470 | /// |
471 | /// The runtime is usually set implicitly when this function is called |
472 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
473 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
474 | pub fn from_file_unchecked(file: File) -> io::Result<Sender> { |
475 | Sender::from_owned_fd_unchecked(file.into()) |
476 | } |
477 | |
478 | /// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties. |
479 | /// |
480 | /// This function is intended to construct a pipe from an [`OwnedFd`] representing |
481 | /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about |
482 | /// the underlying pipe; it is left up to the user to make sure that the file |
483 | /// descriptor represents the writing end of a pipe and the pipe is set in |
484 | /// non-blocking mode. |
485 | /// |
486 | /// # Panics |
487 | /// |
488 | /// This function panics if it is not called from within a runtime with |
489 | /// IO enabled. |
490 | /// |
491 | /// The runtime is usually set implicitly when this function is called |
492 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
493 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
494 | pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender> { |
495 | // Safety: OwnedFd represents a valid, open file descriptor. |
496 | let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) }; |
497 | Sender::from_mio(mio_tx) |
498 | } |
499 | |
500 | /// Waits for any of the requested ready states. |
501 | /// |
502 | /// This function can be used instead of [`writable()`] to check the returned |
503 | /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events. |
504 | /// |
505 | /// The function may complete without the pipe being ready. This is a |
506 | /// false-positive and attempting an operation will return with |
507 | /// `io::ErrorKind::WouldBlock`. The function can also return with an empty |
508 | /// [`Ready`] set, so you should always check the returned value and possibly |
509 | /// wait again if the requested states are not set. |
510 | /// |
511 | /// [`writable()`]: Self::writable |
512 | /// |
513 | /// # Cancel safety |
514 | /// |
515 | /// This method is cancel safe. Once a readiness event occurs, the method |
516 | /// will continue to return immediately until the readiness event is |
517 | /// consumed by an attempt to write that fails with `WouldBlock` or |
518 | /// `Poll::Pending`. |
519 | pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { |
520 | let event = self.io.registration().readiness(interest).await?; |
521 | Ok(event.ready) |
522 | } |
523 | |
524 | /// Waits for the pipe to become writable. |
525 | /// |
526 | /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually |
527 | /// paired with [`try_write()`]. |
528 | /// |
529 | /// [`try_write()`]: Self::try_write |
530 | /// |
531 | /// # Examples |
532 | /// |
533 | /// ```no_run |
534 | /// use tokio::net::unix::pipe; |
535 | /// use std::io; |
536 | /// |
537 | /// #[tokio::main] |
538 | /// async fn main() -> io::Result<()> { |
539 | /// // Open a writing end of a fifo |
540 | /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo" )?; |
541 | /// |
542 | /// loop { |
543 | /// // Wait for the pipe to be writable |
544 | /// tx.writable().await?; |
545 | /// |
546 | /// // Try to write data, this may still fail with `WouldBlock` |
547 | /// // if the readiness event is a false positive. |
548 | /// match tx.try_write(b"hello world" ) { |
549 | /// Ok(n) => { |
550 | /// break; |
551 | /// } |
552 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
553 | /// continue; |
554 | /// } |
555 | /// Err(e) => { |
556 | /// return Err(e.into()); |
557 | /// } |
558 | /// } |
559 | /// } |
560 | /// |
561 | /// Ok(()) |
562 | /// } |
563 | /// ``` |
564 | pub async fn writable(&self) -> io::Result<()> { |
565 | self.ready(Interest::WRITABLE).await?; |
566 | Ok(()) |
567 | } |
568 | |
569 | /// Polls for write readiness. |
570 | /// |
571 | /// If the pipe is not currently ready for writing, this method will |
572 | /// store a clone of the `Waker` from the provided `Context`. When the pipe |
573 | /// becomes ready for writing, `Waker::wake` will be called on the waker. |
574 | /// |
575 | /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only |
576 | /// the `Waker` from the `Context` passed to the most recent call is |
577 | /// scheduled to receive a wakeup. |
578 | /// |
579 | /// This function is intended for cases where creating and pinning a future |
580 | /// via [`writable`] is not feasible. Where possible, using [`writable`] is |
581 | /// preferred, as this supports polling from multiple tasks at once. |
582 | /// |
583 | /// [`writable`]: Self::writable |
584 | /// |
585 | /// # Return value |
586 | /// |
587 | /// The function returns: |
588 | /// |
589 | /// * `Poll::Pending` if the pipe is not ready for writing. |
590 | /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing. |
591 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
592 | /// |
593 | /// # Errors |
594 | /// |
595 | /// This function may encounter any standard I/O error except `WouldBlock`. |
596 | pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
597 | self.io.registration().poll_write_ready(cx).map_ok(|_| ()) |
598 | } |
599 | |
600 | /// Tries to write a buffer to the pipe, returning how many bytes were |
601 | /// written. |
602 | /// |
603 | /// The function will attempt to write the entire contents of `buf`, but |
604 | /// only part of the buffer may be written. If the length of `buf` is not |
605 | /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the |
606 | /// write is guaranteed to be atomic, i.e. either the entire content of |
607 | /// `buf` will be written or this method will fail with `WouldBlock`. There |
608 | /// is no such guarantee if `buf` is larger than `PIPE_BUF`. |
609 | /// |
610 | /// This function is usually paired with [`writable`]. |
611 | /// |
612 | /// [`writable`]: Self::writable |
613 | /// |
614 | /// # Return |
615 | /// |
616 | /// If data is successfully written, `Ok(n)` is returned, where `n` is the |
617 | /// number of bytes written. If the pipe is not ready to write data, |
618 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
619 | /// |
620 | /// # Examples |
621 | /// |
622 | /// ```no_run |
623 | /// use tokio::net::unix::pipe; |
624 | /// use std::io; |
625 | /// |
626 | /// #[tokio::main] |
627 | /// async fn main() -> io::Result<()> { |
628 | /// // Open a writing end of a fifo |
629 | /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo" )?; |
630 | /// |
631 | /// loop { |
632 | /// // Wait for the pipe to be writable |
633 | /// tx.writable().await?; |
634 | /// |
635 | /// // Try to write data, this may still fail with `WouldBlock` |
636 | /// // if the readiness event is a false positive. |
637 | /// match tx.try_write(b"hello world" ) { |
638 | /// Ok(n) => { |
639 | /// break; |
640 | /// } |
641 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
642 | /// continue; |
643 | /// } |
644 | /// Err(e) => { |
645 | /// return Err(e.into()); |
646 | /// } |
647 | /// } |
648 | /// } |
649 | /// |
650 | /// Ok(()) |
651 | /// } |
652 | /// ``` |
653 | pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { |
654 | self.io |
655 | .registration() |
656 | .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) |
657 | } |
658 | |
659 | /// Tries to write several buffers to the pipe, returning how many bytes |
660 | /// were written. |
661 | /// |
662 | /// Data is written from each buffer in order, with the final buffer read |
663 | /// from possible being only partially consumed. This method behaves |
664 | /// equivalently to a single call to [`try_write()`] with concatenated |
665 | /// buffers. |
666 | /// |
667 | /// If the total length of buffers is not greater than `PIPE_BUF` (an OS |
668 | /// constant, 4096 under Linux), then the write is guaranteed to be atomic, |
669 | /// i.e. either the entire contents of buffers will be written or this |
670 | /// method will fail with `WouldBlock`. There is no such guarantee if the |
671 | /// total length of buffers is greater than `PIPE_BUF`. |
672 | /// |
673 | /// This function is usually paired with [`writable`]. |
674 | /// |
675 | /// [`try_write()`]: Self::try_write() |
676 | /// [`writable`]: Self::writable |
677 | /// |
678 | /// # Return |
679 | /// |
680 | /// If data is successfully written, `Ok(n)` is returned, where `n` is the |
681 | /// number of bytes written. If the pipe is not ready to write data, |
682 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
683 | /// |
684 | /// # Examples |
685 | /// |
686 | /// ```no_run |
687 | /// use tokio::net::unix::pipe; |
688 | /// use std::io; |
689 | /// |
690 | /// #[tokio::main] |
691 | /// async fn main() -> io::Result<()> { |
692 | /// // Open a writing end of a fifo |
693 | /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo" )?; |
694 | /// |
695 | /// let bufs = [io::IoSlice::new(b"hello " ), io::IoSlice::new(b"world" )]; |
696 | /// |
697 | /// loop { |
698 | /// // Wait for the pipe to be writable |
699 | /// tx.writable().await?; |
700 | /// |
701 | /// // Try to write data, this may still fail with `WouldBlock` |
702 | /// // if the readiness event is a false positive. |
703 | /// match tx.try_write_vectored(&bufs) { |
704 | /// Ok(n) => { |
705 | /// break; |
706 | /// } |
707 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
708 | /// continue; |
709 | /// } |
710 | /// Err(e) => { |
711 | /// return Err(e.into()); |
712 | /// } |
713 | /// } |
714 | /// } |
715 | /// |
716 | /// Ok(()) |
717 | /// } |
718 | /// ``` |
719 | pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { |
720 | self.io |
721 | .registration() |
722 | .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) |
723 | } |
724 | |
725 | /// Converts the pipe into an [`OwnedFd`] in blocking mode. |
726 | /// |
727 | /// This function will deregister this pipe end from the event loop, set |
728 | /// it in blocking mode and perform the conversion. |
729 | pub fn into_blocking_fd(self) -> io::Result<OwnedFd> { |
730 | let fd = self.into_nonblocking_fd()?; |
731 | set_blocking(&fd)?; |
732 | Ok(fd) |
733 | } |
734 | |
735 | /// Converts the pipe into an [`OwnedFd`] in nonblocking mode. |
736 | /// |
737 | /// This function will deregister this pipe end from the event loop and |
738 | /// perform the conversion. The returned file descriptor will be in nonblocking |
739 | /// mode. |
740 | pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> { |
741 | let mio_pipe = self.io.into_inner()?; |
742 | |
743 | // Safety: the pipe is now deregistered from the event loop |
744 | // and we are the only owner of this pipe end. |
745 | let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) }; |
746 | |
747 | Ok(owned_fd) |
748 | } |
749 | } |
750 | |
751 | impl AsyncWrite for Sender { |
752 | fn poll_write( |
753 | self: Pin<&mut Self>, |
754 | cx: &mut Context<'_>, |
755 | buf: &[u8], |
756 | ) -> Poll<io::Result<usize>> { |
757 | self.io.poll_write(cx, buf) |
758 | } |
759 | |
760 | fn poll_write_vectored( |
761 | self: Pin<&mut Self>, |
762 | cx: &mut Context<'_>, |
763 | bufs: &[io::IoSlice<'_>], |
764 | ) -> Poll<io::Result<usize>> { |
765 | self.io.poll_write_vectored(cx, bufs) |
766 | } |
767 | |
768 | fn is_write_vectored(&self) -> bool { |
769 | true |
770 | } |
771 | |
772 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
773 | Poll::Ready(Ok(())) |
774 | } |
775 | |
776 | fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
777 | Poll::Ready(Ok(())) |
778 | } |
779 | } |
780 | |
781 | impl AsRawFd for Sender { |
782 | fn as_raw_fd(&self) -> RawFd { |
783 | self.io.as_raw_fd() |
784 | } |
785 | } |
786 | |
787 | impl AsFd for Sender { |
788 | fn as_fd(&self) -> BorrowedFd<'_> { |
789 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
790 | } |
791 | } |
792 | |
793 | /// Reading end of a Unix pipe. |
794 | /// |
795 | /// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`]. |
796 | /// |
797 | /// # Examples |
798 | /// |
799 | /// Receiving messages from a named pipe in a loop: |
800 | /// |
801 | /// ```no_run |
802 | /// use tokio::net::unix::pipe; |
803 | /// use tokio::io::{self, AsyncReadExt}; |
804 | /// |
805 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
806 | /// |
807 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
808 | /// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; |
809 | /// loop { |
810 | /// let mut msg = vec![0; 256]; |
811 | /// match rx.read_exact(&mut msg).await { |
812 | /// Ok(_) => { |
813 | /// /* handle the message */ |
814 | /// } |
815 | /// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { |
816 | /// // Writing end has been closed, we should reopen the pipe. |
817 | /// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; |
818 | /// } |
819 | /// Err(e) => return Err(e.into()), |
820 | /// } |
821 | /// } |
822 | /// # } |
823 | /// ``` |
824 | /// |
825 | /// On Linux, you can use a `Receiver` in read-write access mode to implement |
826 | /// resilient reading from a named pipe. Unlike `Receiver` opened in read-only |
827 | /// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof` |
828 | /// when the writing end is closed. This way, a `Receiver` can asynchronously |
829 | /// wait for the next writer to open the pipe. |
830 | /// |
831 | /// You should not use functions waiting for EOF such as [`read_to_end`] with |
832 | /// a `Receiver` in read-write access mode, since it **may wait forever**. |
833 | /// `Receiver` in this mode also holds an open writing end, which prevents |
834 | /// receiving EOF. |
835 | /// |
836 | /// To set the read-write access mode you can use `OpenOptions::read_write`. |
837 | /// Note that using read-write access mode with FIFO files is not defined by |
838 | /// the POSIX standard and it is only guaranteed to work on Linux. |
839 | /// |
840 | /// ```ignore |
841 | /// use tokio::net::unix::pipe; |
842 | /// use tokio::io::AsyncReadExt; |
843 | /// # use std::error::Error; |
844 | /// |
845 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
846 | /// |
847 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
848 | /// let mut rx = pipe::OpenOptions::new() |
849 | /// .read_write(true) |
850 | /// .open_receiver(FIFO_NAME)?; |
851 | /// loop { |
852 | /// let mut msg = vec![0; 256]; |
853 | /// rx.read_exact(&mut msg).await?; |
854 | /// /* handle the message */ |
855 | /// } |
856 | /// # } |
857 | /// ``` |
858 | /// |
859 | /// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end |
860 | #[derive (Debug)] |
861 | pub struct Receiver { |
862 | io: PollEvented<mio_pipe::Receiver>, |
863 | } |
864 | |
865 | impl Receiver { |
866 | fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> { |
867 | let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?; |
868 | Ok(Receiver { io }) |
869 | } |
870 | |
871 | /// Creates a new `Receiver` from a [`File`]. |
872 | /// |
873 | /// This function is intended to construct a pipe from a [`File`] representing |
874 | /// a special FIFO file. It will check if the file is a pipe and has read access, |
875 | /// set it in non-blocking mode and perform the conversion. |
876 | /// |
877 | /// # Errors |
878 | /// |
879 | /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it |
880 | /// does not have read access. Also fails with any standard OS error if it occurs. |
881 | /// |
882 | /// # Panics |
883 | /// |
884 | /// This function panics if it is not called from within a runtime with |
885 | /// IO enabled. |
886 | /// |
887 | /// The runtime is usually set implicitly when this function is called |
888 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
889 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
890 | pub fn from_file(file: File) -> io::Result<Receiver> { |
891 | Receiver::from_owned_fd(file.into()) |
892 | } |
893 | |
894 | /// Creates a new `Receiver` from an [`OwnedFd`]. |
895 | /// |
896 | /// This function is intended to construct a pipe from an [`OwnedFd`] representing |
897 | /// an anonymous pipe or a special FIFO file. It will check if the file descriptor |
898 | /// is a pipe and has read access, set it in non-blocking mode and perform the |
899 | /// conversion. |
900 | /// |
901 | /// # Errors |
902 | /// |
903 | /// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe |
904 | /// or it does not have read access. Also fails with any standard OS error if it |
905 | /// occurs. |
906 | /// |
907 | /// # Panics |
908 | /// |
909 | /// This function panics if it is not called from within a runtime with |
910 | /// IO enabled. |
911 | /// |
912 | /// The runtime is usually set implicitly when this function is called |
913 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
914 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
915 | pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver> { |
916 | if !is_pipe(owned_fd.as_fd())? { |
917 | return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe" )); |
918 | } |
919 | |
920 | let flags = get_file_flags(owned_fd.as_fd())?; |
921 | if has_read_access(flags) { |
922 | set_nonblocking(owned_fd.as_fd(), flags)?; |
923 | Receiver::from_owned_fd_unchecked(owned_fd) |
924 | } else { |
925 | Err(io::Error::new( |
926 | io::ErrorKind::InvalidInput, |
927 | "not in O_RDONLY or O_RDWR access mode" , |
928 | )) |
929 | } |
930 | } |
931 | |
932 | /// Creates a new `Receiver` from a [`File`] without checking pipe properties. |
933 | /// |
934 | /// This function is intended to construct a pipe from a File representing |
935 | /// a special FIFO file. The conversion assumes nothing about the underlying |
936 | /// file; it is left up to the user to make sure it is opened with read access, |
937 | /// represents a pipe and is set in non-blocking mode. |
938 | /// |
939 | /// # Examples |
940 | /// |
941 | /// ```no_run |
942 | /// use tokio::net::unix::pipe; |
943 | /// use std::fs::OpenOptions; |
944 | /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; |
945 | /// # use std::error::Error; |
946 | /// |
947 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
948 | /// |
949 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
950 | /// let file = OpenOptions::new() |
951 | /// .read(true) |
952 | /// .custom_flags(libc::O_NONBLOCK) |
953 | /// .open(FIFO_NAME)?; |
954 | /// if file.metadata()?.file_type().is_fifo() { |
955 | /// let rx = pipe::Receiver::from_file_unchecked(file)?; |
956 | /// /* use the Receiver */ |
957 | /// } |
958 | /// # Ok(()) |
959 | /// # } |
960 | /// ``` |
961 | /// |
962 | /// # Panics |
963 | /// |
964 | /// This function panics if it is not called from within a runtime with |
965 | /// IO enabled. |
966 | /// |
967 | /// The runtime is usually set implicitly when this function is called |
968 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
969 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
970 | pub fn from_file_unchecked(file: File) -> io::Result<Receiver> { |
971 | Receiver::from_owned_fd_unchecked(file.into()) |
972 | } |
973 | |
974 | /// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties. |
975 | /// |
976 | /// This function is intended to construct a pipe from an [`OwnedFd`] representing |
977 | /// an anonymous pipe or a special FIFO file. The conversion assumes nothing about |
978 | /// the underlying pipe; it is left up to the user to make sure that the file |
979 | /// descriptor represents the reading end of a pipe and the pipe is set in |
980 | /// non-blocking mode. |
981 | /// |
982 | /// # Panics |
983 | /// |
984 | /// This function panics if it is not called from within a runtime with |
985 | /// IO enabled. |
986 | /// |
987 | /// The runtime is usually set implicitly when this function is called |
988 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
989 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
990 | pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver> { |
991 | // Safety: OwnedFd represents a valid, open file descriptor. |
992 | let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) }; |
993 | Receiver::from_mio(mio_rx) |
994 | } |
995 | |
996 | /// Waits for any of the requested ready states. |
997 | /// |
998 | /// This function can be used instead of [`readable()`] to check the returned |
999 | /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events. |
1000 | /// |
1001 | /// The function may complete without the pipe being ready. This is a |
1002 | /// false-positive and attempting an operation will return with |
1003 | /// `io::ErrorKind::WouldBlock`. The function can also return with an empty |
1004 | /// [`Ready`] set, so you should always check the returned value and possibly |
1005 | /// wait again if the requested states are not set. |
1006 | /// |
1007 | /// [`readable()`]: Self::readable |
1008 | /// |
1009 | /// # Cancel safety |
1010 | /// |
1011 | /// This method is cancel safe. Once a readiness event occurs, the method |
1012 | /// will continue to return immediately until the readiness event is |
1013 | /// consumed by an attempt to read that fails with `WouldBlock` or |
1014 | /// `Poll::Pending`. |
1015 | pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { |
1016 | let event = self.io.registration().readiness(interest).await?; |
1017 | Ok(event.ready) |
1018 | } |
1019 | |
1020 | /// Waits for the pipe to become readable. |
1021 | /// |
1022 | /// This function is equivalent to `ready(Interest::READABLE)` and is usually |
1023 | /// paired with [`try_read()`]. |
1024 | /// |
1025 | /// [`try_read()`]: Self::try_read() |
1026 | /// |
1027 | /// # Examples |
1028 | /// |
1029 | /// ```no_run |
1030 | /// use tokio::net::unix::pipe; |
1031 | /// use std::io; |
1032 | /// |
1033 | /// #[tokio::main] |
1034 | /// async fn main() -> io::Result<()> { |
1035 | /// // Open a reading end of a fifo |
1036 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo" )?; |
1037 | /// |
1038 | /// let mut msg = vec![0; 1024]; |
1039 | /// |
1040 | /// loop { |
1041 | /// // Wait for the pipe to be readable |
1042 | /// rx.readable().await?; |
1043 | /// |
1044 | /// // Try to read data, this may still fail with `WouldBlock` |
1045 | /// // if the readiness event is a false positive. |
1046 | /// match rx.try_read(&mut msg) { |
1047 | /// Ok(n) => { |
1048 | /// msg.truncate(n); |
1049 | /// break; |
1050 | /// } |
1051 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
1052 | /// continue; |
1053 | /// } |
1054 | /// Err(e) => { |
1055 | /// return Err(e.into()); |
1056 | /// } |
1057 | /// } |
1058 | /// } |
1059 | /// |
1060 | /// println!("GOT = {:?}" , msg); |
1061 | /// Ok(()) |
1062 | /// } |
1063 | /// ``` |
1064 | pub async fn readable(&self) -> io::Result<()> { |
1065 | self.ready(Interest::READABLE).await?; |
1066 | Ok(()) |
1067 | } |
1068 | |
1069 | /// Polls for read readiness. |
1070 | /// |
1071 | /// If the pipe is not currently ready for reading, this method will |
1072 | /// store a clone of the `Waker` from the provided `Context`. When the pipe |
1073 | /// becomes ready for reading, `Waker::wake` will be called on the waker. |
1074 | /// |
1075 | /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only |
1076 | /// the `Waker` from the `Context` passed to the most recent call is |
1077 | /// scheduled to receive a wakeup. |
1078 | /// |
1079 | /// This function is intended for cases where creating and pinning a future |
1080 | /// via [`readable`] is not feasible. Where possible, using [`readable`] is |
1081 | /// preferred, as this supports polling from multiple tasks at once. |
1082 | /// |
1083 | /// [`readable`]: Self::readable |
1084 | /// |
1085 | /// # Return value |
1086 | /// |
1087 | /// The function returns: |
1088 | /// |
1089 | /// * `Poll::Pending` if the pipe is not ready for reading. |
1090 | /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading. |
1091 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
1092 | /// |
1093 | /// # Errors |
1094 | /// |
1095 | /// This function may encounter any standard I/O error except `WouldBlock`. |
1096 | pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
1097 | self.io.registration().poll_read_ready(cx).map_ok(|_| ()) |
1098 | } |
1099 | |
1100 | /// Tries to read data from the pipe into the provided buffer, returning how |
1101 | /// many bytes were read. |
1102 | /// |
1103 | /// Reads any pending data from the pipe but does not wait for new data |
1104 | /// to arrive. On success, returns the number of bytes read. Because |
1105 | /// `try_read()` is non-blocking, the buffer does not have to be stored by |
1106 | /// the async task and can exist entirely on the stack. |
1107 | /// |
1108 | /// Usually [`readable()`] is used with this function. |
1109 | /// |
1110 | /// [`readable()`]: Self::readable() |
1111 | /// |
1112 | /// # Return |
1113 | /// |
1114 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
1115 | /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: |
1116 | /// |
1117 | /// 1. The pipe's writing end is closed and will no longer write data. |
1118 | /// 2. The specified buffer was 0 bytes in length. |
1119 | /// |
1120 | /// If the pipe is not ready to read data, |
1121 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
1122 | /// |
1123 | /// # Examples |
1124 | /// |
1125 | /// ```no_run |
1126 | /// use tokio::net::unix::pipe; |
1127 | /// use std::io; |
1128 | /// |
1129 | /// #[tokio::main] |
1130 | /// async fn main() -> io::Result<()> { |
1131 | /// // Open a reading end of a fifo |
1132 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo" )?; |
1133 | /// |
1134 | /// let mut msg = vec![0; 1024]; |
1135 | /// |
1136 | /// loop { |
1137 | /// // Wait for the pipe to be readable |
1138 | /// rx.readable().await?; |
1139 | /// |
1140 | /// // Try to read data, this may still fail with `WouldBlock` |
1141 | /// // if the readiness event is a false positive. |
1142 | /// match rx.try_read(&mut msg) { |
1143 | /// Ok(n) => { |
1144 | /// msg.truncate(n); |
1145 | /// break; |
1146 | /// } |
1147 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
1148 | /// continue; |
1149 | /// } |
1150 | /// Err(e) => { |
1151 | /// return Err(e.into()); |
1152 | /// } |
1153 | /// } |
1154 | /// } |
1155 | /// |
1156 | /// println!("GOT = {:?}" , msg); |
1157 | /// Ok(()) |
1158 | /// } |
1159 | /// ``` |
1160 | pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { |
1161 | self.io |
1162 | .registration() |
1163 | .try_io(Interest::READABLE, || (&*self.io).read(buf)) |
1164 | } |
1165 | |
1166 | /// Tries to read data from the pipe into the provided buffers, returning |
1167 | /// how many bytes were read. |
1168 | /// |
1169 | /// Data is copied to fill each buffer in order, with the final buffer |
1170 | /// written to possibly being only partially filled. This method behaves |
1171 | /// equivalently to a single call to [`try_read()`] with concatenated |
1172 | /// buffers. |
1173 | /// |
1174 | /// Reads any pending data from the pipe but does not wait for new data |
1175 | /// to arrive. On success, returns the number of bytes read. Because |
1176 | /// `try_read_vectored()` is non-blocking, the buffer does not have to be |
1177 | /// stored by the async task and can exist entirely on the stack. |
1178 | /// |
1179 | /// Usually, [`readable()`] is used with this function. |
1180 | /// |
1181 | /// [`try_read()`]: Self::try_read() |
1182 | /// [`readable()`]: Self::readable() |
1183 | /// |
1184 | /// # Return |
1185 | /// |
1186 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
1187 | /// number of bytes read. `Ok(0)` indicates the pipe's writing end is |
1188 | /// closed and will no longer write data. If the pipe is not ready to read |
1189 | /// data `Err(io::ErrorKind::WouldBlock)` is returned. |
1190 | /// |
1191 | /// # Examples |
1192 | /// |
1193 | /// ```no_run |
1194 | /// use tokio::net::unix::pipe; |
1195 | /// use std::io; |
1196 | /// |
1197 | /// #[tokio::main] |
1198 | /// async fn main() -> io::Result<()> { |
1199 | /// // Open a reading end of a fifo |
1200 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo" )?; |
1201 | /// |
1202 | /// loop { |
1203 | /// // Wait for the pipe to be readable |
1204 | /// rx.readable().await?; |
1205 | /// |
1206 | /// // Creating the buffer **after** the `await` prevents it from |
1207 | /// // being stored in the async task. |
1208 | /// let mut buf_a = [0; 512]; |
1209 | /// let mut buf_b = [0; 1024]; |
1210 | /// let mut bufs = [ |
1211 | /// io::IoSliceMut::new(&mut buf_a), |
1212 | /// io::IoSliceMut::new(&mut buf_b), |
1213 | /// ]; |
1214 | /// |
1215 | /// // Try to read data, this may still fail with `WouldBlock` |
1216 | /// // if the readiness event is a false positive. |
1217 | /// match rx.try_read_vectored(&mut bufs) { |
1218 | /// Ok(0) => break, |
1219 | /// Ok(n) => { |
1220 | /// println!("read {} bytes" , n); |
1221 | /// } |
1222 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
1223 | /// continue; |
1224 | /// } |
1225 | /// Err(e) => { |
1226 | /// return Err(e.into()); |
1227 | /// } |
1228 | /// } |
1229 | /// } |
1230 | /// |
1231 | /// Ok(()) |
1232 | /// } |
1233 | /// ``` |
1234 | pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { |
1235 | self.io |
1236 | .registration() |
1237 | .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) |
1238 | } |
1239 | |
1240 | cfg_io_util! { |
1241 | /// Tries to read data from the pipe into the provided buffer, advancing the |
1242 | /// buffer's internal cursor, returning how many bytes were read. |
1243 | /// |
1244 | /// Reads any pending data from the pipe but does not wait for new data |
1245 | /// to arrive. On success, returns the number of bytes read. Because |
1246 | /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by |
1247 | /// the async task and can exist entirely on the stack. |
1248 | /// |
1249 | /// Usually, [`readable()`] or [`ready()`] is used with this function. |
1250 | /// |
1251 | /// [`readable()`]: Self::readable |
1252 | /// [`ready()`]: Self::ready |
1253 | /// |
1254 | /// # Return |
1255 | /// |
1256 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
1257 | /// number of bytes read. `Ok(0)` indicates the pipe's writing end is |
1258 | /// closed and will no longer write data. If the pipe is not ready to read |
1259 | /// data `Err(io::ErrorKind::WouldBlock)` is returned. |
1260 | /// |
1261 | /// # Examples |
1262 | /// |
1263 | /// ```no_run |
1264 | /// use tokio::net::unix::pipe; |
1265 | /// use std::io; |
1266 | /// |
1267 | /// #[tokio::main] |
1268 | /// async fn main() -> io::Result<()> { |
1269 | /// // Open a reading end of a fifo |
1270 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; |
1271 | /// |
1272 | /// loop { |
1273 | /// // Wait for the pipe to be readable |
1274 | /// rx.readable().await?; |
1275 | /// |
1276 | /// let mut buf = Vec::with_capacity(4096); |
1277 | /// |
1278 | /// // Try to read data, this may still fail with `WouldBlock` |
1279 | /// // if the readiness event is a false positive. |
1280 | /// match rx.try_read_buf(&mut buf) { |
1281 | /// Ok(0) => break, |
1282 | /// Ok(n) => { |
1283 | /// println!("read {} bytes", n); |
1284 | /// } |
1285 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
1286 | /// continue; |
1287 | /// } |
1288 | /// Err(e) => { |
1289 | /// return Err(e.into()); |
1290 | /// } |
1291 | /// } |
1292 | /// } |
1293 | /// |
1294 | /// Ok(()) |
1295 | /// } |
1296 | /// ``` |
1297 | pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { |
1298 | self.io.registration().try_io(Interest::READABLE, || { |
1299 | use std::io::Read; |
1300 | |
1301 | let dst = buf.chunk_mut(); |
1302 | let dst = |
1303 | unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; |
1304 | |
1305 | // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, |
1306 | // which correctly handles reads into uninitialized memory. |
1307 | let n = (&*self.io).read(dst)?; |
1308 | |
1309 | unsafe { |
1310 | buf.advance_mut(n); |
1311 | } |
1312 | |
1313 | Ok(n) |
1314 | }) |
1315 | } |
1316 | } |
1317 | |
1318 | /// Converts the pipe into an [`OwnedFd`] in blocking mode. |
1319 | /// |
1320 | /// This function will deregister this pipe end from the event loop, set |
1321 | /// it in blocking mode and perform the conversion. |
1322 | pub fn into_blocking_fd(self) -> io::Result<OwnedFd> { |
1323 | let fd = self.into_nonblocking_fd()?; |
1324 | set_blocking(&fd)?; |
1325 | Ok(fd) |
1326 | } |
1327 | |
1328 | /// Converts the pipe into an [`OwnedFd`] in nonblocking mode. |
1329 | /// |
1330 | /// This function will deregister this pipe end from the event loop and |
1331 | /// perform the conversion. Returned file descriptor will be in nonblocking |
1332 | /// mode. |
1333 | pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> { |
1334 | let mio_pipe = self.io.into_inner()?; |
1335 | |
1336 | // Safety: the pipe is now deregistered from the event loop |
1337 | // and we are the only owner of this pipe end. |
1338 | let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) }; |
1339 | |
1340 | Ok(owned_fd) |
1341 | } |
1342 | } |
1343 | |
1344 | impl AsyncRead for Receiver { |
1345 | fn poll_read( |
1346 | self: Pin<&mut Self>, |
1347 | cx: &mut Context<'_>, |
1348 | buf: &mut ReadBuf<'_>, |
1349 | ) -> Poll<io::Result<()>> { |
1350 | // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, |
1351 | // which correctly handles reads into uninitialized memory. |
1352 | unsafe { self.io.poll_read(cx, buf) } |
1353 | } |
1354 | } |
1355 | |
1356 | impl AsRawFd for Receiver { |
1357 | fn as_raw_fd(&self) -> RawFd { |
1358 | self.io.as_raw_fd() |
1359 | } |
1360 | } |
1361 | |
1362 | impl AsFd for Receiver { |
1363 | fn as_fd(&self) -> BorrowedFd<'_> { |
1364 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
1365 | } |
1366 | } |
1367 | |
1368 | /// Checks if the file descriptor is a pipe or a FIFO. |
1369 | fn is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool> { |
1370 | // Safety: `libc::stat` is C-like struct used for syscalls and all-zero |
1371 | // byte pattern forms a valid value. |
1372 | let mut stat: libc::stat = unsafe { std::mem::zeroed() }; |
1373 | |
1374 | // Safety: it's safe to call `fstat` with a valid, open file descriptor |
1375 | // and a valid pointer to a `stat` struct. |
1376 | let r: i32 = unsafe { libc::fstat(fildes:fd.as_raw_fd(), &mut stat) }; |
1377 | |
1378 | if r == -1 { |
1379 | Err(io::Error::last_os_error()) |
1380 | } else { |
1381 | Ok((stat.st_mode as libc::mode_t & libc::S_IFMT) == libc::S_IFIFO) |
1382 | } |
1383 | } |
1384 | |
1385 | /// Gets file descriptor's flags by fcntl. |
1386 | fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int> { |
1387 | // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor. |
1388 | let flags: i32 = unsafe { libc::fcntl(fd.as_raw_fd(), cmd:libc::F_GETFL) }; |
1389 | if flags < 0 { |
1390 | Err(io::Error::last_os_error()) |
1391 | } else { |
1392 | Ok(flags) |
1393 | } |
1394 | } |
1395 | |
1396 | /// Checks for `O_RDONLY` or `O_RDWR` access mode. |
1397 | fn has_read_access(flags: libc::c_int) -> bool { |
1398 | let mode: i32 = flags & libc::O_ACCMODE; |
1399 | mode == libc::O_RDONLY || mode == libc::O_RDWR |
1400 | } |
1401 | |
1402 | /// Checks for `O_WRONLY` or `O_RDWR` access mode. |
1403 | fn has_write_access(flags: libc::c_int) -> bool { |
1404 | let mode: i32 = flags & libc::O_ACCMODE; |
1405 | mode == libc::O_WRONLY || mode == libc::O_RDWR |
1406 | } |
1407 | |
1408 | /// Sets file descriptor's flags with `O_NONBLOCK` by fcntl. |
1409 | fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> { |
1410 | let flags: i32 = current_flags | libc::O_NONBLOCK; |
1411 | |
1412 | if flags != current_flags { |
1413 | // Safety: it's safe to use `fcntl` to set the `O_NONBLOCK` flag of a valid, |
1414 | // open file descriptor. |
1415 | let ret: i32 = unsafe { libc::fcntl(fd.as_raw_fd(), cmd:libc::F_SETFL, flags) }; |
1416 | if ret < 0 { |
1417 | return Err(io::Error::last_os_error()); |
1418 | } |
1419 | } |
1420 | |
1421 | Ok(()) |
1422 | } |
1423 | |
1424 | /// Removes `O_NONBLOCK` from fd's flags. |
1425 | fn set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()> { |
1426 | // Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor. |
1427 | let previous: i32 = unsafe { libc::fcntl(fd.as_raw_fd(), cmd:libc::F_GETFL) }; |
1428 | if previous == -1 { |
1429 | return Err(io::Error::last_os_error()); |
1430 | } |
1431 | |
1432 | let new: i32 = previous & !libc::O_NONBLOCK; |
1433 | |
1434 | // Safety: it's safe to use `fcntl` to unset the `O_NONBLOCK` flag of a valid, |
1435 | // open file descriptor. |
1436 | let r: i32 = unsafe { libc::fcntl(fd.as_raw_fd(), cmd:libc::F_SETFL, new) }; |
1437 | if r == -1 { |
1438 | Err(io::Error::last_os_error()) |
1439 | } else { |
1440 | Ok(()) |
1441 | } |
1442 | } |
1443 | |