1 | use crate::io::{Interest, PollEvented, ReadBuf, Ready}; |
2 | use crate::net::unix::SocketAddr; |
3 | use crate::util::check_socket_for_blocking; |
4 | |
5 | use std::fmt; |
6 | use std::io; |
7 | use std::net::Shutdown; |
8 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; |
9 | use std::os::unix::net; |
10 | use std::path::Path; |
11 | use std::task::{ready, Context, Poll}; |
12 | |
13 | cfg_io_util! { |
14 | use bytes::BufMut; |
15 | } |
16 | |
17 | cfg_net_unix! { |
18 | /// An I/O object representing a Unix datagram socket. |
19 | /// |
20 | /// A socket can be either named (associated with a filesystem path) or |
21 | /// unnamed. |
22 | /// |
23 | /// This type does not provide a `split` method, because this functionality |
24 | /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do |
25 | /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>` |
26 | /// is enough. This is because all of the methods take `&self` instead of |
27 | /// `&mut self`. |
28 | /// |
29 | /// **Note:** named sockets are persisted even after the object is dropped |
30 | /// and the program has exited, and cannot be reconnected. It is advised |
31 | /// that you either check for and unlink the existing socket if it exists, |
32 | /// or use a temporary file that is guaranteed to not already exist. |
33 | /// |
34 | /// [`Arc`]: std::sync::Arc |
35 | /// |
36 | /// # Examples |
37 | /// Using named sockets, associated with a filesystem path: |
38 | /// ``` |
39 | /// # if cfg!(miri) { return } // No `socket` in miri. |
40 | /// # use std::error::Error; |
41 | /// # #[tokio::main] |
42 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
43 | /// use tokio::net::UnixDatagram; |
44 | /// use tempfile::tempdir; |
45 | /// |
46 | /// // We use a temporary directory so that the socket |
47 | /// // files left by the bound sockets will get cleaned up. |
48 | /// let tmp = tempdir()?; |
49 | /// |
50 | /// // Bind each socket to a filesystem path |
51 | /// let tx_path = tmp.path().join("tx"); |
52 | /// let tx = UnixDatagram::bind(&tx_path)?; |
53 | /// let rx_path = tmp.path().join("rx"); |
54 | /// let rx = UnixDatagram::bind(&rx_path)?; |
55 | /// |
56 | /// let bytes = b"hello world"; |
57 | /// tx.send_to(bytes, &rx_path).await?; |
58 | /// |
59 | /// let mut buf = vec![0u8; 24]; |
60 | /// let (size, addr) = rx.recv_from(&mut buf).await?; |
61 | /// |
62 | /// let dgram = &buf[..size]; |
63 | /// assert_eq!(dgram, bytes); |
64 | /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); |
65 | /// |
66 | /// # Ok(()) |
67 | /// # } |
68 | /// ``` |
69 | /// |
70 | /// Using unnamed sockets, created as a pair |
71 | /// ``` |
72 | /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri. |
73 | /// # use std::error::Error; |
74 | /// # #[tokio::main] |
75 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
76 | /// use tokio::net::UnixDatagram; |
77 | /// |
78 | /// // Create the pair of sockets |
79 | /// let (sock1, sock2) = UnixDatagram::pair()?; |
80 | /// |
81 | /// // Since the sockets are paired, the paired send/recv |
82 | /// // functions can be used |
83 | /// let bytes = b"hello world"; |
84 | /// sock1.send(bytes).await?; |
85 | /// |
86 | /// let mut buff = vec![0u8; 24]; |
87 | /// let size = sock2.recv(&mut buff).await?; |
88 | /// |
89 | /// let dgram = &buff[..size]; |
90 | /// assert_eq!(dgram, bytes); |
91 | /// |
92 | /// # Ok(()) |
93 | /// # } |
94 | /// ``` |
95 | #[cfg_attr (docsrs, doc(alias = "uds" ))] |
96 | pub struct UnixDatagram { |
97 | io: PollEvented<mio::net::UnixDatagram>, |
98 | } |
99 | } |
100 | |
101 | impl UnixDatagram { |
102 | pub(crate) fn from_mio(sys: mio::net::UnixDatagram) -> io::Result<UnixDatagram> { |
103 | let datagram = UnixDatagram::new(sys)?; |
104 | |
105 | if let Some(e) = datagram.io.take_error()? { |
106 | return Err(e); |
107 | } |
108 | |
109 | Ok(datagram) |
110 | } |
111 | |
112 | /// Waits for any of the requested ready states. |
113 | /// |
114 | /// This function is usually paired with `try_recv()` or `try_send()`. It |
115 | /// can be used to concurrently `recv` / `send` to the same socket on a single |
116 | /// task without splitting the socket. |
117 | /// |
118 | /// The function may complete without the socket being ready. This is a |
119 | /// false-positive and attempting an operation will return with |
120 | /// `io::ErrorKind::WouldBlock`. The function can also return with an empty |
121 | /// [`Ready`] set, so you should always check the returned value and possibly |
122 | /// wait again if the requested states are not set. |
123 | /// |
124 | /// # Cancel safety |
125 | /// |
126 | /// This method is cancel safe. Once a readiness event occurs, the method |
127 | /// will continue to return immediately until the readiness event is |
128 | /// consumed by an attempt to read or write that fails with `WouldBlock` or |
129 | /// `Poll::Pending`. |
130 | /// |
131 | /// # Examples |
132 | /// |
133 | /// Concurrently receive from and send to the socket on the same task |
134 | /// without splitting. |
135 | /// |
136 | /// ```no_run |
137 | /// use tokio::io::Interest; |
138 | /// use tokio::net::UnixDatagram; |
139 | /// use std::io; |
140 | /// |
141 | /// #[tokio::main] |
142 | /// async fn main() -> io::Result<()> { |
143 | /// let dir = tempfile::tempdir().unwrap(); |
144 | /// let client_path = dir.path().join("client.sock" ); |
145 | /// let server_path = dir.path().join("server.sock" ); |
146 | /// let socket = UnixDatagram::bind(&client_path)?; |
147 | /// socket.connect(&server_path)?; |
148 | /// |
149 | /// loop { |
150 | /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?; |
151 | /// |
152 | /// if ready.is_readable() { |
153 | /// let mut data = [0; 1024]; |
154 | /// match socket.try_recv(&mut data[..]) { |
155 | /// Ok(n) => { |
156 | /// println!("received {:?}" , &data[..n]); |
157 | /// } |
158 | /// // False-positive, continue |
159 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} |
160 | /// Err(e) => { |
161 | /// return Err(e); |
162 | /// } |
163 | /// } |
164 | /// } |
165 | /// |
166 | /// if ready.is_writable() { |
167 | /// // Write some data |
168 | /// match socket.try_send(b"hello world" ) { |
169 | /// Ok(n) => { |
170 | /// println!("sent {} bytes" , n); |
171 | /// } |
172 | /// // False-positive, continue |
173 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} |
174 | /// Err(e) => { |
175 | /// return Err(e); |
176 | /// } |
177 | /// } |
178 | /// } |
179 | /// } |
180 | /// } |
181 | /// ``` |
182 | pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { |
183 | let event = self.io.registration().readiness(interest).await?; |
184 | Ok(event.ready) |
185 | } |
186 | |
187 | /// Waits for the socket to become writable. |
188 | /// |
189 | /// This function is equivalent to `ready(Interest::WRITABLE)` and is |
190 | /// usually paired with `try_send()` or `try_send_to()`. |
191 | /// |
192 | /// The function may complete without the socket being writable. This is a |
193 | /// false-positive and attempting a `try_send()` will return with |
194 | /// `io::ErrorKind::WouldBlock`. |
195 | /// |
196 | /// # Cancel safety |
197 | /// |
198 | /// This method is cancel safe. Once a readiness event occurs, the method |
199 | /// will continue to return immediately until the readiness event is |
200 | /// consumed by an attempt to write that fails with `WouldBlock` or |
201 | /// `Poll::Pending`. |
202 | /// |
203 | /// # Examples |
204 | /// |
205 | /// ```no_run |
206 | /// use tokio::net::UnixDatagram; |
207 | /// use std::io; |
208 | /// |
209 | /// #[tokio::main] |
210 | /// async fn main() -> io::Result<()> { |
211 | /// let dir = tempfile::tempdir().unwrap(); |
212 | /// let client_path = dir.path().join("client.sock" ); |
213 | /// let server_path = dir.path().join("server.sock" ); |
214 | /// let socket = UnixDatagram::bind(&client_path)?; |
215 | /// socket.connect(&server_path)?; |
216 | /// |
217 | /// loop { |
218 | /// // Wait for the socket to be writable |
219 | /// socket.writable().await?; |
220 | /// |
221 | /// // Try to send data, this may still fail with `WouldBlock` |
222 | /// // if the readiness event is a false positive. |
223 | /// match socket.try_send(b"hello world" ) { |
224 | /// Ok(n) => { |
225 | /// break; |
226 | /// } |
227 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
228 | /// continue; |
229 | /// } |
230 | /// Err(e) => { |
231 | /// return Err(e); |
232 | /// } |
233 | /// } |
234 | /// } |
235 | /// |
236 | /// Ok(()) |
237 | /// } |
238 | /// ``` |
239 | pub async fn writable(&self) -> io::Result<()> { |
240 | self.ready(Interest::WRITABLE).await?; |
241 | Ok(()) |
242 | } |
243 | |
244 | /// Polls for write/send readiness. |
245 | /// |
246 | /// If the socket is not currently ready for sending, this method will |
247 | /// store a clone of the `Waker` from the provided `Context`. When the socket |
248 | /// becomes ready for sending, `Waker::wake` will be called on the |
249 | /// waker. |
250 | /// |
251 | /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only |
252 | /// the `Waker` from the `Context` passed to the most recent call is |
253 | /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a |
254 | /// second, independent waker.) |
255 | /// |
256 | /// This function is intended for cases where creating and pinning a future |
257 | /// via [`writable`] is not feasible. Where possible, using [`writable`] is |
258 | /// preferred, as this supports polling from multiple tasks at once. |
259 | /// |
260 | /// # Return value |
261 | /// |
262 | /// The function returns: |
263 | /// |
264 | /// * `Poll::Pending` if the socket is not ready for writing. |
265 | /// * `Poll::Ready(Ok(()))` if the socket is ready for writing. |
266 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
267 | /// |
268 | /// # Errors |
269 | /// |
270 | /// This function may encounter any standard I/O error except `WouldBlock`. |
271 | /// |
272 | /// [`writable`]: method@Self::writable |
273 | pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
274 | self.io.registration().poll_write_ready(cx).map_ok(|_| ()) |
275 | } |
276 | |
277 | /// Waits for the socket to become readable. |
278 | /// |
279 | /// This function is equivalent to `ready(Interest::READABLE)` and is usually |
280 | /// paired with `try_recv()`. |
281 | /// |
282 | /// The function may complete without the socket being readable. This is a |
283 | /// false-positive and attempting a `try_recv()` will return with |
284 | /// `io::ErrorKind::WouldBlock`. |
285 | /// |
286 | /// # Cancel safety |
287 | /// |
288 | /// This method is cancel safe. Once a readiness event occurs, the method |
289 | /// will continue to return immediately until the readiness event is |
290 | /// consumed by an attempt to read that fails with `WouldBlock` or |
291 | /// `Poll::Pending`. |
292 | /// |
293 | /// # Examples |
294 | /// |
295 | /// ```no_run |
296 | /// use tokio::net::UnixDatagram; |
297 | /// use std::io; |
298 | /// |
299 | /// #[tokio::main] |
300 | /// async fn main() -> io::Result<()> { |
301 | /// // Connect to a peer |
302 | /// let dir = tempfile::tempdir().unwrap(); |
303 | /// let client_path = dir.path().join("client.sock" ); |
304 | /// let server_path = dir.path().join("server.sock" ); |
305 | /// let socket = UnixDatagram::bind(&client_path)?; |
306 | /// socket.connect(&server_path)?; |
307 | /// |
308 | /// loop { |
309 | /// // Wait for the socket to be readable |
310 | /// socket.readable().await?; |
311 | /// |
312 | /// // The buffer is **not** included in the async task and will |
313 | /// // only exist on the stack. |
314 | /// let mut buf = [0; 1024]; |
315 | /// |
316 | /// // Try to recv data, this may still fail with `WouldBlock` |
317 | /// // if the readiness event is a false positive. |
318 | /// match socket.try_recv(&mut buf) { |
319 | /// Ok(n) => { |
320 | /// println!("GOT {:?}" , &buf[..n]); |
321 | /// break; |
322 | /// } |
323 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
324 | /// continue; |
325 | /// } |
326 | /// Err(e) => { |
327 | /// return Err(e); |
328 | /// } |
329 | /// } |
330 | /// } |
331 | /// |
332 | /// Ok(()) |
333 | /// } |
334 | /// ``` |
335 | pub async fn readable(&self) -> io::Result<()> { |
336 | self.ready(Interest::READABLE).await?; |
337 | Ok(()) |
338 | } |
339 | |
340 | /// Polls for read/receive readiness. |
341 | /// |
342 | /// If the socket is not currently ready for receiving, this method will |
343 | /// store a clone of the `Waker` from the provided `Context`. When the |
344 | /// socket becomes ready for reading, `Waker::wake` will be called on the |
345 | /// waker. |
346 | /// |
347 | /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or |
348 | /// `poll_peek`, only the `Waker` from the `Context` passed to the most |
349 | /// recent call is scheduled to receive a wakeup. (However, |
350 | /// `poll_send_ready` retains a second, independent waker.) |
351 | /// |
352 | /// This function is intended for cases where creating and pinning a future |
353 | /// via [`readable`] is not feasible. Where possible, using [`readable`] is |
354 | /// preferred, as this supports polling from multiple tasks at once. |
355 | /// |
356 | /// # Return value |
357 | /// |
358 | /// The function returns: |
359 | /// |
360 | /// * `Poll::Pending` if the socket is not ready for reading. |
361 | /// * `Poll::Ready(Ok(()))` if the socket is ready for reading. |
362 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
363 | /// |
364 | /// # Errors |
365 | /// |
366 | /// This function may encounter any standard I/O error except `WouldBlock`. |
367 | /// |
368 | /// [`readable`]: method@Self::readable |
369 | pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
370 | self.io.registration().poll_read_ready(cx).map_ok(|_| ()) |
371 | } |
372 | |
373 | /// Creates a new `UnixDatagram` bound to the specified path. |
374 | /// |
375 | /// # Examples |
376 | /// ``` |
377 | /// # if cfg!(miri) { return } // No `socket` in miri. |
378 | /// # use std::error::Error; |
379 | /// # #[tokio::main] |
380 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
381 | /// use tokio::net::UnixDatagram; |
382 | /// use tempfile::tempdir; |
383 | /// |
384 | /// // We use a temporary directory so that the socket |
385 | /// // files left by the bound sockets will get cleaned up. |
386 | /// let tmp = tempdir()?; |
387 | /// |
388 | /// // Bind the socket to a filesystem path |
389 | /// let socket_path = tmp.path().join("socket" ); |
390 | /// let socket = UnixDatagram::bind(&socket_path)?; |
391 | /// |
392 | /// # Ok(()) |
393 | /// # } |
394 | /// ``` |
395 | pub fn bind<P>(path: P) -> io::Result<UnixDatagram> |
396 | where |
397 | P: AsRef<Path>, |
398 | { |
399 | let socket = mio::net::UnixDatagram::bind(path)?; |
400 | UnixDatagram::new(socket) |
401 | } |
402 | |
403 | /// Creates an unnamed pair of connected sockets. |
404 | /// |
405 | /// This function will create a pair of interconnected Unix sockets for |
406 | /// communicating back and forth between one another. |
407 | /// |
408 | /// # Examples |
409 | /// ``` |
410 | /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri. |
411 | /// # use std::error::Error; |
412 | /// # #[tokio::main] |
413 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
414 | /// use tokio::net::UnixDatagram; |
415 | /// |
416 | /// // Create the pair of sockets |
417 | /// let (sock1, sock2) = UnixDatagram::pair()?; |
418 | /// |
419 | /// // Since the sockets are paired, the paired send/recv |
420 | /// // functions can be used |
421 | /// let bytes = b"hail eris" ; |
422 | /// sock1.send(bytes).await?; |
423 | /// |
424 | /// let mut buff = vec![0u8; 24]; |
425 | /// let size = sock2.recv(&mut buff).await?; |
426 | /// |
427 | /// let dgram = &buff[..size]; |
428 | /// assert_eq!(dgram, bytes); |
429 | /// |
430 | /// # Ok(()) |
431 | /// # } |
432 | /// ``` |
433 | pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { |
434 | let (a, b) = mio::net::UnixDatagram::pair()?; |
435 | let a = UnixDatagram::new(a)?; |
436 | let b = UnixDatagram::new(b)?; |
437 | |
438 | Ok((a, b)) |
439 | } |
440 | |
441 | /// Creates new [`UnixDatagram`] from a [`std::os::unix::net::UnixDatagram`]. |
442 | /// |
443 | /// This function is intended to be used to wrap a `UnixDatagram` from the |
444 | /// standard library in the Tokio equivalent. |
445 | /// |
446 | /// # Notes |
447 | /// |
448 | /// The caller is responsible for ensuring that the socket is in |
449 | /// non-blocking mode. Otherwise all I/O operations on the socket |
450 | /// will block the thread, which will cause unexpected behavior. |
451 | /// Non-blocking mode can be set using [`set_nonblocking`]. |
452 | /// |
453 | /// Passing a socket in blocking mode is always erroneous, |
454 | /// and the behavior in that case may change in the future. |
455 | /// For example, it could panic. |
456 | /// |
457 | /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking |
458 | /// |
459 | /// # Panics |
460 | /// |
461 | /// This function panics if it is not called from within a runtime with |
462 | /// IO enabled. |
463 | /// |
464 | /// The runtime is usually set implicitly when this function is called |
465 | /// from a future driven by a Tokio runtime, otherwise runtime can be set |
466 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
467 | /// # Examples |
468 | /// ``` |
469 | /// # if cfg!(miri) { return } // No `socket` in miri. |
470 | /// # use std::error::Error; |
471 | /// # #[tokio::main] |
472 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
473 | /// use tokio::net::UnixDatagram; |
474 | /// use std::os::unix::net::UnixDatagram as StdUDS; |
475 | /// use tempfile::tempdir; |
476 | /// |
477 | /// // We use a temporary directory so that the socket |
478 | /// // files left by the bound sockets will get cleaned up. |
479 | /// let tmp = tempdir()?; |
480 | /// |
481 | /// // Bind the socket to a filesystem path |
482 | /// let socket_path = tmp.path().join("socket" ); |
483 | /// let std_socket = StdUDS::bind(&socket_path)?; |
484 | /// std_socket.set_nonblocking(true)?; |
485 | /// let tokio_socket = UnixDatagram::from_std(std_socket)?; |
486 | /// |
487 | /// # Ok(()) |
488 | /// # } |
489 | /// ``` |
490 | #[track_caller ] |
491 | pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> { |
492 | check_socket_for_blocking(&datagram)?; |
493 | |
494 | let socket = mio::net::UnixDatagram::from_std(datagram); |
495 | let io = PollEvented::new(socket)?; |
496 | Ok(UnixDatagram { io }) |
497 | } |
498 | |
499 | /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`]. |
500 | /// |
501 | /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking |
502 | /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode |
503 | /// if needed. |
504 | /// |
505 | /// # Examples |
506 | /// |
507 | /// ```rust,no_run |
508 | /// # use std::error::Error; |
509 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
510 | /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket" )?; |
511 | /// let std_socket = tokio_socket.into_std()?; |
512 | /// std_socket.set_nonblocking(false)?; |
513 | /// # Ok(()) |
514 | /// # } |
515 | /// ``` |
516 | /// |
517 | /// [`tokio::net::UnixDatagram`]: UnixDatagram |
518 | /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram |
519 | /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking |
520 | pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> { |
521 | self.io |
522 | .into_inner() |
523 | .map(IntoRawFd::into_raw_fd) |
524 | .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) }) |
525 | } |
526 | |
527 | fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> { |
528 | let io = PollEvented::new(socket)?; |
529 | Ok(UnixDatagram { io }) |
530 | } |
531 | |
532 | /// Creates a new `UnixDatagram` which is not bound to any address. |
533 | /// |
534 | /// # Examples |
535 | /// ``` |
536 | /// # if cfg!(miri) { return } // No `socket` in miri. |
537 | /// # use std::error::Error; |
538 | /// # #[tokio::main] |
539 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
540 | /// use tokio::net::UnixDatagram; |
541 | /// use tempfile::tempdir; |
542 | /// |
543 | /// // Create an unbound socket |
544 | /// let tx = UnixDatagram::unbound()?; |
545 | /// |
546 | /// // Create another, bound socket |
547 | /// let tmp = tempdir()?; |
548 | /// let rx_path = tmp.path().join("rx" ); |
549 | /// let rx = UnixDatagram::bind(&rx_path)?; |
550 | /// |
551 | /// // Send to the bound socket |
552 | /// let bytes = b"hello world" ; |
553 | /// tx.send_to(bytes, &rx_path).await?; |
554 | /// |
555 | /// let mut buf = vec![0u8; 24]; |
556 | /// let (size, addr) = rx.recv_from(&mut buf).await?; |
557 | /// |
558 | /// let dgram = &buf[..size]; |
559 | /// assert_eq!(dgram, bytes); |
560 | /// |
561 | /// # Ok(()) |
562 | /// # } |
563 | /// ``` |
564 | pub fn unbound() -> io::Result<UnixDatagram> { |
565 | let socket = mio::net::UnixDatagram::unbound()?; |
566 | UnixDatagram::new(socket) |
567 | } |
568 | |
569 | /// Connects the socket to the specified address. |
570 | /// |
571 | /// The `send` method may be used to send data to the specified address. |
572 | /// `recv` and `recv_from` will only receive data from that address. |
573 | /// |
574 | /// # Examples |
575 | /// ``` |
576 | /// # if cfg!(miri) { return } // No `socket` in miri. |
577 | /// # use std::error::Error; |
578 | /// # #[tokio::main] |
579 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
580 | /// use tokio::net::UnixDatagram; |
581 | /// use tempfile::tempdir; |
582 | /// |
583 | /// // Create an unbound socket |
584 | /// let tx = UnixDatagram::unbound()?; |
585 | /// |
586 | /// // Create another, bound socket |
587 | /// let tmp = tempdir()?; |
588 | /// let rx_path = tmp.path().join("rx" ); |
589 | /// let rx = UnixDatagram::bind(&rx_path)?; |
590 | /// |
591 | /// // Connect to the bound socket |
592 | /// tx.connect(&rx_path)?; |
593 | /// |
594 | /// // Send to the bound socket |
595 | /// let bytes = b"hello world" ; |
596 | /// tx.send(bytes).await?; |
597 | /// |
598 | /// let mut buf = vec![0u8; 24]; |
599 | /// let (size, addr) = rx.recv_from(&mut buf).await?; |
600 | /// |
601 | /// let dgram = &buf[..size]; |
602 | /// assert_eq!(dgram, bytes); |
603 | /// |
604 | /// # Ok(()) |
605 | /// # } |
606 | /// ``` |
607 | pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> { |
608 | self.io.connect(path) |
609 | } |
610 | |
611 | /// Sends data on the socket to the socket's peer. |
612 | /// |
613 | /// # Cancel safety |
614 | /// |
615 | /// This method is cancel safe. If `send` is used as the event in a |
616 | /// [`tokio::select!`](crate::select) statement and some other branch |
617 | /// completes first, then it is guaranteed that the message was not sent. |
618 | /// |
619 | /// # Examples |
620 | /// ``` |
621 | /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri. |
622 | /// # use std::error::Error; |
623 | /// # #[tokio::main] |
624 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
625 | /// use tokio::net::UnixDatagram; |
626 | /// |
627 | /// // Create the pair of sockets |
628 | /// let (sock1, sock2) = UnixDatagram::pair()?; |
629 | /// |
630 | /// // Since the sockets are paired, the paired send/recv |
631 | /// // functions can be used |
632 | /// let bytes = b"hello world" ; |
633 | /// sock1.send(bytes).await?; |
634 | /// |
635 | /// let mut buff = vec![0u8; 24]; |
636 | /// let size = sock2.recv(&mut buff).await?; |
637 | /// |
638 | /// let dgram = &buff[..size]; |
639 | /// assert_eq!(dgram, bytes); |
640 | /// |
641 | /// # Ok(()) |
642 | /// # } |
643 | /// ``` |
644 | pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { |
645 | self.io |
646 | .registration() |
647 | .async_io(Interest::WRITABLE, || self.io.send(buf)) |
648 | .await |
649 | } |
650 | |
651 | /// Tries to send a datagram to the peer without waiting. |
652 | /// |
653 | /// # Examples |
654 | /// |
655 | /// ```no_run |
656 | /// use tokio::net::UnixDatagram; |
657 | /// use std::io; |
658 | /// |
659 | /// #[tokio::main] |
660 | /// async fn main() -> io::Result<()> { |
661 | /// let dir = tempfile::tempdir().unwrap(); |
662 | /// let client_path = dir.path().join("client.sock" ); |
663 | /// let server_path = dir.path().join("server.sock" ); |
664 | /// let socket = UnixDatagram::bind(&client_path)?; |
665 | /// socket.connect(&server_path)?; |
666 | /// |
667 | /// loop { |
668 | /// // Wait for the socket to be writable |
669 | /// socket.writable().await?; |
670 | /// |
671 | /// // Try to send data, this may still fail with `WouldBlock` |
672 | /// // if the readiness event is a false positive. |
673 | /// match socket.try_send(b"hello world" ) { |
674 | /// Ok(n) => { |
675 | /// break; |
676 | /// } |
677 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
678 | /// continue; |
679 | /// } |
680 | /// Err(e) => { |
681 | /// return Err(e); |
682 | /// } |
683 | /// } |
684 | /// } |
685 | /// |
686 | /// Ok(()) |
687 | /// } |
688 | /// ``` |
689 | pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { |
690 | self.io |
691 | .registration() |
692 | .try_io(Interest::WRITABLE, || self.io.send(buf)) |
693 | } |
694 | |
695 | /// Tries to send a datagram to the specified path without waiting. |
696 | /// |
697 | /// # Examples |
698 | /// |
699 | /// ```no_run |
700 | /// use tokio::net::UnixDatagram; |
701 | /// use std::io; |
702 | /// |
703 | /// #[tokio::main] |
704 | /// async fn main() -> io::Result<()> { |
705 | /// let dir = tempfile::tempdir().unwrap(); |
706 | /// let client_path = dir.path().join("client.sock" ); |
707 | /// let server_path = dir.path().join("server.sock" ); |
708 | /// let socket = UnixDatagram::bind(&client_path)?; |
709 | /// |
710 | /// loop { |
711 | /// // Wait for the socket to be writable |
712 | /// socket.writable().await?; |
713 | /// |
714 | /// // Try to send data, this may still fail with `WouldBlock` |
715 | /// // if the readiness event is a false positive. |
716 | /// match socket.try_send_to(b"hello world" , &server_path) { |
717 | /// Ok(n) => { |
718 | /// break; |
719 | /// } |
720 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
721 | /// continue; |
722 | /// } |
723 | /// Err(e) => { |
724 | /// return Err(e); |
725 | /// } |
726 | /// } |
727 | /// } |
728 | /// |
729 | /// Ok(()) |
730 | /// } |
731 | /// ``` |
732 | pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> |
733 | where |
734 | P: AsRef<Path>, |
735 | { |
736 | self.io |
737 | .registration() |
738 | .try_io(Interest::WRITABLE, || self.io.send_to(buf, target)) |
739 | } |
740 | |
741 | /// Receives data from the socket. |
742 | /// |
743 | /// # Cancel safety |
744 | /// |
745 | /// This method is cancel safe. If `recv` is used as the event in a |
746 | /// [`tokio::select!`](crate::select) statement and some other branch |
747 | /// completes first, it is guaranteed that no messages were received on this |
748 | /// socket. |
749 | /// |
750 | /// # Examples |
751 | /// ``` |
752 | /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri. |
753 | /// # use std::error::Error; |
754 | /// # #[tokio::main] |
755 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
756 | /// use tokio::net::UnixDatagram; |
757 | /// |
758 | /// // Create the pair of sockets |
759 | /// let (sock1, sock2) = UnixDatagram::pair()?; |
760 | /// |
761 | /// // Since the sockets are paired, the paired send/recv |
762 | /// // functions can be used |
763 | /// let bytes = b"hello world" ; |
764 | /// sock1.send(bytes).await?; |
765 | /// |
766 | /// let mut buff = vec![0u8; 24]; |
767 | /// let size = sock2.recv(&mut buff).await?; |
768 | /// |
769 | /// let dgram = &buff[..size]; |
770 | /// assert_eq!(dgram, bytes); |
771 | /// |
772 | /// # Ok(()) |
773 | /// # } |
774 | /// ``` |
775 | pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { |
776 | self.io |
777 | .registration() |
778 | .async_io(Interest::READABLE, || self.io.recv(buf)) |
779 | .await |
780 | } |
781 | |
782 | /// Tries to receive a datagram from the peer without waiting. |
783 | /// |
784 | /// # Examples |
785 | /// |
786 | /// ```no_run |
787 | /// use tokio::net::UnixDatagram; |
788 | /// use std::io; |
789 | /// |
790 | /// #[tokio::main] |
791 | /// async fn main() -> io::Result<()> { |
792 | /// // Connect to a peer |
793 | /// let dir = tempfile::tempdir().unwrap(); |
794 | /// let client_path = dir.path().join("client.sock" ); |
795 | /// let server_path = dir.path().join("server.sock" ); |
796 | /// let socket = UnixDatagram::bind(&client_path)?; |
797 | /// socket.connect(&server_path)?; |
798 | /// |
799 | /// loop { |
800 | /// // Wait for the socket to be readable |
801 | /// socket.readable().await?; |
802 | /// |
803 | /// // The buffer is **not** included in the async task and will |
804 | /// // only exist on the stack. |
805 | /// let mut buf = [0; 1024]; |
806 | /// |
807 | /// // Try to recv data, this may still fail with `WouldBlock` |
808 | /// // if the readiness event is a false positive. |
809 | /// match socket.try_recv(&mut buf) { |
810 | /// Ok(n) => { |
811 | /// println!("GOT {:?}" , &buf[..n]); |
812 | /// break; |
813 | /// } |
814 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
815 | /// continue; |
816 | /// } |
817 | /// Err(e) => { |
818 | /// return Err(e); |
819 | /// } |
820 | /// } |
821 | /// } |
822 | /// |
823 | /// Ok(()) |
824 | /// } |
825 | /// ``` |
826 | pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> { |
827 | self.io |
828 | .registration() |
829 | .try_io(Interest::READABLE, || self.io.recv(buf)) |
830 | } |
831 | |
832 | cfg_io_util! { |
833 | /// Tries to receive data from the socket without waiting. |
834 | /// |
835 | /// This method can be used even if `buf` is uninitialized. |
836 | /// |
837 | /// # Examples |
838 | /// |
839 | /// ```no_run |
840 | /// use tokio::net::UnixDatagram; |
841 | /// use std::io; |
842 | /// |
843 | /// #[tokio::main] |
844 | /// async fn main() -> io::Result<()> { |
845 | /// // Connect to a peer |
846 | /// let dir = tempfile::tempdir().unwrap(); |
847 | /// let client_path = dir.path().join("client.sock"); |
848 | /// let server_path = dir.path().join("server.sock"); |
849 | /// let socket = UnixDatagram::bind(&client_path)?; |
850 | /// |
851 | /// loop { |
852 | /// // Wait for the socket to be readable |
853 | /// socket.readable().await?; |
854 | /// |
855 | /// let mut buf = Vec::with_capacity(1024); |
856 | /// |
857 | /// // Try to recv data, this may still fail with `WouldBlock` |
858 | /// // if the readiness event is a false positive. |
859 | /// match socket.try_recv_buf_from(&mut buf) { |
860 | /// Ok((n, _addr)) => { |
861 | /// println!("GOT {:?}", &buf[..n]); |
862 | /// break; |
863 | /// } |
864 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
865 | /// continue; |
866 | /// } |
867 | /// Err(e) => { |
868 | /// return Err(e); |
869 | /// } |
870 | /// } |
871 | /// } |
872 | /// |
873 | /// Ok(()) |
874 | /// } |
875 | /// ``` |
876 | pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> { |
877 | let (n, addr) = self.io.registration().try_io(Interest::READABLE, || { |
878 | let dst = buf.chunk_mut(); |
879 | let dst = |
880 | unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; |
881 | |
882 | // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the |
883 | // buffer. |
884 | let (n, addr) = (*self.io).recv_from(dst)?; |
885 | |
886 | unsafe { |
887 | buf.advance_mut(n); |
888 | } |
889 | |
890 | Ok((n, addr)) |
891 | })?; |
892 | |
893 | Ok((n, SocketAddr(addr))) |
894 | } |
895 | |
896 | /// Receives from the socket, advances the |
897 | /// buffer's internal cursor and returns how many bytes were read and the origin. |
898 | /// |
899 | /// This method can be used even if `buf` is uninitialized. |
900 | /// |
901 | /// # Examples |
902 | /// ``` |
903 | /// # if cfg!(miri) { return } // No `socket` in miri. |
904 | /// # use std::error::Error; |
905 | /// # #[tokio::main] |
906 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
907 | /// use tokio::net::UnixDatagram; |
908 | /// use tempfile::tempdir; |
909 | /// |
910 | /// // We use a temporary directory so that the socket |
911 | /// // files left by the bound sockets will get cleaned up. |
912 | /// let tmp = tempdir()?; |
913 | /// |
914 | /// // Bind each socket to a filesystem path |
915 | /// let tx_path = tmp.path().join("tx"); |
916 | /// let tx = UnixDatagram::bind(&tx_path)?; |
917 | /// let rx_path = tmp.path().join("rx"); |
918 | /// let rx = UnixDatagram::bind(&rx_path)?; |
919 | /// |
920 | /// let bytes = b"hello world"; |
921 | /// tx.send_to(bytes, &rx_path).await?; |
922 | /// |
923 | /// let mut buf = Vec::with_capacity(24); |
924 | /// let (size, addr) = rx.recv_buf_from(&mut buf).await?; |
925 | /// |
926 | /// let dgram = &buf[..size]; |
927 | /// assert_eq!(dgram, bytes); |
928 | /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); |
929 | /// |
930 | /// # Ok(()) |
931 | /// # } |
932 | /// ``` |
933 | pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> { |
934 | self.io.registration().async_io(Interest::READABLE, || { |
935 | let dst = buf.chunk_mut(); |
936 | let dst = |
937 | unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; |
938 | |
939 | // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the |
940 | // buffer. |
941 | let (n, addr) = (*self.io).recv_from(dst)?; |
942 | |
943 | unsafe { |
944 | buf.advance_mut(n); |
945 | } |
946 | Ok((n,SocketAddr(addr))) |
947 | }).await |
948 | } |
949 | |
/// Tries to read data from the socket into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
952 | /// |
953 | /// This method can be used even if `buf` is uninitialized. |
954 | /// |
955 | /// # Examples |
956 | /// |
957 | /// ```no_run |
958 | /// use tokio::net::UnixDatagram; |
959 | /// use std::io; |
960 | /// |
961 | /// #[tokio::main] |
962 | /// async fn main() -> io::Result<()> { |
963 | /// // Connect to a peer |
964 | /// let dir = tempfile::tempdir().unwrap(); |
965 | /// let client_path = dir.path().join("client.sock"); |
966 | /// let server_path = dir.path().join("server.sock"); |
967 | /// let socket = UnixDatagram::bind(&client_path)?; |
968 | /// socket.connect(&server_path)?; |
969 | /// |
970 | /// loop { |
971 | /// // Wait for the socket to be readable |
972 | /// socket.readable().await?; |
973 | /// |
974 | /// let mut buf = Vec::with_capacity(1024); |
975 | /// |
976 | /// // Try to recv data, this may still fail with `WouldBlock` |
977 | /// // if the readiness event is a false positive. |
978 | /// match socket.try_recv_buf(&mut buf) { |
979 | /// Ok(n) => { |
980 | /// println!("GOT {:?}", &buf[..n]); |
981 | /// break; |
982 | /// } |
983 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
984 | /// continue; |
985 | /// } |
986 | /// Err(e) => { |
987 | /// return Err(e); |
988 | /// } |
989 | /// } |
990 | /// } |
991 | /// |
992 | /// Ok(()) |
993 | /// } |
994 | /// ``` |
995 | pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { |
996 | self.io.registration().try_io(Interest::READABLE, || { |
997 | let dst = buf.chunk_mut(); |
998 | let dst = |
999 | unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; |
1000 | |
1001 | // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the |
1002 | // buffer. |
1003 | let n = (*self.io).recv(dst)?; |
1004 | |
1005 | unsafe { |
1006 | buf.advance_mut(n); |
1007 | } |
1008 | |
1009 | Ok(n) |
1010 | }) |
1011 | } |
1012 | |
1013 | /// Receives data from the socket from the address to which it is connected, |
1014 | /// advancing the buffer's internal cursor, returning how many bytes were read. |
1015 | /// |
1016 | /// This method can be used even if `buf` is uninitialized. |
1017 | /// |
1018 | /// # Examples |
1019 | /// ``` |
1020 | /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri. |
1021 | /// # use std::error::Error; |
1022 | /// # #[tokio::main] |
1023 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
1024 | /// use tokio::net::UnixDatagram; |
1025 | /// |
1026 | /// // Create the pair of sockets |
1027 | /// let (sock1, sock2) = UnixDatagram::pair()?; |
1028 | /// |
1029 | /// // Since the sockets are paired, the paired send/recv |
1030 | /// // functions can be used |
1031 | /// let bytes = b"hello world"; |
1032 | /// sock1.send(bytes).await?; |
1033 | /// |
1034 | /// let mut buff = Vec::with_capacity(24); |
1035 | /// let size = sock2.recv_buf(&mut buff).await?; |
1036 | /// |
1037 | /// let dgram = &buff[..size]; |
1038 | /// assert_eq!(dgram, bytes); |
1039 | /// |
1040 | /// # Ok(()) |
1041 | /// # } |
1042 | /// ``` |
1043 | pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { |
1044 | self.io.registration().async_io(Interest::READABLE, || { |
1045 | let dst = buf.chunk_mut(); |
1046 | let dst = |
1047 | unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; |
1048 | |
1049 | // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the |
1050 | // buffer. |
1051 | let n = (*self.io).recv(dst)?; |
1052 | |
1053 | unsafe { |
1054 | buf.advance_mut(n); |
1055 | } |
1056 | Ok(n) |
1057 | }).await |
1058 | } |
1059 | } |
1060 | |
1061 | /// Sends data on the socket to the specified address. |
1062 | /// |
1063 | /// # Cancel safety |
1064 | /// |
1065 | /// This method is cancel safe. If `send_to` is used as the event in a |
1066 | /// [`tokio::select!`](crate::select) statement and some other branch |
1067 | /// completes first, then it is guaranteed that the message was not sent. |
1068 | /// |
1069 | /// # Examples |
1070 | /// ``` |
1071 | /// # if cfg!(miri) { return } // No `socket` in miri. |
1072 | /// # use std::error::Error; |
1073 | /// # #[tokio::main] |
1074 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
1075 | /// use tokio::net::UnixDatagram; |
1076 | /// use tempfile::tempdir; |
1077 | /// |
1078 | /// // We use a temporary directory so that the socket |
1079 | /// // files left by the bound sockets will get cleaned up. |
1080 | /// let tmp = tempdir()?; |
1081 | /// |
1082 | /// // Bind each socket to a filesystem path |
1083 | /// let tx_path = tmp.path().join("tx" ); |
1084 | /// let tx = UnixDatagram::bind(&tx_path)?; |
1085 | /// let rx_path = tmp.path().join("rx" ); |
1086 | /// let rx = UnixDatagram::bind(&rx_path)?; |
1087 | /// |
1088 | /// let bytes = b"hello world" ; |
1089 | /// tx.send_to(bytes, &rx_path).await?; |
1090 | /// |
1091 | /// let mut buf = vec![0u8; 24]; |
1092 | /// let (size, addr) = rx.recv_from(&mut buf).await?; |
1093 | /// |
1094 | /// let dgram = &buf[..size]; |
1095 | /// assert_eq!(dgram, bytes); |
1096 | /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); |
1097 | /// |
1098 | /// # Ok(()) |
1099 | /// # } |
1100 | /// ``` |
1101 | pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> |
1102 | where |
1103 | P: AsRef<Path>, |
1104 | { |
1105 | self.io |
1106 | .registration() |
1107 | .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref())) |
1108 | .await |
1109 | } |
1110 | |
1111 | /// Receives data from the socket. |
1112 | /// |
1113 | /// # Cancel safety |
1114 | /// |
1115 | /// This method is cancel safe. If `recv_from` is used as the event in a |
1116 | /// [`tokio::select!`](crate::select) statement and some other branch |
1117 | /// completes first, it is guaranteed that no messages were received on this |
1118 | /// socket. |
1119 | /// |
1120 | /// # Examples |
1121 | /// ``` |
1122 | /// # if cfg!(miri) { return } // No `socket` in miri. |
1123 | /// # use std::error::Error; |
1124 | /// # #[tokio::main] |
1125 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
1126 | /// use tokio::net::UnixDatagram; |
1127 | /// use tempfile::tempdir; |
1128 | /// |
1129 | /// // We use a temporary directory so that the socket |
1130 | /// // files left by the bound sockets will get cleaned up. |
1131 | /// let tmp = tempdir()?; |
1132 | /// |
1133 | /// // Bind each socket to a filesystem path |
1134 | /// let tx_path = tmp.path().join("tx" ); |
1135 | /// let tx = UnixDatagram::bind(&tx_path)?; |
1136 | /// let rx_path = tmp.path().join("rx" ); |
1137 | /// let rx = UnixDatagram::bind(&rx_path)?; |
1138 | /// |
1139 | /// let bytes = b"hello world" ; |
1140 | /// tx.send_to(bytes, &rx_path).await?; |
1141 | /// |
1142 | /// let mut buf = vec![0u8; 24]; |
1143 | /// let (size, addr) = rx.recv_from(&mut buf).await?; |
1144 | /// |
1145 | /// let dgram = &buf[..size]; |
1146 | /// assert_eq!(dgram, bytes); |
1147 | /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); |
1148 | /// |
1149 | /// # Ok(()) |
1150 | /// # } |
1151 | /// ``` |
1152 | pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { |
1153 | let (n, addr) = self |
1154 | .io |
1155 | .registration() |
1156 | .async_io(Interest::READABLE, || self.io.recv_from(buf)) |
1157 | .await?; |
1158 | |
1159 | Ok((n, SocketAddr(addr))) |
1160 | } |
1161 | |
/// Attempts to receive a single datagram on the socket.
1163 | /// |
1164 | /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the |
1165 | /// `Waker` from the `Context` passed to the most recent call will be scheduled to |
1166 | /// receive a wakeup. |
1167 | /// |
1168 | /// # Return value |
1169 | /// |
1170 | /// The function returns: |
1171 | /// |
1172 | /// * `Poll::Pending` if the socket is not ready to read |
1173 | /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready |
1174 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
1175 | /// |
1176 | /// # Errors |
1177 | /// |
1178 | /// This function may encounter any standard I/O error except `WouldBlock`. |
1179 | pub fn poll_recv_from( |
1180 | &self, |
1181 | cx: &mut Context<'_>, |
1182 | buf: &mut ReadBuf<'_>, |
1183 | ) -> Poll<io::Result<SocketAddr>> { |
1184 | #[allow (clippy::blocks_in_conditions)] |
1185 | let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { |
1186 | // Safety: will not read the maybe uninitialized bytes. |
1187 | let b = unsafe { |
1188 | &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) |
1189 | }; |
1190 | |
1191 | self.io.recv_from(b) |
1192 | }))?; |
1193 | |
1194 | // Safety: We trust `recv` to have filled up `n` bytes in the buffer. |
1195 | unsafe { |
1196 | buf.assume_init(n); |
1197 | } |
1198 | buf.advance(n); |
1199 | Poll::Ready(Ok(SocketAddr(addr))) |
1200 | } |
1201 | |
1202 | /// Attempts to send data to the specified address. |
1203 | /// |
1204 | /// Note that on multiple calls to a `poll_*` method in the send direction, only the |
1205 | /// `Waker` from the `Context` passed to the most recent call will be scheduled to |
1206 | /// receive a wakeup. |
1207 | /// |
1208 | /// # Return value |
1209 | /// |
1210 | /// The function returns: |
1211 | /// |
1212 | /// * `Poll::Pending` if the socket is not ready to write |
1213 | /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent. |
1214 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
1215 | /// |
1216 | /// # Errors |
1217 | /// |
1218 | /// This function may encounter any standard I/O error except `WouldBlock`. |
1219 | pub fn poll_send_to<P>( |
1220 | &self, |
1221 | cx: &mut Context<'_>, |
1222 | buf: &[u8], |
1223 | target: P, |
1224 | ) -> Poll<io::Result<usize>> |
1225 | where |
1226 | P: AsRef<Path>, |
1227 | { |
1228 | self.io |
1229 | .registration() |
1230 | .poll_write_io(cx, || self.io.send_to(buf, target.as_ref())) |
1231 | } |
1232 | |
1233 | /// Attempts to send data on the socket to the remote address to which it |
1234 | /// was previously `connect`ed. |
1235 | /// |
1236 | /// The [`connect`] method will connect this socket to a remote address. |
1237 | /// This method will fail if the socket is not connected. |
1238 | /// |
1239 | /// Note that on multiple calls to a `poll_*` method in the send direction, |
1240 | /// only the `Waker` from the `Context` passed to the most recent call will |
1241 | /// be scheduled to receive a wakeup. |
1242 | /// |
1243 | /// # Return value |
1244 | /// |
1245 | /// The function returns: |
1246 | /// |
1247 | /// * `Poll::Pending` if the socket is not available to write |
1248 | /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent |
1249 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
1250 | /// |
1251 | /// # Errors |
1252 | /// |
1253 | /// This function may encounter any standard I/O error except `WouldBlock`. |
1254 | /// |
1255 | /// [`connect`]: method@Self::connect |
1256 | pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { |
1257 | self.io |
1258 | .registration() |
1259 | .poll_write_io(cx, || self.io.send(buf)) |
1260 | } |
1261 | |
1262 | /// Attempts to receive a single datagram message on the socket from the remote |
1263 | /// address to which it is `connect`ed. |
1264 | /// |
1265 | /// The [`connect`] method will connect this socket to a remote address. This method |
1266 | /// resolves to an error if the socket is not connected. |
1267 | /// |
1268 | /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the |
1269 | /// `Waker` from the `Context` passed to the most recent call will be scheduled to |
1270 | /// receive a wakeup. |
1271 | /// |
1272 | /// # Return value |
1273 | /// |
1274 | /// The function returns: |
1275 | /// |
1276 | /// * `Poll::Pending` if the socket is not ready to read |
/// * `Poll::Ready(Ok(()))` reads data into `ReadBuf` if the socket is ready
1278 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
1279 | /// |
1280 | /// # Errors |
1281 | /// |
1282 | /// This function may encounter any standard I/O error except `WouldBlock`. |
1283 | /// |
1284 | /// [`connect`]: method@Self::connect |
1285 | pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> { |
1286 | #[allow (clippy::blocks_in_conditions)] |
1287 | let n = ready!(self.io.registration().poll_read_io(cx, || { |
1288 | // Safety: will not read the maybe uninitialized bytes. |
1289 | let b = unsafe { |
1290 | &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) |
1291 | }; |
1292 | |
1293 | self.io.recv(b) |
1294 | }))?; |
1295 | |
1296 | // Safety: We trust `recv` to have filled up `n` bytes in the buffer. |
1297 | unsafe { |
1298 | buf.assume_init(n); |
1299 | } |
1300 | buf.advance(n); |
1301 | Poll::Ready(Ok(())) |
1302 | } |
1303 | |
1304 | /// Tries to receive data from the socket without waiting. |
1305 | /// |
1306 | /// # Examples |
1307 | /// |
1308 | /// ```no_run |
1309 | /// use tokio::net::UnixDatagram; |
1310 | /// use std::io; |
1311 | /// |
1312 | /// #[tokio::main] |
1313 | /// async fn main() -> io::Result<()> { |
///     // Bind the socket to a local path
1315 | /// let dir = tempfile::tempdir().unwrap(); |
1316 | /// let client_path = dir.path().join("client.sock" ); |
1317 | /// let server_path = dir.path().join("server.sock" ); |
1318 | /// let socket = UnixDatagram::bind(&client_path)?; |
1319 | /// |
1320 | /// loop { |
1321 | /// // Wait for the socket to be readable |
1322 | /// socket.readable().await?; |
1323 | /// |
1324 | /// // The buffer is **not** included in the async task and will |
1325 | /// // only exist on the stack. |
1326 | /// let mut buf = [0; 1024]; |
1327 | /// |
1328 | /// // Try to recv data, this may still fail with `WouldBlock` |
1329 | /// // if the readiness event is a false positive. |
1330 | /// match socket.try_recv_from(&mut buf) { |
1331 | /// Ok((n, _addr)) => { |
1332 | /// println!("GOT {:?}" , &buf[..n]); |
1333 | /// break; |
1334 | /// } |
1335 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
1336 | /// continue; |
1337 | /// } |
1338 | /// Err(e) => { |
1339 | /// return Err(e); |
1340 | /// } |
1341 | /// } |
1342 | /// } |
1343 | /// |
1344 | /// Ok(()) |
1345 | /// } |
1346 | /// ``` |
1347 | pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { |
1348 | let (n, addr) = self |
1349 | .io |
1350 | .registration() |
1351 | .try_io(Interest::READABLE, || self.io.recv_from(buf))?; |
1352 | |
1353 | Ok((n, SocketAddr(addr))) |
1354 | } |
1355 | |
1356 | /// Tries to read or write from the socket using a user-provided IO operation. |
1357 | /// |
1358 | /// If the socket is ready, the provided closure is called. The closure |
1359 | /// should attempt to perform IO operation on the socket by manually |
1360 | /// calling the appropriate syscall. If the operation fails because the |
1361 | /// socket is not actually ready, then the closure should return a |
1362 | /// `WouldBlock` error and the readiness flag is cleared. The return value |
1363 | /// of the closure is then returned by `try_io`. |
1364 | /// |
1365 | /// If the socket is not ready, then the closure is not called |
1366 | /// and a `WouldBlock` error is returned. |
1367 | /// |
1368 | /// The closure should only return a `WouldBlock` error if it has performed |
1369 | /// an IO operation on the socket that failed due to the socket not being |
1370 | /// ready. Returning a `WouldBlock` error in any other situation will |
1371 | /// incorrectly clear the readiness flag, which can cause the socket to |
1372 | /// behave incorrectly. |
1373 | /// |
1374 | /// The closure should not perform the IO operation using any of the methods |
1375 | /// defined on the Tokio `UnixDatagram` type, as this will mess with the |
1376 | /// readiness flag and can cause the socket to behave incorrectly. |
1377 | /// |
1378 | /// This method is not intended to be used with combined interests. |
1379 | /// The closure should perform only one type of IO operation, so it should not |
1380 | /// require more than one ready state. This method may panic or sleep forever |
1381 | /// if it is called with a combined interest. |
1382 | /// |
1383 | /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. |
1384 | /// |
1385 | /// [`readable()`]: UnixDatagram::readable() |
1386 | /// [`writable()`]: UnixDatagram::writable() |
1387 | /// [`ready()`]: UnixDatagram::ready() |
1388 | pub fn try_io<R>( |
1389 | &self, |
1390 | interest: Interest, |
1391 | f: impl FnOnce() -> io::Result<R>, |
1392 | ) -> io::Result<R> { |
1393 | self.io |
1394 | .registration() |
1395 | .try_io(interest, || self.io.try_io(f)) |
1396 | } |
1397 | |
1398 | /// Reads or writes from the socket using a user-provided IO operation. |
1399 | /// |
1400 | /// The readiness of the socket is awaited and when the socket is ready, |
1401 | /// the provided closure is called. The closure should attempt to perform |
1402 | /// IO operation on the socket by manually calling the appropriate syscall. |
1403 | /// If the operation fails because the socket is not actually ready, |
1404 | /// then the closure should return a `WouldBlock` error. In such case the |
1405 | /// readiness flag is cleared and the socket readiness is awaited again. |
1406 | /// This loop is repeated until the closure returns an `Ok` or an error |
1407 | /// other than `WouldBlock`. |
1408 | /// |
1409 | /// The closure should only return a `WouldBlock` error if it has performed |
1410 | /// an IO operation on the socket that failed due to the socket not being |
1411 | /// ready. Returning a `WouldBlock` error in any other situation will |
1412 | /// incorrectly clear the readiness flag, which can cause the socket to |
1413 | /// behave incorrectly. |
1414 | /// |
1415 | /// The closure should not perform the IO operation using any of the methods |
1416 | /// defined on the Tokio `UnixDatagram` type, as this will mess with the |
1417 | /// readiness flag and can cause the socket to behave incorrectly. |
1418 | /// |
1419 | /// This method is not intended to be used with combined interests. |
1420 | /// The closure should perform only one type of IO operation, so it should not |
1421 | /// require more than one ready state. This method may panic or sleep forever |
1422 | /// if it is called with a combined interest. |
1423 | pub async fn async_io<R>( |
1424 | &self, |
1425 | interest: Interest, |
1426 | mut f: impl FnMut() -> io::Result<R>, |
1427 | ) -> io::Result<R> { |
1428 | self.io |
1429 | .registration() |
1430 | .async_io(interest, || self.io.try_io(&mut f)) |
1431 | .await |
1432 | } |
1433 | |
1434 | /// Returns the local address that this socket is bound to. |
1435 | /// |
1436 | /// # Examples |
1437 | /// For a socket bound to a local path |
1438 | /// ``` |
1439 | /// # if cfg!(miri) { return } // No `socket` in miri. |
1440 | /// # use std::error::Error; |
1441 | /// # #[tokio::main] |
1442 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
1443 | /// use tokio::net::UnixDatagram; |
1444 | /// use tempfile::tempdir; |
1445 | /// |
1446 | /// // We use a temporary directory so that the socket |
1447 | /// // files left by the bound sockets will get cleaned up. |
1448 | /// let tmp = tempdir()?; |
1449 | /// |
1450 | /// // Bind socket to a filesystem path |
1451 | /// let socket_path = tmp.path().join("socket" ); |
1452 | /// let socket = UnixDatagram::bind(&socket_path)?; |
1453 | /// |
1454 | /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path); |
1455 | /// |
1456 | /// # Ok(()) |
1457 | /// # } |
1458 | /// ``` |
1459 | /// |
1460 | /// For an unbound socket |
1461 | /// ``` |
1462 | /// # if cfg!(miri) { return } // No `socket` in miri. |
1463 | /// # use std::error::Error; |
1464 | /// # #[tokio::main] |
1465 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
1466 | /// use tokio::net::UnixDatagram; |
1467 | /// |
1468 | /// // Create an unbound socket |
1469 | /// let socket = UnixDatagram::unbound()?; |
1470 | /// |
1471 | /// assert!(socket.local_addr()?.is_unnamed()); |
1472 | /// |
1473 | /// # Ok(()) |
1474 | /// # } |
1475 | /// ``` |
1476 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
1477 | self.io.local_addr().map(SocketAddr) |
1478 | } |
1479 | |
1480 | /// Returns the address of this socket's peer. |
1481 | /// |
1482 | /// The `connect` method will connect the socket to a peer. |
1483 | /// |
1484 | /// # Examples |
1485 | /// For a peer with a local path |
1486 | /// ``` |
1487 | /// # if cfg!(miri) { return } // No `socket` in miri. |
1488 | /// # use std::error::Error; |
1489 | /// # #[tokio::main] |
1490 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
1491 | /// use tokio::net::UnixDatagram; |
1492 | /// use tempfile::tempdir; |
1493 | /// |
1494 | /// // Create an unbound socket |
1495 | /// let tx = UnixDatagram::unbound()?; |
1496 | /// |
1497 | /// // Create another, bound socket |
1498 | /// let tmp = tempdir()?; |
1499 | /// let rx_path = tmp.path().join("rx" ); |
1500 | /// let rx = UnixDatagram::bind(&rx_path)?; |
1501 | /// |
1502 | /// // Connect to the bound socket |
1503 | /// tx.connect(&rx_path)?; |
1504 | /// |
1505 | /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path); |
1506 | /// |
1507 | /// # Ok(()) |
1508 | /// # } |
1509 | /// ``` |
1510 | /// |
1511 | /// For an unbound peer |
1512 | /// ``` |
1513 | /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri. |
1514 | /// # use std::error::Error; |
1515 | /// # #[tokio::main] |
1516 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
1517 | /// use tokio::net::UnixDatagram; |
1518 | /// |
1519 | /// // Create the pair of sockets |
1520 | /// let (sock1, sock2) = UnixDatagram::pair()?; |
1521 | /// |
1522 | /// assert!(sock1.peer_addr()?.is_unnamed()); |
1523 | /// |
1524 | /// # Ok(()) |
1525 | /// # } |
1526 | /// ``` |
1527 | pub fn peer_addr(&self) -> io::Result<SocketAddr> { |
1528 | self.io.peer_addr().map(SocketAddr) |
1529 | } |
1530 | |
1531 | /// Returns the value of the `SO_ERROR` option. |
1532 | /// |
1533 | /// # Examples |
1534 | /// ``` |
1535 | /// # if cfg!(miri) { return } // No `socket` in miri. |
1536 | /// # use std::error::Error; |
1537 | /// # #[tokio::main] |
1538 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
1539 | /// use tokio::net::UnixDatagram; |
1540 | /// |
1541 | /// // Create an unbound socket |
1542 | /// let socket = UnixDatagram::unbound()?; |
1543 | /// |
1544 | /// if let Ok(Some(err)) = socket.take_error() { |
1545 | /// println!("Got error: {:?}" , err); |
1546 | /// } |
1547 | /// |
1548 | /// # Ok(()) |
1549 | /// # } |
1550 | /// ``` |
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
    // Pure delegation: the result of the wrapped socket's `take_error` is
    // returned unchanged.
    self.io.take_error()
}
1554 | |
1555 | /// Shuts down the read, write, or both halves of this connection. |
1556 | /// |
1557 | /// This function will cause all pending and future I/O calls on the |
1558 | /// specified portions to immediately return with an appropriate value |
1559 | /// (see the documentation of `Shutdown`). |
1560 | /// |
1561 | /// # Examples |
1562 | /// ``` |
1563 | /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri. |
1564 | /// # use std::error::Error; |
1565 | /// # #[tokio::main] |
1566 | /// # async fn main() -> Result<(), Box<dyn Error>> { |
1567 | /// use tokio::net::UnixDatagram; |
1568 | /// use std::net::Shutdown; |
1569 | /// |
/// // Create the pair of sockets
1571 | /// let (socket, other) = UnixDatagram::pair()?; |
1572 | /// |
1573 | /// socket.shutdown(Shutdown::Both)?; |
1574 | /// |
1575 | /// // NOTE: the following commented out code does NOT work as expected. |
1576 | /// // Due to an underlying issue, the recv call will block indefinitely. |
1577 | /// // See: https://github.com/tokio-rs/tokio/issues/1679 |
1578 | /// //let mut buff = vec![0u8; 24]; |
1579 | /// //let size = socket.recv(&mut buff).await?; |
1580 | /// //assert_eq!(size, 0); |
1581 | /// |
1582 | /// let send_result = socket.send(b"hello world" ).await; |
1583 | /// assert!(send_result.is_err()); |
1584 | /// |
1585 | /// # Ok(()) |
1586 | /// # } |
1587 | /// ``` |
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
    // Pure delegation: `how` is forwarded as-is to the wrapped socket and
    // its result is returned unchanged.
    self.io.shutdown(how)
}
1591 | } |
1592 | |
1593 | impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram { |
1594 | type Error = io::Error; |
1595 | |
1596 | /// Consumes stream, returning the Tokio I/O object. |
1597 | /// |
1598 | /// This is equivalent to |
1599 | /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std). |
1600 | fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> { |
1601 | Self::from_std(datagram:stream) |
1602 | } |
1603 | } |
1604 | |
// Debug formatting is delegated entirely to the `io` field.
impl fmt::Debug for UnixDatagram {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Whatever the `io` field's `fmt` writes is this type's debug output.
        self.io.fmt(f)
    }
}
1610 | |
// Exposes the raw file descriptor reported by the `io` field.
impl AsRawFd for UnixDatagram {
    fn as_raw_fd(&self) -> RawFd {
        // Returns the descriptor as a plain `RawFd`; the value is simply
        // whatever `self.io.as_raw_fd()` reports.
        self.io.as_raw_fd()
    }
}
1616 | |
// Borrowed-descriptor view built on top of `as_raw_fd`.
impl AsFd for UnixDatagram {
    fn as_fd(&self) -> BorrowedFd<'_> {
        // SAFETY: the descriptor comes from `self.io`, which `self` holds, so
        // it is expected to stay open for as long as the returned borrow of
        // `self` lives. NOTE(review): confirm nothing closes the descriptor
        // while `self` is still alive.
        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
    }
}
1622 | |