1//! Zero-capacity channel.
2//!
//! This kind of channel is also known as a *rendezvous* channel.
4
5use super::context::Context;
6use super::error::*;
7use super::select::{Operation, Selected, Token};
8use super::utils::Backoff;
9use super::waker::Waker;
10use crate::cell::UnsafeCell;
11use crate::marker::PhantomData;
12use crate::sync::Mutex;
13use crate::sync::atomic::{Atomic, AtomicBool, Ordering};
14use crate::time::Instant;
15use crate::{fmt, ptr};
16
/// An untyped pointer to the packet used by one zero-capacity channel operation.
///
/// A null pointer means that no packet is attached; `Channel::read` and `Channel::write`
/// treat that case as a disconnected channel.
pub(crate) struct ZeroToken(*mut ());

impl Default for ZeroToken {
    /// Returns a token holding a null pointer, i.e. one with no packet attached.
    fn default() -> Self {
        ZeroToken(ptr::null_mut())
    }
}

impl fmt::Debug for ZeroToken {
    /// Formats the token as the address of its pointer, rendered as a `usize`.
    ///
    /// Formatting is delegated to `usize`'s `Debug` impl so that formatter flags (width,
    /// hexadecimal, ...) keep working.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let address = self.0 as usize;
        <usize as fmt::Debug>::fmt(&address, f)
    }
}
31
/// A slot for passing one message from a sender to a receiver.
///
/// The side that blocks creates the packet and registers a raw pointer to it; the other side
/// reaches it through that pointer (see `Channel::read` and `Channel::write`). The `ready`
/// flag is the hand-off signal between the two sides.
struct Packet<T> {
    /// Equals `true` if the packet is allocated on the stack.
    ///
    /// Both constructors in this file set it to `true`. When it is `false`, `Channel::read`
    /// frees the packet with `Box::from_raw` after taking the message.
    on_stack: bool,

    /// Equals `true` once the packet is ready for reading or writing.
    ///
    /// Starts as `false`; it is stored with `Release` ordering by `Channel::read` and
    /// `Channel::write`, and polled with `Acquire` ordering by `wait_ready`.
    ready: Atomic<bool>,

    /// The message.
    ///
    /// `None` while the slot is empty. It lives in an `UnsafeCell` because it is written and
    /// taken through shared references and raw pointers.
    msg: UnsafeCell<Option<T>>,
}
43
44impl<T> Packet<T> {
45 /// Creates an empty packet on the stack.
46 fn empty_on_stack() -> Packet<T> {
47 Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(None) }
48 }
49
50 /// Creates a packet on the stack, containing a message.
51 fn message_on_stack(msg: T) -> Packet<T> {
52 Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(Some(msg)) }
53 }
54
55 /// Waits until the packet becomes ready for reading or writing.
56 fn wait_ready(&self) {
57 let backoff: Backoff = Backoff::new();
58 while !self.ready.load(order:Ordering::Acquire) {
59 backoff.spin_heavy();
60 }
61 }
62}
63
/// Inner representation of a zero-capacity channel.
///
/// `Channel` stores this state inside a `Mutex`, so its fields are only reached through the
/// lock guard.
struct Inner {
    /// Senders waiting to pair up with a receive operation.
    ///
    /// `Channel::send` registers itself here before blocking; `Channel::try_recv` and
    /// `Channel::recv` select entries from it.
    senders: Waker,

    /// Receivers waiting to pair up with a send operation.
    ///
    /// `Channel::recv` registers itself here before blocking; `Channel::try_send` and
    /// `Channel::send` select entries from it.
    receivers: Waker,

    /// Equals `true` when the channel is disconnected.
    ///
    /// Set by `Channel::disconnect`; nothing in this file resets it to `false`.
    is_disconnected: bool,
}
75
/// Zero-capacity channel.
///
/// The channel has no buffer: a message is handed from a sender to a receiver through a
/// `Packet`, so the reported length is always `0` and the capacity is `Some(0)`.
pub(crate) struct Channel<T> {
    /// Inner representation of the channel.
    ///
    /// Holds the waiting senders and receivers and the disconnection flag, guarded by a
    /// mutex.
    inner: Mutex<Inner>,

    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}
