1//! A multi-producer, single-consumer queue for sending values across
2//! asynchronous tasks.
3//!
4//! Similarly to the `std`, channel creation provides [`Receiver`] and
5//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6//! read values out of the channel. If there is no message to read from the
7//! channel, the current task will be notified when a new value is sent.
8//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9//! the channel. If the channel is at capacity, the send will be rejected and
10//! the task will be notified when additional capacity is available. In other
11//! words, the channel provides backpressure.
12//!
13//! Unbounded channels are also available using the `unbounded` constructor.
14//!
15//! # Disconnection
16//!
17//! When all [`Sender`] handles have been dropped, it is no longer
18//! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Poll::Ready(None)`.
21//!
22//! If the [`Receiver`] handle is dropped, then messages can no longer
23//! be read out of the channel. In this case, all further attempts to send will
24//! result in an error.
25//!
26//! # Clean Shutdown
27//!
28//! If the [`Receiver`] is simply dropped, then it is possible for
29//! there to be messages still in the channel that will not be processed. As
30//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31//! receiver will first call `close`, which will prevent any further messages to
32//! be sent into the channel. Then, the receiver consumes the channel to
33//! completion, at which point the receiver can be dropped.
34//!
35//! [`Sender`]: struct.Sender.html
36//! [`Receiver`]: struct.Receiver.html
37//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38//! [`Receiver::poll_next`]:
39//! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
41// At the core, the channel uses an atomic FIFO queue for message passing. This
42// queue is used as the primary coordination primitive. In order to enforce
43// capacity limits and handle back pressure, a secondary FIFO queue is used to
44// send parked task handles.
45//
46// The general idea is that the channel is created with a `buffer` size of `n`.
47// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48// slot to hold a message. This allows `Sender` to know for a fact that a send
49// will succeed *before* starting to do the actual work of sending the value.
50// Since most of this work is lock-free, once the work starts, it is impossible
51// to safely revert.
52//
53// If the sender is unable to process a send operation, then the current
54// task is parked and the handle is sent on the parked task queue.
55//
56// Note that the implementation guarantees that the channel capacity will never
57// exceed the configured limit, however there is no *strict* guarantee that the
58// receiver will wake up a parked task *immediately* when a slot becomes
59// available. However, it will almost always unpark a task when a slot becomes
60// available and it is *guaranteed* that a sender will be unparked when the
61// message that caused the sender to become parked is read out of the channel.
62//
63// The steps for sending a message are roughly:
64//
65// 1) Increment the channel message count
66// 2) If the channel is at capacity, push the task handle onto the wait queue
67// 3) Push the message onto the message queue.
68//
69// The steps for receiving a message are roughly:
70//
71// 1) Pop a message from the message queue
72// 2) Pop a task handle from the wait queue
73// 3) Decrement the channel message count.
74//
75// It's important for the order of operations on lock-free structures to happen
76// in reverse order between the sender and receiver. This makes the message
77// queue the primary coordination structure and establishes the necessary
78// happens-before semantics required for the acquire / release semantics used
79// by the queue structure.
80
81use futures_core::stream::{FusedStream, Stream};
82use futures_core::task::__internal::AtomicWaker;
83use futures_core::task::{Context, Poll, Waker};
84use std::fmt;
85use std::pin::Pin;
86use std::sync::atomic::AtomicUsize;
87use std::sync::atomic::Ordering::SeqCst;
88use std::sync::{Arc, Mutex};
89use std::thread;
90
91use crate::mpsc::queue::Queue;
92
93mod queue;
94#[cfg(feature = "sink")]
95mod sink_impl;
96
// Sender half of an unbounded channel. Unlike the bounded flavor it never
// parks, so it only needs the shared channel state.
struct UnboundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<UnboundedInner<T>>,
}
101
// Sender half of a bounded channel. Carries per-sender parking state in
// addition to the shared channel state.
struct BoundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<BoundedInner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_task: Arc<Mutex<SenderTask>>,

    // `true` if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time.
    maybe_parked: bool,
}
115
// We never project Pin<&mut SenderInner> to `Pin<&mut T>`, so pinning the
// sender halves imposes no constraint on `T` and `Unpin` is safe to assert.
impl<T> Unpin for UnboundedSenderInner<T> {}
impl<T> Unpin for BoundedSenderInner<T> {}
119
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
// `None` after `disconnect` has been called on this handle.
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
124
/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
// `None` after `disconnect` has been called on this handle.
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
129
// Compile-time assertion that sender handles are `Send + Sync + Clone`;
// the impl fails to compile if `UnboundedSender` ever loses one of these.
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
132
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
pub struct Receiver<T> {
    // Shared channel state; set to `None` once the stream has terminated.
    inner: Option<Arc<BoundedInner<T>>>,
}
139
/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
pub struct UnboundedReceiver<T> {
    // Shared channel state; set to `None` once the stream has terminated.
    inner: Option<Arc<UnboundedInner<T>>>,
}
146
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
impl<T> Unpin for UnboundedReceiver<T> {}
149
/// The error type for [`Sender`s](Sender) used as `Sink`s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    // Distinguishes a full channel from a disconnected one.
    kind: SendErrorKind,
}
155
/// The error type returned from [`try_send`](Sender::try_send).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    // The underlying cause of the failure.
    err: SendError,
    // The message that could not be sent, returned to the caller.
    val: T,
}
162
// Private discriminant for `SendError`; exposed to users only through
// `is_full` / `is_disconnected`.
#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    Full,
    Disconnected,
}
168
/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    // Prevents construction outside this module while leaving room to add
    // fields later without breaking users.
    _priv: (),
}
173
174impl fmt::Display for SendError {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 if self.is_full() {
177 write!(f, "send failed because channel is full")
178 } else {
179 write!(f, "send failed because receiver is gone")
180 }
181 }
182}
183
// Marker impl so `SendError` can be boxed/used as a `dyn std::error::Error`.
impl std::error::Error for SendError {}
185
186impl SendError {
187 /// Returns `true` if this error is a result of the channel being full.
188 pub fn is_full(&self) -> bool {
189 match self.kind {
190 SendErrorKind::Full => true,
191 _ => false,
192 }
193 }
194
195 /// Returns `true` if this error is a result of the receiver being dropped.
196 pub fn is_disconnected(&self) -> bool {
197 match self.kind {
198 SendErrorKind::Disconnected => true,
199 _ => false,
200 }
201 }
202}
203
204impl<T> fmt::Debug for TrySendError<T> {
205 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206 f.debug_struct("TrySendError").field(name:"kind", &self.err.kind).finish()
207 }
208}
209
210impl<T> fmt::Display for TrySendError<T> {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 if self.is_full() {
213 write!(f, "send failed because channel is full")
214 } else {
215 write!(f, "send failed because receiver is gone")
216 }
217 }
218}
219
// `Any` implies `T: 'static`, which is required for use as a boxed `dyn Error`.
impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
221
impl<T> TrySendError<T> {
    /// Returns `true` if this error is a result of the channel being full.
    pub fn is_full(&self) -> bool {
        self.err.is_full()
    }

