1 | //! Zero-capacity channel. |
2 | //! |
3 | //! This kind of channel is also known as *rendezvous* channel. |
4 | |
5 | use std::boxed::Box; |
6 | use std::cell::UnsafeCell; |
7 | use std::marker::PhantomData; |
8 | use std::sync::atomic::{AtomicBool, Ordering}; |
9 | use std::sync::Mutex; |
10 | use std::time::Instant; |
11 | use std::{fmt, ptr}; |
12 | |
13 | use crossbeam_utils::Backoff; |
14 | |
15 | use crate::context::Context; |
16 | use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; |
17 | use crate::select::{Operation, SelectHandle, Selected, Token}; |
18 | use crate::waker::Waker; |
19 | |
20 | /// A pointer to a packet. |
21 | pub(crate) struct ZeroToken(*mut ()); |
22 | |
23 | impl Default for ZeroToken { |
24 | fn default() -> Self { |
25 | Self(ptr::null_mut()) |
26 | } |
27 | } |
28 | |
29 | impl fmt::Debug for ZeroToken { |
30 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
31 | fmt::Debug::fmt(&(self.0 as usize), f) |
32 | } |
33 | } |
34 | |
35 | /// A slot for passing one message from a sender to a receiver. |
36 | struct Packet<T> { |
37 | /// Equals `true` if the packet is allocated on the stack. |
38 | on_stack: bool, |
39 | |
40 | /// Equals `true` once the packet is ready for reading or writing. |
41 | ready: AtomicBool, |
42 | |
43 | /// The message. |
44 | msg: UnsafeCell<Option<T>>, |
45 | } |
46 | |
47 | impl<T> Packet<T> { |
48 | /// Creates an empty packet on the stack. |
49 | fn empty_on_stack() -> Packet<T> { |
50 | Packet { |
51 | on_stack: true, |
52 | ready: AtomicBool::new(false), |
53 | msg: UnsafeCell::new(None), |
54 | } |
55 | } |
56 | |
57 | /// Creates an empty packet on the heap. |
58 | fn empty_on_heap() -> Box<Packet<T>> { |
59 | Box::new(Packet { |
60 | on_stack: false, |
61 | ready: AtomicBool::new(false), |
62 | msg: UnsafeCell::new(None), |
63 | }) |
64 | } |
65 | |
66 | /// Creates a packet on the stack, containing a message. |
67 | fn message_on_stack(msg: T) -> Packet<T> { |
68 | Packet { |
69 | on_stack: true, |
70 | ready: AtomicBool::new(false), |
71 | msg: UnsafeCell::new(Some(msg)), |
72 | } |
73 | } |
74 | |
75 | /// Waits until the packet becomes ready for reading or writing. |
76 | fn wait_ready(&self) { |
77 | let backoff = Backoff::new(); |
78 | while !self.ready.load(Ordering::Acquire) { |
79 | backoff.snooze(); |
80 | } |
81 | } |
82 | } |
83 | |
84 | /// Inner representation of a zero-capacity channel. |
85 | struct Inner { |
86 | /// Senders waiting to pair up with a receive operation. |
87 | senders: Waker, |
88 | |
89 | /// Receivers waiting to pair up with a send operation. |
90 | receivers: Waker, |
91 | |
92 | /// Equals `true` when the channel is disconnected. |
93 | is_disconnected: bool, |
94 | } |
95 | |
96 | /// Zero-capacity channel. |
97 | pub(crate) struct Channel<T> { |
98 | /// Inner representation of the channel. |
99 | inner: Mutex<Inner>, |
100 | |
101 | /// Indicates that dropping a `Channel<T>` may drop values of type `T`. |
102 | _marker: PhantomData<T>, |
103 | } |
104 | |
105 | impl<T> Channel<T> { |
106 | /// Constructs a new zero-capacity channel. |
107 | pub(crate) fn new() -> Self { |
108 | Channel { |
109 | inner: Mutex::new(Inner { |
110 | senders: Waker::new(), |
111 | receivers: Waker::new(), |
112 | is_disconnected: false, |
113 | }), |
114 | _marker: PhantomData, |
115 | } |
116 | } |
117 | |
118 | /// Returns a receiver handle to the channel. |
119 | pub(crate) fn receiver(&self) -> Receiver<'_, T> { |
120 | Receiver(self) |
121 | } |
122 | |
123 | /// Returns a sender handle to the channel. |
124 | pub(crate) fn sender(&self) -> Sender<'_, T> { |
125 | Sender(self) |
126 | } |
127 | |
128 | /// Attempts to reserve a slot for sending a message. |
129 | fn start_send(&self, token: &mut Token) -> bool { |
130 | let mut inner = self.inner.lock().unwrap(); |
131 | |
132 | // If there's a waiting receiver, pair up with it. |
133 | if let Some(operation) = inner.receivers.try_select() { |
134 | token.zero.0 = operation.packet; |
135 | true |
136 | } else if inner.is_disconnected { |
137 | token.zero.0 = ptr::null_mut(); |
138 | true |
139 | } else { |
140 | false |
141 | } |
142 | } |
143 | |
144 | /// Writes a message into the packet. |
145 | pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { |
146 | // If there is no packet, the channel is disconnected. |
147 | if token.zero.0.is_null() { |
148 | return Err(msg); |
149 | } |
150 | |
151 | let packet = &*(token.zero.0 as *const Packet<T>); |
152 | packet.msg.get().write(Some(msg)); |
153 | packet.ready.store(true, Ordering::Release); |
154 | Ok(()) |
155 | } |
156 | |
157 | /// Attempts to pair up with a sender. |
158 | fn start_recv(&self, token: &mut Token) -> bool { |
159 | let mut inner = self.inner.lock().unwrap(); |
160 | |
161 | // If there's a waiting sender, pair up with it. |
162 | if let Some(operation) = inner.senders.try_select() { |
163 | token.zero.0 = operation.packet; |
164 | true |
165 | } else if inner.is_disconnected { |
166 | token.zero.0 = ptr::null_mut(); |
167 | true |
168 | } else { |
169 | false |
170 | } |
171 | } |
172 | |
173 | /// Reads a message from the packet. |
174 | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { |
175 | // If there is no packet, the channel is disconnected. |
176 | if token.zero.0.is_null() { |
177 | return Err(()); |
178 | } |
179 | |
180 | let packet = &*(token.zero.0 as *const Packet<T>); |
181 | |
182 | if packet.on_stack { |
183 | // The message has been in the packet from the beginning, so there is no need to wait |
184 | // for it. However, after reading the message, we need to set `ready` to `true` in |
185 | // order to signal that the packet can be destroyed. |
186 | let msg = packet.msg.get().replace(None).unwrap(); |
187 | packet.ready.store(true, Ordering::Release); |
188 | Ok(msg) |
189 | } else { |
190 | // Wait until the message becomes available, then read it and destroy the |
191 | // heap-allocated packet. |
192 | packet.wait_ready(); |
193 | let msg = packet.msg.get().replace(None).unwrap(); |
194 | drop(Box::from_raw(token.zero.0.cast::<Packet<T>>())); |
195 | Ok(msg) |
196 | } |
197 | } |
198 | |
199 | /// Attempts to send a message into the channel. |
200 | pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
201 | let token = &mut Token::default(); |
202 | let mut inner = self.inner.lock().unwrap(); |
203 | |
204 | // If there's a waiting receiver, pair up with it. |
205 | if let Some(operation) = inner.receivers.try_select() { |
206 | token.zero.0 = operation.packet; |
207 | drop(inner); |
208 | unsafe { |
209 | self.write(token, msg).ok().unwrap(); |
210 | } |
211 | Ok(()) |
212 | } else if inner.is_disconnected { |
213 | Err(TrySendError::Disconnected(msg)) |
214 | } else { |
215 | Err(TrySendError::Full(msg)) |
216 | } |
217 | } |
218 | |
219 | /// Sends a message into the channel. |
220 | pub(crate) fn send( |
221 | &self, |
222 | msg: T, |
223 | deadline: Option<Instant>, |
224 | ) -> Result<(), SendTimeoutError<T>> { |
225 | let token = &mut Token::default(); |
226 | let mut inner = self.inner.lock().unwrap(); |
227 | |
228 | // If there's a waiting receiver, pair up with it. |
229 | if let Some(operation) = inner.receivers.try_select() { |
230 | token.zero.0 = operation.packet; |
231 | drop(inner); |
232 | unsafe { |
233 | self.write(token, msg).ok().unwrap(); |
234 | } |
235 | return Ok(()); |
236 | } |
237 | |
238 | if inner.is_disconnected { |
239 | return Err(SendTimeoutError::Disconnected(msg)); |
240 | } |
241 | |
242 | Context::with(|cx| { |
243 | // Prepare for blocking until a receiver wakes us up. |
244 | let oper = Operation::hook(token); |
245 | let mut packet = Packet::<T>::message_on_stack(msg); |
246 | inner |
247 | .senders |
248 | .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); |
249 | inner.receivers.notify(); |
250 | drop(inner); |
251 | |
252 | // Block the current thread. |
253 | let sel = cx.wait_until(deadline); |
254 | |
255 | match sel { |
256 | Selected::Waiting => unreachable!(), |
257 | Selected::Aborted => { |
258 | self.inner.lock().unwrap().senders.unregister(oper).unwrap(); |
259 | let msg = unsafe { packet.msg.get().replace(None).unwrap() }; |
260 | Err(SendTimeoutError::Timeout(msg)) |
261 | } |
262 | Selected::Disconnected => { |
263 | self.inner.lock().unwrap().senders.unregister(oper).unwrap(); |
264 | let msg = unsafe { packet.msg.get().replace(None).unwrap() }; |
265 | Err(SendTimeoutError::Disconnected(msg)) |
266 | } |
267 | Selected::Operation(_) => { |
268 | // Wait until the message is read, then drop the packet. |
269 | packet.wait_ready(); |
270 | Ok(()) |
271 | } |
272 | } |
273 | }) |
274 | } |
275 | |
276 | /// Attempts to receive a message without blocking. |
277 | pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { |
278 | let token = &mut Token::default(); |
279 | let mut inner = self.inner.lock().unwrap(); |
280 | |
281 | // If there's a waiting sender, pair up with it. |
282 | if let Some(operation) = inner.senders.try_select() { |
283 | token.zero.0 = operation.packet; |
284 | drop(inner); |
285 | unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } |
286 | } else if inner.is_disconnected { |
287 | Err(TryRecvError::Disconnected) |
288 | } else { |
289 | Err(TryRecvError::Empty) |
290 | } |
291 | } |
292 | |
293 | /// Receives a message from the channel. |
294 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { |
295 | let token = &mut Token::default(); |
296 | let mut inner = self.inner.lock().unwrap(); |
297 | |
298 | // If there's a waiting sender, pair up with it. |
299 | if let Some(operation) = inner.senders.try_select() { |
300 | token.zero.0 = operation.packet; |
301 | drop(inner); |
302 | unsafe { |
303 | return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); |
304 | } |
305 | } |
306 | |
307 | if inner.is_disconnected { |
308 | return Err(RecvTimeoutError::Disconnected); |
309 | } |
310 | |
311 | Context::with(|cx| { |
312 | // Prepare for blocking until a sender wakes us up. |
313 | let oper = Operation::hook(token); |
314 | let mut packet = Packet::<T>::empty_on_stack(); |
315 | inner.receivers.register_with_packet( |
316 | oper, |
317 | &mut packet as *mut Packet<T> as *mut (), |
318 | cx, |
319 | ); |
320 | inner.senders.notify(); |
321 | drop(inner); |
322 | |
323 | // Block the current thread. |
324 | let sel = cx.wait_until(deadline); |
325 | |
326 | match sel { |
327 | Selected::Waiting => unreachable!(), |
328 | Selected::Aborted => { |
329 | self.inner |
330 | .lock() |
331 | .unwrap() |
332 | .receivers |
333 | .unregister(oper) |
334 | .unwrap(); |
335 | Err(RecvTimeoutError::Timeout) |
336 | } |
337 | Selected::Disconnected => { |
338 | self.inner |
339 | .lock() |
340 | .unwrap() |
341 | .receivers |
342 | .unregister(oper) |
343 | .unwrap(); |
344 | Err(RecvTimeoutError::Disconnected) |
345 | } |
346 | Selected::Operation(_) => { |
347 | // Wait until the message is provided, then read it. |
348 | packet.wait_ready(); |
349 | unsafe { Ok(packet.msg.get().replace(None).unwrap()) } |
350 | } |
351 | } |
352 | }) |
353 | } |
354 | |
355 | /// Disconnects the channel and wakes up all blocked senders and receivers. |
356 | /// |
357 | /// Returns `true` if this call disconnected the channel. |
358 | pub(crate) fn disconnect(&self) -> bool { |
359 | let mut inner = self.inner.lock().unwrap(); |
360 | |
361 | if !inner.is_disconnected { |
362 | inner.is_disconnected = true; |
363 | inner.senders.disconnect(); |
364 | inner.receivers.disconnect(); |
365 | true |
366 | } else { |
367 | false |
368 | } |
369 | } |
370 | |
371 | /// Returns the current number of messages inside the channel. |
372 | pub(crate) fn len(&self) -> usize { |
373 | 0 |
374 | } |
375 | |
376 | /// Returns the capacity of the channel. |
377 | pub(crate) fn capacity(&self) -> Option<usize> { |
378 | Some(0) |
379 | } |
380 | |
381 | /// Returns `true` if the channel is empty. |
382 | pub(crate) fn is_empty(&self) -> bool { |
383 | true |
384 | } |
385 | |
386 | /// Returns `true` if the channel is full. |
387 | pub(crate) fn is_full(&self) -> bool { |
388 | true |
389 | } |
390 | } |
391 | |
392 | /// Receiver handle to a channel. |
393 | pub(crate) struct Receiver<'a, T>(&'a Channel<T>); |
394 | |
395 | /// Sender handle to a channel. |
396 | pub(crate) struct Sender<'a, T>(&'a Channel<T>); |
397 | |
398 | impl<T> SelectHandle for Receiver<'_, T> { |
399 | fn try_select(&self, token: &mut Token) -> bool { |
400 | self.0.start_recv(token) |
401 | } |
402 | |
403 | fn deadline(&self) -> Option<Instant> { |
404 | None |
405 | } |
406 | |
407 | fn register(&self, oper: Operation, cx: &Context) -> bool { |
408 | let packet = Box::into_raw(Packet::<T>::empty_on_heap()); |
409 | |
410 | let mut inner = self.0.inner.lock().unwrap(); |
411 | inner |
412 | .receivers |
413 | .register_with_packet(oper, packet.cast::<()>(), cx); |
414 | inner.senders.notify(); |
415 | inner.senders.can_select() || inner.is_disconnected |
416 | } |
417 | |
418 | fn unregister(&self, oper: Operation) { |
419 | if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) { |
420 | unsafe { |
421 | drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); |
422 | } |
423 | } |
424 | } |
425 | |
426 | fn accept(&self, token: &mut Token, cx: &Context) -> bool { |
427 | token.zero.0 = cx.wait_packet(); |
428 | true |
429 | } |
430 | |
431 | fn is_ready(&self) -> bool { |
432 | let inner = self.0.inner.lock().unwrap(); |
433 | inner.senders.can_select() || inner.is_disconnected |
434 | } |
435 | |
436 | fn watch(&self, oper: Operation, cx: &Context) -> bool { |
437 | let mut inner = self.0.inner.lock().unwrap(); |
438 | inner.receivers.watch(oper, cx); |
439 | inner.senders.can_select() || inner.is_disconnected |
440 | } |
441 | |
442 | fn unwatch(&self, oper: Operation) { |
443 | let mut inner = self.0.inner.lock().unwrap(); |
444 | inner.receivers.unwatch(oper); |
445 | } |
446 | } |
447 | |
448 | impl<T> SelectHandle for Sender<'_, T> { |
449 | fn try_select(&self, token: &mut Token) -> bool { |
450 | self.0.start_send(token) |
451 | } |
452 | |
453 | fn deadline(&self) -> Option<Instant> { |
454 | None |
455 | } |
456 | |
457 | fn register(&self, oper: Operation, cx: &Context) -> bool { |
458 | let packet = Box::into_raw(Packet::<T>::empty_on_heap()); |
459 | |
460 | let mut inner = self.0.inner.lock().unwrap(); |
461 | inner |
462 | .senders |
463 | .register_with_packet(oper, packet.cast::<()>(), cx); |
464 | inner.receivers.notify(); |
465 | inner.receivers.can_select() || inner.is_disconnected |
466 | } |
467 | |
468 | fn unregister(&self, oper: Operation) { |
469 | if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) { |
470 | unsafe { |
471 | drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); |
472 | } |
473 | } |
474 | } |
475 | |
476 | fn accept(&self, token: &mut Token, cx: &Context) -> bool { |
477 | token.zero.0 = cx.wait_packet(); |
478 | true |
479 | } |
480 | |
481 | fn is_ready(&self) -> bool { |
482 | let inner = self.0.inner.lock().unwrap(); |
483 | inner.receivers.can_select() || inner.is_disconnected |
484 | } |
485 | |
486 | fn watch(&self, oper: Operation, cx: &Context) -> bool { |
487 | let mut inner = self.0.inner.lock().unwrap(); |
488 | inner.senders.watch(oper, cx); |
489 | inner.receivers.can_select() || inner.is_disconnected |
490 | } |
491 | |
492 | fn unwatch(&self, oper: Operation) { |
493 | let mut inner = self.0.inner.lock().unwrap(); |
494 | inner.senders.unwatch(oper); |
495 | } |
496 | } |
497 | |