1//! Interface to the select mechanism.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::time::{Duration, Instant};
7use std::vec::Vec;
8
9use crossbeam_utils::Backoff;
10
11use crate::channel::{self, Receiver, Sender};
12use crate::context::Context;
13use crate::err::{ReadyTimeoutError, TryReadyError};
14use crate::err::{RecvError, SendError};
15use crate::err::{SelectTimeoutError, TrySelectError};
16use crate::flavors;
17use crate::utils;
18
/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor. The struct derives
/// `Default`, so a blank token can be created with `Token::default()` before an operation is
/// attempted.
// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
    /// Token data for the `at` flavor.
    pub(crate) at: flavors::at::AtToken,
    /// Token data for the `array` flavor.
    pub(crate) array: flavors::array::ArrayToken,
    /// Token data for the `list` flavor.
    pub(crate) list: flavors::list::ListToken,
    /// Token data for the `never` flavor.
    // NOTE(review): presumably `dead_code` is allowed because this field is never read — confirm.
    #[allow(dead_code)]
    pub(crate) never: flavors::never::NeverToken,
    /// Token data for the `tick` flavor.
    pub(crate) tick: flavors::tick::TickToken,
    /// Token data for the `zero` flavor.
    pub(crate) zero: flavors::zero::ZeroToken,
}
34
/// Identifier associated with an operation by a specific thread on a specific channel.
///
/// The identifier is simply a memory address stored as a `usize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Builds an operation identifier out of the address behind a mutable reference.
    ///
    /// The referenced variable should be specific to the thread and the operation, and must stay
    /// alive for the entire duration of the select or blocking operation, because only its
    /// address is recorded.
    ///
    /// # Panics
    ///
    /// Panics if the address is `0`, `1` or `2`, since those values are reserved for the
    /// numerical representation of `Selected::{Waiting, Aborted, Disconnected}`.
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let raw: *mut T = r;
        let addr = raw as usize;
        // The address must not collide with the non-operation `Selected` states.
        assert!(addr > 2);
        Operation(addr)
    }
}
54
55/// Current state of a select or a blocking operation.
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum Selected {
58 /// Still waiting for an operation.
59 Waiting,
60
61 /// The attempt to block the current thread has been aborted.
62 Aborted,
63
64 /// An operation became ready because a channel is disconnected.
65 Disconnected,
66
67 /// An operation became ready because a message can be sent or received.
68 Operation(Operation),
69}
70
71impl From<usize> for Selected {
72 #[inline]
73 fn from(val: usize) -> Selected {
74 match val {
75 0 => Selected::Waiting,
76 1 => Selected::Aborted,
77 2 => Selected::Disconnected,
78 oper: usize => Selected::Operation(Operation(oper)),
79 }
80 }
81}
82
83impl Into<usize> for Selected {
84 #[inline]
85 fn into(self) -> usize {
86 match self {
87 Selected::Waiting => 0,
88 Selected::Aborted => 1,
89 Selected::Disconnected => 2,
90 Selected::Operation(Operation(val: usize)) => val,
91 }
92 }
93}
94
/// A receiver or a sender that can participate in select.
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
///
/// The methods fall into two groups, matching the two drivers in this module:
///
/// * `run_select` uses `try_select`, `register`, `unregister`, `accept` and `deadline`;
/// * `run_ready` uses `is_ready`, `watch`, `unwatch` and `deadline`.
// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
pub trait SelectHandle {
    /// Attempts to select an operation and returns `true` on success.
    fn try_select(&self, token: &mut Token) -> bool;

    /// Returns a deadline for an operation, if there is one.
    fn deadline(&self) -> Option<Instant>;

    /// Registers an operation for execution and returns `true` if it is now ready.
    ///
    /// Every successful or unsuccessful registration is expected to be paired with a later call
    /// to [`unregister`](SelectHandle::unregister) using the same `oper`.
    fn register(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for execution.
    fn unregister(&self, oper: Operation);

    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
    fn accept(&self, token: &mut Token, cx: &Context) -> bool;

    /// Returns `true` if an operation can be executed without blocking.
    fn is_ready(&self) -> bool;

    /// Registers an operation for readiness notification and returns `true` if it is now ready.
    ///
    /// The counterpart of this method is [`unwatch`](SelectHandle::unwatch).
    fn watch(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for readiness notification.
    fn unwatch(&self, oper: Operation);
}
125
126impl<T: SelectHandle> SelectHandle for &T {
127 fn try_select(&self, token: &mut Token) -> bool {
128 (**self).try_select(token)
129 }
130
131 fn deadline(&self) -> Option<Instant> {
132 (**self).deadline()
133 }
134
135 fn register(&self, oper: Operation, cx: &Context) -> bool {
136 (**self).register(oper, cx)
137 }
138
139 fn unregister(&self, oper: Operation) {
140 (**self).unregister(oper);
141 }
142
143 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
144 (**self).accept(token, cx)
145 }
146
147 fn is_ready(&self) -> bool {
148 (**self).is_ready()
149 }
150
151 fn watch(&self, oper: Operation, cx: &Context) -> bool {
152 (**self).watch(oper, cx)
153 }
154
155 fn unwatch(&self, oper: Operation) {
156 (**self).unwatch(oper)
157 }
158}
159
/// Determines when a select operation should time out.
///
/// Within this module, `Now` is what the non-blocking `try_*` entry points pass, `Never` is used
/// by the entry points that block indefinitely, and `At` carries the deadline of the
/// timeout/deadline variants.
#[derive(Clone, Copy, Eq, PartialEq)]
enum Timeout {
    /// No blocking.
    Now,

