//! Zero-capacity channel.
//!
//! This kind of channel is also known as a *rendezvous* channel.
4
5use super::context::Context;
6use super::error::*;
7use super::select::{Operation, Selected, Token};
8use super::utils::Backoff;
9use super::waker::Waker;
10
11use crate::cell::UnsafeCell;
12use crate::marker::PhantomData;
13use crate::sync::atomic::{AtomicBool, Ordering};
14use crate::sync::Mutex;
15use crate::time::Instant;
16use crate::{fmt, ptr};
17
/// A pointer to a packet.
///
/// The pointer is stored type-erased as `*mut ()`; `Channel::read` and `Channel::write` cast it
/// back to a `Packet<T>` before dereferencing it. A null pointer (the `Default` value) means
/// "no packet", which those two methods report as an error.
pub(crate) struct ZeroToken(*mut ());
20
21impl Default for ZeroToken {
22 fn default() -> Self {
23 Self(ptr::null_mut())
24 }
25}
26
27impl fmt::Debug for ZeroToken {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 fmt::Debug::fmt(&(self.0 as usize), f)
30 }
31}
32
/// A slot for passing one message from a sender to a receiver.
///
/// A packet is shared between the two sides of a rendezvous through a raw pointer carried in a
/// `ZeroToken`; `ready` is the flag the two sides use to hand the slot over to each other.
struct Packet<T> {
    /// Equals `true` if the packet is allocated on the stack.
    ///
    /// `Channel::read` uses this flag to decide whether it must free the packet with
    /// `Box::from_raw` (`false`) or merely signal `ready` so the owner can reclaim it (`true`).
    on_stack: bool,

    /// Equals `true` once the packet is ready for reading or writing.
    ///
    /// Set with `Release` ordering by `Channel::write` and `Channel::read`, and polled with
    /// `Acquire` ordering by `Packet::wait_ready`.
    ready: AtomicBool,

    /// The message.
    ///
    /// `None` while the slot is empty. Accessed through raw pointers obtained from the
    /// `UnsafeCell`, so every access is `unsafe` and relies on the `ready` handshake.
    msg: UnsafeCell<Option<T>>,
}
44
45impl<T> Packet<T> {
46 /// Creates an empty packet on the stack.
47 fn empty_on_stack() -> Packet<T> {
48 Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(None) }
49 }
50
51 /// Creates a packet on the stack, containing a message.
52 fn message_on_stack(msg: T) -> Packet<T> {
53 Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(Some(msg)) }
54 }
55
56 /// Waits until the packet becomes ready for reading or writing.
57 fn wait_ready(&self) {
58 let backoff: Backoff = Backoff::new();
59 while !self.ready.load(order:Ordering::Acquire) {
60 backoff.spin_heavy();
61 }
62 }
63}
64
/// Inner representation of a zero-capacity channel.
///
/// `Channel` keeps this state inside a `Mutex`, so all fields are only accessed while that lock
/// is held.
struct Inner {
    /// Senders waiting to pair up with a receive operation.
    senders: Waker,

    /// Receivers waiting to pair up with a send operation.
    receivers: Waker,

    /// Equals `true` when the channel is disconnected.
    ///
    /// Set once by `Channel::disconnect`; nothing in this file resets it to `false`.
    is_disconnected: bool,
}
76
/// Zero-capacity channel.
///
/// The channel never buffers a message: a send completes only by pairing up with a receive, and
/// the message travels through a `Packet<T>` owned by one of the two sides.
pub(crate) struct Channel<T> {
    /// Inner representation of the channel.
    ///
    /// Holds the queues of waiting senders and receivers plus the disconnection flag, all
    /// guarded by one mutex.
    inner: Mutex<Inner>,

    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}
85
impl<T> Channel<T> {
    /// Constructs a new zero-capacity channel.
    ///
    /// The channel starts out connected, with freshly created sender and receiver `Waker`s.
    pub(crate) fn new() -> Self {
        Channel {
            inner: Mutex::new(Inner {
                senders: Waker::new(),
                receivers: Waker::new(),
                is_disconnected: false,
            }),
            _marker: PhantomData,
        }
    }

    /// Writes a message into the packet.
    ///
    /// Returns `Err(msg)`, handing the message back to the caller, if `token` carries a null
    /// packet pointer. Otherwise the message is stored in the packet, the packet is marked
    /// ready, and `Ok(())` is returned.
    ///
    /// # Safety
    ///
    /// `token.zero.0` must be either null or a valid pointer to a `Packet<T>` with this channel's
    /// message type: it is cast and dereferenced without any further check.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no packet, the channel is disconnected.
        if token.zero.0.is_null() {
            return Err(msg);
        }

