1//! Zero-capacity channel.
2//!
//! This kind of channel is also known as a *rendezvous* channel.
4
5use std::cell::UnsafeCell;
6use std::marker::PhantomData;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Mutex;
9use std::time::Instant;
10use std::{fmt, ptr};
11
12use crossbeam_utils::Backoff;
13
14use crate::context::Context;
15use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
16use crate::select::{Operation, SelectHandle, Selected, Token};
17use crate::waker::Waker;
18
/// A type-erased pointer to the packet used by one zero-capacity channel operation.
///
/// The default value is a null pointer, meaning "no packet".
pub(crate) struct ZeroToken(*mut ());

impl Default for ZeroToken {
    /// Creates a token that does not point at any packet.
    fn default() -> Self {
        ZeroToken(ptr::null_mut())
    }
}

impl fmt::Debug for ZeroToken {
    /// Formats the token as the numeric address of the packet it points to.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let address = self.0 as usize;
        fmt::Debug::fmt(&address, f)
    }
}
33
/// A slot for passing one message from a sender to a receiver.
///
/// The two sides of an operation share a packet through a raw pointer; the `ready` flag is
/// what hands the slot over from one side to the other.
struct Packet<T> {
    /// Equals `true` if the packet is allocated on the stack.
    ///
    /// Heap-allocated packets (`false`) are the ones that must be freed explicitly with
    /// `Box::from_raw`.
    on_stack: bool,

    /// Equals `true` once the packet is ready for reading or writing.
    ///
    /// Stored with `Release` ordering by the side that has finished with the slot and polled
    /// with `Acquire` ordering in `wait_ready`.
    ready: AtomicBool,

    /// The message.
    ///
    /// Kept in an `UnsafeCell` because it is written and taken through shared references to
    /// the packet.
    msg: UnsafeCell<Option<T>>,
}
45
46impl<T> Packet<T> {
47 /// Creates an empty packet on the stack.
48 fn empty_on_stack() -> Packet<T> {
49 Packet {
50 on_stack: true,
51 ready: AtomicBool::new(false),
52 msg: UnsafeCell::new(None),
53 }
54 }
55
56 /// Creates an empty packet on the heap.
57 fn empty_on_heap() -> Box<Packet<T>> {
58 Box::new(Packet {
59 on_stack: false,
60 ready: AtomicBool::new(false),
61 msg: UnsafeCell::new(None),
62 })
63 }
64
65 /// Creates a packet on the stack, containing a message.
66 fn message_on_stack(msg: T) -> Packet<T> {
67 Packet {
68 on_stack: true,
69 ready: AtomicBool::new(false),
70 msg: UnsafeCell::new(Some(msg)),
71 }
72 }
73
74 /// Waits until the packet becomes ready for reading or writing.
75 fn wait_ready(&self) {
76 let backoff = Backoff::new();
77 while !self.ready.load(Ordering::Acquire) {
78 backoff.snooze();
79 }
80 }
81}
82
/// Inner representation of a zero-capacity channel.
///
/// This is the state that `Channel` keeps behind its mutex.
struct Inner {
    /// Senders waiting to pair up with a receive operation.
    senders: Waker,

    /// Receivers waiting to pair up with a send operation.
    receivers: Waker,

    /// Equals `true` when the channel is disconnected.
    ///
    /// Starts as `false` and is set by `Channel::disconnect`.
    is_disconnected: bool,
}
94
/// Zero-capacity channel.
///
/// The channel itself never stores messages: a message travels through a `Packet` owned by
/// one of the two sides of an operation, and the channel only tracks who is waiting.
pub(crate) struct Channel<T> {
    /// Inner representation of the channel.
    ///
    /// Every operation locks this mutex before inspecting or modifying the wait lists.
    inner: Mutex<Inner>,

    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}
