1 | //! Unbounded channel implemented as a linked list. |
2 | |
3 | use super::context::Context; |
4 | use super::error::*; |
5 | use super::select::{Operation, Selected, Token}; |
6 | use super::utils::{Backoff, CachePadded}; |
7 | use super::waker::SyncWaker; |
8 | |
9 | use crate::cell::UnsafeCell; |
10 | use crate::marker::PhantomData; |
11 | use crate::mem::MaybeUninit; |
12 | use crate::ptr; |
13 | use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; |
14 | use crate::time::Instant; |
15 | |
// Per-slot state flags, OR-ed together in `Slot::state`:
// * `WRITE`   - a message has been written into the slot.
// * `READ`    - the message has been read out of the slot.
// * `DESTROY` - the block owning the slot is being destroyed.
const WRITE: usize = 0b001;
const READ: usize = 0b010;
const DESTROY: usize = 0b100;

// Number of indices covered by one block (one "lap").
const LAP: usize = 32;
// Number of messages a block can actually store; the final index of every lap is never
// backed by a slot.
const BLOCK_CAP: usize = LAP - 1;
// Number of low bits of an index that carry metadata rather than position.
const SHIFT: usize = 1;
// Metadata bit with a position-dependent meaning:
// * in the head index: the head block is not the last block in the list;
// * in the tail index: the channel is disconnected.
const MARK_BIT: usize = 1;
34 | |
35 | /// A slot in a block. |
36 | struct Slot<T> { |
37 | /// The message. |
38 | msg: UnsafeCell<MaybeUninit<T>>, |
39 | |
40 | /// The state of the slot. |
41 | state: AtomicUsize, |
42 | } |
43 | |
44 | impl<T> Slot<T> { |
45 | /// Waits until a message is written into the slot. |
46 | fn wait_write(&self) { |
47 | let backoff: Backoff = Backoff::new(); |
48 | while self.state.load(order:Ordering::Acquire) & WRITE == 0 { |
49 | backoff.spin_heavy(); |
50 | } |
51 | } |
52 | } |
53 | |
54 | /// A block in a linked list. |
55 | /// |
56 | /// Each block in the list can hold up to `BLOCK_CAP` messages. |
57 | struct Block<T> { |
58 | /// The next block in the linked list. |
59 | next: AtomicPtr<Block<T>>, |
60 | |
61 | /// Slots for messages. |
62 | slots: [Slot<T>; BLOCK_CAP], |
63 | } |
64 | |
65 | impl<T> Block<T> { |
66 | /// Creates an empty block. |
67 | fn new() -> Block<T> { |
68 | // SAFETY: This is safe because: |
69 | // [1] `Block::next` (AtomicPtr) may be safely zero initialized. |
70 | // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. |
71 | // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it |
72 | // holds a MaybeUninit. |
73 | // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. |
74 | unsafe { MaybeUninit::zeroed().assume_init() } |
75 | } |
76 | |
77 | /// Waits until the next pointer is set. |
78 | fn wait_next(&self) -> *mut Block<T> { |
79 | let backoff = Backoff::new(); |
80 | loop { |
81 | let next = self.next.load(Ordering::Acquire); |
82 | if !next.is_null() { |
83 | return next; |
84 | } |
85 | backoff.spin_heavy(); |
86 | } |
87 | } |
88 | |
89 | /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. |
90 | unsafe fn destroy(this: *mut Block<T>, start: usize) { |
91 | // It is not necessary to set the `DESTROY` bit in the last slot because that slot has |
92 | // begun destruction of the block. |
93 | for i in start..BLOCK_CAP - 1 { |
94 | let slot = (*this).slots.get_unchecked(i); |
95 | |
96 | // Mark the `DESTROY` bit if a thread is still using the slot. |
97 | if slot.state.load(Ordering::Acquire) & READ == 0 |
98 | && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 |
99 | { |
100 | // If a thread is still using the slot, it will continue destruction of the block. |
101 | return; |
102 | } |
103 | } |
104 | |
105 | // No thread is using the block, now it is safe to destroy it. |
106 | drop(Box::from_raw(this)); |
107 | } |
108 | } |
109 | |
110 | /// A position in a channel. |
111 | #[derive (Debug)] |
112 | struct Position<T> { |
113 | /// The index in the channel. |
114 | index: AtomicUsize, |
115 | |
116 | /// The block in the linked list. |
117 | block: AtomicPtr<Block<T>>, |
118 | } |
119 | |
/// Token used by the list flavor to carry a reserved slot between the "start" and the
/// "read"/"write" halves of an operation.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// Type-erased pointer to the block containing the reserved slot (null when there is none).
    block: *const u8,

    /// Index of the reserved slot inside that block.
    offset: usize,
}

