1#![cfg_attr(not(feature = "net"), allow(dead_code))]
2
3use crate::io::interest::Interest;
4use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo};
5use crate::runtime::scheduler;
6use crate::util::slab;
7
8use mio::event::Source;
9use std::io;
10use std::task::{Context, Poll};
11
cfg_io_driver! {
    /// Associates an I/O resource with the reactor instance that drives it.
    ///
    /// A registration represents an I/O resource registered with a Reactor such
    /// that it will receive task notifications on readiness. This is the lowest
    /// level API for integrating with a reactor.
    ///
    /// The association between an I/O resource and a reactor is made by
    /// calling [`new_with_interest_and_handle`].
    /// Once the association is established, it remains established until the
    /// registration instance is dropped.
    ///
    /// A registration instance represents two separate readiness streams. One
    /// for the read readiness and one for write readiness. These streams are
    /// independent and can be consumed from separate tasks.
    ///
    /// **Note**: while `Registration` is `Sync`, the caller must ensure that
    /// there are at most two tasks that use a registration instance
    /// concurrently. One task for [`poll_read_ready`] and one task for
    /// [`poll_write_ready`]. While violating this requirement is "safe" from a
    /// Rust memory safety point of view, it will result in unexpected behavior
    /// in the form of lost notifications and tasks hanging.
    ///
    /// ## Platform-specific events
    ///
    /// `Registration` also allows receiving platform-specific `mio::Ready`
    /// events. These events are included as part of the read readiness event
    /// stream. The write readiness event stream is only for `Ready::writable()`
    /// events.
    ///
    /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
    /// [`poll_read_ready`]: method@Self::poll_read_ready
    /// [`poll_write_ready`]: method@Self::poll_write_ready
    #[derive(Debug)]
    pub(crate) struct Registration {
        /// Handle to the associated runtime.
        handle: scheduler::Handle,

        /// Reference to state stored by the driver.
        shared: slab::Ref<ScheduledIo>,
    }
}
54
// Manually asserts that `Registration` may be moved to, and shared between,
// threads.
// NOTE(review): presumably needed because `slab::Ref<ScheduledIo>` is not
// automatically `Send`/`Sync`; the soundness argument is not visible in this
// file — confirm against `slab::Ref` and `ScheduledIo` before changing either.
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
57
58// ===== impl Registration =====
59
60impl Registration {
61 /// Registers the I/O resource with the reactor for the provided handle, for
62 /// a specific `Interest`. This does not add `hup` or `error` so if you are
63 /// interested in those states, you will need to add them to the readiness
64 /// state passed to this function.
65 ///
66 /// # Return
67 ///
68 /// - `Ok` if the registration happened successfully
69 /// - `Err` if an error was encountered during registration
70 #[track_caller]
71 pub(crate) fn new_with_interest_and_handle(
72 io: &mut impl Source,
73 interest: Interest,
74 handle: scheduler::Handle,
75 ) -> io::Result<Registration> {
76 let shared = handle.driver().io().add_source(io, interest)?;
77
78 Ok(Registration { handle, shared })
79 }
80
81 /// Deregisters the I/O resource from the reactor it is associated with.
82 ///
83 /// This function must be called before the I/O resource associated with the
84 /// registration is dropped.
85 ///
86 /// Note that deregistering does not guarantee that the I/O resource can be
87 /// registered with a different reactor. Some I/O resource types can only be
88 /// associated with a single reactor instance for their lifetime.
89 ///
90 /// # Return
91 ///
92 /// If the deregistration was successful, `Ok` is returned. Any calls to
93 /// `Reactor::turn` that happen after a successful call to `deregister` will
94 /// no longer result in notifications getting sent for this registration.
95 ///
96 /// `Err` is returned if an error is encountered.
97 pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
98 self.handle().deregister_source(io)
99 }
100
101 pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
102 self.shared.clear_readiness(event);
103 }
104
105 // Uses the poll path, requiring the caller to ensure mutual exclusion for
106 // correctness. Only the last task to call this function is notified.
107 pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
108 self.poll_ready(cx, Direction::Read)
109 }
110
111 // Uses the poll path, requiring the caller to ensure mutual exclusion for
112 // correctness. Only the last task to call this function is notified.
113 pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
114 self.poll_ready(cx, Direction::Write)
115 }
116
117 // Uses the poll path, requiring the caller to ensure mutual exclusion for
118 // correctness. Only the last task to call this function is notified.
119 #[cfg(not(tokio_wasi))]
120 pub(crate) fn poll_read_io<R>(
121 &self,
122 cx: &mut Context<'_>,
123 f: impl FnMut() -> io::Result<R>,
124 ) -> Poll<io::Result<R>> {
125 self.poll_io(cx, Direction::Read, f)
126 }
127
128 // Uses the poll path, requiring the caller to ensure mutual exclusion for
129 // correctness. Only the last task to call this function is notified.
130 pub(crate) fn poll_write_io<R>(
131 &self,
132 cx: &mut Context<'_>,
133 f: impl FnMut() -> io::Result<R>,
134 ) -> Poll<io::Result<R>> {
135 self.poll_io(cx, Direction::Write, f)
136 }
137
138 /// Polls for events on the I/O resource's `direction` readiness stream.
139 ///
140 /// If called with a task context, notify the task when a new event is
141 /// received.
142 fn poll_ready(
143 &self,
144 cx: &mut Context<'_>,
145 direction: Direction,
146 ) -> Poll<io::Result<ReadyEvent>> {
147 ready!(crate::trace::trace_leaf(cx));
148 // Keep track of task budget
149 let coop = ready!(crate::runtime::coop::poll_proceed(cx));
150 let ev = ready!(self.shared.poll_readiness(cx, direction));
151
152 if ev.is_shutdown {
153 return Poll::Ready(Err(gone()));
154 }
155
156 coop.made_progress();
157 Poll::Ready(Ok(ev))
158 }
159
160 fn poll_io<R>(
161 &self,
162 cx: &mut Context<'_>,
163 direction: Direction,
164 mut f: impl FnMut() -> io::Result<R>,
165 ) -> Poll<io::Result<R>> {
166 loop {
167 let ev = ready!(self.poll_ready(cx, direction))?;
168
169 match f() {
170 Ok(ret) => {
171 return Poll::Ready(Ok(ret));
172 }
173 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
174 self.clear_readiness(ev);
175 }
176 Err(e) => return Poll::Ready(Err(e)),
177 }
178 }
179 }
180
181 pub(crate) fn try_io<R>(
182 &self,
183 interest: Interest,
184 f: impl FnOnce() -> io::Result<R>,
185 ) -> io::Result<R> {
186 let ev = self.shared.ready_event(interest);
187
188 // Don't attempt the operation if the resource is not ready.
189 if ev.ready.is_empty() {
190 return Err(io::ErrorKind::WouldBlock.into());
191 }
192
193 match f() {
194 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
195 self.clear_readiness(ev);
196 Err(io::ErrorKind::WouldBlock.into())
197 }
198 res => res,
199 }
200 }
201
202 fn handle(&self) -> &Handle {
203 self.handle.driver().io()
204 }
205}
206
/// Clears the wakers held by the shared driver state when the registration
/// goes away.
impl Drop for Registration {
    fn drop(&mut self) {
        // It is possible for a cycle to be created between wakers stored in
        // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
        // cycle, wakers are cleared. This is an imperfect solution, as it is
        // possible to store a `Registration` in a waker; in that case, the
        // cycle would remain.
        //
        // See tokio-rs/tokio#3481 for more details.
        self.shared.clear_wakers();
    }
}
219
220fn gone() -> io::Error {
221 io::Error::new(
222 kind:io::ErrorKind::Other,
223 crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
224 )
225}
226
227cfg_io_readiness! {
228 impl Registration {
229 pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
230 let ev = self.shared.readiness(interest).await;
231
232 if ev.is_shutdown {
233 return Err(gone())
234 }
235
236 Ok(ev)
237 }
238
239 pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
240 loop {
241 let event = self.readiness(interest).await?;
242
243 match f() {
244 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
245 self.clear_readiness(event);
246 }
247 x => return x,
248 }
249 }
250 }
251 }
252}
253