1 | //! Interface to the select mechanism. |
2 | |
3 | use std::fmt; |
4 | use std::marker::PhantomData; |
5 | use std::mem; |
6 | use std::time::{Duration, Instant}; |
7 | |
8 | use crossbeam_utils::Backoff; |
9 | |
10 | use crate::channel::{self, Receiver, Sender}; |
11 | use crate::context::Context; |
12 | use crate::err::{ReadyTimeoutError, TryReadyError}; |
13 | use crate::err::{RecvError, SendError}; |
14 | use crate::err::{SelectTimeoutError, TrySelectError}; |
15 | use crate::flavors; |
16 | use crate::utils; |
17 | |
18 | /// Temporary data that gets initialized during select or a blocking operation, and is consumed by |
19 | /// `read` or `write`. |
20 | /// |
21 | /// Each field contains data associated with a specific channel flavor. |
22 | // This is a private API that is used by the select macro. |
23 | #[derive (Debug, Default)] |
24 | pub struct Token { |
25 | pub(crate) at: flavors::at::AtToken, |
26 | pub(crate) array: flavors::array::ArrayToken, |
27 | pub(crate) list: flavors::list::ListToken, |
28 | #[allow (dead_code)] |
29 | pub(crate) never: flavors::never::NeverToken, |
30 | pub(crate) tick: flavors::tick::TickToken, |
31 | pub(crate) zero: flavors::zero::ZeroToken, |
32 | } |
33 | |
/// Identifier associated with an operation by a specific thread on a specific channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Creates an operation identifier from a mutable reference.
    ///
    /// The identifier is simply the address of the referenced value converted to a number. The
    /// reference should point to a variable that is specific to the thread and the operation,
    /// and that stays alive for the entire duration of the select or blocking operation.
    ///
    /// # Panics
    ///
    /// Panics if the address is `0`, `1`, or `2`, because those numbers are reserved for the
    /// non-operation variants of [`Selected`].
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let val: usize = r as *mut T as usize;
        // Make sure that the pointer address doesn't equal the numerical representation of
        // `Selected::{Waiting, Aborted, Disconnected}`.
        assert!(val > 2);
        Operation(val)
    }
}

/// Current state of a select or a blocking operation.
///
/// A state converts to and from `usize`: `0`, `1` and `2` encode the three unit variants, and
/// every other number is the identifier of an [`Operation`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selected {
    /// Still waiting for an operation.
    Waiting,

    /// The attempt to block the current thread has been aborted.
    Aborted,

    /// An operation became ready because a channel is disconnected.
    Disconnected,

    /// An operation became ready because a message can be sent or received.
    Operation(Operation),
}

impl From<usize> for Selected {
    /// Decodes a state from its numerical representation.
    #[inline]
    fn from(val: usize) -> Selected {
        match val {
            0 => Selected::Waiting,
            1 => Selected::Aborted,
            2 => Selected::Disconnected,
            oper => Selected::Operation(Operation(oper)),
        }
    }
}

// Implemented as `From` rather than `Into`: the standard blanket impl still provides
// `Into<usize> for Selected`, so existing `.into()` call sites keep working.
impl From<Selected> for usize {
    /// Encodes a state into its numerical representation.
    #[inline]
    fn from(sel: Selected) -> usize {
        match sel {
            Selected::Waiting => 0,
            Selected::Aborted => 1,
            Selected::Disconnected => 2,
            Selected::Operation(Operation(val)) => val,
        }
    }
}
93 | |
94 | /// A receiver or a sender that can participate in select. |
95 | /// |
96 | /// This is a handle that assists select in executing an operation, registration, deciding on the |
97 | /// appropriate deadline for blocking, etc. |
98 | // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. |
99 | pub trait SelectHandle { |
100 | /// Attempts to select an operation and returns `true` on success. |
101 | fn try_select(&self, token: &mut Token) -> bool; |
102 | |
103 | /// Returns a deadline for an operation, if there is one. |
104 | fn deadline(&self) -> Option<Instant>; |
105 | |
106 | /// Registers an operation for execution and returns `true` if it is now ready. |
107 | fn register(&self, oper: Operation, cx: &Context) -> bool; |
108 | |
109 | /// Unregisters an operation for execution. |
110 | fn unregister(&self, oper: Operation); |
111 | |
112 | /// Attempts to select an operation the thread got woken up for and returns `true` on success. |
113 | fn accept(&self, token: &mut Token, cx: &Context) -> bool; |
114 | |
115 | /// Returns `true` if an operation can be executed without blocking. |
116 | fn is_ready(&self) -> bool; |
117 | |
118 | /// Registers an operation for readiness notification and returns `true` if it is now ready. |
119 | fn watch(&self, oper: Operation, cx: &Context) -> bool; |
120 | |
121 | /// Unregisters an operation for readiness notification. |
122 | fn unwatch(&self, oper: Operation); |
123 | } |
124 | |
125 | impl<T: SelectHandle> SelectHandle for &T { |
126 | fn try_select(&self, token: &mut Token) -> bool { |
127 | (**self).try_select(token) |
128 | } |
129 | |
130 | fn deadline(&self) -> Option<Instant> { |
131 | (**self).deadline() |
132 | } |
133 | |
134 | fn register(&self, oper: Operation, cx: &Context) -> bool { |
135 | (**self).register(oper, cx) |
136 | } |
137 | |
138 | fn unregister(&self, oper: Operation) { |
139 | (**self).unregister(oper); |
140 | } |
141 | |
142 | fn accept(&self, token: &mut Token, cx: &Context) -> bool { |
143 | (**self).accept(token, cx) |
144 | } |
145 | |
146 | fn is_ready(&self) -> bool { |
147 | (**self).is_ready() |
148 | } |
149 | |
150 | fn watch(&self, oper: Operation, cx: &Context) -> bool { |
151 | (**self).watch(oper, cx) |
152 | } |
153 | |
154 | fn unwatch(&self, oper: Operation) { |
155 | (**self).unwatch(oper) |
156 | } |
157 | } |
158 | |
/// Describes how long a select operation is allowed to block before giving up.
#[derive(Clone, Copy, PartialEq, Eq)]
enum Timeout {
    /// Don't block at all.
    Now,

