1//! A multi-producer, single-consumer queue for sending values across
2//! asynchronous tasks.
3//!
//! Similarly to `std::sync::mpsc`, channel creation provides [`Receiver`] and
5//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6//! read values out of the channel. If there is no message to read from the
7//! channel, the current task will be notified when a new value is sent.
8//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9//! the channel. If the channel is at capacity, the send will be rejected and
10//! the task will be notified when additional capacity is available. In other
11//! words, the channel provides backpressure.
12//!
13//! Unbounded channels are also available using the `unbounded` constructor.
14//!
15//! # Disconnection
16//!
17//! When all [`Sender`] handles have been dropped, it is no longer
18//! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Poll::Ready(None)`.
21//!
22//! If the [`Receiver`] handle is dropped, then messages can no longer
23//! be read out of the channel. In this case, all further attempts to send will
24//! result in an error.
25//!
26//! # Clean Shutdown
27//!
28//! If the [`Receiver`] is simply dropped, then it is possible for
29//! there to be messages still in the channel that will not be processed. As
30//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver will first call `close`, which will prevent any further messages from
//! being sent into the channel. Then, the receiver consumes the channel to
33//! completion, at which point the receiver can be dropped.
34//!
35//! [`Sender`]: struct.Sender.html
36//! [`Receiver`]: struct.Receiver.html
37//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38//! [`Receiver::poll_next`]:
39//! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
41// At the core, the channel uses an atomic FIFO queue for message passing. This
42// queue is used as the primary coordination primitive. In order to enforce
43// capacity limits and handle back pressure, a secondary FIFO queue is used to
44// send parked task handles.
45//
46// The general idea is that the channel is created with a `buffer` size of `n`.
47// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48// slot to hold a message. This allows `Sender` to know for a fact that a send
49// will succeed *before* starting to do the actual work of sending the value.
50// Since most of this work is lock-free, once the work starts, it is impossible
51// to safely revert.
52//
53// If the sender is unable to process a send operation, then the current
54// task is parked and the handle is sent on the parked task queue.
55//
56// Note that the implementation guarantees that the channel capacity will never
57// exceed the configured limit, however there is no *strict* guarantee that the
58// receiver will wake up a parked task *immediately* when a slot becomes
59// available. However, it will almost always unpark a task when a slot becomes
60// available and it is *guaranteed* that a sender will be unparked when the
61// message that caused the sender to become parked is read out of the channel.
62//
63// The steps for sending a message are roughly:
64//
65// 1) Increment the channel message count
66// 2) If the channel is at capacity, push the task handle onto the wait queue
67// 3) Push the message onto the message queue.
68//
69// The steps for receiving a message are roughly:
70//
71// 1) Pop a message from the message queue
72// 2) Pop a task handle from the wait queue
73// 3) Decrement the channel message count.
74//
75// It's important for the order of operations on lock-free structures to happen
76// in reverse order between the sender and receiver. This makes the message
77// queue the primary coordination structure and establishes the necessary
78// happens-before semantics required for the acquire / release semantics used
79// by the queue structure.
80
81use futures_core::stream::{FusedStream, Stream};
82use futures_core::task::__internal::AtomicWaker;
83use futures_core::task::{Context, Poll, Waker};
84use std::fmt;
85use std::pin::Pin;
86use std::sync::atomic::AtomicUsize;
87use std::sync::atomic::Ordering::SeqCst;
88use std::sync::{Arc, Mutex};
89use std::thread;
90
91use crate::mpsc::queue::Queue;
92
93mod queue;
94#[cfg(feature = "sink")]
95mod sink_impl;
96
// Sender-side state of an unbounded channel. It only carries the shared
// channel state; unlike the bounded variant there is no parking bookkeeping.
struct UnboundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<UnboundedInner<T>>,
}
101
// Sender-side state of a bounded channel: the shared channel state plus the
// bookkeeping this particular sender needs in order to be parked and woken.
struct BoundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<BoundedInner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_task: Arc<Mutex<SenderTask>>,

    // `true` if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time.
    maybe_parked: bool,
}
115
// We never project Pin<&mut SenderInner> to `Pin<&mut T>`, so both sender
// inner types are declared `Unpin` for every `T`.
impl<T> Unpin for UnboundedSenderInner<T> {}
impl<T> Unpin for BoundedSenderInner<T> {}
119
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`] function.
// The wrapped option becomes `None` once `disconnect` has been called on
// this handle.
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
124
/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`] function.
// The wrapped option becomes `None` once `disconnect` has been called on
// this handle.
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
129
// Compile-time check: the impl below only compiles while
// `UnboundedSender<u32>` is `Send + Sync + Clone`.
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
132
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`] function.
pub struct Receiver<T> {
    // Shared channel state. Reset to `None` by `next_message` once the
    // channel is closed and no message is left to read.
    inner: Option<Arc<BoundedInner<T>>>,
}
139
/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`] function.
pub struct UnboundedReceiver<T> {
    // Shared channel state.
    // NOTE(review): presumably reset to `None` when the stream terminates,
    // mirroring `Receiver` -- confirm against the receiver implementation.
    inner: Option<Arc<UnboundedInner<T>>>,
}
146
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`, so
// the receiver is declared `Unpin` for every `T`.
impl<T> Unpin for UnboundedReceiver<T> {}
149
/// The error type for [`Sender`s](Sender) used as `Sink`s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    // Why the send failed; exposed through `is_full` / `is_disconnected`.
    kind: SendErrorKind,
}
155
/// The error type returned from [`try_send`](Sender::try_send).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    // Why the send failed.
    err: SendError,
    // The message that could not be sent; handed back by `into_inner`.
    val: T,
}
162
// The reason a send was rejected.
#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    // The sender is currently parked, so the message was rejected.
    Full,
    // The channel is closed or the sender handle has been disconnected.
    Disconnected,
}
168
/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    // Private unit field: keeps the struct from being built with a struct
    // literal outside of this module.
    _priv: (),
}
173
174impl fmt::Display for SendError {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 if self.is_full() {
177 write!(f, "send failed because channel is full")
178 } else {
179 write!(f, "send failed because receiver is gone")
180 }
181 }
182}
183
// `SendError` uses the default `Error` method implementations.
impl std::error::Error for SendError {}
185
186impl SendError {
187 /// Returns `true` if this error is a result of the channel being full.
188 pub fn is_full(&self) -> bool {
189 match self.kind {
190 SendErrorKind::Full => true,
191 _ => false,
192 }
193 }
194
195 /// Returns `true` if this error is a result of the receiver being dropped.
196 pub fn is_disconnected(&self) -> bool {
197 match self.kind {
198 SendErrorKind::Disconnected => true,
199 _ => false,
200 }
201 }
202}
203
204impl<T> fmt::Debug for TrySendError<T> {
205 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206 f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
207 }
208}
209
210impl<T> fmt::Display for TrySendError<T> {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 if self.is_full() {
213 write!(f, "send failed because channel is full")
214 } else {
215 write!(f, "send failed because receiver is gone")
216 }
217 }
218}
219
// `TrySendError<T>` is an `Error` only for payload types that satisfy
// `T: Any`; the default `Error` method implementations are used.
impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
221
222impl<T> TrySendError<T> {
223 /// Returns `true` if this error is a result of the channel being full.
224 pub fn is_full(&self) -> bool {
225 self.err.is_full()
226 }
227
228 /// Returns `true` if this error is a result of the receiver being dropped.
229 pub fn is_disconnected(&self) -> bool {
230 self.err.is_disconnected()
231 }
232
233 /// Returns the message that was attempted to be sent but failed.
234 pub fn into_inner(self) -> T {
235 self.val
236 }
237
238 /// Drops the message and converts into a `SendError`.
239 pub fn into_send_error(self) -> SendError {
240 self.err
241 }
242}
243
244impl fmt::Debug for TryRecvError {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 f.debug_tuple("TryRecvError").finish()
247 }
248}
249
250impl fmt::Display for TryRecvError {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 write!(f, "receiver channel is empty")
253 }
254}
255
// `TryRecvError` uses the default `Error` method implementations.
impl std::error::Error for TryRecvError {}
257
// State shared between every `UnboundedSender` and the `UnboundedReceiver`
// of one unbounded channel.
struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
272
// State shared between every `Sender` and the `Receiver` of one bounded
// channel.
struct BoundedInner<T> {
    // Max buffer size of the channel, as passed to `channel`. A sender is
    // parked once the message count exceeds this value.
    buffer: usize,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
293
// Struct representation of `Inner::state`, i.e. the two pieces of
// information that are packed into the channel's atomic state word.
#[derive(Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}
303
// The `is_open` flag lives in the most significant bit of `Inner::state`;
// every lower bit belongs to the message counter.
const OPEN_MASK: usize = !(usize::max_value() >> 1);

// A freshly created channel is open and holds no pending messages.
const INIT_STATE: usize = OPEN_MASK;

// The largest message count a channel can track: all bits below the open
// flag set, i.e. `usize::max_value() >> 1`.
const MAX_CAPACITY: usize = usize::max_value() >> 1;

// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY / 2;
317
// Parking record of a blocked producer. It is sent to the consumer, which
// uses it to wake the producer up again.
struct SenderTask {
    // Waker of the task to notify, if one is currently registered.
    task: Option<Waker>,
    // `true` while the owning sender is considered parked.
    is_parked: bool,
}

impl SenderTask {
    // Creates a record that is not parked and has no waker registered.
    fn new() -> Self {
        SenderTask { is_parked: false, task: None }
    }

    // Clears the parked flag and wakes the registered task, if any. The
    // waker is consumed, so calling this again wakes nothing.
    fn notify(&mut self) {
        self.is_parked = false;

        match self.task.take() {
            Some(waker) => waker.wake(),
            None => {}
        }
    }
}
337
338/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
339///
340/// Being bounded, this channel provides backpressure to ensure that the sender
341/// outpaces the receiver by only a limited amount. The channel's capacity is
342/// equal to `buffer + num-senders`. In other words, each sender gets a
343/// guaranteed slot in the channel capacity, and on top of that there are
344/// `buffer` "first come, first serve" slots available to all senders.
345///
346/// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`]
347/// implements `Sink`.
348pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
349 // Check that the requested buffer size does not exceed the maximum buffer
350 // size permitted by the system.
351 assert!(buffer < MAX_BUFFER, "requested buffer size too large");
352
353 let inner = Arc::new(BoundedInner {
354 buffer,
355 state: AtomicUsize::new(INIT_STATE),
356 message_queue: Queue::new(),
357 parked_queue: Queue::new(),
358 num_senders: AtomicUsize::new(1),
359 recv_task: AtomicWaker::new(),
360 });
361
362 let tx = BoundedSenderInner {
363 inner: inner.clone(),
364 sender_task: Arc::new(Mutex::new(SenderTask::new())),
365 maybe_parked: false,
366 };
367
368 let rx = Receiver { inner: Some(inner) };
369
370 (Sender(Some(tx)), rx)
371}
372
373/// Creates an unbounded mpsc channel for communicating between asynchronous
374/// tasks.
375///
376/// A `send` on this channel will always succeed as long as the receive half has
377/// not been closed. If the receiver falls behind, messages will be arbitrarily
378/// buffered.
379///
380/// **Note** that the amount of available system memory is an implicit bound to
381/// the channel. Using an `unbounded` channel has the ability of causing the
382/// process to run out of memory. In this case, the process will be aborted.
383pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
384 let inner = Arc::new(UnboundedInner {
385 state: AtomicUsize::new(INIT_STATE),
386 message_queue: Queue::new(),
387 num_senders: AtomicUsize::new(1),
388 recv_task: AtomicWaker::new(),
389 });
390
391 let tx = UnboundedSenderInner { inner: inner.clone() };
392
393 let rx = UnboundedReceiver { inner: Some(inner) };
394
395 (UnboundedSender(Some(tx)), rx)
396}
397
398/*
399 *
400 * ===== impl Sender =====
401 *
402 */
403
404impl<T> UnboundedSenderInner<T> {
405 fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
406 let state = decode_state(self.inner.state.load(SeqCst));
407 if state.is_open {
408 Poll::Ready(Ok(()))
409 } else {
410 Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
411 }
412 }
413
414 // Push message to the queue and signal to the receiver
415 fn queue_push_and_signal(&self, msg: T) {
416 // Push the message onto the message queue
417 self.inner.message_queue.push(msg);
418
419 // Signal to the receiver that a message has been enqueued. If the
420 // receiver is parked, this will unpark the task.
421 self.inner.recv_task.wake();
422 }
423
424 // Increment the number of queued messages. Returns the resulting number.
425 fn inc_num_messages(&self) -> Option<usize> {
426 let mut curr = self.inner.state.load(SeqCst);
427
428 loop {
429 let mut state = decode_state(curr);
430
431 // The receiver end closed the channel.
432 if !state.is_open {
433 return None;
434 }
435
436 // This probably is never hit? Odds are the process will run out of
437 // memory first. It may be worth to return something else in this
438 // case?
439 assert!(
440 state.num_messages < MAX_CAPACITY,
441 "buffer space \
442 exhausted; sending this messages would overflow the state"
443 );
444
445 state.num_messages += 1;
446
447 let next = encode_state(&state);
448 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
449 Ok(_) => return Some(state.num_messages),
450 Err(actual) => curr = actual,
451 }
452 }
453 }
454
455 /// Returns whether the senders send to the same receiver.
456 fn same_receiver(&self, other: &Self) -> bool {
457 Arc::ptr_eq(&self.inner, &other.inner)
458 }
459
460 /// Returns whether the sender send to this receiver.
461 fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
462 Arc::ptr_eq(&self.inner, inner)
463 }
464
465 /// Returns pointer to the Arc containing sender
466 ///
467 /// The returned pointer is not referenced and should be only used for hashing!
468 fn ptr(&self) -> *const UnboundedInner<T> {
469 &*self.inner
470 }
471
472 /// Returns whether this channel is closed without needing a context.
473 fn is_closed(&self) -> bool {
474 !decode_state(self.inner.state.load(SeqCst)).is_open
475 }
476
477 /// Closes this channel from the sender side, preventing any new messages.
478 fn close_channel(&self) {
479 // There's no need to park this sender, its dropping,
480 // and we don't want to check for capacity, so skip
481 // that stuff from `do_send`.
482
483 self.inner.set_closed();
484 self.inner.recv_task.wake();
485 }
486}
487
488impl<T> BoundedSenderInner<T> {
489 /// Attempts to send a message on this `Sender`, returning the message
490 /// if there was an error.
491 fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
492 // If the sender is currently blocked, reject the message
493 if !self.poll_unparked(None).is_ready() {
494 return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
495 }
496
497 // The channel has capacity to accept the message, so send it
498 self.do_send_b(msg)
499 }
500
501 // Do the send without failing.
502 // Can be called only by bounded sender.
503 fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
504 // Anyone calling do_send *should* make sure there is room first,
505 // but assert here for tests as a sanity check.
506 debug_assert!(self.poll_unparked(None).is_ready());
507
508 // First, increment the number of messages contained by the channel.
509 // This operation will also atomically determine if the sender task
510 // should be parked.
511 //
512 // `None` is returned in the case that the channel has been closed by the
513 // receiver. This happens when `Receiver::close` is called or the
514 // receiver is dropped.
515 let park_self = match self.inc_num_messages() {
516 Some(num_messages) => {
517 // Block if the current number of pending messages has exceeded
518 // the configured buffer size
519 num_messages > self.inner.buffer
520 }
521 None => {
522 return Err(TrySendError {
523 err: SendError { kind: SendErrorKind::Disconnected },
524 val: msg,
525 })
526 }
527 };
528
529 // If the channel has reached capacity, then the sender task needs to
530 // be parked. This will send the task handle on the parked task queue.
531 //
532 // However, when `do_send` is called while dropping the `Sender`,
533 // `task::current()` can't be called safely. In this case, in order to
534 // maintain internal consistency, a blank message is pushed onto the
535 // parked task queue.
536 if park_self {
537 self.park();
538 }
539
540 self.queue_push_and_signal(msg);
541
542 Ok(())
543 }
544
545 // Push message to the queue and signal to the receiver
546 fn queue_push_and_signal(&self, msg: T) {
547 // Push the message onto the message queue
548 self.inner.message_queue.push(msg);
549
550 // Signal to the receiver that a message has been enqueued. If the
551 // receiver is parked, this will unpark the task.
552 self.inner.recv_task.wake();
553 }
554
555 // Increment the number of queued messages. Returns the resulting number.
556 fn inc_num_messages(&self) -> Option<usize> {
557 let mut curr = self.inner.state.load(SeqCst);
558
559 loop {
560 let mut state = decode_state(curr);
561
562 // The receiver end closed the channel.
563 if !state.is_open {
564 return None;
565 }
566
567 // This probably is never hit? Odds are the process will run out of
568 // memory first. It may be worth to return something else in this
569 // case?
570 assert!(
571 state.num_messages < MAX_CAPACITY,
572 "buffer space \
573 exhausted; sending this messages would overflow the state"
574 );
575
576 state.num_messages += 1;
577
578 let next = encode_state(&state);
579 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
580 Ok(_) => return Some(state.num_messages),
581 Err(actual) => curr = actual,
582 }
583 }
584 }
585
586 fn park(&mut self) {
587 {
588 let mut sender = self.sender_task.lock().unwrap();
589 sender.task = None;
590 sender.is_parked = true;
591 }
592
593 // Send handle over queue
594 let t = self.sender_task.clone();
595 self.inner.parked_queue.push(t);
596
597 // Check to make sure we weren't closed after we sent our task on the
598 // queue
599 let state = decode_state(self.inner.state.load(SeqCst));
600 self.maybe_parked = state.is_open;
601 }
602
603 /// Polls the channel to determine if there is guaranteed capacity to send
604 /// at least one item without waiting.
605 ///
606 /// # Return value
607 ///
608 /// This method returns:
609 ///
610 /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
611 /// - `Poll::Pending` if the channel may not have
612 /// capacity, in which case the current task is queued to be notified once
613 /// capacity is available;
614 /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
615 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
616 let state = decode_state(self.inner.state.load(SeqCst));
617 if !state.is_open {
618 return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
619 }
620
621 self.poll_unparked(Some(cx)).map(Ok)
622 }
623
624 /// Returns whether the senders send to the same receiver.
625 fn same_receiver(&self, other: &Self) -> bool {
626 Arc::ptr_eq(&self.inner, &other.inner)
627 }
628
629 /// Returns whether the sender send to this receiver.
630 fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
631 Arc::ptr_eq(&self.inner, receiver)
632 }
633
634 /// Returns pointer to the Arc containing sender
635 ///
636 /// The returned pointer is not referenced and should be only used for hashing!
637 fn ptr(&self) -> *const BoundedInner<T> {
638 &*self.inner
639 }
640
641 /// Returns whether this channel is closed without needing a context.
642 fn is_closed(&self) -> bool {
643 !decode_state(self.inner.state.load(SeqCst)).is_open
644 }
645
646 /// Closes this channel from the sender side, preventing any new messages.
647 fn close_channel(&self) {
648 // There's no need to park this sender, its dropping,
649 // and we don't want to check for capacity, so skip
650 // that stuff from `do_send`.
651
652 self.inner.set_closed();
653 self.inner.recv_task.wake();
654 }
655
656 fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
657 // First check the `maybe_parked` variable. This avoids acquiring the
658 // lock in most cases
659 if self.maybe_parked {
660 // Get a lock on the task handle
661 let mut task = self.sender_task.lock().unwrap();
662
663 if !task.is_parked {
664 self.maybe_parked = false;
665 return Poll::Ready(());
666 }
667
668 // At this point, an unpark request is pending, so there will be an
669 // unpark sometime in the future. We just need to make sure that
670 // the correct task will be notified.
671 //
672 // Update the task in case the `Sender` has been moved to another
673 // task
674 task.task = cx.map(|cx| cx.waker().clone());
675
676 Poll::Pending
677 } else {
678 Poll::Ready(())
679 }
680 }
681}
682
683impl<T> Sender<T> {
684 /// Attempts to send a message on this `Sender`, returning the message
685 /// if there was an error.
686 pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
687 if let Some(inner) = &mut self.0 {
688 inner.try_send(msg)
689 } else {
690 Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
691 }
692 }
693
694 /// Send a message on the channel.
695 ///
696 /// This function should only be called after
697 /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
698 /// ready to receive a message.
699 pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
700 self.try_send(msg).map_err(|e| e.err)
701 }
702
703 /// Polls the channel to determine if there is guaranteed capacity to send
704 /// at least one item without waiting.
705 ///
706 /// # Return value
707 ///
708 /// This method returns:
709 ///
710 /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
711 /// - `Poll::Pending` if the channel may not have
712 /// capacity, in which case the current task is queued to be notified once
713 /// capacity is available;
714 /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
715 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
716 let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
717 inner.poll_ready(cx)
718 }
719
720 /// Returns whether this channel is closed without needing a context.
721 pub fn is_closed(&self) -> bool {
722 self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
723 }
724
725 /// Closes this channel from the sender side, preventing any new messages.
726 pub fn close_channel(&mut self) {
727 if let Some(inner) = &mut self.0 {
728 inner.close_channel();
729 }
730 }
731
732 /// Disconnects this sender from the channel, closing it if there are no more senders left.
733 pub fn disconnect(&mut self) {
734 self.0 = None;
735 }
736
737 /// Returns whether the senders send to the same receiver.
738 pub fn same_receiver(&self, other: &Self) -> bool {
739 match (&self.0, &other.0) {
740 (Some(inner), Some(other)) => inner.same_receiver(other),
741 _ => false,
742 }
743 }
744
745 /// Returns whether the sender send to this receiver.
746 pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
747 match (&self.0, &receiver.inner) {
748 (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
749 _ => false,
750 }
751 }
752
753 /// Hashes the receiver into the provided hasher
754 pub fn hash_receiver<H>(&self, hasher: &mut H)
755 where
756 H: std::hash::Hasher,
757 {
758 use std::hash::Hash;
759
760 let ptr = self.0.as_ref().map(|inner| inner.ptr());
761 ptr.hash(hasher);
762 }
763}
764
765impl<T> UnboundedSender<T> {
766 /// Check if the channel is ready to receive a message.
767 pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
768 let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
769 inner.poll_ready_nb()
770 }
771
772 /// Returns whether this channel is closed without needing a context.
773 pub fn is_closed(&self) -> bool {
774 self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
775 }
776
777 /// Closes this channel from the sender side, preventing any new messages.
778 pub fn close_channel(&self) {
779 if let Some(inner) = &self.0 {
780 inner.close_channel();
781 }
782 }
783
784 /// Disconnects this sender from the channel, closing it if there are no more senders left.
785 pub fn disconnect(&mut self) {
786 self.0 = None;
787 }
788
789 // Do the send without parking current task.
790 fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
791 if let Some(inner) = &self.0 {
792 if inner.inc_num_messages().is_some() {
793 inner.queue_push_and_signal(msg);
794 return Ok(());
795 }
796 }
797
798 Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
799 }
800
801 /// Send a message on the channel.
802 ///
803 /// This method should only be called after `poll_ready` has been used to
804 /// verify that the channel is ready to receive a message.
805 pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
806 self.do_send_nb(msg).map_err(|e| e.err)
807 }
808
809 /// Sends a message along this channel.
810 ///
811 /// This is an unbounded sender, so this function differs from `Sink::send`
812 /// by ensuring the return type reflects that the channel is always ready to
813 /// receive messages.
814 pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
815 self.do_send_nb(msg)
816 }
817
818 /// Returns whether the senders send to the same receiver.
819 pub fn same_receiver(&self, other: &Self) -> bool {
820 match (&self.0, &other.0) {
821 (Some(inner), Some(other)) => inner.same_receiver(other),
822 _ => false,
823 }
824 }
825
826 /// Returns whether the sender send to this receiver.
827 pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
828 match (&self.0, &receiver.inner) {
829 (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
830 _ => false,
831 }
832 }
833
834 /// Hashes the receiver into the provided hasher
835 pub fn hash_receiver<H>(&self, hasher: &mut H)
836 where
837 H: std::hash::Hasher,
838 {
839 use std::hash::Hash;
840
841 let ptr = self.0.as_ref().map(|inner| inner.ptr());
842 ptr.hash(hasher);
843 }
844
845 /// Return the number of messages in the queue or 0 if channel is disconnected.
846 pub fn len(&self) -> usize {
847 if let Some(sender) = &self.0 {
848 decode_state(sender.inner.state.load(SeqCst)).num_messages
849 } else {
850 0
851 }
852 }
853
854 /// Return false is channel has no queued messages, true otherwise.
855 pub fn is_empty(&self) -> bool {
856 self.len() == 0
857 }
858}
859
860impl<T> Clone for Sender<T> {
861 fn clone(&self) -> Self {
862 Self(self.0.clone())
863 }
864}
865
866impl<T> Clone for UnboundedSender<T> {
867 fn clone(&self) -> Self {
868 Self(self.0.clone())
869 }
870}
871
872impl<T> Clone for UnboundedSenderInner<T> {
873 fn clone(&self) -> Self {
874 // Since this atomic op isn't actually guarding any memory and we don't
875 // care about any orderings besides the ordering on the single atomic
876 // variable, a relaxed ordering is acceptable.
877 let mut curr = self.inner.num_senders.load(SeqCst);
878
879 loop {
880 // If the maximum number of senders has been reached, then fail
881 if curr == MAX_BUFFER {
882 panic!("cannot clone `Sender` -- too many outstanding senders");
883 }
884
885 debug_assert!(curr < MAX_BUFFER);
886
887 let next = curr + 1;
888 match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
889 Ok(_) => {
890 // The ABA problem doesn't matter here. We only care that the
891 // number of senders never exceeds the maximum.
892 return Self { inner: self.inner.clone() };
893 }
894 Err(actual) => curr = actual,
895 }
896 }
897 }
898}
899
900impl<T> Clone for BoundedSenderInner<T> {
901 fn clone(&self) -> Self {
902 // Since this atomic op isn't actually guarding any memory and we don't
903 // care about any orderings besides the ordering on the single atomic
904 // variable, a relaxed ordering is acceptable.
905 let mut curr = self.inner.num_senders.load(SeqCst);
906
907 loop {
908 // If the maximum number of senders has been reached, then fail
909 if curr == self.inner.max_senders() {
910 panic!("cannot clone `Sender` -- too many outstanding senders");
911 }
912
913 debug_assert!(curr < self.inner.max_senders());
914
915 let next = curr + 1;
916 match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
917 Ok(_) => {
918 // The ABA problem doesn't matter here. We only care that the
919 // number of senders never exceeds the maximum.
920 return Self {
921 inner: self.inner.clone(),
922 sender_task: Arc::new(Mutex::new(SenderTask::new())),
923 maybe_parked: false,
924 };
925 }
926 Err(actual) => curr = actual,
927 }
928 }
929 }
930}
931
932impl<T> Drop for UnboundedSenderInner<T> {
933 fn drop(&mut self) {
934 // Ordering between variables don't matter here
935 let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
936
937 if prev == 1 {
938 self.close_channel();
939 }
940 }
941}
942
943impl<T> Drop for BoundedSenderInner<T> {
944 fn drop(&mut self) {
945 // Ordering between variables don't matter here
946 let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
947
948 if prev == 1 {
949 self.close_channel();
950 }
951 }
952}
953
954impl<T> fmt::Debug for Sender<T> {
955 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
956 f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
957 }
958}
959
960impl<T> fmt::Debug for UnboundedSender<T> {
961 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
962 f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
963 }
964}
965
966/*
967 *
968 * ===== impl Receiver =====
969 *
970 */
971
972impl<T> Receiver<T> {
973 /// Closes the receiving half of a channel, without dropping it.
974 ///
975 /// This prevents any further messages from being sent on the channel while
976 /// still enabling the receiver to drain messages that are buffered.
977 pub fn close(&mut self) {
978 if let Some(inner) = &mut self.inner {
979 inner.set_closed();
980
981 // Wake up any threads waiting as they'll see that we've closed the
982 // channel and will continue on their merry way.
983 while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
984 task.lock().unwrap().notify();
985 }
986 }
987 }
988
989 /// Tries to receive the next message without notifying a context if empty.
990 ///
991 /// It is not recommended to call this function from inside of a future,
992 /// only when you've otherwise arranged to be notified when the channel is
993 /// no longer empty.
994 ///
995 /// This function returns:
996 /// * `Ok(Some(t))` when message is fetched
997 /// * `Ok(None)` when channel is closed and no messages left in the queue
998 /// * `Err(e)` when there are no messages available, but channel is not yet closed
999 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1000 match self.next_message() {
1001 Poll::Ready(msg) => Ok(msg),
1002 Poll::Pending => Err(TryRecvError { _priv: () }),
1003 }
1004 }
1005
1006 fn next_message(&mut self) -> Poll<Option<T>> {
1007 let inner = match self.inner.as_mut() {
1008 None => return Poll::Ready(None),
1009 Some(inner) => inner,
1010 };
1011 // Pop off a message
1012 match unsafe { inner.message_queue.pop_spin() } {
1013 Some(msg) => {
1014 // If there are any parked task handles in the parked queue,
1015 // pop one and unpark it.
1016 self.unpark_one();
1017
1018 // Decrement number of messages
1019 self.dec_num_messages();
1020
1021 Poll::Ready(Some(msg))
1022 }
1023 None => {
1024 let state = decode_state(inner.state.load(SeqCst));
1025 if state.is_closed() {
1026 // If closed flag is set AND there are no pending messages
1027 // it means end of stream
1028 self.inner = None;
1029 Poll::Ready(None)
1030 } else {
1031 // If queue is open, we need to return Pending
1032 // to be woken up when new messages arrive.
1033 // If queue is closed but num_messages is non-zero,
1034 // it means that senders updated the state,
1035 // but didn't put message to queue yet,
1036 // so we need to park until sender unparks the task
1037 // after queueing the message.
1038 Poll::Pending
1039 }
1040 }
1041 }
1042 }
1043
1044 // Unpark a single task handle if there is one pending in the parked queue
1045 fn unpark_one(&mut self) {
1046 if let Some(inner) = &mut self.inner {
1047 if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1048 task.lock().unwrap().notify();
1049 }
1050 }
1051 }
1052
1053 fn dec_num_messages(&self) {
1054 if let Some(inner) = &self.inner {
1055 // OPEN_MASK is highest bit, so it's unaffected by subtraction
1056 // unless there's underflow, and we know there's no underflow
1057 // because number of messages at this point is always > 0.
1058 inner.state.fetch_sub(1, SeqCst);
1059 }
1060 }
1061}
1062
// The receiver never takes a `Pin` to the inner `T`, so it is declared
// `Unpin` for every `T`.
impl<T> Unpin for Receiver<T> {}
1065
impl<T> FusedStream for Receiver<T> {
    /// Returns `true` once `inner` has been cleared, which the receiver does
    /// after observing a closed channel with no messages left.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}
1071
1072impl<T> Stream for Receiver<T> {
1073 type Item = T;
1074
1075 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1076 // Try to read a message off of the message queue.
1077 match self.next_message() {
1078 Poll::Ready(msg) => {
1079 if msg.is_none() {
1080 self.inner = None;
1081 }
1082 Poll::Ready(msg)
1083 }
1084 Poll::Pending => {
1085 // There are no messages to read, in this case, park.
1086 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1087 // Check queue again after parking to prevent race condition:
1088 // a message could be added to the queue after previous `next_message`
1089 // before `register` call.
1090 self.next_message()
1091 }
1092 }
1093 }
1094
1095 fn size_hint(&self) -> (usize, Option<usize>) {
1096 if let Some(inner) = &self.inner {
1097 decode_state(inner.state.load(SeqCst)).size_hint()
1098 } else {
1099 (0, Some(0))
1100 }
1101 }
1102}
1103
1104impl<T> Drop for Receiver<T> {
1105 fn drop(&mut self) {
1106 // Drain the channel of all pending messages
1107 self.close();
1108 if self.inner.is_some() {
1109 loop {
1110 match self.next_message() {
1111 Poll::Ready(Some(_)) => {}
1112 Poll::Ready(None) => break,
1113 Poll::Pending => {
1114 let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1115
1116 // If the channel is closed, then there is no need to park.
1117 if state.is_closed() {
1118 break;
1119 }
1120
1121 // TODO: Spinning isn't ideal, it might be worth
1122 // investigating using a condvar or some other strategy
1123 // here. That said, if this case is hit, then another thread
1124 // is about to push the value into the queue and this isn't
1125 // the only spinlock in the impl right now.
1126 thread::yield_now();
1127 }
1128 }
1129 }
1130 }
1131 }
1132}
1133
1134impl<T> fmt::Debug for Receiver<T> {
1135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1136 let closed = if let Some(ref inner) = self.inner {
1137 decode_state(inner.state.load(SeqCst)).is_closed()
1138 } else {
1139 false
1140 };
1141
1142 f.debug_struct("Receiver").field("closed", &closed).finish()
1143 }
1144}
1145
1146impl<T> UnboundedReceiver<T> {
1147 /// Closes the receiving half of a channel, without dropping it.
1148 ///
1149 /// This prevents any further messages from being sent on the channel while
1150 /// still enabling the receiver to drain messages that are buffered.
1151 pub fn close(&mut self) {
1152 if let Some(inner) = &mut self.inner {
1153 inner.set_closed();
1154 }
1155 }
1156
1157 /// Tries to receive the next message without notifying a context if empty.
1158 ///
1159 /// It is not recommended to call this function from inside of a future,
1160 /// only when you've otherwise arranged to be notified when the channel is
1161 /// no longer empty.
1162 ///
1163 /// This function returns:
1164 /// * `Ok(Some(t))` when message is fetched
1165 /// * `Ok(None)` when channel is closed and no messages left in the queue
1166 /// * `Err(e)` when there are no messages available, but channel is not yet closed
1167 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1168 match self.next_message() {
1169 Poll::Ready(msg) => Ok(msg),
1170 Poll::Pending => Err(TryRecvError { _priv: () }),
1171 }
1172 }
1173
1174 fn next_message(&mut self) -> Poll<Option<T>> {
1175 let inner = match self.inner.as_mut() {
1176 None => return Poll::Ready(None),
1177 Some(inner) => inner,
1178 };
1179 // Pop off a message
1180 match unsafe { inner.message_queue.pop_spin() } {
1181 Some(msg) => {
1182 // Decrement number of messages
1183 self.dec_num_messages();
1184
1185 Poll::Ready(Some(msg))
1186 }
1187 None => {
1188 let state = decode_state(inner.state.load(SeqCst));
1189 if state.is_closed() {
1190 // If closed flag is set AND there are no pending messages
1191 // it means end of stream
1192 self.inner = None;
1193 Poll::Ready(None)
1194 } else {
1195 // If queue is open, we need to return Pending
1196 // to be woken up when new messages arrive.
1197 // If queue is closed but num_messages is non-zero,
1198 // it means that senders updated the state,
1199 // but didn't put message to queue yet,
1200 // so we need to park until sender unparks the task
1201 // after queueing the message.
1202 Poll::Pending
1203 }
1204 }
1205 }
1206 }
1207
1208 fn dec_num_messages(&self) {
1209 if let Some(inner) = &self.inner {
1210 // OPEN_MASK is highest bit, so it's unaffected by subtraction
1211 // unless there's underflow, and we know there's no underflow
1212 // because number of messages at this point is always > 0.
1213 inner.state.fetch_sub(1, SeqCst);
1214 }
1215 }
1216}
1217
impl<T> FusedStream for UnboundedReceiver<T> {
    /// Returns `true` once `inner` has been cleared, which the receiver does
    /// after observing a closed channel with no messages left.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}
1223
1224impl<T> Stream for UnboundedReceiver<T> {
1225 type Item = T;
1226
1227 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1228 // Try to read a message off of the message queue.
1229 match self.next_message() {
1230 Poll::Ready(msg) => {
1231 if msg.is_none() {
1232 self.inner = None;
1233 }
1234 Poll::Ready(msg)
1235 }
1236 Poll::Pending => {
1237 // There are no messages to read, in this case, park.
1238 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1239 // Check queue again after parking to prevent race condition:
1240 // a message could be added to the queue after previous `next_message`
1241 // before `register` call.
1242 self.next_message()
1243 }
1244 }
1245 }
1246
1247 fn size_hint(&self) -> (usize, Option<usize>) {
1248 if let Some(inner) = &self.inner {
1249 decode_state(inner.state.load(SeqCst)).size_hint()
1250 } else {
1251 (0, Some(0))
1252 }
1253 }
1254}
1255
1256impl<T> Drop for UnboundedReceiver<T> {
1257 fn drop(&mut self) {
1258 // Drain the channel of all pending messages
1259 self.close();
1260 if self.inner.is_some() {
1261 loop {
1262 match self.next_message() {
1263 Poll::Ready(Some(_)) => {}
1264 Poll::Ready(None) => break,
1265 Poll::Pending => {
1266 let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1267
1268 // If the channel is closed, then there is no need to park.
1269 if state.is_closed() {
1270 break;
1271 }
1272
1273 // TODO: Spinning isn't ideal, it might be worth
1274 // investigating using a condvar or some other strategy
1275 // here. That said, if this case is hit, then another thread
1276 // is about to push the value into the queue and this isn't
1277 // the only spinlock in the impl right now.
1278 thread::yield_now();
1279 }
1280 }
1281 }
1282 }
1283 }
1284}
1285
1286impl<T> fmt::Debug for UnboundedReceiver<T> {
1287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1288 let closed = if let Some(ref inner) = self.inner {
1289 decode_state(inner.state.load(SeqCst)).is_closed()
1290 } else {
1291 false
1292 };
1293
1294 f.debug_struct("Receiver").field("closed", &closed).finish()
1295 }
1296}
1297
1298/*
1299 *
1300 * ===== impl Inner =====
1301 *
1302 */
1303
1304impl<T> UnboundedInner<T> {
1305 // Clear `open` flag in the state, keep `num_messages` intact.
1306 fn set_closed(&self) {
1307 let curr = self.state.load(SeqCst);
1308 if !decode_state(curr).is_open {
1309 return;
1310 }
1311
1312 self.state.fetch_and(!OPEN_MASK, SeqCst);
1313 }
1314}
1315
1316impl<T> BoundedInner<T> {
1317 // The return value is such that the total number of messages that can be
1318 // enqueued into the channel will never exceed MAX_CAPACITY
1319 fn max_senders(&self) -> usize {
1320 MAX_CAPACITY - self.buffer
1321 }
1322
1323 // Clear `open` flag in the state, keep `num_messages` intact.
1324 fn set_closed(&self) {
1325 let curr = self.state.load(SeqCst);
1326 if !decode_state(curr).is_open {
1327 return;
1328 }
1329
1330 self.state.fetch_and(!OPEN_MASK, SeqCst);
1331 }
1332}
1333
// Manually asserts that `UnboundedInner<T>` can be sent to and shared between
// threads whenever `T: Send`.
// NOTE(review): the fields that make this sound are declared outside this
// section — confirm against the struct definition.
unsafe impl<T: Send> Send for UnboundedInner<T> {}
unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1336
// Manually asserts that `BoundedInner<T>` can be sent to and shared between
// threads whenever `T: Send`.
// NOTE(review): the fields that make this sound are declared outside this
// section — confirm against the struct definition.
unsafe impl<T: Send> Send for BoundedInner<T> {}
unsafe impl<T: Send> Sync for BoundedInner<T> {}
1339
1340impl State {
1341 fn is_closed(&self) -> bool {
1342 !self.is_open && self.num_messages == 0
1343 }
1344
1345 fn size_hint(&self) -> (usize, Option<usize>) {
1346 if self.is_open {
1347 (self.num_messages, None)
1348 } else {
1349 (self.num_messages, Some(self.num_messages))
1350 }
1351 }
1352}
1353
1354/*
1355 *
1356 * ===== Helpers =====
1357 *
1358 */
1359
1360fn decode_state(num: usize) -> State {
1361 State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1362}
1363
1364fn encode_state(state: &State) -> usize {
1365 let mut num = state.num_messages;
1366
1367 if state.is_open {
1368 num |= OPEN_MASK;
1369 }
1370
1371 num
1372}
1373