1//! # Flume
2//!
3//! A blazingly fast multi-producer, multi-consumer channel.
4//!
5//! *"Do not communicate by sharing memory; instead, share memory by communicating."*
6//!
7//! ## Why Flume?
8//!
9//! - **Featureful**: Unbounded, bounded and rendezvous queues
10//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel`
11//! - **Safe**: No `unsafe` code anywhere in the codebase!
12//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone`
13//! - **Familiar**: Drop-in replacement for `std::sync::mpsc`
14//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines
15//! - **Simple**: Few dependencies, minimal codebase, fast to compile
16//! - **Asynchronous**: `async` support, including mix 'n match with sync code
17//! - **Ergonomic**: Powerful `select`-like interface
18//!
19//! ## Example
20//!
21//! ```
22//! let (tx, rx) = flume::unbounded();
23//!
24//! tx.send(42).unwrap();
25//! assert_eq!(rx.recv().unwrap(), 42);
26//! ```
27
28#![deny(missing_docs)]
29
30#[cfg(feature = "select")]
31pub mod select;
32#[cfg(feature = "async")]
33pub mod r#async;
34
35mod signal;
36
37// Reexports
38#[cfg(feature = "select")]
39pub use select::Selector;
40
41use std::{
42 collections::VecDeque,
43 sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak},
44 time::{Duration, Instant},
45 marker::PhantomData,
46 thread,
47 fmt,
48};
49
50#[cfg(feature = "spin")]
51use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
52use crate::signal::{Signal, SyncSignal};
53
/// An error that may be emitted when attempting to send a value into a channel on a sender when
/// all receivers are dropped.
///
/// The wrapped value is the message that could not be sent.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consume the error, yielding the message that failed to send.
    pub fn into_inner(self) -> T {
        self.0
    }
}

impl<T> fmt::Debug for SendError<T> {
    /// Writes the placeholder `SendError(..)`. The payload is never printed, so `T` does not
    /// have to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Write the text directly: forwarding to `str`'s `Debug` impl would wrap the
        // placeholder in quotes and escape it like a string literal.
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt("sending on a closed channel", f)
    }
}

impl<T> std::error::Error for SendError<T> {}
77
/// An error that may be emitted when attempting to send a value into a channel on a sender when
/// the channel is full or all receivers are dropped.
///
/// Both variants hand the unsent message back to the caller.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The channel the message is sent on has a finite capacity and was full when the send was attempted.
    Full(T),
    /// All channel receivers were dropped and so the message has nobody to receive it.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Consume the error, yielding the message that failed to send.
    pub fn into_inner(self) -> T {
        match self {
            Self::Full(msg) | Self::Disconnected(msg) => msg,
        }
    }
}

impl<T> fmt::Debug for TrySendError<T> {
    /// Writes the variant name with an elided payload (`Full(..)` / `Disconnected(..)`), so `T`
    /// does not have to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Written directly rather than through `str`'s `Debug` impl, which would add quotes.
        match *self {
            TrySendError::Full(..) => f.write_str("Full(..)"),
            TrySendError::Disconnected(..) => f.write_str("Disconnected(..)"),
        }
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TrySendError::Full(..) => fmt::Display::fmt("sending on a full channel", f),
            TrySendError::Disconnected(..) => fmt::Display::fmt("sending on a closed channel", f),
        }
    }
}

impl<T> std::error::Error for TrySendError<T> {}
116
117impl<T> From<SendError<T>> for TrySendError<T> {
118 fn from(err: SendError<T>) -> Self {
119 match err {
120 SendError(item: T) => Self::Disconnected(item),
121 }
122 }
123}
124
/// An error that may be emitted when sending a value into a channel on a sender with a timeout when
/// the send operation times out or all receivers are dropped.
///
/// Both variants hand the unsent message back to the caller.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum SendTimeoutError<T> {
    /// A timeout occurred when attempting to send the message.
    Timeout(T),
    /// All channel receivers were dropped and so the message has nobody to receive it.
    Disconnected(T),
}

impl<T> SendTimeoutError<T> {
    /// Consume the error, yielding the message that failed to send.
    pub fn into_inner(self) -> T {
        match self {
            Self::Timeout(msg) | Self::Disconnected(msg) => msg,
        }
    }
}

impl<T> fmt::Debug for SendTimeoutError<T> {
    /// Writes the placeholder `SendTimeoutError(..)` for either variant. The payload is never
    /// printed, so `T` does not have to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Written directly rather than through `str`'s `Debug` impl, which would add quotes.
        f.write_str("SendTimeoutError(..)")
    }
}

