1use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::SocketAddr;
3
4use std::fmt;
5use std::io;
6use std::net::Shutdown;
7#[cfg(not(tokio_no_as_fd))]
8use std::os::unix::io::{AsFd, BorrowedFd};
9use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
10use std::os::unix::net;
11use std::path::Path;
12use std::task::{Context, Poll};
13
14cfg_io_util! {
15 use bytes::BufMut;
16}
17
18cfg_net_unix! {
19 /// An I/O object representing a Unix datagram socket.
20 ///
21 /// A socket can be either named (associated with a filesystem path) or
22 /// unnamed.
23 ///
24 /// This type does not provide a `split` method, because this functionality
25 /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
26 /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
27 /// is enough. This is because all of the methods take `&self` instead of
28 /// `&mut self`.
29 ///
30 /// **Note:** named sockets are persisted even after the object is dropped
31 /// and the program has exited, and cannot be reconnected. It is advised
32 /// that you either check for and unlink the existing socket if it exists,
33 /// or use a temporary file that is guaranteed to not already exist.
34 ///
35 /// [`Arc`]: std::sync::Arc
36 ///
37 /// # Examples
38 /// Using named sockets, associated with a filesystem path:
39 /// ```
40 /// # use std::error::Error;
41 /// # #[tokio::main]
42 /// # async fn main() -> Result<(), Box<dyn Error>> {
43 /// use tokio::net::UnixDatagram;
44 /// use tempfile::tempdir;
45 ///
46 /// // We use a temporary directory so that the socket
47 /// // files left by the bound sockets will get cleaned up.
48 /// let tmp = tempdir()?;
49 ///
50 /// // Bind each socket to a filesystem path
51 /// let tx_path = tmp.path().join("tx");
52 /// let tx = UnixDatagram::bind(&tx_path)?;
53 /// let rx_path = tmp.path().join("rx");
54 /// let rx = UnixDatagram::bind(&rx_path)?;
55 ///
56 /// let bytes = b"hello world";
57 /// tx.send_to(bytes, &rx_path).await?;
58 ///
59 /// let mut buf = vec![0u8; 24];
60 /// let (size, addr) = rx.recv_from(&mut buf).await?;
61 ///
62 /// let dgram = &buf[..size];
63 /// assert_eq!(dgram, bytes);
64 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
65 ///
66 /// # Ok(())
67 /// # }
68 /// ```
69 ///
70 /// Using unnamed sockets, created as a pair
71 /// ```
72 /// # use std::error::Error;
73 /// # #[tokio::main]
74 /// # async fn main() -> Result<(), Box<dyn Error>> {
75 /// use tokio::net::UnixDatagram;
76 ///
77 /// // Create the pair of sockets
78 /// let (sock1, sock2) = UnixDatagram::pair()?;
79 ///
80 /// // Since the sockets are paired, the paired send/recv
81 /// // functions can be used
82 /// let bytes = b"hello world";
83 /// sock1.send(bytes).await?;
84 ///
85 /// let mut buff = vec![0u8; 24];
86 /// let size = sock2.recv(&mut buff).await?;
87 ///
88 /// let dgram = &buff[..size];
89 /// assert_eq!(dgram, bytes);
90 ///
91 /// # Ok(())
92 /// # }
93 /// ```
    #[cfg_attr(docsrs, doc(alias = "uds"))]
    pub struct UnixDatagram {
        // The mio Unix datagram socket wrapped in `PollEvented`. The methods
        // of this type go through this field both for readiness waits (via
        // `registration()`) and for the actual socket calls.
        io: PollEvented<mio::net::UnixDatagram>,
    }
98}
99
100impl UnixDatagram {
101 /// Waits for any of the requested ready states.
102 ///
103 /// This function is usually paired with `try_recv()` or `try_send()`. It
104 /// can be used to concurrently recv / send to the same socket on a single
105 /// task without splitting the socket.
106 ///
107 /// The function may complete without the socket being ready. This is a
108 /// false-positive and attempting an operation will return with
109 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
110 /// [`Ready`] set, so you should always check the returned value and possibly
111 /// wait again if the requested states are not set.
112 ///
113 /// # Cancel safety
114 ///
115 /// This method is cancel safe. Once a readiness event occurs, the method
116 /// will continue to return immediately until the readiness event is
117 /// consumed by an attempt to read or write that fails with `WouldBlock` or
118 /// `Poll::Pending`.
119 ///
120 /// # Examples
121 ///
122 /// Concurrently receive from and send to the socket on the same task
123 /// without splitting.
124 ///
125 /// ```no_run
126 /// use tokio::io::Interest;
127 /// use tokio::net::UnixDatagram;
128 /// use std::io;
129 ///
130 /// #[tokio::main]
131 /// async fn main() -> io::Result<()> {
132 /// let dir = tempfile::tempdir().unwrap();
133 /// let client_path = dir.path().join("client.sock");
134 /// let server_path = dir.path().join("server.sock");
135 /// let socket = UnixDatagram::bind(&client_path)?;
136 /// socket.connect(&server_path)?;
137 ///
138 /// loop {
139 /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
140 ///
141 /// if ready.is_readable() {
142 /// let mut data = [0; 1024];
143 /// match socket.try_recv(&mut data[..]) {
144 /// Ok(n) => {
145 /// println!("received {:?}", &data[..n]);
146 /// }
147 /// // False-positive, continue
148 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
149 /// Err(e) => {
150 /// return Err(e);
151 /// }
152 /// }
153 /// }
154 ///
155 /// if ready.is_writable() {
156 /// // Write some data
157 /// match socket.try_send(b"hello world") {
158 /// Ok(n) => {
159 /// println!("sent {} bytes", n);
160 /// }
161 /// // False-positive, continue
162 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
163 /// Err(e) => {
164 /// return Err(e);
165 /// }
166 /// }
167 /// }
168 /// }
169 /// }
170 /// ```
171 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
172 let event = self.io.registration().readiness(interest).await?;
173 Ok(event.ready)
174 }
175
176 /// Waits for the socket to become writable.
177 ///
178 /// This function is equivalent to `ready(Interest::WRITABLE)` and is
179 /// usually paired with `try_send()` or `try_send_to()`.
180 ///
181 /// The function may complete without the socket being writable. This is a
182 /// false-positive and attempting a `try_send()` will return with
183 /// `io::ErrorKind::WouldBlock`.
184 ///
185 /// # Cancel safety
186 ///
187 /// This method is cancel safe. Once a readiness event occurs, the method
188 /// will continue to return immediately until the readiness event is
189 /// consumed by an attempt to write that fails with `WouldBlock` or
190 /// `Poll::Pending`.
191 ///
192 /// # Examples
193 ///
194 /// ```no_run
195 /// use tokio::net::UnixDatagram;
196 /// use std::io;
197 ///
198 /// #[tokio::main]
199 /// async fn main() -> io::Result<()> {
200 /// let dir = tempfile::tempdir().unwrap();
201 /// let client_path = dir.path().join("client.sock");
202 /// let server_path = dir.path().join("server.sock");
203 /// let socket = UnixDatagram::bind(&client_path)?;
204 /// socket.connect(&server_path)?;
205 ///
206 /// loop {
207 /// // Wait for the socket to be writable
208 /// socket.writable().await?;
209 ///
210 /// // Try to send data, this may still fail with `WouldBlock`
211 /// // if the readiness event is a false positive.
212 /// match socket.try_send(b"hello world") {
213 /// Ok(n) => {
214 /// break;
215 /// }
216 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
217 /// continue;
218 /// }
219 /// Err(e) => {
220 /// return Err(e);
221 /// }
222 /// }
223 /// }
224 ///
225 /// Ok(())
226 /// }
227 /// ```
228 pub async fn writable(&self) -> io::Result<()> {
229 self.ready(Interest::WRITABLE).await?;
230 Ok(())
231 }
232
233 /// Polls for write/send readiness.
234 ///
235 /// If the socket is not currently ready for sending, this method will
236 /// store a clone of the `Waker` from the provided `Context`. When the socket
237 /// becomes ready for sending, `Waker::wake` will be called on the
238 /// waker.
239 ///
240 /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
241 /// the `Waker` from the `Context` passed to the most recent call is
242 /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
243 /// second, independent waker.)
244 ///
245 /// This function is intended for cases where creating and pinning a future
246 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
247 /// preferred, as this supports polling from multiple tasks at once.
248 ///
249 /// # Return value
250 ///
251 /// The function returns:
252 ///
253 /// * `Poll::Pending` if the socket is not ready for writing.
254 /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
255 /// * `Poll::Ready(Err(e))` if an error is encountered.
256 ///
257 /// # Errors
258 ///
259 /// This function may encounter any standard I/O error except `WouldBlock`.
260 ///
261 /// [`writable`]: method@Self::writable
262 pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
263 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
264 }
265
266 /// Waits for the socket to become readable.
267 ///
268 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
269 /// paired with `try_recv()`.
270 ///
271 /// The function may complete without the socket being readable. This is a
272 /// false-positive and attempting a `try_recv()` will return with
273 /// `io::ErrorKind::WouldBlock`.
274 ///
275 /// # Cancel safety
276 ///
277 /// This method is cancel safe. Once a readiness event occurs, the method
278 /// will continue to return immediately until the readiness event is
279 /// consumed by an attempt to read that fails with `WouldBlock` or
280 /// `Poll::Pending`.
281 ///
282 /// # Examples
283 ///
284 /// ```no_run
285 /// use tokio::net::UnixDatagram;
286 /// use std::io;
287 ///
288 /// #[tokio::main]
289 /// async fn main() -> io::Result<()> {
290 /// // Connect to a peer
291 /// let dir = tempfile::tempdir().unwrap();
292 /// let client_path = dir.path().join("client.sock");
293 /// let server_path = dir.path().join("server.sock");
294 /// let socket = UnixDatagram::bind(&client_path)?;
295 /// socket.connect(&server_path)?;
296 ///
297 /// loop {
298 /// // Wait for the socket to be readable
299 /// socket.readable().await?;
300 ///
301 /// // The buffer is **not** included in the async task and will
302 /// // only exist on the stack.
303 /// let mut buf = [0; 1024];
304 ///
305 /// // Try to recv data, this may still fail with `WouldBlock`
306 /// // if the readiness event is a false positive.
307 /// match socket.try_recv(&mut buf) {
308 /// Ok(n) => {
309 /// println!("GOT {:?}", &buf[..n]);
310 /// break;
311 /// }
312 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
313 /// continue;
314 /// }
315 /// Err(e) => {
316 /// return Err(e);
317 /// }
318 /// }
319 /// }
320 ///
321 /// Ok(())
322 /// }
323 /// ```
324 pub async fn readable(&self) -> io::Result<()> {
325 self.ready(Interest::READABLE).await?;
326 Ok(())
327 }
328
329 /// Polls for read/receive readiness.
330 ///
331 /// If the socket is not currently ready for receiving, this method will
332 /// store a clone of the `Waker` from the provided `Context`. When the
333 /// socket becomes ready for reading, `Waker::wake` will be called on the
334 /// waker.
335 ///
336 /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
337 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
338 /// recent call is scheduled to receive a wakeup. (However,
339 /// `poll_send_ready` retains a second, independent waker.)
340 ///
341 /// This function is intended for cases where creating and pinning a future
342 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
343 /// preferred, as this supports polling from multiple tasks at once.
344 ///
345 /// # Return value
346 ///
347 /// The function returns:
348 ///
349 /// * `Poll::Pending` if the socket is not ready for reading.
350 /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
351 /// * `Poll::Ready(Err(e))` if an error is encountered.
352 ///
353 /// # Errors
354 ///
355 /// This function may encounter any standard I/O error except `WouldBlock`.
356 ///
357 /// [`readable`]: method@Self::readable
358 pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
359 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
360 }
361
362 /// Creates a new `UnixDatagram` bound to the specified path.
363 ///
364 /// # Examples
365 /// ```
366 /// # use std::error::Error;
367 /// # #[tokio::main]
368 /// # async fn main() -> Result<(), Box<dyn Error>> {
369 /// use tokio::net::UnixDatagram;
370 /// use tempfile::tempdir;
371 ///
372 /// // We use a temporary directory so that the socket
373 /// // files left by the bound sockets will get cleaned up.
374 /// let tmp = tempdir()?;
375 ///
376 /// // Bind the socket to a filesystem path
377 /// let socket_path = tmp.path().join("socket");
378 /// let socket = UnixDatagram::bind(&socket_path)?;
379 ///
380 /// # Ok(())
381 /// # }
382 /// ```
383 pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
384 where
385 P: AsRef<Path>,
386 {
387 let socket = mio::net::UnixDatagram::bind(path)?;
388 UnixDatagram::new(socket)
389 }
390
391 /// Creates an unnamed pair of connected sockets.
392 ///
393 /// This function will create a pair of interconnected Unix sockets for
394 /// communicating back and forth between one another.
395 ///
396 /// # Examples
397 /// ```
398 /// # use std::error::Error;
399 /// # #[tokio::main]
400 /// # async fn main() -> Result<(), Box<dyn Error>> {
401 /// use tokio::net::UnixDatagram;
402 ///
403 /// // Create the pair of sockets
404 /// let (sock1, sock2) = UnixDatagram::pair()?;
405 ///
406 /// // Since the sockets are paired, the paired send/recv
407 /// // functions can be used
408 /// let bytes = b"hail eris";
409 /// sock1.send(bytes).await?;
410 ///
411 /// let mut buff = vec![0u8; 24];
412 /// let size = sock2.recv(&mut buff).await?;
413 ///
414 /// let dgram = &buff[..size];
415 /// assert_eq!(dgram, bytes);
416 ///
417 /// # Ok(())
418 /// # }
419 /// ```
420 pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
421 let (a, b) = mio::net::UnixDatagram::pair()?;
422 let a = UnixDatagram::new(a)?;
423 let b = UnixDatagram::new(b)?;
424
425 Ok((a, b))
426 }
427
428 /// Creates new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`.
429 ///
430 /// This function is intended to be used to wrap a UnixDatagram from the
431 /// standard library in the Tokio equivalent.
432 ///
433 /// # Notes
434 ///
    /// The caller is responsible for ensuring that the socket is in