        let packet = &*(token.zero.0 as *const Packet<T>);
        // A raw-pointer `write` overwrites the slot without dropping its previous contents;
        // presumably the slot holds `None` here (see `Packet::empty_on_stack`) — TODO confirm.
        packet.msg.get().write(Some(msg));
        // Publish the message: this `Release` store pairs with the `Acquire` load in
        // `Packet::wait_ready`.
        packet.ready.store(true, Ordering::Release);
        Ok(())
    }

    /// Reads a message from the packet.
    ///
    /// Returns `Err(())` if `token` carries a null packet pointer, otherwise the message taken
    /// out of the packet.
    ///
    /// # Safety
    ///
    /// `token.zero.0` must be either null or a valid pointer to a `Packet<T>` with this channel's
    /// message type. If that packet has `on_stack == false`, the pointer is additionally passed to
    /// `Box::from_raw`, so it must then be a pointer that may be reclaimed as a `Box<Packet<T>>`.
    ///
    /// # Panics
    ///
    /// Panics if the packet's message slot is `None` at the time it is read.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        // If there is no packet, the channel is disconnected.
        if token.zero.0.is_null() {
            return Err(());
        }

        let packet = &*(token.zero.0 as *const Packet<T>);

        if packet.on_stack {
            // The message has been in the packet from the beginning, so there is no need to wait
            // for it. However, after reading the message, we need to set `ready` to `true` in
            // order to signal that the packet can be destroyed.
            let msg = packet.msg.get().replace(None).unwrap();
            // The packet must not be touched after this store: its owner spins in
            // `wait_ready` (see `send`) and may return as soon as it sees `true`.
            packet.ready.store(true, Ordering::Release);
            Ok(msg)
        } else {
            // Wait until the message becomes available, then read it and destroy the
            // heap-allocated packet.
            // NOTE(review): no heap allocation of packets is visible in this file; presumably
            // they are created elsewhere in the channel machinery — confirm.
            packet.wait_ready();
            let msg = packet.msg.get().replace(None).unwrap();
            drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
            Ok(msg)
        }
    }

    /// Attempts to send a message into the channel.
    ///
    /// Never blocks waiting for a receiver. Returns:
    ///
    /// * `Ok(())` if a waiting receiver was selected and the message was written into its packet;
    /// * `Err(TrySendError::Disconnected(msg))` if no receiver was selected and the channel is
    ///   disconnected;
    /// * `Err(TrySendError::Full(msg))` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the inner mutex is poisoned, or if the selected receiver supplied a null packet.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        let token = &mut Token::default();
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting receiver, pair up with it.
        if let Some(operation) = inner.receivers.try_select() {
            token.zero.0 = operation.packet;
            // Release the lock before touching the packet.
            drop(inner);
            // SAFETY (review note): the pointer comes from the entry of a receiver that was just
            // selected; presumably that receiver keeps its packet alive until `ready` is set, as
            // `recv` does by spinning in `wait_ready` — confirm against `Waker::try_select`.
            unsafe {
                // `.ok()` turns the `Result<(), T>` into an `Option` so that `unwrap` does not
                // require `T: Debug`. `write` only fails for a null packet pointer.
                self.write(token, msg).ok().unwrap();
            }
            Ok(())
        } else if inner.is_disconnected {
            Err(TrySendError::Disconnected(msg))
        } else {
            Err(TrySendError::Full(msg))
        }
    }

    /// Sends a message into the channel.
    ///
    /// If a receiver is already waiting, the message is handed over immediately. Otherwise the
    /// calling thread registers itself as a waiting sender and blocks in `cx.wait_until(deadline)`.
    ///
    /// Returns:
    ///
    /// * `Ok(())` once a receiver has taken the message;
    /// * `Err(SendTimeoutError::Disconnected(msg))` if the channel is or becomes disconnected
    ///   before the message is taken;
    /// * `Err(SendTimeoutError::Timeout(msg))` if the wait ends with `Selected::Aborted`.
    ///
    /// # Panics
    ///
    /// Panics if the inner mutex is poisoned, or if one of the internal `unwrap`s on the packet
    /// or on the registration entry fails.
    pub(crate) fn send(
        &self,
        msg: T,
        deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting receiver, pair up with it.
        if let Some(operation) = inner.receivers.try_select() {
            token.zero.0 = operation.packet;
            drop(inner);
            // SAFETY (review note): same reasoning as in `try_send` — the pointer was taken from
            // a just-selected receiver's entry; presumably it is still valid — confirm.
            unsafe {
                // `.ok()` avoids requiring `T: Debug` for the `unwrap`.
                self.write(token, msg).ok().unwrap();
            }
            return Ok(());
        }

        if inner.is_disconnected {
            return Err(SendTimeoutError::Disconnected(msg));
        }

        Context::with(|cx| {
            // Prepare for blocking until a receiver wakes us up.
            let oper = Operation::hook(token);
            // The packet lives in this closure's frame and its address is handed to the sender
            // queue below, so every path out of the closure must first make sure no receiver can
            // still access it (by unregistering, or by waiting for `ready`).
            let mut packet = Packet::<T>::message_on_stack(msg);
            inner.senders.register_with_packet(
                oper,
                core::ptr::addr_of_mut!(packet) as *mut (),
                cx,
            );
            inner.receivers.notify();
            // The lock is released before blocking.
            drop(inner);

            // Block the current thread.
            let sel = cx.wait_until(deadline);

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // Remove our registration, then take the unsent message back so it can be
                    // returned to the caller.
                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
                    // SAFETY (review note): presumably no receiver can reach the packet once the
                    // entry has been unregistered — confirm against `Waker::unregister`.
                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
                    Err(SendTimeoutError::Timeout(msg))
                }
                Selected::Disconnected => {
                    // Same clean-up as for `Aborted`, reported as a disconnection.
                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
                    Err(SendTimeoutError::Disconnected(msg))
                }
                Selected::Operation(_) => {
                    // Wait until the message is read, then drop the packet.
                    // `read` sets `ready` only after it has taken the message out.
                    packet.wait_ready();
                    Ok(())
                }
            }
        })
    }

    /// Attempts to receive a message without blocking.
    ///
    /// Returns:
    ///
    /// * `Ok(msg)` if a waiting sender was selected and its message was read;
    /// * `Err(TryRecvError::Disconnected)` if the selected sender supplied no packet, or if no
    ///   sender was selected and the channel is disconnected;
    /// * `Err(TryRecvError::Empty)` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the inner mutex is poisoned.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting sender, pair up with it.
        if let Some(operation) = inner.senders.try_select() {
            token.zero.0 = operation.packet;
            // Release the lock before touching the packet.
            drop(inner);
            // SAFETY (review note): the pointer comes from the entry of a sender that was just
            // selected; presumably that sender keeps its packet alive until `ready` is set, as
            // `send` does by spinning in `wait_ready` — confirm against `Waker::try_select`.
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else if inner.is_disconnected {
            Err(TryRecvError::Disconnected)
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    ///
    /// If a sender is already waiting, its message is taken immediately. Otherwise the calling
    /// thread registers itself as a waiting receiver and blocks in `cx.wait_until(deadline)`.
    ///
    /// Returns:
    ///
    /// * `Ok(msg)` once a sender has provided a message;
    /// * `Err(RecvTimeoutError::Disconnected)` if the channel is or becomes disconnected before
    ///   a message arrives;
    /// * `Err(RecvTimeoutError::Timeout)` if the wait ends with `Selected::Aborted`.
    ///
    /// # Panics
    ///
    /// Panics if the inner mutex is poisoned, or if one of the internal `unwrap`s on the packet
    /// or on the registration entry fails.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting sender, pair up with it.
        if let Some(operation) = inner.senders.try_select() {
            token.zero.0 = operation.packet;
            drop(inner);
            // SAFETY (review note): same reasoning as in `try_recv` — the pointer was taken from
            // a just-selected sender's entry; presumably it is still valid — confirm.
            unsafe {
                return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
            }
        }

        if inner.is_disconnected {
            return Err(RecvTimeoutError::Disconnected);
        }

        Context::with(|cx| {
            // Prepare for blocking until a sender wakes us up.
            let oper = Operation::hook(token);
            // The empty packet lives in this closure's frame and its address is handed to the
            // receiver queue below, so every path out of the closure must first make sure no
            // sender can still access it (by unregistering, or by waiting for `ready`).
            let mut packet = Packet::<T>::empty_on_stack();
            inner.receivers.register_with_packet(
                oper,
                core::ptr::addr_of_mut!(packet) as *mut (),
                cx,
            );
            inner.senders.notify();
            // The lock is released before blocking.
            drop(inner);

            // Block the current thread.
            let sel = cx.wait_until(deadline);

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // Remove our registration; the packet is still empty, so there is nothing
                    // to recover from it.
                    self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
                    Err(RecvTimeoutError::Timeout)
                }
                Selected::Disconnected => {
                    self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
                    Err(RecvTimeoutError::Disconnected)
                }
                Selected::Operation(_) => {
                    // Wait until the message is provided, then read it.
                    // `write` stores the message before it sets `ready`, so the slot is
                    // populated once `wait_ready` returns.
                    packet.wait_ready();
                    unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
                }
            }
        })
    }

    /// Disconnects the channel and wakes up all blocked senders and receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    ///
    /// Calling it again on an already disconnected channel does nothing and returns `false`.
    ///
    /// # Panics
    ///
    /// Panics if the inner mutex is poisoned.
    pub(crate) fn disconnect(&self) -> bool {
        let mut inner = self.inner.lock().unwrap();

        if !inner.is_disconnected {
            inner.is_disconnected = true;
            inner.senders.disconnect();
            inner.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Returns the current number of messages inside the channel.
    ///
    /// Always `0`: a zero-capacity channel never holds a message.
    pub(crate) fn len(&self) -> usize {
        0
    }

    /// Returns the capacity of the channel.
    ///
    /// Always `Some(0)`.
    #[allow(clippy::unnecessary_wraps)] // This is intentional.
    pub(crate) fn capacity(&self) -> Option<usize> {
        Some(0)
    }

    /// Returns `true` if the channel is empty.
    ///
    /// Always `true`, consistent with `len` always being `0`.
    pub(crate) fn is_empty(&self) -> bool {
        true
    }

    /// Returns `true` if the channel is full.
    ///
    /// Always `true`: with a capacity of `0` there is never room to buffer a message.
    pub(crate) fn is_full(&self) -> bool {
        true
    }
}
323