    /// Block for as long as it takes.
    Never,

    /// Give up once the given time instant has been reached.
    At(Instant),
}
171 | |
172 | /// Runs until one of the operations is selected, potentially blocking the current thread. |
173 | /// |
174 | /// Successful receive operations will have to be followed up by `channel::read()` and successful |
175 | /// send operations by `channel::write()`. |
176 | fn run_select( |
177 | handles: &mut [(&dyn SelectHandle, usize, *const u8)], |
178 | timeout: Timeout, |
179 | ) -> Option<(Token, usize, *const u8)> { |
180 | if handles.is_empty() { |
181 | // Wait until the timeout and return. |
182 | match timeout { |
183 | Timeout::Now => return None, |
184 | Timeout::Never => { |
185 | utils::sleep_until(None); |
186 | unreachable!(); |
187 | } |
188 | Timeout::At(when) => { |
189 | utils::sleep_until(Some(when)); |
190 | return None; |
191 | } |
192 | } |
193 | } |
194 | |
195 | // Shuffle the operations for fairness. |
196 | utils::shuffle(handles); |
197 | |
198 | // Create a token, which serves as a temporary variable that gets initialized in this function |
199 | // and is later used by a call to `channel::read()` or `channel::write()` that completes the |
200 | // selected operation. |
201 | let mut token = Token::default(); |
202 | |
203 | // Try selecting one of the operations without blocking. |
204 | for &(handle, i, ptr) in handles.iter() { |
205 | if handle.try_select(&mut token) { |
206 | return Some((token, i, ptr)); |
207 | } |
208 | } |
209 | |
210 | loop { |
211 | // Prepare for blocking. |
212 | let res = Context::with(|cx| { |
213 | let mut sel = Selected::Waiting; |
214 | let mut registered_count = 0; |
215 | let mut index_ready = None; |
216 | |
217 | if let Timeout::Now = timeout { |
218 | cx.try_select(Selected::Aborted).unwrap(); |
219 | } |
220 | |
221 | // Register all operations. |
222 | for (handle, i, _) in handles.iter_mut() { |
223 | registered_count += 1; |
224 | |
225 | // If registration returns `false`, that means the operation has just become ready. |
226 | if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) { |
227 | // Try aborting select. |
228 | sel = match cx.try_select(Selected::Aborted) { |
229 | Ok(()) => { |
230 | index_ready = Some(*i); |
231 | Selected::Aborted |
232 | } |
233 | Err(s) => s, |
234 | }; |
235 | break; |
236 | } |
237 | |
238 | // If another thread has already selected one of the operations, stop registration. |
239 | sel = cx.selected(); |
240 | if sel != Selected::Waiting { |
241 | break; |
242 | } |
243 | } |
244 | |
245 | if sel == Selected::Waiting { |
246 | // Check with each operation for how long we're allowed to block, and compute the |
247 | // earliest deadline. |
248 | let mut deadline: Option<Instant> = match timeout { |
249 | Timeout::Now => return None, |
250 | Timeout::Never => None, |
251 | Timeout::At(when) => Some(when), |
252 | }; |
253 | for &(handle, _, _) in handles.iter() { |
254 | if let Some(x) = handle.deadline() { |
255 | deadline = deadline.map(|y| x.min(y)).or(Some(x)); |
256 | } |
257 | } |
258 | |
259 | // Block the current thread. |
260 | sel = cx.wait_until(deadline); |
261 | } |
262 | |
263 | // Unregister all registered operations. |
264 | for (handle, _, _) in handles.iter_mut().take(registered_count) { |
265 | handle.unregister(Operation::hook::<&dyn SelectHandle>(handle)); |
266 | } |
267 | |
268 | match sel { |
269 | Selected::Waiting => unreachable!(), |
270 | Selected::Aborted => { |
271 | // If an operation became ready during registration, try selecting it. |
272 | if let Some(index_ready) = index_ready { |
273 | for &(handle, i, ptr) in handles.iter() { |
274 | if i == index_ready && handle.try_select(&mut token) { |
275 | return Some((i, ptr)); |
276 | } |
277 | } |
278 | } |
279 | } |
280 | Selected::Disconnected => {} |
281 | Selected::Operation(_) => { |
282 | // Find the selected operation. |
283 | for (handle, i, ptr) in handles.iter_mut() { |
284 | // Is this the selected operation? |
285 | if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle)) |
286 | { |
287 | // Try selecting this operation. |
288 | if handle.accept(&mut token, cx) { |
289 | return Some((*i, *ptr)); |
290 | } |
291 | } |
292 | } |
293 | } |
294 | } |
295 | |
296 | None |
297 | }); |
298 | |
299 | // Return if an operation was selected. |
300 | if let Some((i, ptr)) = res { |
301 | return Some((token, i, ptr)); |
302 | } |
303 | |
304 | // Try selecting one of the operations without blocking. |
305 | for &(handle, i, ptr) in handles.iter() { |
306 | if handle.try_select(&mut token) { |
307 | return Some((token, i, ptr)); |
308 | } |
309 | } |
310 | |
311 | match timeout { |
312 | Timeout::Now => return None, |
313 | Timeout::Never => {} |
314 | Timeout::At(when) => { |
315 | if Instant::now() >= when { |
316 | return None; |
317 | } |
318 | } |
319 | } |
320 | } |
321 | } |
322 | |
323 | /// Runs until one of the operations becomes ready, potentially blocking the current thread. |
324 | fn run_ready( |
325 | handles: &mut [(&dyn SelectHandle, usize, *const u8)], |
326 | timeout: Timeout, |
327 | ) -> Option<usize> { |
328 | if handles.is_empty() { |
329 | // Wait until the timeout and return. |
330 | match timeout { |
331 | Timeout::Now => return None, |
332 | Timeout::Never => { |
333 | utils::sleep_until(None); |
334 | unreachable!(); |
335 | } |
336 | Timeout::At(when) => { |
337 | utils::sleep_until(Some(when)); |
338 | return None; |
339 | } |
340 | } |
341 | } |
342 | |
343 | // Shuffle the operations for fairness. |
344 | utils::shuffle(handles); |
345 | |
346 | loop { |
347 | let backoff = Backoff::new(); |
348 | loop { |
349 | // Check operations for readiness. |
350 | for &(handle, i, _) in handles.iter() { |
351 | if handle.is_ready() { |
352 | return Some(i); |
353 | } |
354 | } |
355 | |
356 | if backoff.is_completed() { |
357 | break; |
358 | } else { |
359 | backoff.snooze(); |
360 | } |
361 | } |
362 | |
363 | // Check for timeout. |
364 | match timeout { |
365 | Timeout::Now => return None, |
366 | Timeout::Never => {} |
367 | Timeout::At(when) => { |
368 | if Instant::now() >= when { |
369 | return None; |
370 | } |
371 | } |
372 | } |
373 | |
374 | // Prepare for blocking. |
375 | let res = Context::with(|cx| { |
376 | let mut sel = Selected::Waiting; |
377 | let mut registered_count = 0; |
378 | |
379 | // Begin watching all operations. |
380 | for (handle, _, _) in handles.iter_mut() { |
381 | registered_count += 1; |
382 | let oper = Operation::hook::<&dyn SelectHandle>(handle); |
383 | |
384 | // If registration returns `false`, that means the operation has just become ready. |
385 | if handle.watch(oper, cx) { |
386 | sel = match cx.try_select(Selected::Operation(oper)) { |
387 | Ok(()) => Selected::Operation(oper), |
388 | Err(s) => s, |
389 | }; |
390 | break; |
391 | } |
392 | |
393 | // If another thread has already chosen one of the operations, stop registration. |
394 | sel = cx.selected(); |
395 | if sel != Selected::Waiting { |
396 | break; |
397 | } |
398 | } |
399 | |
400 | if sel == Selected::Waiting { |
401 | // Check with each operation for how long we're allowed to block, and compute the |
402 | // earliest deadline. |
403 | let mut deadline: Option<Instant> = match timeout { |
404 | Timeout::Now => unreachable!(), |
405 | Timeout::Never => None, |
406 | Timeout::At(when) => Some(when), |
407 | }; |
408 | for &(handle, _, _) in handles.iter() { |
409 | if let Some(x) = handle.deadline() { |
410 | deadline = deadline.map(|y| x.min(y)).or(Some(x)); |
411 | } |
412 | } |
413 | |
414 | // Block the current thread. |
415 | sel = cx.wait_until(deadline); |
416 | } |
417 | |
418 | // Unwatch all operations. |
419 | for (handle, _, _) in handles.iter_mut().take(registered_count) { |
420 | handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle)); |
421 | } |
422 | |
423 | match sel { |
424 | Selected::Waiting => unreachable!(), |
425 | Selected::Aborted => {} |
426 | Selected::Disconnected => {} |
427 | Selected::Operation(_) => { |
428 | for (handle, i, _) in handles.iter_mut() { |
429 | let oper = Operation::hook::<&dyn SelectHandle>(handle); |
430 | if sel == Selected::Operation(oper) { |
431 | return Some(*i); |
432 | } |
433 | } |
434 | } |
435 | } |
436 | |
437 | None |
438 | }); |
439 | |
440 | // Return if an operation became ready. |
441 | if res.is_some() { |
442 | return res; |
443 | } |
444 | } |
445 | } |
446 | |
447 | /// Attempts to select one of the operations without blocking. |
448 | // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. |
449 | #[inline ] |
450 | pub fn try_select<'a>( |
451 | handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], |
452 | ) -> Result<SelectedOperation<'a>, TrySelectError> { |
453 | match run_select(handles, Timeout::Now) { |
454 | None => Err(TrySelectError), |
455 | Some((token: Token, index: usize, ptr: *const u8)) => Ok(SelectedOperation { |
456 | token, |
457 | index, |
458 | ptr, |
459 | _marker: PhantomData, |
460 | }), |
461 | } |
462 | } |
463 | |
464 | /// Blocks until one of the operations becomes ready and selects it. |
465 | // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. |
466 | #[inline ] |
467 | pub fn select<'a>( |
468 | handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], |
469 | ) -> SelectedOperation<'a> { |
470 | if handles.is_empty() { |
471 | panic!("no operations have been added to `Select`" ); |
472 | } |
473 | |
474 | let (token: Token, index: usize, ptr: *const u8) = run_select(handles, Timeout::Never).unwrap(); |
475 | SelectedOperation { |
476 | token, |
477 | index, |
478 | ptr, |
479 | _marker: PhantomData, |
480 | } |
481 | } |
482 | |
483 | /// Blocks for a limited time until one of the operations becomes ready and selects it. |
484 | // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. |
485 | #[inline ] |
486 | pub fn select_timeout<'a>( |
487 | handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], |
488 | timeout: Duration, |
489 | ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { |
490 | match Instant::now().checked_add(duration:timeout) { |
491 | Some(deadline: Instant) => select_deadline(handles, deadline), |
492 | None => Ok(select(handles)), |
493 | } |
494 | } |
495 | |
496 | /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. |
497 | #[inline ] |
498 | pub(crate) fn select_deadline<'a>( |
499 | handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], |
500 | deadline: Instant, |
501 | ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { |
502 | match run_select(handles, Timeout::At(deadline)) { |
503 | None => Err(SelectTimeoutError), |
504 | Some((token: Token, index: usize, ptr: *const u8)) => Ok(SelectedOperation { |
505 | token, |
506 | index, |
507 | ptr, |
508 | _marker: PhantomData, |
509 | }), |
510 | } |
511 | } |
512 | |
513 | /// Selects from a set of channel operations. |
514 | /// |
515 | /// `Select` allows you to define a set of channel operations, wait until any one of them becomes |
516 | /// ready, and finally execute it. If multiple operations are ready at the same time, a random one |
517 | /// among them is selected. |
518 | /// |
519 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even |
520 | /// when it will simply return an error because the channel is disconnected. |
521 | /// |
522 | /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a |
523 | /// dynamically created list of channel operations. |
524 | /// |
525 | /// [`select!`]: crate::select! |
526 | /// |
527 | /// Once a list of operations has been built with `Select`, there are two different ways of |
528 | /// proceeding: |
529 | /// |
530 | /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful, |
531 | /// the returned selected operation has already begun and **must** be completed. If we don't |
532 | /// complete it, a panic will occur. |
533 | /// |
534 | /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If |
535 | /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's |
536 | /// possible for another thread to make the operation not ready just before we try executing it, |
537 | /// so it's wise to use a retry loop. However, note that these methods might return with success |
538 | /// spuriously, so it's a good idea to always double check if the operation is really ready. |
539 | /// |
540 | /// # Examples |
541 | /// |
542 | /// Use [`select`] to receive a message from a list of receivers: |
543 | /// |
544 | /// ``` |
545 | /// use crossbeam_channel::{Receiver, RecvError, Select}; |
546 | /// |
547 | /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> { |
548 | /// // Build a list of operations. |
549 | /// let mut sel = Select::new(); |
550 | /// for r in rs { |
551 | /// sel.recv(r); |
552 | /// } |
553 | /// |
554 | /// // Complete the selected operation. |
555 | /// let oper = sel.select(); |
556 | /// let index = oper.index(); |
557 | /// oper.recv(&rs[index]) |
558 | /// } |
559 | /// ``` |
560 | /// |
561 | /// Use [`ready`] to receive a message from a list of receivers: |
562 | /// |
563 | /// ``` |
564 | /// use crossbeam_channel::{Receiver, RecvError, Select}; |
565 | /// |
566 | /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> { |
567 | /// // Build a list of operations. |
568 | /// let mut sel = Select::new(); |
569 | /// for r in rs { |
570 | /// sel.recv(r); |
571 | /// } |
572 | /// |
573 | /// loop { |
574 | /// // Wait until a receive operation becomes ready and try executing it. |
575 | /// let index = sel.ready(); |
576 | /// let res = rs[index].try_recv(); |
577 | /// |
578 | /// // If the operation turns out not to be ready, retry. |
579 | /// if let Err(e) = res { |
580 | /// if e.is_empty() { |
581 | /// continue; |
582 | /// } |
583 | /// } |
584 | /// |
585 | /// // Success! |
586 | /// return res.map_err(|_| RecvError); |
587 | /// } |
588 | /// } |
589 | /// ``` |
590 | /// |
591 | /// [`try_select`]: Select::try_select |
592 | /// [`select`]: Select::select |
593 | /// [`select_timeout`]: Select::select_timeout |
594 | /// [`try_ready`]: Select::try_ready |
595 | /// [`ready`]: Select::ready |
596 | /// [`ready_timeout`]: Select::ready_timeout |
597 | pub struct Select<'a> { |
598 | /// A list of senders and receivers participating in selection. |
599 | handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>, |
600 | |
601 | /// The next index to assign to an operation. |
602 | next_index: usize, |
603 | } |
604 | |
605 | unsafe impl Send for Select<'_> {} |
606 | unsafe impl Sync for Select<'_> {} |
607 | |
608 | impl<'a> Select<'a> { |
609 | /// Creates an empty list of channel operations for selection. |
610 | /// |
611 | /// # Examples |
612 | /// |
613 | /// ``` |
614 | /// use crossbeam_channel::Select; |
615 | /// |
616 | /// let mut sel = Select::new(); |
617 | /// |
618 | /// // The list of operations is empty, which means no operation can be selected. |
619 | /// assert!(sel.try_select().is_err()); |
620 | /// ``` |
621 | pub fn new() -> Select<'a> { |
622 | Select { |
623 | handles: Vec::with_capacity(4), |
624 | next_index: 0, |
625 | } |
626 | } |
627 | |
628 | /// Adds a send operation. |
629 | /// |
630 | /// Returns the index of the added operation. |
631 | /// |
632 | /// # Examples |
633 | /// |
634 | /// ``` |
635 | /// use crossbeam_channel::{unbounded, Select}; |
636 | /// |
637 | /// let (s, r) = unbounded::<i32>(); |
638 | /// |
639 | /// let mut sel = Select::new(); |
640 | /// let index = sel.send(&s); |
641 | /// ``` |
642 | pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize { |
643 | let i = self.next_index; |
644 | let ptr = s as *const Sender<_> as *const u8; |
645 | self.handles.push((s, i, ptr)); |
646 | self.next_index += 1; |
647 | i |
648 | } |
649 | |
650 | /// Adds a receive operation. |
651 | /// |
652 | /// Returns the index of the added operation. |
653 | /// |
654 | /// # Examples |
655 | /// |
656 | /// ``` |
657 | /// use crossbeam_channel::{unbounded, Select}; |
658 | /// |
659 | /// let (s, r) = unbounded::<i32>(); |
660 | /// |
661 | /// let mut sel = Select::new(); |
662 | /// let index = sel.recv(&r); |
663 | /// ``` |
664 | pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize { |
665 | let i = self.next_index; |
666 | let ptr = r as *const Receiver<_> as *const u8; |
667 | self.handles.push((r, i, ptr)); |
668 | self.next_index += 1; |
669 | i |
670 | } |
671 | |
672 | /// Removes a previously added operation. |
673 | /// |
674 | /// This is useful when an operation is selected because the channel got disconnected and we |
675 | /// want to try again to select a different operation instead. |
676 | /// |
677 | /// If new operations are added after removing some, the indices of removed operations will not |
678 | /// be reused. |
679 | /// |
680 | /// # Panics |
681 | /// |
682 | /// An attempt to remove a non-existing or already removed operation will panic. |
683 | /// |
684 | /// # Examples |
685 | /// |
686 | /// ``` |
687 | /// use crossbeam_channel::{unbounded, Select}; |
688 | /// |
689 | /// let (s1, r1) = unbounded::<i32>(); |
690 | /// let (_, r2) = unbounded::<i32>(); |
691 | /// |
692 | /// let mut sel = Select::new(); |
693 | /// let oper1 = sel.recv(&r1); |
694 | /// let oper2 = sel.recv(&r2); |
695 | /// |
696 | /// // Both operations are initially ready, so a random one will be executed. |
697 | /// let oper = sel.select(); |
698 | /// assert_eq!(oper.index(), oper2); |
699 | /// assert!(oper.recv(&r2).is_err()); |
700 | /// sel.remove(oper2); |
701 | /// |
702 | /// s1.send(10).unwrap(); |
703 | /// |
704 | /// let oper = sel.select(); |
705 | /// assert_eq!(oper.index(), oper1); |
706 | /// assert_eq!(oper.recv(&r1), Ok(10)); |
707 | /// ``` |
708 | pub fn remove(&mut self, index: usize) { |
709 | assert!( |
710 | index < self.next_index, |
711 | "index out of bounds; {} >= {}" , |
712 | index, |
713 | self.next_index, |
714 | ); |
715 | |
716 | let i = self |
717 | .handles |
718 | .iter() |
719 | .enumerate() |
720 | .find(|(_, (_, i, _))| *i == index) |
721 | .expect("no operation with this index" ) |
722 | .0; |
723 | |
724 | self.handles.swap_remove(i); |
725 | } |
726 | |
727 | /// Attempts to select one of the operations without blocking. |
728 | /// |
729 | /// If an operation is ready, it is selected and returned. If multiple operations are ready at |
730 | /// the same time, a random one among them is selected. If none of the operations are ready, an |
731 | /// error is returned. |
732 | /// |
733 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready |
734 | /// even when it will simply return an error because the channel is disconnected. |
735 | /// |
736 | /// The selected operation must be completed with [`SelectedOperation::send`] |
737 | /// or [`SelectedOperation::recv`]. |
738 | /// |
739 | /// # Examples |
740 | /// |
741 | /// ``` |
742 | /// use crossbeam_channel::{unbounded, Select}; |
743 | /// |
744 | /// let (s1, r1) = unbounded(); |
745 | /// let (s2, r2) = unbounded(); |
746 | /// |
747 | /// s1.send(10).unwrap(); |
748 | /// s2.send(20).unwrap(); |
749 | /// |
750 | /// let mut sel = Select::new(); |
751 | /// let oper1 = sel.recv(&r1); |
752 | /// let oper2 = sel.recv(&r2); |
753 | /// |
754 | /// // Both operations are initially ready, so a random one will be executed. |
755 | /// let oper = sel.try_select(); |
756 | /// match oper { |
757 | /// Err(_) => panic!("both operations should be ready" ), |
758 | /// Ok(oper) => match oper.index() { |
759 | /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), |
760 | /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), |
761 | /// _ => unreachable!(), |
762 | /// } |
763 | /// } |
764 | /// ``` |
765 | pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> { |
766 | try_select(&mut self.handles) |
767 | } |
768 | |
769 | /// Blocks until one of the operations becomes ready and selects it. |
770 | /// |
771 | /// Once an operation becomes ready, it is selected and returned. If multiple operations are |
772 | /// ready at the same time, a random one among them is selected. |
773 | /// |
774 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready |
775 | /// even when it will simply return an error because the channel is disconnected. |
776 | /// |
777 | /// The selected operation must be completed with [`SelectedOperation::send`] |
778 | /// or [`SelectedOperation::recv`]. |
779 | /// |
780 | /// # Panics |
781 | /// |
782 | /// Panics if no operations have been added to `Select`. |
783 | /// |
784 | /// # Examples |
785 | /// |
786 | /// ``` |
787 | /// use std::thread; |
788 | /// use std::time::Duration; |
789 | /// use crossbeam_channel::{unbounded, Select}; |
790 | /// |
791 | /// let (s1, r1) = unbounded(); |
792 | /// let (s2, r2) = unbounded(); |
793 | /// |
794 | /// thread::spawn(move || { |
795 | /// thread::sleep(Duration::from_secs(1)); |
796 | /// s1.send(10).unwrap(); |
797 | /// }); |
798 | /// thread::spawn(move || s2.send(20).unwrap()); |
799 | /// |
800 | /// let mut sel = Select::new(); |
801 | /// let oper1 = sel.recv(&r1); |
802 | /// let oper2 = sel.recv(&r2); |
803 | /// |
804 | /// // The second operation will be selected because it becomes ready first. |
805 | /// let oper = sel.select(); |
806 | /// match oper.index() { |
807 | /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), |
808 | /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), |
809 | /// _ => unreachable!(), |
810 | /// } |
811 | /// ``` |
812 | pub fn select(&mut self) -> SelectedOperation<'a> { |
813 | select(&mut self.handles) |
814 | } |
815 | |
816 | /// Blocks for a limited time until one of the operations becomes ready and selects it. |
817 | /// |
818 | /// If an operation becomes ready, it is selected and returned. If multiple operations are |
819 | /// ready at the same time, a random one among them is selected. If none of the operations |
820 | /// become ready for the specified duration, an error is returned. |
821 | /// |
822 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready |
823 | /// even when it will simply return an error because the channel is disconnected. |
824 | /// |
825 | /// The selected operation must be completed with [`SelectedOperation::send`] |
826 | /// or [`SelectedOperation::recv`]. |
827 | /// |
828 | /// # Examples |
829 | /// |
830 | /// ``` |
831 | /// use std::thread; |
832 | /// use std::time::Duration; |
833 | /// use crossbeam_channel::{unbounded, Select}; |
834 | /// |
835 | /// let (s1, r1) = unbounded(); |
836 | /// let (s2, r2) = unbounded(); |
837 | /// |
838 | /// thread::spawn(move || { |
839 | /// thread::sleep(Duration::from_secs(1)); |
840 | /// s1.send(10).unwrap(); |
841 | /// }); |
842 | /// thread::spawn(move || s2.send(20).unwrap()); |
843 | /// |
844 | /// let mut sel = Select::new(); |
845 | /// let oper1 = sel.recv(&r1); |
846 | /// let oper2 = sel.recv(&r2); |
847 | /// |
848 | /// // The second operation will be selected because it becomes ready first. |
849 | /// let oper = sel.select_timeout(Duration::from_millis(500)); |
850 | /// match oper { |
851 | /// Err(_) => panic!("should not have timed out" ), |
852 | /// Ok(oper) => match oper.index() { |
853 | /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), |
854 | /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), |
855 | /// _ => unreachable!(), |
856 | /// } |
857 | /// } |
858 | /// ``` |
859 | pub fn select_timeout( |
860 | &mut self, |
861 | timeout: Duration, |
862 | ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { |
863 | select_timeout(&mut self.handles, timeout) |
864 | } |
865 | |
866 | /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. |
867 | /// |
868 | /// If an operation becomes ready, it is selected and returned. If multiple operations are |
869 | /// ready at the same time, a random one among them is selected. If none of the operations |
870 | /// become ready before the given deadline, an error is returned. |
871 | /// |
872 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready |
873 | /// even when it will simply return an error because the channel is disconnected. |
874 | /// |
875 | /// The selected operation must be completed with [`SelectedOperation::send`] |
876 | /// or [`SelectedOperation::recv`]. |
877 | /// |
878 | /// # Examples |
879 | /// |
880 | /// ``` |
881 | /// use std::thread; |
882 | /// use std::time::{Instant, Duration}; |
883 | /// use crossbeam_channel::{unbounded, Select}; |
884 | /// |
885 | /// let (s1, r1) = unbounded(); |
886 | /// let (s2, r2) = unbounded(); |
887 | /// |
888 | /// thread::spawn(move || { |
889 | /// thread::sleep(Duration::from_secs(1)); |
890 | /// s1.send(10).unwrap(); |
891 | /// }); |
892 | /// thread::spawn(move || s2.send(20).unwrap()); |
893 | /// |
894 | /// let mut sel = Select::new(); |
895 | /// let oper1 = sel.recv(&r1); |
896 | /// let oper2 = sel.recv(&r2); |
897 | /// |
898 | /// let deadline = Instant::now() + Duration::from_millis(500); |
899 | /// |
900 | /// // The second operation will be selected because it becomes ready first. |
901 | /// let oper = sel.select_deadline(deadline); |
902 | /// match oper { |
903 | /// Err(_) => panic!("should not have timed out" ), |
904 | /// Ok(oper) => match oper.index() { |
905 | /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), |
906 | /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), |
907 | /// _ => unreachable!(), |
908 | /// } |
909 | /// } |
910 | /// ``` |
911 | pub fn select_deadline( |
912 | &mut self, |
913 | deadline: Instant, |
914 | ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { |
915 | select_deadline(&mut self.handles, deadline) |
916 | } |
917 | |
918 | /// Attempts to find a ready operation without blocking. |
919 | /// |
920 | /// If an operation is ready, its index is returned. If multiple operations are ready at the |
921 | /// same time, a random one among them is chosen. If none of the operations are ready, an error |
922 | /// is returned. |
923 | /// |
924 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready |
925 | /// even when it will simply return an error because the channel is disconnected. |
926 | /// |
927 | /// Note that this method might return with success spuriously, so it's a good idea to always |
928 | /// double check if the operation is really ready. |
929 | /// |
930 | /// # Examples |
931 | /// |
932 | /// ``` |
933 | /// use crossbeam_channel::{unbounded, Select}; |
934 | /// |
935 | /// let (s1, r1) = unbounded(); |
936 | /// let (s2, r2) = unbounded(); |
937 | /// |
938 | /// s1.send(10).unwrap(); |
939 | /// s2.send(20).unwrap(); |
940 | /// |
941 | /// let mut sel = Select::new(); |
942 | /// let oper1 = sel.recv(&r1); |
943 | /// let oper2 = sel.recv(&r2); |
944 | /// |
945 | /// // Both operations are initially ready, so a random one will be chosen. |
946 | /// match sel.try_ready() { |
947 | /// Err(_) => panic!("both operations should be ready" ), |
948 | /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), |
949 | /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), |
950 | /// Ok(_) => unreachable!(), |
951 | /// } |
952 | /// ``` |
953 | pub fn try_ready(&mut self) -> Result<usize, TryReadyError> { |
954 | match run_ready(&mut self.handles, Timeout::Now) { |
955 | None => Err(TryReadyError), |
956 | Some(index) => Ok(index), |
957 | } |
958 | } |
959 | |
960 | /// Blocks until one of the operations becomes ready. |
961 | /// |
962 | /// Once an operation becomes ready, its index is returned. If multiple operations are ready at |
963 | /// the same time, a random one among them is chosen. |
964 | /// |
965 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready |
966 | /// even when it will simply return an error because the channel is disconnected. |
967 | /// |
968 | /// Note that this method might return with success spuriously, so it's a good idea to always |
969 | /// double check if the operation is really ready. |
970 | /// |
971 | /// # Panics |
972 | /// |
973 | /// Panics if no operations have been added to `Select`. |
974 | /// |
975 | /// # Examples |
976 | /// |
977 | /// ``` |
978 | /// use std::thread; |
979 | /// use std::time::Duration; |
980 | /// use crossbeam_channel::{unbounded, Select}; |
981 | /// |
982 | /// let (s1, r1) = unbounded(); |
983 | /// let (s2, r2) = unbounded(); |
984 | /// |
985 | /// thread::spawn(move || { |
986 | /// thread::sleep(Duration::from_secs(1)); |
987 | /// s1.send(10).unwrap(); |
988 | /// }); |
989 | /// thread::spawn(move || s2.send(20).unwrap()); |
990 | /// |
991 | /// let mut sel = Select::new(); |
992 | /// let oper1 = sel.recv(&r1); |
993 | /// let oper2 = sel.recv(&r2); |
994 | /// |
995 | /// // The second operation will be selected because it becomes ready first. |
996 | /// match sel.ready() { |
997 | /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), |
998 | /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), |
999 | /// _ => unreachable!(), |
1000 | /// } |
1001 | /// ``` |
1002 | pub fn ready(&mut self) -> usize { |
1003 | if self.handles.is_empty() { |
1004 | panic!("no operations have been added to `Select`" ); |
1005 | } |
1006 | |
1007 | run_ready(&mut self.handles, Timeout::Never).unwrap() |
1008 | } |
1009 | |
1010 | /// Blocks for a limited time until one of the operations becomes ready. |
1011 | /// |
1012 | /// If an operation becomes ready, its index is returned. If multiple operations are ready at |
1013 | /// the same time, a random one among them is chosen. If none of the operations become ready |
1014 | /// for the specified duration, an error is returned. |
1015 | /// |
1016 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready |
1017 | /// even when it will simply return an error because the channel is disconnected. |
1018 | /// |
1019 | /// Note that this method might return with success spuriously, so it's a good idea to double |
1020 | /// check if the operation is really ready. |
1021 | /// |
1022 | /// # Examples |
1023 | /// |
1024 | /// ``` |
1025 | /// use std::thread; |
1026 | /// use std::time::Duration; |
1027 | /// use crossbeam_channel::{unbounded, Select}; |
1028 | /// |
1029 | /// let (s1, r1) = unbounded(); |
1030 | /// let (s2, r2) = unbounded(); |
1031 | /// |
1032 | /// thread::spawn(move || { |
1033 | /// thread::sleep(Duration::from_secs(1)); |
1034 | /// s1.send(10).unwrap(); |
1035 | /// }); |
1036 | /// thread::spawn(move || s2.send(20).unwrap()); |
1037 | /// |
1038 | /// let mut sel = Select::new(); |
1039 | /// let oper1 = sel.recv(&r1); |
1040 | /// let oper2 = sel.recv(&r2); |
1041 | /// |
1042 | /// // The second operation will be selected because it becomes ready first. |
1043 | /// match sel.ready_timeout(Duration::from_millis(500)) { |
1044 | /// Err(_) => panic!("should not have timed out" ), |
1045 | /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), |
1046 | /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), |
1047 | /// Ok(_) => unreachable!(), |
1048 | /// } |
1049 | /// ``` |
1050 | pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> { |
1051 | match Instant::now().checked_add(timeout) { |
1052 | Some(deadline) => self.ready_deadline(deadline), |
1053 | None => Ok(self.ready()), |
1054 | } |
1055 | } |
1056 | |
1057 | /// Blocks until a given deadline, or until one of the operations becomes ready. |
1058 | /// |
1059 | /// If an operation becomes ready, its index is returned. If multiple operations are ready at |
1060 | /// the same time, a random one among them is chosen. If none of the operations become ready |
1061 | /// before the deadline, an error is returned. |
1062 | /// |
1063 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready |
1064 | /// even when it will simply return an error because the channel is disconnected. |
1065 | /// |
1066 | /// Note that this method might return with success spuriously, so it's a good idea to double |
1067 | /// check if the operation is really ready. |
1068 | /// |
1069 | /// # Examples |
1070 | /// |
1071 | /// ``` |
1072 | /// use std::thread; |
1073 | /// use std::time::{Duration, Instant}; |
1074 | /// use crossbeam_channel::{unbounded, Select}; |
1075 | /// |
1076 | /// let deadline = Instant::now() + Duration::from_millis(500); |
1077 | /// |
1078 | /// let (s1, r1) = unbounded(); |
1079 | /// let (s2, r2) = unbounded(); |
1080 | /// |
1081 | /// thread::spawn(move || { |
1082 | /// thread::sleep(Duration::from_secs(1)); |
1083 | /// s1.send(10).unwrap(); |
1084 | /// }); |
1085 | /// thread::spawn(move || s2.send(20).unwrap()); |
1086 | /// |
1087 | /// let mut sel = Select::new(); |
1088 | /// let oper1 = sel.recv(&r1); |
1089 | /// let oper2 = sel.recv(&r2); |
1090 | /// |
1091 | /// // The second operation will be selected because it becomes ready first. |
1092 | /// match sel.ready_deadline(deadline) { |
1093 | /// Err(_) => panic!("should not have timed out" ), |
1094 | /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), |
1095 | /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), |
1096 | /// Ok(_) => unreachable!(), |
1097 | /// } |
1098 | /// ``` |
1099 | pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> { |
1100 | match run_ready(&mut self.handles, Timeout::At(deadline)) { |
1101 | None => Err(ReadyTimeoutError), |
1102 | Some(index) => Ok(index), |
1103 | } |
1104 | } |
1105 | } |
1106 | |
1107 | impl<'a> Clone for Select<'a> { |
1108 | fn clone(&self) -> Select<'a> { |
1109 | Select { |
1110 | handles: self.handles.clone(), |
1111 | next_index: self.next_index, |
1112 | } |
1113 | } |
1114 | } |
1115 | |
1116 | impl<'a> Default for Select<'a> { |
1117 | fn default() -> Select<'a> { |
1118 | Select::new() |
1119 | } |
1120 | } |
1121 | |
1122 | impl fmt::Debug for Select<'_> { |
1123 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1124 | f.pad("Select { .. }" ) |
1125 | } |
1126 | } |
1127 | |
/// A selected operation that needs to be completed.
///
/// To complete the operation, call [`send`] or [`recv`]. Both methods take the
/// `SelectedOperation` by value, so each instance can be completed at most once.
///
/// # Panics
///
/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
/// `SelectedOperation` is dropped without completion, a panic occurs.
///
/// [`send`]: SelectedOperation::send
/// [`recv`]: SelectedOperation::recv
#[must_use]
pub struct SelectedOperation<'a> {
    /// Token needed to complete the operation. `send` hands it to `channel::write` and `recv`
    /// hands it to `channel::read`.
    token: Token,

    /// The index of the selected operation, as returned by [`SelectedOperation::index`].
    index: usize,

    /// The address of the selected `Sender` or `Receiver`. `send` and `recv` compare it with
    /// the address of the reference they are given and panic on a mismatch.
    ptr: *const u8,

    /// Indicates that `Sender`s and `Receiver`s are borrowed.
    _marker: PhantomData<&'a ()>,
}
1153 | |
1154 | impl SelectedOperation<'_> { |
1155 | /// Returns the index of the selected operation. |
1156 | /// |
1157 | /// # Examples |
1158 | /// |
1159 | /// ``` |
1160 | /// use crossbeam_channel::{bounded, Select}; |
1161 | /// |
1162 | /// let (s1, r1) = bounded::<()>(0); |
1163 | /// let (s2, r2) = bounded::<()>(0); |
1164 | /// let (s3, r3) = bounded::<()>(1); |
1165 | /// |
1166 | /// let mut sel = Select::new(); |
1167 | /// let oper1 = sel.send(&s1); |
1168 | /// let oper2 = sel.recv(&r2); |
1169 | /// let oper3 = sel.send(&s3); |
1170 | /// |
1171 | /// // Only the last operation is ready. |
1172 | /// let oper = sel.select(); |
1173 | /// assert_eq!(oper.index(), 2); |
1174 | /// assert_eq!(oper.index(), oper3); |
1175 | /// |
1176 | /// // Complete the operation. |
1177 | /// oper.send(&s3, ()).unwrap(); |
1178 | /// ``` |
1179 | pub fn index(&self) -> usize { |
1180 | self.index |
1181 | } |
1182 | |
1183 | /// Completes the send operation. |
1184 | /// |
1185 | /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`] |
1186 | /// when the operation was added. |
1187 | /// |
1188 | /// # Panics |
1189 | /// |
1190 | /// Panics if an incorrect [`Sender`] reference is passed. |
1191 | /// |
1192 | /// # Examples |
1193 | /// |
1194 | /// ``` |
1195 | /// use crossbeam_channel::{bounded, Select, SendError}; |
1196 | /// |
1197 | /// let (s, r) = bounded::<i32>(0); |
1198 | /// drop(r); |
1199 | /// |
1200 | /// let mut sel = Select::new(); |
1201 | /// let oper1 = sel.send(&s); |
1202 | /// |
1203 | /// let oper = sel.select(); |
1204 | /// assert_eq!(oper.index(), oper1); |
1205 | /// assert_eq!(oper.send(&s, 10), Err(SendError(10))); |
1206 | /// ``` |
1207 | pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> { |
1208 | assert!( |
1209 | s as *const Sender<T> as *const u8 == self.ptr, |
1210 | "passed a sender that wasn't selected" , |
1211 | ); |
1212 | let res = unsafe { channel::write(s, &mut self.token, msg) }; |
1213 | mem::forget(self); |
1214 | res.map_err(SendError) |
1215 | } |
1216 | |
1217 | /// Completes the receive operation. |
1218 | /// |
1219 | /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`] |
1220 | /// when the operation was added. |
1221 | /// |
1222 | /// # Panics |
1223 | /// |
1224 | /// Panics if an incorrect [`Receiver`] reference is passed. |
1225 | /// |
1226 | /// # Examples |
1227 | /// |
1228 | /// ``` |
1229 | /// use crossbeam_channel::{bounded, Select, RecvError}; |
1230 | /// |
1231 | /// let (s, r) = bounded::<i32>(0); |
1232 | /// drop(s); |
1233 | /// |
1234 | /// let mut sel = Select::new(); |
1235 | /// let oper1 = sel.recv(&r); |
1236 | /// |
1237 | /// let oper = sel.select(); |
1238 | /// assert_eq!(oper.index(), oper1); |
1239 | /// assert_eq!(oper.recv(&r), Err(RecvError)); |
1240 | /// ``` |
1241 | pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> { |
1242 | assert!( |
1243 | r as *const Receiver<T> as *const u8 == self.ptr, |
1244 | "passed a receiver that wasn't selected" , |
1245 | ); |
1246 | let res = unsafe { channel::read(r, &mut self.token) }; |
1247 | mem::forget(self); |
1248 | res.map_err(|_| RecvError) |
1249 | } |
1250 | } |
1251 | |
1252 | impl fmt::Debug for SelectedOperation<'_> { |
1253 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1254 | f.pad("SelectedOperation { .. }" ) |
1255 | } |
1256 | } |
1257 | |
impl Drop for SelectedOperation<'_> {
    /// Panics unconditionally: reaching this destructor means the operation was not completed.
    ///
    /// `send` and `recv` call `mem::forget(self)` once the channel operation has been carried
    /// out, so a completed operation never runs this destructor. Every other way of dropping
    /// the value ends up here.
    fn drop(&mut self) {
        // NOTE(review): this also runs while unwinding, e.g. when the pointer assertion in
        // `send`/`recv` fails. A panic raised during unwinding normally aborts the process
        // instead of propagating - confirm that this is the intended outcome.
        panic!("dropped `SelectedOperation` without completing the operation");
    }
}
1263 | |