436 /// non-blocking mode. Otherwise all I/O operations on the socket
437 /// will block the thread, which will cause unexpected behavior.
438 /// Non-blocking mode can be set using [`set_nonblocking`].
439 ///
440 /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
441 ///
442 /// # Panics
443 ///
444 /// This function panics if it is not called from within a runtime with
445 /// IO enabled.
446 ///
447 /// The runtime is usually set implicitly when this function is called
448 /// from a future driven by a Tokio runtime, otherwise runtime can be set
449 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
450 /// # Examples
451 /// ```
452 /// # use std::error::Error;
453 /// # #[tokio::main]
454 /// # async fn main() -> Result<(), Box<dyn Error>> {
455 /// use tokio::net::UnixDatagram;
456 /// use std::os::unix::net::UnixDatagram as StdUDS;
457 /// use tempfile::tempdir;
458 ///
459 /// // We use a temporary directory so that the socket
460 /// // files left by the bound sockets will get cleaned up.
461 /// let tmp = tempdir()?;
462 ///
463 /// // Bind the socket to a filesystem path
464 /// let socket_path = tmp.path().join("socket");
465 /// let std_socket = StdUDS::bind(&socket_path)?;
466 /// std_socket.set_nonblocking(true)?;
467 /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
468 ///
469 /// # Ok(())
470 /// # }
471 /// ```
472 #[track_caller]
473 pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
474 let socket = mio::net::UnixDatagram::from_std(datagram);
475 let io = PollEvented::new(socket)?;
476 Ok(UnixDatagram { io })
477 }
478
479 /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
480 ///
481 /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
482 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
483 /// if needed.
484 ///
485 /// # Examples
486 ///
487 /// ```rust,no_run
488 /// # use std::error::Error;
489 /// # async fn dox() -> Result<(), Box<dyn Error>> {
490 /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
491 /// let std_socket = tokio_socket.into_std()?;
492 /// std_socket.set_nonblocking(false)?;
493 /// # Ok(())
494 /// # }
495 /// ```
496 ///
497 /// [`tokio::net::UnixDatagram`]: UnixDatagram
498 /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
499 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
500 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
501 self.io
502 .into_inner()
503 .map(|io| io.into_raw_fd())
504 .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
505 }
506
507 fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
508 let io = PollEvented::new(socket)?;
509 Ok(UnixDatagram { io })
510 }
511
512 /// Creates a new `UnixDatagram` which is not bound to any address.
513 ///
514 /// # Examples
515 /// ```
516 /// # use std::error::Error;
517 /// # #[tokio::main]
518 /// # async fn main() -> Result<(), Box<dyn Error>> {
519 /// use tokio::net::UnixDatagram;
520 /// use tempfile::tempdir;
521 ///
522 /// // Create an unbound socket
523 /// let tx = UnixDatagram::unbound()?;
524 ///
525 /// // Create another, bound socket
526 /// let tmp = tempdir()?;
527 /// let rx_path = tmp.path().join("rx");
528 /// let rx = UnixDatagram::bind(&rx_path)?;
529 ///
530 /// // Send to the bound socket
531 /// let bytes = b"hello world";
532 /// tx.send_to(bytes, &rx_path).await?;
533 ///
534 /// let mut buf = vec![0u8; 24];
535 /// let (size, addr) = rx.recv_from(&mut buf).await?;
536 ///
537 /// let dgram = &buf[..size];
538 /// assert_eq!(dgram, bytes);
539 ///
540 /// # Ok(())
541 /// # }
542 /// ```
543 pub fn unbound() -> io::Result<UnixDatagram> {
544 let socket = mio::net::UnixDatagram::unbound()?;
545 UnixDatagram::new(socket)
546 }
547
548 /// Connects the socket to the specified address.
549 ///
550 /// The `send` method may be used to send data to the specified address.
551 /// `recv` and `recv_from` will only receive data from that address.
552 ///
553 /// # Examples
554 /// ```
555 /// # use std::error::Error;
556 /// # #[tokio::main]
557 /// # async fn main() -> Result<(), Box<dyn Error>> {
558 /// use tokio::net::UnixDatagram;
559 /// use tempfile::tempdir;
560 ///
561 /// // Create an unbound socket
562 /// let tx = UnixDatagram::unbound()?;
563 ///
564 /// // Create another, bound socket
565 /// let tmp = tempdir()?;
566 /// let rx_path = tmp.path().join("rx");
567 /// let rx = UnixDatagram::bind(&rx_path)?;
568 ///
569 /// // Connect to the bound socket
570 /// tx.connect(&rx_path)?;
571 ///
572 /// // Send to the bound socket
573 /// let bytes = b"hello world";
574 /// tx.send(bytes).await?;
575 ///
576 /// let mut buf = vec![0u8; 24];
577 /// let (size, addr) = rx.recv_from(&mut buf).await?;
578 ///
579 /// let dgram = &buf[..size];
580 /// assert_eq!(dgram, bytes);
581 ///
582 /// # Ok(())
583 /// # }
584 /// ```
585 pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
586 self.io.connect(path)
587 }
588
589 /// Sends data on the socket to the socket's peer.
590 ///
591 /// # Cancel safety
592 ///
593 /// This method is cancel safe. If `send` is used as the event in a
594 /// [`tokio::select!`](crate::select) statement and some other branch
595 /// completes first, then it is guaranteed that the message was not sent.
596 ///
597 /// # Examples
598 /// ```
599 /// # use std::error::Error;
600 /// # #[tokio::main]
601 /// # async fn main() -> Result<(), Box<dyn Error>> {
602 /// use tokio::net::UnixDatagram;
603 ///
604 /// // Create the pair of sockets
605 /// let (sock1, sock2) = UnixDatagram::pair()?;
606 ///
607 /// // Since the sockets are paired, the paired send/recv
608 /// // functions can be used
609 /// let bytes = b"hello world";
610 /// sock1.send(bytes).await?;
611 ///
612 /// let mut buff = vec![0u8; 24];
613 /// let size = sock2.recv(&mut buff).await?;
614 ///
615 /// let dgram = &buff[..size];
616 /// assert_eq!(dgram, bytes);
617 ///
618 /// # Ok(())
619 /// # }
620 /// ```
621 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
622 self.io
623 .registration()
624 .async_io(Interest::WRITABLE, || self.io.send(buf))
625 .await
626 }
627
628 /// Tries to send a datagram to the peer without waiting.
629 ///
630 /// # Examples
631 ///
632 /// ```no_run
633 /// use tokio::net::UnixDatagram;
634 /// use std::io;
635 ///
636 /// #[tokio::main]
637 /// async fn main() -> io::Result<()> {
638 /// let dir = tempfile::tempdir().unwrap();
639 /// let client_path = dir.path().join("client.sock");
640 /// let server_path = dir.path().join("server.sock");
641 /// let socket = UnixDatagram::bind(&client_path)?;
642 /// socket.connect(&server_path)?;
643 ///
644 /// loop {
645 /// // Wait for the socket to be writable
646 /// socket.writable().await?;
647 ///
648 /// // Try to send data, this may still fail with `WouldBlock`
649 /// // if the readiness event is a false positive.
650 /// match socket.try_send(b"hello world") {
651 /// Ok(n) => {
652 /// break;
653 /// }
654 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
655 /// continue;
656 /// }
657 /// Err(e) => {
658 /// return Err(e);
659 /// }
660 /// }
661 /// }
662 ///
663 /// Ok(())
664 /// }
665 /// ```
666 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
667 self.io
668 .registration()
669 .try_io(Interest::WRITABLE, || self.io.send(buf))
670 }
671
    /// Tries to send a datagram to the specified address without waiting.
