1//! `UnixStream` owned split support.
2//!
//! A `UnixStream` can be split into an `OwnedReadHalf` and an `OwnedWriteHalf`
4//! with the `UnixStream::into_split` method. `OwnedReadHalf` implements
5//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`.
6//!
7//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8//! split has no associated overhead and enforces all invariants at the type
9//! level.
10
11use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
12use crate::net::UnixStream;
13
14use crate::net::unix::SocketAddr;
15use std::error::Error;
16use std::net::Shutdown;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20use std::{fmt, io};
21
22cfg_io_util! {
23 use bytes::BufMut;
24}
25
26/// Owned read half of a [`UnixStream`], created by [`into_split`].
27///
28/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
29/// on the [`AsyncReadExt`] trait.
30///
31/// [`UnixStream`]: crate::net::UnixStream
32/// [`into_split`]: crate::net::UnixStream::into_split()
33/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
34#[derive(Debug)]
35pub struct OwnedReadHalf {
36 inner: Arc<UnixStream>,
37}
38
39/// Owned write half of a [`UnixStream`], created by [`into_split`].
40///
41/// Note that in the [`AsyncWrite`] implementation of this type,
42/// [`poll_shutdown`] will shut down the stream in the write direction.
43/// Dropping the write half will also shut down the write half of the stream.
44///
45/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods
46/// found on the [`AsyncWriteExt`] trait.
47///
48/// [`UnixStream`]: crate::net::UnixStream
49/// [`into_split`]: crate::net::UnixStream::into_split()
50/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
51/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
52/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
53#[derive(Debug)]
54pub struct OwnedWriteHalf {
55 inner: Arc<UnixStream>,
56 shutdown_on_drop: bool,
57}
58
59pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) {
60 let arc = Arc::new(stream);
61 let read = OwnedReadHalf {
62 inner: Arc::clone(&arc),
63 };
64 let write = OwnedWriteHalf {
65 inner: arc,
66 shutdown_on_drop: true,
67 };
68 (read, write)
69}
70
71pub(crate) fn reunite(
72 read: OwnedReadHalf,
73 write: OwnedWriteHalf,
74) -> Result<UnixStream, ReuniteError> {
75 if Arc::ptr_eq(&read.inner, &write.inner) {
76 write.forget();
77 // This unwrap cannot fail as the api does not allow creating more than two Arcs,
78 // and we just dropped the other half.
79 Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite"))
80 } else {
81 Err(ReuniteError(read, write))
82 }
83}
84
85/// Error indicating that two halves were not from the same socket, and thus could
86/// not be reunited.
87#[derive(Debug)]
88pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
89
90impl fmt::Display for ReuniteError {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 write!(
93 f,
94 "tried to reunite halves that are not from the same socket"
95 )
96 }
97}
98
99impl Error for ReuniteError {}
100
101impl OwnedReadHalf {
102 /// Attempts to put the two halves of a `UnixStream` back together and
103 /// recover the original socket. Succeeds only if the two halves
104 /// originated from the same call to [`into_split`].
105 ///
106 /// [`into_split`]: crate::net::UnixStream::into_split()
107 pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
108 reunite(self, other)
109 }
110
111 /// Waits for any of the requested ready states.
112 ///
113 /// This function is usually paired with [`try_read()`]. It can be used instead
114 /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`]
115 /// and [`Ready::READ_CLOSED`] events.
116 ///
117 /// The function may complete without the socket being ready. This is a
118 /// false-positive and attempting an operation will return with
119 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
120 /// [`Ready`] set, so you should always check the returned value and possibly
121 /// wait again if the requested states are not set.
122 ///
123 /// This function is equivalent to [`UnixStream::ready`].
124 ///
125 /// [`try_read()`]: Self::try_read
126 /// [`readable()`]: Self::readable
127 ///
128 /// # Cancel safety
129 ///
130 /// This method is cancel safe. Once a readiness event occurs, the method
131 /// will continue to return immediately until the readiness event is
132 /// consumed by an attempt to read or write that fails with `WouldBlock` or
133 /// `Poll::Pending`.
134 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
135 self.inner.ready(interest).await
136 }
137
138 /// Waits for the socket to become readable.
139 ///
140 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
141 /// paired with `try_read()`.
142 ///
143 /// # Cancel safety
144 ///
145 /// This method is cancel safe. Once a readiness event occurs, the method
146 /// will continue to return immediately until the readiness event is
147 /// consumed by an attempt to read that fails with `WouldBlock` or
148 /// `Poll::Pending`.
149 pub async fn readable(&self) -> io::Result<()> {
150 self.inner.readable().await
151 }
152
153 /// Tries to read data from the stream into the provided buffer, returning how
154 /// many bytes were read.
155 ///
156 /// Receives any pending data from the socket but does not wait for new data
157 /// to arrive. On success, returns the number of bytes read. Because
158 /// `try_read()` is non-blocking, the buffer does not have to be stored by
159 /// the async task and can exist entirely on the stack.
160 ///
161 /// Usually, [`readable()`] or [`ready()`] is used with this function.
162 ///
163 /// [`readable()`]: Self::readable()
164 /// [`ready()`]: Self::ready()
165 ///
166 /// # Return
167 ///
168 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
169 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
170 ///
171 /// 1. The stream's read half is closed and will no longer yield data.
172 /// 2. The specified buffer was 0 bytes in length.
173 ///
174 /// If the stream is not ready to read data,
175 /// `Err(io::ErrorKind::WouldBlock)` is returned.
176 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
177 self.inner.try_read(buf)
178 }
179
180 cfg_io_util! {
181 /// Tries to read data from the stream into the provided buffer, advancing the
182 /// buffer's internal cursor, returning how many bytes were read.
183 ///
184 /// Receives any pending data from the socket but does not wait for new data
185 /// to arrive. On success, returns the number of bytes read. Because
186 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
187 /// the async task and can exist entirely on the stack.
188 ///
189 /// Usually, [`readable()`] or [`ready()`] is used with this function.
190 ///
191 /// [`readable()`]: Self::readable()
192 /// [`ready()`]: Self::ready()
193 ///
194 /// # Return
195 ///
196 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
197 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
198 /// and will no longer yield data. If the stream is not ready to read data
199 /// `Err(io::ErrorKind::WouldBlock)` is returned.
200 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
201 self.inner.try_read_buf(buf)
202 }
203 }
204
205 /// Tries to read data from the stream into the provided buffers, returning
206 /// how many bytes were read.
207 ///
208 /// Data is copied to fill each buffer in order, with the final buffer
209 /// written to possibly being only partially filled. This method behaves
210 /// equivalently to a single call to [`try_read()`] with concatenated
211 /// buffers.
212 ///
213 /// Receives any pending data from the socket but does not wait for new data
214 /// to arrive. On success, returns the number of bytes read. Because
215 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
216 /// stored by the async task and can exist entirely on the stack.
217 ///
218 /// Usually, [`readable()`] or [`ready()`] is used with this function.
219 ///
220 /// [`try_read()`]: Self::try_read()
221 /// [`readable()`]: Self::readable()
222 /// [`ready()`]: Self::ready()
223 ///
224 /// # Return
225 ///
226 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
227 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
228 /// and will no longer yield data. If the stream is not ready to read data
229 /// `Err(io::ErrorKind::WouldBlock)` is returned.
230 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
231 self.inner.try_read_vectored(bufs)
232 }
233
234 /// Returns the socket address of the remote half of this connection.
235 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
236 self.inner.peer_addr()
237 }
238
239 /// Returns the socket address of the local half of this connection.
240 pub fn local_addr(&self) -> io::Result<SocketAddr> {
241 self.inner.local_addr()
242 }
243}
244
245impl AsyncRead for OwnedReadHalf {
246 fn poll_read(
247 self: Pin<&mut Self>,
248 cx: &mut Context<'_>,
249 buf: &mut ReadBuf<'_>,
250 ) -> Poll<io::Result<()>> {
251 self.inner.poll_read_priv(cx, buf)
252 }
253}
254
255impl OwnedWriteHalf {
256 /// Attempts to put the two halves of a `UnixStream` back together and
257 /// recover the original socket. Succeeds only if the two halves
258 /// originated from the same call to [`into_split`].
259 ///
260 /// [`into_split`]: crate::net::UnixStream::into_split()
261 pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> {
262 reunite(other, self)
263 }
264
265 /// Destroys the write half, but don't close the write half of the stream
266 /// until the read half is dropped. If the read half has already been
267 /// dropped, this closes the stream.
268 pub fn forget(mut self) {
269 self.shutdown_on_drop = false;
270 drop(self);
271 }
272
273 /// Waits for any of the requested ready states.
274 ///
275 /// This function is usually paired with [`try_write()`]. It can be used instead
276 /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`]
277 /// and [`Ready::WRITE_CLOSED`] events.
278 ///
279 /// The function may complete without the socket being ready. This is a
280 /// false-positive and attempting an operation will return with
281 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
282 /// [`Ready`] set, so you should always check the returned value and possibly
283 /// wait again if the requested states are not set.
284 ///
285 /// This function is equivalent to [`UnixStream::ready`].
286 ///
287 /// [`try_write()`]: Self::try_write
288 /// [`writable()`]: Self::writable
289 ///
290 /// # Cancel safety
291 ///
292 /// This method is cancel safe. Once a readiness event occurs, the method
293 /// will continue to return immediately until the readiness event is
294 /// consumed by an attempt to read or write that fails with `WouldBlock` or
295 /// `Poll::Pending`.
296 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
297 self.inner.ready(interest).await
298 }
299
300 /// Waits for the socket to become writable.
301 ///
302 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
303 /// paired with `try_write()`.
304 ///
305 /// # Cancel safety
306 ///
307 /// This method is cancel safe. Once a readiness event occurs, the method
308 /// will continue to return immediately until the readiness event is
309 /// consumed by an attempt to write that fails with `WouldBlock` or
310 /// `Poll::Pending`.
311 pub async fn writable(&self) -> io::Result<()> {
312 self.inner.writable().await
313 }
314
315 /// Tries to write a buffer to the stream, returning how many bytes were
316 /// written.
317 ///
318 /// The function will attempt to write the entire contents of `buf`, but
319 /// only part of the buffer may be written.
320 ///
321 /// This function is usually paired with `writable()`.
322 ///
323 /// # Return
324 ///
325 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
326 /// number of bytes written. If the stream is not ready to write data,
327 /// `Err(io::ErrorKind::WouldBlock)` is returned.
328 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
329 self.inner.try_write(buf)
330 }
331
332 /// Tries to write several buffers to the stream, returning how many bytes
333 /// were written.
334 ///
335 /// Data is written from each buffer in order, with the final buffer read
336 /// from possible being only partially consumed. This method behaves
337 /// equivalently to a single call to [`try_write()`] with concatenated
338 /// buffers.
339 ///
340 /// This function is usually paired with `writable()`.
341 ///
342 /// [`try_write()`]: Self::try_write()
343 ///
344 /// # Return
345 ///
346 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
347 /// number of bytes written. If the stream is not ready to write data,
348 /// `Err(io::ErrorKind::WouldBlock)` is returned.
349 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
350 self.inner.try_write_vectored(buf)
351 }
352
353 /// Returns the socket address of the remote half of this connection.
354 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
355 self.inner.peer_addr()
356 }
357
358 /// Returns the socket address of the local half of this connection.
359 pub fn local_addr(&self) -> io::Result<SocketAddr> {
360 self.inner.local_addr()
361 }
362}
363
364impl Drop for OwnedWriteHalf {
365 fn drop(&mut self) {
366 if self.shutdown_on_drop {
367 let _ = self.inner.shutdown_std(Shutdown::Write);
368 }
369 }
370}
371
372impl AsyncWrite for OwnedWriteHalf {
373 fn poll_write(
374 self: Pin<&mut Self>,
375 cx: &mut Context<'_>,
376 buf: &[u8],
377 ) -> Poll<io::Result<usize>> {
378 self.inner.poll_write_priv(cx, buf)
379 }
380
381 fn poll_write_vectored(
382 self: Pin<&mut Self>,
383 cx: &mut Context<'_>,
384 bufs: &[io::IoSlice<'_>],
385 ) -> Poll<io::Result<usize>> {
386 self.inner.poll_write_vectored_priv(cx, bufs)
387 }
388
389 fn is_write_vectored(&self) -> bool {
390 self.inner.is_write_vectored()
391 }
392
393 #[inline]
394 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
395 // flush is a no-op
396 Poll::Ready(Ok(()))
397 }
398
399 // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
400 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
401 let res = self.inner.shutdown_std(Shutdown::Write);
402 if res.is_ok() {
403 Pin::into_inner(self).shutdown_on_drop = false;
404 }
405 res.into()
406 }
407}
408
409impl AsRef<UnixStream> for OwnedReadHalf {
410 fn as_ref(&self) -> &UnixStream {
411 &self.inner
412 }
413}
414
415impl AsRef<UnixStream> for OwnedWriteHalf {
416 fn as_ref(&self) -> &UnixStream {
417 &self.inner
418 }
419}
420