1 | #![cfg_attr (not(feature = "net" ), allow(dead_code))] |
2 | |
3 | use crate::io::interest::Interest; |
4 | use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo}; |
5 | use crate::runtime::scheduler; |
6 | |
7 | use mio::event::Source; |
8 | use std::io; |
9 | use std::sync::Arc; |
10 | use std::task::{Context, Poll}; |
11 | |
cfg_io_driver! {
    /// Associates an I/O resource with the reactor instance that drives it.
    ///
    /// A registration represents an I/O resource registered with a Reactor such
    /// that it will receive task notifications on readiness. This is the lowest
    /// level API for integrating with a reactor.
    ///
    /// The association between an I/O resource and a reactor is made by
    /// calling [`new_with_interest_and_handle`].
    /// Once the association is established, it remains established until the
    /// registration instance is dropped.
    ///
    /// A registration instance represents two separate readiness streams. One
    /// for the read readiness and one for write readiness. These streams are
    /// independent and can be consumed from separate tasks.
    ///
    /// **Note**: while `Registration` is `Sync`, the caller must ensure that
    /// there are at most two tasks that use a registration instance
    /// concurrently. One task for [`poll_read_ready`] and one task for
    /// [`poll_write_ready`]. While violating this requirement is "safe" from a
    /// Rust memory safety point of view, it will result in unexpected behavior
    /// in the form of lost notifications and tasks hanging.
    ///
    /// ## Platform-specific events
    ///
    /// `Registration` also allows receiving platform-specific `mio::Ready`
    /// events. These events are included as part of the read readiness event
    /// stream. The write readiness event stream is only for `Ready::writable()`
    /// events.
    ///
    /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
    /// [`poll_read_ready`]: method@Self::poll_read_ready
    /// [`poll_write_ready`]: method@Self::poll_write_ready
    #[derive(Debug)]
    pub(crate) struct Registration {
        /// Handle to the associated runtime.
        ///
        /// TODO: this can probably be moved into `ScheduledIo`.
        handle: scheduler::Handle,

        /// Reference to state stored by the driver.
        shared: Arc<ScheduledIo>,
    }
}
56 | |
// NOTE(review): these impls assert thread-safety by hand and no safety
// argument is recorded next to them. Presumably both fields
// (`scheduler::Handle` and `Arc<ScheduledIo>`) are safe to send and share
// across threads — TODO confirm, and check whether the auto traits would
// already apply without these `unsafe impl`s.
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
59 | |
60 | // ===== impl Registration ===== |
61 | |
62 | impl Registration { |
63 | /// Registers the I/O resource with the reactor for the provided handle, for |
64 | /// a specific `Interest`. This does not add `hup` or `error` so if you are |
65 | /// interested in those states, you will need to add them to the readiness |
66 | /// state passed to this function. |
67 | /// |
68 | /// # Return |
69 | /// |
70 | /// - `Ok` if the registration happened successfully |
71 | /// - `Err` if an error was encountered during registration |
72 | #[track_caller ] |
73 | pub(crate) fn new_with_interest_and_handle( |
74 | io: &mut impl Source, |
75 | interest: Interest, |
76 | handle: scheduler::Handle, |
77 | ) -> io::Result<Registration> { |
78 | let shared = handle.driver().io().add_source(io, interest)?; |
79 | |
80 | Ok(Registration { handle, shared }) |
81 | } |
82 | |
83 | /// Deregisters the I/O resource from the reactor it is associated with. |
84 | /// |
85 | /// This function must be called before the I/O resource associated with the |
86 | /// registration is dropped. |
87 | /// |
88 | /// Note that deregistering does not guarantee that the I/O resource can be |
89 | /// registered with a different reactor. Some I/O resource types can only be |
90 | /// associated with a single reactor instance for their lifetime. |
91 | /// |
92 | /// # Return |
93 | /// |
94 | /// If the deregistration was successful, `Ok` is returned. Any calls to |
95 | /// `Reactor::turn` that happen after a successful call to `deregister` will |
96 | /// no longer result in notifications getting sent for this registration. |
97 | /// |
98 | /// `Err` is returned if an error is encountered. |
99 | pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { |
100 | self.handle().deregister_source(&self.shared, io) |
101 | } |
102 | |
103 | pub(crate) fn clear_readiness(&self, event: ReadyEvent) { |
104 | self.shared.clear_readiness(event); |
105 | } |
106 | |
107 | // Uses the poll path, requiring the caller to ensure mutual exclusion for |
108 | // correctness. Only the last task to call this function is notified. |
109 | pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { |
110 | self.poll_ready(cx, Direction::Read) |
111 | } |
112 | |
113 | // Uses the poll path, requiring the caller to ensure mutual exclusion for |
114 | // correctness. Only the last task to call this function is notified. |
115 | pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { |
116 | self.poll_ready(cx, Direction::Write) |
117 | } |
118 | |
119 | // Uses the poll path, requiring the caller to ensure mutual exclusion for |
120 | // correctness. Only the last task to call this function is notified. |
121 | #[cfg (not(target_os = "wasi" ))] |
122 | pub(crate) fn poll_read_io<R>( |
123 | &self, |
124 | cx: &mut Context<'_>, |
125 | f: impl FnMut() -> io::Result<R>, |
126 | ) -> Poll<io::Result<R>> { |
127 | self.poll_io(cx, Direction::Read, f) |
128 | } |
129 | |
130 | // Uses the poll path, requiring the caller to ensure mutual exclusion for |
131 | // correctness. Only the last task to call this function is notified. |
132 | pub(crate) fn poll_write_io<R>( |
133 | &self, |
134 | cx: &mut Context<'_>, |
135 | f: impl FnMut() -> io::Result<R>, |
136 | ) -> Poll<io::Result<R>> { |
137 | self.poll_io(cx, Direction::Write, f) |
138 | } |
139 | |
140 | /// Polls for events on the I/O resource's `direction` readiness stream. |
141 | /// |
142 | /// If called with a task context, notify the task when a new event is |
143 | /// received. |
144 | fn poll_ready( |
145 | &self, |
146 | cx: &mut Context<'_>, |
147 | direction: Direction, |
148 | ) -> Poll<io::Result<ReadyEvent>> { |
149 | ready!(crate::trace::trace_leaf(cx)); |
150 | // Keep track of task budget |
151 | let coop = ready!(crate::runtime::coop::poll_proceed(cx)); |
152 | let ev = ready!(self.shared.poll_readiness(cx, direction)); |
153 | |
154 | if ev.is_shutdown { |
155 | return Poll::Ready(Err(gone())); |
156 | } |
157 | |
158 | coop.made_progress(); |
159 | Poll::Ready(Ok(ev)) |
160 | } |
161 | |
162 | fn poll_io<R>( |
163 | &self, |
164 | cx: &mut Context<'_>, |
165 | direction: Direction, |
166 | mut f: impl FnMut() -> io::Result<R>, |
167 | ) -> Poll<io::Result<R>> { |
168 | loop { |
169 | let ev = ready!(self.poll_ready(cx, direction))?; |
170 | |
171 | match f() { |
172 | Ok(ret) => { |
173 | return Poll::Ready(Ok(ret)); |
174 | } |
175 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
176 | self.clear_readiness(ev); |
177 | } |
178 | Err(e) => return Poll::Ready(Err(e)), |
179 | } |
180 | } |
181 | } |
182 | |
183 | pub(crate) fn try_io<R>( |
184 | &self, |
185 | interest: Interest, |
186 | f: impl FnOnce() -> io::Result<R>, |
187 | ) -> io::Result<R> { |
188 | let ev = self.shared.ready_event(interest); |
189 | |
190 | // Don't attempt the operation if the resource is not ready. |
191 | if ev.ready.is_empty() { |
192 | return Err(io::ErrorKind::WouldBlock.into()); |
193 | } |
194 | |
195 | match f() { |
196 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
197 | self.clear_readiness(ev); |
198 | Err(io::ErrorKind::WouldBlock.into()) |
199 | } |
200 | res => res, |
201 | } |
202 | } |
203 | |
204 | pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> { |
205 | let ev = self.shared.readiness(interest).await; |
206 | |
207 | if ev.is_shutdown { |
208 | return Err(gone()); |
209 | } |
210 | |
211 | Ok(ev) |
212 | } |
213 | |
214 | pub(crate) async fn async_io<R>( |
215 | &self, |
216 | interest: Interest, |
217 | mut f: impl FnMut() -> io::Result<R>, |
218 | ) -> io::Result<R> { |
219 | loop { |
220 | let event = self.readiness(interest).await?; |
221 | |
222 | match f() { |
223 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
224 | self.clear_readiness(event); |
225 | } |
226 | x => return x, |
227 | } |
228 | } |
229 | } |
230 | |
231 | fn handle(&self) -> &Handle { |
232 | self.handle.driver().io() |
233 | } |
234 | } |
235 | |
236 | impl Drop for Registration { |
237 | fn drop(&mut self) { |
238 | // It is possible for a cycle to be created between wakers stored in |
239 | // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this |
240 | // cycle, wakers are cleared. This is an imperfect solution as it is |
241 | // possible to store a `Registration` in a waker. In this case, the |
242 | // cycle would remain. |
243 | // |
244 | // See tokio-rs/tokio#3481 for more details. |
245 | self.shared.clear_wakers(); |
246 | } |
247 | } |
248 | |
249 | fn gone() -> io::Error { |
250 | io::Error::new( |
251 | io::ErrorKind::Other, |
252 | crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, |
253 | ) |
254 | } |
255 | |