    /// Returns `true` if this error is a result of the receiver being dropped.
    pub fn is_disconnected(&self) -> bool {
        self.err.is_disconnected()
    }

    /// Returns the message that was attempted to be sent but failed.
    pub fn into_inner(self) -> T {
        self.val
    }

    /// Drops the message and converts into a `SendError`.
    pub fn into_send_error(self) -> SendError {
        self.err
    }
}
243
244impl fmt::Debug for TryRecvError {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 f.debug_tuple(name:"TryRecvError").finish()
247 }
248}
249
250impl fmt::Display for TryRecvError {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 write!(f, "receiver channel is empty")
253 }
254}
255
// Marker impl so `TryRecvError` can be used as a `dyn std::error::Error`.
impl std::error::Error for TryRecvError {}
257
// State shared by all unbounded sender handles and the receiver.
struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
272
// State shared by all bounded sender handles and the receiver.
struct BoundedInner<T> {
    // Max buffer size of the channel. (The unbounded flavor is a separate
    // type, `UnboundedInner`, so no sentinel value is needed here.)
    buffer: usize,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
293
// Struct representation of `Inner::state`. Decoded from / encoded into the
// single `AtomicUsize` word (see `OPEN_MASK` below for the bit layout).
#[derive(Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}
303
// The `is_open` flag is stored in the left-most bit of `Inner::state`;
// the remaining bits hold the message count.
const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);

// When a new channel is created, it is created in the open state with no
// pending messages.
const INIT_STATE: usize = OPEN_MASK;

// The maximum number of messages that a channel can track is `usize::max_value() >> 1`
const MAX_CAPACITY: usize = !(OPEN_MASK);

// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
// `MAX_BUFFER` also bounds the number of live senders (see the `Clone` impls).
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
317
// Sent to the consumer to wake up blocked producers
struct SenderTask {
    // Waker for the blocked sender's task, if one has been registered.
    task: Option<Waker>,
    // `true` while the sender is waiting on the parked-task queue.
    is_parked: bool,
}
323
324impl SenderTask {
325 fn new() -> Self {
326 Self { task: None, is_parked: false }
327 }
328
329 fn notify(&mut self) {
330 self.is_parked = false;
331
332 if let Some(task: Waker) = self.task.take() {
333 task.wake();
334 }
335 }
336}
337
338/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
339///
340/// Being bounded, this channel provides backpressure to ensure that the sender
341/// outpaces the receiver by only a limited amount. The channel's capacity is
342/// equal to `buffer + num-senders`. In other words, each sender gets a
343/// guaranteed slot in the channel capacity, and on top of that there are
344/// `buffer` "first come, first serve" slots available to all senders.
345///
346/// The [`Receiver`](Receiver) returned implements the
347/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
348/// `Sink`.
349pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
350 // Check that the requested buffer size does not exceed the maximum buffer
351 // size permitted by the system.
352 assert!(buffer < MAX_BUFFER, "requested buffer size too large");
353
354 let inner: Arc> = Arc::new(data:BoundedInner {
355 buffer,
356 state: AtomicUsize::new(INIT_STATE),
357 message_queue: Queue::new(),
358 parked_queue: Queue::new(),
359 num_senders: AtomicUsize::new(1),
360 recv_task: AtomicWaker::new(),
361 });
362
363 let tx: BoundedSenderInner = BoundedSenderInner {
364 inner: inner.clone(),
365 sender_task: Arc::new(data:Mutex::new(SenderTask::new())),
366 maybe_parked: false,
367 };
368
369 let rx: Receiver = Receiver { inner: Some(inner) };
370
371 (Sender(Some(tx)), rx)
372}
373
374/// Creates an unbounded mpsc channel for communicating between asynchronous
375/// tasks.
376///
377/// A `send` on this channel will always succeed as long as the receive half has
378/// not been closed. If the receiver falls behind, messages will be arbitrarily
379/// buffered.
380///
381/// **Note** that the amount of available system memory is an implicit bound to
382/// the channel. Using an `unbounded` channel has the ability of causing the
383/// process to run out of memory. In this case, the process will be aborted.
384pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
385 let inner: Arc> = Arc::new(data:UnboundedInner {
386 state: AtomicUsize::new(INIT_STATE),
387 message_queue: Queue::new(),
388 num_senders: AtomicUsize::new(1),
389 recv_task: AtomicWaker::new(),
390 });
391
392 let tx: UnboundedSenderInner = UnboundedSenderInner { inner: inner.clone() };
393
394 let rx: UnboundedReceiver = UnboundedReceiver { inner: Some(inner) };
395
396 (UnboundedSender(Some(tx)), rx)
397}
398
399/*
400 *
401 * ===== impl Sender =====
402 *
403 */
404
impl<T> UnboundedSenderInner<T> {
    // Non-blocking readiness check: an unbounded channel is ready whenever it
    // is still open, so this never returns `Poll::Pending`.
    fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if state.is_open {
            Poll::Ready(Ok(()))
        } else {
            Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
        }
    }

    // Push message to the queue and signal to the receiver
    fn queue_push_and_signal(&self, msg: T) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        self.inner.recv_task.wake();
    }

    // Increment the number of queued messages. Returns the resulting number,
    // or `None` if the receiver has closed the channel (the count is then
    // left untouched).
    fn inc_num_messages(&self) -> Option<usize> {
        let mut curr = self.inner.state.load(SeqCst);

        // CAS loop: retry until the increment is applied to an unchanged
        // state word.
        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth to return something else in this
            // case?
            assert!(
                state.num_messages < MAX_CAPACITY,
                "buffer space \
                exhausted; sending this messages would overflow the state"
            );

            state.num_messages += 1;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => return Some(state.num_messages),
                Err(actual) => curr = actual,
            }
        }
    }

    /// Returns whether the senders send to the same receiver.
    fn same_receiver(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Returns whether the sender send to this receiver.
    fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
        Arc::ptr_eq(&self.inner, inner)
    }

    /// Returns pointer to the Arc containing sender
    ///
    /// The returned pointer is not referenced and should be only used for hashing!
    fn ptr(&self) -> *const UnboundedInner<T> {
        &*self.inner
    }

    /// Returns whether this channel is closed without needing a context.
    fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    fn close_channel(&self) {
        // There's no need to park this sender, it's dropping,
        // and we don't want to check for capacity, so skip
        // that stuff from `do_send`.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }
}
488
impl<T> BoundedSenderInner<T> {
    /// Attempts to send a message on this `Sender`, returning the message
    /// if there was an error.
    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // If the sender is currently blocked, reject the message
        if !self.poll_unparked(None).is_ready() {
            return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
        }