103
impl<T> Channel<T> {
    /// Constructs a new zero-capacity channel.
    ///
    /// The channel starts out connected, with freshly created sender and receiver wait lists.
    pub(crate) fn new() -> Self {
        Channel {
            inner: Mutex::new(Inner {
                senders: Waker::new(),
                receivers: Waker::new(),
                is_disconnected: false,
            }),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    ///
    /// The handle only borrows the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    ///
    /// The handle only borrows the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    ///
    /// Returns `true` when the send can proceed to `write`: either a waiting receiver was
    /// selected and `token.zero` now holds its packet pointer, or the channel is disconnected
    /// and `token.zero` is set to null. Returns `false`, leaving `token` untouched, otherwise.
    fn start_send(&self, token: &mut Token) -> bool {
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting receiver, pair up with it.
        if let Some(operation) = inner.receivers.try_select() {
            token.zero.0 = operation.packet;
            true
        } else if inner.is_disconnected {
            // A null packet is how `write` learns that the channel is disconnected.
            token.zero.0 = ptr::null_mut();
            true
        } else {
            false
        }
    }

    /// Writes a message into the packet.
    ///
    /// Returns `Err(msg)`, handing the message back, when `token.zero` is null, which is the
    /// state `start_send` leaves behind for a disconnected channel.
    ///
    /// # Safety
    ///
    /// A non-null `token.zero` is dereferenced as a `Packet<T>`.
    /// NOTE(review): callers presumably must pass a token whose pointer came from a pairing
    /// on this channel and whose packet is still alive; confirm against the call sites.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no packet, the channel is disconnected.
        if token.zero.0.is_null() {
            return Err(msg);
        }

        let packet = &*(token.zero.0 as *const Packet<T>);
        // Raw pointer write: the previous contents of the slot are overwritten, not dropped.
        packet.msg.get().write(Some(msg));
        // Publish the message; this `Release` store pairs with the `Acquire` load in
        // `Packet::wait_ready`. The packet is not touched again after this point.
        packet.ready.store(true, Ordering::Release);
        Ok(())
    }

    /// Attempts to pair up with a sender.
    ///
    /// Returns `true` when the receive can proceed to `read`: either a waiting sender was
    /// selected and `token.zero` now holds its packet pointer, or the channel is disconnected
    /// and `token.zero` is set to null. Returns `false`, leaving `token` untouched, otherwise.
    fn start_recv(&self, token: &mut Token) -> bool {
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting sender, pair up with it.
        if let Some(operation) = inner.senders.try_select() {
            token.zero.0 = operation.packet;
            true
        } else if inner.is_disconnected {
            // A null packet is how `read` learns that the channel is disconnected.
            token.zero.0 = ptr::null_mut();
            true
        } else {
            false
        }
    }

    /// Reads a message from the packet.
    ///
    /// Returns `Err(())` when `token.zero` is null, which is the state `start_recv` leaves
    /// behind for a disconnected channel.
    ///
    /// # Safety
    ///
    /// A non-null `token.zero` is dereferenced as a `Packet<T>`, and a packet that is not
    /// `on_stack` is freed with `Box::from_raw`.
    /// NOTE(review): callers presumably must pass a token whose pointer came from a pairing
    /// on this channel, and must not use a heap packet again afterwards; confirm against the
    /// call sites.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        // If there is no packet, the channel is disconnected.
        if token.zero.0.is_null() {
            return Err(());
        }

        let packet = &*(token.zero.0 as *const Packet<T>);

        if packet.on_stack {
            // The message has been in the packet from the beginning, so there is no need to wait
            // for it. However, after reading the message, we need to set `ready` to `true` in
            // order to signal that the packet can be destroyed.
            let msg = packet.msg.get().replace(None).unwrap();
            // The packet must not be touched after this store: its owner may drop it as soon
            // as it observes `ready == true`.
            packet.ready.store(true, Ordering::Release);
            Ok(msg)
        } else {
            // Wait until the message becomes available, then read it and destroy the
            // heap-allocated packet.
            packet.wait_ready();
            let msg = packet.msg.get().replace(None).unwrap();
            drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
            Ok(msg)
        }
    }

