//! The channel interface.
2
3use std::fmt;
4use std::iter::FusedIterator;
5use std::mem;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::context::Context;
11use crate::counter;
12use crate::err::{
13 RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14};
15use crate::flavors;
16use crate::select::{Operation, SelectHandle, Token};
17
18/// Creates a channel of unbounded capacity.
19///
20/// This channel has a growable buffer that can hold any number of messages at a time.
21///
22/// # Examples
23///
24/// ```
25/// use std::thread;
26/// use crossbeam_channel::unbounded;
27///
28/// let (s, r) = unbounded();
29///
30/// // Computes the n-th Fibonacci number.
31/// fn fib(n: i32) -> i32 {
32/// if n <= 1 {
33/// n
34/// } else {
35/// fib(n - 1) + fib(n - 2)
36/// }
37/// }
38///
39/// // Spawn an asynchronous computation.
40/// thread::spawn(move || s.send(fib(20)).unwrap());
41///
42/// // Print the result of the computation.
43/// println!("{}", r.recv().unwrap());
44/// ```
45pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46 let (s: Sender>, r: Receiver>) = counter::new(chan:flavors::list::Channel::new());
47 let s: Sender = Sender {
48 flavor: SenderFlavor::List(s),
49 };
50 let r: Receiver = Receiver {
51 flavor: ReceiverFlavor::List(r),
52 };
53 (s, r)
54}
55
56/// Creates a channel of bounded capacity.
57///
58/// This channel has a buffer that can hold at most `cap` messages at a time.
59///
60/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
61/// receive operations must appear at the same time in order to pair up and pass the message over.
62///
63/// # Examples
64///
65/// A channel of capacity 1:
66///
67/// ```
68/// use std::thread;
69/// use std::time::Duration;
70/// use crossbeam_channel::bounded;
71///
72/// let (s, r) = bounded(1);
73///
74/// // This call returns immediately because there is enough space in the channel.
75/// s.send(1).unwrap();
76///
77/// thread::spawn(move || {
78/// // This call blocks the current thread because the channel is full.
79/// // It will be able to complete only after the first message is received.
80/// s.send(2).unwrap();
81/// });
82///
83/// thread::sleep(Duration::from_secs(1));
84/// assert_eq!(r.recv(), Ok(1));
85/// assert_eq!(r.recv(), Ok(2));
86/// ```
87///
88/// A zero-capacity channel:
89///
90/// ```
91/// use std::thread;
92/// use std::time::Duration;
93/// use crossbeam_channel::bounded;
94///
95/// let (s, r) = bounded(0);
96///
97/// thread::spawn(move || {
98/// // This call blocks the current thread until a receive operation appears
99/// // on the other side of the channel.
100/// s.send(1).unwrap();
101/// });
102///
103/// thread::sleep(Duration::from_secs(1));
104/// assert_eq!(r.recv(), Ok(1));
105/// ```
106pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107 if cap == 0 {
108 let (s: Sender>, r: Receiver>) = counter::new(chan:flavors::zero::Channel::new());
109 let s: Sender = Sender {
110 flavor: SenderFlavor::Zero(s),
111 };
112 let r: Receiver = Receiver {
113 flavor: ReceiverFlavor::Zero(r),
114 };
115 (s, r)
116 } else {
117 let (s: Sender>, r: Receiver>) = counter::new(chan:flavors::array::Channel::with_capacity(cap));
118 let s: Sender = Sender {
119 flavor: SenderFlavor::Array(s),
120 };
121 let r: Receiver = Receiver {
122 flavor: ReceiverFlavor::Array(r),
123 };
124 (s, r)
125 }
126}
127
128/// Creates a receiver that delivers a message after a certain duration of time.
129///
130/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131/// be sent into the channel after `duration` elapses. The message is the instant at which it is
132/// sent.
133///
134/// # Examples
135///
136/// Using an `after` channel for timeouts:
137///
138/// ```
139/// use std::time::Duration;
140/// use crossbeam_channel::{after, select, unbounded};
141///
142/// let (s, r) = unbounded::<i32>();
143/// let timeout = Duration::from_millis(100);
144///
145/// select! {
146/// recv(r) -> msg => println!("received {:?}", msg),
147/// recv(after(timeout)) -> _ => println!("timed out"),
148/// }
149/// ```
150///
151/// When the message gets sent:
152///
153/// ```
154/// use std::thread;
155/// use std::time::{Duration, Instant};
156/// use crossbeam_channel::after;
157///
158/// // Converts a number of milliseconds into a `Duration`.
159/// let ms = |ms| Duration::from_millis(ms);
160///
161/// // Returns `true` if `a` and `b` are very close `Instant`s.
162/// let eq = |a, b| a + ms(60) > b && b + ms(60) > a;
163///
164/// let start = Instant::now();
165/// let r = after(ms(100));
166///
167/// thread::sleep(ms(500));
168///
169/// // This message was sent 100 ms from the start and received 500 ms from the start.
170/// assert!(eq(r.recv().unwrap(), start + ms(100)));
171/// assert!(eq(Instant::now(), start + ms(500)));
172/// ```
173pub fn after(duration: Duration) -> Receiver<Instant> {
174 match Instant::now().checked_add(duration) {
175 Some(deadline: Instant) => Receiver {
176 flavor: ReceiverFlavor::At(Arc::new(data:flavors::at::Channel::new_deadline(when:deadline))),
177 },
178 None => never(),
179 }
180}
181
182/// Creates a receiver that delivers a message at a certain instant in time.
183///
184/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
185/// be sent into the channel at the moment in time `when`. The message is the instant at which it
186/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
187/// instantly to the receiver.
188///
189/// # Examples
190///
191/// Using an `at` channel for timeouts:
192///
193/// ```
194/// use std::time::{Instant, Duration};
195/// use crossbeam_channel::{at, select, unbounded};
196///
197/// let (s, r) = unbounded::<i32>();
198/// let deadline = Instant::now() + Duration::from_millis(500);
199///
200/// select! {
201/// recv(r) -> msg => println!("received {:?}", msg),
202/// recv(at(deadline)) -> _ => println!("timed out"),
203/// }
204/// ```
205///
206/// When the message gets sent:
207///
208/// ```
209/// use std::time::{Duration, Instant};
210/// use crossbeam_channel::at;
211///
212/// // Converts a number of milliseconds into a `Duration`.
213/// let ms = |ms| Duration::from_millis(ms);
214///
215/// let start = Instant::now();
216/// let end = start + ms(100);
217///
218/// let r = at(end);
219///
220/// // This message was sent 100 ms from the start
221/// assert_eq!(r.recv().unwrap(), end);
222/// assert!(Instant::now() > start + ms(100));
223/// ```
224pub fn at(when: Instant) -> Receiver<Instant> {
225 Receiver {
226 flavor: ReceiverFlavor::At(Arc::new(data:flavors::at::Channel::new_deadline(when))),
227 }
228}
229
230/// Creates a receiver that never delivers messages.
231///
232/// The channel is bounded with capacity of 0 and never gets disconnected.
233///
234/// # Examples
235///
236/// Using a `never` channel to optionally add a timeout to [`select!`]:
237///
238/// [`select!`]: crate::select!
239///
240/// ```
241/// use std::thread;
242/// use std::time::Duration;
243/// use crossbeam_channel::{after, select, never, unbounded};
244///
245/// let (s, r) = unbounded();
246///
247/// thread::spawn(move || {
248/// thread::sleep(Duration::from_secs(1));
249/// s.send(1).unwrap();
250/// });
251///
252/// // Suppose this duration can be a `Some` or a `None`.
253/// let duration = Some(Duration::from_millis(100));
254///
255/// // Create a channel that times out after the specified duration.
256/// let timeout = duration
257/// .map(|d| after(d))
258/// .unwrap_or(never());
259///
260/// select! {
261/// recv(r) -> msg => assert_eq!(msg, Ok(1)),
262/// recv(timeout) -> _ => println!("timed out"),
263/// }
264/// ```
265pub fn never<T>() -> Receiver<T> {
266 Receiver {
267 flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
268 }
269}
270
271/// Creates a receiver that delivers messages periodically.
272///
273/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
274/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
275/// sent.
276///
277/// # Examples
278///
279/// Using a `tick` channel to periodically print elapsed time:
280///
281/// ```
282/// use std::time::{Duration, Instant};
283/// use crossbeam_channel::tick;
284///
285/// let start = Instant::now();
286/// let ticker = tick(Duration::from_millis(100));
287///
288/// for _ in 0..5 {
289/// ticker.recv().unwrap();
290/// println!("elapsed: {:?}", start.elapsed());
291/// }
292/// ```
293///
294/// When messages get sent:
295///
296/// ```
297/// use std::thread;
298/// use std::time::{Duration, Instant};
299/// use crossbeam_channel::tick;
300///
301/// // Converts a number of milliseconds into a `Duration`.
302/// let ms = |ms| Duration::from_millis(ms);
303///
304/// // Returns `true` if `a` and `b` are very close `Instant`s.
305/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
306///
307/// let start = Instant::now();
308/// let r = tick(ms(100));
309///
310/// // This message was sent 100 ms from the start and received 100 ms from the start.
311/// assert!(eq(r.recv().unwrap(), start + ms(100)));
312/// assert!(eq(Instant::now(), start + ms(100)));
313///
314/// thread::sleep(ms(500));
315///
316/// // This message was sent 200 ms from the start and received 600 ms from the start.
317/// assert!(eq(r.recv().unwrap(), start + ms(200)));
318/// assert!(eq(Instant::now(), start + ms(600)));
319///
320/// // This message was sent 700 ms from the start and received 700 ms from the start.
321/// assert!(eq(r.recv().unwrap(), start + ms(700)));
322/// assert!(eq(Instant::now(), start + ms(700)));
323/// ```
324pub fn tick(duration: Duration) -> Receiver<Instant> {
325 match Instant::now().checked_add(duration) {
326 Some(delivery_time: Instant) => Receiver {
327 flavor: ReceiverFlavor::Tick(Arc::new(data:flavors::tick::Channel::new(
328 delivery_time,
329 dur:duration,
330 ))),
331 },
332 None => never(),
333 }
334}
335
336/// The sending side of a channel.
337///
338/// # Examples
339///
340/// ```
341/// use std::thread;
342/// use crossbeam_channel::unbounded;
343///
344/// let (s1, r) = unbounded();
345/// let s2 = s1.clone();
346///
347/// thread::spawn(move || s1.send(1).unwrap());
348/// thread::spawn(move || s2.send(2).unwrap());
349///
350/// let msg1 = r.recv().unwrap();
351/// let msg2 = r.recv().unwrap();
352///
353/// assert_eq!(msg1 + msg2, 3);
354/// ```
355pub struct Sender<T> {
356 flavor: SenderFlavor<T>,
357}
358
359/// Sender flavors.
360enum SenderFlavor<T> {
361 /// Bounded channel based on a preallocated array.
362 Array(counter::Sender<flavors::array::Channel<T>>),
363
364 /// Unbounded channel implemented as a linked list.
365 List(counter::Sender<flavors::list::Channel<T>>),
366
367 /// Zero-capacity channel.
368 Zero(counter::Sender<flavors::zero::Channel<T>>),
369}
370
371unsafe impl<T: Send> Send for Sender<T> {}
372unsafe impl<T: Send> Sync for Sender<T> {}
373
374impl<T> UnwindSafe for Sender<T> {}
375impl<T> RefUnwindSafe for Sender<T> {}
376
377impl<T> Sender<T> {
378 /// Attempts to send a message into the channel without blocking.
379 ///
380 /// This method will either send a message into the channel immediately or return an error if
381 /// the channel is full or disconnected. The returned error contains the original message.
382 ///
383 /// If called on a zero-capacity channel, this method will send the message only if there
384 /// happens to be a receive operation on the other side of the channel at the same time.
385 ///
386 /// # Examples
387 ///
388 /// ```
389 /// use crossbeam_channel::{bounded, TrySendError};
390 ///
391 /// let (s, r) = bounded(1);
392 ///
393 /// assert_eq!(s.try_send(1), Ok(()));
394 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
395 ///
396 /// drop(r);
397 /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
398 /// ```
399 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
400 match &self.flavor {
401 SenderFlavor::Array(chan) => chan.try_send(msg),
402 SenderFlavor::List(chan) => chan.try_send(msg),
403 SenderFlavor::Zero(chan) => chan.try_send(msg),
404 }
405 }
406
407 /// Blocks the current thread until a message is sent or the channel is disconnected.
408 ///
409 /// If the channel is full and not disconnected, this call will block until the send operation
410 /// can proceed. If the channel becomes disconnected, this call will wake up and return an
411 /// error. The returned error contains the original message.
412 ///
413 /// If called on a zero-capacity channel, this method will wait for a receive operation to
414 /// appear on the other side of the channel.
415 ///
416 /// # Examples
417 ///
418 /// ```
419 /// use std::thread;
420 /// use std::time::Duration;
421 /// use crossbeam_channel::{bounded, SendError};
422 ///
423 /// let (s, r) = bounded(1);
424 /// assert_eq!(s.send(1), Ok(()));
425 ///
426 /// thread::spawn(move || {
427 /// assert_eq!(r.recv(), Ok(1));
428 /// thread::sleep(Duration::from_secs(1));
429 /// drop(r);
430 /// });
431 ///
432 /// assert_eq!(s.send(2), Ok(()));
433 /// assert_eq!(s.send(3), Err(SendError(3)));
434 /// ```
435 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
436 match &self.flavor {
437 SenderFlavor::Array(chan) => chan.send(msg, None),
438 SenderFlavor::List(chan) => chan.send(msg, None),
439 SenderFlavor::Zero(chan) => chan.send(msg, None),
440 }
441 .map_err(|err| match err {
442 SendTimeoutError::Disconnected(msg) => SendError(msg),
443 SendTimeoutError::Timeout(_) => unreachable!(),
444 })
445 }
446
447 /// Waits for a message to be sent into the channel, but only for a limited time.
448 ///
449 /// If the channel is full and not disconnected, this call will block until the send operation
450 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
451 /// wake up and return an error. The returned error contains the original message.
452 ///
453 /// If called on a zero-capacity channel, this method will wait for a receive operation to
454 /// appear on the other side of the channel.
455 ///
456 /// # Examples
457 ///
458 /// ```
459 /// use std::thread;
460 /// use std::time::Duration;
461 /// use crossbeam_channel::{bounded, SendTimeoutError};
462 ///
463 /// let (s, r) = bounded(0);
464 ///
465 /// thread::spawn(move || {
466 /// thread::sleep(Duration::from_secs(1));
467 /// assert_eq!(r.recv(), Ok(2));
468 /// drop(r);
469 /// });
470 ///
471 /// assert_eq!(
472 /// s.send_timeout(1, Duration::from_millis(500)),
473 /// Err(SendTimeoutError::Timeout(1)),
474 /// );
475 /// assert_eq!(
476 /// s.send_timeout(2, Duration::from_secs(1)),
477 /// Ok(()),
478 /// );
479 /// assert_eq!(
480 /// s.send_timeout(3, Duration::from_millis(500)),
481 /// Err(SendTimeoutError::Disconnected(3)),
482 /// );
483 /// ```
484 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
485 match Instant::now().checked_add(timeout) {
486 Some(deadline) => self.send_deadline(msg, deadline),
487 None => self.send(msg).map_err(SendTimeoutError::from),
488 }
489 }
490
491 /// Waits for a message to be sent into the channel, but only until a given deadline.
492 ///
493 /// If the channel is full and not disconnected, this call will block until the send operation
494 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
495 /// wake up and return an error. The returned error contains the original message.
496 ///
497 /// If called on a zero-capacity channel, this method will wait for a receive operation to
498 /// appear on the other side of the channel.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// use std::thread;
504 /// use std::time::{Duration, Instant};
505 /// use crossbeam_channel::{bounded, SendTimeoutError};
506 ///
507 /// let (s, r) = bounded(0);
508 ///
509 /// thread::spawn(move || {
510 /// thread::sleep(Duration::from_secs(1));
511 /// assert_eq!(r.recv(), Ok(2));
512 /// drop(r);
513 /// });
514 ///
515 /// let now = Instant::now();
516 ///
517 /// assert_eq!(
518 /// s.send_deadline(1, now + Duration::from_millis(500)),
519 /// Err(SendTimeoutError::Timeout(1)),
520 /// );
521 /// assert_eq!(
522 /// s.send_deadline(2, now + Duration::from_millis(1500)),
523 /// Ok(()),
524 /// );
525 /// assert_eq!(
526 /// s.send_deadline(3, now + Duration::from_millis(2000)),
527 /// Err(SendTimeoutError::Disconnected(3)),
528 /// );
529 /// ```
530 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
531 match &self.flavor {
532 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
533 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
534 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
535 }
536 }
537
538 /// Returns `true` if the channel is empty.
539 ///
540 /// Note: Zero-capacity channels are always empty.
541 ///
542 /// # Examples
543 ///
544 /// ```
545 /// use crossbeam_channel::unbounded;
546 ///
547 /// let (s, r) = unbounded();
548 /// assert!(s.is_empty());
549 ///
550 /// s.send(0).unwrap();
551 /// assert!(!s.is_empty());
552 /// ```
553 pub fn is_empty(&self) -> bool {
554 match &self.flavor {
555 SenderFlavor::Array(chan) => chan.is_empty(),
556 SenderFlavor::List(chan) => chan.is_empty(),
557 SenderFlavor::Zero(chan) => chan.is_empty(),
558 }
559 }
560
561 /// Returns `true` if the channel is full.
562 ///
563 /// Note: Zero-capacity channels are always full.
564 ///
565 /// # Examples
566 ///
567 /// ```
568 /// use crossbeam_channel::bounded;
569 ///
570 /// let (s, r) = bounded(1);
571 ///
572 /// assert!(!s.is_full());
573 /// s.send(0).unwrap();
574 /// assert!(s.is_full());
575 /// ```
576 pub fn is_full(&self) -> bool {
577 match &self.flavor {
578 SenderFlavor::Array(chan) => chan.is_full(),
579 SenderFlavor::List(chan) => chan.is_full(),
580 SenderFlavor::Zero(chan) => chan.is_full(),
581 }
582 }
583
584 /// Returns the number of messages in the channel.
585 ///
586 /// # Examples
587 ///
588 /// ```
589 /// use crossbeam_channel::unbounded;
590 ///
591 /// let (s, r) = unbounded();
592 /// assert_eq!(s.len(), 0);
593 ///
594 /// s.send(1).unwrap();
595 /// s.send(2).unwrap();
596 /// assert_eq!(s.len(), 2);
597 /// ```
598 pub fn len(&self) -> usize {
599 match &self.flavor {
600 SenderFlavor::Array(chan) => chan.len(),
601 SenderFlavor::List(chan) => chan.len(),
602 SenderFlavor::Zero(chan) => chan.len(),
603 }
604 }
605
606 /// If the channel is bounded, returns its capacity.
607 ///
608 /// # Examples
609 ///
610 /// ```
611 /// use crossbeam_channel::{bounded, unbounded};
612 ///
613 /// let (s, _) = unbounded::<i32>();
614 /// assert_eq!(s.capacity(), None);
615 ///
616 /// let (s, _) = bounded::<i32>(5);
617 /// assert_eq!(s.capacity(), Some(5));
618 ///
619 /// let (s, _) = bounded::<i32>(0);
620 /// assert_eq!(s.capacity(), Some(0));
621 /// ```
622 pub fn capacity(&self) -> Option<usize> {
623 match &self.flavor {
624 SenderFlavor::Array(chan) => chan.capacity(),
625 SenderFlavor::List(chan) => chan.capacity(),
626 SenderFlavor::Zero(chan) => chan.capacity(),
627 }
628 }
629
630 /// Returns `true` if senders belong to the same channel.
631 ///
632 /// # Examples
633 ///
634 /// ```rust
635 /// use crossbeam_channel::unbounded;
636 ///
637 /// let (s, _) = unbounded::<usize>();
638 ///
639 /// let s2 = s.clone();
640 /// assert!(s.same_channel(&s2));
641 ///
642 /// let (s3, _) = unbounded();
643 /// assert!(!s.same_channel(&s3));
644 /// ```
645 pub fn same_channel(&self, other: &Sender<T>) -> bool {
646 match (&self.flavor, &other.flavor) {
647 (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
648 (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
649 (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
650 _ => false,
651 }
652 }
653}
654
655impl<T> Drop for Sender<T> {
656 fn drop(&mut self) {
657 unsafe {
658 match &self.flavor {
659 SenderFlavor::Array(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect()),
660 SenderFlavor::List(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect_senders()),
661 SenderFlavor::Zero(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect()),
662 }
663 }
664 }
665}
666
667impl<T> Clone for Sender<T> {
668 fn clone(&self) -> Self {
669 let flavor: SenderFlavor = match &self.flavor {
670 SenderFlavor::Array(chan: &Sender>) => SenderFlavor::Array(chan.acquire()),
671 SenderFlavor::List(chan: &Sender>) => SenderFlavor::List(chan.acquire()),
672 SenderFlavor::Zero(chan: &Sender>) => SenderFlavor::Zero(chan.acquire()),
673 };
674
675 Sender { flavor }
676 }
677}
678
679impl<T> fmt::Debug for Sender<T> {
680 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
681 f.pad("Sender { .. }")
682 }
683}
684
685/// The receiving side of a channel.
686///
687/// # Examples
688///
689/// ```
690/// use std::thread;
691/// use std::time::Duration;
692/// use crossbeam_channel::unbounded;
693///
694/// let (s, r) = unbounded();
695///
696/// thread::spawn(move || {
697/// let _ = s.send(1);
698/// thread::sleep(Duration::from_secs(1));
699/// let _ = s.send(2);
700/// });
701///
702/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
703/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
704/// ```
705pub struct Receiver<T> {
706 flavor: ReceiverFlavor<T>,
707}
708
709/// Receiver flavors.
710enum ReceiverFlavor<T> {
711 /// Bounded channel based on a preallocated array.
712 Array(counter::Receiver<flavors::array::Channel<T>>),
713
714 /// Unbounded channel implemented as a linked list.
715 List(counter::Receiver<flavors::list::Channel<T>>),
716
717 /// Zero-capacity channel.
718 Zero(counter::Receiver<flavors::zero::Channel<T>>),
719
720 /// The after flavor.
721 At(Arc<flavors::at::Channel>),
722
723 /// The tick flavor.
724 Tick(Arc<flavors::tick::Channel>),
725
726 /// The never flavor.
727 Never(flavors::never::Channel<T>),
728}
729
730unsafe impl<T: Send> Send for Receiver<T> {}
731unsafe impl<T: Send> Sync for Receiver<T> {}
732
733impl<T> UnwindSafe for Receiver<T> {}
734impl<T> RefUnwindSafe for Receiver<T> {}
735
736impl<T> Receiver<T> {
737 /// Attempts to receive a message from the channel without blocking.
738 ///
739 /// This method will either receive a message from the channel immediately or return an error
740 /// if the channel is empty.
741 ///
742 /// If called on a zero-capacity channel, this method will receive a message only if there
743 /// happens to be a send operation on the other side of the channel at the same time.
744 ///
745 /// # Examples
746 ///
747 /// ```
748 /// use crossbeam_channel::{unbounded, TryRecvError};
749 ///
750 /// let (s, r) = unbounded();
751 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
752 ///
753 /// s.send(5).unwrap();
754 /// drop(s);
755 ///
756 /// assert_eq!(r.try_recv(), Ok(5));
757 /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
758 /// ```
759 pub fn try_recv(&self) -> Result<T, TryRecvError> {
760 match &self.flavor {
761 ReceiverFlavor::Array(chan) => chan.try_recv(),
762 ReceiverFlavor::List(chan) => chan.try_recv(),
763 ReceiverFlavor::Zero(chan) => chan.try_recv(),
764 ReceiverFlavor::At(chan) => {
765 let msg = chan.try_recv();
766 unsafe {
767 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
768 &msg,
769 )
770 }
771 }
772 ReceiverFlavor::Tick(chan) => {
773 let msg = chan.try_recv();
774 unsafe {
775 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
776 &msg,
777 )
778 }
779 }
780 ReceiverFlavor::Never(chan) => chan.try_recv(),
781 }
782 }
783
784 /// Blocks the current thread until a message is received or the channel is empty and
785 /// disconnected.
786 ///
787 /// If the channel is empty and not disconnected, this call will block until the receive
788 /// operation can proceed. If the channel is empty and becomes disconnected, this call will
789 /// wake up and return an error.
790 ///
791 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
792 /// on the other side of the channel.
793 ///
794 /// # Examples
795 ///
796 /// ```
797 /// use std::thread;
798 /// use std::time::Duration;
799 /// use crossbeam_channel::{unbounded, RecvError};
800 ///
801 /// let (s, r) = unbounded();
802 ///
803 /// thread::spawn(move || {
804 /// thread::sleep(Duration::from_secs(1));
805 /// s.send(5).unwrap();
806 /// drop(s);
807 /// });
808 ///
809 /// assert_eq!(r.recv(), Ok(5));
810 /// assert_eq!(r.recv(), Err(RecvError));
811 /// ```
812 pub fn recv(&self) -> Result<T, RecvError> {
813 match &self.flavor {
814 ReceiverFlavor::Array(chan) => chan.recv(None),
815 ReceiverFlavor::List(chan) => chan.recv(None),
816 ReceiverFlavor::Zero(chan) => chan.recv(None),
817 ReceiverFlavor::At(chan) => {
818 let msg = chan.recv(None);
819 unsafe {
820 mem::transmute_copy::<
821 Result<Instant, RecvTimeoutError>,
822 Result<T, RecvTimeoutError>,
823 >(&msg)
824 }
825 }
826 ReceiverFlavor::Tick(chan) => {
827 let msg = chan.recv(None);
828 unsafe {
829 mem::transmute_copy::<
830 Result<Instant, RecvTimeoutError>,
831 Result<T, RecvTimeoutError>,
832 >(&msg)
833 }
834 }
835 ReceiverFlavor::Never(chan) => chan.recv(None),
836 }
837 .map_err(|_| RecvError)
838 }
839
840 /// Waits for a message to be received from the channel, but only for a limited time.
841 ///
842 /// If the channel is empty and not disconnected, this call will block until the receive
843 /// operation can proceed or the operation times out. If the channel is empty and becomes
844 /// disconnected, this call will wake up and return an error.
845 ///
846 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
847 /// on the other side of the channel.
848 ///
849 /// # Examples
850 ///
851 /// ```
852 /// use std::thread;
853 /// use std::time::Duration;
854 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
855 ///
856 /// let (s, r) = unbounded();
857 ///
858 /// thread::spawn(move || {
859 /// thread::sleep(Duration::from_secs(1));
860 /// s.send(5).unwrap();
861 /// drop(s);
862 /// });
863 ///
864 /// assert_eq!(
865 /// r.recv_timeout(Duration::from_millis(500)),
866 /// Err(RecvTimeoutError::Timeout),
867 /// );
868 /// assert_eq!(
869 /// r.recv_timeout(Duration::from_secs(1)),
870 /// Ok(5),
871 /// );
872 /// assert_eq!(
873 /// r.recv_timeout(Duration::from_secs(1)),
874 /// Err(RecvTimeoutError::Disconnected),
875 /// );
876 /// ```
877 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
878 match Instant::now().checked_add(timeout) {
879 Some(deadline) => self.recv_deadline(deadline),
880 None => self.recv().map_err(RecvTimeoutError::from),
881 }
882 }
883
884 /// Waits for a message to be received from the channel, but only before a given deadline.
885 ///
886 /// If the channel is empty and not disconnected, this call will block until the receive
887 /// operation can proceed or the operation times out. If the channel is empty and becomes
888 /// disconnected, this call will wake up and return an error.
889 ///
890 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
891 /// on the other side of the channel.
892 ///
893 /// # Examples
894 ///
895 /// ```
896 /// use std::thread;
897 /// use std::time::{Instant, Duration};
898 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
899 ///
900 /// let (s, r) = unbounded();
901 ///
902 /// thread::spawn(move || {
903 /// thread::sleep(Duration::from_secs(1));
904 /// s.send(5).unwrap();
905 /// drop(s);
906 /// });
907 ///
908 /// let now = Instant::now();
909 ///
910 /// assert_eq!(
911 /// r.recv_deadline(now + Duration::from_millis(500)),
912 /// Err(RecvTimeoutError::Timeout),
913 /// );
914 /// assert_eq!(
915 /// r.recv_deadline(now + Duration::from_millis(1500)),
916 /// Ok(5),
917 /// );
918 /// assert_eq!(
919 /// r.recv_deadline(now + Duration::from_secs(5)),
920 /// Err(RecvTimeoutError::Disconnected),
921 /// );
922 /// ```
923 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
924 match &self.flavor {
925 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
926 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
927 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
928 ReceiverFlavor::At(chan) => {
929 let msg = chan.recv(Some(deadline));
930 unsafe {
931 mem::transmute_copy::<
932 Result<Instant, RecvTimeoutError>,
933 Result<T, RecvTimeoutError>,
934 >(&msg)
935 }
936 }
937 ReceiverFlavor::Tick(chan) => {
938 let msg = chan.recv(Some(deadline));
939 unsafe {
940 mem::transmute_copy::<
941 Result<Instant, RecvTimeoutError>,
942 Result<T, RecvTimeoutError>,
943 >(&msg)
944 }
945 }
946 ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
947 }
948 }
949
950 /// Returns `true` if the channel is empty.
951 ///
952 /// Note: Zero-capacity channels are always empty.
953 ///
954 /// # Examples
955 ///
956 /// ```
957 /// use crossbeam_channel::unbounded;
958 ///
959 /// let (s, r) = unbounded();
960 ///
961 /// assert!(r.is_empty());
962 /// s.send(0).unwrap();
963 /// assert!(!r.is_empty());
964 /// ```
965 pub fn is_empty(&self) -> bool {
966 match &self.flavor {
967 ReceiverFlavor::Array(chan) => chan.is_empty(),
968 ReceiverFlavor::List(chan) => chan.is_empty(),
969 ReceiverFlavor::Zero(chan) => chan.is_empty(),
970 ReceiverFlavor::At(chan) => chan.is_empty(),
971 ReceiverFlavor::Tick(chan) => chan.is_empty(),
972 ReceiverFlavor::Never(chan) => chan.is_empty(),
973 }
974 }
975
976 /// Returns `true` if the channel is full.
977 ///
978 /// Note: Zero-capacity channels are always full.
979 ///
980 /// # Examples
981 ///
982 /// ```
983 /// use crossbeam_channel::bounded;
984 ///
985 /// let (s, r) = bounded(1);
986 ///
987 /// assert!(!r.is_full());
988 /// s.send(0).unwrap();
989 /// assert!(r.is_full());
990 /// ```
991 pub fn is_full(&self) -> bool {
992 match &self.flavor {
993 ReceiverFlavor::Array(chan) => chan.is_full(),
994 ReceiverFlavor::List(chan) => chan.is_full(),
995 ReceiverFlavor::Zero(chan) => chan.is_full(),
996 ReceiverFlavor::At(chan) => chan.is_full(),
997 ReceiverFlavor::Tick(chan) => chan.is_full(),
998 ReceiverFlavor::Never(chan) => chan.is_full(),
999 }
1000 }
1001
1002 /// Returns the number of messages in the channel.
1003 ///
1004 /// # Examples
1005 ///
1006 /// ```
1007 /// use crossbeam_channel::unbounded;
1008 ///
1009 /// let (s, r) = unbounded();
1010 /// assert_eq!(r.len(), 0);
1011 ///
1012 /// s.send(1).unwrap();
1013 /// s.send(2).unwrap();
1014 /// assert_eq!(r.len(), 2);
1015 /// ```
1016 pub fn len(&self) -> usize {
1017 match &self.flavor {
1018 ReceiverFlavor::Array(chan) => chan.len(),
1019 ReceiverFlavor::List(chan) => chan.len(),
1020 ReceiverFlavor::Zero(chan) => chan.len(),
1021 ReceiverFlavor::At(chan) => chan.len(),
1022 ReceiverFlavor::Tick(chan) => chan.len(),
1023 ReceiverFlavor::Never(chan) => chan.len(),
1024 }
1025 }
1026
1027 /// If the channel is bounded, returns its capacity.
1028 ///
1029 /// # Examples
1030 ///
1031 /// ```
1032 /// use crossbeam_channel::{bounded, unbounded};
1033 ///
1034 /// let (_, r) = unbounded::<i32>();
1035 /// assert_eq!(r.capacity(), None);
1036 ///
1037 /// let (_, r) = bounded::<i32>(5);
1038 /// assert_eq!(r.capacity(), Some(5));
1039 ///
1040 /// let (_, r) = bounded::<i32>(0);
1041 /// assert_eq!(r.capacity(), Some(0));
1042 /// ```
1043 pub fn capacity(&self) -> Option<usize> {
1044 match &self.flavor {
1045 ReceiverFlavor::Array(chan) => chan.capacity(),
1046 ReceiverFlavor::List(chan) => chan.capacity(),
1047 ReceiverFlavor::Zero(chan) => chan.capacity(),
1048 ReceiverFlavor::At(chan) => chan.capacity(),
1049 ReceiverFlavor::Tick(chan) => chan.capacity(),
1050 ReceiverFlavor::Never(chan) => chan.capacity(),
1051 }
1052 }
1053
1054 /// A blocking iterator over messages in the channel.
1055 ///
1056 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1057 /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1058 ///
1059 /// [`next`]: Iterator::next
1060 ///
1061 /// # Examples
1062 ///
1063 /// ```
1064 /// use std::thread;
1065 /// use crossbeam_channel::unbounded;
1066 ///
1067 /// let (s, r) = unbounded();
1068 ///
1069 /// thread::spawn(move || {
1070 /// s.send(1).unwrap();
1071 /// s.send(2).unwrap();
1072 /// s.send(3).unwrap();
1073 /// drop(s); // Disconnect the channel.
1074 /// });
1075 ///
1076 /// // Collect all messages from the channel.
1077 /// // Note that the call to `collect` blocks until the sender is dropped.
1078 /// let v: Vec<_> = r.iter().collect();
1079 ///
1080 /// assert_eq!(v, [1, 2, 3]);
1081 /// ```
1082 pub fn iter(&self) -> Iter<'_, T> {
1083 Iter { receiver: self }
1084 }
1085
1086 /// A non-blocking iterator over messages in the channel.
1087 ///
1088 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1089 /// never blocks waiting for the next message.
1090 ///
1091 /// [`next`]: Iterator::next
1092 ///
1093 /// # Examples
1094 ///
1095 /// ```
1096 /// use std::thread;
1097 /// use std::time::Duration;
1098 /// use crossbeam_channel::unbounded;
1099 ///
1100 /// let (s, r) = unbounded::<i32>();
1101 ///
1102 /// thread::spawn(move || {
1103 /// s.send(1).unwrap();
1104 /// thread::sleep(Duration::from_secs(1));
1105 /// s.send(2).unwrap();
1106 /// thread::sleep(Duration::from_secs(2));
1107 /// s.send(3).unwrap();
1108 /// });
1109 ///
1110 /// thread::sleep(Duration::from_secs(2));
1111 ///
1112 /// // Collect all messages from the channel without blocking.
1113 /// // The third message hasn't been sent yet so we'll collect only the first two.
1114 /// let v: Vec<_> = r.try_iter().collect();
1115 ///
1116 /// assert_eq!(v, [1, 2]);
1117 /// ```
1118 pub fn try_iter(&self) -> TryIter<'_, T> {
1119 TryIter { receiver: self }
1120 }
1121
1122 /// Returns `true` if receivers belong to the same channel.
1123 ///
1124 /// # Examples
1125 ///
1126 /// ```rust
1127 /// use crossbeam_channel::unbounded;
1128 ///
1129 /// let (_, r) = unbounded::<usize>();
1130 ///
1131 /// let r2 = r.clone();
1132 /// assert!(r.same_channel(&r2));
1133 ///
1134 /// let (_, r3) = unbounded();
1135 /// assert!(!r.same_channel(&r3));
1136 /// ```
1137 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1138 match (&self.flavor, &other.flavor) {
1139 (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1140 (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1141 (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1142 (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1143 (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1144 (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1145 _ => false,
1146 }
1147 }
1148}
1149
1150impl<T> Drop for Receiver<T> {
1151 fn drop(&mut self) {
1152 unsafe {
1153 match &self.flavor {
1154 ReceiverFlavor::Array(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect()),
1155 ReceiverFlavor::List(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect_receivers()),
1156 ReceiverFlavor::Zero(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect()),
1157 ReceiverFlavor::At(_) => {}
1158 ReceiverFlavor::Tick(_) => {}
1159 ReceiverFlavor::Never(_) => {}
1160 }
1161 }
1162 }
1163}
1164
1165impl<T> Clone for Receiver<T> {
1166 fn clone(&self) -> Self {
1167 let flavor: ReceiverFlavor = match &self.flavor {
1168 ReceiverFlavor::Array(chan: &Receiver>) => ReceiverFlavor::Array(chan.acquire()),
1169 ReceiverFlavor::List(chan: &Receiver>) => ReceiverFlavor::List(chan.acquire()),
1170 ReceiverFlavor::Zero(chan: &Receiver>) => ReceiverFlavor::Zero(chan.acquire()),
1171 ReceiverFlavor::At(chan: &Arc) => ReceiverFlavor::At(chan.clone()),
1172 ReceiverFlavor::Tick(chan: &Arc) => ReceiverFlavor::Tick(chan.clone()),
1173 ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1174 };
1175
1176 Receiver { flavor }
1177 }
1178}
1179
1180impl<T> fmt::Debug for Receiver<T> {
1181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1182 f.pad("Receiver { .. }")
1183 }
1184}
1185
1186impl<'a, T> IntoIterator for &'a Receiver<T> {
1187 type Item = T;
1188 type IntoIter = Iter<'a, T>;
1189
1190 fn into_iter(self) -> Self::IntoIter {
1191 self.iter()
1192 }
1193}
1194
1195impl<T> IntoIterator for Receiver<T> {
1196 type Item = T;
1197 type IntoIter = IntoIter<T>;
1198
1199 fn into_iter(self) -> Self::IntoIter {
1200 IntoIter { receiver: self }
1201 }
1202}
1203
1204/// A blocking iterator over messages in a channel.
1205///
1206/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1207/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1208///
1209/// [`next`]: Iterator::next
1210///
1211/// # Examples
1212///
1213/// ```
1214/// use std::thread;
1215/// use crossbeam_channel::unbounded;
1216///
1217/// let (s, r) = unbounded();
1218///
1219/// thread::spawn(move || {
1220/// s.send(1).unwrap();
1221/// s.send(2).unwrap();
1222/// s.send(3).unwrap();
1223/// drop(s); // Disconnect the channel.
1224/// });
1225///
1226/// // Collect all messages from the channel.
1227/// // Note that the call to `collect` blocks until the sender is dropped.
1228/// let v: Vec<_> = r.iter().collect();
1229///
1230/// assert_eq!(v, [1, 2, 3]);
1231/// ```
1232pub struct Iter<'a, T> {
1233 receiver: &'a Receiver<T>,
1234}
1235
1236impl<T> FusedIterator for Iter<'_, T> {}
1237
1238impl<T> Iterator for Iter<'_, T> {
1239 type Item = T;
1240
1241 fn next(&mut self) -> Option<Self::Item> {
1242 self.receiver.recv().ok()
1243 }
1244}
1245
1246impl<T> fmt::Debug for Iter<'_, T> {
1247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1248 f.pad("Iter { .. }")
1249 }
1250}
1251
1252/// A non-blocking iterator over messages in a channel.
1253///
1254/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1255/// never blocks waiting for the next message.
1256///
1257/// [`next`]: Iterator::next
1258///
1259/// # Examples
1260///
1261/// ```
1262/// use std::thread;
1263/// use std::time::Duration;
1264/// use crossbeam_channel::unbounded;
1265///
1266/// let (s, r) = unbounded::<i32>();
1267///
1268/// thread::spawn(move || {
1269/// s.send(1).unwrap();
1270/// thread::sleep(Duration::from_secs(1));
1271/// s.send(2).unwrap();
1272/// thread::sleep(Duration::from_secs(2));
1273/// s.send(3).unwrap();
1274/// });
1275///
1276/// thread::sleep(Duration::from_secs(2));
1277///
1278/// // Collect all messages from the channel without blocking.
1279/// // The third message hasn't been sent yet so we'll collect only the first two.
1280/// let v: Vec<_> = r.try_iter().collect();
1281///
1282/// assert_eq!(v, [1, 2]);
1283/// ```
1284pub struct TryIter<'a, T> {
1285 receiver: &'a Receiver<T>,
1286}
1287
1288impl<T> Iterator for TryIter<'_, T> {
1289 type Item = T;
1290
1291 fn next(&mut self) -> Option<Self::Item> {
1292 self.receiver.try_recv().ok()
1293 }
1294}
1295
1296impl<T> fmt::Debug for TryIter<'_, T> {
1297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1298 f.pad("TryIter { .. }")
1299 }
1300}
1301
1302/// A blocking iterator over messages in a channel.
1303///
1304/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1305/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1306///
1307/// [`next`]: Iterator::next
1308///
1309/// # Examples
1310///
1311/// ```
1312/// use std::thread;
1313/// use crossbeam_channel::unbounded;
1314///
1315/// let (s, r) = unbounded();
1316///
1317/// thread::spawn(move || {
1318/// s.send(1).unwrap();
1319/// s.send(2).unwrap();
1320/// s.send(3).unwrap();
1321/// drop(s); // Disconnect the channel.
1322/// });
1323///
1324/// // Collect all messages from the channel.
1325/// // Note that the call to `collect` blocks until the sender is dropped.
1326/// let v: Vec<_> = r.into_iter().collect();
1327///
1328/// assert_eq!(v, [1, 2, 3]);
1329/// ```
1330pub struct IntoIter<T> {
1331 receiver: Receiver<T>,
1332}
1333
1334impl<T> FusedIterator for IntoIter<T> {}
1335
1336impl<T> Iterator for IntoIter<T> {
1337 type Item = T;
1338
1339 fn next(&mut self) -> Option<Self::Item> {
1340 self.receiver.recv().ok()
1341 }
1342}
1343
1344impl<T> fmt::Debug for IntoIter<T> {
1345 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1346 f.pad("IntoIter { .. }")
1347 }
1348}
1349
1350impl<T> SelectHandle for Sender<T> {
1351 fn try_select(&self, token: &mut Token) -> bool {
1352 match &self.flavor {
1353 SenderFlavor::Array(chan) => chan.sender().try_select(token),
1354 SenderFlavor::List(chan) => chan.sender().try_select(token),
1355 SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1356 }
1357 }
1358
1359 fn deadline(&self) -> Option<Instant> {
1360 None
1361 }
1362
1363 fn register(&self, oper: Operation, cx: &Context) -> bool {
1364 match &self.flavor {
1365 SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1366 SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1367 SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1368 }
1369 }
1370
1371 fn unregister(&self, oper: Operation) {
1372 match &self.flavor {
1373 SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1374 SenderFlavor::List(chan) => chan.sender().unregister(oper),
1375 SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1376 }
1377 }
1378
1379 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1380 match &self.flavor {
1381 SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1382 SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1383 SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1384 }
1385 }
1386
1387 fn is_ready(&self) -> bool {
1388 match &self.flavor {
1389 SenderFlavor::Array(chan) => chan.sender().is_ready(),
1390 SenderFlavor::List(chan) => chan.sender().is_ready(),
1391 SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1392 }
1393 }
1394
1395 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1396 match &self.flavor {
1397 SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1398 SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1399 SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1400 }
1401 }
1402
1403 fn unwatch(&self, oper: Operation) {
1404 match &self.flavor {
1405 SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1406 SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1407 SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1408 }
1409 }
1410}
1411
1412impl<T> SelectHandle for Receiver<T> {
1413 fn try_select(&self, token: &mut Token) -> bool {
1414 match &self.flavor {
1415 ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1416 ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1417 ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1418 ReceiverFlavor::At(chan) => chan.try_select(token),
1419 ReceiverFlavor::Tick(chan) => chan.try_select(token),
1420 ReceiverFlavor::Never(chan) => chan.try_select(token),
1421 }
1422 }
1423
1424 fn deadline(&self) -> Option<Instant> {
1425 match &self.flavor {
1426 ReceiverFlavor::Array(_) => None,
1427 ReceiverFlavor::List(_) => None,
1428 ReceiverFlavor::Zero(_) => None,
1429 ReceiverFlavor::At(chan) => chan.deadline(),
1430 ReceiverFlavor::Tick(chan) => chan.deadline(),
1431 ReceiverFlavor::Never(chan) => chan.deadline(),
1432 }
1433 }
1434
1435 fn register(&self, oper: Operation, cx: &Context) -> bool {
1436 match &self.flavor {
1437 ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1438 ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1439 ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1440 ReceiverFlavor::At(chan) => chan.register(oper, cx),
1441 ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1442 ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1443 }
1444 }
1445
1446 fn unregister(&self, oper: Operation) {
1447 match &self.flavor {
1448 ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1449 ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1450 ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1451 ReceiverFlavor::At(chan) => chan.unregister(oper),
1452 ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1453 ReceiverFlavor::Never(chan) => chan.unregister(oper),
1454 }
1455 }
1456
1457 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1458 match &self.flavor {
1459 ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1460 ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1461 ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1462 ReceiverFlavor::At(chan) => chan.accept(token, cx),
1463 ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1464 ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1465 }
1466 }
1467
1468 fn is_ready(&self) -> bool {
1469 match &self.flavor {
1470 ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1471 ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1472 ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1473 ReceiverFlavor::At(chan) => chan.is_ready(),
1474 ReceiverFlavor::Tick(chan) => chan.is_ready(),
1475 ReceiverFlavor::Never(chan) => chan.is_ready(),
1476 }
1477 }
1478
1479 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1480 match &self.flavor {
1481 ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1482 ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1483 ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1484 ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1485 ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1486 ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1487 }
1488 }
1489
1490 fn unwatch(&self, oper: Operation) {
1491 match &self.flavor {
1492 ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1493 ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1494 ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1495 ReceiverFlavor::At(chan) => chan.unwatch(oper),
1496 ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1497 ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1498 }
1499 }
1500}
1501
1502/// Writes a message into the channel.
1503pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1504 match &s.flavor {
1505 SenderFlavor::Array(chan: &Sender>) => chan.write(token, msg),
1506 SenderFlavor::List(chan: &Sender>) => chan.write(token, msg),
1507 SenderFlavor::Zero(chan: &Sender>) => chan.write(token, msg),
1508 }
1509}
1510
1511/// Reads a message from the channel.
1512pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
1513 match &r.flavor {
1514 ReceiverFlavor::Array(chan: &Receiver>) => chan.read(token),
1515 ReceiverFlavor::List(chan: &Receiver>) => chan.read(token),
1516 ReceiverFlavor::Zero(chan: &Receiver>) => chan.read(token),
1517 ReceiverFlavor::At(chan: &Arc) => {
1518 mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1519 }
1520 ReceiverFlavor::Tick(chan: &Arc) => {
1521 mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1522 }
1523 ReceiverFlavor::Never(chan: &Channel) => chan.read(token),
1524 }
1525}
1526