1 | //! Zero-capacity channel. |
2 | //! |
3 | //! This kind of channel is also known as *rendezvous* channel. |
4 | |
5 | use std::cell::UnsafeCell; |
6 | use std::marker::PhantomData; |
7 | use std::sync::atomic::{AtomicBool, Ordering}; |
8 | use std::sync::Mutex; |
9 | use std::time::Instant; |
10 | use std::{fmt, ptr}; |
11 | |
12 | use crossbeam_utils::Backoff; |
13 | |
14 | use crate::context::Context; |
15 | use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; |
16 | use crate::select::{Operation, SelectHandle, Selected, Token}; |
17 | use crate::waker::Waker; |
18 | |
19 | /// A pointer to a packet. |
20 | pub(crate) struct ZeroToken(*mut ()); |
21 | |
22 | impl Default for ZeroToken { |
23 | fn default() -> Self { |
24 | Self(ptr::null_mut()) |
25 | } |
26 | } |
27 | |
28 | impl fmt::Debug for ZeroToken { |
29 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
30 | fmt::Debug::fmt(&(self.0 as usize), f) |
31 | } |
32 | } |
33 | |
34 | /// A slot for passing one message from a sender to a receiver. |
35 | struct Packet<T> { |
36 | /// Equals `true` if the packet is allocated on the stack. |
37 | on_stack: bool, |
38 | |
39 | /// Equals `true` once the packet is ready for reading or writing. |
40 | ready: AtomicBool, |
41 | |
42 | /// The message. |
43 | msg: UnsafeCell<Option<T>>, |
44 | } |
45 | |
46 | impl<T> Packet<T> { |
47 | /// Creates an empty packet on the stack. |
48 | fn empty_on_stack() -> Packet<T> { |
49 | Packet { |
50 | on_stack: true, |
51 | ready: AtomicBool::new(false), |
52 | msg: UnsafeCell::new(None), |
53 | } |
54 | } |
55 | |
56 | /// Creates an empty packet on the heap. |
57 | fn empty_on_heap() -> Box<Packet<T>> { |
58 | Box::new(Packet { |
59 | on_stack: false, |
60 | ready: AtomicBool::new(false), |
61 | msg: UnsafeCell::new(None), |
62 | }) |
63 | } |
64 | |
65 | /// Creates a packet on the stack, containing a message. |
66 | fn message_on_stack(msg: T) -> Packet<T> { |
67 | Packet { |
68 | on_stack: true, |
69 | ready: AtomicBool::new(false), |
70 | msg: UnsafeCell::new(Some(msg)), |
71 | } |
72 | } |
73 | |
74 | /// Waits until the packet becomes ready for reading or writing. |
75 | fn wait_ready(&self) { |
76 | let backoff = Backoff::new(); |
77 | while !self.ready.load(Ordering::Acquire) { |
78 | backoff.snooze(); |
79 | } |
80 | } |
81 | } |
82 | |
83 | /// Inner representation of a zero-capacity channel. |
84 | struct Inner { |
85 | /// Senders waiting to pair up with a receive operation. |
86 | senders: Waker, |
87 | |
88 | /// Receivers waiting to pair up with a send operation. |
89 | receivers: Waker, |
90 | |
91 | /// Equals `true` when the channel is disconnected. |
92 | is_disconnected: bool, |
93 | } |
94 | |
95 | /// Zero-capacity channel. |
96 | pub(crate) struct Channel<T> { |
97 | /// Inner representation of the channel. |
98 | inner: Mutex<Inner>, |
99 | |
100 | /// Indicates that dropping a `Channel<T>` may drop values of type `T`. |
101 | _marker: PhantomData<T>, |
102 | } |
103 | |
104 | impl<T> Channel<T> { |
105 | /// Constructs a new zero-capacity channel. |
106 | pub(crate) fn new() -> Self { |
107 | Channel { |
108 | inner: Mutex::new(Inner { |
109 | senders: Waker::new(), |
110 | receivers: Waker::new(), |
111 | is_disconnected: false, |
112 | }), |
113 | _marker: PhantomData, |
114 | } |
115 | } |
116 | |
117 | /// Returns a receiver handle to the channel. |
118 | pub(crate) fn receiver(&self) -> Receiver<'_, T> { |
119 | Receiver(self) |
120 | } |
121 | |
122 | /// Returns a sender handle to the channel. |
123 | pub(crate) fn sender(&self) -> Sender<'_, T> { |
124 | Sender(self) |
125 | } |
126 | |
127 | /// Attempts to reserve a slot for sending a message. |
128 | fn start_send(&self, token: &mut Token) -> bool { |
129 | let mut inner = self.inner.lock().unwrap(); |
130 | |
131 | // If there's a waiting receiver, pair up with it. |
132 | if let Some(operation) = inner.receivers.try_select() { |
133 | token.zero.0 = operation.packet; |
134 | true |
135 | } else if inner.is_disconnected { |
136 | token.zero.0 = ptr::null_mut(); |
137 | true |
138 | } else { |
139 | false |
140 | } |
141 | } |
142 | |
143 | /// Writes a message into the packet. |
144 | pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { |
145 | // If there is no packet, the channel is disconnected. |
146 | if token.zero.0.is_null() { |
147 | return Err(msg); |
148 | } |
149 | |
150 | let packet = &*(token.zero.0 as *const Packet<T>); |
151 | packet.msg.get().write(Some(msg)); |
152 | packet.ready.store(true, Ordering::Release); |
153 | Ok(()) |
154 | } |
155 | |
156 | /// Attempts to pair up with a sender. |
157 | fn start_recv(&self, token: &mut Token) -> bool { |
158 | let mut inner = self.inner.lock().unwrap(); |
159 | |
160 | // If there's a waiting sender, pair up with it. |
161 | if let Some(operation) = inner.senders.try_select() { |
162 | token.zero.0 = operation.packet; |
163 | true |
164 | } else if inner.is_disconnected { |
165 | token.zero.0 = ptr::null_mut(); |
166 | true |
167 | } else { |
168 | false |
169 | } |
170 | } |
171 | |
172 | /// Reads a message from the packet. |
173 | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { |
174 | // If there is no packet, the channel is disconnected. |
175 | if token.zero.0.is_null() { |
176 | return Err(()); |
177 | } |
178 | |
179 | let packet = &*(token.zero.0 as *const Packet<T>); |
180 | |
181 | if packet.on_stack { |
182 | // The message has been in the packet from the beginning, so there is no need to wait |
183 | // for it. However, after reading the message, we need to set `ready` to `true` in |
184 | // order to signal that the packet can be destroyed. |
185 | let msg = packet.msg.get().replace(None).unwrap(); |
186 | packet.ready.store(true, Ordering::Release); |
187 | Ok(msg) |
188 | } else { |
189 | // Wait until the message becomes available, then read it and destroy the |
190 | // heap-allocated packet. |
191 | packet.wait_ready(); |
192 | let msg = packet.msg.get().replace(None).unwrap(); |
193 | drop(Box::from_raw(token.zero.0.cast::<Packet<T>>())); |
194 | Ok(msg) |
195 | } |
196 | } |
197 | |
198 | /// Attempts to send a message into the channel. |
199 | pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
200 | let token = &mut Token::default(); |
201 | let mut inner = self.inner.lock().unwrap(); |
202 | |
203 | // If there's a waiting receiver, pair up with it. |
204 | if let Some(operation) = inner.receivers.try_select() { |
205 | token.zero.0 = operation.packet; |
206 | drop(inner); |
207 | unsafe { |
208 | self.write(token, msg).ok().unwrap(); |
209 | } |
210 | Ok(()) |
211 | } else if inner.is_disconnected { |
212 | Err(TrySendError::Disconnected(msg)) |
213 | } else { |
214 | Err(TrySendError::Full(msg)) |
215 | } |
216 | } |
217 | |
218 | /// Sends a message into the channel. |
219 | pub(crate) fn send( |
220 | &self, |
221 | msg: T, |
222 | deadline: Option<Instant>, |
223 | ) -> Result<(), SendTimeoutError<T>> { |
224 | let token = &mut Token::default(); |
225 | let mut inner = self.inner.lock().unwrap(); |
226 | |
227 | // If there's a waiting receiver, pair up with it. |
228 | if let Some(operation) = inner.receivers.try_select() { |
229 | token.zero.0 = operation.packet; |
230 | drop(inner); |
231 | unsafe { |
232 | self.write(token, msg).ok().unwrap(); |
233 | } |
234 | return Ok(()); |
235 | } |
236 | |
237 | if inner.is_disconnected { |
238 | return Err(SendTimeoutError::Disconnected(msg)); |
239 | } |
240 | |
241 | Context::with(|cx| { |
242 | // Prepare for blocking until a receiver wakes us up. |
243 | let oper = Operation::hook(token); |
244 | let mut packet = Packet::<T>::message_on_stack(msg); |
245 | inner |
246 | .senders |
247 | .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); |
248 | inner.receivers.notify(); |
249 | drop(inner); |
250 | |
251 | // Block the current thread. |
252 | let sel = cx.wait_until(deadline); |
253 | |
254 | match sel { |
255 | Selected::Waiting => unreachable!(), |
256 | Selected::Aborted => { |
257 | self.inner.lock().unwrap().senders.unregister(oper).unwrap(); |
258 | let msg = unsafe { packet.msg.get().replace(None).unwrap() }; |
259 | Err(SendTimeoutError::Timeout(msg)) |
260 | } |
261 | Selected::Disconnected => { |
262 | self.inner.lock().unwrap().senders.unregister(oper).unwrap(); |
263 | let msg = unsafe { packet.msg.get().replace(None).unwrap() }; |
264 | Err(SendTimeoutError::Disconnected(msg)) |
265 | } |
266 | Selected::Operation(_) => { |
267 | // Wait until the message is read, then drop the packet. |
268 | packet.wait_ready(); |
269 | Ok(()) |
270 | } |
271 | } |
272 | }) |
273 | } |
274 | |
275 | /// Attempts to receive a message without blocking. |
276 | pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { |
277 | let token = &mut Token::default(); |
278 | let mut inner = self.inner.lock().unwrap(); |
279 | |
280 | // If there's a waiting sender, pair up with it. |
281 | if let Some(operation) = inner.senders.try_select() { |
282 | token.zero.0 = operation.packet; |
283 | drop(inner); |
284 | unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } |
285 | } else if inner.is_disconnected { |
286 | Err(TryRecvError::Disconnected) |
287 | } else { |
288 | Err(TryRecvError::Empty) |
289 | } |
290 | } |
291 | |
292 | /// Receives a message from the channel. |
293 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { |
294 | let token = &mut Token::default(); |
295 | let mut inner = self.inner.lock().unwrap(); |
296 | |
297 | // If there's a waiting sender, pair up with it. |
298 | if let Some(operation) = inner.senders.try_select() { |
299 | token.zero.0 = operation.packet; |
300 | drop(inner); |
301 | unsafe { |
302 | return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); |
303 | } |
304 | } |
305 | |
306 | if inner.is_disconnected { |
307 | return Err(RecvTimeoutError::Disconnected); |
308 | } |
309 | |
310 | Context::with(|cx| { |
311 | // Prepare for blocking until a sender wakes us up. |
312 | let oper = Operation::hook(token); |
313 | let mut packet = Packet::<T>::empty_on_stack(); |
314 | inner.receivers.register_with_packet( |
315 | oper, |
316 | &mut packet as *mut Packet<T> as *mut (), |
317 | cx, |
318 | ); |
319 | inner.senders.notify(); |
320 | drop(inner); |
321 | |
322 | // Block the current thread. |
323 | let sel = cx.wait_until(deadline); |
324 | |
325 | match sel { |
326 | Selected::Waiting => unreachable!(), |
327 | Selected::Aborted => { |
328 | self.inner |
329 | .lock() |
330 | .unwrap() |
331 | .receivers |
332 | .unregister(oper) |
333 | .unwrap(); |
334 | Err(RecvTimeoutError::Timeout) |
335 | } |
336 | Selected::Disconnected => { |
337 | self.inner |
338 | .lock() |
339 | .unwrap() |
340 | .receivers |
341 | .unregister(oper) |
342 | .unwrap(); |
343 | Err(RecvTimeoutError::Disconnected) |
344 | } |
345 | Selected::Operation(_) => { |
346 | // Wait until the message is provided, then read it. |
347 | packet.wait_ready(); |
348 | unsafe { Ok(packet.msg.get().replace(None).unwrap()) } |
349 | } |
350 | } |
351 | }) |
352 | } |
353 | |
354 | /// Disconnects the channel and wakes up all blocked senders and receivers. |
355 | /// |
356 | /// Returns `true` if this call disconnected the channel. |
357 | pub(crate) fn disconnect(&self) -> bool { |
358 | let mut inner = self.inner.lock().unwrap(); |
359 | |
360 | if !inner.is_disconnected { |
361 | inner.is_disconnected = true; |
362 | inner.senders.disconnect(); |
363 | inner.receivers.disconnect(); |
364 | true |
365 | } else { |
366 | false |
367 | } |
368 | } |
369 | |
370 | /// Returns the current number of messages inside the channel. |
371 | pub(crate) fn len(&self) -> usize { |
372 | 0 |
373 | } |
374 | |
375 | /// Returns the capacity of the channel. |
376 | pub(crate) fn capacity(&self) -> Option<usize> { |
377 | Some(0) |
378 | } |
379 | |
380 | /// Returns `true` if the channel is empty. |
381 | pub(crate) fn is_empty(&self) -> bool { |
382 | true |
383 | } |
384 | |
385 | /// Returns `true` if the channel is full. |
386 | pub(crate) fn is_full(&self) -> bool { |
387 | true |
388 | } |
389 | } |
390 | |
391 | /// Receiver handle to a channel. |
392 | pub(crate) struct Receiver<'a, T>(&'a Channel<T>); |
393 | |
394 | /// Sender handle to a channel. |
395 | pub(crate) struct Sender<'a, T>(&'a Channel<T>); |
396 | |
397 | impl<T> SelectHandle for Receiver<'_, T> { |
398 | fn try_select(&self, token: &mut Token) -> bool { |
399 | self.0.start_recv(token) |
400 | } |
401 | |
402 | fn deadline(&self) -> Option<Instant> { |
403 | None |
404 | } |
405 | |
406 | fn register(&self, oper: Operation, cx: &Context) -> bool { |
407 | let packet = Box::into_raw(Packet::<T>::empty_on_heap()); |
408 | |
409 | let mut inner = self.0.inner.lock().unwrap(); |
410 | inner |
411 | .receivers |
412 | .register_with_packet(oper, packet.cast::<()>(), cx); |
413 | inner.senders.notify(); |
414 | inner.senders.can_select() || inner.is_disconnected |
415 | } |
416 | |
417 | fn unregister(&self, oper: Operation) { |
418 | if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) { |
419 | unsafe { |
420 | drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); |
421 | } |
422 | } |
423 | } |
424 | |
425 | fn accept(&self, token: &mut Token, cx: &Context) -> bool { |
426 | token.zero.0 = cx.wait_packet(); |
427 | true |
428 | } |
429 | |
430 | fn is_ready(&self) -> bool { |
431 | let inner = self.0.inner.lock().unwrap(); |
432 | inner.senders.can_select() || inner.is_disconnected |
433 | } |
434 | |
435 | fn watch(&self, oper: Operation, cx: &Context) -> bool { |
436 | let mut inner = self.0.inner.lock().unwrap(); |
437 | inner.receivers.watch(oper, cx); |
438 | inner.senders.can_select() || inner.is_disconnected |
439 | } |
440 | |
441 | fn unwatch(&self, oper: Operation) { |
442 | let mut inner = self.0.inner.lock().unwrap(); |
443 | inner.receivers.unwatch(oper); |
444 | } |
445 | } |
446 | |
447 | impl<T> SelectHandle for Sender<'_, T> { |
448 | fn try_select(&self, token: &mut Token) -> bool { |
449 | self.0.start_send(token) |
450 | } |
451 | |
452 | fn deadline(&self) -> Option<Instant> { |
453 | None |
454 | } |
455 | |
456 | fn register(&self, oper: Operation, cx: &Context) -> bool { |
457 | let packet = Box::into_raw(Packet::<T>::empty_on_heap()); |
458 | |
459 | let mut inner = self.0.inner.lock().unwrap(); |
460 | inner |
461 | .senders |
462 | .register_with_packet(oper, packet.cast::<()>(), cx); |
463 | inner.receivers.notify(); |
464 | inner.receivers.can_select() || inner.is_disconnected |
465 | } |
466 | |
467 | fn unregister(&self, oper: Operation) { |
468 | if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) { |
469 | unsafe { |
470 | drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); |
471 | } |
472 | } |
473 | } |
474 | |
475 | fn accept(&self, token: &mut Token, cx: &Context) -> bool { |
476 | token.zero.0 = cx.wait_packet(); |
477 | true |
478 | } |
479 | |
480 | fn is_ready(&self) -> bool { |
481 | let inner = self.0.inner.lock().unwrap(); |
482 | inner.receivers.can_select() || inner.is_disconnected |
483 | } |
484 | |
485 | fn watch(&self, oper: Operation, cx: &Context) -> bool { |
486 | let mut inner = self.0.inner.lock().unwrap(); |
487 | inner.senders.watch(oper, cx); |
488 | inner.receivers.can_select() || inner.is_disconnected |
489 | } |
490 | |
491 | fn unwatch(&self, oper: Operation) { |
492 | let mut inner = self.0.inner.lock().unwrap(); |
493 | inner.senders.unwatch(oper); |
494 | } |
495 | } |
496 | |