        // The channel has capacity to accept the message, so send it
        self.do_send_b(msg)
    }

    // Do the send without failing.
    // Can be called only by bounded sender.
    fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // Anyone calling do_send *should* make sure there is room first,
        // but assert here for tests as a sanity check.
        debug_assert!(self.poll_unparked(None).is_ready());

        // First, increment the number of messages contained by the channel.
        // This operation will also atomically determine if the sender task
        // should be parked.
        //
        // `None` is returned in the case that the channel has been closed by the
        // receiver. This happens when `Receiver::close` is called or the
        // receiver is dropped.
        let park_self = match self.inc_num_messages() {
            Some(num_messages) => {
                // Block if the current number of pending messages has exceeded
                // the configured buffer size
                num_messages > self.inner.buffer
            }
            None => {
                return Err(TrySendError {
                    err: SendError { kind: SendErrorKind::Disconnected },
                    val: msg,
                })
            }
        };

        // If the channel has reached capacity, then the sender task needs to
        // be parked. This will send the task handle on the parked task queue.
        //
        // However, when `do_send` is called while dropping the `Sender`,
        // `task::current()` can't be called safely. In this case, in order to
        // maintain internal consistency, a blank message is pushed onto the
        // parked task queue.
        if park_self {
            self.park();
        }

        // NOTE: the message is pushed *after* parking; see the module-level
        // comment on why the sender's operation order mirrors the receiver's.
        self.queue_push_and_signal(msg);

        Ok(())
    }

    // Push message to the queue and signal to the receiver
    fn queue_push_and_signal(&self, msg: T) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        self.inner.recv_task.wake();
    }

    // Increment the number of queued messages. Returns the resulting number,
    // or `None` if the receiver has closed the channel (the count is then
    // left untouched).
    fn inc_num_messages(&self) -> Option<usize> {
        let mut curr = self.inner.state.load(SeqCst);

        // CAS loop: retry until the increment is applied to an unchanged
        // state word.
        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth to return something else in this
            // case?
            assert!(
                state.num_messages < MAX_CAPACITY,
                "buffer space \
                exhausted; sending this messages would overflow the state"
            );

            state.num_messages += 1;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => return Some(state.num_messages),
                Err(actual) => curr = actual,
            }
        }
    }

    // Park this sender: clear any stale waker, mark the shared `SenderTask`
    // parked, and hand a clone of the handle to the receiver via
    // `parked_queue` so the receiver can wake us once capacity frees up.
    fn park(&mut self) {
        {
            let mut sender = self.sender_task.lock().unwrap();
            sender.task = None;
            sender.is_parked = true;
        }

        // Send handle over queue
        let t = self.sender_task.clone();
        self.inner.parked_queue.push(t);

        // Check to make sure we weren't closed after we sent our task on the
        // queue
        let state = decode_state(self.inner.state.load(SeqCst));
        self.maybe_parked = state.is_open;
    }

    /// Polls the channel to determine if there is guaranteed capacity to send
    /// at least one item without waiting.
    ///
    /// # Return value
    ///
    /// This method returns:
    ///
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
    /// - `Poll::Pending` if the channel may not have
    ///   capacity, in which case the current task is queued to be notified once
    ///   capacity is available;
    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if !state.is_open {
            return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
        }

        self.poll_unparked(Some(cx)).map(Ok)
    }

    /// Returns whether the senders send to the same receiver.
    fn same_receiver(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Returns whether the sender send to this receiver.
    fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
        Arc::ptr_eq(&self.inner, receiver)
    }

    /// Returns pointer to the Arc containing sender
    ///
    /// The returned pointer is not referenced and should be only used for hashing!
    fn ptr(&self) -> *const BoundedInner<T> {
        &*self.inner
    }

    /// Returns whether this channel is closed without needing a context.
    fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    fn close_channel(&self) {
        // There's no need to park this sender, it's dropping,
        // and we don't want to check for capacity, so skip
        // that stuff from `do_send`.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }

    // Returns `Ready` if this sender is free to send, `Pending` if it is
    // still parked. When `cx` is provided, its waker is stored so the
    // receiver's unpark reaches the task that most recently polled.
    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
        // First check the `maybe_parked` variable. This avoids acquiring the
        // lock in most cases
        if self.maybe_parked {
            // Get a lock on the task handle
            let mut task = self.sender_task.lock().unwrap();

            if !task.is_parked {
                self.maybe_parked = false;
                return Poll::Ready(());
            }

            // At this point, an unpark request is pending, so there will be an
            // unpark sometime in the future. We just need to make sure that
            // the correct task will be notified.
            //
            // Update the task in case the `Sender` has been moved to another
            // task
            task.task = cx.map(|cx| cx.waker().clone());

            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
683
684impl<T> Sender<T> {
685 /// Attempts to send a message on this `Sender`, returning the message
686 /// if there was an error.
687 pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
688 if let Some(inner) = &mut self.0 {
689 inner.try_send(msg)
690 } else {
691 Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
692 }
693 }
694
695 /// Send a message on the channel.
696 ///
697 /// This function should only be called after
698 /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
699 /// ready to receive a message.
700 pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
701 self.try_send(msg).map_err(|e| e.err)
702 }
703
704 /// Polls the channel to determine if there is guaranteed capacity to send
705 /// at least one item without waiting.
706 ///
707 /// # Return value
708 ///
709 /// This method returns:
710 ///
711 /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
712 /// - `Poll::Pending` if the channel may not have
713 /// capacity, in which case the current task is queued to be notified once
714 /// capacity is available;
715 /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
716 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
717 let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
718 inner.poll_ready(cx)
719 }
720
721 /// Returns whether this channel is closed without needing a context.
722 pub fn is_closed(&self) -> bool {
723 self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
724 }
725
726 /// Closes this channel from the sender side, preventing any new messages.
727 pub fn close_channel(&mut self) {
728 if let Some(inner) = &mut self.0 {
729 inner.close_channel();
730 }
731 }
732
733 /// Disconnects this sender from the channel, closing it if there are no more senders left.
734 pub fn disconnect(&mut self) {
735 self.0 = None;
736 }
737
738 /// Returns whether the senders send to the same receiver.
739 pub fn same_receiver(&self, other: &Self) -> bool {
740 match (&self.0, &other.0) {
741 (Some(inner), Some(other)) => inner.same_receiver(other),
742 _ => false,
743 }
744 }
745
746 /// Returns whether the sender send to this receiver.
747 pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
748 match (&self.0, &receiver.inner) {
749 (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
750 _ => false,
751 }
752 }
753
754 /// Hashes the receiver into the provided hasher
755 pub fn hash_receiver<H>(&self, hasher: &mut H)
756 where
757 H: std::hash::Hasher,
758 {
759 use std::hash::Hash;
760
761 let ptr = self.0.as_ref().map(|inner| inner.ptr());
762 ptr.hash(hasher);
763 }
764}
765
766impl<T> UnboundedSender<T> {
767 /// Check if the channel is ready to receive a message.
768 pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
769 let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
770 inner.poll_ready_nb()
771 }
772
773 /// Returns whether this channel is closed without needing a context.
774 pub fn is_closed(&self) -> bool {
775 self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
776 }
777
778 /// Closes this channel from the sender side, preventing any new messages.
779 pub fn close_channel(&self) {
780 if let Some(inner) = &self.0 {
781 inner.close_channel();
782 }
783 }
784
785 /// Disconnects this sender from the channel, closing it if there are no more senders left.
786 pub fn disconnect(&mut self) {
787 self.0 = None;
788 }
789
790 // Do the send without parking current task.
791 fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
792 if let Some(inner) = &self.0 {
793 if inner.inc_num_messages().is_some() {
794 inner.queue_push_and_signal(msg);
795 return Ok(());
796 }
797 }
798
799 Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
800 }
801
802 /// Send a message on the channel.
803 ///
804 /// This method should only be called after `poll_ready` has been used to
805 /// verify that the channel is ready to receive a message.
806 pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
807 self.do_send_nb(msg).map_err(|e| e.err)
808 }
809
810 /// Sends a message along this channel.
811 ///
812 /// This is an unbounded sender, so this function differs from `Sink::send`
813 /// by ensuring the return type reflects that the channel is always ready to
814 /// receive messages.
815 pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
816 self.do_send_nb(msg)
817 }
818
819 /// Returns whether the senders send to the same receiver.
820 pub fn same_receiver(&self, other: &Self) -> bool {
821 match (&self.0, &other.0) {
822 (Some(inner), Some(other)) => inner.same_receiver(other),
823 _ => false,
824 }
825 }
826
827 /// Returns whether the sender send to this receiver.
828 pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
829 match (&self.0, &receiver.inner) {
830 (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
831 _ => false,
832 }
833 }
834
835 /// Hashes the receiver into the provided hasher
836 pub fn hash_receiver<H>(&self, hasher: &mut H)
837 where
838 H: std::hash::Hasher,
839 {
840 use std::hash::Hash;
841
842 let ptr = self.0.as_ref().map(|inner| inner.ptr());
843 ptr.hash(hasher);
844 }
845}
846
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        // Delegates to `Clone for BoundedSenderInner`, which bumps the shared
        // sender count (and panics if the maximum would be exceeded).
        Self(self.0.clone())
    }
}
852
impl<T> Clone for UnboundedSender<T> {
    fn clone(&self) -> Self {
        // Delegates to `Clone for UnboundedSenderInner`, which bumps the
        // shared sender count (and panics if the maximum would be exceeded).
        Self(self.0.clone())
    }
}
858
impl<T> Clone for UnboundedSenderInner<T> {
    fn clone(&self) -> Self {
        // Since this atomic op isn't actually guarding any memory and we don't
        // care about any orderings besides the ordering on the single atomic
        // variable, a relaxed ordering would be acceptable — note the code
        // below actually uses `SeqCst` throughout.
        let mut curr = self.inner.num_senders.load(SeqCst);

        loop {
            // If the maximum number of senders has been reached, then fail
            if curr == MAX_BUFFER {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }

            debug_assert!(curr < MAX_BUFFER);

            let next = curr + 1;
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    // The ABA problem doesn't matter here. We only care that the
                    // number of senders never exceeds the maximum.
                    return Self { inner: self.inner.clone() };
                }
                Err(actual) => curr = actual,
            }
        }
    }
}
886
impl<T> Clone for BoundedSenderInner<T> {
    fn clone(&self) -> Self {
        // Since this atomic op isn't actually guarding any memory and we don't
        // care about any orderings besides the ordering on the single atomic
        // variable, a relaxed ordering would be acceptable — note the code
        // below actually uses `SeqCst` throughout.
        let mut curr = self.inner.num_senders.load(SeqCst);

        loop {
            // If the maximum number of senders has been reached, then fail
            if curr == self.inner.max_senders() {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }

            debug_assert!(curr < self.inner.max_senders());

            let next = curr + 1;
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    // The ABA problem doesn't matter here. We only care that the
                    // number of senders never exceeds the maximum.
                    // Each clone gets its own, initially-unparked task slot.
                    return Self {
                        inner: self.inner.clone(),
                        sender_task: Arc::new(Mutex::new(SenderTask::new())),
                        maybe_parked: false,
                    };
                }
                Err(actual) => curr = actual,
            }
        }
    }
}
918
919impl<T> Drop for UnboundedSenderInner<T> {
920 fn drop(&mut self) {
921 // Ordering between variables don't matter here
922 let prev: usize = self.inner.num_senders.fetch_sub(val:1, order:SeqCst);
923
924 if prev == 1 {
925 self.close_channel();
926 }
927 }
928}
929
930impl<T> Drop for BoundedSenderInner<T> {
931 fn drop(&mut self) {
932 // Ordering between variables don't matter here
933 let prev: usize = self.inner.num_senders.fetch_sub(val:1, order:SeqCst);
934
935 if prev == 1 {
936 self.close_channel();
937 }
938 }
939}
940
941impl<T> fmt::Debug for Sender<T> {
942 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
943 f.debug_struct("Sender").field(name:"closed", &self.is_closed()).finish()
944 }
945}
946
947impl<T> fmt::Debug for UnboundedSender<T> {
948 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
949 f.debug_struct("UnboundedSender").field(name:"closed", &self.is_closed()).finish()
950 }
951}
952
953/*
954 *
955 * ===== impl Receiver =====
956 *
957 */
958
959impl<T> Receiver<T> {
    /// Closes the receiving half of a channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
    pub fn close(&mut self) {
        if let Some(inner) = &mut self.inner {
            // Flip the open flag so senders observe the channel as closed.
            inner.set_closed();

            // Wake up any threads waiting as they'll see that we've closed the
            // channel and will continue on their merry way.
            while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
                task.lock().unwrap().notify();
            }
        }
    }
