1use crate::io::interest::Interest;
2use crate::runtime::io::Registration;
3use crate::runtime::scheduler;
4
5use mio::event::Source;
6use std::fmt;
7use std::io;
8use std::ops::Deref;
9use std::panic::{RefUnwindSafe, UnwindSafe};
10
cfg_io_driver! {
    /// Associates an I/O resource that implements the [`std::io::Read`] and/or
    /// [`std::io::Write`] traits with the reactor that drives it.
    ///
    /// `PollEvented` uses [`Registration`] internally to take a type that
    /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or
    /// [`std::io::Write`] and associate it with a reactor that will drive it.
    ///
    /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
    /// used from within the future's execution model. As such, the
    /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
    /// implementations using the underlying I/O resource as well as readiness
    /// events provided by the reactor.
    ///
    /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
    /// `Sync`), the caller must ensure that there are at most two tasks that
    /// use a `PollEvented` instance concurrently. One for reading and one for
    /// writing. While violating this requirement is "safe" from a Rust memory
    /// model point of view, it will result in unexpected behavior in the form
    /// of lost notifications and tasks hanging.
    ///
    /// ## Readiness events
    ///
    /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
    /// this type also supports access to the underlying readiness event stream.
    /// While similar in function to what [`Registration`] provides, the
    /// semantics are a bit different.
    ///
    /// Two functions are provided to access the readiness events:
    /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
    /// current readiness state of the `PollEvented` instance. If
    /// [`poll_read_ready`] indicates read readiness, immediately calling
    /// [`poll_read_ready`] again will also indicate read readiness.
    ///
    /// When the operation is attempted and is unable to succeed due to the I/O
    /// resource not being ready, the caller must call [`clear_readiness`].
    /// This clears the readiness state until a new readiness event is received.
    ///
    /// This allows the caller to implement additional functions. For example,
    /// [`TcpListener`] implements `poll_accept` by using [`poll_read_ready`] and
    /// [`clear_readiness`].
    ///
    /// ## Platform-specific events
    ///
    /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
    /// These events are included as part of the read readiness event stream. The
    /// write readiness event stream is only for `Ready::writable()` events.
    ///
    /// [`AsyncRead`]: crate::io::AsyncRead
    /// [`AsyncWrite`]: crate::io::AsyncWrite
    /// [`TcpListener`]: crate::net::TcpListener
    /// [`clear_readiness`]: Registration::clear_readiness
    /// [`poll_read_ready`]: Registration::poll_read_ready
    /// [`poll_write_ready`]: Registration::poll_write_ready
    //
    // NOTE(review): the text above mentions `mio::Ready` / `Ready::writable()`,
    // while this file itself only uses `mio::event::Source` and `Interest`;
    // the wording may predate the current mio API -- confirm and update.
    // Likewise, this file shows inherent `poll_read` / `poll_write` methods
    // rather than `AsyncRead` / `AsyncWrite` trait impls; presumably the traits
    // are implemented by the wrapping types -- confirm.
    pub(crate) struct PollEvented<E: Source> {
        // The wrapped I/O resource. It is stored as `Some` on construction and
        // is only moved out (via `take()`) by `into_inner` and by `Drop`, which
        // is why the accessors in this file `unwrap()` it.
        io: Option<E>,
        // Registration of `io` with the I/O driver. Used to poll and clear
        // readiness and to deregister `io` again.
        registration: Registration,
    }
}
70
71// ===== impl PollEvented =====
72
73impl<E: Source> PollEvented<E> {
74 /// Creates a new `PollEvented` associated with the default reactor.
75 ///
76 /// The returned `PollEvented` has readable and writable interests. For more control, use
77 /// [`Self::new_with_interest`].
78 ///
79 /// # Panics
80 ///
81 /// This function panics if thread-local runtime is not set.
82 ///
83 /// The runtime is usually set implicitly when this function is called
84 /// from a future driven by a tokio runtime, otherwise runtime can be set
85 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
86 #[track_caller]
87 #[cfg_attr(feature = "signal", allow(unused))]
88 pub(crate) fn new(io: E) -> io::Result<Self> {
89 PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
90 }
91
92 /// Creates a new `PollEvented` associated with the default reactor, for
93 /// specific `Interest` state. `new_with_interest` should be used over `new`
94 /// when you need control over the readiness state, such as when a file
95 /// descriptor only allows reads. This does not add `hup` or `error` so if
96 /// you are interested in those states, you will need to add them to the
97 /// readiness state passed to this function.
98 ///
99 /// # Panics
100 ///
101 /// This function panics if thread-local runtime is not set.
102 ///
103 /// The runtime is usually set implicitly when this function is called from
104 /// a future driven by a tokio runtime, otherwise runtime can be set
105 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
106 /// function.
107 #[track_caller]
108 #[cfg_attr(feature = "signal", allow(unused))]
109 pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
110 Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current())
111 }
112
113 #[track_caller]
114 pub(crate) fn new_with_interest_and_handle(
115 mut io: E,
116 interest: Interest,
117 handle: scheduler::Handle,
118 ) -> io::Result<Self> {
119 let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
120 Ok(Self {
121 io: Some(io),
122 registration,
123 })
124 }
125
126 /// Returns a reference to the registration.
127 #[cfg(feature = "net")]
128 pub(crate) fn registration(&self) -> &Registration {
129 &self.registration
130 }
131
132 /// Deregisters the inner io from the registration and returns a Result containing the inner io.
133 #[cfg(any(feature = "net", feature = "process"))]
134 pub(crate) fn into_inner(mut self) -> io::Result<E> {
135 let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
136 self.registration.deregister(&mut inner)?;
137 Ok(inner)
138 }
139}
140
141feature! {
142 #![any(feature = "net", all(unix, feature = "process"))]
143
144 use crate::io::ReadBuf;
145 use std::task::{Context, Poll};
146
147 impl<E: Source> PollEvented<E> {
148 // Safety: The caller must ensure that `E` can read into uninitialized memory
149 pub(crate) unsafe fn poll_read<'a>(
150 &'a self,
151 cx: &mut Context<'_>,
152 buf: &mut ReadBuf<'_>,
153 ) -> Poll<io::Result<()>>
154 where
155 &'a E: io::Read + 'a,
156 {
157 use std::io::Read;
158
159 loop {
160 let evt = ready!(self.registration.poll_read_ready(cx))?;
161
162 let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
163 let len = b.len();
164
165 match self.io.as_ref().unwrap().read(b) {
166 Ok(n) => {
167 // if we read a partially full buffer, this is sufficient on unix to show
168 // that the socket buffer has been drained. Unfortunately this assumption
169 // fails for level-triggered selectors (like on Windows or poll even for
170 // UNIX): https://github.com/tokio-rs/tokio/issues/5866
171 if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < len) {
172 self.registration.clear_readiness(evt);
173 }
174
175 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
176 // buffer.
177 buf.assume_init(n);
178 buf.advance(n);
179 return Poll::Ready(Ok(()));
180 },
181 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
182 self.registration.clear_readiness(evt);
183 }
184 Err(e) => return Poll::Ready(Err(e)),
185 }
186 }
187 }
188
189 pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
190 where
191 &'a E: io::Write + 'a,
192 {
193 use std::io::Write;
194
195 loop {
196 let evt = ready!(self.registration.poll_write_ready(cx))?;
197
198 match self.io.as_ref().unwrap().write(buf) {
199 Ok(n) => {
200 // if we write only part of our buffer, this is sufficient on unix to show
201 // that the socket buffer is full. Unfortunately this assumption
202 // fails for level-triggered selectors (like on Windows or poll even for
203 // UNIX): https://github.com/tokio-rs/tokio/issues/5866
204 if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < buf.len()) {
205 self.registration.clear_readiness(evt);
206 }
207
208 return Poll::Ready(Ok(n));
209 },
210 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
211 self.registration.clear_readiness(evt);
212 }
213 Err(e) => return Poll::Ready(Err(e)),
214 }
215 }
216 }
217
218 #[cfg(any(feature = "net", feature = "process"))]
219 pub(crate) fn poll_write_vectored<'a>(
220 &'a self,
221 cx: &mut Context<'_>,
222 bufs: &[io::IoSlice<'_>],
223 ) -> Poll<io::Result<usize>>
224 where
225 &'a E: io::Write + 'a,
226 {
227 use std::io::Write;
228 self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs))
229 }
230 }
231}
232
// Marks `PollEvented<E>` as `UnwindSafe` for every `E: Source`; no additional
// bound is placed on `E`.
// NOTE(review): the rationale for this blanket impl is not stated in this
// file -- confirm before relying on it for new `E` types.
impl<E: Source> UnwindSafe for PollEvented<E> {}
234
// Marks `PollEvented<E>` as `RefUnwindSafe` for every `E: Source`; no
// additional bound is placed on `E`.
// NOTE(review): the rationale for this blanket impl is not stated in this
// file -- confirm before relying on it for new `E` types.
impl<E: Source> RefUnwindSafe for PollEvented<E> {}
236
237impl<E: Source> Deref for PollEvented<E> {
238 type Target = E;
239
240 fn deref(&self) -> &E {
241 self.io.as_ref().unwrap()
242 }
243}
244
245impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> {
246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247 f.debug_struct("PollEvented").field("io", &self.io).finish()
248 }
249}
250
251impl<E: Source> Drop for PollEvented<E> {
252 fn drop(&mut self) {
253 if let Some(mut io) = self.io.take() {
254 // Ignore errors
255 let _ = self.registration.deregister(&mut io);
256 }
257 }
258}
259