1use std::fmt;
2use std::io::{self, IoSlice, IoSliceMut, Read, Write};
3use std::net::{self, Shutdown, SocketAddr};
4#[cfg(unix)]
5use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
6#[cfg(target_os = "wasi")]
7use std::os::wasi::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8#[cfg(windows)]
9use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
10
11use crate::io_source::IoSource;
12#[cfg(not(target_os = "wasi"))]
13use crate::sys::tcp::{connect, new_for_addr};
14use crate::{event, Interest, Registry, Token};
15
16/// A non-blocking TCP stream between a local socket and a remote socket.
17///
18/// The socket will be closed when the value is dropped.
19///
20/// # Examples
21///
22#[cfg_attr(feature = "os-poll", doc = "```")]
23#[cfg_attr(not(feature = "os-poll"), doc = "```ignore")]
24/// # use std::net::{TcpListener, SocketAddr};
25/// # use std::error::Error;
26/// #
27/// # fn main() -> Result<(), Box<dyn Error>> {
28/// let address: SocketAddr = "127.0.0.1:0".parse()?;
29/// let listener = TcpListener::bind(address)?;
30/// use mio::{Events, Interest, Poll, Token};
31/// use mio::net::TcpStream;
32/// use std::time::Duration;
33///
34/// let mut stream = TcpStream::connect(listener.local_addr()?)?;
35///
36/// let mut poll = Poll::new()?;
37/// let mut events = Events::with_capacity(128);
38///
39/// // Register the socket with `Poll`
40/// poll.registry().register(&mut stream, Token(0), Interest::WRITABLE)?;
41///
42/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
43///
44/// // The socket might be ready at this point
45/// # Ok(())
46/// # }
47/// ```
48pub struct TcpStream {
49 inner: IoSource<net::TcpStream>,
50}
51
52impl TcpStream {
53 /// Create a new TCP stream and issue a non-blocking connect to the
54 /// specified address.
55 ///
56 /// # Notes
57 ///
58 /// The returned `TcpStream` may not be connected (and thus usable), unlike
59 /// the API found in `std::net::TcpStream`. Because Mio issues a
60 /// *non-blocking* connect it will not block the thread and instead return
61 /// an unconnected `TcpStream`.
62 ///
63 /// Ensuring the returned stream is connected is surprisingly complex when
64 /// considering cross-platform support. Doing this properly should follow
65 /// the steps below, an example implementation can be found
66 /// [here](https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622).
67 ///
68 /// 1. Call `TcpStream::connect`
69 /// 2. Register the returned stream with at least [write interest].
70 /// 3. Wait for a (writable) event.
71 /// 4. Check `TcpStream::peer_addr`. If it returns `libc::EINPROGRESS` or
72 /// `ErrorKind::NotConnected` it means the stream is not yet connected,
73 /// go back to step 3. If it returns an address it means the stream is
74 /// connected, go to step 5. If another error is returned something
75 /// went wrong.
76 /// 5. Now the stream can be used.
77 ///
78 /// This may return a `WouldBlock` in which case the socket connection
79 /// cannot be completed immediately, it usually means there are insufficient
80 /// entries in the routing cache.
81 ///
82 /// [write interest]: Interest::WRITABLE
83 #[cfg(not(target_os = "wasi"))]
84 pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
85 let socket = new_for_addr(addr)?;
86 #[cfg(unix)]
87 let stream = unsafe { TcpStream::from_raw_fd(socket) };
88 #[cfg(windows)]
89 let stream = unsafe { TcpStream::from_raw_socket(socket as _) };
90 connect(&stream.inner, addr)?;
91 Ok(stream)
92 }
93
94 /// Creates a new `TcpStream` from a standard `net::TcpStream`.
95 ///
96 /// This function is intended to be used to wrap a TCP stream from the
97 /// standard library in the Mio equivalent. The conversion assumes nothing
98 /// about the underlying stream; it is left up to the user to set it in
99 /// non-blocking mode.
100 ///
101 /// # Note
102 ///
103 /// The TCP stream here will not have `connect` called on it, so it
104 /// should already be connected via some other means (be it manually, or
105 /// the standard library).
106 pub fn from_std(stream: net::TcpStream) -> TcpStream {
107 TcpStream {
108 inner: IoSource::new(stream),
109 }
110 }
111
112 /// Returns the socket address of the remote peer of this TCP connection.
113 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
114 self.inner.peer_addr()
115 }
116
117 /// Returns the socket address of the local half of this TCP connection.
118 pub fn local_addr(&self) -> io::Result<SocketAddr> {
119 self.inner.local_addr()
120 }
121
122 /// Shuts down the read, write, or both halves of this connection.
123 ///
124 /// This function will cause all pending and future I/O on the specified
125 /// portions to return immediately with an appropriate value (see the
126 /// documentation of `Shutdown`).
127 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
128 self.inner.shutdown(how)
129 }
130
131 /// Sets the value of the `TCP_NODELAY` option on this socket.
132 ///
133 /// If set, this option disables the Nagle algorithm. This means that
134 /// segments are always sent as soon as possible, even if there is only a
135 /// small amount of data. When not set, data is buffered until there is a
136 /// sufficient amount to send out, thereby avoiding the frequent sending of
137 /// small packets.
138 ///
139 /// # Notes
140 ///
141 /// On Windows make sure the stream is connected before calling this method,
142 /// by receiving an (writable) event. Trying to set `nodelay` on an
143 /// unconnected `TcpStream` is unspecified behavior.
144 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
145 self.inner.set_nodelay(nodelay)
146 }
147
148 /// Gets the value of the `TCP_NODELAY` option on this socket.
149 ///
150 /// For more information about this option, see [`set_nodelay`][link].
151 ///
152 /// [link]: #method.set_nodelay
153 ///
154 /// # Notes
155 ///
156 /// On Windows make sure the stream is connected before calling this method,
157 /// by receiving an (writable) event. Trying to get `nodelay` on an
158 /// unconnected `TcpStream` is unspecified behavior.
159 pub fn nodelay(&self) -> io::Result<bool> {
160 self.inner.nodelay()
161 }
162
163 /// Sets the value for the `IP_TTL` option on this socket.
164 ///
165 /// This value sets the time-to-live field that is used in every packet sent
166 /// from this socket.
167 ///
168 /// # Notes
169 ///
170 /// On Windows make sure the stream is connected before calling this method,
171 /// by receiving an (writable) event. Trying to set `ttl` on an
172 /// unconnected `TcpStream` is unspecified behavior.
173 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
174 self.inner.set_ttl(ttl)
175 }
176
177 /// Gets the value of the `IP_TTL` option for this socket.
178 ///
179 /// For more information about this option, see [`set_ttl`][link].
180 ///
181 /// # Notes
182 ///
183 /// On Windows make sure the stream is connected before calling this method,
184 /// by receiving an (writable) event. Trying to get `ttl` on an
185 /// unconnected `TcpStream` is unspecified behavior.
186 ///
187 /// [link]: #method.set_ttl
188 pub fn ttl(&self) -> io::Result<u32> {
189 self.inner.ttl()
190 }
191
192 /// Get the value of the `SO_ERROR` option on this socket.
193 ///
194 /// This will retrieve the stored error in the underlying socket, clearing
195 /// the field in the process. This can be useful for checking errors between
196 /// calls.
197 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
198 self.inner.take_error()
199 }
200
201 /// Receives data on the socket from the remote address to which it is
202 /// connected, without removing that data from the queue. On success,
203 /// returns the number of bytes peeked.
204 ///
205 /// Successive calls return the same data. This is accomplished by passing
206 /// `MSG_PEEK` as a flag to the underlying recv system call.
207 pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
208 self.inner.peek(buf)
209 }
210
211 /// Execute an I/O operation ensuring that the socket receives more events
212 /// if it hits a [`WouldBlock`] error.
213 ///
214 /// # Notes
215 ///
216 /// This method is required to be called for **all** I/O operations to
217 /// ensure the user will receive events once the socket is ready again after
218 /// returning a [`WouldBlock`] error.
219 ///
220 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
221 ///
222 /// # Examples
223 ///
224 #[cfg_attr(unix, doc = "```no_run")]
225 #[cfg_attr(windows, doc = "```ignore")]
226 /// # use std::error::Error;
227 /// #
228 /// # fn main() -> Result<(), Box<dyn Error>> {
229 /// use std::io;
230 /// #[cfg(unix)]
231 /// use std::os::unix::io::AsRawFd;
232 /// #[cfg(windows)]
233 /// use std::os::windows::io::AsRawSocket;
234 /// use mio::net::TcpStream;
235 ///
236 /// let address = "127.0.0.1:8080".parse().unwrap();
237 /// let stream = TcpStream::connect(address)?;
238 ///
239 /// // Wait until the stream is readable...
240 ///
241 /// // Read from the stream using a direct libc call, of course the
242 /// // `io::Read` implementation would be easier to use.
243 /// let mut buf = [0; 512];
244 /// let n = stream.try_io(|| {
245 /// let buf_ptr = &mut buf as *mut _ as *mut _;
246 /// #[cfg(unix)]
247 /// let res = unsafe { libc::recv(stream.as_raw_fd(), buf_ptr, buf.len(), 0) };
248 /// #[cfg(windows)]
249 /// let res = unsafe { libc::recvfrom(stream.as_raw_socket() as usize, buf_ptr, buf.len() as i32, 0, std::ptr::null_mut(), std::ptr::null_mut()) };
250 /// if res != -1 {
251 /// Ok(res as usize)
252 /// } else {
253 /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
254 /// // should return `WouldBlock` error.
255 /// Err(io::Error::last_os_error())
256 /// }
257 /// })?;
258 /// eprintln!("read {} bytes", n);
259 /// # Ok(())
260 /// # }
261 /// ```
262 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
263 where
264 F: FnOnce() -> io::Result<T>,
265 {
266 self.inner.do_io(|_| f())
267 }
268}
269
270impl Read for TcpStream {
271 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
272 self.inner.do_io(|mut inner: &TcpStream| inner.read(buf))
273 }
274
275 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
276 self.inner.do_io(|mut inner: &TcpStream| inner.read_vectored(bufs))
277 }
278}
279
280impl<'a> Read for &'a TcpStream {
281 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
282 self.inner.do_io(|mut inner: &TcpStream| inner.read(buf))
283 }
284
285 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
286 self.inner.do_io(|mut inner: &TcpStream| inner.read_vectored(bufs))
287 }
288}
289
290impl Write for TcpStream {
291 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
292 self.inner.do_io(|mut inner: &TcpStream| inner.write(buf))
293 }
294
295 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
296 self.inner.do_io(|mut inner: &TcpStream| inner.write_vectored(bufs))
297 }
298
299 fn flush(&mut self) -> io::Result<()> {
300 self.inner.do_io(|mut inner: &TcpStream| inner.flush())
301 }
302}
303
304impl<'a> Write for &'a TcpStream {
305 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
306 self.inner.do_io(|mut inner: &TcpStream| inner.write(buf))
307 }
308
309 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
310 self.inner.do_io(|mut inner: &TcpStream| inner.write_vectored(bufs))
311 }
312
313 fn flush(&mut self) -> io::Result<()> {
314 self.inner.do_io(|mut inner: &TcpStream| inner.flush())
315 }
316}
317
318impl event::Source for TcpStream {
319 fn register(
320 &mut self,
321 registry: &Registry,
322 token: Token,
323 interests: Interest,
324 ) -> io::Result<()> {
325 self.inner.register(registry, token, interests)
326 }
327
328 fn reregister(
329 &mut self,
330 registry: &Registry,
331 token: Token,
332 interests: Interest,
333 ) -> io::Result<()> {
334 self.inner.reregister(registry, token, interests)
335 }
336
337 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
338 self.inner.deregister(registry)
339 }
340}
341
342impl fmt::Debug for TcpStream {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 self.inner.fmt(f)
345 }
346}
347
348#[cfg(unix)]
349impl IntoRawFd for TcpStream {
350 fn into_raw_fd(self) -> RawFd {
351 self.inner.into_inner().into_raw_fd()
352 }
353}
354
355#[cfg(unix)]
356impl AsRawFd for TcpStream {
357 fn as_raw_fd(&self) -> RawFd {
358 self.inner.as_raw_fd()
359 }
360}
361
362#[cfg(unix)]
363impl FromRawFd for TcpStream {
364 /// Converts a `RawFd` to a `TcpStream`.
365 ///
366 /// # Notes
367 ///
368 /// The caller is responsible for ensuring that the socket is in
369 /// non-blocking mode.
370 unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
371 TcpStream::from_std(stream:FromRawFd::from_raw_fd(fd))
372 }
373}
374
375#[cfg(windows)]
376impl IntoRawSocket for TcpStream {
377 fn into_raw_socket(self) -> RawSocket {
378 self.inner.into_inner().into_raw_socket()
379 }
380}
381
382#[cfg(windows)]
383impl AsRawSocket for TcpStream {
384 fn as_raw_socket(&self) -> RawSocket {
385 self.inner.as_raw_socket()
386 }
387}
388
389#[cfg(windows)]
390impl FromRawSocket for TcpStream {
391 /// Converts a `RawSocket` to a `TcpStream`.
392 ///
393 /// # Notes
394 ///
395 /// The caller is responsible for ensuring that the socket is in
396 /// non-blocking mode.
397 unsafe fn from_raw_socket(socket: RawSocket) -> TcpStream {
398 TcpStream::from_std(FromRawSocket::from_raw_socket(socket))
399 }
400}
401
402#[cfg(target_os = "wasi")]
403impl IntoRawFd for TcpStream {
404 fn into_raw_fd(self) -> RawFd {
405 self.inner.into_inner().into_raw_fd()
406 }
407}
408
409#[cfg(target_os = "wasi")]
410impl AsRawFd for TcpStream {
411 fn as_raw_fd(&self) -> RawFd {
412 self.inner.as_raw_fd()
413 }
414}
415
416#[cfg(target_os = "wasi")]
417impl FromRawFd for TcpStream {
418 /// Converts a `RawFd` to a `TcpStream`.
419 ///
420 /// # Notes
421 ///
422 /// The caller is responsible for ensuring that the socket is in
423 /// non-blocking mode.
424 unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
425 TcpStream::from_std(FromRawFd::from_raw_fd(fd))
426 }
427}
428