975
976 /// Tries to receive the next message without notifying a context if empty.
977 ///
978 /// It is not recommended to call this function from inside of a future,
979 /// only when you've otherwise arranged to be notified when the channel is
980 /// no longer empty.
981 ///
982 /// This function returns:
983 /// * `Ok(Some(t))` when message is fetched
984 /// * `Ok(None)` when channel is closed and no messages left in the queue
985 /// * `Err(e)` when there are no messages available, but channel is not yet closed
986 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
987 match self.next_message() {
988 Poll::Ready(msg) => Ok(msg),
989 Poll::Pending => Err(TryRecvError { _priv: () }),
990 }
991 }
992
993 fn next_message(&mut self) -> Poll<Option<T>> {
994 let inner = match self.inner.as_mut() {
995 None => return Poll::Ready(None),
996 Some(inner) => inner,
997 };
998 // Pop off a message
999 match unsafe { inner.message_queue.pop_spin() } {
1000 Some(msg) => {
1001 // If there are any parked task handles in the parked queue,
1002 // pop one and unpark it.
1003 self.unpark_one();
1004
1005 // Decrement number of messages
1006 self.dec_num_messages();
1007
1008 Poll::Ready(Some(msg))
1009 }
1010 None => {
1011 let state = decode_state(inner.state.load(SeqCst));
1012 if state.is_closed() {
1013 // If closed flag is set AND there are no pending messages
1014 // it means end of stream
1015 self.inner = None;
1016 Poll::Ready(None)
1017 } else {
1018 // If queue is open, we need to return Pending
1019 // to be woken up when new messages arrive.
1020 // If queue is closed but num_messages is non-zero,
1021 // it means that senders updated the state,
1022 // but didn't put message to queue yet,
1023 // so we need to park until sender unparks the task
1024 // after queueing the message.
1025 Poll::Pending
1026 }
1027 }
1028 }
1029 }
1030
1031 // Unpark a single task handle if there is one pending in the parked queue
1032 fn unpark_one(&mut self) {
1033 if let Some(inner) = &mut self.inner {
1034 if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1035 task.lock().unwrap().notify();
1036 }
1037 }
1038 }
1039
1040 fn dec_num_messages(&self) {
1041 if let Some(inner) = &self.inner {
1042 // OPEN_MASK is highest bit, so it's unaffected by subtraction
1043 // unless there's underflow, and we know there's no underflow
1044 // because number of messages at this point is always > 0.
1045 inner.state.fetch_sub(1, SeqCst);
1046 }
1047 }
1048}
1049
// The receiver does not ever take a Pin to the inner T, so it is safe to
// implement `Unpin` unconditionally regardless of whether `T` is `Unpin`.
impl<T> Unpin for Receiver<T> {}
1052
impl<T> FusedStream for Receiver<T> {
    // `inner` is set to `None` exactly when the stream has yielded `None`
    // (see `next_message` / `poll_next`), so this precisely tracks
    // termination.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}
1058
impl<T> Stream for Receiver<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Try to read a message off of the message queue.
        match self.next_message() {
            Poll::Ready(msg) => {
                if msg.is_none() {
                    // End of stream: drop the channel handle eagerly.
                    self.inner = None;
                }
                Poll::Ready(msg)
            }
            Poll::Pending => {
                // There are no messages to read, in this case, park.
                // The unwrap is fine: `next_message` only returns `Pending`
                // when `inner` is `Some` (a `None` inner yields `Ready(None)`).
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
                // Check queue again after parking to prevent race condition:
                // a message could be added to the queue after previous `next_message`
                // before `register` call.
                self.next_message()
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        if let Some(inner) = &self.inner {
            decode_state(inner.state.load(SeqCst)).size_hint()
        } else {
            // Terminated stream: exactly zero items remain.
            (0, Some(0))
        }
    }
}
1090
impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // Drain the channel of all pending messages so their destructors run
        // and any parked senders are unparked.
        self.close();
        if self.inner.is_some() {
            loop {
                match self.next_message() {
                    Poll::Ready(Some(_)) => {}
                    Poll::Ready(None) => break,
                    Poll::Pending => {
                        // The unwrap is fine: `next_message` only returns
                        // `Pending` while `inner` is still `Some`.
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));

                        // If the channel is closed, then there is no need to park.
                        if state.is_closed() {
                            break;
                        }

                        // `Pending` here means a sender bumped the message
                        // count but hasn't pushed onto the queue yet, so the
                        // message is imminent.
                        //
                        // TODO: Spinning isn't ideal, it might be worth
                        // investigating using a condvar or some other strategy
                        // here. That said, if this case is hit, then another thread
                        // is about to push the value into the queue and this isn't
                        // the only spinlock in the impl right now.
                        thread::yield_now();
                    }
                }
            }
        }
    }
}
1120
1121impl<T> fmt::Debug for Receiver<T> {
1122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1123 let closed: bool = if let Some(ref inner: &Arc>) = self.inner {
1124 decode_state(num:inner.state.load(order:SeqCst)).is_closed()
1125 } else {
1126 false
1127 };
1128
1129 f.debug_struct("Receiver").field(name:"closed", &closed).finish()
1130 }
1131}
1132
impl<T> UnboundedReceiver<T> {
    /// Closes the receiving half of a channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
    pub fn close(&mut self) {
        if let Some(inner) = &mut self.inner {
            inner.set_closed();
        }
    }

