1 | use crate::io::interest::Interest; |
2 | use crate::runtime::io::Registration; |
3 | use crate::runtime::scheduler; |
4 | |
5 | use mio::event::Source; |
6 | use std::fmt; |
7 | use std::io; |
8 | use std::ops::Deref; |
9 | use std::panic::{RefUnwindSafe, UnwindSafe}; |
10 | |
11 | cfg_io_driver! { |
12 | /// Associates an I/O resource that implements the [`std::io::Read`] and/or |
13 | /// [`std::io::Write`] traits with the reactor that drives it. |
14 | /// |
15 | /// `PollEvented` uses [`Registration`] internally to take a type that |
16 | /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or |
17 | /// [`std::io::Write`] and associate it with a reactor that will drive it. |
18 | /// |
19 | /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be |
20 | /// used from within the future's execution model. As such, the |
21 | /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] |
22 | /// implementations using the underlying I/O resource as well as readiness |
23 | /// events provided by the reactor. |
24 | /// |
25 | /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is |
26 | /// `Sync`), the caller must ensure that there are at most two tasks that |
27 | /// use a `PollEvented` instance concurrently. One for reading and one for |
28 | /// writing. While violating this requirement is "safe" from a Rust memory |
29 | /// model point of view, it will result in unexpected behavior in the form |
30 | /// of lost notifications and tasks hanging. |
31 | /// |
32 | /// ## Readiness events |
33 | /// |
34 | /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations, |
35 | /// this type also supports access to the underlying readiness event stream. |
36 | /// While similar in function to what [`Registration`] provides, the |
37 | /// semantics are a bit different. |
38 | /// |
39 | /// Two functions are provided to access the readiness events: |
40 | /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the |
41 | /// current readiness state of the `PollEvented` instance. If |
42 | /// [`poll_read_ready`] indicates read readiness, immediately calling |
43 | /// [`poll_read_ready`] again will also indicate read readiness. |
44 | /// |
45 | /// When the operation is attempted and is unable to succeed due to the I/O |
46 | /// resource not being ready, the caller must call [`clear_readiness`]. |
47 | /// This clears the readiness state until a new readiness event is received. |
48 | /// |
49 | /// This allows the caller to implement additional functions. For example, |
50 | /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and |
51 | /// [`clear_readiness`]. |
52 | /// |
53 | /// ## Platform-specific events |
54 | /// |
55 | /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. |
56 | /// These events are included as part of the read readiness event stream. The |
57 | /// write readiness event stream is only for `Ready::writable()` events. |
58 | /// |
59 | /// [`AsyncRead`]: crate::io::AsyncRead |
60 | /// [`AsyncWrite`]: crate::io::AsyncWrite |
61 | /// [`TcpListener`]: crate::net::TcpListener |
62 | /// [`clear_readiness`]: Registration::clear_readiness |
63 | /// [`poll_read_ready`]: Registration::poll_read_ready |
64 | /// [`poll_write_ready`]: Registration::poll_write_ready |
65 | pub(crate) struct PollEvented<E: Source> { |
66 | io: Option<E>, |
67 | registration: Registration, |
68 | } |
69 | } |
70 | |
71 | // ===== impl PollEvented ===== |
72 | |
73 | impl<E: Source> PollEvented<E> { |
74 | /// Creates a new `PollEvented` associated with the default reactor. |
75 | /// |
76 | /// The returned `PollEvented` has readable and writable interests. For more control, use |
77 | /// [`Self::new_with_interest`]. |
78 | /// |
79 | /// # Panics |
80 | /// |
81 | /// This function panics if thread-local runtime is not set. |
82 | /// |
83 | /// The runtime is usually set implicitly when this function is called |
84 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
85 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
86 | #[track_caller ] |
87 | #[cfg_attr (feature = "signal" , allow(unused))] |
88 | pub(crate) fn new(io: E) -> io::Result<Self> { |
89 | PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) |
90 | } |
91 | |
92 | /// Creates a new `PollEvented` associated with the default reactor, for |
93 | /// specific `Interest` state. `new_with_interest` should be used over `new` |
94 | /// when you need control over the readiness state, such as when a file |
95 | /// descriptor only allows reads. This does not add `hup` or `error` so if |
96 | /// you are interested in those states, you will need to add them to the |
97 | /// readiness state passed to this function. |
98 | /// |
99 | /// # Panics |
100 | /// |
101 | /// This function panics if thread-local runtime is not set. |
102 | /// |
103 | /// The runtime is usually set implicitly when this function is called from |
104 | /// a future driven by a tokio runtime, otherwise runtime can be set |
105 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) |
106 | /// function. |
107 | #[track_caller ] |
108 | #[cfg_attr (feature = "signal" , allow(unused))] |
109 | pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { |
110 | Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current()) |
111 | } |
112 | |
113 | #[track_caller ] |
114 | pub(crate) fn new_with_interest_and_handle( |
115 | mut io: E, |
116 | interest: Interest, |
117 | handle: scheduler::Handle, |
118 | ) -> io::Result<Self> { |
119 | let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; |
120 | Ok(Self { |
121 | io: Some(io), |
122 | registration, |
123 | }) |
124 | } |
125 | |
126 | /// Returns a reference to the registration. |
127 | #[cfg (any(feature = "net" ))] |
128 | pub(crate) fn registration(&self) -> &Registration { |
129 | &self.registration |
130 | } |
131 | |
132 | /// Deregisters the inner io from the registration and returns a Result containing the inner io. |
133 | #[cfg (any(feature = "net" , feature = "process" ))] |
134 | pub(crate) fn into_inner(mut self) -> io::Result<E> { |
135 | let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. |
136 | self.registration.deregister(&mut inner)?; |
137 | Ok(inner) |
138 | } |
139 | } |
140 | |
141 | feature! { |
142 | #![any(feature = "net" , all(unix, feature = "process" ))] |
143 | |
144 | use crate::io::ReadBuf; |
145 | use std::task::{Context, Poll}; |
146 | |
147 | impl<E: Source> PollEvented<E> { |
148 | // Safety: The caller must ensure that `E` can read into uninitialized memory |
149 | pub(crate) unsafe fn poll_read<'a>( |
150 | &'a self, |
151 | cx: &mut Context<'_>, |
152 | buf: &mut ReadBuf<'_>, |
153 | ) -> Poll<io::Result<()>> |
154 | where |
155 | &'a E: io::Read + 'a, |
156 | { |
157 | use std::io::Read; |
158 | |
159 | loop { |
160 | let evt = ready!(self.registration.poll_read_ready(cx))?; |
161 | |
162 | let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); |
163 | let len = b.len(); |
164 | |
165 | match self.io.as_ref().unwrap().read(b) { |
166 | Ok(n) => { |
167 | // if we read a partially full buffer, this is sufficient on unix to show |
168 | // that the socket buffer has been drained |
169 | if n > 0 && (!cfg!(windows) && n < len) { |
170 | self.registration.clear_readiness(evt); |
171 | } |
172 | |
173 | // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the |
174 | // buffer. |
175 | buf.assume_init(n); |
176 | buf.advance(n); |
177 | return Poll::Ready(Ok(())); |
178 | }, |
179 | Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
180 | self.registration.clear_readiness(evt); |
181 | } |
182 | Err(e) => return Poll::Ready(Err(e)), |
183 | } |
184 | } |
185 | } |
186 | |
187 | pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> |
188 | where |
189 | &'a E: io::Write + 'a, |
190 | { |
191 | use std::io::Write; |
192 | |
193 | loop { |
194 | let evt = ready!(self.registration.poll_write_ready(cx))?; |
195 | |
196 | match self.io.as_ref().unwrap().write(buf) { |
197 | Ok(n) => { |
198 | // if we write only part of our buffer, this is sufficient on unix to show |
199 | // that the socket buffer is full |
200 | if n > 0 && (!cfg!(windows) && n < buf.len()) { |
201 | self.registration.clear_readiness(evt); |
202 | } |
203 | |
204 | return Poll::Ready(Ok(n)); |
205 | }, |
206 | Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
207 | self.registration.clear_readiness(evt); |
208 | } |
209 | Err(e) => return Poll::Ready(Err(e)), |
210 | } |
211 | } |
212 | } |
213 | |
214 | #[cfg (any(feature = "net" , feature = "process" ))] |
215 | pub(crate) fn poll_write_vectored<'a>( |
216 | &'a self, |
217 | cx: &mut Context<'_>, |
218 | bufs: &[io::IoSlice<'_>], |
219 | ) -> Poll<io::Result<usize>> |
220 | where |
221 | &'a E: io::Write + 'a, |
222 | { |
223 | use std::io::Write; |
224 | self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs)) |
225 | } |
226 | } |
227 | } |
228 | |
229 | impl<E: Source> UnwindSafe for PollEvented<E> {} |
230 | |
231 | impl<E: Source> RefUnwindSafe for PollEvented<E> {} |
232 | |
233 | impl<E: Source> Deref for PollEvented<E> { |
234 | type Target = E; |
235 | |
236 | fn deref(&self) -> &E { |
237 | self.io.as_ref().unwrap() |
238 | } |
239 | } |
240 | |
241 | impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> { |
242 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
243 | f.debug_struct("PollEvented" ).field(name:"io" , &self.io).finish() |
244 | } |
245 | } |
246 | |
247 | impl<E: Source> Drop for PollEvented<E> { |
248 | fn drop(&mut self) { |
249 | if let Some(mut io: E) = self.io.take() { |
250 | // Ignore errors |
251 | let _ = self.registration.deregister(&mut io); |
252 | } |
253 | } |
254 | } |
255 | |