1 | use crate::io::{Interest, PollEvented}; |
2 | use crate::net::unix::{SocketAddr, UnixStream}; |
3 | use crate::util::check_socket_for_blocking; |
4 | |
5 | use std::fmt; |
6 | use std::io; |
7 | #[cfg (target_os = "android" )] |
8 | use std::os::android::net::SocketAddrExt; |
9 | #[cfg (target_os = "linux" )] |
10 | use std::os::linux::net::SocketAddrExt; |
11 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
12 | use std::os::unix::ffi::OsStrExt; |
13 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; |
14 | use std::os::unix::net::{self, SocketAddr as StdSocketAddr}; |
15 | use std::path::Path; |
16 | use std::task::{ready, Context, Poll}; |
17 | |
18 | cfg_net_unix! { |
19 | /// A Unix socket which can accept connections from other Unix sockets. |
20 | /// |
21 | /// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method. |
22 | /// |
23 | /// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`]. |
24 | /// |
25 | /// [`UnixListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnixListenerStream.html |
26 | /// |
27 | /// # Errors |
28 | /// |
29 | /// Note that accepting a connection can lead to various errors and not all |
30 | /// of them are necessarily fatal ‒ for example having too many open file |
31 | /// descriptors or the other side closing the connection while it waits in |
32 | /// an accept queue. These would terminate the stream if not handled in any |
33 | /// way. |
34 | /// |
35 | /// # Examples |
36 | /// |
37 | /// ```no_run |
38 | /// use tokio::net::UnixListener; |
39 | /// |
40 | /// #[tokio::main] |
41 | /// async fn main() { |
42 | /// let listener = UnixListener::bind("/path/to/the/socket").unwrap(); |
43 | /// loop { |
44 | /// match listener.accept().await { |
45 | /// Ok((stream, _addr)) => { |
46 | /// println!("new client!"); |
47 | /// } |
48 | /// Err(e) => { /* connection failed */ } |
49 | /// } |
50 | /// } |
51 | /// } |
52 | /// ``` |
53 | #[cfg_attr (docsrs, doc(alias = "uds" ))] |
54 | pub struct UnixListener { |
55 | io: PollEvented<mio::net::UnixListener>, |
56 | } |
57 | } |
58 | |
59 | impl UnixListener { |
60 | pub(crate) fn new(listener: mio::net::UnixListener) -> io::Result<UnixListener> { |
61 | let io = PollEvented::new(listener)?; |
62 | Ok(UnixListener { io }) |
63 | } |
64 | |
65 | /// Creates a new `UnixListener` bound to the specified path. |
66 | /// |
67 | /// # Panics |
68 | /// |
69 | /// This function panics if it is not called from within a runtime with |
70 | /// IO enabled. |
71 | /// |
72 | /// The runtime is usually set implicitly when this function is called |
73 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
74 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
75 | #[track_caller ] |
76 | pub fn bind<P>(path: P) -> io::Result<UnixListener> |
77 | where |
78 | P: AsRef<Path>, |
79 | { |
80 | // For now, we handle abstract socket paths on linux here. |
81 | #[cfg (any(target_os = "linux" , target_os = "android" ))] |
82 | let addr = { |
83 | let os_str_bytes = path.as_ref().as_os_str().as_bytes(); |
84 | if os_str_bytes.starts_with(b" \0" ) { |
85 | StdSocketAddr::from_abstract_name(&os_str_bytes[1..])? |
86 | } else { |
87 | StdSocketAddr::from_pathname(path)? |
88 | } |
89 | }; |
90 | #[cfg (not(any(target_os = "linux" , target_os = "android" )))] |
91 | let addr = StdSocketAddr::from_pathname(path)?; |
92 | |
93 | let listener = mio::net::UnixListener::bind_addr(&addr)?; |
94 | let io = PollEvented::new(listener)?; |
95 | Ok(UnixListener { io }) |
96 | } |
97 | |
98 | /// Creates new [`UnixListener`] from a [`std::os::unix::net::UnixListener`]. |
99 | /// |
100 | /// This function is intended to be used to wrap a `UnixListener` from the |
101 | /// standard library in the Tokio equivalent. |
102 | /// |
103 | /// # Notes |
104 | /// |
105 | /// The caller is responsible for ensuring that the listener is in |
106 | /// non-blocking mode. Otherwise all I/O operations on the listener |
107 | /// will block the thread, which will cause unexpected behavior. |
108 | /// Non-blocking mode can be set using [`set_nonblocking`]. |
109 | /// |
110 | /// Passing a listener in blocking mode is always erroneous, |
111 | /// and the behavior in that case may change in the future. |
112 | /// For example, it could panic. |
113 | /// |
114 | /// [`set_nonblocking`]: std::os::unix::net::UnixListener::set_nonblocking |
115 | /// |
116 | /// # Examples |
117 | /// |
118 | /// ```no_run |
119 | /// use tokio::net::UnixListener; |
120 | /// use std::os::unix::net::UnixListener as StdUnixListener; |
121 | /// # use std::error::Error; |
122 | /// |
123 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
124 | /// let std_listener = StdUnixListener::bind("/path/to/the/socket" )?; |
125 | /// std_listener.set_nonblocking(true)?; |
126 | /// let listener = UnixListener::from_std(std_listener)?; |
127 | /// # Ok(()) |
128 | /// # } |
129 | /// ``` |
130 | /// |
131 | /// # Panics |
132 | /// |
133 | /// This function panics if it is not called from within a runtime with |
134 | /// IO enabled. |
135 | /// |
136 | /// The runtime is usually set implicitly when this function is called |
137 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
138 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
139 | #[track_caller ] |
140 | pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> { |
141 | check_socket_for_blocking(&listener)?; |
142 | |
143 | let listener = mio::net::UnixListener::from_std(listener); |
144 | let io = PollEvented::new(listener)?; |
145 | Ok(UnixListener { io }) |
146 | } |
147 | |
148 | /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`]. |
149 | /// |
150 | /// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode |
151 | /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed. |
152 | /// |
153 | /// # Examples |
154 | /// |
155 | /// ```rust,no_run |
156 | /// # use std::error::Error; |
157 | /// # async fn dox() -> Result<(), Box<dyn Error>> { |
158 | /// let tokio_listener = tokio::net::UnixListener::bind("/path/to/the/socket" )?; |
159 | /// let std_listener = tokio_listener.into_std()?; |
160 | /// std_listener.set_nonblocking(false)?; |
161 | /// # Ok(()) |
162 | /// # } |
163 | /// ``` |
164 | /// |
165 | /// [`tokio::net::UnixListener`]: UnixListener |
166 | /// [`std::os::unix::net::UnixListener`]: std::os::unix::net::UnixListener |
167 | /// [`set_nonblocking`]: fn@std::os::unix::net::UnixListener::set_nonblocking |
168 | pub fn into_std(self) -> io::Result<std::os::unix::net::UnixListener> { |
169 | self.io |
170 | .into_inner() |
171 | .map(IntoRawFd::into_raw_fd) |
172 | .map(|raw_fd| unsafe { net::UnixListener::from_raw_fd(raw_fd) }) |
173 | } |
174 | |
175 | /// Returns the local socket address of this listener. |
176 | pub fn local_addr(&self) -> io::Result<SocketAddr> { |
177 | self.io.local_addr().map(SocketAddr) |
178 | } |
179 | |
180 | /// Returns the value of the `SO_ERROR` option. |
181 | pub fn take_error(&self) -> io::Result<Option<io::Error>> { |
182 | self.io.take_error() |
183 | } |
184 | |
185 | /// Accepts a new incoming connection to this listener. |
186 | /// |
187 | /// # Cancel safety |
188 | /// |
189 | /// This method is cancel safe. If the method is used as the event in a |
190 | /// [`tokio::select!`](crate::select) statement and some other branch |
191 | /// completes first, then it is guaranteed that no new connections were |
192 | /// accepted by this method. |
193 | pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { |
194 | let (mio, addr) = self |
195 | .io |
196 | .registration() |
197 | .async_io(Interest::READABLE, || self.io.accept()) |
198 | .await?; |
199 | |
200 | let addr = SocketAddr(addr); |
201 | let stream = UnixStream::new(mio)?; |
202 | Ok((stream, addr)) |
203 | } |
204 | |
205 | /// Polls to accept a new incoming connection to this listener. |
206 | /// |
207 | /// If there is no connection to accept, `Poll::Pending` is returned and the |
208 | /// current task will be notified by a waker. Note that on multiple calls |
209 | /// to `poll_accept`, only the `Waker` from the `Context` passed to the most |
210 | /// recent call is scheduled to receive a wakeup. |
211 | pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> { |
212 | let (sock, addr) = ready!(self.io.registration().poll_read_io(cx, || self.io.accept()))?; |
213 | let addr = SocketAddr(addr); |
214 | let sock = UnixStream::new(sock)?; |
215 | Poll::Ready(Ok((sock, addr))) |
216 | } |
217 | } |
218 | |
219 | impl TryFrom<std::os::unix::net::UnixListener> for UnixListener { |
220 | type Error = io::Error; |
221 | |
222 | /// Consumes stream, returning the tokio I/O object. |
223 | /// |
224 | /// This is equivalent to |
225 | /// [`UnixListener::from_std(stream)`](UnixListener::from_std). |
226 | fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result<Self> { |
227 | Self::from_std(listener:stream) |
228 | } |
229 | } |
230 | |
231 | impl fmt::Debug for UnixListener { |
232 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
233 | self.io.fmt(f) |
234 | } |
235 | } |
236 | |
impl AsRawFd for UnixListener {
    /// Returns the raw file descriptor obtained from the wrapped `io` field.
    fn as_raw_fd(&self) -> RawFd {
        self.io.as_raw_fd()
    }
}
242 | |
impl AsFd for UnixListener {
    /// Borrows the listener's file descriptor for the lifetime of `&self`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        // SAFETY: the descriptor comes from `self.io`, which this listener
        // holds by value, and the returned `BorrowedFd` is tied to the borrow
        // of `self`, so it cannot outlive the listener.
        // NOTE(review): this relies on `self.io` keeping the descriptor open
        // for as long as it is alive — confirm against `PollEvented`.
        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
    }
}
248 | |