impl<T> fmt::Display for SendTimeoutError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SendTimeoutError::Timeout(..) => {
                fmt::Display::fmt("timed out sending on a full channel", f)
            }
            SendTimeoutError::Disconnected(..) => {
                fmt::Display::fmt("sending on a closed channel", f)
            }
        }
    }
}

impl<T> std::error::Error for SendTimeoutError<T> {}
160
161impl<T> From<SendError<T>> for SendTimeoutError<T> {
162 fn from(err: SendError<T>) -> Self {
163 match err {
164 SendError(item: T) => Self::Disconnected(item),
165 }
166 }
167}
168
// Crate-internal superset of the public send errors. The public `Sender` methods map the
// variants they can legitimately observe onto `SendError`, `TrySendError` or
// `SendTimeoutError` and treat the remaining ones as unreachable.
enum TrySendTimeoutError<T> {
    // The bounded queue had no room and the caller asked not to block.
    Full(T),
    // The channel was already disconnected; the message is handed back.
    Disconnected(T),
    // The deadline passed while the message was still waiting to be taken.
    Timeout(T),
}
174
/// An error that may be emitted when attempting to wait for a value on a receiver when all senders
/// are dropped and there are no more messages in the channel.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvError {
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
    Disconnected,
}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match *self {
            RecvError::Disconnected => "receiving on a closed channel",
        };
        // `pad` applies the caller's width/alignment flags, as `str`'s `Display` does.
        f.pad(text)
    }
}

impl std::error::Error for RecvError {}
192
/// An error that may be emitted when attempting to fetch a value on a receiver when there are no
/// messages in the channel. If there are no messages in the channel and all senders are dropped,
/// then `TryRecvError::Disconnected` will be returned.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TryRecvError {
    /// The channel was empty when the receive was attempted.
    Empty,
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
    Disconnected,
}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match *self {
            TryRecvError::Empty => "receiving on an empty channel",
            TryRecvError::Disconnected => "channel is empty and closed",
        };
        // `pad` applies the caller's width/alignment flags, as `str`'s `Display` does.
        f.pad(text)
    }
}

impl std::error::Error for TryRecvError {}
214
215impl From<RecvError> for TryRecvError {
216 fn from(err: RecvError) -> Self {
217 match err {
218 RecvError::Disconnected => Self::Disconnected,
219 }
220 }
221}
222
/// An error that may be emitted when attempting to wait for a value on a receiver with a timeout
/// when the receive operation times out or all senders are dropped and there are no values left
/// in the channel.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// A timeout occurred when attempting to receive a message.
    Timeout,
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
    Disconnected,
}

impl fmt::Display for RecvTimeoutError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match *self {
            RecvTimeoutError::Timeout => "timed out waiting on a channel",
            RecvTimeoutError::Disconnected => "channel is empty and closed",
        };
        // `pad` applies the caller's width/alignment flags, as `str`'s `Display` does.
        f.pad(text)
    }
}

impl std::error::Error for RecvTimeoutError {}
244
245impl From<RecvError> for RecvTimeoutError {
246 fn from(err: RecvError) -> Self {
247 match err {
248 RecvError::Disconnected => Self::Disconnected,
249 }
250 }
251}
252
// Crate-internal superset of the public receive errors. The public `Receiver` methods map
// the variants they can legitimately observe onto `RecvError`, `TryRecvError` or
// `RecvTimeoutError` and treat the remaining ones as unreachable.
enum TryRecvTimeoutError {
    // Nothing was queued and the caller asked not to block.
    Empty,
    // The deadline passed before a message arrived.
    Timeout,
    // Nothing was queued and the channel is disconnected.
    Disconnected,
}
258
// TODO: Investigate some sort of invalidation flag for timeouts
//
// A hook pairs an optional, lock-protected message slot (field 0) with a signal (field 1).
// Field 0 is `Some` for hooks built with `Hook::slot` and `None` for hooks built with
// `Hook::trigger`. The lock type follows the `spin` feature.
#[cfg(feature = "spin")]
struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);

// Same layout as above, backed by `std::sync::Mutex` when the `spin` feature is off.
#[cfg(not(feature = "spin"))]
struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
265
#[cfg(feature = "spin")]
impl<T, S: ?Sized + Signal> Hook<T, S> {
    /// Builds a shared hook whose slot starts out holding `msg`, paired with `signal`.
    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
    where
        S: Sized,
    {
        let cell = Spinlock::new(msg);
        Arc::new(Self(Some(cell), signal))
    }

