1 | //! Waking mechanism for threads blocked on channel operations. |
2 | |
3 | use super::context::Context; |
4 | use super::select::{Operation, Selected}; |
5 | use crate::ptr; |
6 | use crate::sync::Mutex; |
7 | use crate::sync::atomic::{AtomicBool, Ordering}; |
8 | |
9 | /// Represents a thread blocked on a specific channel operation. |
10 | pub(crate) struct Entry { |
11 | /// The operation. |
12 | pub(crate) oper: Operation, |
13 | |
14 | /// Optional packet. |
15 | pub(crate) packet: *mut (), |
16 | |
17 | /// Context associated with the thread owning this operation. |
18 | pub(crate) cx: Context, |
19 | } |
20 | |
21 | /// A queue of threads blocked on channel operations. |
22 | /// |
23 | /// This data structure is used by threads to register blocking operations and get woken up once |
24 | /// an operation becomes ready. |
25 | pub(crate) struct Waker { |
26 | /// A list of select operations. |
27 | selectors: Vec<Entry>, |
28 | |
29 | /// A list of operations waiting to be ready. |
30 | observers: Vec<Entry>, |
31 | } |
32 | |
33 | impl Waker { |
34 | /// Creates a new `Waker`. |
35 | #[inline ] |
36 | pub(crate) fn new() -> Self { |
37 | Waker { selectors: Vec::new(), observers: Vec::new() } |
38 | } |
39 | |
40 | /// Registers a select operation. |
41 | #[inline ] |
42 | pub(crate) fn register(&mut self, oper: Operation, cx: &Context) { |
43 | self.register_with_packet(oper, ptr::null_mut(), cx); |
44 | } |
45 | |
46 | /// Registers a select operation and a packet. |
47 | #[inline ] |
48 | pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) { |
49 | self.selectors.push(Entry { oper, packet, cx: cx.clone() }); |
50 | } |
51 | |
52 | /// Unregisters a select operation. |
53 | #[inline ] |
54 | pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> { |
55 | if let Some((i, _)) = |
56 | self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper) |
57 | { |
58 | let entry = self.selectors.remove(i); |
59 | Some(entry) |
60 | } else { |
61 | None |
62 | } |
63 | } |
64 | |
65 | /// Attempts to find another thread's entry, select the operation, and wake it up. |
66 | #[inline ] |
67 | pub(crate) fn try_select(&mut self) -> Option<Entry> { |
68 | if self.selectors.is_empty() { |
69 | None |
70 | } else { |
71 | let thread_id = current_thread_id(); |
72 | |
73 | self.selectors |
74 | .iter() |
75 | .position(|selector| { |
76 | // Does the entry belong to a different thread? |
77 | selector.cx.thread_id() != thread_id |
78 | && selector // Try selecting this operation. |
79 | .cx |
80 | .try_select(Selected::Operation(selector.oper)) |
81 | .is_ok() |
82 | && { |
83 | // Provide the packet. |
84 | selector.cx.store_packet(selector.packet); |
85 | // Wake the thread up. |
86 | selector.cx.unpark(); |
87 | true |
88 | } |
89 | }) |
90 | // Remove the entry from the queue to keep it clean and improve |
91 | // performance. |
92 | .map(|pos| self.selectors.remove(pos)) |
93 | } |
94 | } |
95 | |
96 | /// Notifies all operations waiting to be ready. |
97 | #[inline ] |
98 | pub(crate) fn notify(&mut self) { |
99 | for entry in self.observers.drain(..) { |
100 | if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() { |
101 | entry.cx.unpark(); |
102 | } |
103 | } |
104 | } |
105 | |
106 | /// Notifies all registered operations that the channel is disconnected. |
107 | #[inline ] |
108 | pub(crate) fn disconnect(&mut self) { |
109 | for entry in self.selectors.iter() { |
110 | if entry.cx.try_select(Selected::Disconnected).is_ok() { |
111 | // Wake the thread up. |
112 | // |
113 | // Here we don't remove the entry from the queue. Registered threads must |
114 | // unregister from the waker by themselves. They might also want to recover the |
115 | // packet value and destroy it, if necessary. |
116 | entry.cx.unpark(); |
117 | } |
118 | } |
119 | |
120 | self.notify(); |
121 | } |
122 | } |
123 | |
124 | impl Drop for Waker { |
125 | #[inline ] |
126 | fn drop(&mut self) { |
127 | debug_assert_eq!(self.selectors.len(), 0); |
128 | debug_assert_eq!(self.observers.len(), 0); |
129 | } |
130 | } |
131 | |
132 | /// A waker that can be shared among threads without locking. |
133 | /// |
134 | /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. |
135 | pub(crate) struct SyncWaker { |
136 | /// The inner `Waker`. |
137 | inner: Mutex<Waker>, |
138 | |
139 | /// `true` if the waker is empty. |
140 | is_empty: AtomicBool, |
141 | } |
142 | |
143 | impl SyncWaker { |
144 | /// Creates a new `SyncWaker`. |
145 | #[inline ] |
146 | pub(crate) fn new() -> Self { |
147 | SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) } |
148 | } |
149 | |
150 | /// Registers the current thread with an operation. |
151 | #[inline ] |
152 | pub(crate) fn register(&self, oper: Operation, cx: &Context) { |
153 | let mut inner = self.inner.lock().unwrap(); |
154 | inner.register(oper, cx); |
155 | self.is_empty |
156 | .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); |
157 | } |
158 | |
159 | /// Unregisters an operation previously registered by the current thread. |
160 | #[inline ] |
161 | pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> { |
162 | let mut inner = self.inner.lock().unwrap(); |
163 | let entry = inner.unregister(oper); |
164 | self.is_empty |
165 | .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); |
166 | entry |
167 | } |
168 | |
169 | /// Attempts to find one thread (not the current one), select its operation, and wake it up. |
170 | #[inline ] |
171 | pub(crate) fn notify(&self) { |
172 | if !self.is_empty.load(Ordering::SeqCst) { |
173 | let mut inner = self.inner.lock().unwrap(); |
174 | if !self.is_empty.load(Ordering::SeqCst) { |
175 | inner.try_select(); |
176 | inner.notify(); |
177 | self.is_empty.store( |
178 | inner.selectors.is_empty() && inner.observers.is_empty(), |
179 | Ordering::SeqCst, |
180 | ); |
181 | } |
182 | } |
183 | } |
184 | |
185 | /// Notifies all threads that the channel is disconnected. |
186 | #[inline ] |
187 | pub(crate) fn disconnect(&self) { |
188 | let mut inner = self.inner.lock().unwrap(); |
189 | inner.disconnect(); |
190 | self.is_empty |
191 | .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); |
192 | } |
193 | } |
194 | |
195 | impl Drop for SyncWaker { |
196 | #[inline ] |
197 | fn drop(&mut self) { |
198 | debug_assert!(self.is_empty.load(Ordering::SeqCst)); |
199 | } |
200 | } |
201 | |
202 | /// Returns a unique id for the current thread. |
203 | #[inline ] |
204 | pub fn current_thread_id() -> usize { |
205 | // `u8` is not drop so this variable will be available during thread destruction, |
206 | // whereas `thread::current()` would not be |
207 | thread_local! { static DUMMY: u8 = const { 0 } } |
208 | DUMMY.with(|x: &u8| (x as *const u8).addr()) |
209 | } |
210 | |