84
85impl<T> Channel<T> {
86 /// Constructs a new zero-capacity channel.
87 pub(crate) fn new() -> Self {
88 Channel {
89 inner: Mutex::new(Inner {
90 senders: Waker::new(),
91 receivers: Waker::new(),
92 is_disconnected: false,
93 }),
94 _marker: PhantomData,
95 }
96 }
97
98 /// Writes a message into the packet.
99 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
100 // If there is no packet, the channel is disconnected.
101 if token.zero.0.is_null() {
102 return Err(msg);
103 }
104
105 unsafe {
106 let packet = &*(token.zero.0 as *const Packet<T>);
107 packet.msg.get().write(Some(msg));
108 packet.ready.store(true, Ordering::Release);
109 }
110 Ok(())
111 }
112
113 /// Reads a message from the packet.
114 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
115 // If there is no packet, the channel is disconnected.
116 if token.zero.0.is_null() {
117 return Err(());
118 }
119
120 let packet = unsafe { &*(token.zero.0 as *const Packet<T>) };
121
122 if packet.on_stack {
123 // The message has been in the packet from the beginning, so there is no need to wait
124 // for it. However, after reading the message, we need to set `ready` to `true` in
125 // order to signal that the packet can be destroyed.
126 let msg = unsafe { packet.msg.get().replace(None) }.unwrap();
127 packet.ready.store(true, Ordering::Release);
128 Ok(msg)
129 } else {
130 // Wait until the message becomes available, then read it and destroy the
131 // heap-allocated packet.
132 packet.wait_ready();
133 unsafe {
134 let msg = packet.msg.get().replace(None).unwrap();
135 drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
136 Ok(msg)
137 }
138 }
139 }
140
141 /// Attempts to send a message into the channel.
142 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
143 let token = &mut Token::default();
144 let mut inner = self.inner.lock().unwrap();
145
146 // If there's a waiting receiver, pair up with it.
147 if let Some(operation) = inner.receivers.try_select() {
148 token.zero.0 = operation.packet;
149 drop(inner);
150 unsafe {
151 self.write(token, msg).ok().unwrap();
152 }
153 Ok(())
154 } else if inner.is_disconnected {
155 Err(TrySendError::Disconnected(msg))
156 } else {
157 Err(TrySendError::Full(msg))
158 }
159 }
160
161 /// Sends a message into the channel.
162 pub(crate) fn send(
163 &self,
164 msg: T,
165 deadline: Option<Instant>,
166 ) -> Result<(), SendTimeoutError<T>> {
167 let token = &mut Token::default();
168 let mut inner = self.inner.lock().unwrap();
169
170 // If there's a waiting receiver, pair up with it.
171 if let Some(operation) = inner.receivers.try_select() {
172 token.zero.0 = operation.packet;
173 drop(inner);
174 unsafe {
175 self.write(token, msg).ok().unwrap();
176 }
177 return Ok(());
178 }
179
180 if inner.is_disconnected {
181 return Err(SendTimeoutError::Disconnected(msg));
182 }
183
184 Context::with(|cx| {
185 // Prepare for blocking until a receiver wakes us up.
186 let oper = Operation::hook(token);
187 let mut packet = Packet::<T>::message_on_stack(msg);
188 inner.senders.register_with_packet(oper, (&raw mut packet) as *mut (), cx);
189 inner.receivers.notify();
190 drop(inner);
191
192 // Block the current thread.
193 // SAFETY: the context belongs to the current thread.
194 let sel = unsafe { cx.wait_until(deadline) };
195
196 match sel {
197 Selected::Waiting => unreachable!(),
198 Selected::Aborted => {
199 self.inner.lock().unwrap().senders.unregister(oper).unwrap();
200 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
201 Err(SendTimeoutError::Timeout(msg))
202 }
203 Selected::Disconnected => {
204 self.inner.lock().unwrap().senders.unregister(oper).unwrap();
205 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
206 Err(SendTimeoutError::Disconnected(msg))
207 }
208 Selected::Operation(_) => {
209 // Wait until the message is read, then drop the packet.
210 packet.wait_ready();
211 Ok(())
212 }
213 }
214 })
215 }
216
217 /// Attempts to receive a message without blocking.
218 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
219 let token = &mut Token::default();
220 let mut inner = self.inner.lock().unwrap();
221
222 // If there's a waiting sender, pair up with it.
223 if let Some(operation) = inner.senders.try_select() {
224 token.zero.0 = operation.packet;
225 drop(inner);
226 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
227 } else if inner.is_disconnected {
228 Err(TryRecvError::Disconnected)
229 } else {
230 Err(TryRecvError::Empty)
231 }
232 }
233
234 /// Receives a message from the channel.
235 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
236 let token = &mut Token::default();
237 let mut inner = self.inner.lock().unwrap();
238
239 // If there's a waiting sender, pair up with it.
240 if let Some(operation) = inner.senders.try_select() {
241 token.zero.0 = operation.packet;
242 drop(inner);
243 unsafe {
244 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
245 }
246 }
247
248 if inner.is_disconnected {
249 return Err(RecvTimeoutError::Disconnected);
250 }
251
252 Context::with(|cx| {
253 // Prepare for blocking until a sender wakes us up.
254 let oper = Operation::hook(token);
255 let mut packet = Packet::<T>::empty_on_stack();
256 inner.receivers.register_with_packet(oper, (&raw mut packet) as *mut (), cx);
257 inner.senders.notify();
258 drop(inner);
259
260 // Block the current thread.
261 // SAFETY: the context belongs to the current thread.
262 let sel = unsafe { cx.wait_until(deadline) };
263
264 match sel {
265 Selected::Waiting => unreachable!(),
266 Selected::Aborted => {
267 self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
268 Err(RecvTimeoutError::Timeout)
269 }
270 Selected::Disconnected => {
271 self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
272 Err(RecvTimeoutError::Disconnected)
273 }
274 Selected::Operation(_) => {
275 // Wait until the message is provided, then read it.
276 packet.wait_ready();
277 unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
278 }
279 }
280 })
281 }
282
283 /// Disconnects the channel and wakes up all blocked senders and receivers.
284 ///
285 /// Returns `true` if this call disconnected the channel.
286 pub(crate) fn disconnect(&self) -> bool {
287 let mut inner = self.inner.lock().unwrap();
288
289 if !inner.is_disconnected {
290 inner.is_disconnected = true;
291 inner.senders.disconnect();
292 inner.receivers.disconnect();
293 true
294 } else {
295 false
296 }
297 }
298
299 /// Returns the current number of messages inside the channel.
300 pub(crate) fn len(&self) -> usize {
301 0
302 }
303
304 /// Returns the capacity of the channel.
305 #[allow(clippy::unnecessary_wraps)] // This is intentional.
306 pub(crate) fn capacity(&self) -> Option<usize> {
307 Some(0)
308 }
309
310 /// Returns `true` if the channel is empty.
311 pub(crate) fn is_empty(&self) -> bool {
312 true
313 }
314
315 /// Returns `true` if the channel is full.
316 pub(crate) fn is_full(&self) -> bool {
317 true
318 }
319}
320