673 ///
674 /// # Examples
675 ///
676 /// ```no_run
677 /// use tokio::net::UnixDatagram;
678 /// use std::io;
679 ///
680 /// #[tokio::main]
681 /// async fn main() -> io::Result<()> {
682 /// let dir = tempfile::tempdir().unwrap();
683 /// let client_path = dir.path().join("client.sock");
684 /// let server_path = dir.path().join("server.sock");
685 /// let socket = UnixDatagram::bind(&client_path)?;
686 ///
687 /// loop {
688 /// // Wait for the socket to be writable
689 /// socket.writable().await?;
690 ///
691 /// // Try to send data, this may still fail with `WouldBlock`
692 /// // if the readiness event is a false positive.
693 /// match socket.try_send_to(b"hello world", &server_path) {
694 /// Ok(n) => {
695 /// break;
696 /// }
697 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
698 /// continue;
699 /// }
700 /// Err(e) => {
701 /// return Err(e);
702 /// }
703 /// }
704 /// }
705 ///
706 /// Ok(())
707 /// }
708 /// ```
709 pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
710 where
711 P: AsRef<Path>,
712 {
713 self.io
714 .registration()
715 .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
716 }
717
718 /// Receives data from the socket.
719 ///
720 /// # Cancel safety
721 ///
722 /// This method is cancel safe. If `recv` is used as the event in a
723 /// [`tokio::select!`](crate::select) statement and some other branch
724 /// completes first, it is guaranteed that no messages were received on this
725 /// socket.
726 ///
727 /// # Examples
728 /// ```
729 /// # use std::error::Error;
730 /// # #[tokio::main]
731 /// # async fn main() -> Result<(), Box<dyn Error>> {
732 /// use tokio::net::UnixDatagram;
733 ///
734 /// // Create the pair of sockets
735 /// let (sock1, sock2) = UnixDatagram::pair()?;
736 ///
737 /// // Since the sockets are paired, the paired send/recv
738 /// // functions can be used
739 /// let bytes = b"hello world";
740 /// sock1.send(bytes).await?;
741 ///
742 /// let mut buff = vec![0u8; 24];
743 /// let size = sock2.recv(&mut buff).await?;
744 ///
745 /// let dgram = &buff[..size];
746 /// assert_eq!(dgram, bytes);
747 ///
748 /// # Ok(())
749 /// # }
750 /// ```
751 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
752 self.io
753 .registration()
754 .async_io(Interest::READABLE, || self.io.recv(buf))
755 .await
756 }
757
758 /// Tries to receive a datagram from the peer without waiting.
759 ///
760 /// # Examples
761 ///
762 /// ```no_run
763 /// use tokio::net::UnixDatagram;
764 /// use std::io;
765 ///
766 /// #[tokio::main]
767 /// async fn main() -> io::Result<()> {
768 /// // Connect to a peer
769 /// let dir = tempfile::tempdir().unwrap();
770 /// let client_path = dir.path().join("client.sock");
771 /// let server_path = dir.path().join("server.sock");
772 /// let socket = UnixDatagram::bind(&client_path)?;
773 /// socket.connect(&server_path)?;
774 ///
775 /// loop {
776 /// // Wait for the socket to be readable
777 /// socket.readable().await?;
778 ///
779 /// // The buffer is **not** included in the async task and will
780 /// // only exist on the stack.
781 /// let mut buf = [0; 1024];
782 ///
783 /// // Try to recv data, this may still fail with `WouldBlock`
784 /// // if the readiness event is a false positive.
785 /// match socket.try_recv(&mut buf) {
786 /// Ok(n) => {
787 /// println!("GOT {:?}", &buf[..n]);
788 /// break;
789 /// }
790 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
791 /// continue;
792 /// }
793 /// Err(e) => {
794 /// return Err(e);
795 /// }
796 /// }
797 /// }
798 ///
799 /// Ok(())
800 /// }
801 /// ```
802 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
803 self.io
804 .registration()
805 .try_io(Interest::READABLE, || self.io.recv(buf))
806 }
807
808 cfg_io_util! {
809 /// Tries to receive data from the socket without waiting.
810 ///
811 /// This method can be used even if `buf` is uninitialized.
812 ///
813 /// # Examples
814 ///
815 /// ```no_run
816 /// use tokio::net::UnixDatagram;
817 /// use std::io;
818 ///
819 /// #[tokio::main]
820 /// async fn main() -> io::Result<()> {
        ///     // Bind the socket to a local path
822 /// let dir = tempfile::tempdir().unwrap();
823 /// let client_path = dir.path().join("client.sock");
824 /// let server_path = dir.path().join("server.sock");
825 /// let socket = UnixDatagram::bind(&client_path)?;
826 ///
827 /// loop {
828 /// // Wait for the socket to be readable
829 /// socket.readable().await?;
830 ///
831 /// let mut buf = Vec::with_capacity(1024);
832 ///
833 /// // Try to recv data, this may still fail with `WouldBlock`
834 /// // if the readiness event is a false positive.
835 /// match socket.try_recv_buf_from(&mut buf) {
836 /// Ok((n, _addr)) => {
837 /// println!("GOT {:?}", &buf[..n]);
838 /// break;
839 /// }
840 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
841 /// continue;
842 /// }
843 /// Err(e) => {
844 /// return Err(e);
845 /// }
846 /// }
847 /// }
848 ///
849 /// Ok(())
850 /// }
851 /// ```
852 pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
853 let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
854 let dst = buf.chunk_mut();
855 let dst =
856 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
857
858 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
859 // buffer.
860 let (n, addr) = (*self.io).recv_from(dst)?;
861
862 unsafe {
863 buf.advance_mut(n);
864 }
865
866 Ok((n, addr))
867 })?;
868
869 Ok((n, SocketAddr(addr)))
870 }
871
872 /// Receives from the socket, advances the
873 /// buffer's internal cursor and returns how many bytes were read and the origin.
874 ///
875 /// This method can be used even if `buf` is uninitialized.
876 ///
877 /// # Examples
878 /// ```
879 /// # use std::error::Error;
880 /// # #[tokio::main]
881 /// # async fn main() -> Result<(), Box<dyn Error>> {
882 /// use tokio::net::UnixDatagram;
883 /// use tempfile::tempdir;
884 ///
885 /// // We use a temporary directory so that the socket
886 /// // files left by the bound sockets will get cleaned up.
887 /// let tmp = tempdir()?;
888 ///
889 /// // Bind each socket to a filesystem path
890 /// let tx_path = tmp.path().join("tx");
891 /// let tx = UnixDatagram::bind(&tx_path)?;
892 /// let rx_path = tmp.path().join("rx");
893 /// let rx = UnixDatagram::bind(&rx_path)?;
894 ///
895 /// let bytes = b"hello world";
896 /// tx.send_to(bytes, &rx_path).await?;
897 ///
898 /// let mut buf = Vec::with_capacity(24);
899 /// let (size, addr) = rx.recv_buf_from(&mut buf).await?;
900 ///
901 /// let dgram = &buf[..size];
902 /// assert_eq!(dgram, bytes);
903 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
904 ///
905 /// # Ok(())
906 /// # }
907 /// ```
908 pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
909 self.io.registration().async_io(Interest::READABLE, || {
910 let dst = buf.chunk_mut();
911 let dst =
912 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
913
914 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
915 // buffer.
916 let (n, addr) = (*self.io).recv_from(dst)?;
917
918 unsafe {
919 buf.advance_mut(n);
920 }
921 Ok((n,SocketAddr(addr)))
922 }).await
923 }
924
        /// Tries to receive data from the socket into the provided buffer, advancing the
        /// buffer's internal cursor, returning how many bytes were read.
