| 1 | use crate::io::interest::Interest; |
| 2 | use crate::runtime::io::Registration; |
| 3 | use crate::runtime::scheduler; |
| 4 | |
| 5 | use mio::event::Source; |
| 6 | use std::fmt; |
| 7 | use std::io; |
| 8 | use std::ops::Deref; |
| 9 | use std::panic::{RefUnwindSafe, UnwindSafe}; |
| 10 | use std::task::ready; |
| 11 | |
| 12 | cfg_io_driver! { |
| 13 | /// Associates an I/O resource that implements the [`std::io::Read`] and/or |
| 14 | /// [`std::io::Write`] traits with the reactor that drives it. |
| 15 | /// |
| 16 | /// `PollEvented` uses [`Registration`] internally to take a type that |
| 17 | /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or |
| 18 | /// [`std::io::Write`] and associate it with a reactor that will drive it. |
| 19 | /// |
| 20 | /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be |
| 21 | /// used from within the future's execution model. As such, the |
| 22 | /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] |
| 23 | /// implementations using the underlying I/O resource as well as readiness |
| 24 | /// events provided by the reactor. |
| 25 | /// |
| 26 | /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is |
| 27 | /// `Sync`), the caller must ensure that there are at most two tasks that |
| 28 | /// use a `PollEvented` instance concurrently. One for reading and one for |
| 29 | /// writing. While violating this requirement is "safe" from a Rust memory |
| 30 | /// model point of view, it will result in unexpected behavior in the form |
| 31 | /// of lost notifications and tasks hanging. |
| 32 | /// |
| 33 | /// ## Readiness events |
| 34 | /// |
| 35 | /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations, |
| 36 | /// this type also supports access to the underlying readiness event stream. |
| 37 | /// While similar in function to what [`Registration`] provides, the |
| 38 | /// semantics are a bit different. |
| 39 | /// |
| 40 | /// Two functions are provided to access the readiness events: |
| 41 | /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the |
| 42 | /// current readiness state of the `PollEvented` instance. If |
| 43 | /// [`poll_read_ready`] indicates read readiness, immediately calling |
| 44 | /// [`poll_read_ready`] again will also indicate read readiness. |
| 45 | /// |
| 46 | /// When the operation is attempted and is unable to succeed due to the I/O |
| 47 | /// resource not being ready, the caller must call [`clear_readiness`]. |
| 48 | /// This clears the readiness state until a new readiness event is received. |
| 49 | /// |
| 50 | /// This allows the caller to implement additional functions. For example, |
| 51 | /// [`TcpListener`] implements `poll_accept` by using [`poll_read_ready`] and |
| 52 | /// [`clear_readiness`]. |
| 53 | /// |
| 54 | /// ## Platform-specific events |
| 55 | /// |
| 56 | /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. |
| 57 | /// These events are included as part of the read readiness event stream. The |
| 58 | /// write readiness event stream is only for `Ready::writable()` events. |
| 59 | /// |
| 60 | /// [`AsyncRead`]: crate::io::AsyncRead |
| 61 | /// [`AsyncWrite`]: crate::io::AsyncWrite |
| 62 | /// [`TcpListener`]: crate::net::TcpListener |
| 63 | /// [`clear_readiness`]: Registration::clear_readiness |
| 64 | /// [`poll_read_ready`]: Registration::poll_read_ready |
| 65 | /// [`poll_write_ready`]: Registration::poll_write_ready |
| 66 | pub(crate) struct PollEvented<E: Source> { |
| 67 | io: Option<E>, |
| 68 | registration: Registration, |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | // ===== impl PollEvented ===== |
| 73 | |
| 74 | impl<E: Source> PollEvented<E> { |
| 75 | /// Creates a new `PollEvented` associated with the default reactor. |
| 76 | /// |
| 77 | /// The returned `PollEvented` has readable and writable interests. For more control, use |
| 78 | /// [`Self::new_with_interest`]. |
| 79 | /// |
| 80 | /// # Panics |
| 81 | /// |
| 82 | /// This function panics if thread-local runtime is not set. |
| 83 | /// |
| 84 | /// The runtime is usually set implicitly when this function is called |
| 85 | /// from a future driven by a tokio runtime, otherwise runtime can be set |
| 86 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. |
| 87 | #[track_caller ] |
| 88 | #[cfg_attr (feature = "signal" , allow(unused))] |
| 89 | pub(crate) fn new(io: E) -> io::Result<Self> { |
| 90 | PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) |
| 91 | } |
| 92 | |
| 93 | /// Creates a new `PollEvented` associated with the default reactor, for |
| 94 | /// specific `Interest` state. `new_with_interest` should be used over `new` |
| 95 | /// when you need control over the readiness state, such as when a file |
| 96 | /// descriptor only allows reads. This does not add `hup` or `error` so if |
| 97 | /// you are interested in those states, you will need to add them to the |
| 98 | /// readiness state passed to this function. |
| 99 | /// |
| 100 | /// # Panics |
| 101 | /// |
| 102 | /// This function panics if thread-local runtime is not set. |
| 103 | /// |
| 104 | /// The runtime is usually set implicitly when this function is called from |
| 105 | /// a future driven by a tokio runtime, otherwise runtime can be set |
| 106 | /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) |
| 107 | /// function. |
| 108 | #[track_caller ] |
| 109 | #[cfg_attr (feature = "signal" , allow(unused))] |
| 110 | pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { |
| 111 | Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current()) |
| 112 | } |
| 113 | |
| 114 | #[track_caller ] |
| 115 | pub(crate) fn new_with_interest_and_handle( |
| 116 | mut io: E, |
| 117 | interest: Interest, |
| 118 | handle: scheduler::Handle, |
| 119 | ) -> io::Result<Self> { |
| 120 | let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; |
| 121 | Ok(Self { |
| 122 | io: Some(io), |
| 123 | registration, |
| 124 | }) |
| 125 | } |
| 126 | |
| 127 | /// Returns a reference to the registration. |
| 128 | #[cfg (feature = "net" )] |
| 129 | pub(crate) fn registration(&self) -> &Registration { |
| 130 | &self.registration |
| 131 | } |
| 132 | |
| 133 | /// Deregisters the inner io from the registration and returns a Result containing the inner io. |
| 134 | #[cfg (any(feature = "net" , feature = "process" ))] |
| 135 | pub(crate) fn into_inner(mut self) -> io::Result<E> { |
| 136 | let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. |
| 137 | self.registration.deregister(&mut inner)?; |
| 138 | Ok(inner) |
| 139 | } |
| 140 | |
| 141 | #[cfg (all(feature = "process" , target_os = "linux" ))] |
| 142 | pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 143 | self.registration |
| 144 | .poll_read_ready(cx) |
| 145 | .map_err(io::Error::from) |
| 146 | .map_ok(|_| ()) |
| 147 | } |
| 148 | |
| 149 | /// Re-register under new runtime with `interest`. |
| 150 | #[cfg (all(feature = "process" , target_os = "linux" ))] |
| 151 | pub(crate) fn reregister(&mut self, interest: Interest) -> io::Result<()> { |
| 152 | let io = self.io.as_mut().unwrap(); // As io shouldn't ever be None, just unwrap here. |
| 153 | let _ = self.registration.deregister(io); |
| 154 | self.registration = |
| 155 | Registration::new_with_interest_and_handle(io, interest, scheduler::Handle::current())?; |
| 156 | |
| 157 | Ok(()) |
| 158 | } |
| 159 | } |
| 160 | |
| 161 | feature! { |
| 162 | #![any(feature = "net" , all(unix, feature = "process" ))] |
| 163 | |
| 164 | use crate::io::ReadBuf; |
| 165 | use std::task::{Context, Poll}; |
| 166 | |
| 167 | impl<E: Source> PollEvented<E> { |
| 168 | // Safety: The caller must ensure that `E` can read into uninitialized memory |
| 169 | pub(crate) unsafe fn poll_read<'a>( |
| 170 | &'a self, |
| 171 | cx: &mut Context<'_>, |
| 172 | buf: &mut ReadBuf<'_>, |
| 173 | ) -> Poll<io::Result<()>> |
| 174 | where |
| 175 | &'a E: io::Read + 'a, |
| 176 | { |
| 177 | use std::io::Read; |
| 178 | |
| 179 | loop { |
| 180 | let evt = ready!(self.registration.poll_read_ready(cx))?; |
| 181 | |
| 182 | let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); |
| 183 | |
| 184 | // used only when the cfgs below apply |
| 185 | #[allow (unused_variables)] |
| 186 | let len = b.len(); |
| 187 | |
| 188 | match self.io.as_ref().unwrap().read(b) { |
| 189 | Ok(n) => { |
| 190 | // When mio is using the epoll or kqueue selector, reading a partially full |
| 191 | // buffer is sufficient to show that the socket buffer has been drained. |
| 192 | // |
| 193 | // This optimization does not work for level-triggered selectors such as |
| 194 | // windows or when poll is used. |
| 195 | // |
| 196 | // Read more: |
| 197 | // https://github.com/tokio-rs/tokio/issues/5866 |
| 198 | #[cfg (all( |
| 199 | not(mio_unsupported_force_poll_poll), |
| 200 | any( |
| 201 | // epoll |
| 202 | target_os = "android" , |
| 203 | target_os = "illumos" , |
| 204 | target_os = "linux" , |
| 205 | target_os = "redox" , |
| 206 | // kqueue |
| 207 | target_os = "dragonfly" , |
| 208 | target_os = "freebsd" , |
| 209 | target_os = "ios" , |
| 210 | target_os = "macos" , |
| 211 | target_os = "netbsd" , |
| 212 | target_os = "openbsd" , |
| 213 | target_os = "tvos" , |
| 214 | target_os = "visionos" , |
| 215 | target_os = "watchos" , |
| 216 | ) |
| 217 | ))] |
| 218 | if 0 < n && n < len { |
| 219 | self.registration.clear_readiness(evt); |
| 220 | } |
| 221 | |
| 222 | // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the |
| 223 | // buffer. |
| 224 | buf.assume_init(n); |
| 225 | buf.advance(n); |
| 226 | return Poll::Ready(Ok(())); |
| 227 | }, |
| 228 | Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
| 229 | self.registration.clear_readiness(evt); |
| 230 | } |
| 231 | Err(e) => return Poll::Ready(Err(e)), |
| 232 | } |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> |
| 237 | where |
| 238 | &'a E: io::Write + 'a, |
| 239 | { |
| 240 | use std::io::Write; |
| 241 | |
| 242 | loop { |
| 243 | let evt = ready!(self.registration.poll_write_ready(cx))?; |
| 244 | |
| 245 | match self.io.as_ref().unwrap().write(buf) { |
| 246 | Ok(n) => { |
| 247 | // if we write only part of our buffer, this is sufficient on unix to show |
| 248 | // that the socket buffer is full. Unfortunately this assumption |
| 249 | // fails for level-triggered selectors (like on Windows or poll even for |
| 250 | // UNIX): https://github.com/tokio-rs/tokio/issues/5866 |
| 251 | if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < buf.len()) { |
| 252 | self.registration.clear_readiness(evt); |
| 253 | } |
| 254 | |
| 255 | return Poll::Ready(Ok(n)); |
| 256 | }, |
| 257 | Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
| 258 | self.registration.clear_readiness(evt); |
| 259 | } |
| 260 | Err(e) => return Poll::Ready(Err(e)), |
| 261 | } |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | #[cfg (any(feature = "net" , feature = "process" ))] |
| 266 | pub(crate) fn poll_write_vectored<'a>( |
| 267 | &'a self, |
| 268 | cx: &mut Context<'_>, |
| 269 | bufs: &[io::IoSlice<'_>], |
| 270 | ) -> Poll<io::Result<usize>> |
| 271 | where |
| 272 | &'a E: io::Write + 'a, |
| 273 | { |
| 274 | use std::io::Write; |
| 275 | self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs)) |
| 276 | } |
| 277 | } |
| 278 | } |
| 279 | |
| 280 | impl<E: Source> UnwindSafe for PollEvented<E> {} |
| 281 | |
| 282 | impl<E: Source> RefUnwindSafe for PollEvented<E> {} |
| 283 | |
| 284 | impl<E: Source> Deref for PollEvented<E> { |
| 285 | type Target = E; |
| 286 | |
| 287 | fn deref(&self) -> &E { |
| 288 | self.io.as_ref().unwrap() |
| 289 | } |
| 290 | } |
| 291 | |
| 292 | impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> { |
| 293 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 294 | f.debug_struct("PollEvented" ).field(name:"io" , &self.io).finish() |
| 295 | } |
| 296 | } |
| 297 | |
| 298 | impl<E: Source> Drop for PollEvented<E> { |
| 299 | fn drop(&mut self) { |
| 300 | if let Some(mut io: E) = self.io.take() { |
| 301 | // Ignore errors |
| 302 | let _ = self.registration.deregister(&mut io); |
| 303 | } |
| 304 | } |
| 305 | } |
| 306 | |