1 | //! Unbounded channel implemented as a linked list. |
2 | |
3 | use std::boxed::Box; |
4 | use std::cell::UnsafeCell; |
5 | use std::marker::PhantomData; |
6 | use std::mem::MaybeUninit; |
7 | use std::ptr; |
8 | use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; |
9 | use std::time::Instant; |
10 | |
11 | use crossbeam_utils::{Backoff, CachePadded}; |
12 | |
13 | use crate::context::Context; |
14 | use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; |
15 | use crate::select::{Operation, SelectHandle, Selected, Token}; |
16 | use crate::waker::SyncWaker; |
17 | |
18 | // TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the |
19 | // following changes by @kleimkuhler: |
20 | // |
21 | // 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100 |
22 | // 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101 |
23 | |
// Flags kept in a slot's `state` word:
// * `WRITE`   - a message has been written into the slot.
// * `READ`    - the message has been read out of the slot.
// * `DESTROY` - the block that owns the slot is being destroyed.
const WRITE: usize = 0b001;
const READ: usize = 0b010;
const DESTROY: usize = 0b100;

// Number of indices spanned by a single block (one "lap").
const LAP: usize = 32;
// Number of messages a block can store. The final index of each lap carries no message; it only
// marks the end of the block.
const BLOCK_CAP: usize = LAP - 1;
// Number of low-order bits of an index that are reserved for metadata.
const SHIFT: usize = 1;
// Metadata bit whose meaning depends on the index it is set in:
// * in the head index: the head block is not the last block of the list.
// * in the tail index: the channel is disconnected.
const MARK_BIT: usize = 1;
42 | |
/// A single message cell inside a block.
struct Slot<T> {
    /// Storage for the message. It stays uninitialized until a sender writes into it.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// Bit set made of the `WRITE`, `READ` and `DESTROY` flags.
    state: AtomicUsize,
}
51 | |
52 | impl<T> Slot<T> { |
53 | const UNINIT: Self = Self { |
54 | msg: UnsafeCell::new(MaybeUninit::uninit()), |
55 | state: AtomicUsize::new(0), |
56 | }; |
57 | |
58 | /// Waits until a message is written into the slot. |
59 | fn wait_write(&self) { |
60 | let backoff: Backoff = Backoff::new(); |
61 | while self.state.load(order:Ordering::Acquire) & WRITE == 0 { |
62 | backoff.snooze(); |
63 | } |
64 | } |
65 | } |
66 | |
67 | /// A block in a linked list. |
68 | /// |
69 | /// Each block in the list can hold up to `BLOCK_CAP` messages. |
70 | struct Block<T> { |
71 | /// The next block in the linked list. |
72 | next: AtomicPtr<Block<T>>, |
73 | |
74 | /// Slots for messages. |
75 | slots: [Slot<T>; BLOCK_CAP], |
76 | } |
77 | |
78 | impl<T> Block<T> { |
79 | /// Creates an empty block. |
80 | fn new() -> Block<T> { |
81 | Self { |
82 | next: AtomicPtr::new(ptr::null_mut()), |
83 | slots: [Slot::UNINIT; BLOCK_CAP], |
84 | } |
85 | } |
86 | |
87 | /// Waits until the next pointer is set. |
88 | fn wait_next(&self) -> *mut Block<T> { |
89 | let backoff = Backoff::new(); |
90 | loop { |
91 | let next = self.next.load(Ordering::Acquire); |
92 | if !next.is_null() { |
93 | return next; |
94 | } |
95 | backoff.snooze(); |
96 | } |
97 | } |
98 | |
99 | /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. |
100 | unsafe fn destroy(this: *mut Block<T>, start: usize) { |
101 | // It is not necessary to set the `DESTROY` bit in the last slot because that slot has |
102 | // begun destruction of the block. |
103 | for i in start..BLOCK_CAP - 1 { |
104 | let slot = (*this).slots.get_unchecked(i); |
105 | |
106 | // Mark the `DESTROY` bit if a thread is still using the slot. |
107 | if slot.state.load(Ordering::Acquire) & READ == 0 |
108 | && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 |
109 | { |
110 | // If a thread is still using the slot, it will continue destruction of the block. |
111 | return; |
112 | } |
113 | } |
114 | |
115 | // No thread is using the block, now it is safe to destroy it. |
116 | drop(Box::from_raw(this)); |
117 | } |
118 | } |
119 | |
120 | /// A position in a channel. |
121 | #[derive (Debug)] |
122 | struct Position<T> { |
123 | /// The index in the channel. |
124 | index: AtomicUsize, |
125 | |
126 | /// The block in the linked list. |
127 | block: AtomicPtr<Block<T>>, |
128 | } |
129 | |
/// Token used by the list flavor to carry a reserved slot between the "start" and "finish"
/// halves of an operation.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// Type-erased pointer to the block holding the slot; null means "no slot".
    block: *const u8,

    /// Index of the slot inside that block.
    offset: usize,
}