    /// Attempts to send a message into the channel.
    ///
    /// Never blocks waiting for a receiver. Returns `TrySendError::Disconnected(msg)` if the
    /// channel is disconnected and `TrySendError::Full(msg)` if no receiver is currently
    /// waiting; the message is handed back in both cases.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        let token = &mut Token::default();
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting receiver, pair up with it.
        if let Some(operation) = inner.receivers.try_select() {
            token.zero.0 = operation.packet;
            // Release the lock before touching the packet.
            drop(inner);
            unsafe {
                // `write` fails only when the packet pointer is null. `.ok()` discards the
                // returned message so that `unwrap` does not require `T: Debug`.
                self.write(token, msg).ok().unwrap();
            }
            Ok(())
        } else if inner.is_disconnected {
            Err(TrySendError::Disconnected(msg))
        } else {
            Err(TrySendError::Full(msg))
        }
    }

    /// Sends a message into the channel.
    ///
    /// If a receiver is already waiting, the message is handed over immediately. Otherwise
    /// the calling thread registers itself as a waiting sender and blocks in
    /// `cx.wait_until(deadline)`.
    ///
    /// Returns `SendTimeoutError::Disconnected(msg)` if the channel is or becomes
    /// disconnected, and `SendTimeoutError::Timeout(msg)` if the wait is aborted; the message
    /// is handed back in both cases.
    /// NOTE(review): a `deadline` of `None` presumably means "wait indefinitely"; confirm in
    /// `Context::wait_until`.
    pub(crate) fn send(
        &self,
        msg: T,
        deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting receiver, pair up with it.
        if let Some(operation) = inner.receivers.try_select() {
            token.zero.0 = operation.packet;
            // Release the lock before touching the packet.
            drop(inner);
            unsafe {
                // `write` fails only when the packet pointer is null. `.ok()` discards the
                // returned message so that `unwrap` does not require `T: Debug`.
                self.write(token, msg).ok().unwrap();
            }
            return Ok(());
        }

        if inner.is_disconnected {
            return Err(SendTimeoutError::Disconnected(msg));
        }

        Context::with(|cx| {
            // Prepare for blocking until a receiver wakes us up.
            let oper = Operation::hook(token);
            // The message lives in a packet on this stack frame; the wait list only gets a
            // type-erased pointer to it.
            let mut packet = Packet::<T>::message_on_stack(msg);
            inner
                .senders
                .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
            inner.receivers.notify();
            // The lock is held until registration is complete and released before blocking.
            drop(inner);

            // Block the current thread.
            let sel = cx.wait_until(deadline);

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // Remove our entry from the wait list (the `unwrap` asserts it was still
                    // there), then take the message back out of the packet.
                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
                    Err(SendTimeoutError::Timeout(msg))
                }
                Selected::Disconnected => {
                    // Same cleanup as above, but reported as a disconnection.
                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
                    Err(SendTimeoutError::Disconnected(msg))
                }
                Selected::Operation(_) => {
                    // Wait until the message is read, then drop the packet.
                    // The reader sets `ready` only after taking the message (see the
                    // `on_stack` branch of `read`), so `packet` must stay alive until then.
                    packet.wait_ready();
                    Ok(())
                }
            }
        })
    }

    /// Attempts to receive a message without blocking.
    ///
    /// Returns `TryRecvError::Disconnected` if the channel is disconnected and
    /// `TryRecvError::Empty` if no sender is currently waiting.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting sender, pair up with it.
        if let Some(operation) = inner.senders.try_select() {
            token.zero.0 = operation.packet;
            // Release the lock before touching the packet; `read` may spin in `wait_ready`.
            drop(inner);
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else if inner.is_disconnected {
            Err(TryRecvError::Disconnected)
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    ///
    /// If a sender is already waiting, its message is taken immediately. Otherwise the
    /// calling thread registers itself as a waiting receiver and blocks in
    /// `cx.wait_until(deadline)`.
    ///
    /// Returns `RecvTimeoutError::Disconnected` if the channel is or becomes disconnected,
    /// and `RecvTimeoutError::Timeout` if the wait is aborted.
    /// NOTE(review): a `deadline` of `None` presumably means "wait indefinitely"; confirm in
    /// `Context::wait_until`.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        let mut inner = self.inner.lock().unwrap();

        // If there's a waiting sender, pair up with it.
        if let Some(operation) = inner.senders.try_select() {
            token.zero.0 = operation.packet;
            // Release the lock before touching the packet; `read` may spin in `wait_ready`.
            drop(inner);
            unsafe {
                return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
            }
        }

        if inner.is_disconnected {
            return Err(RecvTimeoutError::Disconnected);
        }

        Context::with(|cx| {
            // Prepare for blocking until a sender wakes us up.
            let oper = Operation::hook(token);
            // An empty packet on this stack frame is the slot a sender will write into; the
            // wait list only gets a type-erased pointer to it.
            let mut packet = Packet::<T>::empty_on_stack();
            inner.receivers.register_with_packet(
                oper,
                &mut packet as *mut Packet<T> as *mut (),
                cx,
            );
            inner.senders.notify();
            // The lock is held until registration is complete and released before blocking.
            drop(inner);

            // Block the current thread.
            let sel = cx.wait_until(deadline);

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // Remove our entry from the wait list; the `unwrap` asserts it was still
                    // there.
                    self.inner
                        .lock()
                        .unwrap()
                        .receivers
                        .unregister(oper)
                        .unwrap();
                    Err(RecvTimeoutError::Timeout)
                }
                Selected::Disconnected => {
                    // Same cleanup as above, but reported as a disconnection.
                    self.inner
                        .lock()
                        .unwrap()
                        .receivers
                        .unregister(oper)
                        .unwrap();
                    Err(RecvTimeoutError::Disconnected)
                }
                Selected::Operation(_) => {
                    // Wait until the message is provided, then read it.
                    // `write` sets `ready` only after the message has been stored.
                    packet.wait_ready();
                    unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
                }
            }
        })
    }

    /// Disconnects the channel and wakes up all blocked senders and receivers.
    ///
    /// Returns `true` if this call disconnected the channel, and `false` if it was already
    /// disconnected (in which case nothing else is done).
    pub(crate) fn disconnect(&self) -> bool {
        let mut inner = self.inner.lock().unwrap();

        if !inner.is_disconnected {
            inner.is_disconnected = true;
            inner.senders.disconnect();
            inner.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Returns the current number of messages inside the channel.
    ///
    /// Always `0` for this channel flavor.
    pub(crate) fn len(&self) -> usize {
        0
    }

    /// Returns the capacity of the channel.
    ///
    /// Always `Some(0)` for this channel flavor.
    pub(crate) fn capacity(&self) -> Option<usize> {
        Some(0)
    }

    /// Returns `true` if the channel is empty.
    ///
    /// Always `true` for this channel flavor.
    pub(crate) fn is_empty(&self) -> bool {
        true
    }

    /// Returns `true` if the channel is full.
    ///
    /// Always `true` for this channel flavor.
    pub(crate) fn is_full(&self) -> bool {
        true
    }
}
390
/// Receiver handle to a channel.
///
/// Borrows the channel and implements `SelectHandle` for its receiving side.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
393
/// Sender handle to a channel.
///
/// Borrows the channel and implements `SelectHandle` for its sending side.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
396
397impl<T> SelectHandle for Receiver<'_, T> {
398 fn try_select(&self, token: &mut Token) -> bool {
399 self.0.start_recv(token)
400 }
401
402 fn deadline(&self) -> Option<Instant> {
403 None
404 }
405
406 fn register(&self, oper: Operation, cx: &Context) -> bool {
407 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
408
409 let mut inner = self.0.inner.lock().unwrap();
410 inner
411 .receivers
412 .register_with_packet(oper, packet.cast::<()>(), cx);
413 inner.senders.notify();
414 inner.senders.can_select() || inner.is_disconnected
415 }
416
417 fn unregister(&self, oper: Operation) {
418 if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
419 unsafe {
420 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
421 }
422 }
423 }
424
425 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
426 token.zero.0 = cx.wait_packet();
427 true
428 }
429
430 fn is_ready(&self) -> bool {
431 let inner = self.0.inner.lock().unwrap();
432 inner.senders.can_select() || inner.is_disconnected
433 }
434
435 fn watch(&self, oper: Operation, cx: &Context) -> bool {
436 let mut inner = self.0.inner.lock().unwrap();
437 inner.receivers.watch(oper, cx);
438 inner.senders.can_select() || inner.is_disconnected
439 }
440
441 fn unwatch(&self, oper: Operation) {
442 let mut inner = self.0.inner.lock().unwrap();
443 inner.receivers.unwatch(oper);
444 }
445}
446
447impl<T> SelectHandle for Sender<'_, T> {
448 fn try_select(&self, token: &mut Token) -> bool {
449 self.0.start_send(token)
450 }
451
452 fn deadline(&self) -> Option<Instant> {
453 None
454 }
455
456 fn register(&self, oper: Operation, cx: &Context) -> bool {
457 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
458
459 let mut inner = self.0.inner.lock().unwrap();
460 inner
461 .senders
462 .register_with_packet(oper, packet.cast::<()>(), cx);
463 inner.receivers.notify();
464 inner.receivers.can_select() || inner.is_disconnected
465 }
466
467 fn unregister(&self, oper: Operation) {
468 if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
469 unsafe {
470 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
471 }
472 }
473 }
474
475 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
476 token.zero.0 = cx.wait_packet();
477 true
478 }
479
480 fn is_ready(&self) -> bool {
481 let inner = self.0.inner.lock().unwrap();
482 inner.receivers.can_select() || inner.is_disconnected
483 }
484
485 fn watch(&self, oper: Operation, cx: &Context) -> bool {
486 let mut inner = self.0.inner.lock().unwrap();
487 inner.senders.watch(oper, cx);
488 inner.receivers.can_select() || inner.is_disconnected
489 }
490
491 fn unwatch(&self, oper: Operation) {
492 let mut inner = self.0.inner.lock().unwrap();
493 inner.senders.unwatch(oper);
494 }
495}
496