1 | //! Unix pipe types. |
2 | |
3 | use crate::io::interest::Interest; |
4 | use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready}; |
5 | |
6 | use mio::unix::pipe as mio_pipe; |
7 | use std::fs::File; |
8 | use std::io::{self, Read, Write}; |
9 | use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; |
10 | #[cfg (not(tokio_no_as_fd))] |
11 | use std::os::unix::io::{AsFd, BorrowedFd}; |
12 | use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; |
13 | use std::path::Path; |
14 | use std::pin::Pin; |
15 | use std::task::{Context, Poll}; |
16 | |
17 | cfg_io_util! { |
18 | use bytes::BufMut; |
19 | } |
20 | |
/// Builder describing how a FIFO file should be opened as a pipe end.
///
/// The usual flow is: start from [`new`], chain the setters for the options
/// you need, and finish with [`open_receiver`] or [`open_sender`], handing over
/// the path of the FIFO file. The outcome is an [`io::Result`][result] wrapping
/// the pipe end, which you can then operate on.
///
/// [`new`]: OpenOptions::new
/// [`open_receiver`]: OpenOptions::open_receiver
/// [`open_sender`]: OpenOptions::open_sender
/// [result]: std::io::Result
///
/// # Examples
///
/// Open both ends of a pipe backed by a FIFO file:
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// # use std::error::Error;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
/// # Ok(())
/// # }
/// ```
///
/// Open a [`Sender`] on Linux when the file is known to be a FIFO:
///
/// ```ignore
/// use tokio::net::unix::pipe;
/// use nix::{unistd::mkfifo, sys::stat::Mode};
/// # use std::error::Error;
///
/// // Our program has exclusive access to this path.
/// const FIFO_NAME: &str = "path/to/a/new/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
/// let tx = pipe::OpenOptions::new()
///     .read_write(true)
///     .unchecked(true)
///     .open_sender(FIFO_NAME)?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct OpenOptions {
    /// Request both read and write access when opening the FIFO (Linux only).
    #[cfg(target_os = "linux")]
    read_write: bool,
    /// Skip the "is this really a FIFO?" check after the file has been opened.
    unchecked: bool,
}
77 | |
78 | impl OpenOptions { |
79 | /// Creates a blank new set of options ready for configuration. |
80 | /// |
81 | /// All options are initially set to `false`. |
82 | pub fn new() -> OpenOptions { |
83 | OpenOptions { |
84 | #[cfg (target_os = "linux" )] |
85 | read_write: false, |
86 | unchecked: false, |
87 | } |
88 | } |
89 | |
90 | /// Sets the option for read-write access. |
91 | /// |
92 | /// This option, when true, will indicate that a FIFO file will be opened |
93 | /// in read-write access mode. This operation is not defined by the POSIX |
94 | /// standard and is only guaranteed to work on Linux. |
95 | /// |
96 | /// # Examples |
97 | /// |
98 | /// Opening a [`Sender`] even if there are no open reading ends: |
99 | /// |
100 | /// ```ignore |
101 | /// use tokio::net::unix::pipe; |
102 | /// |
103 | /// let tx = pipe::OpenOptions::new() |
104 | /// .read_write(true) |
105 | /// .open_sender("path/to/a/fifo" ); |
106 | /// ``` |
107 | /// |
108 | /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not |
109 | /// fail with [`UnexpectedEof`] during reading if all writing ends of the |
110 | /// pipe close the FIFO file. |
111 | /// |
112 | /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof |
113 | /// |
114 | /// ```ignore |
115 | /// use tokio::net::unix::pipe; |
116 | /// |
117 | /// let tx = pipe::OpenOptions::new() |
118 | /// .read_write(true) |
119 | /// .open_receiver("path/to/a/fifo" ); |
120 | /// ``` |
121 | #[cfg (target_os = "linux" )] |
122 | #[cfg_attr (docsrs, doc(cfg(target_os = "linux" )))] |
123 | pub fn read_write(&mut self, value: bool) -> &mut Self { |
124 | self.read_write = value; |
125 | self |
126 | } |
127 | |
128 | /// Sets the option to skip the check for FIFO file type. |
129 | /// |
130 | /// By default, [`open_receiver`] and [`open_sender`] functions will check |
131 | /// if the opened file is a FIFO file. Set this option to `true` if you are |
132 | /// sure the file is a FIFO file. |
133 | /// |
134 | /// [`open_receiver`]: OpenOptions::open_receiver |
135 | /// [`open_sender`]: OpenOptions::open_sender |
136 | /// |
137 | /// # Examples |
138 | /// |
139 | /// ```no_run |
140 | /// use tokio::net::unix::pipe; |
141 | /// use nix::{unistd::mkfifo, sys::stat::Mode}; |
142 | /// # use std::error::Error; |
143 | /// |
144 | /// // Our program has exclusive access to this path. |
145 | /// const FIFO_NAME: &str = "path/to/a/new/fifo" ; |
146 | /// |
147 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
148 | /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; |
149 | /// let rx = pipe::OpenOptions::new() |
150 | /// .unchecked(true) |
151 | /// .open_receiver(FIFO_NAME)?; |
152 | /// # Ok(()) |
153 | /// # } |
154 | /// ``` |
155 | pub fn unchecked(&mut self, value: bool) -> &mut Self { |
156 | self.unchecked = value; |
157 | self |
158 | } |
159 | |
160 | /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`. |
161 | /// |
162 | /// This function will open the FIFO file at the specified path, possibly |
163 | /// check if it is a pipe, and associate the pipe with the default event |
164 | /// loop for reading. |
165 | /// |
166 | /// # Errors |
167 | /// |
168 | /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. |
169 | /// This function may also fail with other standard OS errors. |
170 | /// |
171 | /// # Panics |
172 | /// |
173 | /// This function panics if it is not called from within a runtime with |
174 | /// IO enabled. |
175 | /// |
176 | /// The runtime is usually set implicitly when this function is called |
177 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
178 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
179 | pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> { |
180 | let file = self.open(path.as_ref(), PipeEnd::Receiver)?; |
181 | Receiver::from_file_unchecked(file) |
182 | } |
183 | |
184 | /// Creates a [`Sender`] from a FIFO file with the options specified by `self`. |
185 | /// |
186 | /// This function will open the FIFO file at the specified path, possibly |
187 | /// check if it is a pipe, and associate the pipe with the default event |
188 | /// loop for writing. |
189 | /// |
190 | /// # Errors |
191 | /// |
192 | /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. |
193 | /// If the file is not opened in read-write access mode and the file is not |
194 | /// currently open for reading, this function will fail with `ENXIO`. |
195 | /// This function may also fail with other standard OS errors. |
196 | /// |
197 | /// # Panics |
198 | /// |
199 | /// This function panics if it is not called from within a runtime with |
200 | /// IO enabled. |
201 | /// |
202 | /// The runtime is usually set implicitly when this function is called |
203 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
204 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
205 | pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> { |
206 | let file = self.open(path.as_ref(), PipeEnd::Sender)?; |
207 | Sender::from_file_unchecked(file) |
208 | } |
209 | |
210 | fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> { |
211 | let mut options = std::fs::OpenOptions::new(); |
212 | options |
213 | .read(pipe_end == PipeEnd::Receiver) |
214 | .write(pipe_end == PipeEnd::Sender) |
215 | .custom_flags(libc::O_NONBLOCK); |
216 | |
217 | #[cfg (target_os = "linux" )] |
218 | if self.read_write { |
219 | options.read(true).write(true); |
220 | } |
221 | |
222 | let file = options.open(path)?; |
223 | |
224 | if !self.unchecked && !is_fifo(&file)? { |
225 | return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe" )); |
226 | } |
227 | |
228 | Ok(file) |
229 | } |
230 | } |
231 | |
232 | impl Default for OpenOptions { |
233 | fn default() -> OpenOptions { |
234 | OpenOptions::new() |
235 | } |
236 | } |
237 | |
/// Identifies which end of a pipe is being opened.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
    /// The writing end.
    Sender,
    /// The reading end.
    Receiver,
}
243 | |
244 | /// Writing end of a Unix pipe. |
245 | /// |
246 | /// It can be constructed from a FIFO file with [`OpenOptions::open_sender`]. |
247 | /// |
248 | /// Opening a named pipe for writing involves a few steps. |
249 | /// Call to [`OpenOptions::open_sender`] might fail with an error indicating |
250 | /// different things: |
251 | /// |
252 | /// * [`io::ErrorKind::NotFound`] - There is no file at the specified path. |
253 | /// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO. |
254 | /// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading. |
255 | /// Sleep for a while and try again. |
256 | /// * Other OS errors not specific to opening FIFO files. |
257 | /// |
258 | /// Opening a `Sender` from a FIFO file should look like this: |
259 | /// |
260 | /// ```no_run |
261 | /// use tokio::net::unix::pipe; |
262 | /// use tokio::time::{self, Duration}; |
263 | /// |
264 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
265 | /// |
266 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
267 | /// // Wait for a reader to open the file. |
268 | /// let tx = loop { |
269 | /// match pipe::OpenOptions::new().open_sender(FIFO_NAME) { |
270 | /// Ok(tx) => break tx, |
271 | /// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}, |
272 | /// Err(e) => return Err(e.into()), |
273 | /// } |
274 | /// |
275 | /// time::sleep(Duration::from_millis(50)).await; |
276 | /// }; |
277 | /// # Ok(()) |
278 | /// # } |
279 | /// ``` |
280 | /// |
281 | /// On Linux, it is possible to create a `Sender` without waiting in a sleeping |
282 | /// loop. This is done by opening a named pipe in read-write access mode with |
283 | /// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold |
284 | /// both a writing end and a reading end, and the latter allows to open a FIFO |
285 | /// without [`ENXIO`] error since the pipe is open for reading as well. |
286 | /// |
287 | /// `Sender` cannot be used to read from a pipe, so in practice the read access |
288 | /// is only used when a FIFO is opened. However, using a `Sender` in read-write |
289 | /// mode **may lead to lost data**, because written data will be dropped by the |
290 | /// system as soon as all pipe ends are closed. To avoid lost data you have to |
291 | /// make sure that a reading end has been opened before dropping a `Sender`. |
292 | /// |
293 | /// Note that using read-write access mode with FIFO files is not defined by |
294 | /// the POSIX standard and it is only guaranteed to work on Linux. |
295 | /// |
296 | /// ```ignore |
297 | /// use tokio::io::AsyncWriteExt; |
298 | /// use tokio::net::unix::pipe; |
299 | /// |
300 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
301 | /// |
302 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
303 | /// let mut tx = pipe::OpenOptions::new() |
304 | /// .read_write(true) |
305 | /// .open_sender(FIFO_NAME)?; |
306 | /// |
307 | /// // Asynchronously write to the pipe before a reader. |
308 | /// tx.write_all(b"hello world" ).await?; |
309 | /// # Ok(()) |
310 | /// # } |
311 | /// ``` |
312 | /// |
313 | /// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html |
314 | #[derive (Debug)] |
315 | pub struct Sender { |
316 | io: PollEvented<mio_pipe::Sender>, |
317 | } |
318 | |
319 | impl Sender { |
320 | fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> { |
321 | let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?; |
322 | Ok(Sender { io }) |
323 | } |
324 | |
325 | /// Creates a new `Sender` from a [`File`]. |
326 | /// |
327 | /// This function is intended to construct a pipe from a [`File`] representing |
328 | /// a special FIFO file. It will check if the file is a pipe and has write access, |
329 | /// set it in non-blocking mode and perform the conversion. |
330 | /// |
331 | /// # Errors |
332 | /// |
333 | /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it |
334 | /// does not have write access. Also fails with any standard OS error if it occurs. |
335 | /// |
336 | /// # Panics |
337 | /// |
338 | /// This function panics if it is not called from within a runtime with |
339 | /// IO enabled. |
340 | /// |
341 | /// The runtime is usually set implicitly when this function is called |
342 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
343 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
344 | pub fn from_file(mut file: File) -> io::Result<Sender> { |
345 | if !is_fifo(&file)? { |
346 | return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe" )); |
347 | } |
348 | |
349 | let flags = get_file_flags(&file)?; |
350 | if has_write_access(flags) { |
351 | set_nonblocking(&mut file, flags)?; |
352 | Sender::from_file_unchecked(file) |
353 | } else { |
354 | Err(io::Error::new( |
355 | io::ErrorKind::InvalidInput, |
356 | "not in O_WRONLY or O_RDWR access mode" , |
357 | )) |
358 | } |
359 | } |
360 | |
361 | /// Creates a new `Sender` from a [`File`] without checking pipe properties. |
362 | /// |
363 | /// This function is intended to construct a pipe from a File representing |
364 | /// a special FIFO file. The conversion assumes nothing about the underlying |
365 | /// file; it is left up to the user to make sure it is opened with write access, |
366 | /// represents a pipe and is set in non-blocking mode. |
367 | /// |
368 | /// # Examples |
369 | /// |
370 | /// ```no_run |
371 | /// use tokio::net::unix::pipe; |
372 | /// use std::fs::OpenOptions; |
373 | /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; |
374 | /// # use std::error::Error; |
375 | /// |
376 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
377 | /// |
378 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
379 | /// let file = OpenOptions::new() |
380 | /// .write(true) |
381 | /// .custom_flags(libc::O_NONBLOCK) |
382 | /// .open(FIFO_NAME)?; |
383 | /// if file.metadata()?.file_type().is_fifo() { |
384 | /// let tx = pipe::Sender::from_file_unchecked(file)?; |
385 | /// /* use the Sender */ |
386 | /// } |
387 | /// # Ok(()) |
388 | /// # } |
389 | /// ``` |
390 | /// |
391 | /// # Panics |
392 | /// |
393 | /// This function panics if it is not called from within a runtime with |
394 | /// IO enabled. |
395 | /// |
396 | /// The runtime is usually set implicitly when this function is called |
397 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
398 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
399 | pub fn from_file_unchecked(file: File) -> io::Result<Sender> { |
400 | let raw_fd = file.into_raw_fd(); |
401 | let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) }; |
402 | Sender::from_mio(mio_tx) |
403 | } |
404 | |
405 | /// Waits for any of the requested ready states. |
406 | /// |
407 | /// This function can be used instead of [`writable()`] to check the returned |
408 | /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events. |
409 | /// |
410 | /// The function may complete without the pipe being ready. This is a |
411 | /// false-positive and attempting an operation will return with |
412 | /// `io::ErrorKind::WouldBlock`. The function can also return with an empty |
413 | /// [`Ready`] set, so you should always check the returned value and possibly |
414 | /// wait again if the requested states are not set. |
415 | /// |
416 | /// [`writable()`]: Self::writable |
417 | /// |
418 | /// # Cancel safety |
419 | /// |
420 | /// This method is cancel safe. Once a readiness event occurs, the method |
421 | /// will continue to return immediately until the readiness event is |
422 | /// consumed by an attempt to write that fails with `WouldBlock` or |
423 | /// `Poll::Pending`. |
424 | pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { |
425 | let event = self.io.registration().readiness(interest).await?; |
426 | Ok(event.ready) |
427 | } |
428 | |
429 | /// Waits for the pipe to become writable. |
430 | /// |
431 | /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually |
432 | /// paired with [`try_write()`]. |
433 | /// |
434 | /// [`try_write()`]: Self::try_write |
435 | /// |
436 | /// # Examples |
437 | /// |
438 | /// ```no_run |
439 | /// use tokio::net::unix::pipe; |
440 | /// use std::io; |
441 | /// |
442 | /// #[tokio::main] |
443 | /// async fn main() -> io::Result<()> { |
444 | /// // Open a writing end of a fifo |
445 | /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo" )?; |
446 | /// |
447 | /// loop { |
448 | /// // Wait for the pipe to be writable |
449 | /// tx.writable().await?; |
450 | /// |
451 | /// // Try to write data, this may still fail with `WouldBlock` |
452 | /// // if the readiness event is a false positive. |
453 | /// match tx.try_write(b"hello world" ) { |
454 | /// Ok(n) => { |
455 | /// break; |
456 | /// } |
457 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
458 | /// continue; |
459 | /// } |
460 | /// Err(e) => { |
461 | /// return Err(e.into()); |
462 | /// } |
463 | /// } |
464 | /// } |
465 | /// |
466 | /// Ok(()) |
467 | /// } |
468 | /// ``` |
469 | pub async fn writable(&self) -> io::Result<()> { |
470 | self.ready(Interest::WRITABLE).await?; |
471 | Ok(()) |
472 | } |
473 | |
474 | /// Polls for write readiness. |
475 | /// |
476 | /// If the pipe is not currently ready for writing, this method will |
477 | /// store a clone of the `Waker` from the provided `Context`. When the pipe |
478 | /// becomes ready for writing, `Waker::wake` will be called on the waker. |
479 | /// |
480 | /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only |
481 | /// the `Waker` from the `Context` passed to the most recent call is |
482 | /// scheduled to receive a wakeup. |
483 | /// |
484 | /// This function is intended for cases where creating and pinning a future |
485 | /// via [`writable`] is not feasible. Where possible, using [`writable`] is |
486 | /// preferred, as this supports polling from multiple tasks at once. |
487 | /// |
488 | /// [`writable`]: Self::writable |
489 | /// |
490 | /// # Return value |
491 | /// |
492 | /// The function returns: |
493 | /// |
494 | /// * `Poll::Pending` if the pipe is not ready for writing. |
495 | /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing. |
496 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
497 | /// |
498 | /// # Errors |
499 | /// |
500 | /// This function may encounter any standard I/O error except `WouldBlock`. |
501 | pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
502 | self.io.registration().poll_write_ready(cx).map_ok(|_| ()) |
503 | } |
504 | |
505 | /// Tries to write a buffer to the pipe, returning how many bytes were |
506 | /// written. |
507 | /// |
508 | /// The function will attempt to write the entire contents of `buf`, but |
509 | /// only part of the buffer may be written. If the length of `buf` is not |
510 | /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the |
511 | /// write is guaranteed to be atomic, i.e. either the entire content of |
512 | /// `buf` will be written or this method will fail with `WouldBlock`. There |
513 | /// is no such guarantee if `buf` is larger than `PIPE_BUF`. |
514 | /// |
515 | /// This function is usually paired with [`writable`]. |
516 | /// |
517 | /// [`writable`]: Self::writable |
518 | /// |
519 | /// # Return |
520 | /// |
521 | /// If data is successfully written, `Ok(n)` is returned, where `n` is the |
522 | /// number of bytes written. If the pipe is not ready to write data, |
523 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
524 | /// |
525 | /// # Examples |
526 | /// |
527 | /// ```no_run |
528 | /// use tokio::net::unix::pipe; |
529 | /// use std::io; |
530 | /// |
531 | /// #[tokio::main] |
532 | /// async fn main() -> io::Result<()> { |
533 | /// // Open a writing end of a fifo |
534 | /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo" )?; |
535 | /// |
536 | /// loop { |
537 | /// // Wait for the pipe to be writable |
538 | /// tx.writable().await?; |
539 | /// |
540 | /// // Try to write data, this may still fail with `WouldBlock` |
541 | /// // if the readiness event is a false positive. |
542 | /// match tx.try_write(b"hello world" ) { |
543 | /// Ok(n) => { |
544 | /// break; |
545 | /// } |
546 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
547 | /// continue; |
548 | /// } |
549 | /// Err(e) => { |
550 | /// return Err(e.into()); |
551 | /// } |
552 | /// } |
553 | /// } |
554 | /// |
555 | /// Ok(()) |
556 | /// } |
557 | /// ``` |
558 | pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { |
559 | self.io |
560 | .registration() |
561 | .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) |
562 | } |
563 | |
564 | /// Tries to write several buffers to the pipe, returning how many bytes |
565 | /// were written. |
566 | /// |
567 | /// Data is written from each buffer in order, with the final buffer read |
568 | /// from possible being only partially consumed. This method behaves |
569 | /// equivalently to a single call to [`try_write()`] with concatenated |
570 | /// buffers. |
571 | /// |
572 | /// If the total length of buffers is not greater than `PIPE_BUF` (an OS |
573 | /// constant, 4096 under Linux), then the write is guaranteed to be atomic, |
574 | /// i.e. either the entire contents of buffers will be written or this |
575 | /// method will fail with `WouldBlock`. There is no such guarantee if the |
576 | /// total length of buffers is greater than `PIPE_BUF`. |
577 | /// |
578 | /// This function is usually paired with [`writable`]. |
579 | /// |
580 | /// [`try_write()`]: Self::try_write() |
581 | /// [`writable`]: Self::writable |
582 | /// |
583 | /// # Return |
584 | /// |
585 | /// If data is successfully written, `Ok(n)` is returned, where `n` is the |
586 | /// number of bytes written. If the pipe is not ready to write data, |
587 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
588 | /// |
589 | /// # Examples |
590 | /// |
591 | /// ```no_run |
592 | /// use tokio::net::unix::pipe; |
593 | /// use std::io; |
594 | /// |
595 | /// #[tokio::main] |
596 | /// async fn main() -> io::Result<()> { |
597 | /// // Open a writing end of a fifo |
598 | /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo" )?; |
599 | /// |
600 | /// let bufs = [io::IoSlice::new(b"hello " ), io::IoSlice::new(b"world" )]; |
601 | /// |
602 | /// loop { |
603 | /// // Wait for the pipe to be writable |
604 | /// tx.writable().await?; |
605 | /// |
606 | /// // Try to write data, this may still fail with `WouldBlock` |
607 | /// // if the readiness event is a false positive. |
608 | /// match tx.try_write_vectored(&bufs) { |
609 | /// Ok(n) => { |
610 | /// break; |
611 | /// } |
612 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
613 | /// continue; |
614 | /// } |
615 | /// Err(e) => { |
616 | /// return Err(e.into()); |
617 | /// } |
618 | /// } |
619 | /// } |
620 | /// |
621 | /// Ok(()) |
622 | /// } |
623 | /// ``` |
624 | pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { |
625 | self.io |
626 | .registration() |
627 | .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) |
628 | } |
629 | } |
630 | |
631 | impl AsyncWrite for Sender { |
632 | fn poll_write( |
633 | self: Pin<&mut Self>, |
634 | cx: &mut Context<'_>, |
635 | buf: &[u8], |
636 | ) -> Poll<io::Result<usize>> { |
637 | self.io.poll_write(cx, buf) |
638 | } |
639 | |
640 | fn poll_write_vectored( |
641 | self: Pin<&mut Self>, |
642 | cx: &mut Context<'_>, |
643 | bufs: &[io::IoSlice<'_>], |
644 | ) -> Poll<io::Result<usize>> { |
645 | self.io.poll_write_vectored(cx, bufs) |
646 | } |
647 | |
648 | fn is_write_vectored(&self) -> bool { |
649 | true |
650 | } |
651 | |
652 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
653 | Poll::Ready(Ok(())) |
654 | } |
655 | |
656 | fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
657 | Poll::Ready(Ok(())) |
658 | } |
659 | } |
660 | |
661 | impl AsRawFd for Sender { |
662 | fn as_raw_fd(&self) -> RawFd { |
663 | self.io.as_raw_fd() |
664 | } |
665 | } |
666 | |
667 | #[cfg (not(tokio_no_as_fd))] |
668 | impl AsFd for Sender { |
669 | fn as_fd(&self) -> BorrowedFd<'_> { |
670 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
671 | } |
672 | } |
673 | |
674 | /// Reading end of a Unix pipe. |
675 | /// |
676 | /// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`]. |
677 | /// |
678 | /// # Examples |
679 | /// |
680 | /// Receiving messages from a named pipe in a loop: |
681 | /// |
682 | /// ```no_run |
683 | /// use tokio::net::unix::pipe; |
684 | /// use tokio::io::{self, AsyncReadExt}; |
685 | /// |
686 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
687 | /// |
688 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
689 | /// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; |
690 | /// loop { |
691 | /// let mut msg = vec![0; 256]; |
692 | /// match rx.read_exact(&mut msg).await { |
693 | /// Ok(_) => { |
694 | /// /* handle the message */ |
695 | /// } |
696 | /// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { |
697 | /// // Writing end has been closed, we should reopen the pipe. |
698 | /// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; |
699 | /// } |
700 | /// Err(e) => return Err(e.into()), |
701 | /// } |
702 | /// } |
703 | /// # } |
704 | /// ``` |
705 | /// |
706 | /// On Linux, you can use a `Receiver` in read-write access mode to implement |
707 | /// resilient reading from a named pipe. Unlike `Receiver` opened in read-only |
708 | /// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof` |
709 | /// when the writing end is closed. This way, a `Receiver` can asynchronously |
710 | /// wait for the next writer to open the pipe. |
711 | /// |
712 | /// You should not use functions waiting for EOF such as [`read_to_end`] with |
713 | /// a `Receiver` in read-write access mode, since it **may wait forever**. |
714 | /// `Receiver` in this mode also holds an open writing end, which prevents |
715 | /// receiving EOF. |
716 | /// |
717 | /// To set the read-write access mode you can use `OpenOptions::read_write`. |
718 | /// Note that using read-write access mode with FIFO files is not defined by |
719 | /// the POSIX standard and it is only guaranteed to work on Linux. |
720 | /// |
721 | /// ```ignore |
722 | /// use tokio::net::unix::pipe; |
723 | /// use tokio::io::AsyncReadExt; |
724 | /// # use std::error::Error; |
725 | /// |
726 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
727 | /// |
728 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
729 | /// let mut rx = pipe::OpenOptions::new() |
730 | /// .read_write(true) |
731 | /// .open_receiver(FIFO_NAME)?; |
732 | /// loop { |
733 | /// let mut msg = vec![0; 256]; |
734 | /// rx.read_exact(&mut msg).await?; |
735 | /// /* handle the message */ |
736 | /// } |
737 | /// # } |
738 | /// ``` |
739 | /// |
740 | /// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end |
741 | #[derive (Debug)] |
742 | pub struct Receiver { |
743 | io: PollEvented<mio_pipe::Receiver>, |
744 | } |
745 | |
746 | impl Receiver { |
747 | fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> { |
748 | let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?; |
749 | Ok(Receiver { io }) |
750 | } |
751 | |
752 | /// Creates a new `Receiver` from a [`File`]. |
753 | /// |
754 | /// This function is intended to construct a pipe from a [`File`] representing |
755 | /// a special FIFO file. It will check if the file is a pipe and has read access, |
756 | /// set it in non-blocking mode and perform the conversion. |
757 | /// |
758 | /// # Errors |
759 | /// |
760 | /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it |
761 | /// does not have read access. Also fails with any standard OS error if it occurs. |
762 | /// |
763 | /// # Panics |
764 | /// |
765 | /// This function panics if it is not called from within a runtime with |
766 | /// IO enabled. |
767 | /// |
768 | /// The runtime is usually set implicitly when this function is called |
769 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
770 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
771 | pub fn from_file(mut file: File) -> io::Result<Receiver> { |
772 | if !is_fifo(&file)? { |
773 | return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe" )); |
774 | } |
775 | |
776 | let flags = get_file_flags(&file)?; |
777 | if has_read_access(flags) { |
778 | set_nonblocking(&mut file, flags)?; |
779 | Receiver::from_file_unchecked(file) |
780 | } else { |
781 | Err(io::Error::new( |
782 | io::ErrorKind::InvalidInput, |
783 | "not in O_RDONLY or O_RDWR access mode" , |
784 | )) |
785 | } |
786 | } |
787 | |
788 | /// Creates a new `Receiver` from a [`File`] without checking pipe properties. |
789 | /// |
790 | /// This function is intended to construct a pipe from a File representing |
791 | /// a special FIFO file. The conversion assumes nothing about the underlying |
792 | /// file; it is left up to the user to make sure it is opened with read access, |
793 | /// represents a pipe and is set in non-blocking mode. |
794 | /// |
795 | /// # Examples |
796 | /// |
797 | /// ```no_run |
798 | /// use tokio::net::unix::pipe; |
799 | /// use std::fs::OpenOptions; |
800 | /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; |
801 | /// # use std::error::Error; |
802 | /// |
803 | /// const FIFO_NAME: &str = "path/to/a/fifo" ; |
804 | /// |
805 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
806 | /// let file = OpenOptions::new() |
807 | /// .read(true) |
808 | /// .custom_flags(libc::O_NONBLOCK) |
809 | /// .open(FIFO_NAME)?; |
810 | /// if file.metadata()?.file_type().is_fifo() { |
811 | /// let rx = pipe::Receiver::from_file_unchecked(file)?; |
812 | /// /* use the Receiver */ |
813 | /// } |
814 | /// # Ok(()) |
815 | /// # } |
816 | /// ``` |
817 | /// |
818 | /// # Panics |
819 | /// |
820 | /// This function panics if it is not called from within a runtime with |
821 | /// IO enabled. |
822 | /// |
823 | /// The runtime is usually set implicitly when this function is called |
824 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
825 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
826 | pub fn from_file_unchecked(file: File) -> io::Result<Receiver> { |
827 | let raw_fd = file.into_raw_fd(); |
828 | let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) }; |
829 | Receiver::from_mio(mio_rx) |
830 | } |
831 | |
832 | /// Waits for any of the requested ready states. |
833 | /// |
834 | /// This function can be used instead of [`readable()`] to check the returned |
835 | /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events. |
836 | /// |
837 | /// The function may complete without the pipe being ready. This is a |
838 | /// false-positive and attempting an operation will return with |
839 | /// `io::ErrorKind::WouldBlock`. The function can also return with an empty |
840 | /// [`Ready`] set, so you should always check the returned value and possibly |
841 | /// wait again if the requested states are not set. |
842 | /// |
843 | /// [`readable()`]: Self::readable |
844 | /// |
845 | /// # Cancel safety |
846 | /// |
847 | /// This method is cancel safe. Once a readiness event occurs, the method |
848 | /// will continue to return immediately until the readiness event is |
849 | /// consumed by an attempt to read that fails with `WouldBlock` or |
850 | /// `Poll::Pending`. |
851 | pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { |
852 | let event = self.io.registration().readiness(interest).await?; |
853 | Ok(event.ready) |
854 | } |
855 | |
856 | /// Waits for the pipe to become readable. |
857 | /// |
858 | /// This function is equivalent to `ready(Interest::READABLE)` and is usually |
859 | /// paired with [`try_read()`]. |
860 | /// |
861 | /// [`try_read()`]: Self::try_read() |
862 | /// |
863 | /// # Examples |
864 | /// |
865 | /// ```no_run |
866 | /// use tokio::net::unix::pipe; |
867 | /// use std::io; |
868 | /// |
869 | /// #[tokio::main] |
870 | /// async fn main() -> io::Result<()> { |
871 | /// // Open a reading end of a fifo |
872 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo" )?; |
873 | /// |
874 | /// let mut msg = vec![0; 1024]; |
875 | /// |
876 | /// loop { |
877 | /// // Wait for the pipe to be readable |
878 | /// rx.readable().await?; |
879 | /// |
880 | /// // Try to read data, this may still fail with `WouldBlock` |
881 | /// // if the readiness event is a false positive. |
882 | /// match rx.try_read(&mut msg) { |
883 | /// Ok(n) => { |
884 | /// msg.truncate(n); |
885 | /// break; |
886 | /// } |
887 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
888 | /// continue; |
889 | /// } |
890 | /// Err(e) => { |
891 | /// return Err(e.into()); |
892 | /// } |
893 | /// } |
894 | /// } |
895 | /// |
896 | /// println!("GOT = {:?}" , msg); |
897 | /// Ok(()) |
898 | /// } |
899 | /// ``` |
900 | pub async fn readable(&self) -> io::Result<()> { |
901 | self.ready(Interest::READABLE).await?; |
902 | Ok(()) |
903 | } |
904 | |
905 | /// Polls for read readiness. |
906 | /// |
907 | /// If the pipe is not currently ready for reading, this method will |
908 | /// store a clone of the `Waker` from the provided `Context`. When the pipe |
909 | /// becomes ready for reading, `Waker::wake` will be called on the waker. |
910 | /// |
911 | /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only |
912 | /// the `Waker` from the `Context` passed to the most recent call is |
913 | /// scheduled to receive a wakeup. |
914 | /// |
915 | /// This function is intended for cases where creating and pinning a future |
916 | /// via [`readable`] is not feasible. Where possible, using [`readable`] is |
917 | /// preferred, as this supports polling from multiple tasks at once. |
918 | /// |
919 | /// [`readable`]: Self::readable |
920 | /// |
921 | /// # Return value |
922 | /// |
923 | /// The function returns: |
924 | /// |
925 | /// * `Poll::Pending` if the pipe is not ready for reading. |
926 | /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading. |
927 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
928 | /// |
929 | /// # Errors |
930 | /// |
931 | /// This function may encounter any standard I/O error except `WouldBlock`. |
932 | pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
933 | self.io.registration().poll_read_ready(cx).map_ok(|_| ()) |
934 | } |
935 | |
936 | /// Tries to read data from the pipe into the provided buffer, returning how |
937 | /// many bytes were read. |
938 | /// |
939 | /// Reads any pending data from the pipe but does not wait for new data |
940 | /// to arrive. On success, returns the number of bytes read. Because |
941 | /// `try_read()` is non-blocking, the buffer does not have to be stored by |
942 | /// the async task and can exist entirely on the stack. |
943 | /// |
944 | /// Usually [`readable()`] is used with this function. |
945 | /// |
946 | /// [`readable()`]: Self::readable() |
947 | /// |
948 | /// # Return |
949 | /// |
950 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
951 | /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: |
952 | /// |
953 | /// 1. The pipe's writing end is closed and will no longer write data. |
954 | /// 2. The specified buffer was 0 bytes in length. |
955 | /// |
956 | /// If the pipe is not ready to read data, |
957 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
958 | /// |
959 | /// # Examples |
960 | /// |
961 | /// ```no_run |
962 | /// use tokio::net::unix::pipe; |
963 | /// use std::io; |
964 | /// |
965 | /// #[tokio::main] |
966 | /// async fn main() -> io::Result<()> { |
967 | /// // Open a reading end of a fifo |
968 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo" )?; |
969 | /// |
970 | /// let mut msg = vec![0; 1024]; |
971 | /// |
972 | /// loop { |
973 | /// // Wait for the pipe to be readable |
974 | /// rx.readable().await?; |
975 | /// |
976 | /// // Try to read data, this may still fail with `WouldBlock` |
977 | /// // if the readiness event is a false positive. |
978 | /// match rx.try_read(&mut msg) { |
979 | /// Ok(n) => { |
980 | /// msg.truncate(n); |
981 | /// break; |
982 | /// } |
983 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
984 | /// continue; |
985 | /// } |
986 | /// Err(e) => { |
987 | /// return Err(e.into()); |
988 | /// } |
989 | /// } |
990 | /// } |
991 | /// |
992 | /// println!("GOT = {:?}" , msg); |
993 | /// Ok(()) |
994 | /// } |
995 | /// ``` |
996 | pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { |
997 | self.io |
998 | .registration() |
999 | .try_io(Interest::READABLE, || (&*self.io).read(buf)) |
1000 | } |
1001 | |
1002 | /// Tries to read data from the pipe into the provided buffers, returning |
1003 | /// how many bytes were read. |
1004 | /// |
1005 | /// Data is copied to fill each buffer in order, with the final buffer |
1006 | /// written to possibly being only partially filled. This method behaves |
1007 | /// equivalently to a single call to [`try_read()`] with concatenated |
1008 | /// buffers. |
1009 | /// |
1010 | /// Reads any pending data from the pipe but does not wait for new data |
1011 | /// to arrive. On success, returns the number of bytes read. Because |
1012 | /// `try_read_vectored()` is non-blocking, the buffer does not have to be |
1013 | /// stored by the async task and can exist entirely on the stack. |
1014 | /// |
1015 | /// Usually, [`readable()`] is used with this function. |
1016 | /// |
1017 | /// [`try_read()`]: Self::try_read() |
1018 | /// [`readable()`]: Self::readable() |
1019 | /// |
1020 | /// # Return |
1021 | /// |
1022 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
1023 | /// number of bytes read. `Ok(0)` indicates the pipe's writing end is |
1024 | /// closed and will no longer write data. If the pipe is not ready to read |
1025 | /// data `Err(io::ErrorKind::WouldBlock)` is returned. |
1026 | /// |
1027 | /// # Examples |
1028 | /// |
1029 | /// ```no_run |
1030 | /// use tokio::net::unix::pipe; |
1031 | /// use std::io; |
1032 | /// |
1033 | /// #[tokio::main] |
1034 | /// async fn main() -> io::Result<()> { |
1035 | /// // Open a reading end of a fifo |
1036 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo" )?; |
1037 | /// |
1038 | /// loop { |
1039 | /// // Wait for the pipe to be readable |
1040 | /// rx.readable().await?; |
1041 | /// |
1042 | /// // Creating the buffer **after** the `await` prevents it from |
1043 | /// // being stored in the async task. |
1044 | /// let mut buf_a = [0; 512]; |
1045 | /// let mut buf_b = [0; 1024]; |
1046 | /// let mut bufs = [ |
1047 | /// io::IoSliceMut::new(&mut buf_a), |
1048 | /// io::IoSliceMut::new(&mut buf_b), |
1049 | /// ]; |
1050 | /// |
1051 | /// // Try to read data, this may still fail with `WouldBlock` |
1052 | /// // if the readiness event is a false positive. |
1053 | /// match rx.try_read_vectored(&mut bufs) { |
1054 | /// Ok(0) => break, |
1055 | /// Ok(n) => { |
1056 | /// println!("read {} bytes" , n); |
1057 | /// } |
1058 | /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
1059 | /// continue; |
1060 | /// } |
1061 | /// Err(e) => { |
1062 | /// return Err(e.into()); |
1063 | /// } |
1064 | /// } |
1065 | /// } |
1066 | /// |
1067 | /// Ok(()) |
1068 | /// } |
1069 | /// ``` |
1070 | pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { |
1071 | self.io |
1072 | .registration() |
1073 | .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) |
1074 | } |
1075 | |
1076 | cfg_io_util! { |
1077 | /// Tries to read data from the pipe into the provided buffer, advancing the |
1078 | /// buffer's internal cursor, returning how many bytes were read. |
1079 | /// |
1080 | /// Reads any pending data from the pipe but does not wait for new data |
1081 | /// to arrive. On success, returns the number of bytes read. Because |
1082 | /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by |
1083 | /// the async task and can exist entirely on the stack. |
1084 | /// |
1085 | /// Usually, [`readable()`] or [`ready()`] is used with this function. |
1086 | /// |
1087 | /// [`readable()`]: Self::readable |
1088 | /// [`ready()`]: Self::ready |
1089 | /// |
1090 | /// # Return |
1091 | /// |
1092 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
1093 | /// number of bytes read. `Ok(0)` indicates the pipe's writing end is |
1094 | /// closed and will no longer write data. If the pipe is not ready to read |
1095 | /// data `Err(io::ErrorKind::WouldBlock)` is returned. |
1096 | /// |
1097 | /// # Examples |
1098 | /// |
1099 | /// ```no_run |
1100 | /// use tokio::net::unix::pipe; |
1101 | /// use std::io; |
1102 | /// |
1103 | /// #[tokio::main] |
1104 | /// async fn main() -> io::Result<()> { |
1105 | /// // Open a reading end of a fifo |
1106 | /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; |
1107 | /// |
1108 | /// loop { |
1109 | /// // Wait for the pipe to be readable |
1110 | /// rx.readable().await?; |
1111 | /// |
1112 | /// let mut buf = Vec::with_capacity(4096); |
1113 | /// |
1114 | /// // Try to read data, this may still fail with `WouldBlock` |
1115 | /// // if the readiness event is a false positive. |
1116 | /// match rx.try_read_buf(&mut buf) { |
1117 | /// Ok(0) => break, |
1118 | /// Ok(n) => { |
1119 | /// println!("read {} bytes", n); |
1120 | /// } |
1121 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
1122 | /// continue; |
1123 | /// } |
1124 | /// Err(e) => { |
1125 | /// return Err(e.into()); |
1126 | /// } |
1127 | /// } |
1128 | /// } |
1129 | /// |
1130 | /// Ok(()) |
1131 | /// } |
1132 | /// ``` |
1133 | pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { |
1134 | self.io.registration().try_io(Interest::READABLE, || { |
1135 | use std::io::Read; |
1136 | |
1137 | let dst = buf.chunk_mut(); |
1138 | let dst = |
1139 | unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; |
1140 | |
1141 | // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, |
1142 | // which correctly handles reads into uninitialized memory. |
1143 | let n = (&*self.io).read(dst)?; |
1144 | |
1145 | unsafe { |
1146 | buf.advance_mut(n); |
1147 | } |
1148 | |
1149 | Ok(n) |
1150 | }) |
1151 | } |
1152 | } |
1153 | } |
1154 | |
1155 | impl AsyncRead for Receiver { |
1156 | fn poll_read( |
1157 | self: Pin<&mut Self>, |
1158 | cx: &mut Context<'_>, |
1159 | buf: &mut ReadBuf<'_>, |
1160 | ) -> Poll<io::Result<()>> { |
1161 | // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, |
1162 | // which correctly handles reads into uninitialized memory. |
1163 | unsafe { self.io.poll_read(cx, buf) } |
1164 | } |
1165 | } |
1166 | |
// Exposes the raw file descriptor behind this `Receiver`.
impl AsRawFd for Receiver {
    fn as_raw_fd(&self) -> RawFd {
        // Forwarded to the `as_raw_fd` reachable through `self.io`; the
        // receiver is only borrowed, not consumed.
        self.io.as_raw_fd()
    }
}
1172 | |
1173 | #[cfg (not(tokio_no_as_fd))] |
1174 | impl AsFd for Receiver { |
1175 | fn as_fd(&self) -> BorrowedFd<'_> { |
1176 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
1177 | } |
1178 | } |
1179 | |
/// Reports whether `file` refers to a FIFO.
///
/// Fails with the underlying I/O error when the file's metadata cannot be
/// queried.
fn is_fifo(file: &File) -> io::Result<bool> {
    file.metadata().map(|meta| meta.file_type().is_fifo())
}
1184 | |
1185 | /// Gets file descriptor's flags by fcntl. |
1186 | fn get_file_flags(file: &File) -> io::Result<libc::c_int> { |
1187 | let fd: i32 = file.as_raw_fd(); |
1188 | let flags: i32 = unsafe { libc::fcntl(fd, cmd:libc::F_GETFL) }; |
1189 | if flags < 0 { |
1190 | Err(io::Error::last_os_error()) |
1191 | } else { |
1192 | Ok(flags) |
1193 | } |
1194 | } |
1195 | |
1196 | /// Checks for O_RDONLY or O_RDWR access mode. |
1197 | fn has_read_access(flags: libc::c_int) -> bool { |
1198 | let mode: i32 = flags & libc::O_ACCMODE; |
1199 | mode == libc::O_RDONLY || mode == libc::O_RDWR |
1200 | } |
1201 | |
1202 | /// Checks for O_WRONLY or O_RDWR access mode. |
1203 | fn has_write_access(flags: libc::c_int) -> bool { |
1204 | let mode: i32 = flags & libc::O_ACCMODE; |
1205 | mode == libc::O_WRONLY || mode == libc::O_RDWR |
1206 | } |
1207 | |
1208 | /// Sets file's flags with O_NONBLOCK by fcntl. |
1209 | fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> { |
1210 | let fd: i32 = file.as_raw_fd(); |
1211 | |
1212 | let flags: i32 = current_flags | libc::O_NONBLOCK; |
1213 | |
1214 | if flags != current_flags { |
1215 | let ret: i32 = unsafe { libc::fcntl(fd, cmd:libc::F_SETFL, flags) }; |
1216 | if ret < 0 { |
1217 | return Err(io::Error::last_os_error()); |
1218 | } |
1219 | } |
1220 | |
1221 | Ok(()) |
1222 | } |
1223 | |