1//! Interface to the select mechanism.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::time::{Duration, Instant};
7
8use crossbeam_utils::Backoff;
9
10use crate::channel::{self, Receiver, Sender};
11use crate::context::Context;
12use crate::err::{ReadyTimeoutError, TryReadyError};
13use crate::err::{RecvError, SendError};
14use crate::err::{SelectTimeoutError, TrySelectError};
15use crate::flavors;
16use crate::utils;
17
18/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19/// `read` or `write`.
20///
21/// Each field contains data associated with a specific channel flavor.
22// This is a private API that is used by the select macro.
23#[derive(Debug, Default)]
24pub struct Token {
25 pub(crate) at: flavors::at::AtToken,
26 pub(crate) array: flavors::array::ArrayToken,
27 pub(crate) list: flavors::list::ListToken,
28 #[allow(dead_code)]
29 pub(crate) never: flavors::never::NeverToken,
30 pub(crate) tick: flavors::tick::TickToken,
31 pub(crate) zero: flavors::zero::ZeroToken,
32}
33
/// Identifier associated with an operation by a specific thread on a specific channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Builds an operation identifier out of the address behind a mutable reference.
    ///
    /// The identifier is nothing more than that address stored as a number, so `r` should
    /// refer to a variable that belongs to this thread and this operation and that stays
    /// alive for the whole select or blocking operation.
    ///
    /// # Panics
    ///
    /// Panics if the address is not greater than `2`.
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let val = r as *mut T as usize;
        // The values 0, 1 and 2 are the numerical representations of
        // `Selected::{Waiting, Aborted, Disconnected}`; an identifier must never collide
        // with them.
        assert!(val > 2);
        Self(val)
    }
}
53
54/// Current state of a select or a blocking operation.
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum Selected {
57 /// Still waiting for an operation.
58 Waiting,
59
60 /// The attempt to block the current thread has been aborted.
61 Aborted,
62
63 /// An operation became ready because a channel is disconnected.
64 Disconnected,
65
66 /// An operation became ready because a message can be sent or received.
67 Operation(Operation),
68}
69
70impl From<usize> for Selected {
71 #[inline]
72 fn from(val: usize) -> Selected {
73 match val {
74 0 => Selected::Waiting,
75 1 => Selected::Aborted,
76 2 => Selected::Disconnected,
77 oper: usize => Selected::Operation(Operation(oper)),
78 }
79 }
80}
81
82impl Into<usize> for Selected {
83 #[inline]
84 fn into(self) -> usize {
85 match self {
86 Selected::Waiting => 0,
87 Selected::Aborted => 1,
88 Selected::Disconnected => 2,
89 Selected::Operation(Operation(val: usize)) => val,
90 }
91 }
92}
93
94/// A receiver or a sender that can participate in select.
95///
96/// This is a handle that assists select in executing an operation, registration, deciding on the
97/// appropriate deadline for blocking, etc.
98// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
99pub trait SelectHandle {
100 /// Attempts to select an operation and returns `true` on success.
101 fn try_select(&self, token: &mut Token) -> bool;
102
103 /// Returns a deadline for an operation, if there is one.
104 fn deadline(&self) -> Option<Instant>;
105
106 /// Registers an operation for execution and returns `true` if it is now ready.
107 fn register(&self, oper: Operation, cx: &Context) -> bool;
108
109 /// Unregisters an operation for execution.
110 fn unregister(&self, oper: Operation);
111
112 /// Attempts to select an operation the thread got woken up for and returns `true` on success.
113 fn accept(&self, token: &mut Token, cx: &Context) -> bool;
114
115 /// Returns `true` if an operation can be executed without blocking.
116 fn is_ready(&self) -> bool;
117
118 /// Registers an operation for readiness notification and returns `true` if it is now ready.
119 fn watch(&self, oper: Operation, cx: &Context) -> bool;
120
121 /// Unregisters an operation for readiness notification.
122 fn unwatch(&self, oper: Operation);
123}
124
125impl<T: SelectHandle> SelectHandle for &T {
126 fn try_select(&self, token: &mut Token) -> bool {
127 (**self).try_select(token)
128 }
129
130 fn deadline(&self) -> Option<Instant> {
131 (**self).deadline()
132 }
133
134 fn register(&self, oper: Operation, cx: &Context) -> bool {
135 (**self).register(oper, cx)
136 }
137
138 fn unregister(&self, oper: Operation) {
139 (**self).unregister(oper);
140 }
141
142 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
143 (**self).accept(token, cx)
144 }
145
146 fn is_ready(&self) -> bool {
147 (**self).is_ready()
148 }
149
150 fn watch(&self, oper: Operation, cx: &Context) -> bool {
151 (**self).watch(oper, cx)
152 }
153
154 fn unwatch(&self, oper: Operation) {
155 (**self).unwatch(oper)
156 }
157}
158
/// When a select operation should give up.
#[derive(Clone, Copy, Eq, PartialEq)]
enum Timeout {
    /// Do not block at all.
    Now,