    /// Tries to receive the next message without notifying a context if empty.
    ///
    /// It is not recommended to call this function from inside of a future,
    /// only when you've otherwise arranged to be notified when the channel is
    /// no longer empty.
    ///
    /// This function returns:
    /// * `Ok(Some(t))` when message is fetched
    /// * `Ok(None)` when channel is closed and no messages left in the queue
    /// * `Err(e)` when there are no messages available, but channel is not yet closed
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
        match self.next_message() {
            Poll::Ready(msg) => Ok(msg),
            Poll::Pending => Err(TryRecvError { _priv: () }),
        }
    }

    // Attempts to pop the next message off the channel's message queue.
    //
    // Returns `Ready(Some(msg))` on success, `Ready(None)` once the channel
    // is closed and fully drained (in which case `self.inner` is dropped so
    // subsequent calls short-circuit), and `Pending` when no message is
    // currently available. Unlike the bounded `Receiver`, there is no parked
    // queue of senders to unpark here. NOTE: `Pending` does not register any
    // waker — callers must arrange to be woken (see `poll_next`).
    fn next_message(&mut self) -> Poll<Option<T>> {
        let inner = match self.inner.as_mut() {
            None => return Poll::Ready(None),
            Some(inner) => inner,
        };
        // Pop off a message
        match unsafe { inner.message_queue.pop_spin() } {
            Some(msg) => {
                // Decrement number of messages
                self.dec_num_messages();

                Poll::Ready(Some(msg))
            }
            None => {
                let state = decode_state(inner.state.load(SeqCst));
                if state.is_closed() {
                    // If closed flag is set AND there are no pending messages
                    // it means end of stream
                    self.inner = None;
                    Poll::Ready(None)
                } else {
                    // If queue is open, we need to return Pending
                    // to be woken up when new messages arrive.
                    // If queue is closed but num_messages is non-zero,
                    // it means that senders updated the state,
                    // but didn't put message to queue yet,
                    // so we need to park until sender unparks the task
                    // after queueing the message.
                    Poll::Pending
                }
            }
        }
    }

    // Decrement the message count in `state` after a successful pop.
    fn dec_num_messages(&self) {
        if let Some(inner) = &self.inner {
            // OPEN_MASK is highest bit, so it's unaffected by subtraction
            // unless there's underflow, and we know there's no underflow
            // because number of messages at this point is always > 0.
            inner.state.fetch_sub(1, SeqCst);
        }
    }
}
1204
impl<T> FusedStream for UnboundedReceiver<T> {
    // `inner` is set to `None` exactly when the stream has yielded `None`
    // (see `next_message` / `poll_next`), so this precisely tracks
    // termination.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}
1210
impl<T> Stream for UnboundedReceiver<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Try to read a message off of the message queue.
        match self.next_message() {
            Poll::Ready(msg) => {
                if msg.is_none() {
                    // End of stream: drop the channel handle eagerly.
                    self.inner = None;
                }
                Poll::Ready(msg)
            }
            Poll::Pending => {
                // There are no messages to read, in this case, park.
                // The unwrap is fine: `next_message` only returns `Pending`
                // when `inner` is `Some` (a `None` inner yields `Ready(None)`).
                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
                // Check queue again after parking to prevent race condition:
                // a message could be added to the queue after previous `next_message`
                // before `register` call.
                self.next_message()
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        if let Some(inner) = &self.inner {
            decode_state(inner.state.load(SeqCst)).size_hint()
        } else {
            // Terminated stream: exactly zero items remain.
            (0, Some(0))
        }
    }
}
1242
impl<T> Drop for UnboundedReceiver<T> {
    fn drop(&mut self) {
        // Drain the channel of all pending messages so their destructors run.
        self.close();
        if self.inner.is_some() {
            loop {
                match self.next_message() {
                    Poll::Ready(Some(_)) => {}
                    Poll::Ready(None) => break,
                    Poll::Pending => {
                        // The unwrap is fine: `next_message` only returns
                        // `Pending` while `inner` is still `Some`.
                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));

                        // If the channel is closed, then there is no need to park.
                        if state.is_closed() {
                            break;
                        }

                        // `Pending` here means a sender bumped the message
                        // count but hasn't pushed onto the queue yet, so the
                        // message is imminent.
                        //
                        // TODO: Spinning isn't ideal, it might be worth
                        // investigating using a condvar or some other strategy
                        // here. That said, if this case is hit, then another thread
                        // is about to push the value into the queue and this isn't
                        // the only spinlock in the impl right now.
                        thread::yield_now();
                    }
                }
            }
        }
    }
}
1272
1273impl<T> fmt::Debug for UnboundedReceiver<T> {
1274 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1275 let closed: bool = if let Some(ref inner: &Arc>) = self.inner {
1276 decode_state(num:inner.state.load(order:SeqCst)).is_closed()
1277 } else {
1278 false
1279 };
1280
1281 f.debug_struct("Receiver").field(name:"closed", &closed).finish()
1282 }
1283}
1284
1285/*
1286 *
1287 * ===== impl Inner =====
1288 *
1289 */
1290
1291impl<T> UnboundedInner<T> {
1292 // Clear `open` flag in the state, keep `num_messages` intact.
1293 fn set_closed(&self) {
1294 let curr: usize = self.state.load(order:SeqCst);
1295 if !decode_state(num:curr).is_open {
1296 return;
1297 }
1298
1299 self.state.fetch_and(!OPEN_MASK, order:SeqCst);
1300 }
1301}
1302
1303impl<T> BoundedInner<T> {
1304 // The return value is such that the total number of messages that can be
1305 // enqueued into the channel will never exceed MAX_CAPACITY
1306 fn max_senders(&self) -> usize {
1307 MAX_CAPACITY - self.buffer
1308 }
1309
1310 // Clear `open` flag in the state, keep `num_messages` intact.
1311 fn set_closed(&self) {
1312 let curr: usize = self.state.load(order:SeqCst);
1313 if !decode_state(num:curr).is_open {
1314 return;
1315 }
1316
1317 self.state.fetch_and(!OPEN_MASK, order:SeqCst);
1318 }
1319}
1320
// SAFETY(review): the `T: Send` bound ensures message payloads may move
// across threads; these impls additionally assume the queues and atomic state
// inside the inner types are themselves thread-safe — confirm against the
// `UnboundedInner` / `BoundedInner` definitions (outside this view).
unsafe impl<T: Send> Send for UnboundedInner<T> {}
unsafe impl<T: Send> Sync for UnboundedInner<T> {}

unsafe impl<T: Send> Send for BoundedInner<T> {}
unsafe impl<T: Send> Sync for BoundedInner<T> {}
1326
1327impl State {
1328 fn is_closed(&self) -> bool {
1329 !self.is_open && self.num_messages == 0
1330 }
1331
1332 fn size_hint(&self) -> (usize, Option<usize>) {
1333 if self.is_open {
1334 (self.num_messages, None)
1335 } else {
1336 (self.num_messages, Some(self.num_messages))
1337 }
1338 }
1339}
1340
1341/*
1342 *
1343 * ===== Helpers =====
1344 *
1345 */
1346
1347fn decode_state(num: usize) -> State {
1348 State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1349}
1350
1351fn encode_state(state: &State) -> usize {
1352 let mut num: usize = state.num_messages;
1353
1354 if state.is_open {
1355 num |= OPEN_MASK;
1356 }
1357
1358 num
1359}
1360