    /// Block forever.
    Never,

    /// Time out after the time instant.
    At(Instant),
}
172
/// Runs until one of the operations is selected, potentially blocking the current thread.
///
/// Each entry of `handles` is a `(handle, index, ptr)` triple. On success, the token initialized
/// by the selected handle is returned together with the `index` and `ptr` of that entry. `None`
/// is returned when nothing could be selected within `timeout`. If `handles` is empty, the
/// function merely sleeps until the timeout (forever for `Timeout::Never`).
///
/// Note that `handles` is shuffled in place on every call.
///
/// Successful receive operations will have to be followed up by `channel::read()` and successful
/// send operations by `channel::write()`.
fn run_select(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<(Token, usize, *const u8)> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    // Create a token, which serves as a temporary variable that gets initialized in this function
    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
    // selected operation.
    let mut token = Token::default();

    // Try selecting one of the operations without blocking.
    for &(handle, i, ptr) in handles.iter() {
        if handle.try_select(&mut token) {
            return Some((token, i, ptr));
        }
    }

    loop {
        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            let mut registered_count = 0;
            let mut index_ready = None;

            // A non-blocking select marks the context as aborted before registering anything.
            // NOTE(review): presumably this makes `cx.selected()` report `Aborted` below, so
            // registration stops after the first handle and the thread never blocks — confirm.
            if let Timeout::Now = timeout {
                cx.try_select(Selected::Aborted).unwrap();
            }

            // Register all operations.
            //
            // The operation identifier is the address of the handle slot inside `handles`, so the
            // very same expression is used again below to unregister and to recognize the entry.
            for (handle, i, _) in handles.iter_mut() {
                registered_count += 1;

                // If registration returns `true`, that means the operation has just become ready.
                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
                    // Try aborting select.
                    sel = match cx.try_select(Selected::Aborted) {
                        Ok(()) => {
                            // Remember which entry was ready so that it can be retried below.
                            index_ready = Some(*i);
                            Selected::Aborted
                        }
                        // The context had already been moved to another state; use that one.
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already selected one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // NOTE(review): this early return would skip the unregister loop below; it
                    // looks unreachable because `Timeout::Now` aborts the context before
                    // registration, so `sel` should not be `Waiting` here — confirm.
                    Timeout::Now => return None,
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the smaller of the two, or adopt `x` if there was none so far.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unregister all registered operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // If an operation became ready during registration, try selecting it.
                    if let Some(index_ready) = index_ready {
                        for &(handle, i, ptr) in handles.iter() {
                            if i == index_ready && handle.try_select(&mut token) {
                                return Some((i, ptr));
                            }
                        }
                    }
                }
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Find the selected operation.
                    for (handle, i, ptr) in handles.iter_mut() {
                        // Is this the selected operation?
                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
                        {
                            // Try selecting this operation.
                            if handle.accept(&mut token, cx) {
                                return Some((*i, *ptr));
                            }
                        }
                    }
                }
            }

            // Nothing was selected in this round.
            None
        });

        // Return if an operation was selected.
        if let Some((i, ptr)) = res {
            return Some((token, i, ptr));
        }

        // Try selecting one of the operations without blocking.
        for &(handle, i, ptr) in handles.iter() {
            if handle.try_select(&mut token) {
                return Some((token, i, ptr));
            }
        }

        // Give up once the timeout has been reached; otherwise go around again.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }
    }
}
323
/// Runs until one of the operations becomes ready, potentially blocking the current thread.
///
/// Each entry of `handles` is a `(handle, index, ptr)` triple; the `index` of an entry that was
/// found ready is returned, or `None` if `timeout` was reached first. If `handles` is empty, the
/// function merely sleeps until the timeout (forever for `Timeout::Never`).
///
/// Note that `handles` is shuffled in place on every call.
fn run_ready(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<usize> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    loop {
        // Poll the handles repeatedly, backing off between rounds, before resorting to blocking.
        let backoff = Backoff::new();
        loop {
            // Check operations for readiness.
            for &(handle, i, _) in handles.iter() {
                if handle.is_ready() {
                    return Some(i);
                }
            }

            if backoff.is_completed() {
                break;
            } else {
                backoff.snooze();
            }
        }

        // Check for timeout.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }

        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            let mut registered_count = 0;

            // Begin watching all operations.
            //
            // The operation identifier is the address of the handle slot inside `handles`, so the
            // very same expression is used again below to unwatch and to recognize the entry.
            for (handle, _, _) in handles.iter_mut() {
                registered_count += 1;
                let oper = Operation::hook::<&dyn SelectHandle>(handle);

                // If registration returns `true`, that means the operation has just become ready.
                if handle.watch(oper, cx) {
                    sel = match cx.try_select(Selected::Operation(oper)) {
                        Ok(()) => Selected::Operation(oper),
                        // The context had already been moved to another state; use that one.
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already chosen one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // `Timeout::Now` has already returned in the timeout check above.
                    Timeout::Now => unreachable!(),
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the smaller of the two, or adopt `x` if there was none so far.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unwatch all operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {}
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Translate the selected operation identifier back into its entry's index.
                    for (handle, i, _) in handles.iter_mut() {
                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
                        if sel == Selected::Operation(oper) {
                            return Some(*i);
                        }
                    }
                }
            }

            // No entry was identified in this round; the outer loop polls again.
            None
        });

        // Return if an operation became ready.
        if res.is_some() {
            return res;
        }
    }
}
447
448/// Attempts to select one of the operations without blocking.
449// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
450#[inline]
451pub fn try_select<'a>(
452 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
453) -> Result<SelectedOperation<'a>, TrySelectError> {
454 match run_select(handles, Timeout::Now) {
455 None => Err(TrySelectError),
456 Some((token: Token, index: usize, ptr: *const u8)) => Ok(SelectedOperation {
457 token,
458 index,
459 ptr,
460 _marker: PhantomData,
461 }),
462 }
463}
464
465/// Blocks until one of the operations becomes ready and selects it.
466// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
467#[inline]
468pub fn select<'a>(
469 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
470) -> SelectedOperation<'a> {
471 if handles.is_empty() {
472 panic!("no operations have been added to `Select`");
473 }
474
475 let (token: Token, index: usize, ptr: *const u8) = run_select(handles, Timeout::Never).unwrap();
476 SelectedOperation {
477 token,
478 index,
479 ptr,
480 _marker: PhantomData,
481 }
482}
483
484/// Blocks for a limited time until one of the operations becomes ready and selects it.
485// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
486#[inline]
487pub fn select_timeout<'a>(
488 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
489 timeout: Duration,
490) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
491 match Instant::now().checked_add(duration:timeout) {
492 Some(deadline: Instant) => select_deadline(handles, deadline),
493 None => Ok(select(handles)),
494 }
495}
496
497/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
498#[inline]
499pub(crate) fn select_deadline<'a>(
500 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
501 deadline: Instant,
502) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
503 match run_select(handles, Timeout::At(deadline)) {
504 None => Err(SelectTimeoutError),
505 Some((token: Token, index: usize, ptr: *const u8)) => Ok(SelectedOperation {
506 token,
507 index,
508 ptr,
509 _marker: PhantomData,
510 }),
511 }
512}
513
/// Selects from a set of channel operations.
///
/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
/// among them is selected.
///
/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
/// when it will simply return an error because the channel is disconnected.
///
/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
/// dynamically created list of channel operations.
///
/// [`select!`]: crate::select!
///
/// Once a list of operations has been built with `Select`, there are two different ways of
/// proceeding:
///
/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
///   the returned selected operation has already begun and **must** be completed. If we don't
///   complete it, a panic will occur.
///
/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
///   possible for another thread to make the operation not ready just before we try executing it,
///   so it's wise to use a retry loop. However, note that these methods might return with success
///   spuriously, so it's a good idea to always double check if the operation is really ready.
///
/// # Examples
///
/// Use [`select`] to receive a message from a list of receivers:
///
/// ```
/// use crossbeam_channel::{Receiver, RecvError, Select};
///
/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
///     // Build a list of operations.
///     let mut sel = Select::new();
///     for r in rs {
///         sel.recv(r);
///     }
///
///     // Complete the selected operation.
///     let oper = sel.select();
///     let index = oper.index();
///     oper.recv(&rs[index])
/// }
/// ```
///
/// Use [`ready`] to receive a message from a list of receivers:
///
/// ```
/// use crossbeam_channel::{Receiver, RecvError, Select};
///
/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
///     // Build a list of operations.
///     let mut sel = Select::new();
///     for r in rs {
///         sel.recv(r);
///     }
///
///     loop {
///         // Wait until a receive operation becomes ready and try executing it.
///         let index = sel.ready();
///         let res = rs[index].try_recv();
///
///         // If the operation turns out not to be ready, retry.
///         if let Err(e) = res {
///             if e.is_empty() {
///                 continue;
///             }
///         }
///
///         // Success!
///         return res.map_err(|_| RecvError);
///     }
/// }
/// ```
///
/// [`try_select`]: Select::try_select
/// [`select`]: Select::select
/// [`select_timeout`]: Select::select_timeout
/// [`try_ready`]: Select::try_ready
/// [`ready`]: Select::ready
/// [`ready_timeout`]: Select::ready_timeout
pub struct Select<'a> {
    /// A list of senders and receivers participating in selection.
    ///
    /// Each entry is the handle itself, the index assigned to the operation when it was added,
    /// and the address of the sender or receiver cast to `*const u8`.
    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,

    /// The next index to assign to an operation.
    ///
    /// This only ever grows, so indices of removed operations are not reused.
    next_index: usize,
}

