1 | //! Bounded channel based on a preallocated array. |
2 | //! |
3 | //! This flavor has a fixed, positive capacity. |
4 | //! |
5 | //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. |
6 | //! |
7 | //! Source: |
8 | //! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue> |
9 | //! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub> |
10 | |
11 | use super::context::Context; |
12 | use super::error::*; |
13 | use super::select::{Operation, Selected, Token}; |
14 | use super::utils::{Backoff, CachePadded}; |
15 | use super::waker::SyncWaker; |
16 | |
17 | use crate::cell::UnsafeCell; |
18 | use crate::mem::MaybeUninit; |
19 | use crate::ptr; |
20 | use crate::sync::atomic::{self, AtomicUsize, Ordering}; |
21 | use crate::time::Instant; |
22 | |
/// One cell of the channel's ring buffer.
struct Slot<T> {
    /// Sequence stamp of this cell.
    ///
    /// Senders compare it against the tail and receivers against the head to decide whether
    /// the cell may currently be written to or read from.
    stamp: AtomicUsize,

    /// Storage for the message held by this cell.
    ///
    /// The value is moved out by `read`, or dropped in place by `discard_all_messages`.
    msg: UnsafeCell<MaybeUninit<T>>,
}
32 | |
/// Token of the array flavor.
///
/// It carries a reservation made by `start_send`/`start_recv` over to the follow-up
/// `write`/`read` call.
#[derive(Debug)]
pub(crate) struct ArrayToken {
    /// Type-erased pointer to the slot to read from or write to; null when no slot was
    /// reserved.
    slot: *const u8,

    /// Stamp to store into the slot once the read or write has completed.
    stamp: usize,
}

