1#![cfg_attr(not(feature = "net"), allow(dead_code))]
2
3use crate::io::interest::Interest;
4use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo};
5use crate::runtime::scheduler;
6
7use mio::event::Source;
8use std::io;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12cfg_io_driver! {
13 /// Associates an I/O resource with the reactor instance that drives it.
14 ///
15 /// A registration represents an I/O resource registered with a Reactor such
16 /// that it will receive task notifications on readiness. This is the lowest
17 /// level API for integrating with a reactor.
18 ///
19 /// The association between an I/O resource is made by calling
20 /// [`new_with_interest_and_handle`].
21 /// Once the association is established, it remains established until the
22 /// registration instance is dropped.
23 ///
24 /// A registration instance represents two separate readiness streams. One
25 /// for the read readiness and one for write readiness. These streams are
26 /// independent and can be consumed from separate tasks.
27 ///
28 /// **Note**: while `Registration` is `Sync`, the caller must ensure that
29 /// there are at most two tasks that use a registration instance
30 /// concurrently. One task for [`poll_read_ready`] and one task for
31 /// [`poll_write_ready`]. While violating this requirement is "safe" from a
32 /// Rust memory safety point of view, it will result in unexpected behavior
33 /// in the form of lost notifications and tasks hanging.
34 ///
35 /// ## Platform-specific events
36 ///
37 /// `Registration` also allows receiving platform-specific `mio::Ready`
38 /// events. These events are included as part of the read readiness event
39 /// stream. The write readiness event stream is only for `Ready::writable()`
40 /// events.
41 ///
42 /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
43 /// [`poll_read_ready`]: method@Self::poll_read_ready`
44 /// [`poll_write_ready`]: method@Self::poll_write_ready`
45 #[derive(Debug)]
46 pub(crate) struct Registration {
47 /// Handle to the associated runtime.
48 ///
49 /// TODO: this can probably be moved into `ScheduledIo`.
50 handle: scheduler::Handle,
51
52 /// Reference to state stored by the driver.
53 shared: Arc<ScheduledIo>,
54 }
55}
56
57unsafe impl Send for Registration {}
58unsafe impl Sync for Registration {}
59
60// ===== impl Registration =====
61
62impl Registration {
63 /// Registers the I/O resource with the reactor for the provided handle, for
64 /// a specific `Interest`. This does not add `hup` or `error` so if you are
65 /// interested in those states, you will need to add them to the readiness
66 /// state passed to this function.
67 ///
68 /// # Return
69 ///
70 /// - `Ok` if the registration happened successfully
71 /// - `Err` if an error was encountered during registration
72 #[track_caller]
73 pub(crate) fn new_with_interest_and_handle(
74 io: &mut impl Source,
75 interest: Interest,
76 handle: scheduler::Handle,
77 ) -> io::Result<Registration> {
78 let shared = handle.driver().io().add_source(io, interest)?;
79
80 Ok(Registration { handle, shared })
81 }
82
83 /// Deregisters the I/O resource from the reactor it is associated with.
84 ///
85 /// This function must be called before the I/O resource associated with the
86 /// registration is dropped.
87 ///
88 /// Note that deregistering does not guarantee that the I/O resource can be
89 /// registered with a different reactor. Some I/O resource types can only be
90 /// associated with a single reactor instance for their lifetime.
91 ///
92 /// # Return
93 ///
94 /// If the deregistration was successful, `Ok` is returned. Any calls to
95 /// `Reactor::turn` that happen after a successful call to `deregister` will
96 /// no longer result in notifications getting sent for this registration.
97 ///
98 /// `Err` is returned if an error is encountered.
99 pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
100 self.handle().deregister_source(&self.shared, io)
101 }
102
103 pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
104 self.shared.clear_readiness(event);
105 }
106
107 // Uses the poll path, requiring the caller to ensure mutual exclusion for
108 // correctness. Only the last task to call this function is notified.
109 pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
110 self.poll_ready(cx, Direction::Read)
111 }
112
113 // Uses the poll path, requiring the caller to ensure mutual exclusion for
114 // correctness. Only the last task to call this function is notified.
115 pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
116 self.poll_ready(cx, Direction::Write)
117 }
118
119 // Uses the poll path, requiring the caller to ensure mutual exclusion for
120 // correctness. Only the last task to call this function is notified.
121 #[cfg(not(target_os = "wasi"))]
122 pub(crate) fn poll_read_io<R>(
123 &self,
124 cx: &mut Context<'_>,
125 f: impl FnMut() -> io::Result<R>,
126 ) -> Poll<io::Result<R>> {
127 self.poll_io(cx, Direction::Read, f)
128 }
129
130 // Uses the poll path, requiring the caller to ensure mutual exclusion for
131 // correctness. Only the last task to call this function is notified.
132 pub(crate) fn poll_write_io<R>(
133 &self,
134 cx: &mut Context<'_>,
135 f: impl FnMut() -> io::Result<R>,
136 ) -> Poll<io::Result<R>> {
137 self.poll_io(cx, Direction::Write, f)
138 }
139
140 /// Polls for events on the I/O resource's `direction` readiness stream.
141 ///
142 /// If called with a task context, notify the task when a new event is
143 /// received.
144 fn poll_ready(
145 &self,
146 cx: &mut Context<'_>,
147 direction: Direction,
148 ) -> Poll<io::Result<ReadyEvent>> {
149 ready!(crate::trace::trace_leaf(cx));
150 // Keep track of task budget
151 let coop = ready!(crate::runtime::coop::poll_proceed(cx));
152 let ev = ready!(self.shared.poll_readiness(cx, direction));
153
154 if ev.is_shutdown {
155 return Poll::Ready(Err(gone()));
156 }
157
158 coop.made_progress();
159 Poll::Ready(Ok(ev))
160 }
161
162 fn poll_io<R>(
163 &self,
164 cx: &mut Context<'_>,
165 direction: Direction,
166 mut f: impl FnMut() -> io::Result<R>,
167 ) -> Poll<io::Result<R>> {
168 loop {
169 let ev = ready!(self.poll_ready(cx, direction))?;
170
171 match f() {
172 Ok(ret) => {
173 return Poll::Ready(Ok(ret));
174 }
175 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
176 self.clear_readiness(ev);
177 }
178 Err(e) => return Poll::Ready(Err(e)),
179 }
180 }
181 }
182
183 pub(crate) fn try_io<R>(
184 &self,
185 interest: Interest,
186 f: impl FnOnce() -> io::Result<R>,
187 ) -> io::Result<R> {
188 let ev = self.shared.ready_event(interest);
189
190 // Don't attempt the operation if the resource is not ready.
191 if ev.ready.is_empty() {
192 return Err(io::ErrorKind::WouldBlock.into());
193 }
194
195 match f() {
196 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
197 self.clear_readiness(ev);
198 Err(io::ErrorKind::WouldBlock.into())
199 }
200 res => res,
201 }
202 }
203
204 pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
205 let ev = self.shared.readiness(interest).await;
206
207 if ev.is_shutdown {
208 return Err(gone());
209 }
210
211 Ok(ev)
212 }
213
214 pub(crate) async fn async_io<R>(
215 &self,
216 interest: Interest,
217 mut f: impl FnMut() -> io::Result<R>,
218 ) -> io::Result<R> {
219 loop {
220 let event = self.readiness(interest).await?;
221
222 match f() {
223 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
224 self.clear_readiness(event);
225 }
226 x => return x,
227 }
228 }
229 }
230
231 fn handle(&self) -> &Handle {
232 self.handle.driver().io()
233 }
234}
235
236impl Drop for Registration {
237 fn drop(&mut self) {
238 // It is possible for a cycle to be created between wakers stored in
239 // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
240 // cycle, wakers are cleared. This is an imperfect solution as it is
241 // possible to store a `Registration` in a waker. In this case, the
242 // cycle would remain.
243 //
244 // See tokio-rs/tokio#3481 for more details.
245 self.shared.clear_wakers();
246 }
247}
248
249fn gone() -> io::Error {
250 io::Error::new(
251 io::ErrorKind::Other,
252 crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
253 )
254}
255