impl Default for ListToken {
    /// Returns a token that refers to no slot: null block, offset zero.
    #[inline]
    fn default() -> Self {
        Self { block: ptr::null::<u8>(), offset: 0 }
    }
}
136 | |
137 | /// Unbounded channel implemented as a linked list. |
138 | /// |
139 | /// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are |
140 | /// represented as numbers of type `usize` and wrap on overflow. |
141 | /// |
142 | /// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and |
143 | /// improve cache efficiency. |
144 | pub(crate) struct Channel<T> { |
145 | /// The head of the channel. |
146 | head: CachePadded<Position<T>>, |
147 | |
148 | /// The tail of the channel. |
149 | tail: CachePadded<Position<T>>, |
150 | |
151 | /// Receivers waiting while the channel is empty and not disconnected. |
152 | receivers: SyncWaker, |
153 | |
154 | /// Indicates that dropping a `Channel<T>` may drop messages of type `T`. |
155 | _marker: PhantomData<T>, |
156 | } |
157 | |
158 | impl<T> Channel<T> { |
159 | /// Creates a new unbounded channel. |
160 | pub(crate) fn new() -> Self { |
161 | Channel { |
162 | head: CachePadded::new(Position { |
163 | block: AtomicPtr::new(ptr::null_mut()), |
164 | index: AtomicUsize::new(0), |
165 | }), |
166 | tail: CachePadded::new(Position { |
167 | block: AtomicPtr::new(ptr::null_mut()), |
168 | index: AtomicUsize::new(0), |
169 | }), |
170 | receivers: SyncWaker::new(), |
171 | _marker: PhantomData, |
172 | } |
173 | } |
174 | |
/// Attempts to reserve a slot for sending a message.
///
/// Every return path yields `true`. On return, `token.list` describes the outcome:
/// * a null `block` means the tail carried `MARK_BIT`, i.e. the channel is disconnected;
/// * otherwise `block`/`offset` identify the slot that was reserved for the caller.
///
/// The function loops (with backoff) until one of those two outcomes is reached.
fn start_send(&self, token: &mut Token) -> bool {
    let backoff = Backoff::new();
    let mut tail = self.tail.index.load(Ordering::Acquire);
    let mut block = self.tail.block.load(Ordering::Acquire);
    // Spare allocation that is kept across loop iterations until it is needed.
    let mut next_block = None;

    loop {
        // Check if the channel is disconnected.
        if tail & MARK_BIT != 0 {
            token.list.block = ptr::null();
            return true;
        }

        // Calculate the offset of the index into the block.
        let offset = (tail >> SHIFT) % LAP;

        // If we reached the end of the block, wait until the next one is installed.
        // (Offset `BLOCK_CAP` has no slot; the sender that took the last slot moves the tail
        // past it below.)
        if offset == BLOCK_CAP {
            backoff.spin_heavy();
            tail = self.tail.index.load(Ordering::Acquire);
            block = self.tail.block.load(Ordering::Acquire);
            continue;
        }

        // If we're going to have to install the next block, allocate it in advance in order to
        // make the wait for other threads as short as possible.
        if offset + 1 == BLOCK_CAP && next_block.is_none() {
            next_block = Some(Box::new(Block::<T>::new()));
        }

        // If this is the first message to be sent into the channel, we need to allocate the
        // first block and install it.
        if block.is_null() {
            let new = Box::into_raw(Box::new(Block::<T>::new()));

            if self
                .tail
                .block
                .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                // We installed the first block; publish it to the head as well.
                self.head.block.store(new, Ordering::Release);
                block = new;
            } else {
                // Another sender installed the first block before us. Keep our allocation as
                // the spare `next_block` and retry with fresh tail values.
                next_block = unsafe { Some(Box::from_raw(new)) };
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }
        }

        // The tail index advanced by one position (metadata bits untouched).
        let new_tail = tail + (1 << SHIFT);

        // Try advancing the tail forward.
        match self.tail.index.compare_exchange_weak(
            tail,
            new_tail,
            Ordering::SeqCst,
            Ordering::Acquire,
        ) {
            Ok(_) => unsafe {
                // If we've reached the end of the block, install the next one.
                if offset + 1 == BLOCK_CAP {
                    let next_block = Box::into_raw(next_block.unwrap());
                    // The new block is stored first, then the index is moved past the
                    // slot-less `BLOCK_CAP` offset (which releases senders spinning above),
                    // and finally the old block is linked to the new one.
                    self.tail.block.store(next_block, Ordering::Release);
                    self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                    (*block).next.store(next_block, Ordering::Release);
                }

                // Hand the reserved slot to the caller.
                token.list.block = block as *const u8;
                token.list.offset = offset;
                return true;
            },
            Err(_) => {
                // Lost the race (or spurious failure): reload and retry.
                backoff.spin_light();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
            }
        }
    }
}
257 | |
258 | /// Writes a message into the channel. |
259 | pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { |
260 | // If there is no slot, the channel is disconnected. |
261 | if token.list.block.is_null() { |
262 | return Err(msg); |
263 | } |
264 | |
265 | // Write the message into the slot. |
266 | let block = token.list.block as *mut Block<T>; |
267 | let offset = token.list.offset; |
268 | let slot = (*block).slots.get_unchecked(offset); |
269 | slot.msg.get().write(MaybeUninit::new(msg)); |
270 | slot.state.fetch_or(WRITE, Ordering::Release); |
271 | |
272 | // Wake a sleeping receiver. |
273 | self.receivers.notify(); |
274 | Ok(()) |
275 | } |
276 | |
/// Attempts to reserve a slot for receiving a message.
///
/// Returns `false` when the channel is empty and not disconnected (the operation is not
/// ready). Returns `true` otherwise, with `token.list` describing the outcome:
/// * a null `block` means the channel is empty and disconnected;
/// * otherwise `block`/`offset` identify the slot reserved for the caller.
fn start_recv(&self, token: &mut Token) -> bool {
    let backoff = Backoff::new();
    let mut head = self.head.index.load(Ordering::Acquire);
    let mut block = self.head.block.load(Ordering::Acquire);

    loop {
        // Calculate the offset of the index into the block.
        let offset = (head >> SHIFT) % LAP;

        // If we reached the end of the block, wait until the next one is installed.
        // (Offset `BLOCK_CAP` has no slot; the receiver that took the last slot moves the head
        // past it below.)
        if offset == BLOCK_CAP {
            backoff.spin_heavy();
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);
            continue;
        }

        // The head index advanced by one position; this keeps whatever `MARK_BIT` value the
        // current head carries.
        let mut new_head = head + (1 << SHIFT);

        // The tail only needs to be consulted while `MARK_BIT` is not set in the head.
        if new_head & MARK_BIT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the channel is empty.
            if head >> SHIFT == tail >> SHIFT {
                // If the channel is disconnected...
                if tail & MARK_BIT != 0 {
                    // ...then receive an error.
                    token.list.block = ptr::null();
                    return true;
                } else {
                    // Otherwise, the receive operation is not ready.
                    return false;
                }
            }

            // If head and tail are not in the same block, set `MARK_BIT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= MARK_BIT;
            }
        }

        // The block can be null here only if the first message is being sent into the channel.
        // In that case, just wait until it gets initialized.
        if block.is_null() {
            backoff.spin_heavy();
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);
            continue;
        }

        // Try moving the head index forward.
        match self.head.index.compare_exchange_weak(
            head,
            new_head,
            Ordering::SeqCst,
            Ordering::Acquire,
        ) {
            Ok(_) => unsafe {
                // If we've reached the end of the block, move to the next one.
                if offset + 1 == BLOCK_CAP {
                    let next = (*block).wait_next();
                    // Skip the slot-less `BLOCK_CAP` offset, starting with `MARK_BIT` cleared.
                    let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                    // If the next block already has a successor, it is not the last block.
                    if !(*next).next.load(Ordering::Relaxed).is_null() {
                        next_index |= MARK_BIT;
                    }

                    self.head.block.store(next, Ordering::Release);
                    self.head.index.store(next_index, Ordering::Release);
                }

                // Hand the reserved slot to the caller.
                token.list.block = block as *const u8;
                token.list.offset = offset;
                return true;
            },
            Err(_) => {
                // Lost the race (or spurious failure): reload and retry.
                backoff.spin_light();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
            }
        }
    }
}
361 | |
362 | /// Reads a message from the channel. |
363 | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { |
364 | if token.list.block.is_null() { |
365 | // The channel is disconnected. |
366 | return Err(()); |
367 | } |
368 | |
369 | // Read the message. |
370 | let block = token.list.block as *mut Block<T>; |
371 | let offset = token.list.offset; |
372 | let slot = (*block).slots.get_unchecked(offset); |
373 | slot.wait_write(); |
374 | let msg = slot.msg.get().read().assume_init(); |
375 | |
376 | // Destroy the block if we've reached the end, or if another thread wanted to destroy but |
377 | // couldn't because we were busy reading from the slot. |
378 | if offset + 1 == BLOCK_CAP { |
379 | Block::destroy(block, 0); |
380 | } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { |
381 | Block::destroy(block, offset + 1); |
382 | } |
383 | |
384 | Ok(msg) |
385 | } |
386 | |
387 | /// Attempts to send a message into the channel. |
388 | pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
389 | self.send(msg, None).map_err(|err| match err { |
390 | SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), |
391 | SendTimeoutError::Timeout(_) => unreachable!(), |
392 | }) |
393 | } |
394 | |
395 | /// Sends a message into the channel. |
396 | pub(crate) fn send( |
397 | &self, |
398 | msg: T, |
399 | _deadline: Option<Instant>, |
400 | ) -> Result<(), SendTimeoutError<T>> { |
401 | let token = &mut Token::default(); |
402 | assert!(self.start_send(token)); |
403 | unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) } |
404 | } |
405 | |
406 | /// Attempts to receive a message without blocking. |
407 | pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { |
408 | let token = &mut Token::default(); |
409 | |
410 | if self.start_recv(token) { |
411 | unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } |
412 | } else { |
413 | Err(TryRecvError::Empty) |
414 | } |
415 | } |
416 | |
417 | /// Receives a message from the channel. |
418 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { |
419 | let token = &mut Token::default(); |
420 | loop { |
421 | if self.start_recv(token) { |
422 | unsafe { |
423 | return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); |
424 | } |
425 | } |
426 | |
427 | if let Some(d) = deadline { |
428 | if Instant::now() >= d { |
429 | return Err(RecvTimeoutError::Timeout); |
430 | } |
431 | } |
432 | |
433 | // Prepare for blocking until a sender wakes us up. |
434 | Context::with(|cx| { |
435 | let oper = Operation::hook(token); |
436 | self.receivers.register(oper, cx); |
437 | |
438 | // Has the channel become ready just now? |
439 | if !self.is_empty() || self.is_disconnected() { |
440 | let _ = cx.try_select(Selected::Aborted); |
441 | } |
442 | |
443 | // Block the current thread. |
444 | let sel = cx.wait_until(deadline); |
445 | |
446 | match sel { |
447 | Selected::Waiting => unreachable!(), |
448 | Selected::Aborted | Selected::Disconnected => { |
449 | self.receivers.unregister(oper).unwrap(); |
450 | // If the channel was disconnected, we still have to check for remaining |
451 | // messages. |
452 | } |
453 | Selected::Operation(_) => {} |
454 | } |
455 | }); |
456 | } |
457 | } |
458 | |
459 | /// Returns the current number of messages inside the channel. |
460 | pub(crate) fn len(&self) -> usize { |
461 | loop { |
462 | // Load the tail index, then load the head index. |
463 | let mut tail = self.tail.index.load(Ordering::SeqCst); |
464 | let mut head = self.head.index.load(Ordering::SeqCst); |
465 | |
466 | // If the tail index didn't change, we've got consistent indices to work with. |
467 | if self.tail.index.load(Ordering::SeqCst) == tail { |
468 | // Erase the lower bits. |
469 | tail &= !((1 << SHIFT) - 1); |
470 | head &= !((1 << SHIFT) - 1); |
471 | |
472 | // Fix up indices if they fall onto block ends. |
473 | if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { |
474 | tail = tail.wrapping_add(1 << SHIFT); |
475 | } |
476 | if (head >> SHIFT) & (LAP - 1) == LAP - 1 { |
477 | head = head.wrapping_add(1 << SHIFT); |
478 | } |
479 | |
480 | // Rotate indices so that head falls into the first block. |
481 | let lap = (head >> SHIFT) / LAP; |
482 | tail = tail.wrapping_sub((lap * LAP) << SHIFT); |
483 | head = head.wrapping_sub((lap * LAP) << SHIFT); |
484 | |
485 | // Remove the lower bits. |
486 | tail >>= SHIFT; |
487 | head >>= SHIFT; |
488 | |
489 | // Return the difference minus the number of blocks between tail and head. |
490 | return tail - head - tail / LAP; |
491 | } |
492 | } |
493 | } |
494 | |
495 | /// Returns the capacity of the channel. |
496 | pub(crate) fn capacity(&self) -> Option<usize> { |
497 | None |
498 | } |
499 | |
500 | /// Disconnects senders and wakes up all blocked receivers. |
501 | /// |
502 | /// Returns `true` if this call disconnected the channel. |
503 | pub(crate) fn disconnect_senders(&self) -> bool { |
504 | let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); |
505 | |
506 | if tail & MARK_BIT == 0 { |
507 | self.receivers.disconnect(); |
508 | true |
509 | } else { |
510 | false |
511 | } |
512 | } |
513 | |
514 | /// Disconnects receivers. |
515 | /// |
516 | /// Returns `true` if this call disconnected the channel. |
517 | pub(crate) fn disconnect_receivers(&self) -> bool { |
518 | let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); |
519 | |
520 | if tail & MARK_BIT == 0 { |
521 | // If receivers are dropped first, discard all messages to free |
522 | // memory eagerly. |
523 | self.discard_all_messages(); |
524 | true |
525 | } else { |
526 | false |
527 | } |
528 | } |
529 | |
530 | /// Discards all messages. |
531 | /// |
532 | /// This method should only be called when all receivers are dropped. |
533 | fn discard_all_messages(&self) { |
534 | let backoff = Backoff::new(); |
535 | let mut tail = self.tail.index.load(Ordering::Acquire); |
536 | loop { |
537 | let offset = (tail >> SHIFT) % LAP; |
538 | if offset != BLOCK_CAP { |
539 | break; |
540 | } |
541 | |
542 | // New updates to tail will be rejected by MARK_BIT and aborted unless it's |
543 | // at boundary. We need to wait for the updates take affect otherwise there |
544 | // can be memory leaks. |
545 | backoff.spin_heavy(); |
546 | tail = self.tail.index.load(Ordering::Acquire); |
547 | } |
548 | |
549 | let mut head = self.head.index.load(Ordering::Acquire); |
550 | // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's attempts |
551 | // to initalize the first block before noticing that the receivers disconnected. Late allocations |
552 | // will be deallocated by the sender in Drop. |
553 | let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel); |
554 | |
555 | // If we're going to be dropping messages we need to synchronize with initialization |
556 | if head >> SHIFT != tail >> SHIFT { |
557 | // The block can be null here only if a sender is in the process of initializing the |
558 | // channel while another sender managed to send a message by inserting it into the |
559 | // semi-initialized channel and advanced the tail. |
560 | // In that case, just wait until it gets initialized. |
561 | while block.is_null() { |
562 | backoff.spin_heavy(); |
563 | block = self.head.block.load(Ordering::Acquire); |
564 | } |
565 | } |
566 | |
567 | unsafe { |
568 | // Drop all messages between head and tail and deallocate the heap-allocated blocks. |
569 | while head >> SHIFT != tail >> SHIFT { |
570 | let offset = (head >> SHIFT) % LAP; |
571 | |
572 | if offset < BLOCK_CAP { |
573 | // Drop the message in the slot. |
574 | let slot = (*block).slots.get_unchecked(offset); |
575 | slot.wait_write(); |
576 | let p = &mut *slot.msg.get(); |
577 | p.as_mut_ptr().drop_in_place(); |
578 | } else { |
579 | (*block).wait_next(); |
580 | // Deallocate the block and move to the next one. |
581 | let next = (*block).next.load(Ordering::Acquire); |
582 | drop(Box::from_raw(block)); |
583 | block = next; |
584 | } |
585 | |
586 | head = head.wrapping_add(1 << SHIFT); |
587 | } |
588 | |
589 | // Deallocate the last remaining block. |
590 | if !block.is_null() { |
591 | drop(Box::from_raw(block)); |
592 | } |
593 | } |
594 | |
595 | head &= !MARK_BIT; |
596 | self.head.index.store(head, Ordering::Release); |
597 | } |
598 | |
599 | /// Returns `true` if the channel is disconnected. |
600 | pub(crate) fn is_disconnected(&self) -> bool { |
601 | self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 |
602 | } |
603 | |
604 | /// Returns `true` if the channel is empty. |
605 | pub(crate) fn is_empty(&self) -> bool { |
606 | let head = self.head.index.load(Ordering::SeqCst); |
607 | let tail = self.tail.index.load(Ordering::SeqCst); |
608 | head >> SHIFT == tail >> SHIFT |
609 | } |
610 | |
611 | /// Returns `true` if the channel is full. |
612 | pub(crate) fn is_full(&self) -> bool { |
613 | false |
614 | } |
615 | } |
616 | |
617 | impl<T> Drop for Channel<T> { |
618 | fn drop(&mut self) { |
619 | let mut head = self.head.index.load(Ordering::Relaxed); |
620 | let mut tail = self.tail.index.load(Ordering::Relaxed); |
621 | let mut block = self.head.block.load(Ordering::Relaxed); |
622 | |
623 | // Erase the lower bits. |
624 | head &= !((1 << SHIFT) - 1); |
625 | tail &= !((1 << SHIFT) - 1); |
626 | |
627 | unsafe { |
628 | // Drop all messages between head and tail and deallocate the heap-allocated blocks. |
629 | while head != tail { |
630 | let offset = (head >> SHIFT) % LAP; |
631 | |
632 | if offset < BLOCK_CAP { |
633 | // Drop the message in the slot. |
634 | let slot = (*block).slots.get_unchecked(offset); |
635 | let p = &mut *slot.msg.get(); |
636 | p.as_mut_ptr().drop_in_place(); |
637 | } else { |
638 | // Deallocate the block and move to the next one. |
639 | let next = (*block).next.load(Ordering::Relaxed); |
640 | drop(Box::from_raw(block)); |
641 | block = next; |
642 | } |
643 | |
644 | head = head.wrapping_add(1 << SHIFT); |
645 | } |
646 | |
647 | // Deallocate the last remaining block. |
648 | if !block.is_null() { |
649 | drop(Box::from_raw(block)); |
650 | } |
651 | } |
652 | } |
653 | } |
654 | |