// The raw `*const u8` stored in `handles` suppresses the automatic `Send`/`Sync` impls, so they
// are provided by hand.
// NOTE(review): presumably this is sound because the pointer is only used as an identity tag and
// is never dereferenced — confirm against the code that consumes it.
unsafe impl Send for Select<'_> {}
unsafe impl Sync for Select<'_> {}
608
609impl<'a> Select<'a> {
610 /// Creates an empty list of channel operations for selection.
611 ///
612 /// # Examples
613 ///
614 /// ```
615 /// use crossbeam_channel::Select;
616 ///
617 /// let mut sel = Select::new();
618 ///
619 /// // The list of operations is empty, which means no operation can be selected.
620 /// assert!(sel.try_select().is_err());
621 /// ```
622 pub fn new() -> Select<'a> {
623 Select {
624 handles: Vec::with_capacity(4),
625 next_index: 0,
626 }
627 }
628
629 /// Adds a send operation.
630 ///
631 /// Returns the index of the added operation.
632 ///
633 /// # Examples
634 ///
635 /// ```
636 /// use crossbeam_channel::{unbounded, Select};
637 ///
638 /// let (s, r) = unbounded::<i32>();
639 ///
640 /// let mut sel = Select::new();
641 /// let index = sel.send(&s);
642 /// ```
643 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
644 let i = self.next_index;
645 let ptr = s as *const Sender<_> as *const u8;
646 self.handles.push((s, i, ptr));
647 self.next_index += 1;
648 i
649 }
650
651 /// Adds a receive operation.
652 ///
653 /// Returns the index of the added operation.
654 ///
655 /// # Examples
656 ///
657 /// ```
658 /// use crossbeam_channel::{unbounded, Select};
659 ///
660 /// let (s, r) = unbounded::<i32>();
661 ///
662 /// let mut sel = Select::new();
663 /// let index = sel.recv(&r);
664 /// ```
665 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
666 let i = self.next_index;
667 let ptr = r as *const Receiver<_> as *const u8;
668 self.handles.push((r, i, ptr));
669 self.next_index += 1;
670 i
671 }
672
673 /// Removes a previously added operation.
674 ///
675 /// This is useful when an operation is selected because the channel got disconnected and we
676 /// want to try again to select a different operation instead.
677 ///
678 /// If new operations are added after removing some, the indices of removed operations will not
679 /// be reused.
680 ///
681 /// # Panics
682 ///
683 /// An attempt to remove a non-existing or already removed operation will panic.
684 ///
685 /// # Examples
686 ///
687 /// ```
688 /// use crossbeam_channel::{unbounded, Select};
689 ///
690 /// let (s1, r1) = unbounded::<i32>();
691 /// let (_, r2) = unbounded::<i32>();
692 ///
693 /// let mut sel = Select::new();
694 /// let oper1 = sel.recv(&r1);
695 /// let oper2 = sel.recv(&r2);
696 ///
697 /// // Both operations are initially ready, so a random one will be executed.
698 /// let oper = sel.select();
699 /// assert_eq!(oper.index(), oper2);
700 /// assert!(oper.recv(&r2).is_err());
701 /// sel.remove(oper2);
702 ///
703 /// s1.send(10).unwrap();
704 ///
705 /// let oper = sel.select();
706 /// assert_eq!(oper.index(), oper1);
707 /// assert_eq!(oper.recv(&r1), Ok(10));
708 /// ```
709 pub fn remove(&mut self, index: usize) {
710 assert!(
711 index < self.next_index,
712 "index out of bounds; {} >= {}",
713 index,
714 self.next_index,
715 );
716
717 let i = self
718 .handles
719 .iter()
720 .enumerate()
721 .find(|(_, (_, i, _))| *i == index)
722 .expect("no operation with this index")
723 .0;
724
725 self.handles.swap_remove(i);
726 }
727
728 /// Attempts to select one of the operations without blocking.
729 ///
730 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
731 /// the same time, a random one among them is selected. If none of the operations are ready, an
732 /// error is returned.
733 ///
734 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
735 /// even when it will simply return an error because the channel is disconnected.
736 ///
737 /// The selected operation must be completed with [`SelectedOperation::send`]
738 /// or [`SelectedOperation::recv`].
739 ///
740 /// # Examples
741 ///
742 /// ```
743 /// use crossbeam_channel::{unbounded, Select};
744 ///
745 /// let (s1, r1) = unbounded();
746 /// let (s2, r2) = unbounded();
747 ///
748 /// s1.send(10).unwrap();
749 /// s2.send(20).unwrap();
750 ///
751 /// let mut sel = Select::new();
752 /// let oper1 = sel.recv(&r1);
753 /// let oper2 = sel.recv(&r2);
754 ///
755 /// // Both operations are initially ready, so a random one will be executed.
756 /// let oper = sel.try_select();
757 /// match oper {
758 /// Err(_) => panic!("both operations should be ready"),
759 /// Ok(oper) => match oper.index() {
760 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
761 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
762 /// _ => unreachable!(),
763 /// }
764 /// }
765 /// ```
766 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
767 try_select(&mut self.handles)
768 }
769
770 /// Blocks until one of the operations becomes ready and selects it.
771 ///
772 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
773 /// ready at the same time, a random one among them is selected.
774 ///
775 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
776 /// even when it will simply return an error because the channel is disconnected.
777 ///
778 /// The selected operation must be completed with [`SelectedOperation::send`]
779 /// or [`SelectedOperation::recv`].
780 ///
781 /// # Panics
782 ///
783 /// Panics if no operations have been added to `Select`.
784 ///
785 /// # Examples
786 ///
787 /// ```
788 /// use std::thread;
789 /// use std::time::Duration;
790 /// use crossbeam_channel::{unbounded, Select};
791 ///
792 /// let (s1, r1) = unbounded();
793 /// let (s2, r2) = unbounded();
794 ///
795 /// thread::spawn(move || {
796 /// thread::sleep(Duration::from_secs(1));
797 /// s1.send(10).unwrap();
798 /// });
799 /// thread::spawn(move || s2.send(20).unwrap());
800 ///
801 /// let mut sel = Select::new();
802 /// let oper1 = sel.recv(&r1);
803 /// let oper2 = sel.recv(&r2);
804 ///
805 /// // The second operation will be selected because it becomes ready first.
806 /// let oper = sel.select();
807 /// match oper.index() {
808 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
809 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
810 /// _ => unreachable!(),
811 /// }
812 /// ```
813 pub fn select(&mut self) -> SelectedOperation<'a> {
814 select(&mut self.handles)
815 }
816
817 /// Blocks for a limited time until one of the operations becomes ready and selects it.
818 ///
819 /// If an operation becomes ready, it is selected and returned. If multiple operations are
820 /// ready at the same time, a random one among them is selected. If none of the operations
821 /// become ready for the specified duration, an error is returned.
822 ///
823 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
824 /// even when it will simply return an error because the channel is disconnected.
825 ///
826 /// The selected operation must be completed with [`SelectedOperation::send`]
827 /// or [`SelectedOperation::recv`].
828 ///
829 /// # Examples
830 ///
831 /// ```
832 /// use std::thread;
833 /// use std::time::Duration;
834 /// use crossbeam_channel::{unbounded, Select};
835 ///
836 /// let (s1, r1) = unbounded();
837 /// let (s2, r2) = unbounded();
838 ///
839 /// thread::spawn(move || {
840 /// thread::sleep(Duration::from_secs(1));
841 /// s1.send(10).unwrap();
842 /// });
843 /// thread::spawn(move || s2.send(20).unwrap());
844 ///
845 /// let mut sel = Select::new();
846 /// let oper1 = sel.recv(&r1);
847 /// let oper2 = sel.recv(&r2);
848 ///
849 /// // The second operation will be selected because it becomes ready first.
850 /// let oper = sel.select_timeout(Duration::from_millis(500));
851 /// match oper {
852 /// Err(_) => panic!("should not have timed out"),
853 /// Ok(oper) => match oper.index() {
854 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
855 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
856 /// _ => unreachable!(),
857 /// }
858 /// }
859 /// ```
860 pub fn select_timeout(
861 &mut self,
862 timeout: Duration,
863 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
864 select_timeout(&mut self.handles, timeout)
865 }
866
867 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
868 ///
869 /// If an operation becomes ready, it is selected and returned. If multiple operations are
870 /// ready at the same time, a random one among them is selected. If none of the operations
871 /// become ready before the given deadline, an error is returned.
872 ///
873 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
874 /// even when it will simply return an error because the channel is disconnected.
875 ///
876 /// The selected operation must be completed with [`SelectedOperation::send`]
877 /// or [`SelectedOperation::recv`].
878 ///
879 /// # Examples
880 ///
881 /// ```
882 /// use std::thread;
883 /// use std::time::{Instant, Duration};
884 /// use crossbeam_channel::{unbounded, Select};
885 ///
886 /// let (s1, r1) = unbounded();
887 /// let (s2, r2) = unbounded();
888 ///
889 /// thread::spawn(move || {
890 /// thread::sleep(Duration::from_secs(1));
891 /// s1.send(10).unwrap();
892 /// });
893 /// thread::spawn(move || s2.send(20).unwrap());
894 ///
895 /// let mut sel = Select::new();
896 /// let oper1 = sel.recv(&r1);
897 /// let oper2 = sel.recv(&r2);
898 ///
899 /// let deadline = Instant::now() + Duration::from_millis(500);
900 ///
901 /// // The second operation will be selected because it becomes ready first.
902 /// let oper = sel.select_deadline(deadline);
903 /// match oper {
904 /// Err(_) => panic!("should not have timed out"),
905 /// Ok(oper) => match oper.index() {
906 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
907 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
908 /// _ => unreachable!(),
909 /// }
910 /// }
911 /// ```
912 pub fn select_deadline(
913 &mut self,
914 deadline: Instant,
915 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
916 select_deadline(&mut self.handles, deadline)
917 }
918
919 /// Attempts to find a ready operation without blocking.
920 ///
921 /// If an operation is ready, its index is returned. If multiple operations are ready at the
922 /// same time, a random one among them is chosen. If none of the operations are ready, an error
923 /// is returned.
924 ///
925 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
926 /// even when it will simply return an error because the channel is disconnected.
927 ///
928 /// Note that this method might return with success spuriously, so it's a good idea to always
929 /// double check if the operation is really ready.
930 ///
931 /// # Examples
932 ///
933 /// ```
934 /// use crossbeam_channel::{unbounded, Select};
935 ///
936 /// let (s1, r1) = unbounded();
937 /// let (s2, r2) = unbounded();
938 ///
939 /// s1.send(10).unwrap();
940 /// s2.send(20).unwrap();
941 ///
942 /// let mut sel = Select::new();
943 /// let oper1 = sel.recv(&r1);
944 /// let oper2 = sel.recv(&r2);
945 ///
946 /// // Both operations are initially ready, so a random one will be chosen.
947 /// match sel.try_ready() {
948 /// Err(_) => panic!("both operations should be ready"),
949 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
950 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
951 /// Ok(_) => unreachable!(),
952 /// }
953 /// ```
954 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
955 match run_ready(&mut self.handles, Timeout::Now) {
956 None => Err(TryReadyError),
957 Some(index) => Ok(index),
958 }
959 }
960
961 /// Blocks until one of the operations becomes ready.
962 ///
963 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
964 /// the same time, a random one among them is chosen.
965 ///
966 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
967 /// even when it will simply return an error because the channel is disconnected.
968 ///
969 /// Note that this method might return with success spuriously, so it's a good idea to always
970 /// double check if the operation is really ready.
971 ///
972 /// # Panics
973 ///
974 /// Panics if no operations have been added to `Select`.
975 ///
976 /// # Examples
977 ///
978 /// ```
979 /// use std::thread;
980 /// use std::time::Duration;
981 /// use crossbeam_channel::{unbounded, Select};
982 ///
983 /// let (s1, r1) = unbounded();
984 /// let (s2, r2) = unbounded();
985 ///
986 /// thread::spawn(move || {
987 /// thread::sleep(Duration::from_secs(1));
988 /// s1.send(10).unwrap();
989 /// });
990 /// thread::spawn(move || s2.send(20).unwrap());
991 ///
992 /// let mut sel = Select::new();
993 /// let oper1 = sel.recv(&r1);
994 /// let oper2 = sel.recv(&r2);
995 ///
996 /// // The second operation will be selected because it becomes ready first.
997 /// match sel.ready() {
998 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
999 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1000 /// _ => unreachable!(),
1001 /// }
1002 /// ```
1003 pub fn ready(&mut self) -> usize {
1004 if self.handles.is_empty() {
1005 panic!("no operations have been added to `Select`");
1006 }
1007
1008 run_ready(&mut self.handles, Timeout::Never).unwrap()
1009 }
1010
1011 /// Blocks for a limited time until one of the operations becomes ready.
1012 ///
1013 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1014 /// the same time, a random one among them is chosen. If none of the operations become ready
1015 /// for the specified duration, an error is returned.
1016 ///
1017 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1018 /// even when it will simply return an error because the channel is disconnected.
1019 ///
1020 /// Note that this method might return with success spuriously, so it's a good idea to double
1021 /// check if the operation is really ready.
1022 ///
1023 /// # Examples
1024 ///
1025 /// ```
1026 /// use std::thread;
1027 /// use std::time::Duration;
1028 /// use crossbeam_channel::{unbounded, Select};
1029 ///
1030 /// let (s1, r1) = unbounded();
1031 /// let (s2, r2) = unbounded();
1032 ///
1033 /// thread::spawn(move || {
1034 /// thread::sleep(Duration::from_secs(1));
1035 /// s1.send(10).unwrap();
1036 /// });
1037 /// thread::spawn(move || s2.send(20).unwrap());
1038 ///
1039 /// let mut sel = Select::new();
1040 /// let oper1 = sel.recv(&r1);
1041 /// let oper2 = sel.recv(&r2);
1042 ///
1043 /// // The second operation will be selected because it becomes ready first.
1044 /// match sel.ready_timeout(Duration::from_millis(500)) {
1045 /// Err(_) => panic!("should not have timed out"),
1046 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1047 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1048 /// Ok(_) => unreachable!(),
1049 /// }
1050 /// ```
1051 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1052 match Instant::now().checked_add(timeout) {
1053 Some(deadline) => self.ready_deadline(deadline),
1054 None => Ok(self.ready()),
1055 }
1056 }
1057
1058 /// Blocks until a given deadline, or until one of the operations becomes ready.
1059 ///
1060 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1061 /// the same time, a random one among them is chosen. If none of the operations become ready
1062 /// before the deadline, an error is returned.
1063 ///
1064 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1065 /// even when it will simply return an error because the channel is disconnected.
1066 ///
1067 /// Note that this method might return with success spuriously, so it's a good idea to double
1068 /// check if the operation is really ready.
1069 ///
1070 /// # Examples
1071 ///
1072 /// ```
1073 /// use std::thread;
1074 /// use std::time::{Duration, Instant};
1075 /// use crossbeam_channel::{unbounded, Select};
1076 ///
1077 /// let deadline = Instant::now() + Duration::from_millis(500);
1078 ///
1079 /// let (s1, r1) = unbounded();
1080 /// let (s2, r2) = unbounded();
1081 ///
1082 /// thread::spawn(move || {
1083 /// thread::sleep(Duration::from_secs(1));
1084 /// s1.send(10).unwrap();
1085 /// });
1086 /// thread::spawn(move || s2.send(20).unwrap());
1087 ///
1088 /// let mut sel = Select::new();
1089 /// let oper1 = sel.recv(&r1);
1090 /// let oper2 = sel.recv(&r2);
1091 ///
1092 /// // The second operation will be selected because it becomes ready first.
1093 /// match sel.ready_deadline(deadline) {
1094 /// Err(_) => panic!("should not have timed out"),
1095 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1096 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1097 /// Ok(_) => unreachable!(),
1098 /// }
1099 /// ```
1100 pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1101 match run_ready(&mut self.handles, Timeout::At(deadline)) {
1102 None => Err(ReadyTimeoutError),
1103 Some(index) => Ok(index),
1104 }
1105 }
1106}
1107
1108impl<'a> Clone for Select<'a> {
1109 fn clone(&self) -> Select<'a> {
1110 Select {
1111 handles: self.handles.clone(),
1112 next_index: self.next_index,
1113 }
1114 }
1115}
1116
1117impl<'a> Default for Select<'a> {
1118 fn default() -> Select<'a> {
1119 Select::new()
1120 }
1121}
1122
1123impl fmt::Debug for Select<'_> {
1124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1125 f.pad("Select { .. }")
1126 }
1127}
1128
/// A selected operation that needs to be completed.
///
/// To complete the operation, call [`send`] or [`recv`]. Both methods take the value by `self`,
/// so a given `SelectedOperation` can be completed at most once.
///
/// # Panics
///
/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
/// `SelectedOperation` is dropped without completion, a panic occurs.
///
/// [`send`]: SelectedOperation::send
/// [`recv`]: SelectedOperation::recv
#[must_use]
pub struct SelectedOperation<'a> {
    /// Token needed to complete the operation. It is passed by mutable reference to the
    /// channel's write/read routine when `send` or `recv` is called.
    token: Token,

    /// The index of the selected operation, as reported by `index`.
    index: usize,

    /// The address of the selected `Sender` or `Receiver`. `send` and `recv` compare it against
    /// the address of the reference they are given and panic on a mismatch.
    ptr: *const u8,

    /// Indicates that `Sender`s and `Receiver`s are borrowed. Carries the `'a` lifetime without
    /// storing a reference.
    _marker: PhantomData<&'a ()>,
}
1154
1155impl SelectedOperation<'_> {
1156 /// Returns the index of the selected operation.
1157 ///
1158 /// # Examples
1159 ///
1160 /// ```
1161 /// use crossbeam_channel::{bounded, Select};
1162 ///
1163 /// let (s1, r1) = bounded::<()>(0);
1164 /// let (s2, r2) = bounded::<()>(0);
1165 /// let (s3, r3) = bounded::<()>(1);
1166 ///
1167 /// let mut sel = Select::new();
1168 /// let oper1 = sel.send(&s1);
1169 /// let oper2 = sel.recv(&r2);
1170 /// let oper3 = sel.send(&s3);
1171 ///
1172 /// // Only the last operation is ready.
1173 /// let oper = sel.select();
1174 /// assert_eq!(oper.index(), 2);
1175 /// assert_eq!(oper.index(), oper3);
1176 ///
1177 /// // Complete the operation.
1178 /// oper.send(&s3, ()).unwrap();
1179 /// ```
1180 pub fn index(&self) -> usize {
1181 self.index
1182 }
1183
1184 /// Completes the send operation.
1185 ///
1186 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1187 /// when the operation was added.
1188 ///
1189 /// # Panics
1190 ///
1191 /// Panics if an incorrect [`Sender`] reference is passed.
1192 ///
1193 /// # Examples
1194 ///
1195 /// ```
1196 /// use crossbeam_channel::{bounded, Select, SendError};
1197 ///
1198 /// let (s, r) = bounded::<i32>(0);
1199 /// drop(r);
1200 ///
1201 /// let mut sel = Select::new();
1202 /// let oper1 = sel.send(&s);
1203 ///
1204 /// let oper = sel.select();
1205 /// assert_eq!(oper.index(), oper1);
1206 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1207 /// ```
1208 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1209 assert!(
1210 s as *const Sender<T> as *const u8 == self.ptr,
1211 "passed a sender that wasn't selected",
1212 );
1213 let res = unsafe { channel::write(s, &mut self.token, msg) };
1214 mem::forget(self);
1215 res.map_err(SendError)
1216 }
1217
1218 /// Completes the receive operation.
1219 ///
1220 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1221 /// when the operation was added.
1222 ///
1223 /// # Panics
1224 ///
1225 /// Panics if an incorrect [`Receiver`] reference is passed.
1226 ///
1227 /// # Examples
1228 ///
1229 /// ```
1230 /// use crossbeam_channel::{bounded, Select, RecvError};
1231 ///
1232 /// let (s, r) = bounded::<i32>(0);
1233 /// drop(s);
1234 ///
1235 /// let mut sel = Select::new();
1236 /// let oper1 = sel.recv(&r);
1237 ///
1238 /// let oper = sel.select();
1239 /// assert_eq!(oper.index(), oper1);
1240 /// assert_eq!(oper.recv(&r), Err(RecvError));
1241 /// ```
1242 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1243 assert!(
1244 r as *const Receiver<T> as *const u8 == self.ptr,
1245 "passed a receiver that wasn't selected",
1246 );
1247 let res = unsafe { channel::read(r, &mut self.token) };
1248 mem::forget(self);
1249 res.map_err(|_| RecvError)
1250 }
1251}
1252
1253impl fmt::Debug for SelectedOperation<'_> {
1254 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1255 f.pad("SelectedOperation { .. }")
1256 }
1257}
1258
impl Drop for SelectedOperation<'_> {
    /// Panics unconditionally: running this destructor means the operation was not completed.
    ///
    /// `send` and `recv` call `mem::forget` on the value once the channel call has returned,
    /// so the destructor does not run for an operation that got that far.
    fn drop(&mut self) {
        panic!("dropped `SelectedOperation` without completing the operation");
    }
}
1264