| 1 | #![cfg_attr (not(feature = "net" ), allow(dead_code))] |
| 2 | |
| 3 | use crate::io::interest::Interest; |
| 4 | use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo}; |
| 5 | use crate::runtime::scheduler; |
| 6 | |
| 7 | use mio::event::Source; |
| 8 | use std::io; |
| 9 | use std::sync::Arc; |
| 10 | use std::task::{ready, Context, Poll}; |
| 11 | |
| 12 | cfg_io_driver! { |
| 13 | /// Associates an I/O resource with the reactor instance that drives it. |
| 14 | /// |
| 15 | /// A registration represents an I/O resource registered with a Reactor such |
| 16 | /// that it will receive task notifications on readiness. This is the lowest |
| 17 | /// level API for integrating with a reactor. |
| 18 | /// |
| 19 | /// The association between an I/O resource is made by calling |
| 20 | /// [`new_with_interest_and_handle`]. |
| 21 | /// Once the association is established, it remains established until the |
| 22 | /// registration instance is dropped. |
| 23 | /// |
| 24 | /// A registration instance represents two separate readiness streams. One |
| 25 | /// for the read readiness and one for write readiness. These streams are |
| 26 | /// independent and can be consumed from separate tasks. |
| 27 | /// |
| 28 | /// **Note**: while `Registration` is `Sync`, the caller must ensure that |
| 29 | /// there are at most two tasks that use a registration instance |
| 30 | /// concurrently. One task for [`poll_read_ready`] and one task for |
| 31 | /// [`poll_write_ready`]. While violating this requirement is "safe" from a |
| 32 | /// Rust memory safety point of view, it will result in unexpected behavior |
| 33 | /// in the form of lost notifications and tasks hanging. |
| 34 | /// |
| 35 | /// ## Platform-specific events |
| 36 | /// |
| 37 | /// `Registration` also allows receiving platform-specific `mio::Ready` |
| 38 | /// events. These events are included as part of the read readiness event |
| 39 | /// stream. The write readiness event stream is only for `Ready::writable()` |
| 40 | /// events. |
| 41 | /// |
| 42 | /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle |
| 43 | /// [`poll_read_ready`]: method@Self::poll_read_ready` |
| 44 | /// [`poll_write_ready`]: method@Self::poll_write_ready` |
| 45 | #[derive (Debug)] |
| 46 | pub(crate) struct Registration { |
| 47 | /// Handle to the associated runtime. |
| 48 | /// |
| 49 | /// TODO: this can probably be moved into `ScheduledIo`. |
| 50 | handle: scheduler::Handle, |
| 51 | |
| 52 | /// Reference to state stored by the driver. |
| 53 | shared: Arc<ScheduledIo>, |
| 54 | } |
| 55 | } |
| 56 | |
| 57 | unsafe impl Send for Registration {} |
| 58 | unsafe impl Sync for Registration {} |
| 59 | |
| 60 | // ===== impl Registration ===== |
| 61 | |
| 62 | impl Registration { |
| 63 | /// Registers the I/O resource with the reactor for the provided handle, for |
| 64 | /// a specific `Interest`. This does not add `hup` or `error` so if you are |
| 65 | /// interested in those states, you will need to add them to the readiness |
| 66 | /// state passed to this function. |
| 67 | /// |
| 68 | /// # Return |
| 69 | /// |
| 70 | /// - `Ok` if the registration happened successfully |
| 71 | /// - `Err` if an error was encountered during registration |
| 72 | #[track_caller ] |
| 73 | pub(crate) fn new_with_interest_and_handle( |
| 74 | io: &mut impl Source, |
| 75 | interest: Interest, |
| 76 | handle: scheduler::Handle, |
| 77 | ) -> io::Result<Registration> { |
| 78 | let shared = handle.driver().io().add_source(io, interest)?; |
| 79 | |
| 80 | Ok(Registration { handle, shared }) |
| 81 | } |
| 82 | |
| 83 | /// Deregisters the I/O resource from the reactor it is associated with. |
| 84 | /// |
| 85 | /// This function must be called before the I/O resource associated with the |
| 86 | /// registration is dropped. |
| 87 | /// |
| 88 | /// Note that deregistering does not guarantee that the I/O resource can be |
| 89 | /// registered with a different reactor. Some I/O resource types can only be |
| 90 | /// associated with a single reactor instance for their lifetime. |
| 91 | /// |
| 92 | /// # Return |
| 93 | /// |
| 94 | /// If the deregistration was successful, `Ok` is returned. Any calls to |
| 95 | /// `Reactor::turn` that happen after a successful call to `deregister` will |
| 96 | /// no longer result in notifications getting sent for this registration. |
| 97 | /// |
| 98 | /// `Err` is returned if an error is encountered. |
| 99 | pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { |
| 100 | self.handle().deregister_source(&self.shared, io) |
| 101 | } |
| 102 | |
| 103 | pub(crate) fn clear_readiness(&self, event: ReadyEvent) { |
| 104 | self.shared.clear_readiness(event); |
| 105 | } |
| 106 | |
| 107 | // Uses the poll path, requiring the caller to ensure mutual exclusion for |
| 108 | // correctness. Only the last task to call this function is notified. |
| 109 | pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { |
| 110 | self.poll_ready(cx, Direction::Read) |
| 111 | } |
| 112 | |
| 113 | // Uses the poll path, requiring the caller to ensure mutual exclusion for |
| 114 | // correctness. Only the last task to call this function is notified. |
| 115 | pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { |
| 116 | self.poll_ready(cx, Direction::Write) |
| 117 | } |
| 118 | |
| 119 | // Uses the poll path, requiring the caller to ensure mutual exclusion for |
| 120 | // correctness. Only the last task to call this function is notified. |
| 121 | #[cfg (not(target_os = "wasi" ))] |
| 122 | pub(crate) fn poll_read_io<R>( |
| 123 | &self, |
| 124 | cx: &mut Context<'_>, |
| 125 | f: impl FnMut() -> io::Result<R>, |
| 126 | ) -> Poll<io::Result<R>> { |
| 127 | self.poll_io(cx, Direction::Read, f) |
| 128 | } |
| 129 | |
| 130 | // Uses the poll path, requiring the caller to ensure mutual exclusion for |
| 131 | // correctness. Only the last task to call this function is notified. |
| 132 | pub(crate) fn poll_write_io<R>( |
| 133 | &self, |
| 134 | cx: &mut Context<'_>, |
| 135 | f: impl FnMut() -> io::Result<R>, |
| 136 | ) -> Poll<io::Result<R>> { |
| 137 | self.poll_io(cx, Direction::Write, f) |
| 138 | } |
| 139 | |
| 140 | /// Polls for events on the I/O resource's `direction` readiness stream. |
| 141 | /// |
| 142 | /// If called with a task context, notify the task when a new event is |
| 143 | /// received. |
| 144 | fn poll_ready( |
| 145 | &self, |
| 146 | cx: &mut Context<'_>, |
| 147 | direction: Direction, |
| 148 | ) -> Poll<io::Result<ReadyEvent>> { |
| 149 | ready!(crate::trace::trace_leaf(cx)); |
| 150 | // Keep track of task budget |
| 151 | let coop = ready!(crate::task::coop::poll_proceed(cx)); |
| 152 | let ev = ready!(self.shared.poll_readiness(cx, direction)); |
| 153 | |
| 154 | if ev.is_shutdown { |
| 155 | return Poll::Ready(Err(gone())); |
| 156 | } |
| 157 | |
| 158 | coop.made_progress(); |
| 159 | Poll::Ready(Ok(ev)) |
| 160 | } |
| 161 | |
| 162 | fn poll_io<R>( |
| 163 | &self, |
| 164 | cx: &mut Context<'_>, |
| 165 | direction: Direction, |
| 166 | mut f: impl FnMut() -> io::Result<R>, |
| 167 | ) -> Poll<io::Result<R>> { |
| 168 | loop { |
| 169 | let ev = ready!(self.poll_ready(cx, direction))?; |
| 170 | |
| 171 | match f() { |
| 172 | Ok(ret) => { |
| 173 | return Poll::Ready(Ok(ret)); |
| 174 | } |
| 175 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
| 176 | self.clear_readiness(ev); |
| 177 | } |
| 178 | Err(e) => return Poll::Ready(Err(e)), |
| 179 | } |
| 180 | } |
| 181 | } |
| 182 | |
| 183 | pub(crate) fn try_io<R>( |
| 184 | &self, |
| 185 | interest: Interest, |
| 186 | f: impl FnOnce() -> io::Result<R>, |
| 187 | ) -> io::Result<R> { |
| 188 | let ev = self.shared.ready_event(interest); |
| 189 | |
| 190 | // Don't attempt the operation if the resource is not ready. |
| 191 | if ev.ready.is_empty() { |
| 192 | return Err(io::ErrorKind::WouldBlock.into()); |
| 193 | } |
| 194 | |
| 195 | match f() { |
| 196 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
| 197 | self.clear_readiness(ev); |
| 198 | Err(io::ErrorKind::WouldBlock.into()) |
| 199 | } |
| 200 | res => res, |
| 201 | } |
| 202 | } |
| 203 | |
| 204 | pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> { |
| 205 | let ev = self.shared.readiness(interest).await; |
| 206 | |
| 207 | if ev.is_shutdown { |
| 208 | return Err(gone()); |
| 209 | } |
| 210 | |
| 211 | Ok(ev) |
| 212 | } |
| 213 | |
| 214 | pub(crate) async fn async_io<R>( |
| 215 | &self, |
| 216 | interest: Interest, |
| 217 | mut f: impl FnMut() -> io::Result<R>, |
| 218 | ) -> io::Result<R> { |
| 219 | loop { |
| 220 | let event = self.readiness(interest).await?; |
| 221 | |
| 222 | let coop = std::future::poll_fn(crate::task::coop::poll_proceed).await; |
| 223 | |
| 224 | match f() { |
| 225 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
| 226 | self.clear_readiness(event); |
| 227 | } |
| 228 | x => { |
| 229 | coop.made_progress(); |
| 230 | return x; |
| 231 | } |
| 232 | } |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | fn handle(&self) -> &Handle { |
| 237 | self.handle.driver().io() |
| 238 | } |
| 239 | } |
| 240 | |
| 241 | impl Drop for Registration { |
| 242 | fn drop(&mut self) { |
| 243 | // It is possible for a cycle to be created between wakers stored in |
| 244 | // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this |
| 245 | // cycle, wakers are cleared. This is an imperfect solution as it is |
| 246 | // possible to store a `Registration` in a waker. In this case, the |
| 247 | // cycle would remain. |
| 248 | // |
| 249 | // See tokio-rs/tokio#3481 for more details. |
| 250 | self.shared.clear_wakers(); |
| 251 | } |
| 252 | } |
| 253 | |
| 254 | fn gone() -> io::Error { |
| 255 | io::Error::new( |
| 256 | kind:io::ErrorKind::Other, |
| 257 | error:crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, |
| 258 | ) |
| 259 | } |
| 260 | |