1//! Waking mechanism for threads blocked on channel operations.
2
3use super::context::Context;
4use super::select::{Operation, Selected};
5
6use crate::ptr;
7use crate::sync::atomic::{AtomicBool, Ordering};
8use crate::sync::Mutex;
9
10/// Represents a thread blocked on a specific channel operation.
11pub(crate) struct Entry {
12 /// The operation.
13 pub(crate) oper: Operation,
14
15 /// Optional packet.
16 pub(crate) packet: *mut (),
17
18 /// Context associated with the thread owning this operation.
19 pub(crate) cx: Context,
20}
21
22/// A queue of threads blocked on channel operations.
23///
24/// This data structure is used by threads to register blocking operations and get woken up once
25/// an operation becomes ready.
26pub(crate) struct Waker {
27 /// A list of select operations.
28 selectors: Vec<Entry>,
29
30 /// A list of operations waiting to be ready.
31 observers: Vec<Entry>,
32}
33
34impl Waker {
35 /// Creates a new `Waker`.
36 #[inline]
37 pub(crate) fn new() -> Self {
38 Waker { selectors: Vec::new(), observers: Vec::new() }
39 }
40
41 /// Registers a select operation.
42 #[inline]
43 pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
44 self.register_with_packet(oper, ptr::null_mut(), cx);
45 }
46
47 /// Registers a select operation and a packet.
48 #[inline]
49 pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
50 self.selectors.push(Entry { oper, packet, cx: cx.clone() });
51 }
52
53 /// Unregisters a select operation.
54 #[inline]
55 pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
56 if let Some((i, _)) =
57 self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper)
58 {
59 let entry = self.selectors.remove(i);
60 Some(entry)
61 } else {
62 None
63 }
64 }
65
66 /// Attempts to find another thread's entry, select the operation, and wake it up.
67 #[inline]
68 pub(crate) fn try_select(&mut self) -> Option<Entry> {
69 if self.selectors.is_empty() {
70 None
71 } else {
72 let thread_id = current_thread_id();
73
74 self.selectors
75 .iter()
76 .position(|selector| {
77 // Does the entry belong to a different thread?
78 selector.cx.thread_id() != thread_id
79 && selector // Try selecting this operation.
80 .cx
81 .try_select(Selected::Operation(selector.oper))
82 .is_ok()
83 && {
84 // Provide the packet.
85 selector.cx.store_packet(selector.packet);
86 // Wake the thread up.
87 selector.cx.unpark();
88 true
89 }
90 })
91 // Remove the entry from the queue to keep it clean and improve
92 // performance.
93 .map(|pos| self.selectors.remove(pos))
94 }
95 }
96
97 /// Notifies all operations waiting to be ready.
98 #[inline]
99 pub(crate) fn notify(&mut self) {
100 for entry in self.observers.drain(..) {
101 if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
102 entry.cx.unpark();
103 }
104 }
105 }
106
107 /// Notifies all registered operations that the channel is disconnected.
108 #[inline]
109 pub(crate) fn disconnect(&mut self) {
110 for entry in self.selectors.iter() {
111 if entry.cx.try_select(Selected::Disconnected).is_ok() {
112 // Wake the thread up.
113 //
114 // Here we don't remove the entry from the queue. Registered threads must
115 // unregister from the waker by themselves. They might also want to recover the
116 // packet value and destroy it, if necessary.
117 entry.cx.unpark();
118 }
119 }
120
121 self.notify();
122 }
123}
124
125impl Drop for Waker {
126 #[inline]
127 fn drop(&mut self) {
128 debug_assert_eq!(self.selectors.len(), 0);
129 debug_assert_eq!(self.observers.len(), 0);
130 }
131}
132
133/// A waker that can be shared among threads without locking.
134///
135/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
136pub(crate) struct SyncWaker {
137 /// The inner `Waker`.
138 inner: Mutex<Waker>,
139
140 /// `true` if the waker is empty.
141 is_empty: AtomicBool,
142}
143
144impl SyncWaker {
145 /// Creates a new `SyncWaker`.
146 #[inline]
147 pub(crate) fn new() -> Self {
148 SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) }
149 }
150
151 /// Registers the current thread with an operation.
152 #[inline]
153 pub(crate) fn register(&self, oper: Operation, cx: &Context) {
154 let mut inner = self.inner.lock().unwrap();
155 inner.register(oper, cx);
156 self.is_empty
157 .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
158 }
159
160 /// Unregisters an operation previously registered by the current thread.
161 #[inline]
162 pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
163 let mut inner = self.inner.lock().unwrap();
164 let entry = inner.unregister(oper);
165 self.is_empty
166 .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
167 entry
168 }
169
170 /// Attempts to find one thread (not the current one), select its operation, and wake it up.
171 #[inline]
172 pub(crate) fn notify(&self) {
173 if !self.is_empty.load(Ordering::SeqCst) {
174 let mut inner = self.inner.lock().unwrap();
175 if !self.is_empty.load(Ordering::SeqCst) {
176 inner.try_select();
177 inner.notify();
178 self.is_empty.store(
179 inner.selectors.is_empty() && inner.observers.is_empty(),
180 Ordering::SeqCst,
181 );
182 }
183 }
184 }
185
186 /// Notifies all threads that the channel is disconnected.
187 #[inline]
188 pub(crate) fn disconnect(&self) {
189 let mut inner = self.inner.lock().unwrap();
190 inner.disconnect();
191 self.is_empty
192 .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
193 }
194}
195
196impl Drop for SyncWaker {
197 #[inline]
198 fn drop(&mut self) {
199 debug_assert!(self.is_empty.load(Ordering::SeqCst));
200 }
201}
202
203/// Returns a unique id for the current thread.
204#[inline]
205pub fn current_thread_id() -> usize {
206 // `u8` is not drop so this variable will be available during thread destruction,
207 // whereas `thread::current()` would not be
208 thread_local! { static DUMMY: u8 = 0 }
209 DUMMY.with(|x: &u8| (x as *const u8).addr())
210}
211