| 1 | use crate::io::{Interest, PollEvented}; |
| 2 | use crate::net::unix::{SocketAddr, UnixStream}; |
| 3 | use crate::util::check_socket_for_blocking; |
| 4 | |
| 5 | use std::fmt; |
| 6 | use std::io; |
| 7 | #[cfg (target_os = "android" )] |
| 8 | use std::os::android::net::SocketAddrExt; |
| 9 | #[cfg (target_os = "linux" )] |
| 10 | use std::os::linux::net::SocketAddrExt; |
| 11 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
| 12 | use std::os::unix::ffi::OsStrExt; |
| 13 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; |
| 14 | use std::os::unix::net::{self, SocketAddr as StdSocketAddr}; |
| 15 | use std::path::Path; |
| 16 | use std::task::{ready, Context, Poll}; |
| 17 | |
| 18 | cfg_net_unix! { |
| 19 | /// A Unix socket which can accept connections from other Unix sockets. |
| 20 | /// |
| 21 | /// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method. |
| 22 | /// |
| 23 | /// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`]. |
| 24 | /// |
| 25 | /// [`UnixListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnixListenerStream.html |
| 26 | /// |
| 27 | /// # Errors |
| 28 | /// |
| 29 | /// Note that accepting a connection can lead to various errors and not all |
| 30 | /// of them are necessarily fatal ‒ for example having too many open file |
| 31 | /// descriptors or the other side closing the connection while it waits in |
| 32 | /// an accept queue. These would terminate the stream if not handled in any |
| 33 | /// way. |
| 34 | /// |
| 35 | /// # Examples |
| 36 | /// |
| 37 | /// ```no_run |
| 38 | /// use tokio::net::UnixListener; |
| 39 | /// |
| 40 | /// #[tokio::main] |
| 41 | /// async fn main() { |
| 42 | /// let listener = UnixListener::bind("/path/to/the/socket").unwrap(); |
| 43 | /// loop { |
| 44 | /// match listener.accept().await { |
| 45 | /// Ok((stream, _addr)) => { |
| 46 | /// println!("new client!"); |
| 47 | /// } |
| 48 | /// Err(e) => { /* connection failed */ } |
| 49 | /// } |
| 50 | /// } |
| 51 | /// } |
| 52 | /// ``` |
| 53 | #[cfg_attr (docsrs, doc(alias = "uds" ))] |
| 54 | pub struct UnixListener { |
| 55 | io: PollEvented<mio::net::UnixListener>, |
| 56 | } |
| 57 | } |
| 58 | |
| 59 | impl UnixListener { |
| 60 | pub(crate) fn new(listener: mio::net::UnixListener) -> io::Result<UnixListener> { |
| 61 | let io = PollEvented::new(listener)?; |
| 62 | Ok(UnixListener { io }) |
| 63 | } |
| 64 | |
| 65 | /// Creates a new `UnixListener` bound to the specified path. |
| 66 | /// |
| 67 | /// # Panics |
| 68 | /// |
| 69 | /// This function panics if it is not called from within a runtime with |
| 70 | /// IO enabled. |
| 71 | /// |
| 72 | /// The runtime is usually set implicitly when this function is called |
| 73 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
| 74 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
| 75 | #[track_caller ] |
| 76 | pub fn bind<P>(path: P) -> io::Result<UnixListener> |
| 77 | where |
| 78 | P: AsRef<Path>, |
| 79 | { |
| 80 | // For now, we handle abstract socket paths on linux here. |
| 81 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
| 82 | let addr = { |
| 83 | let os_str_bytes = path.as_ref().as_os_str().as_bytes(); |
| 84 | if os_str_bytes.starts_with(b" \0" ) { |
| 85 | StdSocketAddr::from_abstract_name(&os_str_bytes[1..])? |
| 86 | } else { |
| 87 | StdSocketAddr::from_pathname(path)? |
| 88 | } |
| 89 | }; |
| 90 | #[cfg (not(any(target_os = "linux" , target_os = "android" )))] |
| 91 | let addr = StdSocketAddr::from_pathname(path)?; |
| 92 | |
| 93 | let listener = mio::net::UnixListener::bind_addr(&addr)?; |
| 94 | let io = PollEvented::new(listener)?; |
| 95 | Ok(UnixListener { io }) |
| 96 | } |
| 97 | |
| 98 | /// Creates new [`UnixListener`] from a [`std::os::unix::net::UnixListener`]. |
| 99 | /// |
| 100 | /// This function is intended to be used to wrap a `UnixListener` from the |
| 101 | /// standard library in the Tokio equivalent. |
| 102 | /// |
| 103 | /// # Notes |
| 104 | /// |
| 105 | /// The caller is responsible for ensuring that the listener is in |
| 106 | /// non-blocking mode. Otherwise all I/O operations on the listener |
| 107 | /// will block the thread, which will cause unexpected behavior. |
| 108 | /// Non-blocking mode can be set using [`set_nonblocking`]. |
| 109 | /// |
| 110 | /// Passing a listener in blocking mode is always erroneous, |
| 111 | /// and the behavior in that case may change in the future. |
| 112 | /// For example, it could panic. |
| 113 | /// |
| 114 | /// [`set_nonblocking`]: std::os::unix::net::UnixListener::set_nonblocking |
| 115 | /// |
| 116 | /// # Examples |
| 117 | /// |
| 118 | /// ```no_run |
| 119 | /// use tokio::net::UnixListener; |
| 120 | /// use std::os::unix::net::UnixListener as StdUnixListener; |
| 121 | /// # use std::error::Error; |
| 122 | /// |
| 123 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
| 124 | /// let std_listener = StdUnixListener::bind("/path/to/the/socket" )?; |
| 125 | /// std_listener.set_nonblocking(true)?; |
| 126 | /// let listener = UnixListener::from_std(std_listener)?; |
| 127 | /// # Ok(()) |
| 128 | /// # } |
| 129 | /// ``` |
| 130 | /// |
| 131 | /// # Panics |
| 132 | /// |
| 133 | /// This function panics if it is not called from within a runtime with |
| 134 | /// IO enabled. |
| 135 | /// |
| 136 | /// The runtime is usually set implicitly when this function is called |
| 137 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
| 138 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
| 139 | #[track_caller ] |
| 140 | pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> { |
| 141 | check_socket_for_blocking(&listener)?; |
| 142 | |
| 143 | let listener = mio::net::UnixListener::from_std(listener); |
| 144 | let io = PollEvented::new(listener)?; |
| 145 | Ok(UnixListener { io }) |
| 146 | } |
| 147 | |
| 148 | /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`]. |
| 149 | /// |
| 150 | /// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode |
| 151 | /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed. |
| 152 | /// |
| 153 | /// # Examples |
| 154 | /// |
| 155 | /// ```rust,no_run |
| 156 | /// # use std::error::Error; |
| 157 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
| 158 | /// let tokio_listener = tokio::net::UnixListener::bind("/path/to/the/socket" )?; |
| 159 | /// let std_listener = tokio_listener.into_std()?; |
| 160 | /// std_listener.set_nonblocking(false)?; |
| 161 | /// # Ok(()) |
| 162 | /// # } |
| 163 | /// ``` |
| 164 | /// |
| 165 | /// [`tokio::net::UnixListener`]: UnixListener |
| 166 | /// [`std::os::unix::net::UnixListener`]: std::os::unix::net::UnixListener |
| 167 | /// [`set_nonblocking`]: fn@std::os::unix::net::UnixListener::set_nonblocking |
| 168 | pub fn into_std(self) -> io::Result<std::os::unix::net::UnixListener> { |
| 169 | self.io |
| 170 | .into_inner() |
| 171 | .map(IntoRawFd::into_raw_fd) |
| 172 | .map(|raw_fd| unsafe { net::UnixListener::from_raw_fd(raw_fd) }) |
| 173 | } |
| 174 | |
| 175 | /// Returns the local socket address of this listener. |
| 176 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
| 177 | self.io.local_addr().map(SocketAddr) |
| 178 | } |
| 179 | |
| 180 | /// Returns the value of the `SO_ERROR` option. |
| 181 | pub fn take_error(&self) -> io::Result<Option<io::Error>> { |
| 182 | self.io.take_error() |
| 183 | } |
| 184 | |
| 185 | /// Accepts a new incoming connection to this listener. |
| 186 | /// |
| 187 | /// # Cancel safety |
| 188 | /// |
| 189 | /// This method is cancel safe. If the method is used as the event in a |
| 190 | /// [`tokio::select!`](crate::select) statement and some other branch |
| 191 | /// completes first, then it is guaranteed that no new connections were |
| 192 | /// accepted by this method. |
| 193 | pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { |
| 194 | let (mio, addr) = self |
| 195 | .io |
| 196 | .registration() |
| 197 | .async_io(Interest::READABLE, || self.io.accept()) |
| 198 | .await?; |
| 199 | |
| 200 | let addr = SocketAddr(addr); |
| 201 | let stream = UnixStream::new(mio)?; |
| 202 | Ok((stream, addr)) |
| 203 | } |
| 204 | |
| 205 | /// Polls to accept a new incoming connection to this listener. |
| 206 | /// |
| 207 | /// If there is no connection to accept, `Poll::Pending` is returned and the |
| 208 | /// current task will be notified by a waker. Note that on multiple calls |
| 209 | /// to `poll_accept`, only the `Waker` from the `Context` passed to the most |
| 210 | /// recent call is scheduled to receive a wakeup. |
| 211 | pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> { |
| 212 | let (sock, addr) = ready!(self.io.registration().poll_read_io(cx, || self.io.accept()))?; |
| 213 | let addr = SocketAddr(addr); |
| 214 | let sock = UnixStream::new(sock)?; |
| 215 | Poll::Ready(Ok((sock, addr))) |
| 216 | } |
| 217 | } |
| 218 | |
| 219 | impl TryFrom<std::os::unix::net::UnixListener> for UnixListener { |
| 220 | type Error = io::Error; |
| 221 | |
| 222 | /// Consumes stream, returning the tokio I/O object. |
| 223 | /// |
| 224 | /// This is equivalent to |
| 225 | /// [`UnixListener::from_std(stream)`](UnixListener::from_std). |
| 226 | fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result<Self> { |
| 227 | Self::from_std(listener:stream) |
| 228 | } |
| 229 | } |
| 230 | |
| 231 | impl fmt::Debug for UnixListener { |
| 232 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 233 | self.io.fmt(f) |
| 234 | } |
| 235 | } |
| 236 | |
| 237 | impl AsRawFd for UnixListener { |
| 238 | fn as_raw_fd(&self) -> RawFd { |
| 239 | self.io.as_raw_fd() |
| 240 | } |
| 241 | } |
| 242 | |
| 243 | impl AsFd for UnixListener { |
| 244 | fn as_fd(&self) -> BorrowedFd<'_> { |
| 245 | unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } |
| 246 | } |
| 247 | } |
| 248 | |