1use crate::loom::sync::atomic::AtomicUsize;
2use crate::runtime::io::ScheduledIo;
3use crate::util::linked_list::{self, LinkedList};
4
5use std::io;
6use std::ptr::NonNull;
7use std::sync::atomic::Ordering::{Acquire, Release};
8use std::sync::Arc;
9
10pub(super) struct RegistrationSet {
11 num_pending_release: AtomicUsize,
12}
13
14pub(super) struct Synced {
15 // True when the I/O driver shutdown. At this point, no more registrations
16 // should be added to the set.
17 is_shutdown: bool,
18
19 // List of all registrations tracked by the set
20 registrations: LinkedList<Arc<ScheduledIo>, ScheduledIo>,
21
22 // Registrations that are pending drop. When a `Registration` is dropped, it
23 // stores its `ScheduledIo` in this list. The I/O driver is responsible for
24 // dropping it. This ensures the `ScheduledIo` is not freed while it can
25 // still be included in an I/O event.
26 pending_release: Vec<Arc<ScheduledIo>>,
27}
28
29impl RegistrationSet {
30 pub(super) fn new() -> (RegistrationSet, Synced) {
31 let set = RegistrationSet {
32 num_pending_release: AtomicUsize::new(0),
33 };
34
35 let synced = Synced {
36 is_shutdown: false,
37 registrations: LinkedList::new(),
38 pending_release: Vec::with_capacity(16),
39 };
40
41 (set, synced)
42 }
43
44 pub(super) fn is_shutdown(&self, synced: &Synced) -> bool {
45 synced.is_shutdown
46 }
47
48 /// Returns `true` if there are registrations that need to be released
49 pub(super) fn needs_release(&self) -> bool {
50 self.num_pending_release.load(Acquire) != 0
51 }
52
53 pub(super) fn allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>> {
54 if synced.is_shutdown {
55 return Err(io::Error::new(
56 io::ErrorKind::Other,
57 crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
58 ));
59 }
60
61 let ret = Arc::new(ScheduledIo::default());
62
63 // Push a ref into the list of all resources.
64 synced.registrations.push_front(ret.clone());
65
66 Ok(ret)
67 }
68
69 // Returns `true` if the caller should unblock the I/O driver to purge
70 // registrations pending release.
71 pub(super) fn deregister(&self, synced: &mut Synced, registration: &Arc<ScheduledIo>) -> bool {
72 // Kind of arbitrary, but buffering 16 `ScheduledIo`s doesn't seem like much
73 const NOTIFY_AFTER: usize = 16;
74
75 synced.pending_release.push(registration.clone());
76
77 let len = synced.pending_release.len();
78 self.num_pending_release.store(len, Release);
79
80 len == NOTIFY_AFTER
81 }
82
83 pub(super) fn shutdown(&self, synced: &mut Synced) -> Vec<Arc<ScheduledIo>> {
84 if synced.is_shutdown {
85 return vec![];
86 }
87
88 synced.is_shutdown = true;
89 synced.pending_release.clear();
90
91 // Building a vec of all outstanding I/O handles could be expensive, but
92 // this is the shutdown operation. In theory, shutdowns should be
93 // "clean" with no outstanding I/O resources. Even if it is slow, we
94 // aren't optimizing for shutdown.
95 let mut ret = vec![];
96
97 while let Some(io) = synced.registrations.pop_back() {
98 ret.push(io);
99 }
100
101 ret
102 }
103
104 pub(super) fn release(&self, synced: &mut Synced) {
105 for io in synced.pending_release.drain(..) {
106 // safety: the registration is part of our list
107 let _ = unsafe { synced.registrations.remove(io.as_ref().into()) };
108 }
109
110 self.num_pending_release.store(0, Release);
111 }
112}
113
114// Safety: `Arc` pins the inner data
115unsafe impl linked_list::Link for Arc<ScheduledIo> {
116 type Handle = Arc<ScheduledIo>;
117 type Target = ScheduledIo;
118
119 fn as_raw(handle: &Self::Handle) -> NonNull<ScheduledIo> {
120 // safety: Arc::as_ptr never returns null
121 unsafe { NonNull::new_unchecked(Arc::as_ptr(handle) as *mut _) }
122 }
123
124 unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Arc<ScheduledIo> {
125 // safety: the linked list currently owns a ref count
126 unsafe { Arc::from_raw(ptr.as_ptr() as *const _) }
127 }
128
129 unsafe fn pointers(
130 target: NonNull<Self::Target>,
131 ) -> NonNull<linked_list::Pointers<ScheduledIo>> {
132 NonNull::new_unchecked(target.as_ref().linked_list_pointers.get())
133 }
134}
135