    /// Locks the slot, or returns `None` when this hook was created without one.
    fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
        match self.0 {
            Some(ref cell) => Some(cell.lock()),
            None => None,
        }
    }
}
279
280#[cfg(not(feature = "spin"))]
281impl<T, S: ?Sized + Signal> Hook<T, S> {
282 pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
283 where
284 S: Sized,
285 {
286 Arc::new(Self(Some(Mutex::new(msg)), signal))
287 }
288
289 fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
290 self.0.as_ref().map(|s: &Mutex>| s.lock().unwrap())
291 }
292}
293
294impl<T, S: ?Sized + Signal> Hook<T, S> {
295 pub fn fire_recv(&self) -> (T, &S) {
296 let msg = self.lock().unwrap().take().unwrap();
297 (msg, self.signal())
298 }
299
300 pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
301 let ret = match self.lock() {
302 Some(mut lock) => {
303 *lock = Some(msg);
304 None
305 }
306 None => Some(msg),
307 };
308 (ret, self.signal())
309 }
310
311 pub fn is_empty(&self) -> bool {
312 self.lock().map(|s| s.is_none()).unwrap_or(true)
313 }
314
315 pub fn try_take(&self) -> Option<T> {
316 self.lock().unwrap().take()
317 }
318
319 pub fn trigger(signal: S) -> Arc<Self>
320 where
321 S: Sized,
322 {
323 Arc::new(Self(None, signal))
324 }
325
326 pub fn signal(&self) -> &S {
327 &self.1
328 }
329
330 pub fn fire_nothing(&self) -> bool {
331 self.signal().fire()
332 }
333}
334
335impl<T> Hook<T, SyncSignal> {
336 pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
337 loop {
338 let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
339 let msg = self.lock().unwrap().take();
340 if let Some(msg) = msg {
341 break Some(msg);
342 } else if disconnected {
343 break None;
344 } else {
345 self.signal().wait()
346 }
347 }
348 }
349
350 // Err(true) if timeout
351 pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
352 loop {
353 let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
354 let msg = self.lock().unwrap().take();
355 if let Some(msg) = msg {
356 break Ok(msg);
357 } else if disconnected {
358 break Err(false);
359 } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
360 self.signal().wait_timeout(dur);
361 } else {
362 break Err(true);
363 }
364 }
365 }
366
367 pub fn wait_send(&self, abort: &AtomicBool) {
368 loop {
369 let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
370 if disconnected || self.lock().unwrap().is_none() {
371 break;
372 }
373
374 self.signal().wait();
375 }
376 }
377
378 // Err(true) if timeout
379 pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
380 loop {
381 let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
382 if self.lock().unwrap().is_none() {
383 break Ok(());
384 } else if disconnected {
385 break Err(false);
386 } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
387 self.signal().wait_timeout(dur);
388 } else {
389 break Err(true);
390 }
391 }
392 }
393}
394
/// Acquires `lock` by polling: ten `try_lock` attempts separated by thread yields, then a
/// sleep that doubles each round (starting at 16 ns) until it reaches about 1 ms.
#[cfg(feature = "spin")]
#[inline]
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
    let mut backoff_exp = 4;
    loop {
        let acquired = (0..10).find_map(|_| {
            let attempt = lock.try_lock();
            if attempt.is_none() {
                thread::yield_now();
            }
            attempt
        });
        if let Some(guard) = acquired {
            return guard;
        }
        // Sleep for at most ~1 ms
        thread::sleep(Duration::from_nanos(1 << backoff_exp.min(20)));
        backoff_exp += 1;
    }
}
411
/// Acquires `lock`, blocking the current thread until it is available.
///
/// Panics if the mutex is poisoned.
#[cfg(not(feature = "spin"))]
#[inline]
fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
    let acquired = lock.lock();
    acquired.unwrap()
}
417
418#[cfg(not(feature = "spin"))]
419use std::sync::{Mutex, MutexGuard};
420
// Lock type guarding the channel state: a spinlock when the `spin` feature is enabled...
#[cfg(feature = "spin")]
type ChanLock<T> = Spinlock<T>;
// ...and `std::sync::Mutex` otherwise.
#[cfg(not(feature = "spin"))]
type ChanLock<T> = Mutex<T>;
425
426
427type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
428struct Chan<T> {
429 sending: Option<(usize, SignalVec<T>)>,
430 queue: VecDeque<T>,
431 waiting: SignalVec<T>,
432}
433
434impl<T> Chan<T> {
435 fn pull_pending(&mut self, pull_extra: bool) {
436 if let Some((cap: &mut usize, sending: &mut VecDeque>>)) = &mut self.sending {
437 let effective_cap: usize = *cap + pull_extra as usize;
438
439 while self.queue.len() < effective_cap {
440 if let Some(s: Arc>) = sending.pop_front() {
441 let (msg: T, signal: &dyn Signal) = s.fire_recv();
442 signal.fire();
443 self.queue.push_back(msg);
444 } else {
445 break;
446 }
447 }
448 }
449 }
450
451 fn try_wake_receiver_if_pending(&mut self) {
452 if !self.queue.is_empty() {
453 while Some(false) == self.waiting.pop_front().map(|s: Arc>| s.fire_nothing()) {}
454 }
455 }
456}
457
458struct Shared<T> {
459 chan: ChanLock<Chan<T>>,
460 disconnected: AtomicBool,
461 sender_count: AtomicUsize,
462 receiver_count: AtomicUsize,
463}
464
465impl<T> Shared<T> {
466 fn new(cap: Option<usize>) -> Self {
467 Self {
468 chan: ChanLock::new(Chan {
469 sending: cap.map(|cap| (cap, VecDeque::new())),
470 queue: VecDeque::new(),
471 waiting: VecDeque::new(),
472 }),
473 disconnected: AtomicBool::new(false),
474 sender_count: AtomicUsize::new(1),
475 receiver_count: AtomicUsize::new(1),
476 }
477 }
478
479 fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
480 &self,
481 msg: T,
482 should_block: bool,
483 make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
484 do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
485 ) -> R {
486 let mut chan = wait_lock(&self.chan);
487
488 if self.is_disconnected() {
489 Err(TrySendTimeoutError::Disconnected(msg)).into()
490 } else if !chan.waiting.is_empty() {
491 let mut msg = Some(msg);
492
493 loop {
494 let slot = chan.waiting.pop_front();
495 match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
496 // No more waiting receivers and msg in queue, so break out of the loop
497 None if msg.is_none() => break,
498 // No more waiting receivers, so add msg to queue and break out of the loop
499 None => {
500 chan.queue.push_back(msg.unwrap());
501 break;
502 }
503 Some((Some(m), signal)) => {
504 if signal.fire() {
505 // Was async and a stream, so didn't acquire the message. Wake another
506 // receiver, and do not yet push the message.
507 msg.replace(m);
508 continue;
509 } else {
510 // Was async and not a stream, so it did acquire the message. Push the
511 // message to the queue for it to be received.
512 chan.queue.push_back(m);
513 drop(chan);
514 break;
515 }
516 },
517 Some((None, signal)) => {
518 drop(chan);
519 signal.fire();
520 break; // Was sync, so it has acquired the message
521 },
522 }
523 }
524
525 Ok(()).into()
526 } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
527 chan.queue.push_back(msg);
528 Ok(()).into()
529 } else if should_block { // Only bounded from here on
530 let hook = make_signal(msg);
531 chan.sending.as_mut().unwrap().1.push_back(hook.clone());
532 drop(chan);
533
534 do_block(hook)
535 } else {
536 Err(TrySendTimeoutError::Full(msg)).into()
537 }
538 }
539
540 fn send_sync(
541 &self,
542 msg: T,
543 block: Option<Option<Instant>>,
544 ) -> Result<(), TrySendTimeoutError<T>> {
545 self.send(
546 // msg
547 msg,
548 // should_block
549 block.is_some(),
550 // make_signal
551 |msg| Hook::slot(Some(msg), SyncSignal::default()),
552 // do_block
553 |hook| if let Some(deadline) = block.unwrap() {
554 hook.wait_deadline_send(&self.disconnected, deadline)
555 .or_else(|timed_out| {
556 if timed_out { // Remove our signal
557 let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
558 wait_lock(&self.chan).sending
559 .as_mut()
560 .unwrap().1
561 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
562 }
563 hook.try_take().map(|msg| if self.is_disconnected() {
564 Err(TrySendTimeoutError::Disconnected(msg))
565 } else {
566 Err(TrySendTimeoutError::Timeout(msg))
567 })
568 .unwrap_or(Ok(()))
569 })
570 } else {
571 hook.wait_send(&self.disconnected);
572
573 match hook.try_take() {
574 Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
575 None => Ok(()),
576 }
577 },
578 )
579 }
580
581 fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
582 &self,
583 should_block: bool,
584 make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
585 do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
586 ) -> R {
587 let mut chan = wait_lock(&self.chan);
588 chan.pull_pending(true);
589
590 if let Some(msg) = chan.queue.pop_front() {
591 drop(chan);
592 Ok(msg).into()
593 } else if self.is_disconnected() {
594 drop(chan);
595 Err(TryRecvTimeoutError::Disconnected).into()
596 } else if should_block {
597 let hook = make_signal();
598 chan.waiting.push_back(hook.clone());
599 drop(chan);
600
601 do_block(hook)
602 } else {
603 drop(chan);
604 Err(TryRecvTimeoutError::Empty).into()
605 }
606 }
607
608 fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
609 self.recv(
610 // should_block
611 block.is_some(),
612 // make_signal
613 || Hook::slot(None, SyncSignal::default()),
614 // do_block
615 |hook| if let Some(deadline) = block.unwrap() {
616 hook.wait_deadline_recv(&self.disconnected, deadline)
617 .or_else(|timed_out| {
618 if timed_out { // Remove our signal
619 let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
620 wait_lock(&self.chan).waiting
621 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
622 }
623 match hook.try_take() {
624 Some(msg) => Ok(msg),
625 None => {
626 let disconnected = self.is_disconnected(); // Check disconnect *before* msg
627 if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
628 Ok(msg)
629 } else if disconnected {
630 Err(TryRecvTimeoutError::Disconnected)
631 } else {
632 Err(TryRecvTimeoutError::Timeout)
633 }
634 },
635 }
636 })
637 } else {
638 hook.wait_recv(&self.disconnected)
639 .or_else(|| wait_lock(&self.chan).queue.pop_front())
640 .ok_or(TryRecvTimeoutError::Disconnected)
641 },
642 )
643 }
644
645 /// Disconnect anything listening on this channel (this will not prevent receivers receiving
646 /// msgs that have already been sent)
647 fn disconnect_all(&self) {
648 self.disconnected.store(true, Ordering::Relaxed);
649
650 let mut chan = wait_lock(&self.chan);
651 chan.pull_pending(false);
652 if let Some((_, sending)) = chan.sending.as_ref() {
653 sending.iter().for_each(|hook| {
654 hook.signal().fire();
655 })
656 }
657 chan.waiting.iter().for_each(|hook| {
658 hook.signal().fire();
659 });
660 }
661
662 fn is_disconnected(&self) -> bool {
663 self.disconnected.load(Ordering::SeqCst)
664 }
665
666 fn is_empty(&self) -> bool {
667 self.len() == 0
668 }
669
670 fn is_full(&self) -> bool {
671 self.capacity().map(|cap| cap == self.len()).unwrap_or(false)
672 }
673
674 fn len(&self) -> usize {
675 let mut chan = wait_lock(&self.chan);
676 chan.pull_pending(false);
677 chan.queue.len()
678 }
679
680 fn capacity(&self) -> Option<usize> {
681 wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
682 }
683
684 fn sender_count(&self) -> usize {
685 self.sender_count.load(Ordering::Relaxed)
686 }
687
688 fn receiver_count(&self) -> usize {
689 self.receiver_count.load(Ordering::Relaxed)
690 }
691}
692
693/// A transmitting end of a channel.
694pub struct Sender<T> {
695 shared: Arc<Shared<T>>,
696}
697
698impl<T> Sender<T> {
699 /// Attempt to send a value into the channel. If the channel is bounded and full, or all
700 /// receivers have been dropped, an error is returned. If the channel associated with this
701 /// sender is unbounded, this method has the same behaviour as [`Sender::send`].
702 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
703 self.shared.send_sync(msg, None).map_err(|err| match err {
704 TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
705 TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
706 _ => unreachable!(),
707 })
708 }
709
710 /// Send a value into the channel, returning an error if all receivers have been dropped.
711 /// If the channel is bounded and is full, this method will block until space is available
712 /// or all receivers have been dropped. If the channel is unbounded, this method will not
713 /// block.
714 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
715 self.shared.send_sync(msg, Some(None)).map_err(|err| match err {
716 TrySendTimeoutError::Disconnected(msg) => SendError(msg),
717 _ => unreachable!(),
718 })
719 }
720
721 /// Send a value into the channel, returning an error if all receivers have been dropped
722 /// or the deadline has passed. If the channel is bounded and is full, this method will
723 /// block until space is available, the deadline is reached, or all receivers have been
724 /// dropped.
725 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
726 self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err {
727 TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
728 TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
729 _ => unreachable!(),
730 })
731 }
732
733 /// Send a value into the channel, returning an error if all receivers have been dropped
734 /// or the timeout has expired. If the channel is bounded and is full, this method will
735 /// block until space is available, the timeout has expired, or all receivers have been
736 /// dropped.
737 pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
738 self.send_deadline(msg, Instant::now().checked_add(dur).unwrap())
739 }
740
741 /// Returns true if all receivers for this channel have been dropped.
742 pub fn is_disconnected(&self) -> bool {
743 self.shared.is_disconnected()
744 }
745
746 /// Returns true if the channel is empty.
747 /// Note: Zero-capacity channels are always empty.
748 pub fn is_empty(&self) -> bool {
749 self.shared.is_empty()
750 }
751
752 /// Returns true if the channel is full.
753 /// Note: Zero-capacity channels are always full.
754 pub fn is_full(&self) -> bool {
755 self.shared.is_full()
756 }
757
758 /// Returns the number of messages in the channel
759 pub fn len(&self) -> usize {
760 self.shared.len()
761 }
762
763 /// If the channel is bounded, returns its capacity.
764 pub fn capacity(&self) -> Option<usize> {
765 self.shared.capacity()
766 }
767
768 /// Get the number of senders that currently exist, including this one.
769 pub fn sender_count(&self) -> usize {
770 self.shared.sender_count()
771 }
772
773 /// Get the number of receivers that currently exist.
774 ///
775 /// Note that this method makes no guarantees that a subsequent send will succeed; it's
776 /// possible that between `receiver_count()` being called and a `send()`, all open receivers
777 /// could drop.
778 pub fn receiver_count(&self) -> usize {
779 self.shared.receiver_count()
780 }
781
782 /// Creates a [`WeakSender`] that does not keep the channel open.
783 ///
784 /// The channel is closed once all `Sender`s are dropped, even if there
785 /// are still active `WeakSender`s.
786 pub fn downgrade(&self) -> WeakSender<T> {
787 WeakSender {
788 shared: Arc::downgrade(&self.shared),
789 }
790 }
791
792 /// Returns whether the senders are belong to the same channel.
793 pub fn same_channel(&self, other: &Sender<T>) -> bool {
794 Arc::ptr_eq(&self.shared, &other.shared)
795 }
796}
797
798impl<T> Clone for Sender<T> {
799 /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel
800 /// contents will only be cleaned up when all senders and the receiver have been dropped.
801 fn clone(&self) -> Self {
802 self.shared.sender_count.fetch_add(val:1, order:Ordering::Relaxed);
803 Self { shared: self.shared.clone() }
804 }
805}
806
807impl<T> fmt::Debug for Sender<T> {
808 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
809 f.debug_struct(name:"Sender").finish()
810 }
811}
812
813impl<T> Drop for Sender<T> {
814 fn drop(&mut self) {
815 // Notify receivers that all senders have been dropped if the number of senders drops to 0.
816 if self.shared.sender_count.fetch_sub(val:1, order:Ordering::Relaxed) == 1 {
817 self.shared.disconnect_all();
818 }
819 }
820}
821
822/// A sender that does not prevent the channel from being closed.
823///
824/// Weak senders do not count towards the number of active senders on the channel. As soon as
825/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a
826/// `WeakSender`.
827///
828/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`upgrade`]
829/// method.
830pub struct WeakSender<T> {
831 shared: Weak<Shared<T>>,
832}
833
834impl<T> WeakSender<T> {
835 /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages.
836 ///
837 /// Returns `None` if the channel was closed already. Note that a `Some` return value
838 /// does not guarantee that the channel is still open.
839 pub fn upgrade(&self) -> Option<Sender<T>> {
840 self.shared
841 .upgrade()
842 // check that there are still live senders
843 .filter(|shared| {
844 shared
845 .sender_count
846 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
847 if count == 0 {
848 // all senders are closed already -> don't increase the sender count
849 None
850 } else {
851 // there is still at least one active sender
852 Some(count + 1)
853 }
854 })
855 .is_ok()
856 })
857 .map(|shared| Sender { shared })
858 }
859}
860
861/// The receiving end of a channel.
862///
863/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
864/// Each message will only be received by a single receiver. This is useful for
865/// implementing work stealing for concurrent programs.
866pub struct Receiver<T> {
867 shared: Arc<Shared<T>>,
868}
869
870impl<T> Receiver<T> {
871 /// Attempt to fetch an incoming value from the channel associated with this receiver,
872 /// returning an error if the channel is empty or if all senders have been dropped.
873 pub fn try_recv(&self) -> Result<T, TryRecvError> {
874 self.shared.recv_sync(None).map_err(|err| match err {
875 TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
876 TryRecvTimeoutError::Empty => TryRecvError::Empty,
877 _ => unreachable!(),
878 })
879 }
880
881 /// Wait for an incoming value from the channel associated with this receiver, returning an
882 /// error if all senders have been dropped.
883 pub fn recv(&self) -> Result<T, RecvError> {
884 self.shared.recv_sync(Some(None)).map_err(|err| match err {
885 TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
886 _ => unreachable!(),
887 })
888 }
889
890 /// Wait for an incoming value from the channel associated with this receiver, returning an
891 /// error if all senders have been dropped or the deadline has passed.
892 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
893 self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err {
894 TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
895 TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
896 _ => unreachable!(),
897 })
898 }
899
900 /// Wait for an incoming value from the channel associated with this receiver, returning an
901 /// error if all senders have been dropped or the timeout has expired.
902 pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
903 self.recv_deadline(Instant::now().checked_add(dur).unwrap())
904 }
905
906 /// Create a blocking iterator over the values received on the channel that finishes iteration
907 /// when all senders have been dropped.
908 ///
909 /// You can also create a self-owned iterator with [`Receiver::into_iter`].
910 pub fn iter(&self) -> Iter<T> {
911 Iter { receiver: &self }
912 }
913
914 /// A non-blocking iterator over the values received on the channel that finishes iteration
915 /// when all senders have been dropped or the channel is empty.
916 pub fn try_iter(&self) -> TryIter<T> {
917 TryIter { receiver: &self }
918 }
919
920 /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
921 /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
922 /// the function has been called.
923 pub fn drain(&self) -> Drain<T> {
924 let mut chan = wait_lock(&self.shared.chan);
925 chan.pull_pending(false);
926 let queue = std::mem::take(&mut chan.queue);
927
928 Drain { queue, _phantom: PhantomData }
929 }
930
931 /// Returns true if all senders for this channel have been dropped.
932 pub fn is_disconnected(&self) -> bool {
933 self.shared.is_disconnected()
934 }
935
936 /// Returns true if the channel is empty.
937 /// Note: Zero-capacity channels are always empty.
938 pub fn is_empty(&self) -> bool {
939 self.shared.is_empty()
940 }
941
942 /// Returns true if the channel is full.
943 /// Note: Zero-capacity channels are always full.
944 pub fn is_full(&self) -> bool {
945 self.shared.is_full()
946 }
947
948 /// Returns the number of messages in the channel.
949 pub fn len(&self) -> usize {
950 self.shared.len()
951 }
952
953 /// If the channel is bounded, returns its capacity.
954 pub fn capacity(&self) -> Option<usize> {
955 self.shared.capacity()
956 }
957
958 /// Get the number of senders that currently exist.
959 pub fn sender_count(&self) -> usize {
960 self.shared.sender_count()
961 }
962
963 /// Get the number of receivers that currently exist, including this one.
964 pub fn receiver_count(&self) -> usize {
965 self.shared.receiver_count()
966 }
967
968 /// Returns whether the receivers are belong to the same channel.
969 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
970 Arc::ptr_eq(&self.shared, &other.shared)
971 }
972}
973
974impl<T> Clone for Receiver<T> {
975 /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining
976 /// channel contents will only be cleaned up when all senders and the receiver have been
977 /// dropped.
978 ///
979 /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
980 /// Each message will only be received by a single receiver. This is useful for
981 /// implementing work stealing for concurrent programs.
982 fn clone(&self) -> Self {
983 self.shared.receiver_count.fetch_add(val:1, order:Ordering::Relaxed);
984 Self { shared: self.shared.clone() }
985 }
986}
987
988impl<T> fmt::Debug for Receiver<T> {
989 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
990 f.debug_struct(name:"Receiver").finish()
991 }
992}
993
994impl<T> Drop for Receiver<T> {
995 fn drop(&mut self) {
996 // Notify senders that all receivers have been dropped if the number of receivers drops
997 // to 0.
998 if self.shared.receiver_count.fetch_sub(val:1, order:Ordering::Relaxed) == 1 {
999 self.shared.disconnect_all();
1000 }
1001 }
1002}
1003
1004/// This exists as a shorthand for [`Receiver::iter`].
1005impl<'a, T> IntoIterator for &'a Receiver<T> {
1006 type Item = T;
1007 type IntoIter = Iter<'a, T>;
1008
1009 fn into_iter(self) -> Self::IntoIter {
1010 Iter { receiver: self }
1011 }
1012}
1013
1014impl<T> IntoIterator for Receiver<T> {
1015 type Item = T;
1016 type IntoIter = IntoIter<T>;
1017
1018 /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`].
1019 fn into_iter(self) -> Self::IntoIter {
1020 IntoIter { receiver: self }
1021 }
1022}
1023
1024/// An iterator over the msgs received from a channel.
1025pub struct Iter<'a, T> {
1026 receiver: &'a Receiver<T>,
1027}
1028
1029impl<'a, T> Iterator for Iter<'a, T> {
1030 type Item = T;
1031
1032 fn next(&mut self) -> Option<Self::Item> {
1033 self.receiver.recv().ok()
1034 }
1035}
1036
1037/// An non-blocking iterator over the msgs received from a channel.
1038pub struct TryIter<'a, T> {
1039 receiver: &'a Receiver<T>,
1040}
1041
1042impl<'a, T> Iterator for TryIter<'a, T> {
1043 type Item = T;
1044
1045 fn next(&mut self) -> Option<Self::Item> {
1046 self.receiver.try_recv().ok()
1047 }
1048}
1049
/// A fixed-size iterator over the msgs drained from a channel.
#[derive(Debug)]
pub struct Drain<'a, T> {
    queue: VecDeque<T>,
    /// A phantom field used to constrain the lifetime of this iterator. We do this because the
    /// implementation may change and we don't want to unintentionally constrain it. Removing this
    /// lifetime later is a possibility.
    _phantom: PhantomData<&'a ()>,
}

impl<'a, T> Iterator for Drain<'a, T> {
    type Item = T;

    /// Yields the drained messages in the order they were queued.
    fn next(&mut self) -> Option<Self::Item> {
        self.queue.pop_front()
    }

    /// Reports the exact number of remaining messages, as `ExactSizeIterator` requires.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.queue.len();
        (remaining, Some(remaining))
    }
}

impl<'a, T> ExactSizeIterator for Drain<'a, T> {
    fn len(&self) -> usize {
        self.queue.len()
    }
}
1073
1074/// An owned iterator over the msgs received from a channel.
1075pub struct IntoIter<T> {
1076 receiver: Receiver<T>,
1077}
1078
1079impl<T> Iterator for IntoIter<T> {
1080 type Item = T;
1081
1082 fn next(&mut self) -> Option<Self::Item> {
1083 self.receiver.recv().ok()
1084 }
1085}
1086
1087/// Create a channel with no maximum capacity.
1088///
1089/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in
1090/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1091/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1092/// may be cloned.
1093///
1094/// # Examples
1095/// ```
1096/// let (tx, rx) = flume::unbounded();
1097///
1098/// tx.send(42).unwrap();
1099/// assert_eq!(rx.recv().unwrap(), 42);
1100/// ```
1101pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1102 let shared: Arc> = Arc::new(data:Shared::new(cap:None));
1103 (
1104 Sender { shared: shared.clone() },
1105 Receiver { shared },
1106 )
1107}
1108
1109/// Create a channel with a maximum capacity.
1110///
1111/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one
1112/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1113/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1114/// may be cloned.
1115///
1116/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to
1117/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour
1118/// is not desired, [`Sender::try_send`] may be used.
1119///
1120/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero
1121/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a
1122/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers
1123/// perform a handshake and transfer ownership of a value.
1124///
1125/// # Examples
1126/// ```
1127/// let (tx, rx) = flume::bounded(32);
1128///
1129/// for i in 1..33 {
1130/// tx.send(i).unwrap();
1131/// }
1132/// assert!(tx.try_send(33).is_err());
1133///
1134/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum());
1135/// ```
1136pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
1137 let shared: Arc> = Arc::new(data:Shared::new(cap:Some(cap)));
1138 (
1139 Sender { shared: shared.clone() },
1140 Receiver { shared },
1141 )
1142}
1143