1//! Zero-capacity channel.
2//!
3//! This kind of channel is also known as *rendezvous* channel.
4
5use std::boxed::Box;
6use std::cell::UnsafeCell;
7use std::marker::PhantomData;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Mutex;
10use std::time::Instant;
11use std::{fmt, ptr};
12
13use crossbeam_utils::Backoff;
14
15use crate::context::Context;
16use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
17use crate::select::{Operation, SelectHandle, Selected, Token};
18use crate::waker::Waker;
19
20/// A pointer to a packet.
21pub(crate) struct ZeroToken(*mut ());
22
23impl Default for ZeroToken {
24 fn default() -> Self {
25 Self(ptr::null_mut())
26 }
27}
28
29impl fmt::Debug for ZeroToken {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 fmt::Debug::fmt(&(self.0 as usize), f)
32 }
33}
34
35/// A slot for passing one message from a sender to a receiver.
36struct Packet<T> {
37 /// Equals `true` if the packet is allocated on the stack.
38 on_stack: bool,
39
40 /// Equals `true` once the packet is ready for reading or writing.
41 ready: AtomicBool,
42
43 /// The message.
44 msg: UnsafeCell<Option<T>>,
45}
46
47impl<T> Packet<T> {
48 /// Creates an empty packet on the stack.
49 fn empty_on_stack() -> Packet<T> {
50 Packet {
51 on_stack: true,
52 ready: AtomicBool::new(false),
53 msg: UnsafeCell::new(None),
54 }
55 }
56
57 /// Creates an empty packet on the heap.
58 fn empty_on_heap() -> Box<Packet<T>> {
59 Box::new(Packet {
60 on_stack: false,
61 ready: AtomicBool::new(false),
62 msg: UnsafeCell::new(None),
63 })
64 }
65
66 /// Creates a packet on the stack, containing a message.
67 fn message_on_stack(msg: T) -> Packet<T> {
68 Packet {
69 on_stack: true,
70 ready: AtomicBool::new(false),
71 msg: UnsafeCell::new(Some(msg)),
72 }
73 }
74
75 /// Waits until the packet becomes ready for reading or writing.
76 fn wait_ready(&self) {
77 let backoff = Backoff::new();
78 while !self.ready.load(Ordering::Acquire) {
79 backoff.snooze();
80 }
81 }
82}
83
84/// Inner representation of a zero-capacity channel.
85struct Inner {
86 /// Senders waiting to pair up with a receive operation.
87 senders: Waker,
88
89 /// Receivers waiting to pair up with a send operation.
90 receivers: Waker,
91
92 /// Equals `true` when the channel is disconnected.
93 is_disconnected: bool,
94}
95
96/// Zero-capacity channel.
97pub(crate) struct Channel<T> {
98 /// Inner representation of the channel.
99 inner: Mutex<Inner>,
100
101 /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
102 _marker: PhantomData<T>,
103}
104
105impl<T> Channel<T> {
106 /// Constructs a new zero-capacity channel.
107 pub(crate) fn new() -> Self {
108 Channel {
109 inner: Mutex::new(Inner {
110 senders: Waker::new(),
111 receivers: Waker::new(),
112 is_disconnected: false,
113 }),
114 _marker: PhantomData,
115 }
116 }
117
118 /// Returns a receiver handle to the channel.
119 pub(crate) fn receiver(&self) -> Receiver<'_, T> {
120 Receiver(self)
121 }
122
123 /// Returns a sender handle to the channel.
124 pub(crate) fn sender(&self) -> Sender<'_, T> {
125 Sender(self)
126 }
127
128 /// Attempts to reserve a slot for sending a message.
129 fn start_send(&self, token: &mut Token) -> bool {
130 let mut inner = self.inner.lock().unwrap();
131
132 // If there's a waiting receiver, pair up with it.
133 if let Some(operation) = inner.receivers.try_select() {
134 token.zero.0 = operation.packet;
135 true
136 } else if inner.is_disconnected {
137 token.zero.0 = ptr::null_mut();
138 true
139 } else {
140 false
141 }
142 }
143
144 /// Writes a message into the packet.
145 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
146 // If there is no packet, the channel is disconnected.
147 if token.zero.0.is_null() {
148 return Err(msg);
149 }
150
151 let packet = &*(token.zero.0 as *const Packet<T>);
152 packet.msg.get().write(Some(msg));
153 packet.ready.store(true, Ordering::Release);
154 Ok(())
155 }
156
157 /// Attempts to pair up with a sender.
158 fn start_recv(&self, token: &mut Token) -> bool {
159 let mut inner = self.inner.lock().unwrap();
160
161 // If there's a waiting sender, pair up with it.
162 if let Some(operation) = inner.senders.try_select() {
163 token.zero.0 = operation.packet;
164 true
165 } else if inner.is_disconnected {
166 token.zero.0 = ptr::null_mut();
167 true
168 } else {
169 false
170 }
171 }
172
173 /// Reads a message from the packet.
174 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
175 // If there is no packet, the channel is disconnected.
176 if token.zero.0.is_null() {
177 return Err(());
178 }
179
180 let packet = &*(token.zero.0 as *const Packet<T>);
181
182 if packet.on_stack {
183 // The message has been in the packet from the beginning, so there is no need to wait
184 // for it. However, after reading the message, we need to set `ready` to `true` in
185 // order to signal that the packet can be destroyed.
186 let msg = packet.msg.get().replace(None).unwrap();
187 packet.ready.store(true, Ordering::Release);
188 Ok(msg)
189 } else {
190 // Wait until the message becomes available, then read it and destroy the
191 // heap-allocated packet.
192 packet.wait_ready();
193 let msg = packet.msg.get().replace(None).unwrap();
194 drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
195 Ok(msg)
196 }
197 }
198
199 /// Attempts to send a message into the channel.
200 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
201 let token = &mut Token::default();
202 let mut inner = self.inner.lock().unwrap();
203
204 // If there's a waiting receiver, pair up with it.
205 if let Some(operation) = inner.receivers.try_select() {
206 token.zero.0 = operation.packet;
207 drop(inner);
208 unsafe {
209 self.write(token, msg).ok().unwrap();
210 }
211 Ok(())
212 } else if inner.is_disconnected {
213 Err(TrySendError::Disconnected(msg))
214 } else {
215 Err(TrySendError::Full(msg))
216 }
217 }
218
219 /// Sends a message into the channel.
220 pub(crate) fn send(
221 &self,
222 msg: T,
223 deadline: Option<Instant>,
224 ) -> Result<(), SendTimeoutError<T>> {
225 let token = &mut Token::default();
226 let mut inner = self.inner.lock().unwrap();
227
228 // If there's a waiting receiver, pair up with it.
229 if let Some(operation) = inner.receivers.try_select() {
230 token.zero.0 = operation.packet;
231 drop(inner);
232 unsafe {
233 self.write(token, msg).ok().unwrap();
234 }
235 return Ok(());
236 }
237
238 if inner.is_disconnected {
239 return Err(SendTimeoutError::Disconnected(msg));
240 }
241
242 Context::with(|cx| {
243 // Prepare for blocking until a receiver wakes us up.
244 let oper = Operation::hook(token);
245 let mut packet = Packet::<T>::message_on_stack(msg);
246 inner
247 .senders
248 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
249 inner.receivers.notify();
250 drop(inner);
251
252 // Block the current thread.
253 let sel = cx.wait_until(deadline);
254
255 match sel {
256 Selected::Waiting => unreachable!(),
257 Selected::Aborted => {
258 self.inner.lock().unwrap().senders.unregister(oper).unwrap();
259 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
260 Err(SendTimeoutError::Timeout(msg))
261 }
262 Selected::Disconnected => {
263 self.inner.lock().unwrap().senders.unregister(oper).unwrap();
264 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
265 Err(SendTimeoutError::Disconnected(msg))
266 }
267 Selected::Operation(_) => {
268 // Wait until the message is read, then drop the packet.
269 packet.wait_ready();
270 Ok(())
271 }
272 }
273 })
274 }
275
276 /// Attempts to receive a message without blocking.
277 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
278 let token = &mut Token::default();
279 let mut inner = self.inner.lock().unwrap();
280
281 // If there's a waiting sender, pair up with it.
282 if let Some(operation) = inner.senders.try_select() {
283 token.zero.0 = operation.packet;
284 drop(inner);
285 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
286 } else if inner.is_disconnected {
287 Err(TryRecvError::Disconnected)
288 } else {
289 Err(TryRecvError::Empty)
290 }
291 }
292
293 /// Receives a message from the channel.
294 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
295 let token = &mut Token::default();
296 let mut inner = self.inner.lock().unwrap();
297
298 // If there's a waiting sender, pair up with it.
299 if let Some(operation) = inner.senders.try_select() {
300 token.zero.0 = operation.packet;
301 drop(inner);
302 unsafe {
303 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
304 }
305 }
306
307 if inner.is_disconnected {
308 return Err(RecvTimeoutError::Disconnected);
309 }
310
311 Context::with(|cx| {
312 // Prepare for blocking until a sender wakes us up.
313 let oper = Operation::hook(token);
314 let mut packet = Packet::<T>::empty_on_stack();
315 inner.receivers.register_with_packet(
316 oper,
317 &mut packet as *mut Packet<T> as *mut (),
318 cx,
319 );
320 inner.senders.notify();
321 drop(inner);
322
323 // Block the current thread.
324 let sel = cx.wait_until(deadline);
325
326 match sel {
327 Selected::Waiting => unreachable!(),
328 Selected::Aborted => {
329 self.inner
330 .lock()
331 .unwrap()
332 .receivers
333 .unregister(oper)
334 .unwrap();
335 Err(RecvTimeoutError::Timeout)
336 }
337 Selected::Disconnected => {
338 self.inner
339 .lock()
340 .unwrap()
341 .receivers
342 .unregister(oper)
343 .unwrap();
344 Err(RecvTimeoutError::Disconnected)
345 }
346 Selected::Operation(_) => {
347 // Wait until the message is provided, then read it.
348 packet.wait_ready();
349 unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
350 }
351 }
352 })
353 }
354
355 /// Disconnects the channel and wakes up all blocked senders and receivers.
356 ///
357 /// Returns `true` if this call disconnected the channel.
358 pub(crate) fn disconnect(&self) -> bool {
359 let mut inner = self.inner.lock().unwrap();
360
361 if !inner.is_disconnected {
362 inner.is_disconnected = true;
363 inner.senders.disconnect();
364 inner.receivers.disconnect();
365 true
366 } else {
367 false
368 }
369 }
370
371 /// Returns the current number of messages inside the channel.
372 pub(crate) fn len(&self) -> usize {
373 0
374 }
375
376 /// Returns the capacity of the channel.
377 pub(crate) fn capacity(&self) -> Option<usize> {
378 Some(0)
379 }
380
381 /// Returns `true` if the channel is empty.
382 pub(crate) fn is_empty(&self) -> bool {
383 true
384 }
385
386 /// Returns `true` if the channel is full.
387 pub(crate) fn is_full(&self) -> bool {
388 true
389 }
390}
391
392/// Receiver handle to a channel.
393pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
394
395/// Sender handle to a channel.
396pub(crate) struct Sender<'a, T>(&'a Channel<T>);
397
398impl<T> SelectHandle for Receiver<'_, T> {
399 fn try_select(&self, token: &mut Token) -> bool {
400 self.0.start_recv(token)
401 }
402
403 fn deadline(&self) -> Option<Instant> {
404 None
405 }
406
407 fn register(&self, oper: Operation, cx: &Context) -> bool {
408 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
409
410 let mut inner = self.0.inner.lock().unwrap();
411 inner
412 .receivers
413 .register_with_packet(oper, packet.cast::<()>(), cx);
414 inner.senders.notify();
415 inner.senders.can_select() || inner.is_disconnected
416 }
417
418 fn unregister(&self, oper: Operation) {
419 if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
420 unsafe {
421 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
422 }
423 }
424 }
425
426 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
427 token.zero.0 = cx.wait_packet();
428 true
429 }
430
431 fn is_ready(&self) -> bool {
432 let inner = self.0.inner.lock().unwrap();
433 inner.senders.can_select() || inner.is_disconnected
434 }
435
436 fn watch(&self, oper: Operation, cx: &Context) -> bool {
437 let mut inner = self.0.inner.lock().unwrap();
438 inner.receivers.watch(oper, cx);
439 inner.senders.can_select() || inner.is_disconnected
440 }
441
442 fn unwatch(&self, oper: Operation) {
443 let mut inner = self.0.inner.lock().unwrap();
444 inner.receivers.unwatch(oper);
445 }
446}
447
448impl<T> SelectHandle for Sender<'_, T> {
449 fn try_select(&self, token: &mut Token) -> bool {
450 self.0.start_send(token)
451 }
452
453 fn deadline(&self) -> Option<Instant> {
454 None
455 }
456
457 fn register(&self, oper: Operation, cx: &Context) -> bool {
458 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
459
460 let mut inner = self.0.inner.lock().unwrap();
461 inner
462 .senders
463 .register_with_packet(oper, packet.cast::<()>(), cx);
464 inner.receivers.notify();
465 inner.receivers.can_select() || inner.is_disconnected
466 }
467
468 fn unregister(&self, oper: Operation) {
469 if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
470 unsafe {
471 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
472 }
473 }
474 }
475
476 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
477 token.zero.0 = cx.wait_packet();
478 true
479 }
480
481 fn is_ready(&self) -> bool {
482 let inner = self.0.inner.lock().unwrap();
483 inner.receivers.can_select() || inner.is_disconnected
484 }
485
486 fn watch(&self, oper: Operation, cx: &Context) -> bool {
487 let mut inner = self.0.inner.lock().unwrap();
488 inner.senders.watch(oper, cx);
489 inner.receivers.can_select() || inner.is_disconnected
490 }
491
492 fn unwatch(&self, oper: Operation) {
493 let mut inner = self.0.inner.lock().unwrap();
494 inner.senders.unwatch(oper);
495 }
496}
497