1//! Waking mechanism for threads blocked on channel operations.
2
3use std::ptr;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Mutex;
6use std::thread::{self, ThreadId};
7
8use crate::context::Context;
9use crate::select::{Operation, Selected};
10
11/// Represents a thread blocked on a specific channel operation.
12pub(crate) struct Entry {
13 /// The operation.
14 pub(crate) oper: Operation,
15
16 /// Optional packet.
17 pub(crate) packet: *mut (),
18
19 /// Context associated with the thread owning this operation.
20 pub(crate) cx: Context,
21}
22
23/// A queue of threads blocked on channel operations.
24///
25/// This data structure is used by threads to register blocking operations and get woken up once
26/// an operation becomes ready.
27pub(crate) struct Waker {
28 /// A list of select operations.
29 selectors: Vec<Entry>,
30
31 /// A list of operations waiting to be ready.
32 observers: Vec<Entry>,
33}
34
35impl Waker {
36 /// Creates a new `Waker`.
37 #[inline]
38 pub(crate) fn new() -> Self {
39 Waker {
40 selectors: Vec::new(),
41 observers: Vec::new(),
42 }
43 }
44
45 /// Registers a select operation.
46 #[inline]
47 pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
48 self.register_with_packet(oper, ptr::null_mut(), cx);
49 }
50
51 /// Registers a select operation and a packet.
52 #[inline]
53 pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
54 self.selectors.push(Entry {
55 oper,
56 packet,
57 cx: cx.clone(),
58 });
59 }
60
61 /// Unregisters a select operation.
62 #[inline]
63 pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
64 if let Some((i, _)) = self
65 .selectors
66 .iter()
67 .enumerate()
68 .find(|&(_, entry)| entry.oper == oper)
69 {
70 let entry = self.selectors.remove(i);
71 Some(entry)
72 } else {
73 None
74 }
75 }
76
77 /// Attempts to find another thread's entry, select the operation, and wake it up.
78 #[inline]
79 pub(crate) fn try_select(&mut self) -> Option<Entry> {
80 if self.selectors.is_empty() {
81 None
82 } else {
83 let thread_id = current_thread_id();
84
85 self.selectors
86 .iter()
87 .position(|selector| {
88 // Does the entry belong to a different thread?
89 selector.cx.thread_id() != thread_id
90 && selector // Try selecting this operation.
91 .cx
92 .try_select(Selected::Operation(selector.oper))
93 .is_ok()
94 && {
95 // Provide the packet.
96 selector.cx.store_packet(selector.packet);
97 // Wake the thread up.
98 selector.cx.unpark();
99 true
100 }
101 })
102 // Remove the entry from the queue to keep it clean and improve
103 // performance.
104 .map(|pos| self.selectors.remove(pos))
105 }
106 }
107
108 /// Returns `true` if there is an entry which can be selected by the current thread.
109 #[inline]
110 pub(crate) fn can_select(&self) -> bool {
111 if self.selectors.is_empty() {
112 false
113 } else {
114 let thread_id = current_thread_id();
115
116 self.selectors.iter().any(|entry| {
117 entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
118 })
119 }
120 }
121
122 /// Registers an operation waiting to be ready.
123 #[inline]
124 pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
125 self.observers.push(Entry {
126 oper,
127 packet: ptr::null_mut(),
128 cx: cx.clone(),
129 });
130 }
131
132 /// Unregisters an operation waiting to be ready.
133 #[inline]
134 pub(crate) fn unwatch(&mut self, oper: Operation) {
135 self.observers.retain(|e| e.oper != oper);
136 }
137
138 /// Notifies all operations waiting to be ready.
139 #[inline]
140 pub(crate) fn notify(&mut self) {
141 for entry in self.observers.drain(..) {
142 if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
143 entry.cx.unpark();
144 }
145 }
146 }
147
148 /// Notifies all registered operations that the channel is disconnected.
149 #[inline]
150 pub(crate) fn disconnect(&mut self) {
151 for entry in self.selectors.iter() {
152 if entry.cx.try_select(Selected::Disconnected).is_ok() {
153 // Wake the thread up.
154 //
155 // Here we don't remove the entry from the queue. Registered threads must
156 // unregister from the waker by themselves. They might also want to recover the
157 // packet value and destroy it, if necessary.
158 entry.cx.unpark();
159 }
160 }
161
162 self.notify();
163 }
164}
165
166impl Drop for Waker {
167 #[inline]
168 fn drop(&mut self) {
169 debug_assert_eq!(self.selectors.len(), 0);
170 debug_assert_eq!(self.observers.len(), 0);
171 }
172}
173
174/// A waker that can be shared among threads without locking.
175///
176/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
177pub(crate) struct SyncWaker {
178 /// The inner `Waker`.
179 inner: Mutex<Waker>,
180
181 /// `true` if the waker is empty.
182 is_empty: AtomicBool,
183}
184
185impl SyncWaker {
186 /// Creates a new `SyncWaker`.
187 #[inline]
188 pub(crate) fn new() -> Self {
189 SyncWaker {
190 inner: Mutex::new(Waker::new()),
191 is_empty: AtomicBool::new(true),
192 }
193 }
194
195 /// Registers the current thread with an operation.
196 #[inline]
197 pub(crate) fn register(&self, oper: Operation, cx: &Context) {
198 let mut inner = self.inner.lock().unwrap();
199 inner.register(oper, cx);
200 self.is_empty.store(
201 inner.selectors.is_empty() && inner.observers.is_empty(),
202 Ordering::SeqCst,
203 );
204 }
205
206 /// Unregisters an operation previously registered by the current thread.
207 #[inline]
208 pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
209 let mut inner = self.inner.lock().unwrap();
210 let entry = inner.unregister(oper);
211 self.is_empty.store(
212 inner.selectors.is_empty() && inner.observers.is_empty(),
213 Ordering::SeqCst,
214 );
215 entry
216 }
217
218 /// Attempts to find one thread (not the current one), select its operation, and wake it up.
219 #[inline]
220 pub(crate) fn notify(&self) {
221 if !self.is_empty.load(Ordering::SeqCst) {
222 let mut inner = self.inner.lock().unwrap();
223 if !self.is_empty.load(Ordering::SeqCst) {
224 inner.try_select();
225 inner.notify();
226 self.is_empty.store(
227 inner.selectors.is_empty() && inner.observers.is_empty(),
228 Ordering::SeqCst,
229 );
230 }
231 }
232 }
233
234 /// Registers an operation waiting to be ready.
235 #[inline]
236 pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
237 let mut inner = self.inner.lock().unwrap();
238 inner.watch(oper, cx);
239 self.is_empty.store(
240 inner.selectors.is_empty() && inner.observers.is_empty(),
241 Ordering::SeqCst,
242 );
243 }
244
245 /// Unregisters an operation waiting to be ready.
246 #[inline]
247 pub(crate) fn unwatch(&self, oper: Operation) {
248 let mut inner = self.inner.lock().unwrap();
249 inner.unwatch(oper);
250 self.is_empty.store(
251 inner.selectors.is_empty() && inner.observers.is_empty(),
252 Ordering::SeqCst,
253 );
254 }
255
256 /// Notifies all threads that the channel is disconnected.
257 #[inline]
258 pub(crate) fn disconnect(&self) {
259 let mut inner = self.inner.lock().unwrap();
260 inner.disconnect();
261 self.is_empty.store(
262 inner.selectors.is_empty() && inner.observers.is_empty(),
263 Ordering::SeqCst,
264 );
265 }
266}
267
268impl Drop for SyncWaker {
269 #[inline]
270 fn drop(&mut self) {
271 debug_assert!(self.is_empty.load(Ordering::SeqCst));
272 }
273}
274
/// Returns the id of the current thread.
///
/// The id is cached in a thread-local so that repeated calls on the same thread reuse the
/// stored value.
#[inline]
fn current_thread_id() -> ThreadId {
    thread_local! {
        /// Id of the owning thread, computed on first access.
        static CACHED_ID: ThreadId = thread::current().id();
    }

    match CACHED_ID.try_with(|cached| *cached) {
        Ok(id) => id,
        // The thread-local could not be accessed, so ask the standard library directly.
        Err(_) => thread::current().id(),
    }
}
287