impl Default for ArrayToken {
    /// Creates an empty token: null slot pointer and a zero stamp.
    #[inline]
    fn default() -> Self {
        Self { slot: ptr::null(), stamp: 0 }
    }
}
49 | |
/// Bounded channel based on a preallocated array.
///
/// `head` and `tail` are "stamps": an index into `buffer`, a mark bit and a lap counter packed
/// into a single `usize`. The index occupies the bits below `mark_bit`, the mark bit is
/// `mark_bit` itself, and the lap occupies the bits from `one_lap` upwards.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit in the head is always zero.
    ///
    /// Messages are popped from the head of the channel.
    head: CachePadded<AtomicUsize>,

    /// The tail of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit indicates that the channel is disconnected.
    ///
    /// Messages are pushed into the tail of the channel.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots; `with_capacity` allocates exactly `cap` of them.
    buffer: Box<[Slot<T>]>,

    /// The channel capacity, i.e. the number of slots in `buffer`.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
    ///
    /// Computed in `with_capacity` as `mark_bit * 2`; adding it to a stamp advances the stamp
    /// by one lap.
    one_lap: usize,

    /// If this bit is set in the tail, that means the channel is disconnected.
    ///
    /// Computed in `with_capacity` as the smallest power of two greater than `cap`, so
    /// `mark_bit - 1` masks out the index part of a stamp.
    mark_bit: usize,

    /// Senders waiting while the channel is full.
    senders: SyncWaker,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,
}
88 | |
89 | impl<T> Channel<T> { |
90 | /// Creates a bounded channel of capacity `cap`. |
91 | pub(crate) fn with_capacity(cap: usize) -> Self { |
92 | assert!(cap > 0, "capacity must be positive" ); |
93 | |
94 | // Compute constants `mark_bit` and `one_lap`. |
95 | let mark_bit = (cap + 1).next_power_of_two(); |
96 | let one_lap = mark_bit * 2; |
97 | |
98 | // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. |
99 | let head = 0; |
100 | // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. |
101 | let tail = 0; |
102 | |
103 | // Allocate a buffer of `cap` slots initialized |
104 | // with stamps. |
105 | let buffer: Box<[Slot<T>]> = (0..cap) |
106 | .map(|i| { |
107 | // Set the stamp to `{ lap: 0, mark: 0, index: i }`. |
108 | Slot { stamp: AtomicUsize::new(i), msg: UnsafeCell::new(MaybeUninit::uninit()) } |
109 | }) |
110 | .collect(); |
111 | |
112 | Channel { |
113 | buffer, |
114 | cap, |
115 | one_lap, |
116 | mark_bit, |
117 | head: CachePadded::new(AtomicUsize::new(head)), |
118 | tail: CachePadded::new(AtomicUsize::new(tail)), |
119 | senders: SyncWaker::new(), |
120 | receivers: SyncWaker::new(), |
121 | } |
122 | } |
123 | |
/// Attempts to reserve a slot for sending a message.
///
/// Returns `true` when `token` has been prepared for a follow-up call to `write`: either a
/// slot was reserved by advancing the tail, or the channel is disconnected and the token
/// carries a null slot. Returns `false` when the channel is full.
fn start_send(&self, token: &mut Token) -> bool {
    let backoff = Backoff::new();
    let mut tail = self.tail.load(Ordering::Relaxed);

    loop {
        // Check if the channel is disconnected.
        if tail & self.mark_bit != 0 {
            // A null slot makes the follow-up `write` hand the message back as an error.
            token.array.slot = ptr::null();
            token.array.stamp = 0;
            return true;
        }

        // Deconstruct the tail.
        let index = tail & (self.mark_bit - 1);
        let lap = tail & !(self.one_lap - 1);

        // Inspect the corresponding slot.
        debug_assert!(index < self.buffer.len());
        let slot = unsafe { self.buffer.get_unchecked(index) };
        let stamp = slot.stamp.load(Ordering::Acquire);

        // If the tail and the stamp match, we may attempt to push.
        if tail == stamp {
            let new_tail = if index + 1 < self.cap {
                // Same lap, incremented index.
                // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                tail + 1
            } else {
                // One lap forward, index wraps around to zero.
                // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                lap.wrapping_add(self.one_lap)
            };

            // Try moving the tail.
            match self.tail.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // Prepare the token for the follow-up call to `write`.
                    token.array.slot = slot as *const Slot<T> as *const u8;
                    // `write` stores this stamp; `start_recv` treats a slot whose stamp is
                    // `head + 1` as holding a message.
                    token.array.stamp = tail + 1;
                    return true;
                }
                Err(_) => {
                    // The exchange did not go through; back off and retry with a freshly
                    // loaded tail.
                    backoff.spin_light();
                    tail = self.tail.load(Ordering::Relaxed);
                }
            }
        } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
            // The stamp is the one `write` stored for this slot one lap ago, so the slot
            // still holds a message that has not been read.
            atomic::fence(Ordering::SeqCst);
            let head = self.head.load(Ordering::Relaxed);

            // If the head lags one lap behind the tail as well...
            if head.wrapping_add(self.one_lap) == tail {
                // ...then the channel is full.
                return false;
            }

            backoff.spin_light();
            tail = self.tail.load(Ordering::Relaxed);
        } else {
            // Snooze because we need to wait for the stamp to get updated.
            backoff.spin_heavy();
            tail = self.tail.load(Ordering::Relaxed);
        }
    }
}
195 | |
196 | /// Writes a message into the channel. |
197 | pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { |
198 | // If there is no slot, the channel is disconnected. |
199 | if token.array.slot.is_null() { |
200 | return Err(msg); |
201 | } |
202 | |
203 | let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); |
204 | |
205 | // Write the message into the slot and update the stamp. |
206 | slot.msg.get().write(MaybeUninit::new(msg)); |
207 | slot.stamp.store(token.array.stamp, Ordering::Release); |
208 | |
209 | // Wake a sleeping receiver. |
210 | self.receivers.notify(); |
211 | Ok(()) |
212 | } |
213 | |
/// Attempts to reserve a slot for receiving a message.
///
/// Returns `true` when `token` has been prepared for a follow-up call to `read`: either a
/// slot holding a message was reserved by advancing the head, or the channel is empty and
/// disconnected and the token carries a null slot. Returns `false` when the channel is empty
/// but not disconnected.
fn start_recv(&self, token: &mut Token) -> bool {
    let backoff = Backoff::new();
    let mut head = self.head.load(Ordering::Relaxed);

    loop {
        // Deconstruct the head.
        let index = head & (self.mark_bit - 1);
        let lap = head & !(self.one_lap - 1);

        // Inspect the corresponding slot.
        debug_assert!(index < self.buffer.len());
        let slot = unsafe { self.buffer.get_unchecked(index) };
        let stamp = slot.stamp.load(Ordering::Acquire);

        // If the stamp is ahead of the head by 1, we may attempt to pop.
        if head + 1 == stamp {
            let new = if index + 1 < self.cap {
                // Same lap, incremented index.
                // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                head + 1
            } else {
                // One lap forward, index wraps around to zero.
                // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                lap.wrapping_add(self.one_lap)
            };

            // Try moving the head.
            match self.head.compare_exchange_weak(
                head,
                new,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // Prepare the token for the follow-up call to `read`.
                    token.array.slot = slot as *const Slot<T> as *const u8;
                    // `read` stores this stamp; `start_send` treats a slot whose stamp
                    // equals the tail as free, which happens one lap from now.
                    token.array.stamp = head.wrapping_add(self.one_lap);
                    return true;
                }
                Err(_) => {
                    // The exchange did not go through; back off and retry with a freshly
                    // loaded head.
                    backoff.spin_light();
                    head = self.head.load(Ordering::Relaxed);
                }
            }
        } else if stamp == head {
            // The slot carries the stamp of a free slot, so no message has been written to
            // it for this lap.
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.load(Ordering::Relaxed);

            // If the tail equals the head, that means the channel is empty.
            if (tail & !self.mark_bit) == head {
                // If the channel is disconnected...
                if tail & self.mark_bit != 0 {
                    // ...then receive an error.
                    token.array.slot = ptr::null();
                    token.array.stamp = 0;
                    return true;
                } else {
                    // Otherwise, the receive operation is not ready.
                    return false;
                }
            }

            backoff.spin_light();
            head = self.head.load(Ordering::Relaxed);
        } else {
            // Snooze because we need to wait for the stamp to get updated.
            backoff.spin_heavy();
            head = self.head.load(Ordering::Relaxed);
        }
    }
}
286 | |
287 | /// Reads a message from the channel. |
288 | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { |
289 | if token.array.slot.is_null() { |
290 | // The channel is disconnected. |
291 | return Err(()); |
292 | } |
293 | |
294 | let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); |
295 | |
296 | // Read the message from the slot and update the stamp. |
297 | let msg = slot.msg.get().read().assume_init(); |
298 | slot.stamp.store(token.array.stamp, Ordering::Release); |
299 | |
300 | // Wake a sleeping sender. |
301 | self.senders.notify(); |
302 | Ok(msg) |
303 | } |
304 | |
305 | /// Attempts to send a message into the channel. |
306 | pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
307 | let token = &mut Token::default(); |
308 | if self.start_send(token) { |
309 | unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } |
310 | } else { |
311 | Err(TrySendError::Full(msg)) |
312 | } |
313 | } |
314 | |
315 | /// Sends a message into the channel. |
316 | pub(crate) fn send( |
317 | &self, |
318 | msg: T, |
319 | deadline: Option<Instant>, |
320 | ) -> Result<(), SendTimeoutError<T>> { |
321 | let token = &mut Token::default(); |
322 | loop { |
323 | // Try sending a message. |
324 | if self.start_send(token) { |
325 | let res = unsafe { self.write(token, msg) }; |
326 | return res.map_err(SendTimeoutError::Disconnected); |
327 | } |
328 | |
329 | if let Some(d) = deadline { |
330 | if Instant::now() >= d { |
331 | return Err(SendTimeoutError::Timeout(msg)); |
332 | } |
333 | } |
334 | |
335 | Context::with(|cx| { |
336 | // Prepare for blocking until a receiver wakes us up. |
337 | let oper = Operation::hook(token); |
338 | self.senders.register(oper, cx); |
339 | |
340 | // Has the channel become ready just now? |
341 | if !self.is_full() || self.is_disconnected() { |
342 | let _ = cx.try_select(Selected::Aborted); |
343 | } |
344 | |
345 | // Block the current thread. |
346 | let sel = cx.wait_until(deadline); |
347 | |
348 | match sel { |
349 | Selected::Waiting => unreachable!(), |
350 | Selected::Aborted | Selected::Disconnected => { |
351 | self.senders.unregister(oper).unwrap(); |
352 | } |
353 | Selected::Operation(_) => {} |
354 | } |
355 | }); |
356 | } |
357 | } |
358 | |
359 | /// Attempts to receive a message without blocking. |
360 | pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { |
361 | let token = &mut Token::default(); |
362 | |
363 | if self.start_recv(token) { |
364 | unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } |
365 | } else { |
366 | Err(TryRecvError::Empty) |
367 | } |
368 | } |
369 | |
370 | /// Receives a message from the channel. |
371 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { |
372 | let token = &mut Token::default(); |
373 | loop { |
374 | // Try receiving a message. |
375 | if self.start_recv(token) { |
376 | let res = unsafe { self.read(token) }; |
377 | return res.map_err(|_| RecvTimeoutError::Disconnected); |
378 | } |
379 | |
380 | if let Some(d) = deadline { |
381 | if Instant::now() >= d { |
382 | return Err(RecvTimeoutError::Timeout); |
383 | } |
384 | } |
385 | |
386 | Context::with(|cx| { |
387 | // Prepare for blocking until a sender wakes us up. |
388 | let oper = Operation::hook(token); |
389 | self.receivers.register(oper, cx); |
390 | |
391 | // Has the channel become ready just now? |
392 | if !self.is_empty() || self.is_disconnected() { |
393 | let _ = cx.try_select(Selected::Aborted); |
394 | } |
395 | |
396 | // Block the current thread. |
397 | let sel = cx.wait_until(deadline); |
398 | |
399 | match sel { |
400 | Selected::Waiting => unreachable!(), |
401 | Selected::Aborted | Selected::Disconnected => { |
402 | self.receivers.unregister(oper).unwrap(); |
403 | // If the channel was disconnected, we still have to check for remaining |
404 | // messages. |
405 | } |
406 | Selected::Operation(_) => {} |
407 | } |
408 | }); |
409 | } |
410 | } |
411 | |
412 | /// Returns the current number of messages inside the channel. |
413 | pub(crate) fn len(&self) -> usize { |
414 | loop { |
415 | // Load the tail, then load the head. |
416 | let tail = self.tail.load(Ordering::SeqCst); |
417 | let head = self.head.load(Ordering::SeqCst); |
418 | |
419 | // If the tail didn't change, we've got consistent values to work with. |
420 | if self.tail.load(Ordering::SeqCst) == tail { |
421 | let hix = head & (self.mark_bit - 1); |
422 | let tix = tail & (self.mark_bit - 1); |
423 | |
424 | return if hix < tix { |
425 | tix - hix |
426 | } else if hix > tix { |
427 | self.cap - hix + tix |
428 | } else if (tail & !self.mark_bit) == head { |
429 | 0 |
430 | } else { |
431 | self.cap |
432 | }; |
433 | } |
434 | } |
435 | } |
436 | |
437 | /// Returns the capacity of the channel. |
438 | #[allow (clippy::unnecessary_wraps)] // This is intentional. |
439 | pub(crate) fn capacity(&self) -> Option<usize> { |
440 | Some(self.cap) |
441 | } |
442 | |
443 | /// Disconnects senders and wakes up all blocked receivers. |
444 | /// |
445 | /// Returns `true` if this call disconnected the channel. |
446 | pub(crate) fn disconnect_senders(&self) -> bool { |
447 | let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); |
448 | |
449 | if tail & self.mark_bit == 0 { |
450 | self.receivers.disconnect(); |
451 | true |
452 | } else { |
453 | false |
454 | } |
455 | } |
456 | |
457 | /// Disconnects receivers and wakes up all blocked senders. |
458 | /// |
459 | /// Returns `true` if this call disconnected the channel. |
460 | /// |
461 | /// # Safety |
462 | /// May only be called once upon dropping the last receiver. The |
463 | /// destruction of all other receivers must have been observed with acquire |
464 | /// ordering or stronger. |
465 | pub(crate) unsafe fn disconnect_receivers(&self) -> bool { |
466 | let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); |
467 | let disconnected = if tail & self.mark_bit == 0 { |
468 | self.senders.disconnect(); |
469 | true |
470 | } else { |
471 | false |
472 | }; |
473 | |
474 | self.discard_all_messages(tail); |
475 | disconnected |
476 | } |
477 | |
/// Discards all messages.
///
/// Walks the slots starting at the current head, dropping each message in place, until the
/// local head position reaches `tail`.
///
/// `tail` should be the current (and therefore last) value of `tail`. The mark bit is
/// stripped from it before it is compared against the head.
///
/// # Panicking
/// If a destructor panics, the remaining messages are leaked, matching the
/// behaviour of the unbounded channel.
///
/// # Safety
/// This method must only be called when dropping the last receiver. The
/// destruction of all other receivers must have been observed with acquire
/// ordering or stronger.
unsafe fn discard_all_messages(&self, tail: usize) {
    debug_assert!(self.is_disconnected());

    // Only receivers modify `head`, so since we are the last one,
    // this value will not change and will not be observed (since
    // no new messages can be sent after disconnection).
    let mut head = self.head.load(Ordering::Relaxed);
    let tail = tail & !self.mark_bit;

    let backoff = Backoff::new();
    loop {
        // Deconstruct the head.
        let index = head & (self.mark_bit - 1);
        let lap = head & !(self.one_lap - 1);

        // Inspect the corresponding slot.
        debug_assert!(index < self.buffer.len());
        let slot = unsafe { self.buffer.get_unchecked(index) };
        let stamp = slot.stamp.load(Ordering::Acquire);

        // If the stamp is ahead of the head by 1, we may drop the message.
        if head + 1 == stamp {
            // Advance the local copy of the head only; the shared `head` is not stored to.
            head = if index + 1 < self.cap {
                // Same lap, incremented index.
                // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                head + 1
            } else {
                // One lap forward, index wraps around to zero.
                // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                lap.wrapping_add(self.one_lap)
            };

            // The stamp check above showed that `write` has stored a message in this slot.
            unsafe {
                (*slot.msg.get()).assume_init_drop();
            }
        // If the tail equals the head, that means the channel is empty.
        } else if tail == head {
            return;
        // Otherwise, a sender is about to write into the slot, so we need
        // to wait for it to update the stamp.
        } else {
            backoff.spin_heavy();
        }
    }
}
535 | |
536 | /// Returns `true` if the channel is disconnected. |
537 | pub(crate) fn is_disconnected(&self) -> bool { |
538 | self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 |
539 | } |
540 | |
541 | /// Returns `true` if the channel is empty. |
542 | pub(crate) fn is_empty(&self) -> bool { |
543 | let head = self.head.load(Ordering::SeqCst); |
544 | let tail = self.tail.load(Ordering::SeqCst); |
545 | |
546 | // Is the tail equal to the head? |
547 | // |
548 | // Note: If the head changes just before we load the tail, that means there was a moment |
549 | // when the channel was not empty, so it is safe to just return `false`. |
550 | (tail & !self.mark_bit) == head |
551 | } |
552 | |
553 | /// Returns `true` if the channel is full. |
554 | pub(crate) fn is_full(&self) -> bool { |
555 | let tail = self.tail.load(Ordering::SeqCst); |
556 | let head = self.head.load(Ordering::SeqCst); |
557 | |
558 | // Is the head lagging one lap behind tail? |
559 | // |
560 | // Note: If the tail changes just before we load the head, that means there was a moment |
561 | // when the channel was not full, so it is safe to just return `false`. |
562 | head.wrapping_add(self.one_lap) == tail & !self.mark_bit |
563 | } |
564 | } |
565 | |