| 1 | //! Waking mechanism for threads blocked on channel operations. |
| 2 | |
| 3 | use std::ptr; |
| 4 | use std::sync::atomic::{AtomicBool, Ordering}; |
| 5 | use std::sync::Mutex; |
| 6 | use std::thread::{self, ThreadId}; |
| 7 | use std::vec::Vec; |
| 8 | |
| 9 | use crate::context::Context; |
| 10 | use crate::select::{Operation, Selected}; |
| 11 | |
| 12 | /// Represents a thread blocked on a specific channel operation. |
| 13 | pub(crate) struct Entry { |
| 14 | /// The operation. |
| 15 | pub(crate) oper: Operation, |
| 16 | |
| 17 | /// Optional packet. |
| 18 | pub(crate) packet: *mut (), |
| 19 | |
| 20 | /// Context associated with the thread owning this operation. |
| 21 | pub(crate) cx: Context, |
| 22 | } |
| 23 | |
| 24 | /// A queue of threads blocked on channel operations. |
| 25 | /// |
| 26 | /// This data structure is used by threads to register blocking operations and get woken up once |
| 27 | /// an operation becomes ready. |
| 28 | pub(crate) struct Waker { |
| 29 | /// A list of select operations. |
| 30 | selectors: Vec<Entry>, |
| 31 | |
| 32 | /// A list of operations waiting to be ready. |
| 33 | observers: Vec<Entry>, |
| 34 | } |
| 35 | |
| 36 | impl Waker { |
| 37 | /// Creates a new `Waker`. |
| 38 | #[inline ] |
| 39 | pub(crate) fn new() -> Self { |
| 40 | Waker { |
| 41 | selectors: Vec::new(), |
| 42 | observers: Vec::new(), |
| 43 | } |
| 44 | } |
| 45 | |
| 46 | /// Registers a select operation. |
| 47 | #[inline ] |
| 48 | pub(crate) fn register(&mut self, oper: Operation, cx: &Context) { |
| 49 | self.register_with_packet(oper, ptr::null_mut(), cx); |
| 50 | } |
| 51 | |
| 52 | /// Registers a select operation and a packet. |
| 53 | #[inline ] |
| 54 | pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) { |
| 55 | self.selectors.push(Entry { |
| 56 | oper, |
| 57 | packet, |
| 58 | cx: cx.clone(), |
| 59 | }); |
| 60 | } |
| 61 | |
| 62 | /// Unregisters a select operation. |
| 63 | #[inline ] |
| 64 | pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> { |
| 65 | if let Some((i, _)) = self |
| 66 | .selectors |
| 67 | .iter() |
| 68 | .enumerate() |
| 69 | .find(|&(_, entry)| entry.oper == oper) |
| 70 | { |
| 71 | let entry = self.selectors.remove(i); |
| 72 | Some(entry) |
| 73 | } else { |
| 74 | None |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | /// Attempts to find another thread's entry, select the operation, and wake it up. |
| 79 | #[inline ] |
| 80 | pub(crate) fn try_select(&mut self) -> Option<Entry> { |
| 81 | if self.selectors.is_empty() { |
| 82 | None |
| 83 | } else { |
| 84 | let thread_id = current_thread_id(); |
| 85 | |
| 86 | self.selectors |
| 87 | .iter() |
| 88 | .position(|selector| { |
| 89 | // Does the entry belong to a different thread? |
| 90 | selector.cx.thread_id() != thread_id |
| 91 | && selector // Try selecting this operation. |
| 92 | .cx |
| 93 | .try_select(Selected::Operation(selector.oper)) |
| 94 | .is_ok() |
| 95 | && { |
| 96 | // Provide the packet. |
| 97 | selector.cx.store_packet(selector.packet); |
| 98 | // Wake the thread up. |
| 99 | selector.cx.unpark(); |
| 100 | true |
| 101 | } |
| 102 | }) |
| 103 | // Remove the entry from the queue to keep it clean and improve |
| 104 | // performance. |
| 105 | .map(|pos| self.selectors.remove(pos)) |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | /// Returns `true` if there is an entry which can be selected by the current thread. |
| 110 | #[inline ] |
| 111 | pub(crate) fn can_select(&self) -> bool { |
| 112 | if self.selectors.is_empty() { |
| 113 | false |
| 114 | } else { |
| 115 | let thread_id = current_thread_id(); |
| 116 | |
| 117 | self.selectors.iter().any(|entry| { |
| 118 | entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting |
| 119 | }) |
| 120 | } |
| 121 | } |
| 122 | |
| 123 | /// Registers an operation waiting to be ready. |
| 124 | #[inline ] |
| 125 | pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) { |
| 126 | self.observers.push(Entry { |
| 127 | oper, |
| 128 | packet: ptr::null_mut(), |
| 129 | cx: cx.clone(), |
| 130 | }); |
| 131 | } |
| 132 | |
| 133 | /// Unregisters an operation waiting to be ready. |
| 134 | #[inline ] |
| 135 | pub(crate) fn unwatch(&mut self, oper: Operation) { |
| 136 | self.observers.retain(|e| e.oper != oper); |
| 137 | } |
| 138 | |
| 139 | /// Notifies all operations waiting to be ready. |
| 140 | #[inline ] |
| 141 | pub(crate) fn notify(&mut self) { |
| 142 | for entry in self.observers.drain(..) { |
| 143 | if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() { |
| 144 | entry.cx.unpark(); |
| 145 | } |
| 146 | } |
| 147 | } |
| 148 | |
| 149 | /// Notifies all registered operations that the channel is disconnected. |
| 150 | #[inline ] |
| 151 | pub(crate) fn disconnect(&mut self) { |
| 152 | for entry in self.selectors.iter() { |
| 153 | if entry.cx.try_select(Selected::Disconnected).is_ok() { |
| 154 | // Wake the thread up. |
| 155 | // |
| 156 | // Here we don't remove the entry from the queue. Registered threads must |
| 157 | // unregister from the waker by themselves. They might also want to recover the |
| 158 | // packet value and destroy it, if necessary. |
| 159 | entry.cx.unpark(); |
| 160 | } |
| 161 | } |
| 162 | |
| 163 | self.notify(); |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | impl Drop for Waker { |
| 168 | #[inline ] |
| 169 | fn drop(&mut self) { |
| 170 | debug_assert_eq!(self.selectors.len(), 0); |
| 171 | debug_assert_eq!(self.observers.len(), 0); |
| 172 | } |
| 173 | } |
| 174 | |
| 175 | /// A waker that can be shared among threads without locking. |
| 176 | /// |
| 177 | /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. |
| 178 | pub(crate) struct SyncWaker { |
| 179 | /// The inner `Waker`. |
| 180 | inner: Mutex<Waker>, |
| 181 | |
| 182 | /// `true` if the waker is empty. |
| 183 | is_empty: AtomicBool, |
| 184 | } |
| 185 | |
| 186 | impl SyncWaker { |
| 187 | /// Creates a new `SyncWaker`. |
| 188 | #[inline ] |
| 189 | pub(crate) fn new() -> Self { |
| 190 | SyncWaker { |
| 191 | inner: Mutex::new(Waker::new()), |
| 192 | is_empty: AtomicBool::new(true), |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | /// Registers the current thread with an operation. |
| 197 | #[inline ] |
| 198 | pub(crate) fn register(&self, oper: Operation, cx: &Context) { |
| 199 | let mut inner = self.inner.lock().unwrap(); |
| 200 | inner.register(oper, cx); |
| 201 | self.is_empty.store( |
| 202 | inner.selectors.is_empty() && inner.observers.is_empty(), |
| 203 | Ordering::SeqCst, |
| 204 | ); |
| 205 | } |
| 206 | |
| 207 | /// Unregisters an operation previously registered by the current thread. |
| 208 | #[inline ] |
| 209 | pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> { |
| 210 | let mut inner = self.inner.lock().unwrap(); |
| 211 | let entry = inner.unregister(oper); |
| 212 | self.is_empty.store( |
| 213 | inner.selectors.is_empty() && inner.observers.is_empty(), |
| 214 | Ordering::SeqCst, |
| 215 | ); |
| 216 | entry |
| 217 | } |
| 218 | |
| 219 | /// Attempts to find one thread (not the current one), select its operation, and wake it up. |
| 220 | #[inline ] |
| 221 | pub(crate) fn notify(&self) { |
| 222 | if !self.is_empty.load(Ordering::SeqCst) { |
| 223 | let mut inner = self.inner.lock().unwrap(); |
| 224 | if !self.is_empty.load(Ordering::SeqCst) { |
| 225 | inner.try_select(); |
| 226 | inner.notify(); |
| 227 | self.is_empty.store( |
| 228 | inner.selectors.is_empty() && inner.observers.is_empty(), |
| 229 | Ordering::SeqCst, |
| 230 | ); |
| 231 | } |
| 232 | } |
| 233 | } |
| 234 | |
| 235 | /// Registers an operation waiting to be ready. |
| 236 | #[inline ] |
| 237 | pub(crate) fn watch(&self, oper: Operation, cx: &Context) { |
| 238 | let mut inner = self.inner.lock().unwrap(); |
| 239 | inner.watch(oper, cx); |
| 240 | self.is_empty.store( |
| 241 | inner.selectors.is_empty() && inner.observers.is_empty(), |
| 242 | Ordering::SeqCst, |
| 243 | ); |
| 244 | } |
| 245 | |
| 246 | /// Unregisters an operation waiting to be ready. |
| 247 | #[inline ] |
| 248 | pub(crate) fn unwatch(&self, oper: Operation) { |
| 249 | let mut inner = self.inner.lock().unwrap(); |
| 250 | inner.unwatch(oper); |
| 251 | self.is_empty.store( |
| 252 | inner.selectors.is_empty() && inner.observers.is_empty(), |
| 253 | Ordering::SeqCst, |
| 254 | ); |
| 255 | } |
| 256 | |
| 257 | /// Notifies all threads that the channel is disconnected. |
| 258 | #[inline ] |
| 259 | pub(crate) fn disconnect(&self) { |
| 260 | let mut inner = self.inner.lock().unwrap(); |
| 261 | inner.disconnect(); |
| 262 | self.is_empty.store( |
| 263 | inner.selectors.is_empty() && inner.observers.is_empty(), |
| 264 | Ordering::SeqCst, |
| 265 | ); |
| 266 | } |
| 267 | } |
| 268 | |
| 269 | impl Drop for SyncWaker { |
| 270 | #[inline ] |
| 271 | fn drop(&mut self) { |
| 272 | debug_assert!(self.is_empty.load(Ordering::SeqCst)); |
| 273 | } |
| 274 | } |
| 275 | |
| 276 | /// Returns the id of the current thread. |
| 277 | #[inline ] |
| 278 | fn current_thread_id() -> ThreadId { |
| 279 | std::thread_local! { |
| 280 | /// Cached thread-local id. |
| 281 | static THREAD_ID: ThreadId = thread::current().id(); |
| 282 | } |
| 283 | |
| 284 | THREAD_ID |
| 285 | .try_with(|id| *id) |
| 286 | .unwrap_or_else(|_| thread::current().id()) |
| 287 | } |
| 288 | |