impl Default for ListToken {
    /// Returns a token that refers to no slot (null block, offset zero).
    #[inline]
    fn default() -> Self {
        Self {
            offset: 0,
            block: ptr::null(),
        }
    }
}
149 | |
150 | /// Unbounded channel implemented as a linked list. |
151 | /// |
152 | /// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are |
153 | /// represented as numbers of type `usize` and wrap on overflow. |
154 | /// |
155 | /// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and |
156 | /// improve cache efficiency. |
157 | pub(crate) struct Channel<T> { |
158 | /// The head of the channel. |
159 | head: CachePadded<Position<T>>, |
160 | |
161 | /// The tail of the channel. |
162 | tail: CachePadded<Position<T>>, |
163 | |
164 | /// Receivers waiting while the channel is empty and not disconnected. |
165 | receivers: SyncWaker, |
166 | |
167 | /// Indicates that dropping a `Channel<T>` may drop messages of type `T`. |
168 | _marker: PhantomData<T>, |
169 | } |
170 | |
171 | impl<T> Channel<T> { |
172 | /// Creates a new unbounded channel. |
173 | pub(crate) fn new() -> Self { |
174 | Channel { |
175 | head: CachePadded::new(Position { |
176 | block: AtomicPtr::new(ptr::null_mut()), |
177 | index: AtomicUsize::new(0), |
178 | }), |
179 | tail: CachePadded::new(Position { |
180 | block: AtomicPtr::new(ptr::null_mut()), |
181 | index: AtomicUsize::new(0), |
182 | }), |
183 | receivers: SyncWaker::new(), |
184 | _marker: PhantomData, |
185 | } |
186 | } |
187 | |
188 | /// Returns a receiver handle to the channel. |
189 | pub(crate) fn receiver(&self) -> Receiver<'_, T> { |
190 | Receiver(self) |
191 | } |
192 | |
193 | /// Returns a sender handle to the channel. |
194 | pub(crate) fn sender(&self) -> Sender<'_, T> { |
195 | Sender(self) |
196 | } |
197 | |
198 | /// Attempts to reserve a slot for sending a message. |
199 | fn start_send(&self, token: &mut Token) -> bool { |
200 | let backoff = Backoff::new(); |
201 | let mut tail = self.tail.index.load(Ordering::Acquire); |
202 | let mut block = self.tail.block.load(Ordering::Acquire); |
203 | let mut next_block = None; |
204 | |
205 | loop { |
206 | // Check if the channel is disconnected. |
207 | if tail & MARK_BIT != 0 { |
208 | token.list.block = ptr::null(); |
209 | return true; |
210 | } |
211 | |
212 | // Calculate the offset of the index into the block. |
213 | let offset = (tail >> SHIFT) % LAP; |
214 | |
215 | // If we reached the end of the block, wait until the next one is installed. |
216 | if offset == BLOCK_CAP { |
217 | backoff.snooze(); |
218 | tail = self.tail.index.load(Ordering::Acquire); |
219 | block = self.tail.block.load(Ordering::Acquire); |
220 | continue; |
221 | } |
222 | |
223 | // If we're going to have to install the next block, allocate it in advance in order to |
224 | // make the wait for other threads as short as possible. |
225 | if offset + 1 == BLOCK_CAP && next_block.is_none() { |
226 | next_block = Some(Box::new(Block::<T>::new())); |
227 | } |
228 | |
229 | // If this is the first message to be sent into the channel, we need to allocate the |
230 | // first block and install it. |
231 | if block.is_null() { |
232 | let new = Box::into_raw(Box::new(Block::<T>::new())); |
233 | |
234 | if self |
235 | .tail |
236 | .block |
237 | .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) |
238 | .is_ok() |
239 | { |
240 | self.head.block.store(new, Ordering::Release); |
241 | block = new; |
242 | } else { |
243 | next_block = unsafe { Some(Box::from_raw(new)) }; |
244 | tail = self.tail.index.load(Ordering::Acquire); |
245 | block = self.tail.block.load(Ordering::Acquire); |
246 | continue; |
247 | } |
248 | } |
249 | |
250 | let new_tail = tail + (1 << SHIFT); |
251 | |
252 | // Try advancing the tail forward. |
253 | match self.tail.index.compare_exchange_weak( |
254 | tail, |
255 | new_tail, |
256 | Ordering::SeqCst, |
257 | Ordering::Acquire, |
258 | ) { |
259 | Ok(_) => unsafe { |
260 | // If we've reached the end of the block, install the next one. |
261 | if offset + 1 == BLOCK_CAP { |
262 | let next_block = Box::into_raw(next_block.unwrap()); |
263 | self.tail.block.store(next_block, Ordering::Release); |
264 | self.tail.index.fetch_add(1 << SHIFT, Ordering::Release); |
265 | (*block).next.store(next_block, Ordering::Release); |
266 | } |
267 | |
268 | token.list.block = block as *const u8; |
269 | token.list.offset = offset; |
270 | return true; |
271 | }, |
272 | Err(t) => { |
273 | tail = t; |
274 | block = self.tail.block.load(Ordering::Acquire); |
275 | backoff.spin(); |
276 | } |
277 | } |
278 | } |
279 | } |
280 | |
281 | /// Writes a message into the channel. |
282 | pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { |
283 | // If there is no slot, the channel is disconnected. |
284 | if token.list.block.is_null() { |
285 | return Err(msg); |
286 | } |
287 | |
288 | // Write the message into the slot. |
289 | let block = token.list.block.cast::<Block<T>>(); |
290 | let offset = token.list.offset; |
291 | let slot = (*block).slots.get_unchecked(offset); |
292 | slot.msg.get().write(MaybeUninit::new(msg)); |
293 | slot.state.fetch_or(WRITE, Ordering::Release); |
294 | |
295 | // Wake a sleeping receiver. |
296 | self.receivers.notify(); |
297 | Ok(()) |
298 | } |
299 | |
300 | /// Attempts to reserve a slot for receiving a message. |
301 | fn start_recv(&self, token: &mut Token) -> bool { |
302 | let backoff = Backoff::new(); |
303 | let mut head = self.head.index.load(Ordering::Acquire); |
304 | let mut block = self.head.block.load(Ordering::Acquire); |
305 | |
306 | loop { |
307 | // Calculate the offset of the index into the block. |
308 | let offset = (head >> SHIFT) % LAP; |
309 | |
310 | // If we reached the end of the block, wait until the next one is installed. |
311 | if offset == BLOCK_CAP { |
312 | backoff.snooze(); |
313 | head = self.head.index.load(Ordering::Acquire); |
314 | block = self.head.block.load(Ordering::Acquire); |
315 | continue; |
316 | } |
317 | |
318 | let mut new_head = head + (1 << SHIFT); |
319 | |
320 | if new_head & MARK_BIT == 0 { |
321 | atomic::fence(Ordering::SeqCst); |
322 | let tail = self.tail.index.load(Ordering::Relaxed); |
323 | |
324 | // If the tail equals the head, that means the channel is empty. |
325 | if head >> SHIFT == tail >> SHIFT { |
326 | // If the channel is disconnected... |
327 | if tail & MARK_BIT != 0 { |
328 | // ...then receive an error. |
329 | token.list.block = ptr::null(); |
330 | return true; |
331 | } else { |
332 | // Otherwise, the receive operation is not ready. |
333 | return false; |
334 | } |
335 | } |
336 | |
337 | // If head and tail are not in the same block, set `MARK_BIT` in head. |
338 | if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { |
339 | new_head |= MARK_BIT; |
340 | } |
341 | } |
342 | |
343 | // The block can be null here only if the first message is being sent into the channel. |
344 | // In that case, just wait until it gets initialized. |
345 | if block.is_null() { |
346 | backoff.snooze(); |
347 | head = self.head.index.load(Ordering::Acquire); |
348 | block = self.head.block.load(Ordering::Acquire); |
349 | continue; |
350 | } |
351 | |
352 | // Try moving the head index forward. |
353 | match self.head.index.compare_exchange_weak( |
354 | head, |
355 | new_head, |
356 | Ordering::SeqCst, |
357 | Ordering::Acquire, |
358 | ) { |
359 | Ok(_) => unsafe { |
360 | // If we've reached the end of the block, move to the next one. |
361 | if offset + 1 == BLOCK_CAP { |
362 | let next = (*block).wait_next(); |
363 | let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT); |
364 | if !(*next).next.load(Ordering::Relaxed).is_null() { |
365 | next_index |= MARK_BIT; |
366 | } |
367 | |
368 | self.head.block.store(next, Ordering::Release); |
369 | self.head.index.store(next_index, Ordering::Release); |
370 | } |
371 | |
372 | token.list.block = block as *const u8; |
373 | token.list.offset = offset; |
374 | return true; |
375 | }, |
376 | Err(h) => { |
377 | head = h; |
378 | block = self.head.block.load(Ordering::Acquire); |
379 | backoff.spin(); |
380 | } |
381 | } |
382 | } |
383 | } |
384 | |
385 | /// Reads a message from the channel. |
386 | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { |
387 | if token.list.block.is_null() { |
388 | // The channel is disconnected. |
389 | return Err(()); |
390 | } |
391 | |
392 | // Read the message. |
393 | let block = token.list.block as *mut Block<T>; |
394 | let offset = token.list.offset; |
395 | let slot = (*block).slots.get_unchecked(offset); |
396 | slot.wait_write(); |
397 | let msg = slot.msg.get().read().assume_init(); |
398 | |
399 | // Destroy the block if we've reached the end, or if another thread wanted to destroy but |
400 | // couldn't because we were busy reading from the slot. |
401 | if offset + 1 == BLOCK_CAP { |
402 | Block::destroy(block, 0); |
403 | } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { |
404 | Block::destroy(block, offset + 1); |
405 | } |
406 | |
407 | Ok(msg) |
408 | } |
409 | |
410 | /// Attempts to send a message into the channel. |
411 | pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
412 | self.send(msg, None).map_err(|err| match err { |
413 | SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), |
414 | SendTimeoutError::Timeout(_) => unreachable!(), |
415 | }) |
416 | } |
417 | |
418 | /// Sends a message into the channel. |
419 | pub(crate) fn send( |
420 | &self, |
421 | msg: T, |
422 | _deadline: Option<Instant>, |
423 | ) -> Result<(), SendTimeoutError<T>> { |
424 | let token = &mut Token::default(); |
425 | assert!(self.start_send(token)); |
426 | unsafe { |
427 | self.write(token, msg) |
428 | .map_err(SendTimeoutError::Disconnected) |
429 | } |
430 | } |
431 | |
432 | /// Attempts to receive a message without blocking. |
433 | pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { |
434 | let token = &mut Token::default(); |
435 | |
436 | if self.start_recv(token) { |
437 | unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } |
438 | } else { |
439 | Err(TryRecvError::Empty) |
440 | } |
441 | } |
442 | |
443 | /// Receives a message from the channel. |
444 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { |
445 | let token = &mut Token::default(); |
446 | loop { |
447 | // Try receiving a message several times. |
448 | let backoff = Backoff::new(); |
449 | loop { |
450 | if self.start_recv(token) { |
451 | unsafe { |
452 | return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); |
453 | } |
454 | } |
455 | |
456 | if backoff.is_completed() { |
457 | break; |
458 | } else { |
459 | backoff.snooze(); |
460 | } |
461 | } |
462 | |
463 | if let Some(d) = deadline { |
464 | if Instant::now() >= d { |
465 | return Err(RecvTimeoutError::Timeout); |
466 | } |
467 | } |
468 | |
469 | // Prepare for blocking until a sender wakes us up. |
470 | Context::with(|cx| { |
471 | let oper = Operation::hook(token); |
472 | self.receivers.register(oper, cx); |
473 | |
474 | // Has the channel become ready just now? |
475 | if !self.is_empty() || self.is_disconnected() { |
476 | let _ = cx.try_select(Selected::Aborted); |
477 | } |
478 | |
479 | // Block the current thread. |
480 | let sel = cx.wait_until(deadline); |
481 | |
482 | match sel { |
483 | Selected::Waiting => unreachable!(), |
484 | Selected::Aborted | Selected::Disconnected => { |
485 | self.receivers.unregister(oper).unwrap(); |
486 | // If the channel was disconnected, we still have to check for remaining |
487 | // messages. |
488 | } |
489 | Selected::Operation(_) => {} |
490 | } |
491 | }); |
492 | } |
493 | } |
494 | |
495 | /// Returns the current number of messages inside the channel. |
496 | pub(crate) fn len(&self) -> usize { |
497 | loop { |
498 | // Load the tail index, then load the head index. |
499 | let mut tail = self.tail.index.load(Ordering::SeqCst); |
500 | let mut head = self.head.index.load(Ordering::SeqCst); |
501 | |
502 | // If the tail index didn't change, we've got consistent indices to work with. |
503 | if self.tail.index.load(Ordering::SeqCst) == tail { |
504 | // Erase the lower bits. |
505 | tail &= !((1 << SHIFT) - 1); |
506 | head &= !((1 << SHIFT) - 1); |
507 | |
508 | // Fix up indices if they fall onto block ends. |
509 | if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { |
510 | tail = tail.wrapping_add(1 << SHIFT); |
511 | } |
512 | if (head >> SHIFT) & (LAP - 1) == LAP - 1 { |
513 | head = head.wrapping_add(1 << SHIFT); |
514 | } |
515 | |
516 | // Rotate indices so that head falls into the first block. |
517 | let lap = (head >> SHIFT) / LAP; |
518 | tail = tail.wrapping_sub((lap * LAP) << SHIFT); |
519 | head = head.wrapping_sub((lap * LAP) << SHIFT); |
520 | |
521 | // Remove the lower bits. |
522 | tail >>= SHIFT; |
523 | head >>= SHIFT; |
524 | |
525 | // Return the difference minus the number of blocks between tail and head. |
526 | return tail - head - tail / LAP; |
527 | } |
528 | } |
529 | } |
530 | |
531 | /// Returns the capacity of the channel. |
532 | pub(crate) fn capacity(&self) -> Option<usize> { |
533 | None |
534 | } |
535 | |
536 | /// Disconnects senders and wakes up all blocked receivers. |
537 | /// |
538 | /// Returns `true` if this call disconnected the channel. |
539 | pub(crate) fn disconnect_senders(&self) -> bool { |
540 | let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); |
541 | |
542 | if tail & MARK_BIT == 0 { |
543 | self.receivers.disconnect(); |
544 | true |
545 | } else { |
546 | false |
547 | } |
548 | } |
549 | |
550 | /// Disconnects receivers. |
551 | /// |
552 | /// Returns `true` if this call disconnected the channel. |
553 | pub(crate) fn disconnect_receivers(&self) -> bool { |
554 | let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); |
555 | |
556 | if tail & MARK_BIT == 0 { |
557 | // If receivers are dropped first, discard all messages to free |
558 | // memory eagerly. |
559 | self.discard_all_messages(); |
560 | true |
561 | } else { |
562 | false |
563 | } |
564 | } |
565 | |
566 | /// Discards all messages. |
567 | /// |
568 | /// This method should only be called when all receivers are dropped. |
569 | fn discard_all_messages(&self) { |
570 | let backoff = Backoff::new(); |
571 | let mut tail = self.tail.index.load(Ordering::Acquire); |
572 | loop { |
573 | let offset = (tail >> SHIFT) % LAP; |
574 | if offset != BLOCK_CAP { |
575 | break; |
576 | } |
577 | |
578 | // New updates to tail will be rejected by MARK_BIT and aborted unless it's |
579 | // at boundary. We need to wait for the updates take affect otherwise there |
580 | // can be memory leaks. |
581 | backoff.snooze(); |
582 | tail = self.tail.index.load(Ordering::Acquire); |
583 | } |
584 | |
585 | let mut head = self.head.index.load(Ordering::Acquire); |
586 | let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel); |
587 | |
588 | // If we're going to be dropping messages we need to synchronize with initialization |
589 | if head >> SHIFT != tail >> SHIFT { |
590 | // The block can be null here only if a sender is in the process of initializing the |
591 | // channel while another sender managed to send a message by inserting it into the |
592 | // semi-initialized channel and advanced the tail. |
593 | // In that case, just wait until it gets initialized. |
594 | while block.is_null() { |
595 | backoff.snooze(); |
596 | block = self.head.block.load(Ordering::Acquire); |
597 | } |
598 | } |
599 | |
600 | unsafe { |
601 | // Drop all messages between head and tail and deallocate the heap-allocated blocks. |
602 | while head >> SHIFT != tail >> SHIFT { |
603 | let offset = (head >> SHIFT) % LAP; |
604 | |
605 | if offset < BLOCK_CAP { |
606 | // Drop the message in the slot. |
607 | let slot = (*block).slots.get_unchecked(offset); |
608 | slot.wait_write(); |
609 | (*slot.msg.get()).assume_init_drop(); |
610 | } else { |
611 | (*block).wait_next(); |
612 | // Deallocate the block and move to the next one. |
613 | let next = (*block).next.load(Ordering::Acquire); |
614 | drop(Box::from_raw(block)); |
615 | block = next; |
616 | } |
617 | |
618 | head = head.wrapping_add(1 << SHIFT); |
619 | } |
620 | |
621 | // Deallocate the last remaining block. |
622 | if !block.is_null() { |
623 | drop(Box::from_raw(block)); |
624 | } |
625 | } |
626 | head &= !MARK_BIT; |
627 | self.head.index.store(head, Ordering::Release); |
628 | } |
629 | |
630 | /// Returns `true` if the channel is disconnected. |
631 | pub(crate) fn is_disconnected(&self) -> bool { |
632 | self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 |
633 | } |
634 | |
635 | /// Returns `true` if the channel is empty. |
636 | pub(crate) fn is_empty(&self) -> bool { |
637 | let head = self.head.index.load(Ordering::SeqCst); |
638 | let tail = self.tail.index.load(Ordering::SeqCst); |
639 | head >> SHIFT == tail >> SHIFT |
640 | } |
641 | |
642 | /// Returns `true` if the channel is full. |
643 | pub(crate) fn is_full(&self) -> bool { |
644 | false |
645 | } |
646 | } |
647 | |
648 | impl<T> Drop for Channel<T> { |
649 | fn drop(&mut self) { |
650 | let mut head = *self.head.index.get_mut(); |
651 | let mut tail = *self.tail.index.get_mut(); |
652 | let mut block = *self.head.block.get_mut(); |
653 | |
654 | // Erase the lower bits. |
655 | head &= !((1 << SHIFT) - 1); |
656 | tail &= !((1 << SHIFT) - 1); |
657 | |
658 | unsafe { |
659 | // Drop all messages between head and tail and deallocate the heap-allocated blocks. |
660 | while head != tail { |
661 | let offset = (head >> SHIFT) % LAP; |
662 | |
663 | if offset < BLOCK_CAP { |
664 | // Drop the message in the slot. |
665 | let slot = (*block).slots.get_unchecked(offset); |
666 | (*slot.msg.get()).assume_init_drop(); |
667 | } else { |
668 | // Deallocate the block and move to the next one. |
669 | let next = *(*block).next.get_mut(); |
670 | drop(Box::from_raw(block)); |
671 | block = next; |
672 | } |
673 | |
674 | head = head.wrapping_add(1 << SHIFT); |
675 | } |
676 | |
677 | // Deallocate the last remaining block. |
678 | if !block.is_null() { |
679 | drop(Box::from_raw(block)); |
680 | } |
681 | } |
682 | } |
683 | } |
684 | |
685 | /// Receiver handle to a channel. |
686 | pub(crate) struct Receiver<'a, T>(&'a Channel<T>); |
687 | |
688 | /// Sender handle to a channel. |
689 | pub(crate) struct Sender<'a, T>(&'a Channel<T>); |
690 | |
691 | impl<T> SelectHandle for Receiver<'_, T> { |
692 | fn try_select(&self, token: &mut Token) -> bool { |
693 | self.0.start_recv(token) |
694 | } |
695 | |
696 | fn deadline(&self) -> Option<Instant> { |
697 | None |
698 | } |
699 | |
700 | fn register(&self, oper: Operation, cx: &Context) -> bool { |
701 | self.0.receivers.register(oper, cx); |
702 | self.is_ready() |
703 | } |
704 | |
705 | fn unregister(&self, oper: Operation) { |
706 | self.0.receivers.unregister(oper); |
707 | } |
708 | |
709 | fn accept(&self, token: &mut Token, _cx: &Context) -> bool { |
710 | self.try_select(token) |
711 | } |
712 | |
713 | fn is_ready(&self) -> bool { |
714 | !self.0.is_empty() || self.0.is_disconnected() |
715 | } |
716 | |
717 | fn watch(&self, oper: Operation, cx: &Context) -> bool { |
718 | self.0.receivers.watch(oper, cx); |
719 | self.is_ready() |
720 | } |
721 | |
722 | fn unwatch(&self, oper: Operation) { |
723 | self.0.receivers.unwatch(oper); |
724 | } |
725 | } |
726 | |
727 | impl<T> SelectHandle for Sender<'_, T> { |
728 | fn try_select(&self, token: &mut Token) -> bool { |
729 | self.0.start_send(token) |
730 | } |
731 | |
732 | fn deadline(&self) -> Option<Instant> { |
733 | None |
734 | } |
735 | |
736 | fn register(&self, _oper: Operation, _cx: &Context) -> bool { |
737 | self.is_ready() |
738 | } |
739 | |
740 | fn unregister(&self, _oper: Operation) {} |
741 | |
742 | fn accept(&self, token: &mut Token, _cx: &Context) -> bool { |
743 | self.try_select(token) |
744 | } |
745 | |
746 | fn is_ready(&self) -> bool { |
747 | true |
748 | } |
749 | |
750 | fn watch(&self, _oper: Operation, _cx: &Context) -> bool { |
751 | self.is_ready() |
752 | } |
753 | |
754 | fn unwatch(&self, _oper: Operation) {} |
755 | } |
756 | |