1use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::SocketAddr;
3
4use std::fmt;
5use std::io;
6use std::net::Shutdown;
7use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
8use std::os::unix::net;
9use std::path::Path;
10use std::task::{Context, Poll};
11
12cfg_io_util! {
13 use bytes::BufMut;
14}
15
16cfg_net_unix! {
17 /// An I/O object representing a Unix datagram socket.
18 ///
19 /// A socket can be either named (associated with a filesystem path) or
20 /// unnamed.
21 ///
22 /// This type does not provide a `split` method, because this functionality
23 /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
24 /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
25 /// is enough. This is because all of the methods take `&self` instead of
26 /// `&mut self`.
27 ///
28 /// **Note:** named sockets are persisted even after the object is dropped
29 /// and the program has exited, and cannot be reconnected. It is advised
30 /// that you either check for and unlink the existing socket if it exists,
31 /// or use a temporary file that is guaranteed to not already exist.
32 ///
33 /// [`Arc`]: std::sync::Arc
34 ///
35 /// # Examples
36 /// Using named sockets, associated with a filesystem path:
37 /// ```
38 /// # use std::error::Error;
39 /// # #[tokio::main]
40 /// # async fn main() -> Result<(), Box<dyn Error>> {
41 /// use tokio::net::UnixDatagram;
42 /// use tempfile::tempdir;
43 ///
44 /// // We use a temporary directory so that the socket
45 /// // files left by the bound sockets will get cleaned up.
46 /// let tmp = tempdir()?;
47 ///
48 /// // Bind each socket to a filesystem path
49 /// let tx_path = tmp.path().join("tx");
50 /// let tx = UnixDatagram::bind(&tx_path)?;
51 /// let rx_path = tmp.path().join("rx");
52 /// let rx = UnixDatagram::bind(&rx_path)?;
53 ///
54 /// let bytes = b"hello world";
55 /// tx.send_to(bytes, &rx_path).await?;
56 ///
57 /// let mut buf = vec![0u8; 24];
58 /// let (size, addr) = rx.recv_from(&mut buf).await?;
59 ///
60 /// let dgram = &buf[..size];
61 /// assert_eq!(dgram, bytes);
62 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
63 ///
64 /// # Ok(())
65 /// # }
66 /// ```
67 ///
68 /// Using unnamed sockets, created as a pair
69 /// ```
70 /// # use std::error::Error;
71 /// # #[tokio::main]
72 /// # async fn main() -> Result<(), Box<dyn Error>> {
73 /// use tokio::net::UnixDatagram;
74 ///
75 /// // Create the pair of sockets
76 /// let (sock1, sock2) = UnixDatagram::pair()?;
77 ///
78 /// // Since the sockets are paired, the paired send/recv
79 /// // functions can be used
80 /// let bytes = b"hello world";
81 /// sock1.send(bytes).await?;
82 ///
83 /// let mut buff = vec![0u8; 24];
84 /// let size = sock2.recv(&mut buff).await?;
85 ///
86 /// let dgram = &buff[..size];
87 /// assert_eq!(dgram, bytes);
88 ///
89 /// # Ok(())
90 /// # }
91 /// ```
#[cfg_attr(docsrs, doc(alias = "uds"))]
pub struct UnixDatagram {
    // The mio datagram socket wrapped in `PollEvented`. The methods on this type reach both
    // the socket itself and its `registration()` through this single field.
    io: PollEvented<mio::net::UnixDatagram>,
}
96}
97
98impl UnixDatagram {
99 /// Waits for any of the requested ready states.
100 ///
101 /// This function is usually paired with `try_recv()` or `try_send()`. It
102 /// can be used to concurrently recv / send to the same socket on a single
103 /// task without splitting the socket.
104 ///
105 /// The function may complete without the socket being ready. This is a
106 /// false-positive and attempting an operation will return with
107 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
108 /// [`Ready`] set, so you should always check the returned value and possibly
109 /// wait again if the requested states are not set.
110 ///
111 /// # Cancel safety
112 ///
113 /// This method is cancel safe. Once a readiness event occurs, the method
114 /// will continue to return immediately until the readiness event is
115 /// consumed by an attempt to read or write that fails with `WouldBlock` or
116 /// `Poll::Pending`.
117 ///
118 /// # Examples
119 ///
120 /// Concurrently receive from and send to the socket on the same task
121 /// without splitting.
122 ///
123 /// ```no_run
124 /// use tokio::io::Interest;
125 /// use tokio::net::UnixDatagram;
126 /// use std::io;
127 ///
128 /// #[tokio::main]
129 /// async fn main() -> io::Result<()> {
130 /// let dir = tempfile::tempdir().unwrap();
131 /// let client_path = dir.path().join("client.sock");
132 /// let server_path = dir.path().join("server.sock");
133 /// let socket = UnixDatagram::bind(&client_path)?;
134 /// socket.connect(&server_path)?;
135 ///
136 /// loop {
137 /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
138 ///
139 /// if ready.is_readable() {
140 /// let mut data = [0; 1024];
141 /// match socket.try_recv(&mut data[..]) {
142 /// Ok(n) => {
143 /// println!("received {:?}", &data[..n]);
144 /// }
145 /// // False-positive, continue
146 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
147 /// Err(e) => {
148 /// return Err(e);
149 /// }
150 /// }
151 /// }
152 ///
153 /// if ready.is_writable() {
154 /// // Write some data
155 /// match socket.try_send(b"hello world") {
156 /// Ok(n) => {
157 /// println!("sent {} bytes", n);
158 /// }
159 /// // False-positive, continue
160 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
161 /// Err(e) => {
162 /// return Err(e);
163 /// }
164 /// }
165 /// }
166 /// }
167 /// }
168 /// ```
169 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
170 let event = self.io.registration().readiness(interest).await?;
171 Ok(event.ready)
172 }
173
174 /// Waits for the socket to become writable.
175 ///
176 /// This function is equivalent to `ready(Interest::WRITABLE)` and is
177 /// usually paired with `try_send()` or `try_send_to()`.
178 ///
179 /// The function may complete without the socket being writable. This is a
180 /// false-positive and attempting a `try_send()` will return with
181 /// `io::ErrorKind::WouldBlock`.
182 ///
183 /// # Cancel safety
184 ///
185 /// This method is cancel safe. Once a readiness event occurs, the method
186 /// will continue to return immediately until the readiness event is
187 /// consumed by an attempt to write that fails with `WouldBlock` or
188 /// `Poll::Pending`.
189 ///
190 /// # Examples
191 ///
192 /// ```no_run
193 /// use tokio::net::UnixDatagram;
194 /// use std::io;
195 ///
196 /// #[tokio::main]
197 /// async fn main() -> io::Result<()> {
198 /// let dir = tempfile::tempdir().unwrap();
199 /// let client_path = dir.path().join("client.sock");
200 /// let server_path = dir.path().join("server.sock");
201 /// let socket = UnixDatagram::bind(&client_path)?;
202 /// socket.connect(&server_path)?;
203 ///
204 /// loop {
205 /// // Wait for the socket to be writable
206 /// socket.writable().await?;
207 ///
208 /// // Try to send data, this may still fail with `WouldBlock`
209 /// // if the readiness event is a false positive.
210 /// match socket.try_send(b"hello world") {
211 /// Ok(n) => {
212 /// break;
213 /// }
214 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
215 /// continue;
216 /// }
217 /// Err(e) => {
218 /// return Err(e);
219 /// }
220 /// }
221 /// }
222 ///
223 /// Ok(())
224 /// }
225 /// ```
226 pub async fn writable(&self) -> io::Result<()> {
227 self.ready(Interest::WRITABLE).await?;
228 Ok(())
229 }
230
231 /// Polls for write/send readiness.
232 ///
233 /// If the socket is not currently ready for sending, this method will
234 /// store a clone of the `Waker` from the provided `Context`. When the socket
235 /// becomes ready for sending, `Waker::wake` will be called on the
236 /// waker.
237 ///
238 /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
239 /// the `Waker` from the `Context` passed to the most recent call is
240 /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
241 /// second, independent waker.)
242 ///
243 /// This function is intended for cases where creating and pinning a future
244 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
245 /// preferred, as this supports polling from multiple tasks at once.
246 ///
247 /// # Return value
248 ///
249 /// The function returns:
250 ///
251 /// * `Poll::Pending` if the socket is not ready for writing.
252 /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
253 /// * `Poll::Ready(Err(e))` if an error is encountered.
254 ///
255 /// # Errors
256 ///
257 /// This function may encounter any standard I/O error except `WouldBlock`.
258 ///
259 /// [`writable`]: method@Self::writable
260 pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
261 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
262 }
263
264 /// Waits for the socket to become readable.
265 ///
266 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
267 /// paired with `try_recv()`.
268 ///
269 /// The function may complete without the socket being readable. This is a
270 /// false-positive and attempting a `try_recv()` will return with
271 /// `io::ErrorKind::WouldBlock`.
272 ///
273 /// # Cancel safety
274 ///
275 /// This method is cancel safe. Once a readiness event occurs, the method
276 /// will continue to return immediately until the readiness event is
277 /// consumed by an attempt to read that fails with `WouldBlock` or
278 /// `Poll::Pending`.
279 ///
280 /// # Examples
281 ///
282 /// ```no_run
283 /// use tokio::net::UnixDatagram;
284 /// use std::io;
285 ///
286 /// #[tokio::main]
287 /// async fn main() -> io::Result<()> {
288 /// // Connect to a peer
289 /// let dir = tempfile::tempdir().unwrap();
290 /// let client_path = dir.path().join("client.sock");
291 /// let server_path = dir.path().join("server.sock");
292 /// let socket = UnixDatagram::bind(&client_path)?;
293 /// socket.connect(&server_path)?;
294 ///
295 /// loop {
296 /// // Wait for the socket to be readable
297 /// socket.readable().await?;
298 ///
299 /// // The buffer is **not** included in the async task and will
300 /// // only exist on the stack.
301 /// let mut buf = [0; 1024];
302 ///
303 /// // Try to recv data, this may still fail with `WouldBlock`
304 /// // if the readiness event is a false positive.
305 /// match socket.try_recv(&mut buf) {
306 /// Ok(n) => {
307 /// println!("GOT {:?}", &buf[..n]);
308 /// break;
309 /// }
310 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
311 /// continue;
312 /// }
313 /// Err(e) => {
314 /// return Err(e);
315 /// }
316 /// }
317 /// }
318 ///
319 /// Ok(())
320 /// }
321 /// ```
322 pub async fn readable(&self) -> io::Result<()> {
323 self.ready(Interest::READABLE).await?;
324 Ok(())
325 }
326
327 /// Polls for read/receive readiness.
328 ///
329 /// If the socket is not currently ready for receiving, this method will
330 /// store a clone of the `Waker` from the provided `Context`. When the
331 /// socket becomes ready for reading, `Waker::wake` will be called on the
332 /// waker.
333 ///
334 /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
335 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
336 /// recent call is scheduled to receive a wakeup. (However,
337 /// `poll_send_ready` retains a second, independent waker.)
338 ///
339 /// This function is intended for cases where creating and pinning a future
340 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
341 /// preferred, as this supports polling from multiple tasks at once.
342 ///
343 /// # Return value
344 ///
345 /// The function returns:
346 ///
347 /// * `Poll::Pending` if the socket is not ready for reading.
348 /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
349 /// * `Poll::Ready(Err(e))` if an error is encountered.
350 ///
351 /// # Errors
352 ///
353 /// This function may encounter any standard I/O error except `WouldBlock`.
354 ///
355 /// [`readable`]: method@Self::readable
356 pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
357 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
358 }
359
360 /// Creates a new `UnixDatagram` bound to the specified path.
361 ///
362 /// # Examples
363 /// ```
364 /// # use std::error::Error;
365 /// # #[tokio::main]
366 /// # async fn main() -> Result<(), Box<dyn Error>> {
367 /// use tokio::net::UnixDatagram;
368 /// use tempfile::tempdir;
369 ///
370 /// // We use a temporary directory so that the socket
371 /// // files left by the bound sockets will get cleaned up.
372 /// let tmp = tempdir()?;
373 ///
374 /// // Bind the socket to a filesystem path
375 /// let socket_path = tmp.path().join("socket");
376 /// let socket = UnixDatagram::bind(&socket_path)?;
377 ///
378 /// # Ok(())
379 /// # }
380 /// ```
381 pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
382 where
383 P: AsRef<Path>,
384 {
385 let socket = mio::net::UnixDatagram::bind(path)?;
386 UnixDatagram::new(socket)
387 }
388
389 /// Creates an unnamed pair of connected sockets.
390 ///
391 /// This function will create a pair of interconnected Unix sockets for
392 /// communicating back and forth between one another.
393 ///
394 /// # Examples
395 /// ```
396 /// # use std::error::Error;
397 /// # #[tokio::main]
398 /// # async fn main() -> Result<(), Box<dyn Error>> {
399 /// use tokio::net::UnixDatagram;
400 ///
401 /// // Create the pair of sockets
402 /// let (sock1, sock2) = UnixDatagram::pair()?;
403 ///
404 /// // Since the sockets are paired, the paired send/recv
405 /// // functions can be used
406 /// let bytes = b"hail eris";
407 /// sock1.send(bytes).await?;
408 ///
409 /// let mut buff = vec![0u8; 24];
410 /// let size = sock2.recv(&mut buff).await?;
411 ///
412 /// let dgram = &buff[..size];
413 /// assert_eq!(dgram, bytes);
414 ///
415 /// # Ok(())
416 /// # }
417 /// ```
418 pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
419 let (a, b) = mio::net::UnixDatagram::pair()?;
420 let a = UnixDatagram::new(a)?;
421 let b = UnixDatagram::new(b)?;
422
423 Ok((a, b))
424 }
425
426 /// Creates new [`UnixDatagram`] from a [`std::os::unix::net::UnixDatagram`].
427 ///
428 /// This function is intended to be used to wrap a UnixDatagram from the
429 /// standard library in the Tokio equivalent.
430 ///
431 /// # Notes
432 ///
/// The caller is responsible for ensuring that the socket is in
434 /// non-blocking mode. Otherwise all I/O operations on the socket
435 /// will block the thread, which will cause unexpected behavior.
436 /// Non-blocking mode can be set using [`set_nonblocking`].
437 ///
438 /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
439 ///
440 /// # Panics
441 ///
442 /// This function panics if it is not called from within a runtime with
443 /// IO enabled.
444 ///
445 /// The runtime is usually set implicitly when this function is called
446 /// from a future driven by a Tokio runtime, otherwise runtime can be set
447 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
448 /// # Examples
449 /// ```
450 /// # use std::error::Error;
451 /// # #[tokio::main]
452 /// # async fn main() -> Result<(), Box<dyn Error>> {
453 /// use tokio::net::UnixDatagram;
454 /// use std::os::unix::net::UnixDatagram as StdUDS;
455 /// use tempfile::tempdir;
456 ///
457 /// // We use a temporary directory so that the socket
458 /// // files left by the bound sockets will get cleaned up.
459 /// let tmp = tempdir()?;
460 ///
461 /// // Bind the socket to a filesystem path
462 /// let socket_path = tmp.path().join("socket");
463 /// let std_socket = StdUDS::bind(&socket_path)?;
464 /// std_socket.set_nonblocking(true)?;
465 /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
466 ///
467 /// # Ok(())
468 /// # }
469 /// ```
470 #[track_caller]
471 pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
472 let socket = mio::net::UnixDatagram::from_std(datagram);
473 let io = PollEvented::new(socket)?;
474 Ok(UnixDatagram { io })
475 }
476
477 /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
478 ///
479 /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
480 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
481 /// if needed.
482 ///
483 /// # Examples
484 ///
485 /// ```rust,no_run
486 /// # use std::error::Error;
487 /// # async fn dox() -> Result<(), Box<dyn Error>> {
488 /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
489 /// let std_socket = tokio_socket.into_std()?;
490 /// std_socket.set_nonblocking(false)?;
491 /// # Ok(())
492 /// # }
493 /// ```
494 ///
495 /// [`tokio::net::UnixDatagram`]: UnixDatagram
496 /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
497 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
498 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
499 self.io
500 .into_inner()
501 .map(IntoRawFd::into_raw_fd)
502 .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
503 }
504
505 fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
506 let io = PollEvented::new(socket)?;
507 Ok(UnixDatagram { io })
508 }
509
510 /// Creates a new `UnixDatagram` which is not bound to any address.
511 ///
512 /// # Examples
513 /// ```
514 /// # use std::error::Error;
515 /// # #[tokio::main]
516 /// # async fn main() -> Result<(), Box<dyn Error>> {
517 /// use tokio::net::UnixDatagram;
518 /// use tempfile::tempdir;
519 ///
520 /// // Create an unbound socket
521 /// let tx = UnixDatagram::unbound()?;
522 ///
523 /// // Create another, bound socket
524 /// let tmp = tempdir()?;
525 /// let rx_path = tmp.path().join("rx");
526 /// let rx = UnixDatagram::bind(&rx_path)?;
527 ///
528 /// // Send to the bound socket
529 /// let bytes = b"hello world";
530 /// tx.send_to(bytes, &rx_path).await?;
531 ///
532 /// let mut buf = vec![0u8; 24];
533 /// let (size, addr) = rx.recv_from(&mut buf).await?;
534 ///
535 /// let dgram = &buf[..size];
536 /// assert_eq!(dgram, bytes);
537 ///
538 /// # Ok(())
539 /// # }
540 /// ```
541 pub fn unbound() -> io::Result<UnixDatagram> {
542 let socket = mio::net::UnixDatagram::unbound()?;
543 UnixDatagram::new(socket)
544 }
545
546 /// Connects the socket to the specified address.
547 ///
548 /// The `send` method may be used to send data to the specified address.
549 /// `recv` and `recv_from` will only receive data from that address.
550 ///
551 /// # Examples
552 /// ```
553 /// # use std::error::Error;
554 /// # #[tokio::main]
555 /// # async fn main() -> Result<(), Box<dyn Error>> {
556 /// use tokio::net::UnixDatagram;
557 /// use tempfile::tempdir;
558 ///
559 /// // Create an unbound socket
560 /// let tx = UnixDatagram::unbound()?;
561 ///
562 /// // Create another, bound socket
563 /// let tmp = tempdir()?;
564 /// let rx_path = tmp.path().join("rx");
565 /// let rx = UnixDatagram::bind(&rx_path)?;
566 ///
567 /// // Connect to the bound socket
568 /// tx.connect(&rx_path)?;
569 ///
570 /// // Send to the bound socket
571 /// let bytes = b"hello world";
572 /// tx.send(bytes).await?;
573 ///
574 /// let mut buf = vec![0u8; 24];
575 /// let (size, addr) = rx.recv_from(&mut buf).await?;
576 ///
577 /// let dgram = &buf[..size];
578 /// assert_eq!(dgram, bytes);
579 ///
580 /// # Ok(())
581 /// # }
582 /// ```
583 pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
584 self.io.connect(path)
585 }
586
587 /// Sends data on the socket to the socket's peer.
588 ///
589 /// # Cancel safety
590 ///
591 /// This method is cancel safe. If `send` is used as the event in a
592 /// [`tokio::select!`](crate::select) statement and some other branch
593 /// completes first, then it is guaranteed that the message was not sent.
594 ///
595 /// # Examples
596 /// ```
597 /// # use std::error::Error;
598 /// # #[tokio::main]
599 /// # async fn main() -> Result<(), Box<dyn Error>> {
600 /// use tokio::net::UnixDatagram;
601 ///
602 /// // Create the pair of sockets
603 /// let (sock1, sock2) = UnixDatagram::pair()?;
604 ///
605 /// // Since the sockets are paired, the paired send/recv
606 /// // functions can be used
607 /// let bytes = b"hello world";
608 /// sock1.send(bytes).await?;
609 ///
610 /// let mut buff = vec![0u8; 24];
611 /// let size = sock2.recv(&mut buff).await?;
612 ///
613 /// let dgram = &buff[..size];
614 /// assert_eq!(dgram, bytes);
615 ///
616 /// # Ok(())
617 /// # }
618 /// ```
619 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
620 self.io
621 .registration()
622 .async_io(Interest::WRITABLE, || self.io.send(buf))
623 .await
624 }
625
626 /// Tries to send a datagram to the peer without waiting.
627 ///
628 /// # Examples
629 ///
630 /// ```no_run
631 /// use tokio::net::UnixDatagram;
632 /// use std::io;
633 ///
634 /// #[tokio::main]
635 /// async fn main() -> io::Result<()> {
636 /// let dir = tempfile::tempdir().unwrap();
637 /// let client_path = dir.path().join("client.sock");
638 /// let server_path = dir.path().join("server.sock");
639 /// let socket = UnixDatagram::bind(&client_path)?;
640 /// socket.connect(&server_path)?;
641 ///
642 /// loop {
643 /// // Wait for the socket to be writable
644 /// socket.writable().await?;
645 ///
646 /// // Try to send data, this may still fail with `WouldBlock`
647 /// // if the readiness event is a false positive.
648 /// match socket.try_send(b"hello world") {
649 /// Ok(n) => {
650 /// break;
651 /// }
652 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
653 /// continue;
654 /// }
655 /// Err(e) => {
656 /// return Err(e);
657 /// }
658 /// }
659 /// }
660 ///
661 /// Ok(())
662 /// }
663 /// ```
664 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
665 self.io
666 .registration()
667 .try_io(Interest::WRITABLE, || self.io.send(buf))
668 }
669
/// Tries to send a datagram to the specified address without waiting.
671 ///
672 /// # Examples
673 ///
674 /// ```no_run
675 /// use tokio::net::UnixDatagram;
676 /// use std::io;
677 ///
678 /// #[tokio::main]
679 /// async fn main() -> io::Result<()> {
680 /// let dir = tempfile::tempdir().unwrap();
681 /// let client_path = dir.path().join("client.sock");
682 /// let server_path = dir.path().join("server.sock");
683 /// let socket = UnixDatagram::bind(&client_path)?;
684 ///
685 /// loop {
686 /// // Wait for the socket to be writable
687 /// socket.writable().await?;
688 ///
689 /// // Try to send data, this may still fail with `WouldBlock`
690 /// // if the readiness event is a false positive.
691 /// match socket.try_send_to(b"hello world", &server_path) {
692 /// Ok(n) => {
693 /// break;
694 /// }
695 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
696 /// continue;
697 /// }
698 /// Err(e) => {
699 /// return Err(e);
700 /// }
701 /// }
702 /// }
703 ///
704 /// Ok(())
705 /// }
706 /// ```
707 pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
708 where
709 P: AsRef<Path>,
710 {
711 self.io
712 .registration()
713 .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
714 }
715
716 /// Receives data from the socket.
717 ///
718 /// # Cancel safety
719 ///
720 /// This method is cancel safe. If `recv` is used as the event in a
721 /// [`tokio::select!`](crate::select) statement and some other branch
722 /// completes first, it is guaranteed that no messages were received on this
723 /// socket.
724 ///
725 /// # Examples
726 /// ```
727 /// # use std::error::Error;
728 /// # #[tokio::main]
729 /// # async fn main() -> Result<(), Box<dyn Error>> {
730 /// use tokio::net::UnixDatagram;
731 ///
732 /// // Create the pair of sockets
733 /// let (sock1, sock2) = UnixDatagram::pair()?;
734 ///
735 /// // Since the sockets are paired, the paired send/recv
736 /// // functions can be used
737 /// let bytes = b"hello world";
738 /// sock1.send(bytes).await?;
739 ///
740 /// let mut buff = vec![0u8; 24];
741 /// let size = sock2.recv(&mut buff).await?;
742 ///
743 /// let dgram = &buff[..size];
744 /// assert_eq!(dgram, bytes);
745 ///
746 /// # Ok(())
747 /// # }
748 /// ```
749 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
750 self.io
751 .registration()
752 .async_io(Interest::READABLE, || self.io.recv(buf))
753 .await
754 }
755
756 /// Tries to receive a datagram from the peer without waiting.
757 ///
758 /// # Examples
759 ///
760 /// ```no_run
761 /// use tokio::net::UnixDatagram;
762 /// use std::io;
763 ///
764 /// #[tokio::main]
765 /// async fn main() -> io::Result<()> {
766 /// // Connect to a peer
767 /// let dir = tempfile::tempdir().unwrap();
768 /// let client_path = dir.path().join("client.sock");
769 /// let server_path = dir.path().join("server.sock");
770 /// let socket = UnixDatagram::bind(&client_path)?;
771 /// socket.connect(&server_path)?;
772 ///
773 /// loop {
774 /// // Wait for the socket to be readable
775 /// socket.readable().await?;
776 ///
777 /// // The buffer is **not** included in the async task and will
778 /// // only exist on the stack.
779 /// let mut buf = [0; 1024];
780 ///
781 /// // Try to recv data, this may still fail with `WouldBlock`
782 /// // if the readiness event is a false positive.
783 /// match socket.try_recv(&mut buf) {
784 /// Ok(n) => {
785 /// println!("GOT {:?}", &buf[..n]);
786 /// break;
787 /// }
788 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
789 /// continue;
790 /// }
791 /// Err(e) => {
792 /// return Err(e);
793 /// }
794 /// }
795 /// }
796 ///
797 /// Ok(())
798 /// }
799 /// ```
800 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
801 self.io
802 .registration()
803 .try_io(Interest::READABLE, || self.io.recv(buf))
804 }
805
806 cfg_io_util! {
807 /// Tries to receive data from the socket without waiting.
808 ///
809 /// This method can be used even if `buf` is uninitialized.
810 ///
811 /// # Examples
812 ///
813 /// ```no_run
814 /// use tokio::net::UnixDatagram;
815 /// use std::io;
816 ///
817 /// #[tokio::main]
818 /// async fn main() -> io::Result<()> {
819 /// // Connect to a peer
820 /// let dir = tempfile::tempdir().unwrap();
821 /// let client_path = dir.path().join("client.sock");
822 /// let server_path = dir.path().join("server.sock");
823 /// let socket = UnixDatagram::bind(&client_path)?;
824 ///
825 /// loop {
826 /// // Wait for the socket to be readable
827 /// socket.readable().await?;
828 ///
829 /// let mut buf = Vec::with_capacity(1024);
830 ///
831 /// // Try to recv data, this may still fail with `WouldBlock`
832 /// // if the readiness event is a false positive.
833 /// match socket.try_recv_buf_from(&mut buf) {
834 /// Ok((n, _addr)) => {
835 /// println!("GOT {:?}", &buf[..n]);
836 /// break;
837 /// }
838 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
839 /// continue;
840 /// }
841 /// Err(e) => {
842 /// return Err(e);
843 /// }
844 /// }
845 /// }
846 ///
847 /// Ok(())
848 /// }
849 /// ```
850 pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
851 let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
852 let dst = buf.chunk_mut();
853 let dst =
854 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
855
856 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
857 // buffer.
858 let (n, addr) = (*self.io).recv_from(dst)?;
859
860 unsafe {
861 buf.advance_mut(n);
862 }
863
864 Ok((n, addr))
865 })?;
866
867 Ok((n, SocketAddr(addr)))
868 }
869
870 /// Receives from the socket, advances the
871 /// buffer's internal cursor and returns how many bytes were read and the origin.
872 ///
873 /// This method can be used even if `buf` is uninitialized.
874 ///
875 /// # Examples
876 /// ```
877 /// # use std::error::Error;
878 /// # #[tokio::main]
879 /// # async fn main() -> Result<(), Box<dyn Error>> {
880 /// use tokio::net::UnixDatagram;
881 /// use tempfile::tempdir;
882 ///
883 /// // We use a temporary directory so that the socket
884 /// // files left by the bound sockets will get cleaned up.
885 /// let tmp = tempdir()?;
886 ///
887 /// // Bind each socket to a filesystem path
888 /// let tx_path = tmp.path().join("tx");
889 /// let tx = UnixDatagram::bind(&tx_path)?;
890 /// let rx_path = tmp.path().join("rx");
891 /// let rx = UnixDatagram::bind(&rx_path)?;
892 ///
893 /// let bytes = b"hello world";
894 /// tx.send_to(bytes, &rx_path).await?;
895 ///
896 /// let mut buf = Vec::with_capacity(24);
897 /// let (size, addr) = rx.recv_buf_from(&mut buf).await?;
898 ///
899 /// let dgram = &buf[..size];
900 /// assert_eq!(dgram, bytes);
901 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
902 ///
903 /// # Ok(())
904 /// # }
905 /// ```
906 pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
907 self.io.registration().async_io(Interest::READABLE, || {
908 let dst = buf.chunk_mut();
909 let dst =
910 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
911
912 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
913 // buffer.
914 let (n, addr) = (*self.io).recv_from(dst)?;
915
916 unsafe {
917 buf.advance_mut(n);
918 }
919 Ok((n,SocketAddr(addr)))
920 }).await
921 }
922
/// Tries to receive data from the socket into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
925 ///
926 /// This method can be used even if `buf` is uninitialized.
927 ///
928 /// # Examples
929 ///
930 /// ```no_run
931 /// use tokio::net::UnixDatagram;
932 /// use std::io;
933 ///
934 /// #[tokio::main]
935 /// async fn main() -> io::Result<()> {
936 /// // Connect to a peer
937 /// let dir = tempfile::tempdir().unwrap();
938 /// let client_path = dir.path().join("client.sock");
939 /// let server_path = dir.path().join("server.sock");
940 /// let socket = UnixDatagram::bind(&client_path)?;
941 /// socket.connect(&server_path)?;
942 ///
943 /// loop {
944 /// // Wait for the socket to be readable
945 /// socket.readable().await?;
946 ///
947 /// let mut buf = Vec::with_capacity(1024);
948 ///
949 /// // Try to recv data, this may still fail with `WouldBlock`
950 /// // if the readiness event is a false positive.
951 /// match socket.try_recv_buf(&mut buf) {
952 /// Ok(n) => {
953 /// println!("GOT {:?}", &buf[..n]);
954 /// break;
955 /// }
956 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
957 /// continue;
958 /// }
959 /// Err(e) => {
960 /// return Err(e);
961 /// }
962 /// }
963 /// }
964 ///
965 /// Ok(())
966 /// }
967 /// ```
968 pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
969 self.io.registration().try_io(Interest::READABLE, || {
970 let dst = buf.chunk_mut();
971 let dst =
972 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
973
974 // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
975 // buffer.
976 let n = (*self.io).recv(dst)?;
977
978 unsafe {
979 buf.advance_mut(n);
980 }
981
982 Ok(n)
983 })
984 }
985
986 /// Receives data from the socket from the address to which it is connected,
987 /// advancing the buffer's internal cursor, returning how many bytes were read.
988 ///
989 /// This method can be used even if `buf` is uninitialized.
990 ///
991 /// # Examples
992 /// ```
993 /// # use std::error::Error;
994 /// # #[tokio::main]
995 /// # async fn main() -> Result<(), Box<dyn Error>> {
996 /// use tokio::net::UnixDatagram;
997 ///
998 /// // Create the pair of sockets
999 /// let (sock1, sock2) = UnixDatagram::pair()?;
1000 ///
1001 /// // Since the sockets are paired, the paired send/recv
1002 /// // functions can be used
1003 /// let bytes = b"hello world";
1004 /// sock1.send(bytes).await?;
1005 ///
1006 /// let mut buff = Vec::with_capacity(24);
1007 /// let size = sock2.recv_buf(&mut buff).await?;
1008 ///
1009 /// let dgram = &buff[..size];
1010 /// assert_eq!(dgram, bytes);
1011 ///
1012 /// # Ok(())
1013 /// # }
1014 /// ```
1015 pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1016 self.io.registration().async_io(Interest::READABLE, || {
1017 let dst = buf.chunk_mut();
1018 let dst =
1019 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1020
1021 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
1022 // buffer.
1023 let n = (*self.io).recv(dst)?;
1024
1025 unsafe {
1026 buf.advance_mut(n);
1027 }
1028 Ok(n)
1029 }).await
1030 }
1031 }
1032
1033 /// Sends data on the socket to the specified address.
1034 ///
1035 /// # Cancel safety
1036 ///
1037 /// This method is cancel safe. If `send_to` is used as the event in a
1038 /// [`tokio::select!`](crate::select) statement and some other branch
1039 /// completes first, then it is guaranteed that the message was not sent.
1040 ///
1041 /// # Examples
1042 /// ```
1043 /// # use std::error::Error;
1044 /// # #[tokio::main]
1045 /// # async fn main() -> Result<(), Box<dyn Error>> {
1046 /// use tokio::net::UnixDatagram;
1047 /// use tempfile::tempdir;
1048 ///
1049 /// // We use a temporary directory so that the socket
1050 /// // files left by the bound sockets will get cleaned up.
1051 /// let tmp = tempdir()?;
1052 ///
1053 /// // Bind each socket to a filesystem path
1054 /// let tx_path = tmp.path().join("tx");
1055 /// let tx = UnixDatagram::bind(&tx_path)?;
1056 /// let rx_path = tmp.path().join("rx");
1057 /// let rx = UnixDatagram::bind(&rx_path)?;
1058 ///
1059 /// let bytes = b"hello world";
1060 /// tx.send_to(bytes, &rx_path).await?;
1061 ///
1062 /// let mut buf = vec![0u8; 24];
1063 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1064 ///
1065 /// let dgram = &buf[..size];
1066 /// assert_eq!(dgram, bytes);
1067 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1068 ///
1069 /// # Ok(())
1070 /// # }
1071 /// ```
1072 pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
1073 where
1074 P: AsRef<Path>,
1075 {
1076 self.io
1077 .registration()
1078 .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
1079 .await
1080 }
1081
1082 /// Receives data from the socket.
1083 ///
1084 /// # Cancel safety
1085 ///
1086 /// This method is cancel safe. If `recv_from` is used as the event in a
1087 /// [`tokio::select!`](crate::select) statement and some other branch
1088 /// completes first, it is guaranteed that no messages were received on this
1089 /// socket.
1090 ///
1091 /// # Examples
1092 /// ```
1093 /// # use std::error::Error;
1094 /// # #[tokio::main]
1095 /// # async fn main() -> Result<(), Box<dyn Error>> {
1096 /// use tokio::net::UnixDatagram;
1097 /// use tempfile::tempdir;
1098 ///
1099 /// // We use a temporary directory so that the socket
1100 /// // files left by the bound sockets will get cleaned up.
1101 /// let tmp = tempdir()?;
1102 ///
1103 /// // Bind each socket to a filesystem path
1104 /// let tx_path = tmp.path().join("tx");
1105 /// let tx = UnixDatagram::bind(&tx_path)?;
1106 /// let rx_path = tmp.path().join("rx");
1107 /// let rx = UnixDatagram::bind(&rx_path)?;
1108 ///
1109 /// let bytes = b"hello world";
1110 /// tx.send_to(bytes, &rx_path).await?;
1111 ///
1112 /// let mut buf = vec![0u8; 24];
1113 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1114 ///
1115 /// let dgram = &buf[..size];
1116 /// assert_eq!(dgram, bytes);
1117 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1118 ///
1119 /// # Ok(())
1120 /// # }
1121 /// ```
1122 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1123 let (n, addr) = self
1124 .io
1125 .registration()
1126 .async_io(Interest::READABLE, || self.io.recv_from(buf))
1127 .await?;
1128
1129 Ok((n, SocketAddr(addr)))
1130 }
1131
1132 /// Attempts to receive a single datagram on the specified address.
1133 ///
1134 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
1135 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1136 /// receive a wakeup.
1137 ///
1138 /// # Return value
1139 ///
1140 /// The function returns:
1141 ///
1142 /// * `Poll::Pending` if the socket is not ready to read
1143 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1144 /// * `Poll::Ready(Err(e))` if an error is encountered.
1145 ///
1146 /// # Errors
1147 ///
1148 /// This function may encounter any standard I/O error except `WouldBlock`.
1149 pub fn poll_recv_from(
1150 &self,
1151 cx: &mut Context<'_>,
1152 buf: &mut ReadBuf<'_>,
1153 ) -> Poll<io::Result<SocketAddr>> {
1154 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1155 // Safety: will not read the maybe uninitialized bytes.
1156 let b = unsafe {
1157 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1158 };
1159
1160 self.io.recv_from(b)
1161 }))?;
1162
1163 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1164 unsafe {
1165 buf.assume_init(n);
1166 }
1167 buf.advance(n);
1168 Poll::Ready(Ok(SocketAddr(addr)))
1169 }
1170
1171 /// Attempts to send data to the specified address.
1172 ///
1173 /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1174 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1175 /// receive a wakeup.
1176 ///
1177 /// # Return value
1178 ///
1179 /// The function returns:
1180 ///
1181 /// * `Poll::Pending` if the socket is not ready to write
1182 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1183 /// * `Poll::Ready(Err(e))` if an error is encountered.
1184 ///
1185 /// # Errors
1186 ///
1187 /// This function may encounter any standard I/O error except `WouldBlock`.
1188 pub fn poll_send_to<P>(
1189 &self,
1190 cx: &mut Context<'_>,
1191 buf: &[u8],
1192 target: P,
1193 ) -> Poll<io::Result<usize>>
1194 where
1195 P: AsRef<Path>,
1196 {
1197 self.io
1198 .registration()
1199 .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
1200 }
1201
1202 /// Attempts to send data on the socket to the remote address to which it
1203 /// was previously `connect`ed.
1204 ///
1205 /// The [`connect`] method will connect this socket to a remote address.
1206 /// This method will fail if the socket is not connected.
1207 ///
1208 /// Note that on multiple calls to a `poll_*` method in the send direction,
1209 /// only the `Waker` from the `Context` passed to the most recent call will
1210 /// be scheduled to receive a wakeup.
1211 ///
1212 /// # Return value
1213 ///
1214 /// The function returns:
1215 ///
1216 /// * `Poll::Pending` if the socket is not available to write
1217 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
1218 /// * `Poll::Ready(Err(e))` if an error is encountered.
1219 ///
1220 /// # Errors
1221 ///
1222 /// This function may encounter any standard I/O error except `WouldBlock`.
1223 ///
1224 /// [`connect`]: method@Self::connect
1225 pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
1226 self.io
1227 .registration()
1228 .poll_write_io(cx, || self.io.send(buf))
1229 }
1230
1231 /// Attempts to receive a single datagram message on the socket from the remote
1232 /// address to which it is `connect`ed.
1233 ///
1234 /// The [`connect`] method will connect this socket to a remote address. This method
1235 /// resolves to an error if the socket is not connected.
1236 ///
1237 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
1238 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1239 /// receive a wakeup.
1240 ///
1241 /// # Return value
1242 ///
1243 /// The function returns:
1244 ///
1245 /// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(()))` reads data into `ReadBuf` if the socket is ready
1247 /// * `Poll::Ready(Err(e))` if an error is encountered.
1248 ///
1249 /// # Errors
1250 ///
1251 /// This function may encounter any standard I/O error except `WouldBlock`.
1252 ///
1253 /// [`connect`]: method@Self::connect
1254 pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1255 let n = ready!(self.io.registration().poll_read_io(cx, || {
1256 // Safety: will not read the maybe uninitialized bytes.
1257 let b = unsafe {
1258 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1259 };
1260
1261 self.io.recv(b)
1262 }))?;
1263
1264 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1265 unsafe {
1266 buf.assume_init(n);
1267 }
1268 buf.advance(n);
1269 Poll::Ready(Ok(()))
1270 }
1271
1272 /// Tries to receive data from the socket without waiting.
1273 ///
1274 /// # Examples
1275 ///
1276 /// ```no_run
1277 /// use tokio::net::UnixDatagram;
1278 /// use std::io;
1279 ///
1280 /// #[tokio::main]
1281 /// async fn main() -> io::Result<()> {
///     // Bind the socket to a local path
1283 /// let dir = tempfile::tempdir().unwrap();
1284 /// let client_path = dir.path().join("client.sock");
1285 /// let server_path = dir.path().join("server.sock");
1286 /// let socket = UnixDatagram::bind(&client_path)?;
1287 ///
1288 /// loop {
1289 /// // Wait for the socket to be readable
1290 /// socket.readable().await?;
1291 ///
1292 /// // The buffer is **not** included in the async task and will
1293 /// // only exist on the stack.
1294 /// let mut buf = [0; 1024];
1295 ///
1296 /// // Try to recv data, this may still fail with `WouldBlock`
1297 /// // if the readiness event is a false positive.
1298 /// match socket.try_recv_from(&mut buf) {
1299 /// Ok((n, _addr)) => {
1300 /// println!("GOT {:?}", &buf[..n]);
1301 /// break;
1302 /// }
1303 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1304 /// continue;
1305 /// }
1306 /// Err(e) => {
1307 /// return Err(e);
1308 /// }
1309 /// }
1310 /// }
1311 ///
1312 /// Ok(())
1313 /// }
1314 /// ```
1315 pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1316 let (n, addr) = self
1317 .io
1318 .registration()
1319 .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
1320
1321 Ok((n, SocketAddr(addr)))
1322 }
1323
1324 /// Tries to read or write from the socket using a user-provided IO operation.
1325 ///
1326 /// If the socket is ready, the provided closure is called. The closure
1327 /// should attempt to perform IO operation on the socket by manually
1328 /// calling the appropriate syscall. If the operation fails because the
1329 /// socket is not actually ready, then the closure should return a
1330 /// `WouldBlock` error and the readiness flag is cleared. The return value
1331 /// of the closure is then returned by `try_io`.
1332 ///
1333 /// If the socket is not ready, then the closure is not called
1334 /// and a `WouldBlock` error is returned.
1335 ///
1336 /// The closure should only return a `WouldBlock` error if it has performed
1337 /// an IO operation on the socket that failed due to the socket not being
1338 /// ready. Returning a `WouldBlock` error in any other situation will
1339 /// incorrectly clear the readiness flag, which can cause the socket to
1340 /// behave incorrectly.
1341 ///
1342 /// The closure should not perform the IO operation using any of the methods
1343 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1344 /// readiness flag and can cause the socket to behave incorrectly.
1345 ///
1346 /// This method is not intended to be used with combined interests.
1347 /// The closure should perform only one type of IO operation, so it should not
1348 /// require more than one ready state. This method may panic or sleep forever
1349 /// if it is called with a combined interest.
1350 ///
1351 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1352 ///
1353 /// [`readable()`]: UnixDatagram::readable()
1354 /// [`writable()`]: UnixDatagram::writable()
1355 /// [`ready()`]: UnixDatagram::ready()
1356 pub fn try_io<R>(
1357 &self,
1358 interest: Interest,
1359 f: impl FnOnce() -> io::Result<R>,
1360 ) -> io::Result<R> {
1361 self.io
1362 .registration()
1363 .try_io(interest, || self.io.try_io(f))
1364 }
1365
1366 /// Reads or writes from the socket using a user-provided IO operation.
1367 ///
1368 /// The readiness of the socket is awaited and when the socket is ready,
1369 /// the provided closure is called. The closure should attempt to perform
1370 /// IO operation on the socket by manually calling the appropriate syscall.
1371 /// If the operation fails because the socket is not actually ready,
1372 /// then the closure should return a `WouldBlock` error. In such case the
1373 /// readiness flag is cleared and the socket readiness is awaited again.
1374 /// This loop is repeated until the closure returns an `Ok` or an error
1375 /// other than `WouldBlock`.
1376 ///
1377 /// The closure should only return a `WouldBlock` error if it has performed
1378 /// an IO operation on the socket that failed due to the socket not being
1379 /// ready. Returning a `WouldBlock` error in any other situation will
1380 /// incorrectly clear the readiness flag, which can cause the socket to
1381 /// behave incorrectly.
1382 ///
1383 /// The closure should not perform the IO operation using any of the methods
1384 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1385 /// readiness flag and can cause the socket to behave incorrectly.
1386 ///
1387 /// This method is not intended to be used with combined interests.
1388 /// The closure should perform only one type of IO operation, so it should not
1389 /// require more than one ready state. This method may panic or sleep forever
1390 /// if it is called with a combined interest.
1391 pub async fn async_io<R>(
1392 &self,
1393 interest: Interest,
1394 mut f: impl FnMut() -> io::Result<R>,
1395 ) -> io::Result<R> {
1396 self.io
1397 .registration()
1398 .async_io(interest, || self.io.try_io(&mut f))
1399 .await
1400 }
1401
1402 /// Returns the local address that this socket is bound to.
1403 ///
1404 /// # Examples
1405 /// For a socket bound to a local path
1406 /// ```
1407 /// # use std::error::Error;
1408 /// # #[tokio::main]
1409 /// # async fn main() -> Result<(), Box<dyn Error>> {
1410 /// use tokio::net::UnixDatagram;
1411 /// use tempfile::tempdir;
1412 ///
1413 /// // We use a temporary directory so that the socket
1414 /// // files left by the bound sockets will get cleaned up.
1415 /// let tmp = tempdir()?;
1416 ///
1417 /// // Bind socket to a filesystem path
1418 /// let socket_path = tmp.path().join("socket");
1419 /// let socket = UnixDatagram::bind(&socket_path)?;
1420 ///
1421 /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
1422 ///
1423 /// # Ok(())
1424 /// # }
1425 /// ```
1426 ///
1427 /// For an unbound socket
1428 /// ```
1429 /// # use std::error::Error;
1430 /// # #[tokio::main]
1431 /// # async fn main() -> Result<(), Box<dyn Error>> {
1432 /// use tokio::net::UnixDatagram;
1433 ///
1434 /// // Create an unbound socket
1435 /// let socket = UnixDatagram::unbound()?;
1436 ///
1437 /// assert!(socket.local_addr()?.is_unnamed());
1438 ///
1439 /// # Ok(())
1440 /// # }
1441 /// ```
1442 pub fn local_addr(&self) -> io::Result<SocketAddr> {
1443 self.io.local_addr().map(SocketAddr)
1444 }
1445
1446 /// Returns the address of this socket's peer.
1447 ///
1448 /// The `connect` method will connect the socket to a peer.
1449 ///
1450 /// # Examples
1451 /// For a peer with a local path
1452 /// ```
1453 /// # use std::error::Error;
1454 /// # #[tokio::main]
1455 /// # async fn main() -> Result<(), Box<dyn Error>> {
1456 /// use tokio::net::UnixDatagram;
1457 /// use tempfile::tempdir;
1458 ///
1459 /// // Create an unbound socket
1460 /// let tx = UnixDatagram::unbound()?;
1461 ///
1462 /// // Create another, bound socket
1463 /// let tmp = tempdir()?;
1464 /// let rx_path = tmp.path().join("rx");
1465 /// let rx = UnixDatagram::bind(&rx_path)?;
1466 ///
1467 /// // Connect to the bound socket
1468 /// tx.connect(&rx_path)?;
1469 ///
1470 /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
1471 ///
1472 /// # Ok(())
1473 /// # }
1474 /// ```
1475 ///
1476 /// For an unbound peer
1477 /// ```
1478 /// # use std::error::Error;
1479 /// # #[tokio::main]
1480 /// # async fn main() -> Result<(), Box<dyn Error>> {
1481 /// use tokio::net::UnixDatagram;
1482 ///
1483 /// // Create the pair of sockets
1484 /// let (sock1, sock2) = UnixDatagram::pair()?;
1485 ///
1486 /// assert!(sock1.peer_addr()?.is_unnamed());
1487 ///
1488 /// # Ok(())
1489 /// # }
1490 /// ```
1491 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1492 self.io.peer_addr().map(SocketAddr)
1493 }
1494
1495 /// Returns the value of the `SO_ERROR` option.
1496 ///
1497 /// # Examples
1498 /// ```
1499 /// # use std::error::Error;
1500 /// # #[tokio::main]
1501 /// # async fn main() -> Result<(), Box<dyn Error>> {
1502 /// use tokio::net::UnixDatagram;
1503 ///
1504 /// // Create an unbound socket
1505 /// let socket = UnixDatagram::unbound()?;
1506 ///
1507 /// if let Ok(Some(err)) = socket.take_error() {
1508 /// println!("Got error: {:?}", err);
1509 /// }
1510 ///
1511 /// # Ok(())
1512 /// # }
1513 /// ```
1514 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1515 self.io.take_error()
1516 }
1517
1518 /// Shuts down the read, write, or both halves of this connection.
1519 ///
1520 /// This function will cause all pending and future I/O calls on the
1521 /// specified portions to immediately return with an appropriate value
1522 /// (see the documentation of `Shutdown`).
1523 ///
1524 /// # Examples
1525 /// ```
1526 /// # use std::error::Error;
1527 /// # #[tokio::main]
1528 /// # async fn main() -> Result<(), Box<dyn Error>> {
1529 /// use tokio::net::UnixDatagram;
1530 /// use std::net::Shutdown;
1531 ///
/// // Create the pair of sockets
1533 /// let (socket, other) = UnixDatagram::pair()?;
1534 ///
1535 /// socket.shutdown(Shutdown::Both)?;
1536 ///
1537 /// // NOTE: the following commented out code does NOT work as expected.
1538 /// // Due to an underlying issue, the recv call will block indefinitely.
1539 /// // See: https://github.com/tokio-rs/tokio/issues/1679
1540 /// //let mut buff = vec![0u8; 24];
1541 /// //let size = socket.recv(&mut buff).await?;
1542 /// //assert_eq!(size, 0);
1543 ///
1544 /// let send_result = socket.send(b"hello world").await;
1545 /// assert!(send_result.is_err());
1546 ///
1547 /// # Ok(())
1548 /// # }
1549 /// ```
1550 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1551 self.io.shutdown(how)
1552 }
1553}
1554
1555impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
1556 type Error = io::Error;
1557
1558 /// Consumes stream, returning the Tokio I/O object.
1559 ///
1560 /// This is equivalent to
1561 /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
1562 fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
1563 Self::from_std(stream)
1564 }
1565}
1566
1567impl fmt::Debug for UnixDatagram {
1568 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1569 self.io.fmt(f)
1570 }
1571}
1572
1573impl AsRawFd for UnixDatagram {
1574 fn as_raw_fd(&self) -> RawFd {
1575 self.io.as_raw_fd()
1576 }
1577}
1578
1579impl AsFd for UnixDatagram {
1580 fn as_fd(&self) -> BorrowedFd<'_> {
1581 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1582 }
1583}
1584