1use crate::future::poll_fn;
2use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
3use crate::net::unix::split::{split, ReadHalf, WriteHalf};
4use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
5use crate::net::unix::ucred::{self, UCred};
6use crate::net::unix::SocketAddr;
7
8use std::fmt;
9use std::io::{self, Read, Write};
10use std::net::Shutdown;
11#[cfg(not(tokio_no_as_fd))]
12use std::os::unix::io::{AsFd, BorrowedFd};
13use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
14use std::os::unix::net;
15use std::path::Path;
16use std::pin::Pin;
17use std::task::{Context, Poll};
18
19cfg_io_util! {
20 use bytes::BufMut;
21}
22
cfg_net_unix! {
    /// A structure representing a connected Unix socket.
    ///
    /// This socket can be connected directly with [`UnixStream::connect`] or accepted
    /// from a listener with [`UnixListener::accept`]. Additionally, a pair of
    /// anonymous Unix sockets can be created with [`UnixStream::pair`].
    ///
    /// To shut down the stream in the write direction, you can call the
    /// [`shutdown()`] method. This will cause the other peer to receive a read of
    /// length 0, indicating that no more data will be sent. This only closes
    /// the stream in one direction.
    ///
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    /// [`UnixListener::accept`]: crate::net::UnixListener::accept
    #[cfg_attr(docsrs, doc(alias = "uds"))]
    pub struct UnixStream {
        // The mio Unix stream wrapped in `PollEvented`. Every method of this type
        // goes through this field, either via `registration()` for readiness
        // tracking or by dereferencing it to reach the socket itself.
        io: PollEvented<mio::net::UnixStream>,
    }
}
42
43impl UnixStream {
44 /// Connects to the socket named by `path`.
45 ///
46 /// This function will create a new Unix socket and connect to the path
47 /// specified, associating the returned stream with the default event loop's
48 /// handle.
49 pub async fn connect<P>(path: P) -> io::Result<UnixStream>
50 where
51 P: AsRef<Path>,
52 {
53 let stream = mio::net::UnixStream::connect(path)?;
54 let stream = UnixStream::new(stream)?;
55
56 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
57
58 if let Some(e) = stream.io.take_error()? {
59 return Err(e);
60 }
61
62 Ok(stream)
63 }
64
65 /// Waits for any of the requested ready states.
66 ///
67 /// This function is usually paired with `try_read()` or `try_write()`. It
68 /// can be used to concurrently read / write to the same socket on a single
69 /// task without splitting the socket.
70 ///
71 /// The function may complete without the socket being ready. This is a
72 /// false-positive and attempting an operation will return with
73 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
74 /// [`Ready`] set, so you should always check the returned value and possibly
75 /// wait again if the requested states are not set.
76 ///
77 /// # Cancel safety
78 ///
79 /// This method is cancel safe. Once a readiness event occurs, the method
80 /// will continue to return immediately until the readiness event is
81 /// consumed by an attempt to read or write that fails with `WouldBlock` or
82 /// `Poll::Pending`.
83 ///
84 /// # Examples
85 ///
86 /// Concurrently read and write to the stream on the same task without
87 /// splitting.
88 ///
89 /// ```no_run
90 /// use tokio::io::Interest;
91 /// use tokio::net::UnixStream;
92 /// use std::error::Error;
93 /// use std::io;
94 ///
95 /// #[tokio::main]
96 /// async fn main() -> Result<(), Box<dyn Error>> {
97 /// let dir = tempfile::tempdir().unwrap();
98 /// let bind_path = dir.path().join("bind_path");
99 /// let stream = UnixStream::connect(bind_path).await?;
100 ///
101 /// loop {
102 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
103 ///
104 /// if ready.is_readable() {
105 /// let mut data = vec![0; 1024];
106 /// // Try to read data, this may still fail with `WouldBlock`
107 /// // if the readiness event is a false positive.
108 /// match stream.try_read(&mut data) {
109 /// Ok(n) => {
110 /// println!("read {} bytes", n);
111 /// }
112 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
113 /// continue;
114 /// }
115 /// Err(e) => {
116 /// return Err(e.into());
117 /// }
118 /// }
119 ///
120 /// }
121 ///
122 /// if ready.is_writable() {
123 /// // Try to write data, this may still fail with `WouldBlock`
124 /// // if the readiness event is a false positive.
125 /// match stream.try_write(b"hello world") {
126 /// Ok(n) => {
127 /// println!("write {} bytes", n);
128 /// }
129 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
130 /// continue;
131 /// }
132 /// Err(e) => {
133 /// return Err(e.into());
134 /// }
135 /// }
136 /// }
137 /// }
138 /// }
139 /// ```
140 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
141 let event = self.io.registration().readiness(interest).await?;
142 Ok(event.ready)
143 }
144
145 /// Waits for the socket to become readable.
146 ///
147 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
148 /// paired with `try_read()`.
149 ///
150 /// # Cancel safety
151 ///
152 /// This method is cancel safe. Once a readiness event occurs, the method
153 /// will continue to return immediately until the readiness event is
154 /// consumed by an attempt to read that fails with `WouldBlock` or
155 /// `Poll::Pending`.
156 ///
157 /// # Examples
158 ///
159 /// ```no_run
160 /// use tokio::net::UnixStream;
161 /// use std::error::Error;
162 /// use std::io;
163 ///
164 /// #[tokio::main]
165 /// async fn main() -> Result<(), Box<dyn Error>> {
166 /// // Connect to a peer
167 /// let dir = tempfile::tempdir().unwrap();
168 /// let bind_path = dir.path().join("bind_path");
169 /// let stream = UnixStream::connect(bind_path).await?;
170 ///
171 /// let mut msg = vec![0; 1024];
172 ///
173 /// loop {
174 /// // Wait for the socket to be readable
175 /// stream.readable().await?;
176 ///
177 /// // Try to read data, this may still fail with `WouldBlock`
178 /// // if the readiness event is a false positive.
179 /// match stream.try_read(&mut msg) {
180 /// Ok(n) => {
181 /// msg.truncate(n);
182 /// break;
183 /// }
184 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
185 /// continue;
186 /// }
187 /// Err(e) => {
188 /// return Err(e.into());
189 /// }
190 /// }
191 /// }
192 ///
193 /// println!("GOT = {:?}", msg);
194 /// Ok(())
195 /// }
196 /// ```
197 pub async fn readable(&self) -> io::Result<()> {
198 self.ready(Interest::READABLE).await?;
199 Ok(())
200 }
201
202 /// Polls for read readiness.
203 ///
204 /// If the unix stream is not currently ready for reading, this method will
205 /// store a clone of the `Waker` from the provided `Context`. When the unix
206 /// stream becomes ready for reading, `Waker::wake` will be called on the
207 /// waker.
208 ///
209 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
210 /// the `Waker` from the `Context` passed to the most recent call is
211 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
212 /// second, independent waker.)
213 ///
214 /// This function is intended for cases where creating and pinning a future
215 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
216 /// preferred, as this supports polling from multiple tasks at once.
217 ///
218 /// # Return value
219 ///
220 /// The function returns:
221 ///
222 /// * `Poll::Pending` if the unix stream is not ready for reading.
223 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading.
224 /// * `Poll::Ready(Err(e))` if an error is encountered.
225 ///
226 /// # Errors
227 ///
228 /// This function may encounter any standard I/O error except `WouldBlock`.
229 ///
230 /// [`readable`]: method@Self::readable
231 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
232 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
233 }
234
    /// Tries to read data from the stream into the provided buffer, returning how
