1use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::SocketAddr;
3
4use std::fmt;
5use std::io;
6use std::net::Shutdown;
7use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
8use std::os::unix::net;
9use std::path::Path;
10use std::task::{Context, Poll};
11
12cfg_io_util! {
13 use bytes::BufMut;
14}
15
16cfg_net_unix! {
17 /// An I/O object representing a Unix datagram socket.
18 ///
19 /// A socket can be either named (associated with a filesystem path) or
20 /// unnamed.
21 ///
22 /// This type does not provide a `split` method, because this functionality
23 /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
24 /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
25 /// is enough. This is because all of the methods take `&self` instead of
26 /// `&mut self`.
27 ///
28 /// **Note:** named sockets are persisted even after the object is dropped
29 /// and the program has exited, and cannot be reconnected. It is advised
30 /// that you either check for and unlink the existing socket if it exists,
31 /// or use a temporary file that is guaranteed to not already exist.
32 ///
33 /// [`Arc`]: std::sync::Arc
34 ///
35 /// # Examples
36 /// Using named sockets, associated with a filesystem path:
37 /// ```
38 /// # use std::error::Error;
39 /// # #[tokio::main]
40 /// # async fn main() -> Result<(), Box<dyn Error>> {
41 /// use tokio::net::UnixDatagram;
42 /// use tempfile::tempdir;
43 ///
44 /// // We use a temporary directory so that the socket
45 /// // files left by the bound sockets will get cleaned up.
46 /// let tmp = tempdir()?;
47 ///
48 /// // Bind each socket to a filesystem path
49 /// let tx_path = tmp.path().join("tx");
50 /// let tx = UnixDatagram::bind(&tx_path)?;
51 /// let rx_path = tmp.path().join("rx");
52 /// let rx = UnixDatagram::bind(&rx_path)?;
53 ///
54 /// let bytes = b"hello world";
55 /// tx.send_to(bytes, &rx_path).await?;
56 ///
57 /// let mut buf = vec![0u8; 24];
58 /// let (size, addr) = rx.recv_from(&mut buf).await?;
59 ///
60 /// let dgram = &buf[..size];
61 /// assert_eq!(dgram, bytes);
62 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
63 ///
64 /// # Ok(())
65 /// # }
66 /// ```
67 ///
68 /// Using unnamed sockets, created as a pair
69 /// ```
70 /// # use std::error::Error;
71 /// # #[tokio::main]
72 /// # async fn main() -> Result<(), Box<dyn Error>> {
73 /// use tokio::net::UnixDatagram;
74 ///
75 /// // Create the pair of sockets
76 /// let (sock1, sock2) = UnixDatagram::pair()?;
77 ///
78 /// // Since the sockets are paired, the paired send/recv
79 /// // functions can be used
80 /// let bytes = b"hello world";
81 /// sock1.send(bytes).await?;
82 ///
83 /// let mut buff = vec![0u8; 24];
84 /// let size = sock2.recv(&mut buff).await?;
85 ///
86 /// let dgram = &buff[..size];
87 /// assert_eq!(dgram, bytes);
88 ///
89 /// # Ok(())
90 /// # }
91 /// ```
92 #[cfg_attr(docsrs, doc(alias = "uds"))]
93 pub struct UnixDatagram {
94 io: PollEvented<mio::net::UnixDatagram>,
95 }
96}
97
98impl UnixDatagram {
99 pub(crate) fn from_mio(sys: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
100 let datagram = UnixDatagram::new(sys)?;
101
102 if let Some(e) = datagram.io.take_error()? {
103 return Err(e);
104 }
105
106 Ok(datagram)
107 }
108
109 /// Waits for any of the requested ready states.
110 ///
111 /// This function is usually paired with `try_recv()` or `try_send()`. It
112 /// can be used to concurrently `recv` / `send` to the same socket on a single
113 /// task without splitting the socket.
114 ///
115 /// The function may complete without the socket being ready. This is a
116 /// false-positive and attempting an operation will return with
117 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
118 /// [`Ready`] set, so you should always check the returned value and possibly
119 /// wait again if the requested states are not set.
120 ///
121 /// # Cancel safety
122 ///
123 /// This method is cancel safe. Once a readiness event occurs, the method
124 /// will continue to return immediately until the readiness event is
125 /// consumed by an attempt to read or write that fails with `WouldBlock` or
126 /// `Poll::Pending`.
127 ///
128 /// # Examples
129 ///
130 /// Concurrently receive from and send to the socket on the same task
131 /// without splitting.
132 ///
133 /// ```no_run
134 /// use tokio::io::Interest;
135 /// use tokio::net::UnixDatagram;
136 /// use std::io;
137 ///
138 /// #[tokio::main]
139 /// async fn main() -> io::Result<()> {
140 /// let dir = tempfile::tempdir().unwrap();
141 /// let client_path = dir.path().join("client.sock");
142 /// let server_path = dir.path().join("server.sock");
143 /// let socket = UnixDatagram::bind(&client_path)?;
144 /// socket.connect(&server_path)?;
145 ///
146 /// loop {
147 /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
148 ///
149 /// if ready.is_readable() {
150 /// let mut data = [0; 1024];
151 /// match socket.try_recv(&mut data[..]) {
152 /// Ok(n) => {
153 /// println!("received {:?}", &data[..n]);
154 /// }
155 /// // False-positive, continue
156 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
157 /// Err(e) => {
158 /// return Err(e);
159 /// }
160 /// }
161 /// }
162 ///
163 /// if ready.is_writable() {
164 /// // Write some data
165 /// match socket.try_send(b"hello world") {
166 /// Ok(n) => {
167 /// println!("sent {} bytes", n);
168 /// }
169 /// // False-positive, continue
170 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
171 /// Err(e) => {
172 /// return Err(e);
173 /// }
174 /// }
175 /// }
176 /// }
177 /// }
178 /// ```
179 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
180 let event = self.io.registration().readiness(interest).await?;
181 Ok(event.ready)
182 }
183
184 /// Waits for the socket to become writable.
185 ///
186 /// This function is equivalent to `ready(Interest::WRITABLE)` and is
187 /// usually paired with `try_send()` or `try_send_to()`.
188 ///
189 /// The function may complete without the socket being writable. This is a
190 /// false-positive and attempting a `try_send()` will return with
191 /// `io::ErrorKind::WouldBlock`.
192 ///
193 /// # Cancel safety
194 ///
195 /// This method is cancel safe. Once a readiness event occurs, the method
196 /// will continue to return immediately until the readiness event is
197 /// consumed by an attempt to write that fails with `WouldBlock` or
198 /// `Poll::Pending`.
199 ///
200 /// # Examples
201 ///
202 /// ```no_run
203 /// use tokio::net::UnixDatagram;
204 /// use std::io;
205 ///
206 /// #[tokio::main]
207 /// async fn main() -> io::Result<()> {
208 /// let dir = tempfile::tempdir().unwrap();
209 /// let client_path = dir.path().join("client.sock");
210 /// let server_path = dir.path().join("server.sock");
211 /// let socket = UnixDatagram::bind(&client_path)?;
212 /// socket.connect(&server_path)?;
213 ///
214 /// loop {
215 /// // Wait for the socket to be writable
216 /// socket.writable().await?;
217 ///
218 /// // Try to send data, this may still fail with `WouldBlock`
219 /// // if the readiness event is a false positive.
220 /// match socket.try_send(b"hello world") {
221 /// Ok(n) => {
222 /// break;
223 /// }
224 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
225 /// continue;
226 /// }
227 /// Err(e) => {
228 /// return Err(e);
229 /// }
230 /// }
231 /// }
232 ///
233 /// Ok(())
234 /// }
235 /// ```
236 pub async fn writable(&self) -> io::Result<()> {
237 self.ready(Interest::WRITABLE).await?;
238 Ok(())
239 }
240
241 /// Polls for write/send readiness.
242 ///
243 /// If the socket is not currently ready for sending, this method will
244 /// store a clone of the `Waker` from the provided `Context`. When the socket
245 /// becomes ready for sending, `Waker::wake` will be called on the
246 /// waker.
247 ///
248 /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
249 /// the `Waker` from the `Context` passed to the most recent call is
250 /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
251 /// second, independent waker.)
252 ///
253 /// This function is intended for cases where creating and pinning a future
254 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
255 /// preferred, as this supports polling from multiple tasks at once.
256 ///
257 /// # Return value
258 ///
259 /// The function returns:
260 ///
261 /// * `Poll::Pending` if the socket is not ready for writing.
262 /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
263 /// * `Poll::Ready(Err(e))` if an error is encountered.
264 ///
265 /// # Errors
266 ///
267 /// This function may encounter any standard I/O error except `WouldBlock`.
268 ///
269 /// [`writable`]: method@Self::writable
270 pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
271 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
272 }
273
274 /// Waits for the socket to become readable.
275 ///
276 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
277 /// paired with `try_recv()`.
278 ///
279 /// The function may complete without the socket being readable. This is a
280 /// false-positive and attempting a `try_recv()` will return with
281 /// `io::ErrorKind::WouldBlock`.
282 ///
283 /// # Cancel safety
284 ///
285 /// This method is cancel safe. Once a readiness event occurs, the method
286 /// will continue to return immediately until the readiness event is
287 /// consumed by an attempt to read that fails with `WouldBlock` or
288 /// `Poll::Pending`.
289 ///
290 /// # Examples
291 ///
292 /// ```no_run
293 /// use tokio::net::UnixDatagram;
294 /// use std::io;
295 ///
296 /// #[tokio::main]
297 /// async fn main() -> io::Result<()> {
298 /// // Connect to a peer
299 /// let dir = tempfile::tempdir().unwrap();
300 /// let client_path = dir.path().join("client.sock");
301 /// let server_path = dir.path().join("server.sock");
302 /// let socket = UnixDatagram::bind(&client_path)?;
303 /// socket.connect(&server_path)?;
304 ///
305 /// loop {
306 /// // Wait for the socket to be readable
307 /// socket.readable().await?;
308 ///
309 /// // The buffer is **not** included in the async task and will
310 /// // only exist on the stack.
311 /// let mut buf = [0; 1024];
312 ///
313 /// // Try to recv data, this may still fail with `WouldBlock`
314 /// // if the readiness event is a false positive.
315 /// match socket.try_recv(&mut buf) {
316 /// Ok(n) => {
317 /// println!("GOT {:?}", &buf[..n]);
318 /// break;
319 /// }
320 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
321 /// continue;
322 /// }
323 /// Err(e) => {
324 /// return Err(e);
325 /// }
326 /// }
327 /// }
328 ///
329 /// Ok(())
330 /// }
331 /// ```
332 pub async fn readable(&self) -> io::Result<()> {
333 self.ready(Interest::READABLE).await?;
334 Ok(())
335 }
336
337 /// Polls for read/receive readiness.
338 ///
339 /// If the socket is not currently ready for receiving, this method will
340 /// store a clone of the `Waker` from the provided `Context`. When the
341 /// socket becomes ready for reading, `Waker::wake` will be called on the
342 /// waker.
343 ///
344 /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
345 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
346 /// recent call is scheduled to receive a wakeup. (However,
347 /// `poll_send_ready` retains a second, independent waker.)
348 ///
349 /// This function is intended for cases where creating and pinning a future
350 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
351 /// preferred, as this supports polling from multiple tasks at once.
352 ///
353 /// # Return value
354 ///
355 /// The function returns:
356 ///
357 /// * `Poll::Pending` if the socket is not ready for reading.
358 /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
359 /// * `Poll::Ready(Err(e))` if an error is encountered.
360 ///
361 /// # Errors
362 ///
363 /// This function may encounter any standard I/O error except `WouldBlock`.
364 ///
365 /// [`readable`]: method@Self::readable
366 pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
367 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
368 }
369
370 /// Creates a new `UnixDatagram` bound to the specified path.
371 ///
372 /// # Examples
373 /// ```
374 /// # use std::error::Error;
375 /// # #[tokio::main]
376 /// # async fn main() -> Result<(), Box<dyn Error>> {
377 /// use tokio::net::UnixDatagram;
378 /// use tempfile::tempdir;
379 ///
380 /// // We use a temporary directory so that the socket
381 /// // files left by the bound sockets will get cleaned up.
382 /// let tmp = tempdir()?;
383 ///
384 /// // Bind the socket to a filesystem path
385 /// let socket_path = tmp.path().join("socket");
386 /// let socket = UnixDatagram::bind(&socket_path)?;
387 ///
388 /// # Ok(())
389 /// # }
390 /// ```
391 pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
392 where
393 P: AsRef<Path>,
394 {
395 let socket = mio::net::UnixDatagram::bind(path)?;
396 UnixDatagram::new(socket)
397 }
398
399 /// Creates an unnamed pair of connected sockets.
400 ///
401 /// This function will create a pair of interconnected Unix sockets for
402 /// communicating back and forth between one another.
403 ///
404 /// # Examples
405 /// ```
406 /// # use std::error::Error;
407 /// # #[tokio::main]
408 /// # async fn main() -> Result<(), Box<dyn Error>> {
409 /// use tokio::net::UnixDatagram;
410 ///
411 /// // Create the pair of sockets
412 /// let (sock1, sock2) = UnixDatagram::pair()?;
413 ///
414 /// // Since the sockets are paired, the paired send/recv
415 /// // functions can be used
416 /// let bytes = b"hail eris";
417 /// sock1.send(bytes).await?;
418 ///
419 /// let mut buff = vec![0u8; 24];
420 /// let size = sock2.recv(&mut buff).await?;
421 ///
422 /// let dgram = &buff[..size];
423 /// assert_eq!(dgram, bytes);
424 ///
425 /// # Ok(())
426 /// # }
427 /// ```
428 pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
429 let (a, b) = mio::net::UnixDatagram::pair()?;
430 let a = UnixDatagram::new(a)?;
431 let b = UnixDatagram::new(b)?;
432
433 Ok((a, b))
434 }
435
436 /// Creates new [`UnixDatagram`] from a [`std::os::unix::net::UnixDatagram`].
437 ///
438 /// This function is intended to be used to wrap a `UnixDatagram` from the
439 /// standard library in the Tokio equivalent.
440 ///
441 /// # Notes
442 ///
443 /// The caller is responsible for ensuring that the socket is in
444 /// non-blocking mode. Otherwise all I/O operations on the socket
445 /// will block the thread, which will cause unexpected behavior.
446 /// Non-blocking mode can be set using [`set_nonblocking`].
447 ///
448 /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
449 ///
450 /// # Panics
451 ///
452 /// This function panics if it is not called from within a runtime with
453 /// IO enabled.
454 ///
455 /// The runtime is usually set implicitly when this function is called
456 /// from a future driven by a Tokio runtime, otherwise runtime can be set
457 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
458 /// # Examples
459 /// ```
460 /// # use std::error::Error;
461 /// # #[tokio::main]
462 /// # async fn main() -> Result<(), Box<dyn Error>> {
463 /// use tokio::net::UnixDatagram;
464 /// use std::os::unix::net::UnixDatagram as StdUDS;
465 /// use tempfile::tempdir;
466 ///
467 /// // We use a temporary directory so that the socket
468 /// // files left by the bound sockets will get cleaned up.
469 /// let tmp = tempdir()?;
470 ///
471 /// // Bind the socket to a filesystem path
472 /// let socket_path = tmp.path().join("socket");
473 /// let std_socket = StdUDS::bind(&socket_path)?;
474 /// std_socket.set_nonblocking(true)?;
475 /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
476 ///
477 /// # Ok(())
478 /// # }
479 /// ```
480 #[track_caller]
481 pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
482 let socket = mio::net::UnixDatagram::from_std(datagram);
483 let io = PollEvented::new(socket)?;
484 Ok(UnixDatagram { io })
485 }
486
487 /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
488 ///
489 /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
490 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
491 /// if needed.
492 ///
493 /// # Examples
494 ///
495 /// ```rust,no_run
496 /// # use std::error::Error;
497 /// # async fn dox() -> Result<(), Box<dyn Error>> {
498 /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
499 /// let std_socket = tokio_socket.into_std()?;
500 /// std_socket.set_nonblocking(false)?;
501 /// # Ok(())
502 /// # }
503 /// ```
504 ///
505 /// [`tokio::net::UnixDatagram`]: UnixDatagram
506 /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
507 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
508 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
509 self.io
510 .into_inner()
511 .map(IntoRawFd::into_raw_fd)
512 .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
513 }
514
515 fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
516 let io = PollEvented::new(socket)?;
517 Ok(UnixDatagram { io })
518 }
519
520 /// Creates a new `UnixDatagram` which is not bound to any address.
521 ///
522 /// # Examples
523 /// ```
524 /// # use std::error::Error;
525 /// # #[tokio::main]
526 /// # async fn main() -> Result<(), Box<dyn Error>> {
527 /// use tokio::net::UnixDatagram;
528 /// use tempfile::tempdir;
529 ///
530 /// // Create an unbound socket
531 /// let tx = UnixDatagram::unbound()?;
532 ///
533 /// // Create another, bound socket
534 /// let tmp = tempdir()?;
535 /// let rx_path = tmp.path().join("rx");
536 /// let rx = UnixDatagram::bind(&rx_path)?;
537 ///
538 /// // Send to the bound socket
539 /// let bytes = b"hello world";
540 /// tx.send_to(bytes, &rx_path).await?;
541 ///
542 /// let mut buf = vec![0u8; 24];
543 /// let (size, addr) = rx.recv_from(&mut buf).await?;
544 ///
545 /// let dgram = &buf[..size];
546 /// assert_eq!(dgram, bytes);
547 ///
548 /// # Ok(())
549 /// # }
550 /// ```
551 pub fn unbound() -> io::Result<UnixDatagram> {
552 let socket = mio::net::UnixDatagram::unbound()?;
553 UnixDatagram::new(socket)
554 }
555
556 /// Connects the socket to the specified address.
557 ///
558 /// The `send` method may be used to send data to the specified address.
559 /// `recv` and `recv_from` will only receive data from that address.
560 ///
561 /// # Examples
562 /// ```
563 /// # use std::error::Error;
564 /// # #[tokio::main]
565 /// # async fn main() -> Result<(), Box<dyn Error>> {
566 /// use tokio::net::UnixDatagram;
567 /// use tempfile::tempdir;
568 ///
569 /// // Create an unbound socket
570 /// let tx = UnixDatagram::unbound()?;
571 ///
572 /// // Create another, bound socket
573 /// let tmp = tempdir()?;
574 /// let rx_path = tmp.path().join("rx");
575 /// let rx = UnixDatagram::bind(&rx_path)?;
576 ///
577 /// // Connect to the bound socket
578 /// tx.connect(&rx_path)?;
579 ///
580 /// // Send to the bound socket
581 /// let bytes = b"hello world";
582 /// tx.send(bytes).await?;
583 ///
584 /// let mut buf = vec![0u8; 24];
585 /// let (size, addr) = rx.recv_from(&mut buf).await?;
586 ///
587 /// let dgram = &buf[..size];
588 /// assert_eq!(dgram, bytes);
589 ///
590 /// # Ok(())
591 /// # }
592 /// ```
593 pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
594 self.io.connect(path)
595 }
596
597 /// Sends data on the socket to the socket's peer.
598 ///
599 /// # Cancel safety
600 ///
601 /// This method is cancel safe. If `send` is used as the event in a
602 /// [`tokio::select!`](crate::select) statement and some other branch
603 /// completes first, then it is guaranteed that the message was not sent.
604 ///
605 /// # Examples
606 /// ```
607 /// # use std::error::Error;
608 /// # #[tokio::main]
609 /// # async fn main() -> Result<(), Box<dyn Error>> {
610 /// use tokio::net::UnixDatagram;
611 ///
612 /// // Create the pair of sockets
613 /// let (sock1, sock2) = UnixDatagram::pair()?;
614 ///
615 /// // Since the sockets are paired, the paired send/recv
616 /// // functions can be used
617 /// let bytes = b"hello world";
618 /// sock1.send(bytes).await?;
619 ///
620 /// let mut buff = vec![0u8; 24];
621 /// let size = sock2.recv(&mut buff).await?;
622 ///
623 /// let dgram = &buff[..size];
624 /// assert_eq!(dgram, bytes);
625 ///
626 /// # Ok(())
627 /// # }
628 /// ```
629 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
630 self.io
631 .registration()
632 .async_io(Interest::WRITABLE, || self.io.send(buf))
633 .await
634 }
635
636 /// Tries to send a datagram to the peer without waiting.
637 ///
638 /// # Examples
639 ///
640 /// ```no_run
641 /// use tokio::net::UnixDatagram;
642 /// use std::io;
643 ///
644 /// #[tokio::main]
645 /// async fn main() -> io::Result<()> {
646 /// let dir = tempfile::tempdir().unwrap();
647 /// let client_path = dir.path().join("client.sock");
648 /// let server_path = dir.path().join("server.sock");
649 /// let socket = UnixDatagram::bind(&client_path)?;
650 /// socket.connect(&server_path)?;
651 ///
652 /// loop {
653 /// // Wait for the socket to be writable
654 /// socket.writable().await?;
655 ///
656 /// // Try to send data, this may still fail with `WouldBlock`
657 /// // if the readiness event is a false positive.
658 /// match socket.try_send(b"hello world") {
659 /// Ok(n) => {
660 /// break;
661 /// }
662 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
663 /// continue;
664 /// }
665 /// Err(e) => {
666 /// return Err(e);
667 /// }
668 /// }
669 /// }
670 ///
671 /// Ok(())
672 /// }
673 /// ```
674 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
675 self.io
676 .registration()
677 .try_io(Interest::WRITABLE, || self.io.send(buf))
678 }
679
680 /// Tries to send a datagram to the peer without waiting.
681 ///
682 /// # Examples
683 ///
684 /// ```no_run
685 /// use tokio::net::UnixDatagram;
686 /// use std::io;
687 ///
688 /// #[tokio::main]
689 /// async fn main() -> io::Result<()> {
690 /// let dir = tempfile::tempdir().unwrap();
691 /// let client_path = dir.path().join("client.sock");
692 /// let server_path = dir.path().join("server.sock");
693 /// let socket = UnixDatagram::bind(&client_path)?;
694 ///
695 /// loop {
696 /// // Wait for the socket to be writable
697 /// socket.writable().await?;
698 ///
699 /// // Try to send data, this may still fail with `WouldBlock`
700 /// // if the readiness event is a false positive.
701 /// match socket.try_send_to(b"hello world", &server_path) {
702 /// Ok(n) => {
703 /// break;
704 /// }
705 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
706 /// continue;
707 /// }
708 /// Err(e) => {
709 /// return Err(e);
710 /// }
711 /// }
712 /// }
713 ///
714 /// Ok(())
715 /// }
716 /// ```
717 pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
718 where
719 P: AsRef<Path>,
720 {
721 self.io
722 .registration()
723 .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
724 }
725
726 /// Receives data from the socket.
727 ///
728 /// # Cancel safety
729 ///
730 /// This method is cancel safe. If `recv` is used as the event in a
731 /// [`tokio::select!`](crate::select) statement and some other branch
732 /// completes first, it is guaranteed that no messages were received on this
733 /// socket.
734 ///
735 /// # Examples
736 /// ```
737 /// # use std::error::Error;
738 /// # #[tokio::main]
739 /// # async fn main() -> Result<(), Box<dyn Error>> {
740 /// use tokio::net::UnixDatagram;
741 ///
742 /// // Create the pair of sockets
743 /// let (sock1, sock2) = UnixDatagram::pair()?;
744 ///
745 /// // Since the sockets are paired, the paired send/recv
746 /// // functions can be used
747 /// let bytes = b"hello world";
748 /// sock1.send(bytes).await?;
749 ///
750 /// let mut buff = vec![0u8; 24];
751 /// let size = sock2.recv(&mut buff).await?;
752 ///
753 /// let dgram = &buff[..size];
754 /// assert_eq!(dgram, bytes);
755 ///
756 /// # Ok(())
757 /// # }
758 /// ```
759 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
760 self.io
761 .registration()
762 .async_io(Interest::READABLE, || self.io.recv(buf))
763 .await
764 }
765
766 /// Tries to receive a datagram from the peer without waiting.
767 ///
768 /// # Examples
769 ///
770 /// ```no_run
771 /// use tokio::net::UnixDatagram;
772 /// use std::io;
773 ///
774 /// #[tokio::main]
775 /// async fn main() -> io::Result<()> {
776 /// // Connect to a peer
777 /// let dir = tempfile::tempdir().unwrap();
778 /// let client_path = dir.path().join("client.sock");
779 /// let server_path = dir.path().join("server.sock");
780 /// let socket = UnixDatagram::bind(&client_path)?;
781 /// socket.connect(&server_path)?;
782 ///
783 /// loop {
784 /// // Wait for the socket to be readable
785 /// socket.readable().await?;
786 ///
787 /// // The buffer is **not** included in the async task and will
788 /// // only exist on the stack.
789 /// let mut buf = [0; 1024];
790 ///
791 /// // Try to recv data, this may still fail with `WouldBlock`
792 /// // if the readiness event is a false positive.
793 /// match socket.try_recv(&mut buf) {
794 /// Ok(n) => {
795 /// println!("GOT {:?}", &buf[..n]);
796 /// break;
797 /// }
798 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
799 /// continue;
800 /// }
801 /// Err(e) => {
802 /// return Err(e);
803 /// }
804 /// }
805 /// }
806 ///
807 /// Ok(())
808 /// }
809 /// ```
810 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
811 self.io
812 .registration()
813 .try_io(Interest::READABLE, || self.io.recv(buf))
814 }
815
816 cfg_io_util! {
817 /// Tries to receive data from the socket without waiting.
818 ///
819 /// This method can be used even if `buf` is uninitialized.
820 ///
821 /// # Examples
822 ///
823 /// ```no_run
824 /// use tokio::net::UnixDatagram;
825 /// use std::io;
826 ///
827 /// #[tokio::main]
828 /// async fn main() -> io::Result<()> {
829 /// // Connect to a peer
830 /// let dir = tempfile::tempdir().unwrap();
831 /// let client_path = dir.path().join("client.sock");
832 /// let server_path = dir.path().join("server.sock");
833 /// let socket = UnixDatagram::bind(&client_path)?;
834 ///
835 /// loop {
836 /// // Wait for the socket to be readable
837 /// socket.readable().await?;
838 ///
839 /// let mut buf = Vec::with_capacity(1024);
840 ///
841 /// // Try to recv data, this may still fail with `WouldBlock`
842 /// // if the readiness event is a false positive.
843 /// match socket.try_recv_buf_from(&mut buf) {
844 /// Ok((n, _addr)) => {
845 /// println!("GOT {:?}", &buf[..n]);
846 /// break;
847 /// }
848 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
849 /// continue;
850 /// }
851 /// Err(e) => {
852 /// return Err(e);
853 /// }
854 /// }
855 /// }
856 ///
857 /// Ok(())
858 /// }
859 /// ```
860 pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
861 let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
862 let dst = buf.chunk_mut();
863 let dst =
864 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
865
866 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
867 // buffer.
868 let (n, addr) = (*self.io).recv_from(dst)?;
869
870 unsafe {
871 buf.advance_mut(n);
872 }
873
874 Ok((n, addr))
875 })?;
876
877 Ok((n, SocketAddr(addr)))
878 }
879
880 /// Receives from the socket, advances the
881 /// buffer's internal cursor and returns how many bytes were read and the origin.
882 ///
883 /// This method can be used even if `buf` is uninitialized.
884 ///
885 /// # Examples
886 /// ```
887 /// # use std::error::Error;
888 /// # #[tokio::main]
889 /// # async fn main() -> Result<(), Box<dyn Error>> {
890 /// use tokio::net::UnixDatagram;
891 /// use tempfile::tempdir;
892 ///
893 /// // We use a temporary directory so that the socket
894 /// // files left by the bound sockets will get cleaned up.
895 /// let tmp = tempdir()?;
896 ///
897 /// // Bind each socket to a filesystem path
898 /// let tx_path = tmp.path().join("tx");
899 /// let tx = UnixDatagram::bind(&tx_path)?;
900 /// let rx_path = tmp.path().join("rx");
901 /// let rx = UnixDatagram::bind(&rx_path)?;
902 ///
903 /// let bytes = b"hello world";
904 /// tx.send_to(bytes, &rx_path).await?;
905 ///
906 /// let mut buf = Vec::with_capacity(24);
907 /// let (size, addr) = rx.recv_buf_from(&mut buf).await?;
908 ///
909 /// let dgram = &buf[..size];
910 /// assert_eq!(dgram, bytes);
911 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
912 ///
913 /// # Ok(())
914 /// # }
915 /// ```
916 pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
917 self.io.registration().async_io(Interest::READABLE, || {
918 let dst = buf.chunk_mut();
919 let dst =
920 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
921
922 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
923 // buffer.
924 let (n, addr) = (*self.io).recv_from(dst)?;
925
926 unsafe {
927 buf.advance_mut(n);
928 }
929 Ok((n,SocketAddr(addr)))
930 }).await
931 }
932
933 /// Tries to read data from the stream into the provided buffer, advancing the
934 /// buffer's internal cursor, returning how many bytes were read.
935 ///
936 /// This method can be used even if `buf` is uninitialized.
937 ///
938 /// # Examples
939 ///
940 /// ```no_run
941 /// use tokio::net::UnixDatagram;
942 /// use std::io;
943 ///
944 /// #[tokio::main]
945 /// async fn main() -> io::Result<()> {
946 /// // Connect to a peer
947 /// let dir = tempfile::tempdir().unwrap();
948 /// let client_path = dir.path().join("client.sock");
949 /// let server_path = dir.path().join("server.sock");
950 /// let socket = UnixDatagram::bind(&client_path)?;
951 /// socket.connect(&server_path)?;
952 ///
953 /// loop {
954 /// // Wait for the socket to be readable
955 /// socket.readable().await?;
956 ///
957 /// let mut buf = Vec::with_capacity(1024);
958 ///
959 /// // Try to recv data, this may still fail with `WouldBlock`
960 /// // if the readiness event is a false positive.
961 /// match socket.try_recv_buf(&mut buf) {
962 /// Ok(n) => {
963 /// println!("GOT {:?}", &buf[..n]);
964 /// break;
965 /// }
966 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
967 /// continue;
968 /// }
969 /// Err(e) => {
970 /// return Err(e);
971 /// }
972 /// }
973 /// }
974 ///
975 /// Ok(())
976 /// }
977 /// ```
978 pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
979 self.io.registration().try_io(Interest::READABLE, || {
980 let dst = buf.chunk_mut();
981 let dst =
982 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
983
984 // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
985 // buffer.
986 let n = (*self.io).recv(dst)?;
987
988 unsafe {
989 buf.advance_mut(n);
990 }
991
992 Ok(n)
993 })
994 }
995
996 /// Receives data from the socket from the address to which it is connected,
997 /// advancing the buffer's internal cursor, returning how many bytes were read.
998 ///
999 /// This method can be used even if `buf` is uninitialized.
1000 ///
1001 /// # Examples
1002 /// ```
1003 /// # use std::error::Error;
1004 /// # #[tokio::main]
1005 /// # async fn main() -> Result<(), Box<dyn Error>> {
1006 /// use tokio::net::UnixDatagram;
1007 ///
1008 /// // Create the pair of sockets
1009 /// let (sock1, sock2) = UnixDatagram::pair()?;
1010 ///
1011 /// // Since the sockets are paired, the paired send/recv
1012 /// // functions can be used
1013 /// let bytes = b"hello world";
1014 /// sock1.send(bytes).await?;
1015 ///
1016 /// let mut buff = Vec::with_capacity(24);
1017 /// let size = sock2.recv_buf(&mut buff).await?;
1018 ///
1019 /// let dgram = &buff[..size];
1020 /// assert_eq!(dgram, bytes);
1021 ///
1022 /// # Ok(())
1023 /// # }
1024 /// ```
1025 pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1026 self.io.registration().async_io(Interest::READABLE, || {
1027 let dst = buf.chunk_mut();
1028 let dst =
1029 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1030
1031 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
1032 // buffer.
1033 let n = (*self.io).recv(dst)?;
1034
1035 unsafe {
1036 buf.advance_mut(n);
1037 }
1038 Ok(n)
1039 }).await
1040 }
1041 }
1042
1043 /// Sends data on the socket to the specified address.
1044 ///
1045 /// # Cancel safety
1046 ///
1047 /// This method is cancel safe. If `send_to` is used as the event in a
1048 /// [`tokio::select!`](crate::select) statement and some other branch
1049 /// completes first, then it is guaranteed that the message was not sent.
1050 ///
1051 /// # Examples
1052 /// ```
1053 /// # use std::error::Error;
1054 /// # #[tokio::main]
1055 /// # async fn main() -> Result<(), Box<dyn Error>> {
1056 /// use tokio::net::UnixDatagram;
1057 /// use tempfile::tempdir;
1058 ///
1059 /// // We use a temporary directory so that the socket
1060 /// // files left by the bound sockets will get cleaned up.
1061 /// let tmp = tempdir()?;
1062 ///
1063 /// // Bind each socket to a filesystem path
1064 /// let tx_path = tmp.path().join("tx");
1065 /// let tx = UnixDatagram::bind(&tx_path)?;
1066 /// let rx_path = tmp.path().join("rx");
1067 /// let rx = UnixDatagram::bind(&rx_path)?;
1068 ///
1069 /// let bytes = b"hello world";
1070 /// tx.send_to(bytes, &rx_path).await?;
1071 ///
1072 /// let mut buf = vec![0u8; 24];
1073 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1074 ///
1075 /// let dgram = &buf[..size];
1076 /// assert_eq!(dgram, bytes);
1077 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1078 ///
1079 /// # Ok(())
1080 /// # }
1081 /// ```
1082 pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
1083 where
1084 P: AsRef<Path>,
1085 {
1086 self.io
1087 .registration()
1088 .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
1089 .await
1090 }
1091
1092 /// Receives data from the socket.
1093 ///
1094 /// # Cancel safety
1095 ///
1096 /// This method is cancel safe. If `recv_from` is used as the event in a
1097 /// [`tokio::select!`](crate::select) statement and some other branch
1098 /// completes first, it is guaranteed that no messages were received on this
1099 /// socket.
1100 ///
1101 /// # Examples
1102 /// ```
1103 /// # use std::error::Error;
1104 /// # #[tokio::main]
1105 /// # async fn main() -> Result<(), Box<dyn Error>> {
1106 /// use tokio::net::UnixDatagram;
1107 /// use tempfile::tempdir;
1108 ///
1109 /// // We use a temporary directory so that the socket
1110 /// // files left by the bound sockets will get cleaned up.
1111 /// let tmp = tempdir()?;
1112 ///
1113 /// // Bind each socket to a filesystem path
1114 /// let tx_path = tmp.path().join("tx");
1115 /// let tx = UnixDatagram::bind(&tx_path)?;
1116 /// let rx_path = tmp.path().join("rx");
1117 /// let rx = UnixDatagram::bind(&rx_path)?;
1118 ///
1119 /// let bytes = b"hello world";
1120 /// tx.send_to(bytes, &rx_path).await?;
1121 ///
1122 /// let mut buf = vec![0u8; 24];
1123 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1124 ///
1125 /// let dgram = &buf[..size];
1126 /// assert_eq!(dgram, bytes);
1127 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1128 ///
1129 /// # Ok(())
1130 /// # }
1131 /// ```
1132 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1133 let (n, addr) = self
1134 .io
1135 .registration()
1136 .async_io(Interest::READABLE, || self.io.recv_from(buf))
1137 .await?;
1138
1139 Ok((n, SocketAddr(addr)))
1140 }
1141
1142 /// Attempts to receive a single datagram on the specified address.
1143 ///
1144 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1145 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1146 /// receive a wakeup.
1147 ///
1148 /// # Return value
1149 ///
1150 /// The function returns:
1151 ///
1152 /// * `Poll::Pending` if the socket is not ready to read
1153 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1154 /// * `Poll::Ready(Err(e))` if an error is encountered.
1155 ///
1156 /// # Errors
1157 ///
1158 /// This function may encounter any standard I/O error except `WouldBlock`.
1159 pub fn poll_recv_from(
1160 &self,
1161 cx: &mut Context<'_>,
1162 buf: &mut ReadBuf<'_>,
1163 ) -> Poll<io::Result<SocketAddr>> {
1164 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1165 // Safety: will not read the maybe uninitialized bytes.
1166 let b = unsafe {
1167 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1168 };
1169
1170 self.io.recv_from(b)
1171 }))?;
1172
1173 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1174 unsafe {
1175 buf.assume_init(n);
1176 }
1177 buf.advance(n);
1178 Poll::Ready(Ok(SocketAddr(addr)))
1179 }
1180
1181 /// Attempts to send data to the specified address.
1182 ///
1183 /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1184 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1185 /// receive a wakeup.
1186 ///
1187 /// # Return value
1188 ///
1189 /// The function returns:
1190 ///
1191 /// * `Poll::Pending` if the socket is not ready to write
1192 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1193 /// * `Poll::Ready(Err(e))` if an error is encountered.
1194 ///
1195 /// # Errors
1196 ///
1197 /// This function may encounter any standard I/O error except `WouldBlock`.
1198 pub fn poll_send_to<P>(
1199 &self,
1200 cx: &mut Context<'_>,
1201 buf: &[u8],
1202 target: P,
1203 ) -> Poll<io::Result<usize>>
1204 where
1205 P: AsRef<Path>,
1206 {
1207 self.io
1208 .registration()
1209 .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
1210 }
1211
1212 /// Attempts to send data on the socket to the remote address to which it
1213 /// was previously `connect`ed.
1214 ///
1215 /// The [`connect`] method will connect this socket to a remote address.
1216 /// This method will fail if the socket is not connected.
1217 ///
1218 /// Note that on multiple calls to a `poll_*` method in the send direction,
1219 /// only the `Waker` from the `Context` passed to the most recent call will
1220 /// be scheduled to receive a wakeup.
1221 ///
1222 /// # Return value
1223 ///
1224 /// The function returns:
1225 ///
1226 /// * `Poll::Pending` if the socket is not available to write
1227 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
1228 /// * `Poll::Ready(Err(e))` if an error is encountered.
1229 ///
1230 /// # Errors
1231 ///
1232 /// This function may encounter any standard I/O error except `WouldBlock`.
1233 ///
1234 /// [`connect`]: method@Self::connect
1235 pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
1236 self.io
1237 .registration()
1238 .poll_write_io(cx, || self.io.send(buf))
1239 }
1240
1241 /// Attempts to receive a single datagram message on the socket from the remote
1242 /// address to which it is `connect`ed.
1243 ///
1244 /// The [`connect`] method will connect this socket to a remote address. This method
1245 /// resolves to an error if the socket is not connected.
1246 ///
1247 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1248 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1249 /// receive a wakeup.
1250 ///
1251 /// # Return value
1252 ///
1253 /// The function returns:
1254 ///
1255 /// * `Poll::Pending` if the socket is not ready to read
1256 /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
1257 /// * `Poll::Ready(Err(e))` if an error is encountered.
1258 ///
1259 /// # Errors
1260 ///
1261 /// This function may encounter any standard I/O error except `WouldBlock`.
1262 ///
1263 /// [`connect`]: method@Self::connect
1264 pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1265 let n = ready!(self.io.registration().poll_read_io(cx, || {
1266 // Safety: will not read the maybe uninitialized bytes.
1267 let b = unsafe {
1268 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1269 };
1270
1271 self.io.recv(b)
1272 }))?;
1273
1274 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1275 unsafe {
1276 buf.assume_init(n);
1277 }
1278 buf.advance(n);
1279 Poll::Ready(Ok(()))
1280 }
1281
1282 /// Tries to receive data from the socket without waiting.
1283 ///
1284 /// # Examples
1285 ///
1286 /// ```no_run
1287 /// use tokio::net::UnixDatagram;
1288 /// use std::io;
1289 ///
1290 /// #[tokio::main]
1291 /// async fn main() -> io::Result<()> {
1292 /// // Connect to a peer
1293 /// let dir = tempfile::tempdir().unwrap();
1294 /// let client_path = dir.path().join("client.sock");
1295 /// let server_path = dir.path().join("server.sock");
1296 /// let socket = UnixDatagram::bind(&client_path)?;
1297 ///
1298 /// loop {
1299 /// // Wait for the socket to be readable
1300 /// socket.readable().await?;
1301 ///
1302 /// // The buffer is **not** included in the async task and will
1303 /// // only exist on the stack.
1304 /// let mut buf = [0; 1024];
1305 ///
1306 /// // Try to recv data, this may still fail with `WouldBlock`
1307 /// // if the readiness event is a false positive.
1308 /// match socket.try_recv_from(&mut buf) {
1309 /// Ok((n, _addr)) => {
1310 /// println!("GOT {:?}", &buf[..n]);
1311 /// break;
1312 /// }
1313 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1314 /// continue;
1315 /// }
1316 /// Err(e) => {
1317 /// return Err(e);
1318 /// }
1319 /// }
1320 /// }
1321 ///
1322 /// Ok(())
1323 /// }
1324 /// ```
1325 pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1326 let (n, addr) = self
1327 .io
1328 .registration()
1329 .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
1330
1331 Ok((n, SocketAddr(addr)))
1332 }
1333
1334 /// Tries to read or write from the socket using a user-provided IO operation.
1335 ///
1336 /// If the socket is ready, the provided closure is called. The closure
1337 /// should attempt to perform IO operation on the socket by manually
1338 /// calling the appropriate syscall. If the operation fails because the
1339 /// socket is not actually ready, then the closure should return a
1340 /// `WouldBlock` error and the readiness flag is cleared. The return value
1341 /// of the closure is then returned by `try_io`.
1342 ///
1343 /// If the socket is not ready, then the closure is not called
1344 /// and a `WouldBlock` error is returned.
1345 ///
1346 /// The closure should only return a `WouldBlock` error if it has performed
1347 /// an IO operation on the socket that failed due to the socket not being
1348 /// ready. Returning a `WouldBlock` error in any other situation will
1349 /// incorrectly clear the readiness flag, which can cause the socket to
1350 /// behave incorrectly.
1351 ///
1352 /// The closure should not perform the IO operation using any of the methods
1353 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1354 /// readiness flag and can cause the socket to behave incorrectly.
1355 ///
1356 /// This method is not intended to be used with combined interests.
1357 /// The closure should perform only one type of IO operation, so it should not
1358 /// require more than one ready state. This method may panic or sleep forever
1359 /// if it is called with a combined interest.
1360 ///
1361 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1362 ///
1363 /// [`readable()`]: UnixDatagram::readable()
1364 /// [`writable()`]: UnixDatagram::writable()
1365 /// [`ready()`]: UnixDatagram::ready()
1366 pub fn try_io<R>(
1367 &self,
1368 interest: Interest,
1369 f: impl FnOnce() -> io::Result<R>,
1370 ) -> io::Result<R> {
1371 self.io
1372 .registration()
1373 .try_io(interest, || self.io.try_io(f))
1374 }
1375
1376 /// Reads or writes from the socket using a user-provided IO operation.
1377 ///
1378 /// The readiness of the socket is awaited and when the socket is ready,
1379 /// the provided closure is called. The closure should attempt to perform
1380 /// IO operation on the socket by manually calling the appropriate syscall.
1381 /// If the operation fails because the socket is not actually ready,
1382 /// then the closure should return a `WouldBlock` error. In such case the
1383 /// readiness flag is cleared and the socket readiness is awaited again.
1384 /// This loop is repeated until the closure returns an `Ok` or an error
1385 /// other than `WouldBlock`.
1386 ///
1387 /// The closure should only return a `WouldBlock` error if it has performed
1388 /// an IO operation on the socket that failed due to the socket not being
1389 /// ready. Returning a `WouldBlock` error in any other situation will
1390 /// incorrectly clear the readiness flag, which can cause the socket to
1391 /// behave incorrectly.
1392 ///
1393 /// The closure should not perform the IO operation using any of the methods
1394 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1395 /// readiness flag and can cause the socket to behave incorrectly.
1396 ///
1397 /// This method is not intended to be used with combined interests.
1398 /// The closure should perform only one type of IO operation, so it should not
1399 /// require more than one ready state. This method may panic or sleep forever
1400 /// if it is called with a combined interest.
1401 pub async fn async_io<R>(
1402 &self,
1403 interest: Interest,
1404 mut f: impl FnMut() -> io::Result<R>,
1405 ) -> io::Result<R> {
1406 self.io
1407 .registration()
1408 .async_io(interest, || self.io.try_io(&mut f))
1409 .await
1410 }
1411
1412 /// Returns the local address that this socket is bound to.
1413 ///
1414 /// # Examples
1415 /// For a socket bound to a local path
1416 /// ```
1417 /// # use std::error::Error;
1418 /// # #[tokio::main]
1419 /// # async fn main() -> Result<(), Box<dyn Error>> {
1420 /// use tokio::net::UnixDatagram;
1421 /// use tempfile::tempdir;
1422 ///
1423 /// // We use a temporary directory so that the socket
1424 /// // files left by the bound sockets will get cleaned up.
1425 /// let tmp = tempdir()?;
1426 ///
1427 /// // Bind socket to a filesystem path
1428 /// let socket_path = tmp.path().join("socket");
1429 /// let socket = UnixDatagram::bind(&socket_path)?;
1430 ///
1431 /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
1432 ///
1433 /// # Ok(())
1434 /// # }
1435 /// ```
1436 ///
1437 /// For an unbound socket
1438 /// ```
1439 /// # use std::error::Error;
1440 /// # #[tokio::main]
1441 /// # async fn main() -> Result<(), Box<dyn Error>> {
1442 /// use tokio::net::UnixDatagram;
1443 ///
1444 /// // Create an unbound socket
1445 /// let socket = UnixDatagram::unbound()?;
1446 ///
1447 /// assert!(socket.local_addr()?.is_unnamed());
1448 ///
1449 /// # Ok(())
1450 /// # }
1451 /// ```
1452 pub fn local_addr(&self) -> io::Result<SocketAddr> {
1453 self.io.local_addr().map(SocketAddr)
1454 }
1455
1456 /// Returns the address of this socket's peer.
1457 ///
1458 /// The `connect` method will connect the socket to a peer.
1459 ///
1460 /// # Examples
1461 /// For a peer with a local path
1462 /// ```
1463 /// # use std::error::Error;
1464 /// # #[tokio::main]
1465 /// # async fn main() -> Result<(), Box<dyn Error>> {
1466 /// use tokio::net::UnixDatagram;
1467 /// use tempfile::tempdir;
1468 ///
1469 /// // Create an unbound socket
1470 /// let tx = UnixDatagram::unbound()?;
1471 ///
1472 /// // Create another, bound socket
1473 /// let tmp = tempdir()?;
1474 /// let rx_path = tmp.path().join("rx");
1475 /// let rx = UnixDatagram::bind(&rx_path)?;
1476 ///
1477 /// // Connect to the bound socket
1478 /// tx.connect(&rx_path)?;
1479 ///
1480 /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
1481 ///
1482 /// # Ok(())
1483 /// # }
1484 /// ```
1485 ///
1486 /// For an unbound peer
1487 /// ```
1488 /// # use std::error::Error;
1489 /// # #[tokio::main]
1490 /// # async fn main() -> Result<(), Box<dyn Error>> {
1491 /// use tokio::net::UnixDatagram;
1492 ///
1493 /// // Create the pair of sockets
1494 /// let (sock1, sock2) = UnixDatagram::pair()?;
1495 ///
1496 /// assert!(sock1.peer_addr()?.is_unnamed());
1497 ///
1498 /// # Ok(())
1499 /// # }
1500 /// ```
1501 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1502 self.io.peer_addr().map(SocketAddr)
1503 }
1504
1505 /// Returns the value of the `SO_ERROR` option.
1506 ///
1507 /// # Examples
1508 /// ```
1509 /// # use std::error::Error;
1510 /// # #[tokio::main]
1511 /// # async fn main() -> Result<(), Box<dyn Error>> {
1512 /// use tokio::net::UnixDatagram;
1513 ///
1514 /// // Create an unbound socket
1515 /// let socket = UnixDatagram::unbound()?;
1516 ///
1517 /// if let Ok(Some(err)) = socket.take_error() {
1518 /// println!("Got error: {:?}", err);
1519 /// }
1520 ///
1521 /// # Ok(())
1522 /// # }
1523 /// ```
1524 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1525 self.io.take_error()
1526 }
1527
1528 /// Shuts down the read, write, or both halves of this connection.
1529 ///
1530 /// This function will cause all pending and future I/O calls on the
1531 /// specified portions to immediately return with an appropriate value
1532 /// (see the documentation of `Shutdown`).
1533 ///
1534 /// # Examples
1535 /// ```
1536 /// # use std::error::Error;
1537 /// # #[tokio::main]
1538 /// # async fn main() -> Result<(), Box<dyn Error>> {
1539 /// use tokio::net::UnixDatagram;
1540 /// use std::net::Shutdown;
1541 ///
1542 /// // Create an unbound socket
1543 /// let (socket, other) = UnixDatagram::pair()?;
1544 ///
1545 /// socket.shutdown(Shutdown::Both)?;
1546 ///
1547 /// // NOTE: the following commented out code does NOT work as expected.
1548 /// // Due to an underlying issue, the recv call will block indefinitely.
1549 /// // See: https://github.com/tokio-rs/tokio/issues/1679
1550 /// //let mut buff = vec![0u8; 24];
1551 /// //let size = socket.recv(&mut buff).await?;
1552 /// //assert_eq!(size, 0);
1553 ///
1554 /// let send_result = socket.send(b"hello world").await;
1555 /// assert!(send_result.is_err());
1556 ///
1557 /// # Ok(())
1558 /// # }
1559 /// ```
1560 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1561 self.io.shutdown(how)
1562 }
1563}
1564
1565impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
1566 type Error = io::Error;
1567
1568 /// Consumes stream, returning the Tokio I/O object.
1569 ///
1570 /// This is equivalent to
1571 /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
1572 fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
1573 Self::from_std(datagram:stream)
1574 }
1575}
1576
1577impl fmt::Debug for UnixDatagram {
1578 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1579 self.io.fmt(f)
1580 }
1581}
1582
1583impl AsRawFd for UnixDatagram {
1584 fn as_raw_fd(&self) -> RawFd {
1585 self.io.as_raw_fd()
1586 }
1587}
1588
1589impl AsFd for UnixDatagram {
1590 fn as_fd(&self) -> BorrowedFd<'_> {
1591 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1592 }
1593}
1594