use crate::loom::sync::atomic::AtomicUsize;
use crate::runtime::io::ScheduledIo;
use crate::util::linked_list::{self, LinkedList};

use std::io;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::sync::Arc;

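// `RegistrationSet` holds only the atomic counter, so `needs_release` can be
// checked without taking a lock. All other state lives in `Synced`, which
// callers pass in by `&mut` reference, i.e. under whatever lock the I/O
// driver uses to guard it.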
pub(super) struct RegistrationSet {
    num_pending_release: AtomicUsize,
}

pub(super) struct Synced {
    // Set to `true` once the I/O driver has shut down. From that point on, no
    // more registrations may be added to the set.
    is_shutdown: bool,

    // List of all registrations tracked by the set.
    registrations: LinkedList<Arc<ScheduledIo>, ScheduledIo>,

    // Registrations that are pending drop. When a `Registration` is dropped,
    // it stores its `ScheduledIo` in this list. The I/O driver is responsible
    // for dropping it. This ensures the `ScheduledIo` is not freed while it
    // can still be included in an I/O event.
    pending_release: Vec<Arc<ScheduledIo>>,
}

impl RegistrationSet {
    pub(super) fn new() -> (RegistrationSet, Synced) {
        let set = RegistrationSet {
            num_pending_release: AtomicUsize::new(0),
        };

        let synced = Synced {
            is_shutdown: false,
            registrations: LinkedList::new(),
            pending_release: Vec::with_capacity(16),
        };

        (set, synced)
    }

    pub(super) fn is_shutdown(&self, synced: &Synced) -> bool {
        synced.is_shutdown
    }

    /// Returns `true` if there are registrations that need to be released.
    pub(super) fn needs_release(&self) -> bool {
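        // This `Acquire` load pairs with the `Release` stores in `deregister`
        // and `release` below, keeping the count read here in sync with
        // updates made while holding the lock guarding `Synced`.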
        self.num_pending_release.load(Acquire) != 0
    }

    pub(super) fn allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>> {
        if synced.is_shutdown {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
            ));
        }

        let ret = Arc::new(ScheduledIo::default());

        // Push a ref into the list of all resources.
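        // The `clone` transfers one strong ref count to the list; it is
        // reclaimed via `Link::from_raw` (see the impl below) when the entry
        // is removed in `release` or `shutdown`.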
        synced.registrations.push_front(ret.clone());

        Ok(ret)
    }

    // Returns `true` if the caller should unblock the I/O driver to purge
    // registrations pending release.
    pub(super) fn deregister(&self, synced: &mut Synced, registration: &Arc<ScheduledIo>) -> bool {
        // Kind of arbitrary, but buffering 16 `ScheduledIo`s before waking
        // the I/O driver doesn't cost much memory.
        const NOTIFY_AFTER: usize = 16;

        synced.pending_release.push(registration.clone());

        let len = synced.pending_release.len();
        self.num_pending_release.store(len, Release);

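        // Callers hold the lock guarding `Synced`, so these pushes are
        // serialized and `len` grows by exactly one per call. Checking for
        // equality (rather than `>=`) therefore notifies the driver exactly
        // once per batch instead of on every subsequent drop.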
        len == NOTIFY_AFTER
    }

    pub(super) fn shutdown(&self, synced: &mut Synced) -> Vec<Arc<ScheduledIo>> {
        if synced.is_shutdown {
            return vec![];
        }

        synced.is_shutdown = true;
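
        // Entries in `pending_release` are still linked into `registrations`,
        // so clearing here only drops the duplicate ref counts; the handles
        // themselves are returned to the caller via the loop below.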
        synced.pending_release.clear();

        // Building a vec of all outstanding I/O handles could be expensive,
        // but this is the shutdown path. Shutdowns should normally be "clean",
        // with few outstanding I/O resources, and we are not optimizing for
        // shutdown performance in any case.
        let mut ret = vec![];

        while let Some(io) = synced.registrations.pop_back() {
            ret.push(io);
        }

        ret
    }

    pub(super) fn release(&self, synced: &mut Synced) {
        for io in synced.pending_release.drain(..) {
            // safety: the registration is part of our list
            let _ = unsafe { synced.registrations.remove(io.as_ref().into()) };
        }

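        // Dropping the handle returned by `remove` together with the drained
        // clone releases the ref counts held here, which may free the
        // `ScheduledIo`. The store below pairs with the `Acquire` load in
        // `needs_release`.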
        self.num_pending_release.store(0, Release);
    }
}

// Safety: `Arc` pins the inner data
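//
// Ownership contract: `as_raw` borrows the handle without consuming a ref
// count, while `from_raw` reconstitutes the strong count that was moved into
// the list when the handle was pushed (see `allocate` above).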
unsafe impl linked_list::Link for Arc<ScheduledIo> {
    type Handle = Arc<ScheduledIo>;
    type Target = ScheduledIo;

    fn as_raw(handle: &Self::Handle) -> NonNull<ScheduledIo> {
        // safety: Arc::as_ptr never returns null
        unsafe { NonNull::new_unchecked(Arc::as_ptr(handle) as *mut _) }
    }

    unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Arc<ScheduledIo> {
        // safety: the linked list currently owns a ref count
        unsafe { Arc::from_raw(ptr.as_ptr() as *const _) }
    }

    unsafe fn pointers(
        target: NonNull<Self::Target>,
    ) -> NonNull<linked_list::Pointers<ScheduledIo>> {
        NonNull::new_unchecked(target.as_ref().linked_list_pointers.get())
    }
}