236 /// many bytes were read.
237 ///
238 /// Receives any pending data from the socket but does not wait for new data
239 /// to arrive. On success, returns the number of bytes read. Because
240 /// `try_read()` is non-blocking, the buffer does not have to be stored by
241 /// the async task and can exist entirely on the stack.
242 ///
243 /// Usually, [`readable()`] or [`ready()`] is used with this function.
244 ///
245 /// [`readable()`]: UnixStream::readable()
246 /// [`ready()`]: UnixStream::ready()
247 ///
248 /// # Return
249 ///
250 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
251 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
252 ///
253 /// 1. The stream's read half is closed and will no longer yield data.
254 /// 2. The specified buffer was 0 bytes in length.
255 ///
256 /// If the stream is not ready to read data,
257 /// `Err(io::ErrorKind::WouldBlock)` is returned.
258 ///
259 /// # Examples
260 ///
261 /// ```no_run
262 /// use tokio::net::UnixStream;
263 /// use std::error::Error;
264 /// use std::io;
265 ///
266 /// #[tokio::main]
267 /// async fn main() -> Result<(), Box<dyn Error>> {
268 /// // Connect to a peer
269 /// let dir = tempfile::tempdir().unwrap();
270 /// let bind_path = dir.path().join("bind_path");
271 /// let stream = UnixStream::connect(bind_path).await?;
272 ///
273 /// loop {
274 /// // Wait for the socket to be readable
275 /// stream.readable().await?;
276 ///
277 /// // Creating the buffer **after** the `await` prevents it from
278 /// // being stored in the async task.
279 /// let mut buf = [0; 4096];
280 ///
281 /// // Try to read data, this may still fail with `WouldBlock`
282 /// // if the readiness event is a false positive.
283 /// match stream.try_read(&mut buf) {
284 /// Ok(0) => break,
285 /// Ok(n) => {
286 /// println!("read {} bytes", n);
287 /// }
288 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
289 /// continue;
290 /// }
291 /// Err(e) => {
292 /// return Err(e.into());
293 /// }
294 /// }
295 /// }
296 ///
297 /// Ok(())
298 /// }
299 /// ```
300 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
301 self.io
302 .registration()
303 .try_io(Interest::READABLE, || (&*self.io).read(buf))
304 }
305
306 /// Tries to read data from the stream into the provided buffers, returning
307 /// how many bytes were read.
308 ///
309 /// Data is copied to fill each buffer in order, with the final buffer
310 /// written to possibly being only partially filled. This method behaves
311 /// equivalently to a single call to [`try_read()`] with concatenated
312 /// buffers.
313 ///
314 /// Receives any pending data from the socket but does not wait for new data
315 /// to arrive. On success, returns the number of bytes read. Because
316 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
317 /// stored by the async task and can exist entirely on the stack.
318 ///
319 /// Usually, [`readable()`] or [`ready()`] is used with this function.
320 ///
321 /// [`try_read()`]: UnixStream::try_read()
322 /// [`readable()`]: UnixStream::readable()
323 /// [`ready()`]: UnixStream::ready()
324 ///
325 /// # Return
326 ///
327 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
328 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
329 /// and will no longer yield data. If the stream is not ready to read data
330 /// `Err(io::ErrorKind::WouldBlock)` is returned.
331 ///
332 /// # Examples
333 ///
334 /// ```no_run
335 /// use tokio::net::UnixStream;
336 /// use std::error::Error;
337 /// use std::io::{self, IoSliceMut};
338 ///
339 /// #[tokio::main]
340 /// async fn main() -> Result<(), Box<dyn Error>> {
341 /// // Connect to a peer
342 /// let dir = tempfile::tempdir().unwrap();
343 /// let bind_path = dir.path().join("bind_path");
344 /// let stream = UnixStream::connect(bind_path).await?;
345 ///
346 /// loop {
347 /// // Wait for the socket to be readable
348 /// stream.readable().await?;
349 ///
350 /// // Creating the buffer **after** the `await` prevents it from
351 /// // being stored in the async task.
352 /// let mut buf_a = [0; 512];
353 /// let mut buf_b = [0; 1024];
354 /// let mut bufs = [
355 /// IoSliceMut::new(&mut buf_a),
356 /// IoSliceMut::new(&mut buf_b),
357 /// ];
358 ///
359 /// // Try to read data, this may still fail with `WouldBlock`
360 /// // if the readiness event is a false positive.
361 /// match stream.try_read_vectored(&mut bufs) {
362 /// Ok(0) => break,
363 /// Ok(n) => {
364 /// println!("read {} bytes", n);
365 /// }
366 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
367 /// continue;
368 /// }
369 /// Err(e) => {
370 /// return Err(e.into());
371 /// }
372 /// }
373 /// }
374 ///
375 /// Ok(())
376 /// }
377 /// ```
378 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
379 self.io
380 .registration()
381 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
382 }
383
384 cfg_io_util! {
385 /// Tries to read data from the stream into the provided buffer, advancing the
386 /// buffer's internal cursor, returning how many bytes were read.
387 ///
388 /// Receives any pending data from the socket but does not wait for new data
389 /// to arrive. On success, returns the number of bytes read. Because
390 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
391 /// the async task and can exist entirely on the stack.
392 ///
393 /// Usually, [`readable()`] or [`ready()`] is used with this function.
394 ///
395 /// [`readable()`]: UnixStream::readable()
396 /// [`ready()`]: UnixStream::ready()
397 ///
398 /// # Return
399 ///
400 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
401 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
402 /// and will no longer yield data. If the stream is not ready to read data
403 /// `Err(io::ErrorKind::WouldBlock)` is returned.
404 ///
405 /// # Examples
406 ///
407 /// ```no_run
408 /// use tokio::net::UnixStream;
409 /// use std::error::Error;
410 /// use std::io;
411 ///
412 /// #[tokio::main]
413 /// async fn main() -> Result<(), Box<dyn Error>> {
414 /// // Connect to a peer
415 /// let dir = tempfile::tempdir().unwrap();
416 /// let bind_path = dir.path().join("bind_path");
417 /// let stream = UnixStream::connect(bind_path).await?;
418 ///
419 /// loop {
420 /// // Wait for the socket to be readable
421 /// stream.readable().await?;
422 ///
423 /// let mut buf = Vec::with_capacity(4096);
424 ///
425 /// // Try to read data, this may still fail with `WouldBlock`
426 /// // if the readiness event is a false positive.
427 /// match stream.try_read_buf(&mut buf) {
428 /// Ok(0) => break,
429 /// Ok(n) => {
430 /// println!("read {} bytes", n);
431 /// }
432 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
433 /// continue;
434 /// }
435 /// Err(e) => {
436 /// return Err(e.into());
437 /// }
438 /// }
439 /// }
440 ///
441 /// Ok(())
442 /// }
443 /// ```
444 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
445 self.io.registration().try_io(Interest::READABLE, || {
446 use std::io::Read;
447
448 let dst = buf.chunk_mut();
449 let dst =
450 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
451
452 // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the
453 // buffer.
454 let n = (&*self.io).read(dst)?;
455
456 unsafe {
457 buf.advance_mut(n);
458 }
459
460 Ok(n)
461 })
462 }
463 }
464
465 /// Waits for the socket to become writable.
466 ///
467 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
468 /// paired with `try_write()`.
469 ///
470 /// # Cancel safety
471 ///
472 /// This method is cancel safe. Once a readiness event occurs, the method
473 /// will continue to return immediately until the readiness event is
474 /// consumed by an attempt to write that fails with `WouldBlock` or
475 /// `Poll::Pending`.
476 ///
477 /// # Examples
478 ///
479 /// ```no_run
480 /// use tokio::net::UnixStream;
481 /// use std::error::Error;
482 /// use std::io;
483 ///
484 /// #[tokio::main]
485 /// async fn main() -> Result<(), Box<dyn Error>> {
486 /// // Connect to a peer
487 /// let dir = tempfile::tempdir().unwrap();
488 /// let bind_path = dir.path().join("bind_path");
489 /// let stream = UnixStream::connect(bind_path).await?;
490 ///
491 /// loop {
492 /// // Wait for the socket to be writable
493 /// stream.writable().await?;
494 ///
495 /// // Try to write data, this may still fail with `WouldBlock`
496 /// // if the readiness event is a false positive.
497 /// match stream.try_write(b"hello world") {
498 /// Ok(n) => {
499 /// break;
500 /// }
501 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
502 /// continue;
503 /// }
504 /// Err(e) => {
505 /// return Err(e.into());
506 /// }
507 /// }
508 /// }
509 ///
510 /// Ok(())
511 /// }
512 /// ```
513 pub async fn writable(&self) -> io::Result<()> {
514 self.ready(Interest::WRITABLE).await?;
515 Ok(())
516 }
517
518 /// Polls for write readiness.
519 ///
520 /// If the unix stream is not currently ready for writing, this method will
521 /// store a clone of the `Waker` from the provided `Context`. When the unix
522 /// stream becomes ready for writing, `Waker::wake` will be called on the
523 /// waker.
524 ///
525 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
526 /// the `Waker` from the `Context` passed to the most recent call is
527 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
528 /// second, independent waker.)
529 ///
530 /// This function is intended for cases where creating and pinning a future
531 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
532 /// preferred, as this supports polling from multiple tasks at once.
533 ///
534 /// # Return value
535 ///
536 /// The function returns:
537 ///
538 /// * `Poll::Pending` if the unix stream is not ready for writing.
539 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing.
540 /// * `Poll::Ready(Err(e))` if an error is encountered.
541 ///
542 /// # Errors
543 ///
544 /// This function may encounter any standard I/O error except `WouldBlock`.
545 ///
546 /// [`writable`]: method@Self::writable
547 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
548 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
549 }
550
551 /// Tries to write a buffer to the stream, returning how many bytes were
552 /// written.
553 ///
554 /// The function will attempt to write the entire contents of `buf`, but
555 /// only part of the buffer may be written.
556 ///
557 /// This function is usually paired with `writable()`.
558 ///
559 /// # Return
560 ///
561 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
562 /// number of bytes written. If the stream is not ready to write data,
563 /// `Err(io::ErrorKind::WouldBlock)` is returned.
564 ///
565 /// # Examples
566 ///
567 /// ```no_run
568 /// use tokio::net::UnixStream;
569 /// use std::error::Error;
570 /// use std::io;
571 ///
572 /// #[tokio::main]
573 /// async fn main() -> Result<(), Box<dyn Error>> {
574 /// // Connect to a peer
575 /// let dir = tempfile::tempdir().unwrap();
576 /// let bind_path = dir.path().join("bind_path");
577 /// let stream = UnixStream::connect(bind_path).await?;
578 ///
579 /// loop {
580 /// // Wait for the socket to be writable
581 /// stream.writable().await?;
582 ///
583 /// // Try to write data, this may still fail with `WouldBlock`
584 /// // if the readiness event is a false positive.
585 /// match stream.try_write(b"hello world") {
586 /// Ok(n) => {
587 /// break;
588 /// }
589 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
590 /// continue;
591 /// }
592 /// Err(e) => {
593 /// return Err(e.into());
594 /// }
595 /// }
596 /// }
597 ///
598 /// Ok(())
599 /// }
600 /// ```
601 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
602 self.io
603 .registration()
604 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
605 }
606
607 /// Tries to write several buffers to the stream, returning how many bytes
608 /// were written.
609 ///
610 /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
612 /// equivalently to a single call to [`try_write()`] with concatenated
613 /// buffers.
614 ///
615 /// This function is usually paired with `writable()`.
616 ///
617 /// [`try_write()`]: UnixStream::try_write()
618 ///
619 /// # Return
620 ///
621 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
622 /// number of bytes written. If the stream is not ready to write data,
623 /// `Err(io::ErrorKind::WouldBlock)` is returned.
624 ///
625 /// # Examples
626 ///
627 /// ```no_run
628 /// use tokio::net::UnixStream;
629 /// use std::error::Error;
630 /// use std::io;
631 ///
632 /// #[tokio::main]
633 /// async fn main() -> Result<(), Box<dyn Error>> {
634 /// // Connect to a peer
635 /// let dir = tempfile::tempdir().unwrap();
636 /// let bind_path = dir.path().join("bind_path");
637 /// let stream = UnixStream::connect(bind_path).await?;
638 ///
639 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
640 ///
641 /// loop {
642 /// // Wait for the socket to be writable
643 /// stream.writable().await?;
644 ///
645 /// // Try to write data, this may still fail with `WouldBlock`
646 /// // if the readiness event is a false positive.
647 /// match stream.try_write_vectored(&bufs) {
648 /// Ok(n) => {
649 /// break;
650 /// }
651 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
652 /// continue;
653 /// }
654 /// Err(e) => {
655 /// return Err(e.into());
656 /// }
657 /// }
658 /// }
659 ///
660 /// Ok(())
661 /// }
662 /// ```
663 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
664 self.io
665 .registration()
666 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
667 }
668
669 /// Tries to read or write from the socket using a user-provided IO operation.
670 ///
671 /// If the socket is ready, the provided closure is called. The closure
672 /// should attempt to perform IO operation on the socket by manually
673 /// calling the appropriate syscall. If the operation fails because the
674 /// socket is not actually ready, then the closure should return a
675 /// `WouldBlock` error and the readiness flag is cleared. The return value
676 /// of the closure is then returned by `try_io`.
677 ///
678 /// If the socket is not ready, then the closure is not called
679 /// and a `WouldBlock` error is returned.
680 ///
681 /// The closure should only return a `WouldBlock` error if it has performed
682 /// an IO operation on the socket that failed due to the socket not being
683 /// ready. Returning a `WouldBlock` error in any other situation will
684 /// incorrectly clear the readiness flag, which can cause the socket to
685 /// behave incorrectly.
686 ///
687 /// The closure should not perform the IO operation using any of the methods
688 /// defined on the Tokio `UnixStream` type, as this will mess with the
689 /// readiness flag and can cause the socket to behave incorrectly.
690 ///
691 /// This method is not intended to be used with combined interests.
692 /// The closure should perform only one type of IO operation, so it should not
693 /// require more than one ready state. This method may panic or sleep forever
694 /// if it is called with a combined interest.
695 ///
696 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
697 ///
698 /// [`readable()`]: UnixStream::readable()
699 /// [`writable()`]: UnixStream::writable()
700 /// [`ready()`]: UnixStream::ready()
701 pub fn try_io<R>(
702 &self,
703 interest: Interest,
704 f: impl FnOnce() -> io::Result<R>,
705 ) -> io::Result<R> {
706 self.io
707 .registration()
708 .try_io(interest, || self.io.try_io(f))
709 }
710
711 /// Reads or writes from the socket using a user-provided IO operation.
712 ///
713 /// The readiness of the socket is awaited and when the socket is ready,
714 /// the provided closure is called. The closure should attempt to perform
715 /// IO operation on the socket by manually calling the appropriate syscall.
716 /// If the operation fails because the socket is not actually ready,
717 /// then the closure should return a `WouldBlock` error. In such case the
718 /// readiness flag is cleared and the socket readiness is awaited again.
719 /// This loop is repeated until the closure returns an `Ok` or an error
720 /// other than `WouldBlock`.
721 ///
722 /// The closure should only return a `WouldBlock` error if it has performed
723 /// an IO operation on the socket that failed due to the socket not being
724 /// ready. Returning a `WouldBlock` error in any other situation will
725 /// incorrectly clear the readiness flag, which can cause the socket to
726 /// behave incorrectly.
727 ///
728 /// The closure should not perform the IO operation using any of the methods
729 /// defined on the Tokio `UnixStream` type, as this will mess with the
730 /// readiness flag and can cause the socket to behave incorrectly.
731 ///
732 /// This method is not intended to be used with combined interests.
733 /// The closure should perform only one type of IO operation, so it should not
734 /// require more than one ready state. This method may panic or sleep forever
735 /// if it is called with a combined interest.
736 pub async fn async_io<R>(
737 &self,
738 interest: Interest,
739 mut f: impl FnMut() -> io::Result<R>,
740 ) -> io::Result<R> {
741 self.io
742 .registration()
743 .async_io(interest, || self.io.try_io(&mut f))
744 .await
745 }
746
747 /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
748 ///
749 /// This function is intended to be used to wrap a UnixStream from the
750 /// standard library in the Tokio equivalent.
751 ///
752 /// # Notes
753 ///
754 /// The caller is responsible for ensuring that the stream is in
755 /// non-blocking mode. Otherwise all I/O operations on the stream
756 /// will block the thread, which will cause unexpected behavior.
757 /// Non-blocking mode can be set using [`set_nonblocking`].
758 ///
759 /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking
760 ///
761 /// # Examples
762 ///
763 /// ```no_run
764 /// use tokio::net::UnixStream;
765 /// use std::os::unix::net::UnixStream as StdUnixStream;
766 /// # use std::error::Error;
767 ///
768 /// # async fn dox() -> Result<(), Box<dyn Error>> {
769 /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?;
770 /// std_stream.set_nonblocking(true)?;
771 /// let stream = UnixStream::from_std(std_stream)?;
772 /// # Ok(())
773 /// # }
774 /// ```
775 ///
776 /// # Panics
777 ///
778 /// This function panics if it is not called from within a runtime with
779 /// IO enabled.
780 ///
781 /// The runtime is usually set implicitly when this function is called
782 /// from a future driven by a tokio runtime, otherwise runtime can be set
783 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
784 #[track_caller]
785 pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
786 let stream = mio::net::UnixStream::from_std(stream);
787 let io = PollEvented::new(stream)?;
788
789 Ok(UnixStream { io })
790 }
791
792 /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
793 ///
794 /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking
795 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking
796 /// mode if needed.
797 ///
798 /// # Examples
799 ///
800 /// ```
801 /// use std::error::Error;
802 /// use std::io::Read;
803 /// use tokio::net::UnixListener;
804 /// # use tokio::net::UnixStream;
805 /// # use tokio::io::AsyncWriteExt;
806 ///
807 /// #[tokio::main]
808 /// async fn main() -> Result<(), Box<dyn Error>> {
809 /// let dir = tempfile::tempdir().unwrap();
810 /// let bind_path = dir.path().join("bind_path");
811 ///
812 /// let mut data = [0u8; 12];
813 /// let listener = UnixListener::bind(&bind_path)?;
814 /// # let handle = tokio::spawn(async {
815 /// # let mut stream = UnixStream::connect(bind_path).await.unwrap();
816 /// # stream.write(b"Hello world!").await.unwrap();
817 /// # });
818 /// let (tokio_unix_stream, _) = listener.accept().await?;
819 /// let mut std_unix_stream = tokio_unix_stream.into_std()?;
820 /// # handle.await.expect("The task being joined has panicked");
821 /// std_unix_stream.set_nonblocking(false)?;
822 /// std_unix_stream.read_exact(&mut data)?;
823 /// # assert_eq!(b"Hello world!", &data);
824 /// Ok(())
825 /// }
826 /// ```
827 /// [`tokio::net::UnixStream`]: UnixStream
828 /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream
829 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking
830 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> {
831 self.io
832 .into_inner()
833 .map(|io| io.into_raw_fd())
834 .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) })
835 }
836
837 /// Creates an unnamed pair of connected sockets.
838 ///
839 /// This function will create a pair of interconnected Unix sockets for
840 /// communicating back and forth between one another. Each socket will
841 /// be associated with the default event loop's handle.
842 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
843 let (a, b) = mio::net::UnixStream::pair()?;
844 let a = UnixStream::new(a)?;
845 let b = UnixStream::new(b)?;
846
847 Ok((a, b))
848 }
849
850 pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> {
851 let io = PollEvented::new(stream)?;
852 Ok(UnixStream { io })
853 }
854
855 /// Returns the socket address of the local half of this connection.
856 ///
857 /// # Examples
858 ///
859 /// ```no_run
860 /// use tokio::net::UnixStream;
861 ///
862 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
863 /// let dir = tempfile::tempdir().unwrap();
864 /// let bind_path = dir.path().join("bind_path");
865 /// let stream = UnixStream::connect(bind_path).await?;
866 ///
867 /// println!("{:?}", stream.local_addr()?);
868 /// # Ok(())
869 /// # }
870 /// ```
871 pub fn local_addr(&self) -> io::Result<SocketAddr> {
872 self.io.local_addr().map(SocketAddr)
873 }
874
875 /// Returns the socket address of the remote half of this connection.
876 ///
877 /// # Examples
878 ///
879 /// ```no_run
880 /// use tokio::net::UnixStream;
881 ///
882 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
883 /// let dir = tempfile::tempdir().unwrap();
884 /// let bind_path = dir.path().join("bind_path");
885 /// let stream = UnixStream::connect(bind_path).await?;
886 ///
887 /// println!("{:?}", stream.peer_addr()?);
888 /// # Ok(())
889 /// # }
890 /// ```
891 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
892 self.io.peer_addr().map(SocketAddr)
893 }
894
    /// Returns effective credentials of the process which called `connect` or `pair`.
    ///
    /// The lookup itself is performed by `ucred::get_peer_cred`, which receives
    /// this stream by reference.
    pub fn peer_cred(&self) -> io::Result<UCred> {
        ucred::get_peer_cred(self)
    }