    /// Block for as long as it takes.
    Never,

    /// Give up once the given instant has been reached.
    At(Instant),
}
171
172/// Runs until one of the operations is selected, potentially blocking the current thread.
173///
174/// Successful receive operations will have to be followed up by `channel::read()` and successful
175/// send operations by `channel::write()`.
176fn run_select(
177 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
178 timeout: Timeout,
179) -> Option<(Token, usize, *const u8)> {
180 if handles.is_empty() {
181 // Wait until the timeout and return.
182 match timeout {
183 Timeout::Now => return None,
184 Timeout::Never => {
185 utils::sleep_until(None);
186 unreachable!();
187 }
188 Timeout::At(when) => {
189 utils::sleep_until(Some(when));
190 return None;
191 }
192 }
193 }
194
195 // Shuffle the operations for fairness.
196 utils::shuffle(handles);
197
198 // Create a token, which serves as a temporary variable that gets initialized in this function
199 // and is later used by a call to `channel::read()` or `channel::write()` that completes the
200 // selected operation.
201 let mut token = Token::default();
202
203 // Try selecting one of the operations without blocking.
204 for &(handle, i, ptr) in handles.iter() {
205 if handle.try_select(&mut token) {
206 return Some((token, i, ptr));
207 }
208 }
209
210 loop {
211 // Prepare for blocking.
212 let res = Context::with(|cx| {
213 let mut sel = Selected::Waiting;
214 let mut registered_count = 0;
215 let mut index_ready = None;
216
217 if let Timeout::Now = timeout {
218 cx.try_select(Selected::Aborted).unwrap();
219 }
220
221 // Register all operations.
222 for (handle, i, _) in handles.iter_mut() {
223 registered_count += 1;
224
225 // If registration returns `false`, that means the operation has just become ready.
226 if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
227 // Try aborting select.
228 sel = match cx.try_select(Selected::Aborted) {
229 Ok(()) => {
230 index_ready = Some(*i);
231 Selected::Aborted
232 }
233 Err(s) => s,
234 };
235 break;
236 }
237
238 // If another thread has already selected one of the operations, stop registration.
239 sel = cx.selected();
240 if sel != Selected::Waiting {
241 break;
242 }
243 }
244
245 if sel == Selected::Waiting {
246 // Check with each operation for how long we're allowed to block, and compute the
247 // earliest deadline.
248 let mut deadline: Option<Instant> = match timeout {
249 Timeout::Now => return None,
250 Timeout::Never => None,
251 Timeout::At(when) => Some(when),
252 };
253 for &(handle, _, _) in handles.iter() {
254 if let Some(x) = handle.deadline() {
255 deadline = deadline.map(|y| x.min(y)).or(Some(x));
256 }
257 }
258
259 // Block the current thread.
260 sel = cx.wait_until(deadline);
261 }
262
263 // Unregister all registered operations.
264 for (handle, _, _) in handles.iter_mut().take(registered_count) {
265 handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
266 }
267
268 match sel {
269 Selected::Waiting => unreachable!(),
270 Selected::Aborted => {
271 // If an operation became ready during registration, try selecting it.
272 if let Some(index_ready) = index_ready {
273 for &(handle, i, ptr) in handles.iter() {
274 if i == index_ready && handle.try_select(&mut token) {
275 return Some((i, ptr));
276 }
277 }
278 }
279 }
280 Selected::Disconnected => {}
281 Selected::Operation(_) => {
282 // Find the selected operation.
283 for (handle, i, ptr) in handles.iter_mut() {
284 // Is this the selected operation?
285 if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
286 {
287 // Try selecting this operation.
288 if handle.accept(&mut token, cx) {
289 return Some((*i, *ptr));
290 }
291 }
292 }
293 }
294 }
295
296 None
297 });
298
299 // Return if an operation was selected.
300 if let Some((i, ptr)) = res {
301 return Some((token, i, ptr));
302 }
303
304 // Try selecting one of the operations without blocking.
305 for &(handle, i, ptr) in handles.iter() {
306 if handle.try_select(&mut token) {
307 return Some((token, i, ptr));
308 }
309 }
310
311 match timeout {
312 Timeout::Now => return None,
313 Timeout::Never => {}
314 Timeout::At(when) => {
315 if Instant::now() >= when {
316 return None;
317 }
318 }
319 }
320 }
321}
322
323/// Runs until one of the operations becomes ready, potentially blocking the current thread.
324fn run_ready(
325 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
326 timeout: Timeout,
327) -> Option<usize> {
328 if handles.is_empty() {
329 // Wait until the timeout and return.
330 match timeout {
331 Timeout::Now => return None,
332 Timeout::Never => {
333 utils::sleep_until(None);
334 unreachable!();
335 }
336 Timeout::At(when) => {
337 utils::sleep_until(Some(when));
338 return None;
339 }
340 }
341 }
342
343 // Shuffle the operations for fairness.
344 utils::shuffle(handles);
345
346 loop {
347 let backoff = Backoff::new();
348 loop {
349 // Check operations for readiness.
350 for &(handle, i, _) in handles.iter() {
351 if handle.is_ready() {
352 return Some(i);
353 }
354 }
355
356 if backoff.is_completed() {
357 break;
358 } else {
359 backoff.snooze();
360 }
361 }
362
363 // Check for timeout.
364 match timeout {
365 Timeout::Now => return None,
366 Timeout::Never => {}
367 Timeout::At(when) => {
368 if Instant::now() >= when {
369 return None;
370 }
371 }
372 }
373
374 // Prepare for blocking.
375 let res = Context::with(|cx| {
376 let mut sel = Selected::Waiting;
377 let mut registered_count = 0;
378
379 // Begin watching all operations.
380 for (handle, _, _) in handles.iter_mut() {
381 registered_count += 1;
382 let oper = Operation::hook::<&dyn SelectHandle>(handle);
383
384 // If registration returns `false`, that means the operation has just become ready.
385 if handle.watch(oper, cx) {
386 sel = match cx.try_select(Selected::Operation(oper)) {
387 Ok(()) => Selected::Operation(oper),
388 Err(s) => s,
389 };
390 break;
391 }
392
393 // If another thread has already chosen one of the operations, stop registration.
394 sel = cx.selected();
395 if sel != Selected::Waiting {
396 break;
397 }
398 }
399
400 if sel == Selected::Waiting {
401 // Check with each operation for how long we're allowed to block, and compute the
402 // earliest deadline.
403 let mut deadline: Option<Instant> = match timeout {
404 Timeout::Now => unreachable!(),
405 Timeout::Never => None,
406 Timeout::At(when) => Some(when),
407 };
408 for &(handle, _, _) in handles.iter() {
409 if let Some(x) = handle.deadline() {
410 deadline = deadline.map(|y| x.min(y)).or(Some(x));
411 }
412 }
413
414 // Block the current thread.
415 sel = cx.wait_until(deadline);
416 }
417
418 // Unwatch all operations.
419 for (handle, _, _) in handles.iter_mut().take(registered_count) {
420 handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
421 }
422
423 match sel {
424 Selected::Waiting => unreachable!(),
425 Selected::Aborted => {}
426 Selected::Disconnected => {}
427 Selected::Operation(_) => {
428 for (handle, i, _) in handles.iter_mut() {
429 let oper = Operation::hook::<&dyn SelectHandle>(handle);
430 if sel == Selected::Operation(oper) {
431 return Some(*i);
432 }
433 }
434 }
435 }
436
437 None
438 });
439
440 // Return if an operation became ready.
441 if res.is_some() {
442 return res;
443 }
444 }
445}
446
447/// Attempts to select one of the operations without blocking.
448// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
449#[inline]
450pub fn try_select<'a>(
451 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
452) -> Result<SelectedOperation<'a>, TrySelectError> {
453 match run_select(handles, Timeout::Now) {
454 None => Err(TrySelectError),
455 Some((token: Token, index: usize, ptr: *const u8)) => Ok(SelectedOperation {
456 token,
457 index,
458 ptr,
459 _marker: PhantomData,
460 }),
461 }
462}
463
464/// Blocks until one of the operations becomes ready and selects it.
465// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
466#[inline]
467pub fn select<'a>(
468 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
469) -> SelectedOperation<'a> {
470 if handles.is_empty() {
471 panic!("no operations have been added to `Select`");
472 }
473
474 let (token: Token, index: usize, ptr: *const u8) = run_select(handles, Timeout::Never).unwrap();
475 SelectedOperation {
476 token,
477 index,
478 ptr,
479 _marker: PhantomData,
480 }
481}
482
483/// Blocks for a limited time until one of the operations becomes ready and selects it.
484// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
485#[inline]
486pub fn select_timeout<'a>(
487 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
488 timeout: Duration,
489) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
490 match Instant::now().checked_add(duration:timeout) {
491 Some(deadline: Instant) => select_deadline(handles, deadline),
492 None => Ok(select(handles)),
493 }
494}
495
496/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
497#[inline]
498pub(crate) fn select_deadline<'a>(
499 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
500 deadline: Instant,
501) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
502 match run_select(handles, Timeout::At(deadline)) {
503 None => Err(SelectTimeoutError),
504 Some((token: Token, index: usize, ptr: *const u8)) => Ok(SelectedOperation {
505 token,
506 index,
507 ptr,
508 _marker: PhantomData,
509 }),
510 }
511}
512
513/// Selects from a set of channel operations.
514///
515/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
516/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
517/// among them is selected.
518///
519/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
520/// when it will simply return an error because the channel is disconnected.
521///
522/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
523/// dynamically created list of channel operations.
524///
525/// [`select!`]: crate::select!
526///
527/// Once a list of operations has been built with `Select`, there are two different ways of
528/// proceeding:
529///
530/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
531/// the returned selected operation has already begun and **must** be completed. If we don't
532/// complete it, a panic will occur.
533///
534/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
535/// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
536/// possible for another thread to make the operation not ready just before we try executing it,
537/// so it's wise to use a retry loop. However, note that these methods might return with success
538/// spuriously, so it's a good idea to always double check if the operation is really ready.
539///
540/// # Examples
541///
542/// Use [`select`] to receive a message from a list of receivers:
543///
544/// ```
545/// use crossbeam_channel::{Receiver, RecvError, Select};
546///
547/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
548/// // Build a list of operations.
549/// let mut sel = Select::new();
550/// for r in rs {
551/// sel.recv(r);
552/// }
553///
554/// // Complete the selected operation.
555/// let oper = sel.select();
556/// let index = oper.index();
557/// oper.recv(&rs[index])
558/// }
559/// ```
560///
561/// Use [`ready`] to receive a message from a list of receivers:
562///
563/// ```
564/// use crossbeam_channel::{Receiver, RecvError, Select};
565///
566/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
567/// // Build a list of operations.
568/// let mut sel = Select::new();
569/// for r in rs {
570/// sel.recv(r);
571/// }
572///
573/// loop {
574/// // Wait until a receive operation becomes ready and try executing it.
575/// let index = sel.ready();
576/// let res = rs[index].try_recv();
577///
578/// // If the operation turns out not to be ready, retry.
579/// if let Err(e) = res {
580/// if e.is_empty() {
581/// continue;
582/// }
583/// }
584///
585/// // Success!
586/// return res.map_err(|_| RecvError);
587/// }
588/// }
589/// ```
590///
591/// [`try_select`]: Select::try_select
592/// [`select`]: Select::select
593/// [`select_timeout`]: Select::select_timeout
594/// [`try_ready`]: Select::try_ready
595/// [`ready`]: Select::ready
596/// [`ready_timeout`]: Select::ready_timeout
597pub struct Select<'a> {
598 /// A list of senders and receivers participating in selection.
599 handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
600
601 /// The next index to assign to an operation.
602 next_index: usize,
603}
604
605unsafe impl Send for Select<'_> {}
606unsafe impl Sync for Select<'_> {}
607
608impl<'a> Select<'a> {
609 /// Creates an empty list of channel operations for selection.
610 ///
611 /// # Examples
612 ///
613 /// ```
614 /// use crossbeam_channel::Select;
615 ///
616 /// let mut sel = Select::new();
617 ///
618 /// // The list of operations is empty, which means no operation can be selected.
619 /// assert!(sel.try_select().is_err());
620 /// ```
621 pub fn new() -> Select<'a> {
622 Select {
623 handles: Vec::with_capacity(4),
624 next_index: 0,
625 }
626 }
627
628 /// Adds a send operation.
629 ///
630 /// Returns the index of the added operation.
631 ///
632 /// # Examples
633 ///
634 /// ```
635 /// use crossbeam_channel::{unbounded, Select};
636 ///
637 /// let (s, r) = unbounded::<i32>();
638 ///
639 /// let mut sel = Select::new();
640 /// let index = sel.send(&s);
641 /// ```
642 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
643 let i = self.next_index;
644 let ptr = s as *const Sender<_> as *const u8;
645 self.handles.push((s, i, ptr));
646 self.next_index += 1;
647 i
648 }
649
650 /// Adds a receive operation.
651 ///
652 /// Returns the index of the added operation.
653 ///
654 /// # Examples
655 ///
656 /// ```
657 /// use crossbeam_channel::{unbounded, Select};
658 ///
659 /// let (s, r) = unbounded::<i32>();
660 ///
661 /// let mut sel = Select::new();
662 /// let index = sel.recv(&r);
663 /// ```
664 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
665 let i = self.next_index;
666 let ptr = r as *const Receiver<_> as *const u8;
667 self.handles.push((r, i, ptr));
668 self.next_index += 1;
669 i
670 }
671
672 /// Removes a previously added operation.
673 ///
674 /// This is useful when an operation is selected because the channel got disconnected and we
675 /// want to try again to select a different operation instead.
676 ///
677 /// If new operations are added after removing some, the indices of removed operations will not
678 /// be reused.
679 ///
680 /// # Panics
681 ///
682 /// An attempt to remove a non-existing or already removed operation will panic.
683 ///
684 /// # Examples
685 ///
686 /// ```
687 /// use crossbeam_channel::{unbounded, Select};
688 ///
689 /// let (s1, r1) = unbounded::<i32>();
690 /// let (_, r2) = unbounded::<i32>();
691 ///
692 /// let mut sel = Select::new();
693 /// let oper1 = sel.recv(&r1);
694 /// let oper2 = sel.recv(&r2);
695 ///
696 /// // Both operations are initially ready, so a random one will be executed.
697 /// let oper = sel.select();
698 /// assert_eq!(oper.index(), oper2);
699 /// assert!(oper.recv(&r2).is_err());
700 /// sel.remove(oper2);
701 ///
702 /// s1.send(10).unwrap();
703 ///
704 /// let oper = sel.select();
705 /// assert_eq!(oper.index(), oper1);
706 /// assert_eq!(oper.recv(&r1), Ok(10));
707 /// ```
708 pub fn remove(&mut self, index: usize) {
709 assert!(
710 index < self.next_index,
711 "index out of bounds; {} >= {}",
712 index,
713 self.next_index,
714 );
715
716 let i = self
717 .handles
718 .iter()
719 .enumerate()
720 .find(|(_, (_, i, _))| *i == index)
721 .expect("no operation with this index")
722 .0;
723
724 self.handles.swap_remove(i);
725 }
726
727 /// Attempts to select one of the operations without blocking.
728 ///
729 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
730 /// the same time, a random one among them is selected. If none of the operations are ready, an
731 /// error is returned.
732 ///
733 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
734 /// even when it will simply return an error because the channel is disconnected.
735 ///
736 /// The selected operation must be completed with [`SelectedOperation::send`]
737 /// or [`SelectedOperation::recv`].
738 ///
739 /// # Examples
740 ///
741 /// ```
742 /// use crossbeam_channel::{unbounded, Select};
743 ///
744 /// let (s1, r1) = unbounded();
745 /// let (s2, r2) = unbounded();
746 ///
747 /// s1.send(10).unwrap();
748 /// s2.send(20).unwrap();
749 ///
750 /// let mut sel = Select::new();
751 /// let oper1 = sel.recv(&r1);
752 /// let oper2 = sel.recv(&r2);
753 ///
754 /// // Both operations are initially ready, so a random one will be executed.
755 /// let oper = sel.try_select();
756 /// match oper {
757 /// Err(_) => panic!("both operations should be ready"),
758 /// Ok(oper) => match oper.index() {
759 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
760 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
761 /// _ => unreachable!(),
762 /// }
763 /// }
764 /// ```
765 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
766 try_select(&mut self.handles)
767 }
768
769 /// Blocks until one of the operations becomes ready and selects it.
770 ///
771 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
772 /// ready at the same time, a random one among them is selected.
773 ///
774 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
775 /// even when it will simply return an error because the channel is disconnected.
776 ///
777 /// The selected operation must be completed with [`SelectedOperation::send`]
778 /// or [`SelectedOperation::recv`].
779 ///
780 /// # Panics
781 ///
782 /// Panics if no operations have been added to `Select`.
783 ///
784 /// # Examples
785 ///
786 /// ```
787 /// use std::thread;
788 /// use std::time::Duration;
789 /// use crossbeam_channel::{unbounded, Select};
790 ///
791 /// let (s1, r1) = unbounded();
792 /// let (s2, r2) = unbounded();
793 ///
794 /// thread::spawn(move || {
795 /// thread::sleep(Duration::from_secs(1));
796 /// s1.send(10).unwrap();
797 /// });
798 /// thread::spawn(move || s2.send(20).unwrap());
799 ///
800 /// let mut sel = Select::new();
801 /// let oper1 = sel.recv(&r1);
802 /// let oper2 = sel.recv(&r2);
803 ///
804 /// // The second operation will be selected because it becomes ready first.
805 /// let oper = sel.select();
806 /// match oper.index() {
807 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
808 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
809 /// _ => unreachable!(),
810 /// }
811 /// ```
812 pub fn select(&mut self) -> SelectedOperation<'a> {
813 select(&mut self.handles)
814 }
815
816 /// Blocks for a limited time until one of the operations becomes ready and selects it.
817 ///
818 /// If an operation becomes ready, it is selected and returned. If multiple operations are
819 /// ready at the same time, a random one among them is selected. If none of the operations
820 /// become ready for the specified duration, an error is returned.
821 ///
822 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
823 /// even when it will simply return an error because the channel is disconnected.
824 ///
825 /// The selected operation must be completed with [`SelectedOperation::send`]
826 /// or [`SelectedOperation::recv`].
827 ///
828 /// # Examples
829 ///
830 /// ```
831 /// use std::thread;
832 /// use std::time::Duration;
833 /// use crossbeam_channel::{unbounded, Select};
834 ///
835 /// let (s1, r1) = unbounded();
836 /// let (s2, r2) = unbounded();
837 ///
838 /// thread::spawn(move || {
839 /// thread::sleep(Duration::from_secs(1));
840 /// s1.send(10).unwrap();
841 /// });
842 /// thread::spawn(move || s2.send(20).unwrap());
843 ///
844 /// let mut sel = Select::new();
845 /// let oper1 = sel.recv(&r1);
846 /// let oper2 = sel.recv(&r2);
847 ///
848 /// // The second operation will be selected because it becomes ready first.
849 /// let oper = sel.select_timeout(Duration::from_millis(500));
850 /// match oper {
851 /// Err(_) => panic!("should not have timed out"),
852 /// Ok(oper) => match oper.index() {
853 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
854 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
855 /// _ => unreachable!(),
856 /// }
857 /// }
858 /// ```
859 pub fn select_timeout(
860 &mut self,
861 timeout: Duration,
862 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
863 select_timeout(&mut self.handles, timeout)
864 }
865
866 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
867 ///
868 /// If an operation becomes ready, it is selected and returned. If multiple operations are
869 /// ready at the same time, a random one among them is selected. If none of the operations
870 /// become ready before the given deadline, an error is returned.
871 ///
872 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
873 /// even when it will simply return an error because the channel is disconnected.
874 ///
875 /// The selected operation must be completed with [`SelectedOperation::send`]
876 /// or [`SelectedOperation::recv`].
877 ///
878 /// # Examples
879 ///
880 /// ```
881 /// use std::thread;
882 /// use std::time::{Instant, Duration};
883 /// use crossbeam_channel::{unbounded, Select};
884 ///
885 /// let (s1, r1) = unbounded();
886 /// let (s2, r2) = unbounded();
887 ///
888 /// thread::spawn(move || {
889 /// thread::sleep(Duration::from_secs(1));
890 /// s1.send(10).unwrap();
891 /// });
892 /// thread::spawn(move || s2.send(20).unwrap());
893 ///
894 /// let mut sel = Select::new();
895 /// let oper1 = sel.recv(&r1);
896 /// let oper2 = sel.recv(&r2);
897 ///
898 /// let deadline = Instant::now() + Duration::from_millis(500);
899 ///
900 /// // The second operation will be selected because it becomes ready first.
901 /// let oper = sel.select_deadline(deadline);
902 /// match oper {
903 /// Err(_) => panic!("should not have timed out"),
904 /// Ok(oper) => match oper.index() {
905 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
906 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
907 /// _ => unreachable!(),
908 /// }
909 /// }
910 /// ```
911 pub fn select_deadline(
912 &mut self,
913 deadline: Instant,
914 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
915 select_deadline(&mut self.handles, deadline)
916 }
917
918 /// Attempts to find a ready operation without blocking.
919 ///
920 /// If an operation is ready, its index is returned. If multiple operations are ready at the
921 /// same time, a random one among them is chosen. If none of the operations are ready, an error
922 /// is returned.
923 ///
924 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
925 /// even when it will simply return an error because the channel is disconnected.
926 ///
927 /// Note that this method might return with success spuriously, so it's a good idea to always
928 /// double check if the operation is really ready.
929 ///
930 /// # Examples
931 ///
932 /// ```
933 /// use crossbeam_channel::{unbounded, Select};
934 ///
935 /// let (s1, r1) = unbounded();
936 /// let (s2, r2) = unbounded();
937 ///
938 /// s1.send(10).unwrap();
939 /// s2.send(20).unwrap();
940 ///
941 /// let mut sel = Select::new();
942 /// let oper1 = sel.recv(&r1);
943 /// let oper2 = sel.recv(&r2);
944 ///
945 /// // Both operations are initially ready, so a random one will be chosen.
946 /// match sel.try_ready() {
947 /// Err(_) => panic!("both operations should be ready"),
948 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
949 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
950 /// Ok(_) => unreachable!(),
951 /// }
952 /// ```
953 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
954 match run_ready(&mut self.handles, Timeout::Now) {
955 None => Err(TryReadyError),
956 Some(index) => Ok(index),
957 }
958 }
959
960 /// Blocks until one of the operations becomes ready.
961 ///
962 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
963 /// the same time, a random one among them is chosen.
964 ///
965 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
966 /// even when it will simply return an error because the channel is disconnected.
967 ///
968 /// Note that this method might return with success spuriously, so it's a good idea to always
969 /// double check if the operation is really ready.
970 ///
971 /// # Panics
972 ///
973 /// Panics if no operations have been added to `Select`.
974 ///
975 /// # Examples
976 ///
977 /// ```
978 /// use std::thread;
979 /// use std::time::Duration;
980 /// use crossbeam_channel::{unbounded, Select};
981 ///
982 /// let (s1, r1) = unbounded();
983 /// let (s2, r2) = unbounded();
984 ///
985 /// thread::spawn(move || {
986 /// thread::sleep(Duration::from_secs(1));
987 /// s1.send(10).unwrap();
988 /// });
989 /// thread::spawn(move || s2.send(20).unwrap());
990 ///
991 /// let mut sel = Select::new();
992 /// let oper1 = sel.recv(&r1);
993 /// let oper2 = sel.recv(&r2);
994 ///
995 /// // The second operation will be selected because it becomes ready first.
996 /// match sel.ready() {
997 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
998 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
999 /// _ => unreachable!(),
1000 /// }
1001 /// ```
1002 pub fn ready(&mut self) -> usize {
1003 if self.handles.is_empty() {
1004 panic!("no operations have been added to `Select`");
1005 }
1006
1007 run_ready(&mut self.handles, Timeout::Never).unwrap()
1008 }
1009
1010 /// Blocks for a limited time until one of the operations becomes ready.
1011 ///
1012 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1013 /// the same time, a random one among them is chosen. If none of the operations become ready
1014 /// for the specified duration, an error is returned.
1015 ///
1016 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1017 /// even when it will simply return an error because the channel is disconnected.
1018 ///
1019 /// Note that this method might return with success spuriously, so it's a good idea to double
1020 /// check if the operation is really ready.
1021 ///
1022 /// # Examples
1023 ///
1024 /// ```
1025 /// use std::thread;
1026 /// use std::time::Duration;
1027 /// use crossbeam_channel::{unbounded, Select};
1028 ///
1029 /// let (s1, r1) = unbounded();
1030 /// let (s2, r2) = unbounded();
1031 ///
1032 /// thread::spawn(move || {
1033 /// thread::sleep(Duration::from_secs(1));
1034 /// s1.send(10).unwrap();
1035 /// });
1036 /// thread::spawn(move || s2.send(20).unwrap());
1037 ///
1038 /// let mut sel = Select::new();
1039 /// let oper1 = sel.recv(&r1);
1040 /// let oper2 = sel.recv(&r2);
1041 ///
1042 /// // The second operation will be selected because it becomes ready first.
1043 /// match sel.ready_timeout(Duration::from_millis(500)) {
1044 /// Err(_) => panic!("should not have timed out"),
1045 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1046 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1047 /// Ok(_) => unreachable!(),
1048 /// }
1049 /// ```
1050 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1051 match Instant::now().checked_add(timeout) {
1052 Some(deadline) => self.ready_deadline(deadline),
1053 None => Ok(self.ready()),
1054 }
1055 }
1056
1057 /// Blocks until a given deadline, or until one of the operations becomes ready.
1058 ///
1059 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1060 /// the same time, a random one among them is chosen. If none of the operations become ready
1061 /// before the deadline, an error is returned.
1062 ///
1063 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1064 /// even when it will simply return an error because the channel is disconnected.
1065 ///
1066 /// Note that this method might return with success spuriously, so it's a good idea to double
1067 /// check if the operation is really ready.
1068 ///
1069 /// # Examples
1070 ///
1071 /// ```
1072 /// use std::thread;
1073 /// use std::time::{Duration, Instant};
1074 /// use crossbeam_channel::{unbounded, Select};
1075 ///
1076 /// let deadline = Instant::now() + Duration::from_millis(500);
1077 ///
1078 /// let (s1, r1) = unbounded();
1079 /// let (s2, r2) = unbounded();
1080 ///
1081 /// thread::spawn(move || {
1082 /// thread::sleep(Duration::from_secs(1));
1083 /// s1.send(10).unwrap();
1084 /// });
1085 /// thread::spawn(move || s2.send(20).unwrap());
1086 ///
1087 /// let mut sel = Select::new();
1088 /// let oper1 = sel.recv(&r1);
1089 /// let oper2 = sel.recv(&r2);
1090 ///
1091 /// // The second operation will be selected because it becomes ready first.
1092 /// match sel.ready_deadline(deadline) {
1093 /// Err(_) => panic!("should not have timed out"),
1094 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1095 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1096 /// Ok(_) => unreachable!(),
1097 /// }
1098 /// ```
1099 pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1100 match run_ready(&mut self.handles, Timeout::At(deadline)) {
1101 None => Err(ReadyTimeoutError),
1102 Some(index) => Ok(index),
1103 }
1104 }
1105}
1106
1107impl<'a> Clone for Select<'a> {
1108 fn clone(&self) -> Select<'a> {
1109 Select {
1110 handles: self.handles.clone(),
1111 next_index: self.next_index,
1112 }
1113 }
1114}
1115
1116impl<'a> Default for Select<'a> {
1117 fn default() -> Select<'a> {
1118 Select::new()
1119 }
1120}
1121
1122impl fmt::Debug for Select<'_> {
1123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1124 f.pad("Select { .. }")
1125 }
1126}
1127
/// A selected operation that needs to be completed.
///
/// To complete the operation, call [`send`] or [`recv`], passing the same channel handle
/// that was used when the operation was added. Both methods consume the
/// `SelectedOperation`.
///
/// # Panics
///
/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
/// `SelectedOperation` is dropped without completion, a panic occurs.
///
/// [`send`]: SelectedOperation::send
/// [`recv`]: SelectedOperation::recv
#[must_use]
pub struct SelectedOperation<'a> {
    /// Token needed to complete the operation. `send` and `recv` hand it to
    /// `channel::write` and `channel::read` respectively.
    token: Token,

    /// The index of the selected operation, as reported by `index()`.
    index: usize,

    /// The address of the selected `Sender` or `Receiver`, stored as an untyped pointer.
    /// `send` and `recv` compare it against the address of the handle they are given to
    /// reject a handle that was not the selected one.
    ptr: *const u8,

    /// Indicates that `Sender`s and `Receiver`s are borrowed: ties the lifetime `'a` to
    /// this value without storing an actual reference.
    _marker: PhantomData<&'a ()>,
}
1153
1154impl SelectedOperation<'_> {
1155 /// Returns the index of the selected operation.
1156 ///
1157 /// # Examples
1158 ///
1159 /// ```
1160 /// use crossbeam_channel::{bounded, Select};
1161 ///
1162 /// let (s1, r1) = bounded::<()>(0);
1163 /// let (s2, r2) = bounded::<()>(0);
1164 /// let (s3, r3) = bounded::<()>(1);
1165 ///
1166 /// let mut sel = Select::new();
1167 /// let oper1 = sel.send(&s1);
1168 /// let oper2 = sel.recv(&r2);
1169 /// let oper3 = sel.send(&s3);
1170 ///
1171 /// // Only the last operation is ready.
1172 /// let oper = sel.select();
1173 /// assert_eq!(oper.index(), 2);
1174 /// assert_eq!(oper.index(), oper3);
1175 ///
1176 /// // Complete the operation.
1177 /// oper.send(&s3, ()).unwrap();
1178 /// ```
1179 pub fn index(&self) -> usize {
1180 self.index
1181 }
1182
1183 /// Completes the send operation.
1184 ///
1185 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1186 /// when the operation was added.
1187 ///
1188 /// # Panics
1189 ///
1190 /// Panics if an incorrect [`Sender`] reference is passed.
1191 ///
1192 /// # Examples
1193 ///
1194 /// ```
1195 /// use crossbeam_channel::{bounded, Select, SendError};
1196 ///
1197 /// let (s, r) = bounded::<i32>(0);
1198 /// drop(r);
1199 ///
1200 /// let mut sel = Select::new();
1201 /// let oper1 = sel.send(&s);
1202 ///
1203 /// let oper = sel.select();
1204 /// assert_eq!(oper.index(), oper1);
1205 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1206 /// ```
1207 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1208 assert!(
1209 s as *const Sender<T> as *const u8 == self.ptr,
1210 "passed a sender that wasn't selected",
1211 );
1212 let res = unsafe { channel::write(s, &mut self.token, msg) };
1213 mem::forget(self);
1214 res.map_err(SendError)
1215 }
1216
1217 /// Completes the receive operation.
1218 ///
1219 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1220 /// when the operation was added.
1221 ///
1222 /// # Panics
1223 ///
1224 /// Panics if an incorrect [`Receiver`] reference is passed.
1225 ///
1226 /// # Examples
1227 ///
1228 /// ```
1229 /// use crossbeam_channel::{bounded, Select, RecvError};
1230 ///
1231 /// let (s, r) = bounded::<i32>(0);
1232 /// drop(s);
1233 ///
1234 /// let mut sel = Select::new();
1235 /// let oper1 = sel.recv(&r);
1236 ///
1237 /// let oper = sel.select();
1238 /// assert_eq!(oper.index(), oper1);
1239 /// assert_eq!(oper.recv(&r), Err(RecvError));
1240 /// ```
1241 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1242 assert!(
1243 r as *const Receiver<T> as *const u8 == self.ptr,
1244 "passed a receiver that wasn't selected",
1245 );
1246 let res = unsafe { channel::read(r, &mut self.token) };
1247 mem::forget(self);
1248 res.map_err(|_| RecvError)
1249 }
1250}
1251
1252impl fmt::Debug for SelectedOperation<'_> {
1253 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1254 f.pad("SelectedOperation { .. }")
1255 }
1256}
1257
impl Drop for SelectedOperation<'_> {
    /// Unconditionally panics: reaching this destructor means the operation was dropped
    /// instead of being completed.
    fn drop(&mut self) {
        // `send` and `recv` call `mem::forget` on the operation once it has been completed,
        // which skips this destructor. It therefore runs only when the value is dropped some
        // other way.
        //
        // NOTE(review): that includes being dropped while the thread is already unwinding
        // (e.g. when the handle assertion in `send`/`recv` fails). A panic raised from a
        // destructor during unwinding aborts the process — confirm that this is intended.
        panic!("dropped `SelectedOperation` without completing the operation");
    }
}
1263