1 | use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; |
2 | use crate::net::unix::split::{split, ReadHalf, WriteHalf}; |
3 | use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; |
4 | use crate::net::unix::ucred::{self, UCred}; |
5 | use crate::net::unix::SocketAddr; |
6 | use crate::util::check_socket_for_blocking; |
7 | |
8 | use std::fmt; |
9 | use std::future::poll_fn; |
10 | use std::io::{self, Read, Write}; |
11 | use std::net::Shutdown; |
12 | #[cfg (target_os = "android" )] |
13 | use std::os::android::net::SocketAddrExt; |
14 | #[cfg (target_os = "linux" )] |
15 | use std::os::linux::net::SocketAddrExt; |
16 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
17 | use std::os::unix::ffi::OsStrExt; |
18 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; |
19 | use std::os::unix::net::{self, SocketAddr as StdSocketAddr}; |
20 | use std::path::Path; |
21 | use std::pin::Pin; |
22 | use std::task::{Context, Poll}; |
23 | |
24 | cfg_io_util! { |
25 | use bytes::BufMut; |
26 | } |
27 | |
cfg_net_unix! {
    /// A structure representing a connected Unix socket.
    ///
    /// This socket can be connected directly with [`UnixStream::connect`] or accepted
    /// from a listener with [`UnixListener::accept`]. Additionally, a pair of
    /// anonymous Unix sockets can be created with `UnixStream::pair`.
    ///
    /// To shut down the stream in the write direction, you can call the
    /// [`shutdown()`] method. This will cause the other peer to receive a read of
    /// length 0, indicating that no more data will be sent. This only closes
    /// the stream in one direction.
    ///
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    /// [`UnixListener::accept`]: crate::net::UnixListener::accept
    #[cfg_attr(docsrs, doc(alias = "uds"))]
    pub struct UnixStream {
        // The mio socket wrapped in `PollEvented`. The methods on this type
        // reach readiness tracking through `io.registration()` and perform
        // the actual reads and writes on the dereferenced mio socket.
        io: PollEvented<mio::net::UnixStream>,
    }
}
47 | |
48 | impl UnixStream { |
49 | pub(crate) async fn connect_mio(sys: mio::net::UnixStream) -> io::Result<UnixStream> { |
50 | let stream = UnixStream::new(sys)?; |
51 | |
52 | // Once we've connected, wait for the stream to be writable as |
53 | // that's when the actual connection has been initiated. Once we're |
54 | // writable we check for `take_socket_error` to see if the connect |
55 | // actually hit an error or not. |
56 | // |
57 | // If all that succeeded then we ship everything on up. |
58 | poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; |
59 | |
60 | if let Some(e) = stream.io.take_error()? { |
61 | return Err(e); |
62 | } |
63 | |
64 | Ok(stream) |
65 | } |
66 | |
67 | /// Connects to the socket named by `path`. |
68 | /// |
69 | /// This function will create a new Unix socket and connect to the path |
70 | /// specified, associating the returned stream with the default event loop's |
71 | /// handle. |
72 | pub async fn connect<P>(path: P) -> io::Result<UnixStream> |
73 | where |
74 | P: AsRef<Path>, |
75 | { |
76 | // On linux, abstract socket paths need to be considered. |
77 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
78 | let addr = { |
79 | let os_str_bytes = path.as_ref().as_os_str().as_bytes(); |
80 | if os_str_bytes.starts_with(b" \0" ) { |
81 | StdSocketAddr::from_abstract_name(&os_str_bytes[1..])? |
82 | } else { |
83 | StdSocketAddr::from_pathname(path)? |
84 | } |
85 | }; |
86 | #[cfg (not(any(target_os = "linux" , target_os = "android" )))] |
87 | let addr = StdSocketAddr::from_pathname(path)?; |
88 | |
89 | let stream = mio::net::UnixStream::connect_addr(&addr)?; |
90 | let stream = UnixStream::new(stream)?; |
91 | |
92 | poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; |
93 | |
94 | if let Some(e) = stream.io.take_error()? { |
95 | return Err(e); |
96 | } |
97 | |
98 | Ok(stream) |
99 | } |
100 | |
101 | /// Waits for any of the requested ready states. |
102 | /// |
103 | /// This function is usually paired with `try_read()` or `try_write()`. It |
104 | /// can be used to concurrently read / write to the same socket on a single |
105 | /// task without splitting the socket. |
106 | /// |
107 | /// The function may complete without the socket being ready. This is a |
108 | /// false-positive and attempting an operation will return with |
109 | /// `io::ErrorKind::WouldBlock`. The function can also return with an empty |
110 | /// [`Ready`] set, so you should always check the returned value and possibly |
111 | /// wait again if the requested states are not set. |
112 | /// |
113 | /// # Cancel safety |
114 | /// |
115 | /// This method is cancel safe. Once a readiness event occurs, the method |
116 | /// will continue to return immediately until the readiness event is |
117 | /// consumed by an attempt to read or write that fails with `WouldBlock` or |
118 | /// `Poll::Pending`. |
119 | /// |
120 | /// # Examples |
121 | /// |
122 | /// Concurrently read and write to the stream on the same task without |
123 | /// splitting. |
124 | /// |
125 | /// ```no_run |
126 | /// use tokio::io::Interest; |
127 | /// use tokio::net::UnixStream; |
128 | /// use std::error::Error; |
129 | /// use std::io; |
130 | /// |
131 | /// #[tokio::main] |
132 | /// async fn main() -> Result<(), Box<dyn Error>> { |
133 | /// let dir = tempfile::tempdir().unwrap(); |
134 | /// let bind_path = dir.path().join("bind_path" ); |
135 | /// let stream = UnixStream::connect(bind_path).await?; |
136 | /// |
137 | /// loop { |
138 | /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; |
139 | /// |
140 | /// if ready.is_readable() { |
141 | /// let mut data = vec![0; 1024]; |
142 | /// // Try to read data, this may still fail with `WouldBlock` |
143 | /// // if the readiness event is a false positive. |
144 | /// match stream.try_read(&mut data) { |
145 | /// Ok(n) => { |
146 | /// println!("read {} bytes" , n); |
147 | /// } |
148 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
149 | /// continue; |
150 | /// } |
151 | /// Err(e) => { |
152 | /// return Err(e.into()); |
153 | /// } |
154 | /// } |
155 | /// |
156 | /// } |
157 | /// |
158 | /// if ready.is_writable() { |
159 | /// // Try to write data, this may still fail with `WouldBlock` |
160 | /// // if the readiness event is a false positive. |
161 | /// match stream.try_write(b"hello world" ) { |
162 | /// Ok(n) => { |
163 | /// println!("write {} bytes" , n); |
164 | /// } |
165 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
166 | /// continue; |
167 | /// } |
168 | /// Err(e) => { |
169 | /// return Err(e.into()); |
170 | /// } |
171 | /// } |
172 | /// } |
173 | /// } |
174 | /// } |
175 | /// ``` |
176 | pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { |
177 | let event = self.io.registration().readiness(interest).await?; |
178 | Ok(event.ready) |
179 | } |
180 | |
181 | /// Waits for the socket to become readable. |
182 | /// |
183 | /// This function is equivalent to `ready(Interest::READABLE)` and is usually |
184 | /// paired with `try_read()`. |
185 | /// |
186 | /// # Cancel safety |
187 | /// |
188 | /// This method is cancel safe. Once a readiness event occurs, the method |
189 | /// will continue to return immediately until the readiness event is |
190 | /// consumed by an attempt to read that fails with `WouldBlock` or |
191 | /// `Poll::Pending`. |
192 | /// |
193 | /// # Examples |
194 | /// |
195 | /// ```no_run |
196 | /// use tokio::net::UnixStream; |
197 | /// use std::error::Error; |
198 | /// use std::io; |
199 | /// |
200 | /// #[tokio::main] |
201 | /// async fn main() -> Result<(), Box<dyn Error>> { |
202 | /// // Connect to a peer |
203 | /// let dir = tempfile::tempdir().unwrap(); |
204 | /// let bind_path = dir.path().join("bind_path" ); |
205 | /// let stream = UnixStream::connect(bind_path).await?; |
206 | /// |
207 | /// let mut msg = vec![0; 1024]; |
208 | /// |
209 | /// loop { |
210 | /// // Wait for the socket to be readable |
211 | /// stream.readable().await?; |
212 | /// |
213 | /// // Try to read data, this may still fail with `WouldBlock` |
214 | /// // if the readiness event is a false positive. |
215 | /// match stream.try_read(&mut msg) { |
216 | /// Ok(n) => { |
217 | /// msg.truncate(n); |
218 | /// break; |
219 | /// } |
220 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
221 | /// continue; |
222 | /// } |
223 | /// Err(e) => { |
224 | /// return Err(e.into()); |
225 | /// } |
226 | /// } |
227 | /// } |
228 | /// |
229 | /// println!("GOT = {:?}" , msg); |
230 | /// Ok(()) |
231 | /// } |
232 | /// ``` |
233 | pub async fn readable(&self) -> io::Result<()> { |
234 | self.ready(Interest::READABLE).await?; |
235 | Ok(()) |
236 | } |
237 | |
238 | /// Polls for read readiness. |
239 | /// |
240 | /// If the unix stream is not currently ready for reading, this method will |
241 | /// store a clone of the `Waker` from the provided `Context`. When the unix |
242 | /// stream becomes ready for reading, `Waker::wake` will be called on the |
243 | /// waker. |
244 | /// |
245 | /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only |
246 | /// the `Waker` from the `Context` passed to the most recent call is |
247 | /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a |
248 | /// second, independent waker.) |
249 | /// |
250 | /// This function is intended for cases where creating and pinning a future |
251 | /// via [`readable`] is not feasible. Where possible, using [`readable`] is |
252 | /// preferred, as this supports polling from multiple tasks at once. |
253 | /// |
254 | /// # Return value |
255 | /// |
256 | /// The function returns: |
257 | /// |
258 | /// * `Poll::Pending` if the unix stream is not ready for reading. |
259 | /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading. |
260 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
261 | /// |
262 | /// # Errors |
263 | /// |
264 | /// This function may encounter any standard I/O error except `WouldBlock`. |
265 | /// |
266 | /// [`readable`]: method@Self::readable |
267 | pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
268 | self.io.registration().poll_read_ready(cx).map_ok(|_| ()) |
269 | } |
270 | |
/// Tries to read data from the stream into the provided buffer, returning how
272 | /// many bytes were read. |
273 | /// |
274 | /// Receives any pending data from the socket but does not wait for new data |
275 | /// to arrive. On success, returns the number of bytes read. Because |
276 | /// `try_read()` is non-blocking, the buffer does not have to be stored by |
277 | /// the async task and can exist entirely on the stack. |
278 | /// |
279 | /// Usually, [`readable()`] or [`ready()`] is used with this function. |
280 | /// |
281 | /// [`readable()`]: UnixStream::readable() |
282 | /// [`ready()`]: UnixStream::ready() |
283 | /// |
284 | /// # Return |
285 | /// |
286 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
287 | /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: |
288 | /// |
289 | /// 1. The stream's read half is closed and will no longer yield data. |
290 | /// 2. The specified buffer was 0 bytes in length. |
291 | /// |
292 | /// If the stream is not ready to read data, |
293 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
294 | /// |
295 | /// # Examples |
296 | /// |
297 | /// ```no_run |
298 | /// use tokio::net::UnixStream; |
299 | /// use std::error::Error; |
300 | /// use std::io; |
301 | /// |
302 | /// #[tokio::main] |
303 | /// async fn main() -> Result<(), Box<dyn Error>> { |
304 | /// // Connect to a peer |
305 | /// let dir = tempfile::tempdir().unwrap(); |
306 | /// let bind_path = dir.path().join("bind_path" ); |
307 | /// let stream = UnixStream::connect(bind_path).await?; |
308 | /// |
309 | /// loop { |
310 | /// // Wait for the socket to be readable |
311 | /// stream.readable().await?; |
312 | /// |
313 | /// // Creating the buffer **after** the `await` prevents it from |
314 | /// // being stored in the async task. |
315 | /// let mut buf = [0; 4096]; |
316 | /// |
317 | /// // Try to read data, this may still fail with `WouldBlock` |
318 | /// // if the readiness event is a false positive. |
319 | /// match stream.try_read(&mut buf) { |
320 | /// Ok(0) => break, |
321 | /// Ok(n) => { |
322 | /// println!("read {} bytes" , n); |
323 | /// } |
324 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
325 | /// continue; |
326 | /// } |
327 | /// Err(e) => { |
328 | /// return Err(e.into()); |
329 | /// } |
330 | /// } |
331 | /// } |
332 | /// |
333 | /// Ok(()) |
334 | /// } |
335 | /// ``` |
336 | pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { |
337 | self.io |
338 | .registration() |
339 | .try_io(Interest::READABLE, || (&*self.io).read(buf)) |
340 | } |
341 | |
342 | /// Tries to read data from the stream into the provided buffers, returning |
343 | /// how many bytes were read. |
344 | /// |
345 | /// Data is copied to fill each buffer in order, with the final buffer |
346 | /// written to possibly being only partially filled. This method behaves |
347 | /// equivalently to a single call to [`try_read()`] with concatenated |
348 | /// buffers. |
349 | /// |
350 | /// Receives any pending data from the socket but does not wait for new data |
351 | /// to arrive. On success, returns the number of bytes read. Because |
352 | /// `try_read_vectored()` is non-blocking, the buffer does not have to be |
353 | /// stored by the async task and can exist entirely on the stack. |
354 | /// |
355 | /// Usually, [`readable()`] or [`ready()`] is used with this function. |
356 | /// |
357 | /// [`try_read()`]: UnixStream::try_read() |
358 | /// [`readable()`]: UnixStream::readable() |
359 | /// [`ready()`]: UnixStream::ready() |
360 | /// |
361 | /// # Return |
362 | /// |
363 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
364 | /// number of bytes read. `Ok(0)` indicates the stream's read half is closed |
365 | /// and will no longer yield data. If the stream is not ready to read data |
366 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
367 | /// |
368 | /// # Examples |
369 | /// |
370 | /// ```no_run |
371 | /// use tokio::net::UnixStream; |
372 | /// use std::error::Error; |
373 | /// use std::io::{self, IoSliceMut}; |
374 | /// |
375 | /// #[tokio::main] |
376 | /// async fn main() -> Result<(), Box<dyn Error>> { |
377 | /// // Connect to a peer |
378 | /// let dir = tempfile::tempdir().unwrap(); |
379 | /// let bind_path = dir.path().join("bind_path" ); |
380 | /// let stream = UnixStream::connect(bind_path).await?; |
381 | /// |
382 | /// loop { |
383 | /// // Wait for the socket to be readable |
384 | /// stream.readable().await?; |
385 | /// |
386 | /// // Creating the buffer **after** the `await` prevents it from |
387 | /// // being stored in the async task. |
388 | /// let mut buf_a = [0; 512]; |
389 | /// let mut buf_b = [0; 1024]; |
390 | /// let mut bufs = [ |
391 | /// IoSliceMut::new(&mut buf_a), |
392 | /// IoSliceMut::new(&mut buf_b), |
393 | /// ]; |
394 | /// |
395 | /// // Try to read data, this may still fail with `WouldBlock` |
396 | /// // if the readiness event is a false positive. |
397 | /// match stream.try_read_vectored(&mut bufs) { |
398 | /// Ok(0) => break, |
399 | /// Ok(n) => { |
400 | /// println!("read {} bytes" , n); |
401 | /// } |
402 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
403 | /// continue; |
404 | /// } |
405 | /// Err(e) => { |
406 | /// return Err(e.into()); |
407 | /// } |
408 | /// } |
409 | /// } |
410 | /// |
411 | /// Ok(()) |
412 | /// } |
413 | /// ``` |
414 | pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { |
415 | self.io |
416 | .registration() |
417 | .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) |
418 | } |
419 | |
420 | cfg_io_util! { |
421 | /// Tries to read data from the stream into the provided buffer, advancing the |
422 | /// buffer's internal cursor, returning how many bytes were read. |
423 | /// |
424 | /// Receives any pending data from the socket but does not wait for new data |
425 | /// to arrive. On success, returns the number of bytes read. Because |
426 | /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by |
427 | /// the async task and can exist entirely on the stack. |
428 | /// |
429 | /// Usually, [`readable()`] or [`ready()`] is used with this function. |
430 | /// |
431 | /// [`readable()`]: UnixStream::readable() |
432 | /// [`ready()`]: UnixStream::ready() |
433 | /// |
434 | /// # Return |
435 | /// |
436 | /// If data is successfully read, `Ok(n)` is returned, where `n` is the |
437 | /// number of bytes read. `Ok(0)` indicates the stream's read half is closed |
438 | /// and will no longer yield data. If the stream is not ready to read data |
439 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
440 | /// |
441 | /// # Examples |
442 | /// |
443 | /// ```no_run |
444 | /// use tokio::net::UnixStream; |
445 | /// use std::error::Error; |
446 | /// use std::io; |
447 | /// |
448 | /// #[tokio::main] |
449 | /// async fn main() -> Result<(), Box<dyn Error>> { |
450 | /// // Connect to a peer |
451 | /// let dir = tempfile::tempdir().unwrap(); |
452 | /// let bind_path = dir.path().join("bind_path"); |
453 | /// let stream = UnixStream::connect(bind_path).await?; |
454 | /// |
455 | /// loop { |
456 | /// // Wait for the socket to be readable |
457 | /// stream.readable().await?; |
458 | /// |
459 | /// let mut buf = Vec::with_capacity(4096); |
460 | /// |
461 | /// // Try to read data, this may still fail with `WouldBlock` |
462 | /// // if the readiness event is a false positive. |
463 | /// match stream.try_read_buf(&mut buf) { |
464 | /// Ok(0) => break, |
465 | /// Ok(n) => { |
466 | /// println!("read {} bytes", n); |
467 | /// } |
468 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
469 | /// continue; |
470 | /// } |
471 | /// Err(e) => { |
472 | /// return Err(e.into()); |
473 | /// } |
474 | /// } |
475 | /// } |
476 | /// |
477 | /// Ok(()) |
478 | /// } |
479 | /// ``` |
480 | pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { |
481 | self.io.registration().try_io(Interest::READABLE, || { |
482 | use std::io::Read; |
483 | |
484 | let dst = buf.chunk_mut(); |
485 | let dst = |
486 | unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; |
487 | |
488 | // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the |
489 | // buffer. |
490 | let n = (&*self.io).read(dst)?; |
491 | |
492 | unsafe { |
493 | buf.advance_mut(n); |
494 | } |
495 | |
496 | Ok(n) |
497 | }) |
498 | } |
499 | } |
500 | |
501 | /// Waits for the socket to become writable. |
502 | /// |
503 | /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually |
504 | /// paired with `try_write()`. |
505 | /// |
506 | /// # Cancel safety |
507 | /// |
508 | /// This method is cancel safe. Once a readiness event occurs, the method |
509 | /// will continue to return immediately until the readiness event is |
510 | /// consumed by an attempt to write that fails with `WouldBlock` or |
511 | /// `Poll::Pending`. |
512 | /// |
513 | /// # Examples |
514 | /// |
515 | /// ```no_run |
516 | /// use tokio::net::UnixStream; |
517 | /// use std::error::Error; |
518 | /// use std::io; |
519 | /// |
520 | /// #[tokio::main] |
521 | /// async fn main() -> Result<(), Box<dyn Error>> { |
522 | /// // Connect to a peer |
523 | /// let dir = tempfile::tempdir().unwrap(); |
524 | /// let bind_path = dir.path().join("bind_path" ); |
525 | /// let stream = UnixStream::connect(bind_path).await?; |
526 | /// |
527 | /// loop { |
528 | /// // Wait for the socket to be writable |
529 | /// stream.writable().await?; |
530 | /// |
531 | /// // Try to write data, this may still fail with `WouldBlock` |
532 | /// // if the readiness event is a false positive. |
533 | /// match stream.try_write(b"hello world" ) { |
534 | /// Ok(n) => { |
535 | /// break; |
536 | /// } |
537 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
538 | /// continue; |
539 | /// } |
540 | /// Err(e) => { |
541 | /// return Err(e.into()); |
542 | /// } |
543 | /// } |
544 | /// } |
545 | /// |
546 | /// Ok(()) |
547 | /// } |
548 | /// ``` |
549 | pub async fn writable(&self) -> io::Result<()> { |
550 | self.ready(Interest::WRITABLE).await?; |
551 | Ok(()) |
552 | } |
553 | |
554 | /// Polls for write readiness. |
555 | /// |
556 | /// If the unix stream is not currently ready for writing, this method will |
557 | /// store a clone of the `Waker` from the provided `Context`. When the unix |
558 | /// stream becomes ready for writing, `Waker::wake` will be called on the |
559 | /// waker. |
560 | /// |
561 | /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only |
562 | /// the `Waker` from the `Context` passed to the most recent call is |
563 | /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a |
564 | /// second, independent waker.) |
565 | /// |
566 | /// This function is intended for cases where creating and pinning a future |
567 | /// via [`writable`] is not feasible. Where possible, using [`writable`] is |
568 | /// preferred, as this supports polling from multiple tasks at once. |
569 | /// |
570 | /// # Return value |
571 | /// |
572 | /// The function returns: |
573 | /// |
574 | /// * `Poll::Pending` if the unix stream is not ready for writing. |
575 | /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing. |
576 | /// * `Poll::Ready(Err(e))` if an error is encountered. |
577 | /// |
578 | /// # Errors |
579 | /// |
580 | /// This function may encounter any standard I/O error except `WouldBlock`. |
581 | /// |
582 | /// [`writable`]: method@Self::writable |
583 | pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
584 | self.io.registration().poll_write_ready(cx).map_ok(|_| ()) |
585 | } |
586 | |
587 | /// Tries to write a buffer to the stream, returning how many bytes were |
588 | /// written. |
589 | /// |
590 | /// The function will attempt to write the entire contents of `buf`, but |
591 | /// only part of the buffer may be written. |
592 | /// |
593 | /// This function is usually paired with `writable()`. |
594 | /// |
595 | /// # Return |
596 | /// |
597 | /// If data is successfully written, `Ok(n)` is returned, where `n` is the |
598 | /// number of bytes written. If the stream is not ready to write data, |
599 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
600 | /// |
601 | /// # Examples |
602 | /// |
603 | /// ```no_run |
604 | /// use tokio::net::UnixStream; |
605 | /// use std::error::Error; |
606 | /// use std::io; |
607 | /// |
608 | /// #[tokio::main] |
609 | /// async fn main() -> Result<(), Box<dyn Error>> { |
610 | /// // Connect to a peer |
611 | /// let dir = tempfile::tempdir().unwrap(); |
612 | /// let bind_path = dir.path().join("bind_path" ); |
613 | /// let stream = UnixStream::connect(bind_path).await?; |
614 | /// |
615 | /// loop { |
616 | /// // Wait for the socket to be writable |
617 | /// stream.writable().await?; |
618 | /// |
619 | /// // Try to write data, this may still fail with `WouldBlock` |
620 | /// // if the readiness event is a false positive. |
621 | /// match stream.try_write(b"hello world" ) { |
622 | /// Ok(n) => { |
623 | /// break; |
624 | /// } |
625 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
626 | /// continue; |
627 | /// } |
628 | /// Err(e) => { |
629 | /// return Err(e.into()); |
630 | /// } |
631 | /// } |
632 | /// } |
633 | /// |
634 | /// Ok(()) |
635 | /// } |
636 | /// ``` |
637 | pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { |
638 | self.io |
639 | .registration() |
640 | .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) |
641 | } |
642 | |
643 | /// Tries to write several buffers to the stream, returning how many bytes |
644 | /// were written. |
645 | /// |
646 | /// Data is written from each buffer in order, with the final buffer read |
/// from possibly being only partially consumed. This method behaves
648 | /// equivalently to a single call to [`try_write()`] with concatenated |
649 | /// buffers. |
650 | /// |
651 | /// This function is usually paired with `writable()`. |
652 | /// |
653 | /// [`try_write()`]: UnixStream::try_write() |
654 | /// |
655 | /// # Return |
656 | /// |
657 | /// If data is successfully written, `Ok(n)` is returned, where `n` is the |
658 | /// number of bytes written. If the stream is not ready to write data, |
659 | /// `Err(io::ErrorKind::WouldBlock)` is returned. |
660 | /// |
661 | /// # Examples |
662 | /// |
663 | /// ```no_run |
664 | /// use tokio::net::UnixStream; |
665 | /// use std::error::Error; |
666 | /// use std::io; |
667 | /// |
668 | /// #[tokio::main] |
669 | /// async fn main() -> Result<(), Box<dyn Error>> { |
670 | /// // Connect to a peer |
671 | /// let dir = tempfile::tempdir().unwrap(); |
672 | /// let bind_path = dir.path().join("bind_path" ); |
673 | /// let stream = UnixStream::connect(bind_path).await?; |
674 | /// |
675 | /// let bufs = [io::IoSlice::new(b"hello " ), io::IoSlice::new(b"world" )]; |
676 | /// |
677 | /// loop { |
678 | /// // Wait for the socket to be writable |
679 | /// stream.writable().await?; |
680 | /// |
681 | /// // Try to write data, this may still fail with `WouldBlock` |
682 | /// // if the readiness event is a false positive. |
683 | /// match stream.try_write_vectored(&bufs) { |
684 | /// Ok(n) => { |
685 | /// break; |
686 | /// } |
687 | /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
688 | /// continue; |
689 | /// } |
690 | /// Err(e) => { |
691 | /// return Err(e.into()); |
692 | /// } |
693 | /// } |
694 | /// } |
695 | /// |
696 | /// Ok(()) |
697 | /// } |
698 | /// ``` |
699 | pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { |
700 | self.io |
701 | .registration() |
702 | .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) |
703 | } |
704 | |
705 | /// Tries to read or write from the socket using a user-provided IO operation. |
706 | /// |
707 | /// If the socket is ready, the provided closure is called. The closure |
708 | /// should attempt to perform IO operation on the socket by manually |
709 | /// calling the appropriate syscall. If the operation fails because the |
710 | /// socket is not actually ready, then the closure should return a |
711 | /// `WouldBlock` error and the readiness flag is cleared. The return value |
712 | /// of the closure is then returned by `try_io`. |
713 | /// |
714 | /// If the socket is not ready, then the closure is not called |
715 | /// and a `WouldBlock` error is returned. |
716 | /// |
717 | /// The closure should only return a `WouldBlock` error if it has performed |
718 | /// an IO operation on the socket that failed due to the socket not being |
719 | /// ready. Returning a `WouldBlock` error in any other situation will |
720 | /// incorrectly clear the readiness flag, which can cause the socket to |
721 | /// behave incorrectly. |
722 | /// |
723 | /// The closure should not perform the IO operation using any of the methods |
724 | /// defined on the Tokio `UnixStream` type, as this will mess with the |
725 | /// readiness flag and can cause the socket to behave incorrectly. |
726 | /// |
727 | /// This method is not intended to be used with combined interests. |
728 | /// The closure should perform only one type of IO operation, so it should not |
729 | /// require more than one ready state. This method may panic or sleep forever |
730 | /// if it is called with a combined interest. |
731 | /// |
732 | /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. |
733 | /// |
734 | /// [`readable()`]: UnixStream::readable() |
735 | /// [`writable()`]: UnixStream::writable() |
736 | /// [`ready()`]: UnixStream::ready() |
737 | pub fn try_io<R>( |
738 | &self, |
739 | interest: Interest, |
740 | f: impl FnOnce() -> io::Result<R>, |
741 | ) -> io::Result<R> { |
742 | self.io |
743 | .registration() |
744 | .try_io(interest, || self.io.try_io(f)) |
745 | } |
746 | |
747 | /// Reads or writes from the socket using a user-provided IO operation. |
748 | /// |
749 | /// The readiness of the socket is awaited and when the socket is ready, |
750 | /// the provided closure is called. The closure should attempt to perform |
751 | /// IO operation on the socket by manually calling the appropriate syscall. |
752 | /// If the operation fails because the socket is not actually ready, |
753 | /// then the closure should return a `WouldBlock` error. In such case the |
754 | /// readiness flag is cleared and the socket readiness is awaited again. |
755 | /// This loop is repeated until the closure returns an `Ok` or an error |
756 | /// other than `WouldBlock`. |
757 | /// |
758 | /// The closure should only return a `WouldBlock` error if it has performed |
759 | /// an IO operation on the socket that failed due to the socket not being |
760 | /// ready. Returning a `WouldBlock` error in any other situation will |
761 | /// incorrectly clear the readiness flag, which can cause the socket to |
762 | /// behave incorrectly. |
763 | /// |
764 | /// The closure should not perform the IO operation using any of the methods |
765 | /// defined on the Tokio `UnixStream` type, as this will mess with the |
766 | /// readiness flag and can cause the socket to behave incorrectly. |
767 | /// |
768 | /// This method is not intended to be used with combined interests. |
769 | /// The closure should perform only one type of IO operation, so it should not |
770 | /// require more than one ready state. This method may panic or sleep forever |
771 | /// if it is called with a combined interest. |
772 | pub async fn async_io<R>( |
773 | &self, |
774 | interest: Interest, |
775 | mut f: impl FnMut() -> io::Result<R>, |
776 | ) -> io::Result<R> { |
777 | self.io |
778 | .registration() |
779 | .async_io(interest, || self.io.try_io(&mut f)) |
780 | .await |
781 | } |
782 | |
783 | /// Creates new [`UnixStream`] from a [`std::os::unix::net::UnixStream`]. |
784 | /// |
785 | /// This function is intended to be used to wrap a `UnixStream` from the |
786 | /// standard library in the Tokio equivalent. |
787 | /// |
788 | /// # Notes |
789 | /// |
790 | /// The caller is responsible for ensuring that the stream is in |
791 | /// non-blocking mode. Otherwise all I/O operations on the stream |
792 | /// will block the thread, which will cause unexpected behavior. |
793 | /// Non-blocking mode can be set using [`set_nonblocking`]. |
794 | /// |
/// Passing a stream in blocking mode is always erroneous,
796 | /// and the behavior in that case may change in the future. |
797 | /// For example, it could panic. |
798 | /// |
799 | /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking |
800 | /// |
801 | /// # Examples |
802 | /// |
803 | /// ```no_run |
804 | /// use tokio::net::UnixStream; |
805 | /// use std::os::unix::net::UnixStream as StdUnixStream; |
806 | /// # use std::error::Error; |
807 | /// |
808 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
809 | /// let std_stream = StdUnixStream::connect("/path/to/the/socket" )?; |
810 | /// std_stream.set_nonblocking(true)?; |
811 | /// let stream = UnixStream::from_std(std_stream)?; |
812 | /// # Ok(()) |
813 | /// # } |
814 | /// ``` |
815 | /// |
816 | /// # Panics |
817 | /// |
818 | /// This function panics if it is not called from within a runtime with |
819 | /// IO enabled. |
820 | /// |
821 | /// The runtime is usually set implicitly when this function is called |
822 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
823 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
824 | #[track_caller ] |
825 | pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> { |
826 | check_socket_for_blocking(&stream)?; |
827 | |
828 | let stream = mio::net::UnixStream::from_std(stream); |
829 | let io = PollEvented::new(stream)?; |
830 | |
831 | Ok(UnixStream { io }) |
832 | } |
833 | |
834 | /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`]. |
835 | /// |
836 | /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking |
837 | /// mode set as `true`. Use [`set_nonblocking`] to change the blocking |
838 | /// mode if needed. |
839 | /// |
840 | /// # Examples |
841 | /// |
842 | /// ``` |
843 | /// # if cfg!(miri) { return } // No `socket` in miri. |
844 | /// use std::error::Error; |
845 | /// use std::io::Read; |
846 | /// use tokio::net::UnixListener; |
847 | /// # use tokio::net::UnixStream; |
848 | /// # use tokio::io::AsyncWriteExt; |
849 | /// |
850 | /// #[tokio::main] |
851 | /// async fn main() -> Result<(), Box<dyn Error>> { |
852 | /// let dir = tempfile::tempdir().unwrap(); |
853 | /// let bind_path = dir.path().join("bind_path" ); |
854 | /// |
855 | /// let mut data = [0u8; 12]; |
856 | /// let listener = UnixListener::bind(&bind_path)?; |
857 | /// # let handle = tokio::spawn(async { |
858 | /// # let mut stream = UnixStream::connect(bind_path).await.unwrap(); |
859 | /// # stream.write(b"Hello world!" ).await.unwrap(); |
860 | /// # }); |
861 | /// let (tokio_unix_stream, _) = listener.accept().await?; |
862 | /// let mut std_unix_stream = tokio_unix_stream.into_std()?; |
863 | /// # handle.await.expect("The task being joined has panicked" ); |
864 | /// std_unix_stream.set_nonblocking(false)?; |
865 | /// std_unix_stream.read_exact(&mut data)?; |
866 | /// # assert_eq!(b"Hello world!" , &data); |
867 | /// Ok(()) |
868 | /// } |
869 | /// ``` |
870 | /// [`tokio::net::UnixStream`]: UnixStream |
871 | /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream |
872 | /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking |
873 | pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> { |
874 | self.io |
875 | .into_inner() |
876 | .map(IntoRawFd::into_raw_fd) |
877 | .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) }) |
878 | } |
879 | |
880 | /// Creates an unnamed pair of connected sockets. |
881 | /// |
882 | /// This function will create a pair of interconnected Unix sockets for |
883 | /// communicating back and forth between one another. Each socket will |
884 | /// be associated with the default event loop's handle. |
885 | pub fn pair() -> io::Result<(UnixStream, UnixStream)> { |
886 | let (a, b) = mio::net::UnixStream::pair()?; |
887 | let a = UnixStream::new(a)?; |
888 | let b = UnixStream::new(b)?; |
889 | |
890 | Ok((a, b)) |
891 | } |
892 | |
893 | pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> { |
894 | let io = PollEvented::new(stream)?; |
895 | Ok(UnixStream { io }) |
896 | } |
897 | |
898 | /// Returns the socket address of the local half of this connection. |
899 | /// |
900 | /// # Examples |
901 | /// |
902 | /// ```no_run |
903 | /// use tokio::net::UnixStream; |
904 | /// |
905 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
906 | /// let dir = tempfile::tempdir().unwrap(); |
907 | /// let bind_path = dir.path().join("bind_path" ); |
908 | /// let stream = UnixStream::connect(bind_path).await?; |
909 | /// |
910 | /// println!("{:?}" , stream.local_addr()?); |
911 | /// # Ok(()) |
912 | /// # } |
913 | /// ``` |
914 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
915 | self.io.local_addr().map(SocketAddr) |
916 | } |
917 | |
918 | /// Returns the socket address of the remote half of this connection. |
919 | /// |
920 | /// # Examples |
921 | /// |
922 | /// ```no_run |
923 | /// use tokio::net::UnixStream; |
924 | /// |
925 | /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { |
926 | /// let dir = tempfile::tempdir().unwrap(); |
927 | /// let bind_path = dir.path().join("bind_path" ); |
928 | /// let stream = UnixStream::connect(bind_path).await?; |
929 | /// |
930 | /// println!("{:?}" , stream.peer_addr()?); |
931 | /// # Ok(()) |
932 | /// # } |
933 | /// ``` |
934 | pub fn peer_addr(&self) -> io::Result<SocketAddr> { |
935 | self.io.peer_addr().map(SocketAddr) |
936 | } |
937 | |
/// Returns effective credentials of the process which called `connect` or `pair`.
///
/// # Errors
///
/// Any error produced by the credential lookup is returned unchanged.
pub fn peer_cred(&self) -> io::Result<UCred> {
    // The actual lookup lives in the `ucred` module; this only forwards to it.
    ucred::get_peer_cred(self)
}
942 | |
/// Returns the value of the `SO_ERROR` option.
///
/// This forwards to `take_error` on the underlying I/O handle and returns
/// its result as-is: `Ok(Some(err))` carries a socket error, `Ok(None)`
/// means none was reported.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
    // NOTE(review): the standard library equivalent also clears the pending
    // error when reading it — confirm the same holds for this handle.
    self.io.take_error()
}
947 | |
/// Shuts down the read, write, or both halves of this connection.
///
/// This function will cause all pending and future I/O calls on the
/// specified portions to immediately return with an appropriate value
/// (see the documentation of `Shutdown`).
///
/// `how` selects which direction(s) to close. The call is synchronous and
/// is forwarded to `shutdown` on the underlying I/O handle; the
/// `AsyncWrite::poll_shutdown` implementation for this type uses it with
/// `Shutdown::Write`.
pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
    self.io.shutdown(how)
}
956 | |
957 | // These lifetime markers also appear in the generated documentation, and make |
958 | // it more clear that this is a *borrowed* split. |
959 | #[allow (clippy::needless_lifetimes)] |
960 | /// Splits a `UnixStream` into a read half and a write half, which can be used |
961 | /// to read and write the stream concurrently. |
962 | /// |
963 | /// This method is more efficient than [`into_split`], but the halves cannot be |
964 | /// moved into independently spawned tasks. |
965 | /// |
966 | /// [`into_split`]: Self::into_split() |
967 | pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { |
968 | split(self) |
969 | } |
970 | |
971 | /// Splits a `UnixStream` into a read half and a write half, which can be used |
972 | /// to read and write the stream concurrently. |
973 | /// |
974 | /// Unlike [`split`], the owned halves can be moved to separate tasks, however |
975 | /// this comes at the cost of a heap allocation. |
976 | /// |
977 | /// **Note:** Dropping the write half will shut down the write half of the |
978 | /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`. |
979 | /// |
980 | /// [`split`]: Self::split() |
981 | /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown |
982 | pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { |
983 | split_owned(self) |
984 | } |
985 | } |
986 | |
987 | impl TryFrom<net::UnixStream> for UnixStream { |
988 | type Error = io::Error; |
989 | |
990 | /// Consumes stream, returning the tokio I/O object. |
991 | /// |
992 | /// This is equivalent to |
993 | /// [`UnixStream::from_std(stream)`](UnixStream::from_std). |
994 | fn try_from(stream: net::UnixStream) -> io::Result<Self> { |
995 | Self::from_std(stream) |
996 | } |
997 | } |
998 | |
999 | impl AsyncRead for UnixStream { |
1000 | fn poll_read( |
1001 | self: Pin<&mut Self>, |
1002 | cx: &mut Context<'_>, |
1003 | buf: &mut ReadBuf<'_>, |
1004 | ) -> Poll<io::Result<()>> { |
1005 | self.poll_read_priv(cx, buf) |
1006 | } |
1007 | } |
1008 | |
1009 | impl AsyncWrite for UnixStream { |
1010 | fn poll_write( |
1011 | self: Pin<&mut Self>, |
1012 | cx: &mut Context<'_>, |
1013 | buf: &[u8], |
1014 | ) -> Poll<io::Result<usize>> { |
1015 | self.poll_write_priv(cx, buf) |
1016 | } |
1017 | |
1018 | fn poll_write_vectored( |
1019 | self: Pin<&mut Self>, |
1020 | cx: &mut Context<'_>, |
1021 | bufs: &[io::IoSlice<'_>], |
1022 | ) -> Poll<io::Result<usize>> { |
1023 | self.poll_write_vectored_priv(cx, bufs) |
1024 | } |
1025 | |
1026 | fn is_write_vectored(&self) -> bool { |
1027 | true |
1028 | } |
1029 | |
1030 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
1031 | Poll::Ready(Ok(())) |
1032 | } |
1033 | |
1034 | fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
1035 | self.shutdown_std(std::net::Shutdown::Write)?; |
1036 | Poll::Ready(Ok(())) |
1037 | } |
1038 | } |
1039 | |
impl UnixStream {
    // == Poll IO functions that takes `&self` ==
    //
    // To read or write without mutable access to the `UnixStream`, combine the
    // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
    // `try_write` methods.

    /// Polls for a read into `buf` through a shared reference.
    ///
    /// Forwards to `poll_read` on the underlying I/O handle and returns its
    /// result unchanged.
    pub(crate) fn poll_read_priv(
        &self,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Safety: `UnixStream::read` correctly handles reads into uninitialized memory
        unsafe { self.io.poll_read(cx, buf) }
    }

    /// Polls for a write of `buf` through a shared reference.
    ///
    /// Forwards to `poll_write` on the underlying I/O handle and returns its
    /// result unchanged.
    pub(crate) fn poll_write_priv(
        &self,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.io.poll_write(cx, buf)
    }

    /// Polls for a vectored write of `bufs` through a shared reference.
    ///
    /// Forwards to `poll_write_vectored` on the underlying I/O handle and
    /// returns its result unchanged.
    pub(super) fn poll_write_vectored_priv(
        &self,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        self.io.poll_write_vectored(cx, bufs)
    }
}
1072 | |
1073 | impl fmt::Debug for UnixStream { |
1074 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1075 | self.io.fmt(f) |
1076 | } |
1077 | } |
1078 | |
impl AsRawFd for UnixStream {
    // Exposes the raw descriptor reported by the wrapped I/O handle. The
    // stream is only borrowed, so ownership of the descriptor is not
    // transferred to the caller.
    fn as_raw_fd(&self) -> RawFd {
        self.io.as_raw_fd()
    }
}
1084 | |
1085 | impl AsFd for UnixStream { |
1086 | fn as_fd(&self) -> BorrowedFd<'_> { |
1087 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
1088 | } |
1089 | } |
1090 | |