1 | //! Waking mechanism for threads blocked on channel operations. |
2 | |
3 | use super::context::Context; |
4 | use super::select::{Operation, Selected}; |
5 | |
6 | use crate::ptr; |
7 | use crate::sync::atomic::{AtomicBool, Ordering}; |
8 | use crate::sync::Mutex; |
9 | |
10 | /// Represents a thread blocked on a specific channel operation. |
11 | pub(crate) struct Entry { |
12 | /// The operation. |
13 | pub(crate) oper: Operation, |
14 | |
15 | /// Optional packet. |
16 | pub(crate) packet: *mut (), |
17 | |
18 | /// Context associated with the thread owning this operation. |
19 | pub(crate) cx: Context, |
20 | } |
21 | |
22 | /// A queue of threads blocked on channel operations. |
23 | /// |
24 | /// This data structure is used by threads to register blocking operations and get woken up once |
25 | /// an operation becomes ready. |
26 | pub(crate) struct Waker { |
27 | /// A list of select operations. |
28 | selectors: Vec<Entry>, |
29 | |
30 | /// A list of operations waiting to be ready. |
31 | observers: Vec<Entry>, |
32 | } |
33 | |
34 | impl Waker { |
35 | /// Creates a new `Waker`. |
36 | #[inline ] |
37 | pub(crate) fn new() -> Self { |
38 | Waker { selectors: Vec::new(), observers: Vec::new() } |
39 | } |
40 | |
41 | /// Registers a select operation. |
42 | #[inline ] |
43 | pub(crate) fn register(&mut self, oper: Operation, cx: &Context) { |
44 | self.register_with_packet(oper, ptr::null_mut(), cx); |
45 | } |
46 | |
47 | /// Registers a select operation and a packet. |
48 | #[inline ] |
49 | pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) { |
50 | self.selectors.push(Entry { oper, packet, cx: cx.clone() }); |
51 | } |
52 | |
53 | /// Unregisters a select operation. |
54 | #[inline ] |
55 | pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> { |
56 | if let Some((i, _)) = |
57 | self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper) |
58 | { |
59 | let entry = self.selectors.remove(i); |
60 | Some(entry) |
61 | } else { |
62 | None |
63 | } |
64 | } |
65 | |
66 | /// Attempts to find another thread's entry, select the operation, and wake it up. |
67 | #[inline ] |
68 | pub(crate) fn try_select(&mut self) -> Option<Entry> { |
69 | if self.selectors.is_empty() { |
70 | None |
71 | } else { |
72 | let thread_id = current_thread_id(); |
73 | |
74 | self.selectors |
75 | .iter() |
76 | .position(|selector| { |
77 | // Does the entry belong to a different thread? |
78 | selector.cx.thread_id() != thread_id |
79 | && selector // Try selecting this operation. |
80 | .cx |
81 | .try_select(Selected::Operation(selector.oper)) |
82 | .is_ok() |
83 | && { |
84 | // Provide the packet. |
85 | selector.cx.store_packet(selector.packet); |
86 | // Wake the thread up. |
87 | selector.cx.unpark(); |
88 | true |
89 | } |
90 | }) |
91 | // Remove the entry from the queue to keep it clean and improve |
92 | // performance. |
93 | .map(|pos| self.selectors.remove(pos)) |
94 | } |
95 | } |
96 | |
97 | /// Notifies all operations waiting to be ready. |
98 | #[inline ] |
99 | pub(crate) fn notify(&mut self) { |
100 | for entry in self.observers.drain(..) { |
101 | if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() { |
102 | entry.cx.unpark(); |
103 | } |
104 | } |
105 | } |
106 | |
107 | /// Notifies all registered operations that the channel is disconnected. |
108 | #[inline ] |
109 | pub(crate) fn disconnect(&mut self) { |
110 | for entry in self.selectors.iter() { |
111 | if entry.cx.try_select(Selected::Disconnected).is_ok() { |
112 | // Wake the thread up. |
113 | // |
114 | // Here we don't remove the entry from the queue. Registered threads must |
115 | // unregister from the waker by themselves. They might also want to recover the |
116 | // packet value and destroy it, if necessary. |
117 | entry.cx.unpark(); |
118 | } |
119 | } |
120 | |
121 | self.notify(); |
122 | } |
123 | } |
124 | |
125 | impl Drop for Waker { |
126 | #[inline ] |
127 | fn drop(&mut self) { |
128 | debug_assert_eq!(self.selectors.len(), 0); |
129 | debug_assert_eq!(self.observers.len(), 0); |
130 | } |
131 | } |
132 | |
133 | /// A waker that can be shared among threads without locking. |
134 | /// |
135 | /// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. |
136 | pub(crate) struct SyncWaker { |
137 | /// The inner `Waker`. |
138 | inner: Mutex<Waker>, |
139 | |
140 | /// `true` if the waker is empty. |
141 | is_empty: AtomicBool, |
142 | } |
143 | |
144 | impl SyncWaker { |
145 | /// Creates a new `SyncWaker`. |
146 | #[inline ] |
147 | pub(crate) fn new() -> Self { |
148 | SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) } |
149 | } |
150 | |
151 | /// Registers the current thread with an operation. |
152 | #[inline ] |
153 | pub(crate) fn register(&self, oper: Operation, cx: &Context) { |
154 | let mut inner = self.inner.lock().unwrap(); |
155 | inner.register(oper, cx); |
156 | self.is_empty |
157 | .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); |
158 | } |
159 | |
160 | /// Unregisters an operation previously registered by the current thread. |
161 | #[inline ] |
162 | pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> { |
163 | let mut inner = self.inner.lock().unwrap(); |
164 | let entry = inner.unregister(oper); |
165 | self.is_empty |
166 | .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); |
167 | entry |
168 | } |
169 | |
170 | /// Attempts to find one thread (not the current one), select its operation, and wake it up. |
171 | #[inline ] |
172 | pub(crate) fn notify(&self) { |
173 | if !self.is_empty.load(Ordering::SeqCst) { |
174 | let mut inner = self.inner.lock().unwrap(); |
175 | if !self.is_empty.load(Ordering::SeqCst) { |
176 | inner.try_select(); |
177 | inner.notify(); |
178 | self.is_empty.store( |
179 | inner.selectors.is_empty() && inner.observers.is_empty(), |
180 | Ordering::SeqCst, |
181 | ); |
182 | } |
183 | } |
184 | } |
185 | |
186 | /// Notifies all threads that the channel is disconnected. |
187 | #[inline ] |
188 | pub(crate) fn disconnect(&self) { |
189 | let mut inner = self.inner.lock().unwrap(); |
190 | inner.disconnect(); |
191 | self.is_empty |
192 | .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); |
193 | } |
194 | } |
195 | |
196 | impl Drop for SyncWaker { |
197 | #[inline ] |
198 | fn drop(&mut self) { |
199 | debug_assert!(self.is_empty.load(Ordering::SeqCst)); |
200 | } |
201 | } |
202 | |
203 | /// Returns a unique id for the current thread. |
204 | #[inline ] |
205 | pub fn current_thread_id() -> usize { |
206 | // `u8` is not drop so this variable will be available during thread destruction, |
207 | // whereas `thread::current()` would not be |
208 | thread_local! { static DUMMY: u8 = 0 } |
209 | DUMMY.with(|x: &u8| (x as *const u8).addr()) |
210 | } |
211 | |