927 ///
928 /// This method can be used even if `buf` is uninitialized.
929 ///
930 /// # Examples
931 ///
932 /// ```no_run
933 /// use tokio::net::UnixDatagram;
934 /// use std::io;
935 ///
936 /// #[tokio::main]
937 /// async fn main() -> io::Result<()> {
938 /// // Connect to a peer
939 /// let dir = tempfile::tempdir().unwrap();
940 /// let client_path = dir.path().join("client.sock");
941 /// let server_path = dir.path().join("server.sock");
942 /// let socket = UnixDatagram::bind(&client_path)?;
943 /// socket.connect(&server_path)?;
944 ///
945 /// loop {
946 /// // Wait for the socket to be readable
947 /// socket.readable().await?;
948 ///
949 /// let mut buf = Vec::with_capacity(1024);
950 ///
951 /// // Try to recv data, this may still fail with `WouldBlock`
952 /// // if the readiness event is a false positive.
953 /// match socket.try_recv_buf(&mut buf) {
954 /// Ok(n) => {
955 /// println!("GOT {:?}", &buf[..n]);
956 /// break;
957 /// }
958 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
959 /// continue;
960 /// }
961 /// Err(e) => {
962 /// return Err(e);
963 /// }
964 /// }
965 /// }
966 ///
967 /// Ok(())
968 /// }
969 /// ```
970 pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
971 self.io.registration().try_io(Interest::READABLE, || {
972 let dst = buf.chunk_mut();
973 let dst =
974 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
975
976 // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
977 // buffer.
978 let n = (*self.io).recv(dst)?;
979
980 unsafe {
981 buf.advance_mut(n);
982 }
983
984 Ok(n)
985 })
986 }
987
988 /// Receives data from the socket from the address to which it is connected,
989 /// advancing the buffer's internal cursor, returning how many bytes were read.
990 ///
991 /// This method can be used even if `buf` is uninitialized.
992 ///
993 /// # Examples
994 /// ```
995 /// # use std::error::Error;
996 /// # #[tokio::main]
997 /// # async fn main() -> Result<(), Box<dyn Error>> {
998 /// use tokio::net::UnixDatagram;
999 ///
1000 /// // Create the pair of sockets
1001 /// let (sock1, sock2) = UnixDatagram::pair()?;
1002 ///
1003 /// // Since the sockets are paired, the paired send/recv
1004 /// // functions can be used
1005 /// let bytes = b"hello world";
1006 /// sock1.send(bytes).await?;
1007 ///
1008 /// let mut buff = Vec::with_capacity(24);
1009 /// let size = sock2.recv_buf(&mut buff).await?;
1010 ///
1011 /// let dgram = &buff[..size];
1012 /// assert_eq!(dgram, bytes);
1013 ///
1014 /// # Ok(())
1015 /// # }
1016 /// ```
1017 pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1018 self.io.registration().async_io(Interest::READABLE, || {
1019 let dst = buf.chunk_mut();
1020 let dst =
1021 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1022
1023 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
1024 // buffer.
1025 let n = (*self.io).recv(dst)?;
1026
1027 unsafe {
1028 buf.advance_mut(n);
1029 }
1030 Ok(n)
1031 }).await
1032 }
1033 }
1034
1035 /// Sends data on the socket to the specified address.
1036 ///
1037 /// # Cancel safety
1038 ///
1039 /// This method is cancel safe. If `send_to` is used as the event in a
1040 /// [`tokio::select!`](crate::select) statement and some other branch
1041 /// completes first, then it is guaranteed that the message was not sent.
1042 ///
1043 /// # Examples
1044 /// ```
1045 /// # use std::error::Error;
1046 /// # #[tokio::main]
1047 /// # async fn main() -> Result<(), Box<dyn Error>> {
1048 /// use tokio::net::UnixDatagram;
1049 /// use tempfile::tempdir;
1050 ///
1051 /// // We use a temporary directory so that the socket
1052 /// // files left by the bound sockets will get cleaned up.
1053 /// let tmp = tempdir()?;
1054 ///
1055 /// // Bind each socket to a filesystem path
1056 /// let tx_path = tmp.path().join("tx");
1057 /// let tx = UnixDatagram::bind(&tx_path)?;
1058 /// let rx_path = tmp.path().join("rx");
1059 /// let rx = UnixDatagram::bind(&rx_path)?;
1060 ///
1061 /// let bytes = b"hello world";
1062 /// tx.send_to(bytes, &rx_path).await?;
1063 ///
1064 /// let mut buf = vec![0u8; 24];
1065 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1066 ///
1067 /// let dgram = &buf[..size];
1068 /// assert_eq!(dgram, bytes);
1069 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1070 ///
1071 /// # Ok(())
1072 /// # }
1073 /// ```
1074 pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
1075 where
1076 P: AsRef<Path>,
1077 {
1078 self.io
1079 .registration()
1080 .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
1081 .await
1082 }
1083
1084 /// Receives data from the socket.
1085 ///
1086 /// # Cancel safety
1087 ///
1088 /// This method is cancel safe. If `recv_from` is used as the event in a
1089 /// [`tokio::select!`](crate::select) statement and some other branch
1090 /// completes first, it is guaranteed that no messages were received on this
1091 /// socket.
1092 ///
1093 /// # Examples
1094 /// ```
1095 /// # use std::error::Error;
1096 /// # #[tokio::main]
1097 /// # async fn main() -> Result<(), Box<dyn Error>> {
1098 /// use tokio::net::UnixDatagram;
1099 /// use tempfile::tempdir;
1100 ///
1101 /// // We use a temporary directory so that the socket
1102 /// // files left by the bound sockets will get cleaned up.
1103 /// let tmp = tempdir()?;
1104 ///
1105 /// // Bind each socket to a filesystem path
1106 /// let tx_path = tmp.path().join("tx");
1107 /// let tx = UnixDatagram::bind(&tx_path)?;
1108 /// let rx_path = tmp.path().join("rx");
1109 /// let rx = UnixDatagram::bind(&rx_path)?;
1110 ///
1111 /// let bytes = b"hello world";
1112 /// tx.send_to(bytes, &rx_path).await?;
1113 ///
1114 /// let mut buf = vec![0u8; 24];
1115 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1116 ///
1117 /// let dgram = &buf[..size];
1118 /// assert_eq!(dgram, bytes);
1119 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1120 ///
1121 /// # Ok(())
1122 /// # }
1123 /// ```
1124 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1125 let (n, addr) = self
1126 .io
1127 .registration()
1128 .async_io(Interest::READABLE, || self.io.recv_from(buf))
1129 .await?;
1130
1131 Ok((n, SocketAddr(addr)))
1132 }
1133
1134 /// Attempts to receive a single datagram on the specified address.
1135 ///
1136 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
1137 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1138 /// receive a wakeup.
1139 ///
1140 /// # Return value
1141 ///
1142 /// The function returns:
1143 ///
1144 /// * `Poll::Pending` if the socket is not ready to read
1145 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1146 /// * `Poll::Ready(Err(e))` if an error is encountered.
1147 ///
1148 /// # Errors
1149 ///
1150 /// This function may encounter any standard I/O error except `WouldBlock`.
1151 pub fn poll_recv_from(
1152 &self,
1153 cx: &mut Context<'_>,
1154 buf: &mut ReadBuf<'_>,
1155 ) -> Poll<io::Result<SocketAddr>> {
1156 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1157 // Safety: will not read the maybe uninitialized bytes.
1158 let b = unsafe {
1159 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1160 };
1161
1162 self.io.recv_from(b)
1163 }))?;
1164
1165 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1166 unsafe {
1167 buf.assume_init(n);
1168 }
1169 buf.advance(n);
1170 Poll::Ready(Ok(SocketAddr(addr)))
1171 }
1172
1173 /// Attempts to send data to the specified address.
1174 ///
1175 /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1176 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1177 /// receive a wakeup.
1178 ///
1179 /// # Return value
1180 ///
1181 /// The function returns:
1182 ///
1183 /// * `Poll::Pending` if the socket is not ready to write
1184 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1185 /// * `Poll::Ready(Err(e))` if an error is encountered.
1186 ///
1187 /// # Errors
1188 ///
1189 /// This function may encounter any standard I/O error except `WouldBlock`.
1190 pub fn poll_send_to<P>(
1191 &self,
1192 cx: &mut Context<'_>,
1193 buf: &[u8],
1194 target: P,
1195 ) -> Poll<io::Result<usize>>
1196 where
1197 P: AsRef<Path>,
1198 {
1199 self.io
1200 .registration()
1201 .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
1202 }
1203
1204 /// Attempts to send data on the socket to the remote address to which it
1205 /// was previously `connect`ed.
1206 ///
1207 /// The [`connect`] method will connect this socket to a remote address.
1208 /// This method will fail if the socket is not connected.
1209 ///
1210 /// Note that on multiple calls to a `poll_*` method in the send direction,
1211 /// only the `Waker` from the `Context` passed to the most recent call will
1212 /// be scheduled to receive a wakeup.
1213 ///
1214 /// # Return value
1215 ///
1216 /// The function returns:
1217 ///
1218 /// * `Poll::Pending` if the socket is not available to write
1219 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
1220 /// * `Poll::Ready(Err(e))` if an error is encountered.
1221 ///
1222 /// # Errors
1223 ///
1224 /// This function may encounter any standard I/O error except `WouldBlock`.
1225 ///
1226 /// [`connect`]: method@Self::connect
1227 pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
1228 self.io
1229 .registration()
1230 .poll_write_io(cx, || self.io.send(buf))
1231 }
1232
1233 /// Attempts to receive a single datagram message on the socket from the remote
1234 /// address to which it is `connect`ed.
1235 ///
1236 /// The [`connect`] method will connect this socket to a remote address. This method
1237 /// resolves to an error if the socket is not connected.
1238 ///
1239 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
1240 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1241 /// receive a wakeup.
1242 ///
1243 /// # Return value
1244 ///
1245 /// The function returns:
1246 ///
1247 /// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(()))` reads data into `ReadBuf` if the socket is ready
1249 /// * `Poll::Ready(Err(e))` if an error is encountered.
1250 ///
1251 /// # Errors
1252 ///
1253 /// This function may encounter any standard I/O error except `WouldBlock`.
1254 ///
1255 /// [`connect`]: method@Self::connect
1256 pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1257 let n = ready!(self.io.registration().poll_read_io(cx, || {
1258 // Safety: will not read the maybe uninitialized bytes.
1259 let b = unsafe {
1260 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1261 };
1262
1263 self.io.recv(b)
1264 }))?;
1265
1266 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1267 unsafe {
1268 buf.assume_init(n);
1269 }
1270 buf.advance(n);
1271 Poll::Ready(Ok(()))
1272 }
1273
1274 /// Tries to receive data from the socket without waiting.
1275 ///
1276 /// # Examples
1277 ///
1278 /// ```no_run
1279 /// use tokio::net::UnixDatagram;
1280 /// use std::io;
1281 ///
1282 /// #[tokio::main]
1283 /// async fn main() -> io::Result<()> {
/// // Bind the socket to a local path
1285 /// let dir = tempfile::tempdir().unwrap();
1286 /// let client_path = dir.path().join("client.sock");
1287 /// let server_path = dir.path().join("server.sock");
1288 /// let socket = UnixDatagram::bind(&client_path)?;
1289 ///
1290 /// loop {
1291 /// // Wait for the socket to be readable
1292 /// socket.readable().await?;
1293 ///
1294 /// // The buffer is **not** included in the async task and will
1295 /// // only exist on the stack.
1296 /// let mut buf = [0; 1024];
1297 ///
1298 /// // Try to recv data, this may still fail with `WouldBlock`
1299 /// // if the readiness event is a false positive.
1300 /// match socket.try_recv_from(&mut buf) {
1301 /// Ok((n, _addr)) => {
1302 /// println!("GOT {:?}", &buf[..n]);
1303 /// break;
1304 /// }
1305 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1306 /// continue;
1307 /// }
1308 /// Err(e) => {
1309 /// return Err(e);
1310 /// }
1311 /// }
1312 /// }
1313 ///
1314 /// Ok(())
1315 /// }
1316 /// ```
1317 pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1318 let (n, addr) = self
1319 .io
1320 .registration()
1321 .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
1322
1323 Ok((n, SocketAddr(addr)))
1324 }
1325
1326 /// Tries to read or write from the socket using a user-provided IO operation.
1327 ///
1328 /// If the socket is ready, the provided closure is called. The closure
1329 /// should attempt to perform IO operation on the socket by manually
1330 /// calling the appropriate syscall. If the operation fails because the
1331 /// socket is not actually ready, then the closure should return a
1332 /// `WouldBlock` error and the readiness flag is cleared. The return value
1333 /// of the closure is then returned by `try_io`.
1334 ///
1335 /// If the socket is not ready, then the closure is not called
1336 /// and a `WouldBlock` error is returned.
1337 ///
1338 /// The closure should only return a `WouldBlock` error if it has performed
1339 /// an IO operation on the socket that failed due to the socket not being
1340 /// ready. Returning a `WouldBlock` error in any other situation will
1341 /// incorrectly clear the readiness flag, which can cause the socket to
1342 /// behave incorrectly.
1343 ///
1344 /// The closure should not perform the IO operation using any of the methods
1345 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1346 /// readiness flag and can cause the socket to behave incorrectly.
1347 ///
1348 /// This method is not intended to be used with combined interests.
1349 /// The closure should perform only one type of IO operation, so it should not
1350 /// require more than one ready state. This method may panic or sleep forever
1351 /// if it is called with a combined interest.
1352 ///
1353 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1354 ///
1355 /// [`readable()`]: UnixDatagram::readable()
1356 /// [`writable()`]: UnixDatagram::writable()
1357 /// [`ready()`]: UnixDatagram::ready()
1358 pub fn try_io<R>(
1359 &self,
1360 interest: Interest,
1361 f: impl FnOnce() -> io::Result<R>,
1362 ) -> io::Result<R> {
1363 self.io
1364 .registration()
1365 .try_io(interest, || self.io.try_io(f))
1366 }
1367
1368 /// Reads or writes from the socket using a user-provided IO operation.
1369 ///
1370 /// The readiness of the socket is awaited and when the socket is ready,
1371 /// the provided closure is called. The closure should attempt to perform
1372 /// IO operation on the socket by manually calling the appropriate syscall.
1373 /// If the operation fails because the socket is not actually ready,
1374 /// then the closure should return a `WouldBlock` error. In such case the
1375 /// readiness flag is cleared and the socket readiness is awaited again.
1376 /// This loop is repeated until the closure returns an `Ok` or an error
1377 /// other than `WouldBlock`.
1378 ///
1379 /// The closure should only return a `WouldBlock` error if it has performed
1380 /// an IO operation on the socket that failed due to the socket not being
1381 /// ready. Returning a `WouldBlock` error in any other situation will
1382 /// incorrectly clear the readiness flag, which can cause the socket to
1383 /// behave incorrectly.
1384 ///
1385 /// The closure should not perform the IO operation using any of the methods
1386 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1387 /// readiness flag and can cause the socket to behave incorrectly.
1388 ///
1389 /// This method is not intended to be used with combined interests.
1390 /// The closure should perform only one type of IO operation, so it should not
1391 /// require more than one ready state. This method may panic or sleep forever
1392 /// if it is called with a combined interest.
1393 pub async fn async_io<R>(
1394 &self,
1395 interest: Interest,
1396 mut f: impl FnMut() -> io::Result<R>,
1397 ) -> io::Result<R> {
1398 self.io
1399 .registration()
1400 .async_io(interest, || self.io.try_io(&mut f))
1401 .await
1402 }
1403
1404 /// Returns the local address that this socket is bound to.
1405 ///
1406 /// # Examples
1407 /// For a socket bound to a local path
1408 /// ```
1409 /// # use std::error::Error;
1410 /// # #[tokio::main]
1411 /// # async fn main() -> Result<(), Box<dyn Error>> {
1412 /// use tokio::net::UnixDatagram;
1413 /// use tempfile::tempdir;
1414 ///
1415 /// // We use a temporary directory so that the socket
1416 /// // files left by the bound sockets will get cleaned up.
1417 /// let tmp = tempdir()?;
1418 ///
1419 /// // Bind socket to a filesystem path
1420 /// let socket_path = tmp.path().join("socket");
1421 /// let socket = UnixDatagram::bind(&socket_path)?;
1422 ///
1423 /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
1424 ///
1425 /// # Ok(())
1426 /// # }
1427 /// ```
1428 ///
1429 /// For an unbound socket
1430 /// ```
1431 /// # use std::error::Error;
1432 /// # #[tokio::main]
1433 /// # async fn main() -> Result<(), Box<dyn Error>> {
1434 /// use tokio::net::UnixDatagram;
1435 ///
1436 /// // Create an unbound socket
1437 /// let socket = UnixDatagram::unbound()?;
1438 ///
1439 /// assert!(socket.local_addr()?.is_unnamed());
1440 ///
1441 /// # Ok(())
1442 /// # }
1443 /// ```
1444 pub fn local_addr(&self) -> io::Result<SocketAddr> {
1445 self.io.local_addr().map(SocketAddr)
1446 }
1447
1448 /// Returns the address of this socket's peer.
1449 ///
1450 /// The `connect` method will connect the socket to a peer.
1451 ///
1452 /// # Examples
1453 /// For a peer with a local path
1454 /// ```
1455 /// # use std::error::Error;
1456 /// # #[tokio::main]
1457 /// # async fn main() -> Result<(), Box<dyn Error>> {
1458 /// use tokio::net::UnixDatagram;
1459 /// use tempfile::tempdir;
1460 ///
1461 /// // Create an unbound socket
1462 /// let tx = UnixDatagram::unbound()?;
1463 ///
1464 /// // Create another, bound socket
1465 /// let tmp = tempdir()?;
1466 /// let rx_path = tmp.path().join("rx");
1467 /// let rx = UnixDatagram::bind(&rx_path)?;
1468 ///
1469 /// // Connect to the bound socket
1470 /// tx.connect(&rx_path)?;
1471 ///
1472 /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
1473 ///
1474 /// # Ok(())
1475 /// # }
1476 /// ```
1477 ///
1478 /// For an unbound peer
1479 /// ```
1480 /// # use std::error::Error;
1481 /// # #[tokio::main]
1482 /// # async fn main() -> Result<(), Box<dyn Error>> {
1483 /// use tokio::net::UnixDatagram;
1484 ///
1485 /// // Create the pair of sockets
1486 /// let (sock1, sock2) = UnixDatagram::pair()?;
1487 ///
1488 /// assert!(sock1.peer_addr()?.is_unnamed());
1489 ///
1490 /// # Ok(())
1491 /// # }
1492 /// ```
1493 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1494 self.io.peer_addr().map(SocketAddr)
1495 }
1496
1497 /// Returns the value of the `SO_ERROR` option.
1498 ///
1499 /// # Examples
1500 /// ```
1501 /// # use std::error::Error;
1502 /// # #[tokio::main]
1503 /// # async fn main() -> Result<(), Box<dyn Error>> {
1504 /// use tokio::net::UnixDatagram;
1505 ///
1506 /// // Create an unbound socket
1507 /// let socket = UnixDatagram::unbound()?;
1508 ///
1509 /// if let Ok(Some(err)) = socket.take_error() {
1510 /// println!("Got error: {:?}", err);
1511 /// }
1512 ///
1513 /// # Ok(())
1514 /// # }
1515 /// ```
1516 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1517 self.io.take_error()
1518 }
1519
1520 /// Shuts down the read, write, or both halves of this connection.
1521 ///
1522 /// This function will cause all pending and future I/O calls on the
1523 /// specified portions to immediately return with an appropriate value
1524 /// (see the documentation of `Shutdown`).
1525 ///
1526 /// # Examples
1527 /// ```
1528 /// # use std::error::Error;
1529 /// # #[tokio::main]
1530 /// # async fn main() -> Result<(), Box<dyn Error>> {
1531 /// use tokio::net::UnixDatagram;
1532 /// use std::net::Shutdown;
1533 ///
/// // Create a pair of connected sockets
1535 /// let (socket, other) = UnixDatagram::pair()?;
1536 ///
1537 /// socket.shutdown(Shutdown::Both)?;
1538 ///
1539 /// // NOTE: the following commented out code does NOT work as expected.
1540 /// // Due to an underlying issue, the recv call will block indefinitely.
1541 /// // See: https://github.com/tokio-rs/tokio/issues/1679
1542 /// //let mut buff = vec![0u8; 24];
1543 /// //let size = socket.recv(&mut buff).await?;
1544 /// //assert_eq!(size, 0);
1545 ///
1546 /// let send_result = socket.send(b"hello world").await;
1547 /// assert!(send_result.is_err());
1548 ///
1549 /// # Ok(())
1550 /// # }
1551 /// ```
1552 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1553 self.io.shutdown(how)
1554 }
1555}
1556
1557impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
1558 type Error = io::Error;
1559
1560 /// Consumes stream, returning the Tokio I/O object.
1561 ///
1562 /// This is equivalent to
1563 /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
1564 fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
1565 Self::from_std(datagram:stream)
1566 }
1567}
1568
1569impl fmt::Debug for UnixDatagram {
1570 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1571 self.io.fmt(f)
1572 }
1573}
1574
1575impl AsRawFd for UnixDatagram {
1576 fn as_raw_fd(&self) -> RawFd {
1577 self.io.as_raw_fd()
1578 }
1579}
1580
1581#[cfg(not(tokio_no_as_fd))]
1582impl AsFd for UnixDatagram {
1583 fn as_fd(&self) -> BorrowedFd<'_> {
1584 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1585 }
1586}
1587