899
    /// Returns the value of the `SO_ERROR` option.
    ///
    /// This forwards to `take_error` on the inner socket. `Ok(Some(e))` carries
    /// an error stored on the socket; `connect` uses this to report a failed
    /// connection attempt.
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.io.take_error()
    }
904
    /// Shuts down the read, write, or both halves of this connection.
    ///
    /// This function will cause all pending and future I/O calls on the
    /// specified portions to immediately return with an appropriate value
    /// (see the documentation of `Shutdown`).
    ///
    /// This forwards `how` to `shutdown` on the inner socket. The `AsyncWrite`
    /// implementation in this file calls it with `Shutdown::Write`.
    pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
        self.io.shutdown(how)
    }
913
    // These lifetime markers also appear in the generated documentation, and make
    // it more clear that this is a *borrowed* split.
    #[allow(clippy::needless_lifetimes)]
    /// Splits a `UnixStream` into a read half and a write half, which can be used
    /// to read and write the stream concurrently.
    ///
    /// This method is more efficient than [`into_split`], but the halves cannot be
    /// moved into independently spawned tasks.
    ///
    /// Both halves borrow the stream for the lifetime `'a` of the `&mut self`
    /// borrow, so the stream cannot be used directly while either half is alive.
    ///
    /// [`into_split`]: Self::into_split()
    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
        // Delegates to the free function `split` imported from `crate::net::unix::split`.
        split(self)
    }
927
    /// Splits a `UnixStream` into a read half and a write half, which can be used
    /// to read and write the stream concurrently.
    ///
    /// Unlike [`split`], the owned halves can be moved to separate tasks, however
    /// this comes at the cost of a heap allocation.
    ///
    /// **Note:** Dropping the write half will shut down the write half of the
    /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`.
    ///
    /// The stream is consumed by this call; only the two returned halves remain.
    ///
    /// [`split`]: Self::split()
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
        // Delegates to `split_owned` imported from `crate::net::unix::split_owned`.
        split_owned(self)
    }
942}
943
944impl TryFrom<net::UnixStream> for UnixStream {
945 type Error = io::Error;
946
947 /// Consumes stream, returning the tokio I/O object.
948 ///
949 /// This is equivalent to
950 /// [`UnixStream::from_std(stream)`](UnixStream::from_std).
951 fn try_from(stream: net::UnixStream) -> io::Result<Self> {
952 Self::from_std(stream)
953 }
954}
955
956impl AsyncRead for UnixStream {
957 fn poll_read(
958 self: Pin<&mut Self>,
959 cx: &mut Context<'_>,
960 buf: &mut ReadBuf<'_>,
961 ) -> Poll<io::Result<()>> {
962 self.poll_read_priv(cx, buf)
963 }
964}
965
966impl AsyncWrite for UnixStream {
967 fn poll_write(
968 self: Pin<&mut Self>,
969 cx: &mut Context<'_>,
970 buf: &[u8],
971 ) -> Poll<io::Result<usize>> {
972 self.poll_write_priv(cx, buf)
973 }
974
975 fn poll_write_vectored(
976 self: Pin<&mut Self>,
977 cx: &mut Context<'_>,
978 bufs: &[io::IoSlice<'_>],
979 ) -> Poll<io::Result<usize>> {
980 self.poll_write_vectored_priv(cx, bufs)
981 }
982
983 fn is_write_vectored(&self) -> bool {
984 true
985 }
986
987 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
988 Poll::Ready(Ok(()))
989 }
990
991 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
992 self.shutdown_std(std::net::Shutdown::Write)?;
993 Poll::Ready(Ok(()))
994 }
995}
996
997impl UnixStream {
998 // == Poll IO functions that takes `&self` ==
999 //
1000 // To read or write without mutable access to the `UnixStream`, combine the
1001 // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1002 // `try_write` methods.
1003
1004 pub(crate) fn poll_read_priv(
1005 &self,
1006 cx: &mut Context<'_>,
1007 buf: &mut ReadBuf<'_>,
1008 ) -> Poll<io::Result<()>> {
1009 // Safety: `UnixStream::read` correctly handles reads into uninitialized memory
1010 unsafe { self.io.poll_read(cx, buf) }
1011 }
1012
1013 pub(crate) fn poll_write_priv(
1014 &self,
1015 cx: &mut Context<'_>,
1016 buf: &[u8],
1017 ) -> Poll<io::Result<usize>> {
1018 self.io.poll_write(cx, buf)
1019 }
1020
1021 pub(super) fn poll_write_vectored_priv(
1022 &self,
1023 cx: &mut Context<'_>,
1024 bufs: &[io::IoSlice<'_>],
1025 ) -> Poll<io::Result<usize>> {
1026 self.io.poll_write_vectored(cx, bufs)
1027 }
1028}
1029
impl fmt::Debug for UnixStream {
    /// Formats the stream by forwarding to the `fmt` of the inner `io` field,
    /// so the output is whatever that field prints.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.io.fmt(f)
    }
}
1035
impl AsRawFd for UnixStream {
    /// Returns the raw file descriptor reported by the inner `io` field.
    ///
    /// Ownership is not transferred: `self` is only borrowed, so the stream
    /// keeps the descriptor.
    fn as_raw_fd(&self) -> RawFd {
        self.io.as_raw_fd()
    }
}
1041
1042#[cfg(not(tokio_no_as_fd))]
1043impl AsFd for UnixStream {
1044 fn as_fd(&self) -> BorrowedFd<'_> {
1045 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1046 }
1047}
1048