1 | //! Multi-producer, multi-consumer FIFO queue communication primitives. |
2 | //! |
3 | //! This module provides message-based communication over channels, concretely |
4 | //! defined by two types: |
5 | //! |
6 | //! * [`Sender`] |
7 | //! * [`Receiver`] |
8 | //! |
9 | //! [`Sender`]s are used to send data to a set of [`Receiver`]s. Both |
10 | //! sender and receiver are cloneable (multi-producer) such that many threads can send |
11 | //! simultaneously to receivers (multi-consumer). |
12 | //! |
13 | //! These channels come in two flavors: |
14 | //! |
15 | //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function |
16 | //! will return a `(Sender, Receiver)` tuple where all sends will be |
17 | //! **asynchronous** (they never block). The channel conceptually has an |
18 | //! infinite buffer. |
19 | //! |
20 | //! 2. A synchronous, bounded channel. The [`sync_channel`] function will |
21 | //! return a `(Sender, Receiver)` tuple where the storage for pending |
22 | //! messages is a pre-allocated buffer of a fixed size. All sends will be |
23 | //! **synchronous** by blocking until there is buffer space available. Note |
24 | //! that a bound of 0 is allowed, causing the channel to become a "rendezvous" |
25 | //! channel where each sender atomically hands off a message to a receiver. |
26 | //! |
27 | //! [`send`]: Sender::send |
28 | //! |
29 | //! ## Disconnection |
30 | //! |
31 | //! The send and receive operations on channels will all return a [`Result`] |
32 | //! indicating whether the operation succeeded or not. An unsuccessful operation |
33 | //! is normally indicative of the other half of a channel having "hung up" by |
34 | //! being dropped in its corresponding thread. |
35 | //! |
36 | //! Once half of a channel has been deallocated, most operations can no longer |
37 | //! continue to make progress, so [`Err`] will be returned. Many applications |
38 | //! will continue to [`unwrap`] the results returned from this module, |
39 | //! instigating a propagation of failure among threads if one unexpectedly dies. |
40 | //! |
41 | //! [`unwrap`]: Result::unwrap |
42 | //! |
43 | //! # Examples |
44 | //! |
45 | //! Simple usage: |
46 | //! |
47 | //! ``` |
48 | //! #![feature(mpmc_channel)] |
49 | //! |
50 | //! use std::thread; |
51 | //! use std::sync::mpmc::channel; |
52 | //! |
53 | //! // Create a simple streaming channel |
54 | //! let (tx, rx) = channel(); |
55 | //! thread::spawn(move || { |
56 | //! tx.send(10).unwrap(); |
57 | //! }); |
58 | //! assert_eq!(rx.recv().unwrap(), 10); |
59 | //! ``` |
60 | //! |
61 | //! Shared usage: |
62 | //! |
63 | //! ``` |
64 | //! #![feature(mpmc_channel)] |
65 | //! |
66 | //! use std::thread; |
67 | //! use std::sync::mpmc::channel; |
68 | //! |
69 | //! thread::scope(|s| { |
70 | //! // Create a shared channel that can be sent along from many threads |
71 | //! // where tx is the sending half (tx for transmission), and rx is the receiving |
72 | //! // half (rx for receiving). |
73 | //! let (tx, rx) = channel(); |
74 | //! for i in 0..10 { |
75 | //! let tx = tx.clone(); |
76 | //! s.spawn(move || { |
77 | //! tx.send(i).unwrap(); |
78 | //! }); |
79 | //! } |
80 | //! |
81 | //! for _ in 0..5 { |
82 | //! let rx1 = rx.clone(); |
83 | //! let rx2 = rx.clone(); |
84 | //! s.spawn(move || { |
85 | //! let j = rx1.recv().unwrap(); |
86 | //! assert!(0 <= j && j < 10); |
87 | //! }); |
88 | //! s.spawn(move || { |
89 | //! let j = rx2.recv().unwrap(); |
90 | //! assert!(0 <= j && j < 10); |
91 | //! }); |
92 | //! } |
93 | //! }) |
94 | //! ``` |
95 | //! |
96 | //! Propagating panics: |
97 | //! |
98 | //! ``` |
99 | //! #![feature(mpmc_channel)] |
100 | //! |
101 | //! use std::sync::mpmc::channel; |
102 | //! |
103 | //! // The call to recv() will return an error because the channel has already |
104 | //! // hung up (or been deallocated) |
105 | //! let (tx, rx) = channel::<i32>(); |
106 | //! drop(tx); |
107 | //! assert!(rx.recv().is_err()); |
108 | //! ``` |
109 | |
110 | // This module is used as the implementation for the channels in `sync::mpsc`. |
111 | // The implementation comes from the crossbeam-channel crate: |
112 | // |
113 | // Copyright (c) 2019 The Crossbeam Project Developers |
114 | // |
115 | // Permission is hereby granted, free of charge, to any |
116 | // person obtaining a copy of this software and associated |
117 | // documentation files (the "Software"), to deal in the |
118 | // Software without restriction, including without |
119 | // limitation the rights to use, copy, modify, merge, |
120 | // publish, distribute, sublicense, and/or sell copies of |
121 | // the Software, and to permit persons to whom the Software |
122 | // is furnished to do so, subject to the following |
123 | // conditions: |
124 | // |
125 | // The above copyright notice and this permission notice |
126 | // shall be included in all copies or substantial portions |
127 | // of the Software. |
128 | // |
129 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF |
130 | // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED |
131 | // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A |
132 | // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
133 | // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY |
134 | // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
135 | // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR |
136 | // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER |
137 | // DEALINGS IN THE SOFTWARE. |
138 | |
139 | mod array; |
140 | mod context; |
141 | mod counter; |
142 | mod error; |
143 | mod list; |
144 | mod select; |
145 | mod utils; |
146 | mod waker; |
147 | mod zero; |
148 | |
149 | pub use error::*; |
150 | |
151 | use crate::fmt; |
152 | use crate::panic::{RefUnwindSafe, UnwindSafe}; |
153 | use crate::time::{Duration, Instant}; |
154 | |
155 | /// Creates a new asynchronous channel, returning the sender/receiver halves. |
156 | /// |
157 | /// All data sent on the [`Sender`] will become available on the [`Receiver`] in |
158 | /// the same order as it was sent, and no [`send`] will block the calling thread |
159 | /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will |
160 | /// block after its buffer limit is reached). [`recv`] will block until a message |
161 | /// is available while there is at least one [`Sender`] alive (including clones). |
162 | /// |
163 | /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times. |
164 | /// The [`Receiver`] also can be cloned to have multi receivers. |
165 | /// |
166 | /// If the [`Receiver`] is disconnected while trying to [`send`] with the |
167 | /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the |
168 | /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will |
169 | /// return a [`RecvError`]. |
170 | /// |
171 | /// [`send`]: Sender::send |
172 | /// [`recv`]: Receiver::recv |
173 | /// |
174 | /// # Examples |
175 | /// |
176 | /// ``` |
177 | /// #![feature(mpmc_channel)] |
178 | /// |
179 | /// use std::sync::mpmc::channel; |
180 | /// use std::thread; |
181 | /// |
182 | /// let (sender, receiver) = channel(); |
183 | /// |
184 | /// // Spawn off an expensive computation |
185 | /// thread::spawn(move || { |
186 | /// # fn expensive_computation() {} |
187 | /// sender.send(expensive_computation()).unwrap(); |
188 | /// }); |
189 | /// |
190 | /// // Do some useful work for awhile |
191 | /// |
192 | /// // Let's see what that answer was |
193 | /// println!("{:?}" , receiver.recv().unwrap()); |
194 | /// ``` |
195 | #[must_use ] |
196 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
197 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
198 | let (s: Sender>, r: Receiver>) = counter::new(chan:list::Channel::new()); |
199 | let s: Sender = Sender { flavor: SenderFlavor::List(s) }; |
200 | let r: Receiver = Receiver { flavor: ReceiverFlavor::List(r) }; |
201 | (s, r) |
202 | } |
203 | |
204 | /// Creates a new synchronous, bounded channel. |
205 | /// |
206 | /// All data sent on the [`Sender`] will become available on the [`Receiver`] |
207 | /// in the same order as it was sent. Like asynchronous [`channel`]s, the |
208 | /// [`Receiver`] will block until a message becomes available. `sync_channel` |
209 | /// differs greatly in the semantics of the sender, however. |
210 | /// |
211 | /// This channel has an internal buffer on which messages will be queued. |
212 | /// `bound` specifies the buffer size. When the internal buffer becomes full, |
213 | /// future sends will *block* waiting for the buffer to open up. Note that a |
214 | /// buffer size of 0 is valid, in which case this becomes "rendezvous channel" |
215 | /// where each [`send`] will not return until a [`recv`] is paired with it. |
216 | /// |
217 | /// The [`Sender`] can be cloned to [`send`] to the same channel multiple |
218 | /// times. The [`Receiver`] also can be cloned to have multi receivers. |
219 | /// |
220 | /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying |
221 | /// to [`send`] with the [`Sender`], the [`send`] method will return a |
222 | /// [`SendError`]. Similarly, If the [`Sender`] is disconnected while trying |
223 | /// to [`recv`], the [`recv`] method will return a [`RecvError`]. |
224 | /// |
225 | /// [`send`]: Sender::send |
226 | /// [`recv`]: Receiver::recv |
227 | /// |
228 | /// # Examples |
229 | /// |
230 | /// ``` |
231 | /// use std::sync::mpsc::sync_channel; |
232 | /// use std::thread; |
233 | /// |
234 | /// let (sender, receiver) = sync_channel(1); |
235 | /// |
236 | /// // this returns immediately |
237 | /// sender.send(1).unwrap(); |
238 | /// |
239 | /// thread::spawn(move || { |
240 | /// // this will block until the previous message has been received |
241 | /// sender.send(2).unwrap(); |
242 | /// }); |
243 | /// |
244 | /// assert_eq!(receiver.recv().unwrap(), 1); |
245 | /// assert_eq!(receiver.recv().unwrap(), 2); |
246 | /// ``` |
247 | #[must_use ] |
248 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
249 | pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) { |
250 | if cap == 0 { |
251 | let (s: Sender>, r: Receiver>) = counter::new(chan:zero::Channel::new()); |
252 | let s: Sender = Sender { flavor: SenderFlavor::Zero(s) }; |
253 | let r: Receiver = Receiver { flavor: ReceiverFlavor::Zero(r) }; |
254 | (s, r) |
255 | } else { |
256 | let (s: Sender>, r: Receiver>) = counter::new(chan:array::Channel::with_capacity(cap)); |
257 | let s: Sender = Sender { flavor: SenderFlavor::Array(s) }; |
258 | let r: Receiver = Receiver { flavor: ReceiverFlavor::Array(r) }; |
259 | (s, r) |
260 | } |
261 | } |
262 | |
263 | /// The sending-half of Rust's synchronous [`channel`] type. |
264 | /// |
265 | /// Messages can be sent through this channel with [`send`]. |
266 | /// |
267 | /// Note: all senders (the original and its clones) need to be dropped for the receiver |
268 | /// to stop blocking to receive messages with [`Receiver::recv`]. |
269 | /// |
270 | /// [`send`]: Sender::send |
271 | /// |
272 | /// # Examples |
273 | /// |
274 | /// ```rust |
275 | /// #![feature(mpmc_channel)] |
276 | /// |
277 | /// use std::sync::mpmc::channel; |
278 | /// use std::thread; |
279 | /// |
280 | /// let (sender, receiver) = channel(); |
281 | /// let sender2 = sender.clone(); |
282 | /// |
283 | /// // First thread owns sender |
284 | /// thread::spawn(move || { |
285 | /// sender.send(1).unwrap(); |
286 | /// }); |
287 | /// |
288 | /// // Second thread owns sender2 |
289 | /// thread::spawn(move || { |
290 | /// sender2.send(2).unwrap(); |
291 | /// }); |
292 | /// |
293 | /// let msg = receiver.recv().unwrap(); |
294 | /// let msg2 = receiver.recv().unwrap(); |
295 | /// |
296 | /// assert_eq!(3, msg + msg2); |
297 | /// ``` |
298 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
299 | pub struct Sender<T> { |
300 | flavor: SenderFlavor<T>, |
301 | } |
302 | |
303 | /// Sender flavors. |
304 | enum SenderFlavor<T> { |
305 | /// Bounded channel based on a preallocated array. |
306 | Array(counter::Sender<array::Channel<T>>), |
307 | |
308 | /// Unbounded channel implemented as a linked list. |
309 | List(counter::Sender<list::Channel<T>>), |
310 | |
311 | /// Zero-capacity channel. |
312 | Zero(counter::Sender<zero::Channel<T>>), |
313 | } |
314 | |
315 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
316 | unsafe impl<T: Send> Send for Sender<T> {} |
317 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
318 | unsafe impl<T: Send> Sync for Sender<T> {} |
319 | |
320 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
321 | impl<T> UnwindSafe for Sender<T> {} |
322 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
323 | impl<T> RefUnwindSafe for Sender<T> {} |
324 | |
325 | impl<T> Sender<T> { |
326 | /// Attempts to send a message into the channel without blocking. |
327 | /// |
328 | /// This method will either send a message into the channel immediately or return an error if |
329 | /// the channel is full or disconnected. The returned error contains the original message. |
330 | /// |
331 | /// If called on a zero-capacity channel, this method will send the message only if there |
332 | /// happens to be a receive operation on the other side of the channel at the same time. |
333 | /// |
334 | /// # Examples |
335 | /// |
336 | /// ```rust |
337 | /// #![feature(mpmc_channel)] |
338 | /// |
339 | /// use std::sync::mpmc::{channel, Receiver, Sender}; |
340 | /// |
341 | /// let (sender, _receiver): (Sender<i32>, Receiver<i32>) = channel(); |
342 | /// |
343 | /// assert!(sender.try_send(1).is_ok()); |
344 | /// ``` |
345 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
346 | pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
347 | match &self.flavor { |
348 | SenderFlavor::Array(chan) => chan.try_send(msg), |
349 | SenderFlavor::List(chan) => chan.try_send(msg), |
350 | SenderFlavor::Zero(chan) => chan.try_send(msg), |
351 | } |
352 | } |
353 | |
354 | /// Attempts to send a value on this channel, returning it back if it could |
355 | /// not be sent. |
356 | /// |
357 | /// A successful send occurs when it is determined that the other end of |
358 | /// the channel has not hung up already. An unsuccessful send would be one |
359 | /// where the corresponding receiver has already been deallocated. Note |
360 | /// that a return value of [`Err`] means that the data will never be |
361 | /// received, but a return value of [`Ok`] does *not* mean that the data |
362 | /// will be received. It is possible for the corresponding receiver to |
363 | /// hang up immediately after this function returns [`Ok`]. However, if |
364 | /// the channel is zero-capacity, it acts as a rendezvous channel and a |
365 | /// return value of [`Ok`] means that the data has been received. |
366 | /// |
367 | /// If the channel is full and not disconnected, this call will block until |
368 | /// the send operation can proceed. If the channel becomes disconnected, |
369 | /// this call will wake up and return an error. The returned error contains |
370 | /// the original message. |
371 | /// |
372 | /// If called on a zero-capacity channel, this method will wait for a receive |
373 | /// operation to appear on the other side of the channel. |
374 | /// |
375 | /// # Examples |
376 | /// |
377 | /// ``` |
378 | /// #![feature(mpmc_channel)] |
379 | /// |
380 | /// use std::sync::mpmc::channel; |
381 | /// |
382 | /// let (tx, rx) = channel(); |
383 | /// |
384 | /// // This send is always successful |
385 | /// tx.send(1).unwrap(); |
386 | /// |
387 | /// // This send will fail because the receiver is gone |
388 | /// drop(rx); |
389 | /// assert!(tx.send(1).is_err()); |
390 | /// ``` |
391 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
392 | pub fn send(&self, msg: T) -> Result<(), SendError<T>> { |
393 | match &self.flavor { |
394 | SenderFlavor::Array(chan) => chan.send(msg, None), |
395 | SenderFlavor::List(chan) => chan.send(msg, None), |
396 | SenderFlavor::Zero(chan) => chan.send(msg, None), |
397 | } |
398 | .map_err(|err| match err { |
399 | SendTimeoutError::Disconnected(msg) => SendError(msg), |
400 | SendTimeoutError::Timeout(_) => unreachable!(), |
401 | }) |
402 | } |
403 | } |
404 | |
405 | impl<T> Sender<T> { |
406 | /// Waits for a message to be sent into the channel, but only for a limited time. |
407 | /// |
408 | /// If the channel is full and not disconnected, this call will block until the send operation |
409 | /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
410 | /// wake up and return an error. The returned error contains the original message. |
411 | /// |
412 | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
413 | /// appear on the other side of the channel. |
414 | /// |
415 | /// # Examples |
416 | /// |
417 | /// ``` |
418 | /// #![feature(mpmc_channel)] |
419 | /// |
420 | /// use std::sync::mpmc::channel; |
421 | /// use std::time::Duration; |
422 | /// |
423 | /// let (tx, rx) = channel(); |
424 | /// |
425 | /// tx.send_timeout(1, Duration::from_millis(400)).unwrap(); |
426 | /// ``` |
427 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
428 | pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { |
429 | match Instant::now().checked_add(timeout) { |
430 | Some(deadline) => self.send_deadline(msg, deadline), |
431 | // So far in the future that it's practically the same as waiting indefinitely. |
432 | None => self.send(msg).map_err(SendTimeoutError::from), |
433 | } |
434 | } |
435 | |
436 | /// Waits for a message to be sent into the channel, but only until a given deadline. |
437 | /// |
438 | /// If the channel is full and not disconnected, this call will block until the send operation |
439 | /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
440 | /// wake up and return an error. The returned error contains the original message. |
441 | /// |
442 | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
443 | /// appear on the other side of the channel. |
444 | /// |
445 | /// # Examples |
446 | /// |
447 | /// ``` |
448 | /// #![feature(mpmc_channel)] |
449 | /// |
450 | /// use std::sync::mpmc::channel; |
451 | /// use std::time::{Duration, Instant}; |
452 | /// |
453 | /// let (tx, rx) = channel(); |
454 | /// |
455 | /// let t = Instant::now() + Duration::from_millis(400); |
456 | /// tx.send_deadline(1, t).unwrap(); |
457 | /// ``` |
458 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
459 | pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { |
460 | match &self.flavor { |
461 | SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), |
462 | SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), |
463 | SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), |
464 | } |
465 | } |
466 | |
467 | /// Returns `true` if the channel is empty. |
468 | /// |
469 | /// Note: Zero-capacity channels are always empty. |
470 | /// |
471 | /// # Examples |
472 | /// |
473 | /// ``` |
474 | /// #![feature(mpmc_channel)] |
475 | /// |
476 | /// use std::sync::mpmc; |
477 | /// use std::thread; |
478 | /// |
479 | /// let (send, _recv) = mpmc::channel(); |
480 | /// |
481 | /// let tx1 = send.clone(); |
482 | /// let tx2 = send.clone(); |
483 | /// |
484 | /// assert!(tx1.is_empty()); |
485 | /// |
486 | /// let handle = thread::spawn(move || { |
487 | /// tx2.send(1u8).unwrap(); |
488 | /// }); |
489 | /// |
490 | /// handle.join().unwrap(); |
491 | /// |
492 | /// assert!(!tx1.is_empty()); |
493 | /// ``` |
494 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
495 | pub fn is_empty(&self) -> bool { |
496 | match &self.flavor { |
497 | SenderFlavor::Array(chan) => chan.is_empty(), |
498 | SenderFlavor::List(chan) => chan.is_empty(), |
499 | SenderFlavor::Zero(chan) => chan.is_empty(), |
500 | } |
501 | } |
502 | |
503 | /// Returns `true` if the channel is full. |
504 | /// |
505 | /// Note: Zero-capacity channels are always full. |
506 | /// |
507 | /// # Examples |
508 | /// |
509 | /// ``` |
510 | /// #![feature(mpmc_channel)] |
511 | /// |
512 | /// use std::sync::mpmc; |
513 | /// use std::thread; |
514 | /// |
515 | /// let (send, _recv) = mpmc::sync_channel(1); |
516 | /// |
517 | /// let (tx1, tx2) = (send.clone(), send.clone()); |
518 | /// assert!(!tx1.is_full()); |
519 | /// |
520 | /// let handle = thread::spawn(move || { |
521 | /// tx2.send(1u8).unwrap(); |
522 | /// }); |
523 | /// |
524 | /// handle.join().unwrap(); |
525 | /// |
526 | /// assert!(tx1.is_full()); |
527 | /// ``` |
528 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
529 | pub fn is_full(&self) -> bool { |
530 | match &self.flavor { |
531 | SenderFlavor::Array(chan) => chan.is_full(), |
532 | SenderFlavor::List(chan) => chan.is_full(), |
533 | SenderFlavor::Zero(chan) => chan.is_full(), |
534 | } |
535 | } |
536 | |
537 | /// Returns the number of messages in the channel. |
538 | /// |
539 | /// # Examples |
540 | /// |
541 | /// ``` |
542 | /// #![feature(mpmc_channel)] |
543 | /// |
544 | /// use std::sync::mpmc; |
545 | /// use std::thread; |
546 | /// |
547 | /// let (send, _recv) = mpmc::channel(); |
548 | /// let (tx1, tx2) = (send.clone(), send.clone()); |
549 | /// |
550 | /// assert_eq!(tx1.len(), 0); |
551 | /// |
552 | /// let handle = thread::spawn(move || { |
553 | /// tx2.send(1u8).unwrap(); |
554 | /// }); |
555 | /// |
556 | /// handle.join().unwrap(); |
557 | /// |
558 | /// assert_eq!(tx1.len(), 1); |
559 | /// ``` |
560 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
561 | pub fn len(&self) -> usize { |
562 | match &self.flavor { |
563 | SenderFlavor::Array(chan) => chan.len(), |
564 | SenderFlavor::List(chan) => chan.len(), |
565 | SenderFlavor::Zero(chan) => chan.len(), |
566 | } |
567 | } |
568 | |
569 | /// If the channel is bounded, returns its capacity. |
570 | /// |
571 | /// # Examples |
572 | /// |
573 | /// ``` |
574 | /// #![feature(mpmc_channel)] |
575 | /// |
576 | /// use std::sync::mpmc; |
577 | /// use std::thread; |
578 | /// |
579 | /// let (send, _recv) = mpmc::sync_channel(3); |
580 | /// let (tx1, tx2) = (send.clone(), send.clone()); |
581 | /// |
582 | /// assert_eq!(tx1.capacity(), Some(3)); |
583 | /// |
584 | /// let handle = thread::spawn(move || { |
585 | /// tx2.send(1u8).unwrap(); |
586 | /// }); |
587 | /// |
588 | /// handle.join().unwrap(); |
589 | /// |
590 | /// assert_eq!(tx1.capacity(), Some(3)); |
591 | /// ``` |
592 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
593 | pub fn capacity(&self) -> Option<usize> { |
594 | match &self.flavor { |
595 | SenderFlavor::Array(chan) => chan.capacity(), |
596 | SenderFlavor::List(chan) => chan.capacity(), |
597 | SenderFlavor::Zero(chan) => chan.capacity(), |
598 | } |
599 | } |
600 | |
601 | /// Returns `true` if senders belong to the same channel. |
602 | /// |
603 | /// # Examples |
604 | /// |
605 | /// ``` |
606 | /// #![feature(mpmc_channel)] |
607 | /// |
608 | /// use std::sync::mpmc; |
609 | /// |
610 | /// let (tx1, _) = mpmc::channel::<i32>(); |
611 | /// let (tx2, _) = mpmc::channel::<i32>(); |
612 | /// |
613 | /// assert!(tx1.same_channel(&tx1)); |
614 | /// assert!(!tx1.same_channel(&tx2)); |
615 | /// ``` |
616 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
617 | pub fn same_channel(&self, other: &Sender<T>) -> bool { |
618 | match (&self.flavor, &other.flavor) { |
619 | (SenderFlavor::Array(a), SenderFlavor::Array(b)) => a == b, |
620 | (SenderFlavor::List(a), SenderFlavor::List(b)) => a == b, |
621 | (SenderFlavor::Zero(a), SenderFlavor::Zero(b)) => a == b, |
622 | _ => false, |
623 | } |
624 | } |
625 | } |
626 | |
627 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
628 | impl<T> Drop for Sender<T> { |
629 | fn drop(&mut self) { |
630 | unsafe { |
631 | match &self.flavor { |
632 | SenderFlavor::Array(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect_senders()), |
633 | SenderFlavor::List(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect_senders()), |
634 | SenderFlavor::Zero(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect()), |
635 | } |
636 | } |
637 | } |
638 | } |
639 | |
640 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
641 | impl<T> Clone for Sender<T> { |
642 | fn clone(&self) -> Self { |
643 | let flavor: SenderFlavor = match &self.flavor { |
644 | SenderFlavor::Array(chan: &Sender>) => SenderFlavor::Array(chan.acquire()), |
645 | SenderFlavor::List(chan: &Sender>) => SenderFlavor::List(chan.acquire()), |
646 | SenderFlavor::Zero(chan: &Sender>) => SenderFlavor::Zero(chan.acquire()), |
647 | }; |
648 | |
649 | Sender { flavor } |
650 | } |
651 | } |
652 | |
653 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
654 | impl<T> fmt::Debug for Sender<T> { |
655 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
656 | f.pad("Sender { .. }" ) |
657 | } |
658 | } |
659 | |
660 | /// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type. |
661 | /// Different threads can share this [`Receiver`] by cloning it. |
662 | /// |
663 | /// Messages sent to the channel can be retrieved using [`recv`]. |
664 | /// |
665 | /// [`recv`]: Receiver::recv |
666 | /// |
667 | /// # Examples |
668 | /// |
669 | /// ```rust |
670 | /// #![feature(mpmc_channel)] |
671 | /// |
672 | /// use std::sync::mpmc::channel; |
673 | /// use std::thread; |
674 | /// use std::time::Duration; |
675 | /// |
676 | /// let (send, recv) = channel(); |
677 | /// |
678 | /// let tx_thread = thread::spawn(move || { |
679 | /// send.send("Hello world!" ).unwrap(); |
680 | /// thread::sleep(Duration::from_secs(2)); // block for two seconds |
681 | /// send.send("Delayed for 2 seconds" ).unwrap(); |
682 | /// }); |
683 | /// |
684 | /// let (rx1, rx2) = (recv.clone(), recv.clone()); |
685 | /// let rx_thread_1 = thread::spawn(move || { |
686 | /// println!("{}" , rx1.recv().unwrap()); // Received immediately |
687 | /// }); |
688 | /// let rx_thread_2 = thread::spawn(move || { |
689 | /// println!("{}" , rx2.recv().unwrap()); // Received after 2 seconds |
690 | /// }); |
691 | /// |
692 | /// tx_thread.join().unwrap(); |
693 | /// rx_thread_1.join().unwrap(); |
694 | /// rx_thread_2.join().unwrap(); |
695 | /// ``` |
696 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
697 | pub struct Receiver<T> { |
698 | flavor: ReceiverFlavor<T>, |
699 | } |
700 | |
701 | /// An iterator over messages on a [`Receiver`], created by [`iter`]. |
702 | /// |
703 | /// This iterator will block whenever [`next`] is called, |
704 | /// waiting for a new message, and [`None`] will be returned |
705 | /// when the corresponding channel has hung up. |
706 | /// |
707 | /// [`iter`]: Receiver::iter |
708 | /// [`next`]: Iterator::next |
709 | /// |
710 | /// # Examples |
711 | /// |
712 | /// ```rust |
713 | /// #![feature(mpmc_channel)] |
714 | /// |
715 | /// use std::sync::mpmc::channel; |
716 | /// use std::thread; |
717 | /// |
718 | /// let (send, recv) = channel(); |
719 | /// |
720 | /// thread::spawn(move || { |
721 | /// send.send(1u8).unwrap(); |
722 | /// send.send(2u8).unwrap(); |
723 | /// send.send(3u8).unwrap(); |
724 | /// }); |
725 | /// |
726 | /// for x in recv.iter() { |
727 | /// println!("Got: {x}" ); |
728 | /// } |
729 | /// ``` |
730 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
731 | #[derive (Debug)] |
732 | pub struct Iter<'a, T: 'a> { |
733 | rx: &'a Receiver<T>, |
734 | } |
735 | |
736 | /// An iterator that attempts to yield all pending values for a [`Receiver`], |
737 | /// created by [`try_iter`]. |
738 | /// |
739 | /// [`None`] will be returned when there are no pending values remaining or |
740 | /// if the corresponding channel has hung up. |
741 | /// |
742 | /// This iterator will never block the caller in order to wait for data to |
743 | /// become available. Instead, it will return [`None`]. |
744 | /// |
745 | /// [`try_iter`]: Receiver::try_iter |
746 | /// |
747 | /// # Examples |
748 | /// |
749 | /// ```rust |
750 | /// #![feature(mpmc_channel)] |
751 | /// |
752 | /// use std::sync::mpmc::channel; |
753 | /// use std::thread; |
754 | /// use std::time::Duration; |
755 | /// |
756 | /// let (sender, receiver) = channel(); |
757 | /// |
758 | /// // Nothing is in the buffer yet |
759 | /// assert!(receiver.try_iter().next().is_none()); |
760 | /// println!("Nothing in the buffer..." ); |
761 | /// |
762 | /// thread::spawn(move || { |
763 | /// sender.send(1).unwrap(); |
764 | /// sender.send(2).unwrap(); |
765 | /// sender.send(3).unwrap(); |
766 | /// }); |
767 | /// |
768 | /// println!("Going to sleep..." ); |
769 | /// thread::sleep(Duration::from_secs(2)); // block for two seconds |
770 | /// |
771 | /// for x in receiver.try_iter() { |
772 | /// println!("Got: {x}" ); |
773 | /// } |
774 | /// ``` |
775 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
776 | #[derive (Debug)] |
777 | pub struct TryIter<'a, T: 'a> { |
778 | rx: &'a Receiver<T>, |
779 | } |
780 | |
781 | /// An owning iterator over messages on a [`Receiver`], |
782 | /// created by [`into_iter`]. |
783 | /// |
784 | /// This iterator will block whenever [`next`] |
785 | /// is called, waiting for a new message, and [`None`] will be |
786 | /// returned if the corresponding channel has hung up. |
787 | /// |
788 | /// [`into_iter`]: Receiver::into_iter |
789 | /// [`next`]: Iterator::next |
790 | /// |
791 | /// # Examples |
792 | /// |
793 | /// ```rust |
794 | /// #![feature(mpmc_channel)] |
795 | /// |
796 | /// use std::sync::mpmc::channel; |
797 | /// use std::thread; |
798 | /// |
799 | /// let (send, recv) = channel(); |
800 | /// |
801 | /// thread::spawn(move || { |
802 | /// send.send(1u8).unwrap(); |
803 | /// send.send(2u8).unwrap(); |
804 | /// send.send(3u8).unwrap(); |
805 | /// }); |
806 | /// |
807 | /// for x in recv.into_iter() { |
808 | /// println!("Got: {x}" ); |
809 | /// } |
810 | /// ``` |
811 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
812 | #[derive (Debug)] |
813 | pub struct IntoIter<T> { |
814 | rx: Receiver<T>, |
815 | } |
816 | |
817 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
818 | impl<'a, T> Iterator for Iter<'a, T> { |
819 | type Item = T; |
820 | |
821 | fn next(&mut self) -> Option<T> { |
822 | self.rx.recv().ok() |
823 | } |
824 | } |
825 | |
826 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
827 | impl<'a, T> Iterator for TryIter<'a, T> { |
828 | type Item = T; |
829 | |
830 | fn next(&mut self) -> Option<T> { |
831 | self.rx.try_recv().ok() |
832 | } |
833 | } |
834 | |
835 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
836 | impl<'a, T> IntoIterator for &'a Receiver<T> { |
837 | type Item = T; |
838 | type IntoIter = Iter<'a, T>; |
839 | |
840 | fn into_iter(self) -> Iter<'a, T> { |
841 | self.iter() |
842 | } |
843 | } |
844 | |
845 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
846 | impl<T> Iterator for IntoIter<T> { |
847 | type Item = T; |
848 | fn next(&mut self) -> Option<T> { |
849 | self.rx.recv().ok() |
850 | } |
851 | } |
852 | |
853 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
854 | impl<T> IntoIterator for Receiver<T> { |
855 | type Item = T; |
856 | type IntoIter = IntoIter<T>; |
857 | |
858 | fn into_iter(self) -> IntoIter<T> { |
859 | IntoIter { rx: self } |
860 | } |
861 | } |
862 | |
863 | /// Receiver flavors. |
864 | enum ReceiverFlavor<T> { |
865 | /// Bounded channel based on a preallocated array. |
866 | Array(counter::Receiver<array::Channel<T>>), |
867 | |
868 | /// Unbounded channel implemented as a linked list. |
869 | List(counter::Receiver<list::Channel<T>>), |
870 | |
871 | /// Zero-capacity channel. |
872 | Zero(counter::Receiver<zero::Channel<T>>), |
873 | } |
874 | |
875 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
876 | unsafe impl<T: Send> Send for Receiver<T> {} |
877 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
878 | unsafe impl<T: Send> Sync for Receiver<T> {} |
879 | |
880 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
881 | impl<T> UnwindSafe for Receiver<T> {} |
882 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
883 | impl<T> RefUnwindSafe for Receiver<T> {} |
884 | |
885 | impl<T> Receiver<T> { |
886 | /// Attempts to receive a message from the channel without blocking. |
887 | /// |
888 | /// This method will never block the caller in order to wait for data to |
889 | /// become available. Instead, this will always return immediately with a |
890 | /// possible option of pending data on the channel. |
891 | /// |
892 | /// If called on a zero-capacity channel, this method will receive a message only if there |
893 | /// happens to be a send operation on the other side of the channel at the same time. |
894 | /// |
895 | /// This is useful for a flavor of "optimistic check" before deciding to |
896 | /// block on a receiver. |
897 | /// |
898 | /// Compared with [`recv`], this function has two failure cases instead of one |
899 | /// (one for disconnection, one for an empty buffer). |
900 | /// |
901 | /// [`recv`]: Self::recv |
902 | /// |
903 | /// # Examples |
904 | /// |
905 | /// ```rust |
906 | /// #![feature(mpmc_channel)] |
907 | /// |
908 | /// use std::sync::mpmc::{Receiver, channel}; |
909 | /// |
910 | /// let (_, receiver): (_, Receiver<i32>) = channel(); |
911 | /// |
912 | /// assert!(receiver.try_recv().is_err()); |
913 | /// ``` |
914 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
915 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
916 | match &self.flavor { |
917 | ReceiverFlavor::Array(chan) => chan.try_recv(), |
918 | ReceiverFlavor::List(chan) => chan.try_recv(), |
919 | ReceiverFlavor::Zero(chan) => chan.try_recv(), |
920 | } |
921 | } |
922 | |
923 | /// Attempts to wait for a value on this receiver, returning an error if the |
924 | /// corresponding channel has hung up. |
925 | /// |
926 | /// This function will always block the current thread if there is no data |
927 | /// available and it's possible for more data to be sent (at least one sender |
928 | /// still exists). Once a message is sent to the corresponding [`Sender`], |
929 | /// this receiver will wake up and return that message. |
930 | /// |
931 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
932 | /// this call is blocking, this call will wake up and return [`Err`] to |
933 | /// indicate that no more messages can ever be received on this channel. |
934 | /// However, since channels are buffered, messages sent before the disconnect |
935 | /// will still be properly received. |
936 | /// |
937 | /// # Examples |
938 | /// |
939 | /// ``` |
940 | /// #![feature(mpmc_channel)] |
941 | /// |
942 | /// use std::sync::mpmc; |
943 | /// use std::thread; |
944 | /// |
945 | /// let (send, recv) = mpmc::channel(); |
946 | /// let handle = thread::spawn(move || { |
947 | /// send.send(1u8).unwrap(); |
948 | /// }); |
949 | /// |
950 | /// handle.join().unwrap(); |
951 | /// |
952 | /// assert_eq!(Ok(1), recv.recv()); |
953 | /// ``` |
954 | /// |
955 | /// Buffering behavior: |
956 | /// |
957 | /// ``` |
958 | /// #![feature(mpmc_channel)] |
959 | /// |
960 | /// use std::sync::mpmc; |
961 | /// use std::thread; |
962 | /// use std::sync::mpmc::RecvError; |
963 | /// |
964 | /// let (send, recv) = mpmc::channel(); |
965 | /// let handle = thread::spawn(move || { |
966 | /// send.send(1u8).unwrap(); |
967 | /// send.send(2).unwrap(); |
968 | /// send.send(3).unwrap(); |
969 | /// drop(send); |
970 | /// }); |
971 | /// |
972 | /// // wait for the thread to join so we ensure the sender is dropped |
973 | /// handle.join().unwrap(); |
974 | /// |
975 | /// assert_eq!(Ok(1), recv.recv()); |
976 | /// assert_eq!(Ok(2), recv.recv()); |
977 | /// assert_eq!(Ok(3), recv.recv()); |
978 | /// assert_eq!(Err(RecvError), recv.recv()); |
979 | /// ``` |
980 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
981 | pub fn recv(&self) -> Result<T, RecvError> { |
982 | match &self.flavor { |
983 | ReceiverFlavor::Array(chan) => chan.recv(None), |
984 | ReceiverFlavor::List(chan) => chan.recv(None), |
985 | ReceiverFlavor::Zero(chan) => chan.recv(None), |
986 | } |
987 | .map_err(|_| RecvError) |
988 | } |
989 | |
990 | /// Attempts to wait for a value on this receiver, returning an error if the |
991 | /// corresponding channel has hung up, or if it waits more than `timeout`. |
992 | /// |
993 | /// This function will always block the current thread if there is no data |
994 | /// available and it's possible for more data to be sent (at least one sender |
995 | /// still exists). Once a message is sent to the corresponding [`Sender`], |
996 | /// this receiver will wake up and return that message. |
997 | /// |
998 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
999 | /// this call is blocking, this call will wake up and return [`Err`] to |
1000 | /// indicate that no more messages can ever be received on this channel. |
1001 | /// However, since channels are buffered, messages sent before the disconnect |
1002 | /// will still be properly received. |
1003 | /// |
1004 | /// # Examples |
1005 | /// |
1006 | /// Successfully receiving value before encountering timeout: |
1007 | /// |
1008 | /// ```no_run |
1009 | /// #![feature(mpmc_channel)] |
1010 | /// |
1011 | /// use std::thread; |
1012 | /// use std::time::Duration; |
1013 | /// use std::sync::mpmc; |
1014 | /// |
1015 | /// let (send, recv) = mpmc::channel(); |
1016 | /// |
1017 | /// thread::spawn(move || { |
1018 | /// send.send('a' ).unwrap(); |
1019 | /// }); |
1020 | /// |
1021 | /// assert_eq!( |
1022 | /// recv.recv_timeout(Duration::from_millis(400)), |
1023 | /// Ok('a' ) |
1024 | /// ); |
1025 | /// ``` |
1026 | /// |
1027 | /// Receiving an error upon reaching timeout: |
1028 | /// |
1029 | /// ```no_run |
1030 | /// #![feature(mpmc_channel)] |
1031 | /// |
1032 | /// use std::thread; |
1033 | /// use std::time::Duration; |
1034 | /// use std::sync::mpmc; |
1035 | /// |
1036 | /// let (send, recv) = mpmc::channel(); |
1037 | /// |
1038 | /// thread::spawn(move || { |
1039 | /// thread::sleep(Duration::from_millis(800)); |
1040 | /// send.send('a' ).unwrap(); |
1041 | /// }); |
1042 | /// |
1043 | /// assert_eq!( |
1044 | /// recv.recv_timeout(Duration::from_millis(400)), |
1045 | /// Err(mpmc::RecvTimeoutError::Timeout) |
1046 | /// ); |
1047 | /// ``` |
1048 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1049 | pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { |
1050 | match Instant::now().checked_add(timeout) { |
1051 | Some(deadline) => self.recv_deadline(deadline), |
1052 | // So far in the future that it's practically the same as waiting indefinitely. |
1053 | None => self.recv().map_err(RecvTimeoutError::from), |
1054 | } |
1055 | } |
1056 | |
1057 | /// Attempts to wait for a value on this receiver, returning an error if the |
1058 | /// corresponding channel has hung up, or if `deadline` is reached. |
1059 | /// |
1060 | /// This function will always block the current thread if there is no data |
1061 | /// available and it's possible for more data to be sent. Once a message is |
1062 | /// sent to the corresponding [`Sender`], then this receiver will wake up |
1063 | /// and return that message. |
1064 | /// |
1065 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
1066 | /// this call is blocking, this call will wake up and return [`Err`] to |
1067 | /// indicate that no more messages can ever be received on this channel. |
1068 | /// However, since channels are buffered, messages sent before the disconnect |
1069 | /// will still be properly received. |
1070 | /// |
1071 | /// # Examples |
1072 | /// |
1073 | /// Successfully receiving value before reaching deadline: |
1074 | /// |
1075 | /// ```no_run |
1076 | /// #![feature(mpmc_channel)] |
1077 | /// |
1078 | /// use std::thread; |
1079 | /// use std::time::{Duration, Instant}; |
1080 | /// use std::sync::mpmc; |
1081 | /// |
1082 | /// let (send, recv) = mpmc::channel(); |
1083 | /// |
1084 | /// thread::spawn(move || { |
1085 | /// send.send('a' ).unwrap(); |
1086 | /// }); |
1087 | /// |
1088 | /// assert_eq!( |
1089 | /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), |
1090 | /// Ok('a' ) |
1091 | /// ); |
1092 | /// ``` |
1093 | /// |
1094 | /// Receiving an error upon reaching deadline: |
1095 | /// |
1096 | /// ```no_run |
1097 | /// #![feature(mpmc_channel)] |
1098 | /// |
1099 | /// use std::thread; |
1100 | /// use std::time::{Duration, Instant}; |
1101 | /// use std::sync::mpmc; |
1102 | /// |
1103 | /// let (send, recv) = mpmc::channel(); |
1104 | /// |
1105 | /// thread::spawn(move || { |
1106 | /// thread::sleep(Duration::from_millis(800)); |
1107 | /// send.send('a' ).unwrap(); |
1108 | /// }); |
1109 | /// |
1110 | /// assert_eq!( |
1111 | /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), |
1112 | /// Err(mpmc::RecvTimeoutError::Timeout) |
1113 | /// ); |
1114 | /// ``` |
1115 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1116 | pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { |
1117 | match &self.flavor { |
1118 | ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), |
1119 | ReceiverFlavor::List(chan) => chan.recv(Some(deadline)), |
1120 | ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), |
1121 | } |
1122 | } |
1123 | |
1124 | /// Returns an iterator that will attempt to yield all pending values. |
1125 | /// It will return `None` if there are no more pending values or if the |
1126 | /// channel has hung up. The iterator will never [`panic!`] or block the |
1127 | /// user by waiting for values. |
1128 | /// |
1129 | /// # Examples |
1130 | /// |
1131 | /// ```no_run |
1132 | /// #![feature(mpmc_channel)] |
1133 | /// |
1134 | /// use std::sync::mpmc::channel; |
1135 | /// use std::thread; |
1136 | /// use std::time::Duration; |
1137 | /// |
1138 | /// let (sender, receiver) = channel(); |
1139 | /// |
1140 | /// // nothing is in the buffer yet |
1141 | /// assert!(receiver.try_iter().next().is_none()); |
1142 | /// |
1143 | /// thread::spawn(move || { |
1144 | /// thread::sleep(Duration::from_secs(1)); |
1145 | /// sender.send(1).unwrap(); |
1146 | /// sender.send(2).unwrap(); |
1147 | /// sender.send(3).unwrap(); |
1148 | /// }); |
1149 | /// |
1150 | /// // nothing is in the buffer yet |
1151 | /// assert!(receiver.try_iter().next().is_none()); |
1152 | /// |
1153 | /// // block for two seconds |
1154 | /// thread::sleep(Duration::from_secs(2)); |
1155 | /// |
1156 | /// let mut iter = receiver.try_iter(); |
1157 | /// assert_eq!(iter.next(), Some(1)); |
1158 | /// assert_eq!(iter.next(), Some(2)); |
1159 | /// assert_eq!(iter.next(), Some(3)); |
1160 | /// assert_eq!(iter.next(), None); |
1161 | /// ``` |
1162 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1163 | pub fn try_iter(&self) -> TryIter<'_, T> { |
1164 | TryIter { rx: self } |
1165 | } |
1166 | } |
1167 | |
1168 | impl<T> Receiver<T> { |
1169 | /// Returns `true` if the channel is empty. |
1170 | /// |
1171 | /// Note: Zero-capacity channels are always empty. |
1172 | /// |
1173 | /// # Examples |
1174 | /// |
1175 | /// ``` |
1176 | /// #![feature(mpmc_channel)] |
1177 | /// |
1178 | /// use std::sync::mpmc; |
1179 | /// use std::thread; |
1180 | /// |
1181 | /// let (send, recv) = mpmc::channel(); |
1182 | /// |
1183 | /// assert!(recv.is_empty()); |
1184 | /// |
1185 | /// let handle = thread::spawn(move || { |
1186 | /// send.send(1u8).unwrap(); |
1187 | /// }); |
1188 | /// |
1189 | /// handle.join().unwrap(); |
1190 | /// |
1191 | /// assert!(!recv.is_empty()); |
1192 | /// ``` |
1193 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1194 | pub fn is_empty(&self) -> bool { |
1195 | match &self.flavor { |
1196 | ReceiverFlavor::Array(chan) => chan.is_empty(), |
1197 | ReceiverFlavor::List(chan) => chan.is_empty(), |
1198 | ReceiverFlavor::Zero(chan) => chan.is_empty(), |
1199 | } |
1200 | } |
1201 | |
1202 | /// Returns `true` if the channel is full. |
1203 | /// |
1204 | /// Note: Zero-capacity channels are always full. |
1205 | /// |
1206 | /// # Examples |
1207 | /// |
1208 | /// ``` |
1209 | /// #![feature(mpmc_channel)] |
1210 | /// |
1211 | /// use std::sync::mpmc; |
1212 | /// use std::thread; |
1213 | /// |
1214 | /// let (send, recv) = mpmc::sync_channel(1); |
1215 | /// |
1216 | /// assert!(!recv.is_full()); |
1217 | /// |
1218 | /// let handle = thread::spawn(move || { |
1219 | /// send.send(1u8).unwrap(); |
1220 | /// }); |
1221 | /// |
1222 | /// handle.join().unwrap(); |
1223 | /// |
1224 | /// assert!(recv.is_full()); |
1225 | /// ``` |
1226 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1227 | pub fn is_full(&self) -> bool { |
1228 | match &self.flavor { |
1229 | ReceiverFlavor::Array(chan) => chan.is_full(), |
1230 | ReceiverFlavor::List(chan) => chan.is_full(), |
1231 | ReceiverFlavor::Zero(chan) => chan.is_full(), |
1232 | } |
1233 | } |
1234 | |
1235 | /// Returns the number of messages in the channel. |
1236 | /// |
1237 | /// # Examples |
1238 | /// |
1239 | /// ``` |
1240 | /// #![feature(mpmc_channel)] |
1241 | /// |
1242 | /// use std::sync::mpmc; |
1243 | /// use std::thread; |
1244 | /// |
1245 | /// let (send, recv) = mpmc::channel(); |
1246 | /// |
1247 | /// assert_eq!(recv.len(), 0); |
1248 | /// |
1249 | /// let handle = thread::spawn(move || { |
1250 | /// send.send(1u8).unwrap(); |
1251 | /// }); |
1252 | /// |
1253 | /// handle.join().unwrap(); |
1254 | /// |
1255 | /// assert_eq!(recv.len(), 1); |
1256 | /// ``` |
1257 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1258 | pub fn len(&self) -> usize { |
1259 | match &self.flavor { |
1260 | ReceiverFlavor::Array(chan) => chan.len(), |
1261 | ReceiverFlavor::List(chan) => chan.len(), |
1262 | ReceiverFlavor::Zero(chan) => chan.len(), |
1263 | } |
1264 | } |
1265 | |
1266 | /// If the channel is bounded, returns its capacity. |
1267 | /// |
1268 | /// # Examples |
1269 | /// |
1270 | /// ``` |
1271 | /// #![feature(mpmc_channel)] |
1272 | /// |
1273 | /// use std::sync::mpmc; |
1274 | /// use std::thread; |
1275 | /// |
1276 | /// let (send, recv) = mpmc::sync_channel(3); |
1277 | /// |
1278 | /// assert_eq!(recv.capacity(), Some(3)); |
1279 | /// |
1280 | /// let handle = thread::spawn(move || { |
1281 | /// send.send(1u8).unwrap(); |
1282 | /// }); |
1283 | /// |
1284 | /// handle.join().unwrap(); |
1285 | /// |
1286 | /// assert_eq!(recv.capacity(), Some(3)); |
1287 | /// ``` |
1288 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1289 | pub fn capacity(&self) -> Option<usize> { |
1290 | match &self.flavor { |
1291 | ReceiverFlavor::Array(chan) => chan.capacity(), |
1292 | ReceiverFlavor::List(chan) => chan.capacity(), |
1293 | ReceiverFlavor::Zero(chan) => chan.capacity(), |
1294 | } |
1295 | } |
1296 | |
1297 | /// Returns `true` if receivers belong to the same channel. |
1298 | /// |
1299 | /// # Examples |
1300 | /// |
1301 | /// ``` |
1302 | /// #![feature(mpmc_channel)] |
1303 | /// |
1304 | /// use std::sync::mpmc; |
1305 | /// |
1306 | /// let (_, rx1) = mpmc::channel::<i32>(); |
1307 | /// let (_, rx2) = mpmc::channel::<i32>(); |
1308 | /// |
1309 | /// assert!(rx1.same_channel(&rx1)); |
1310 | /// assert!(!rx1.same_channel(&rx2)); |
1311 | /// ``` |
1312 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1313 | pub fn same_channel(&self, other: &Receiver<T>) -> bool { |
1314 | match (&self.flavor, &other.flavor) { |
1315 | (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, |
1316 | (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b, |
1317 | (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b, |
1318 | _ => false, |
1319 | } |
1320 | } |
1321 | |
1322 | /// Returns an iterator that will block waiting for messages, but never |
1323 | /// [`panic!`]. It will return [`None`] when the channel has hung up. |
1324 | /// |
1325 | /// # Examples |
1326 | /// |
1327 | /// ```rust |
1328 | /// #![feature(mpmc_channel)] |
1329 | /// |
1330 | /// use std::sync::mpmc::channel; |
1331 | /// use std::thread; |
1332 | /// |
1333 | /// let (send, recv) = channel(); |
1334 | /// |
1335 | /// thread::spawn(move || { |
1336 | /// send.send(1).unwrap(); |
1337 | /// send.send(2).unwrap(); |
1338 | /// send.send(3).unwrap(); |
1339 | /// }); |
1340 | /// |
1341 | /// let mut iter = recv.iter(); |
1342 | /// assert_eq!(iter.next(), Some(1)); |
1343 | /// assert_eq!(iter.next(), Some(2)); |
1344 | /// assert_eq!(iter.next(), Some(3)); |
1345 | /// assert_eq!(iter.next(), None); |
1346 | /// ``` |
1347 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1348 | pub fn iter(&self) -> Iter<'_, T> { |
1349 | Iter { rx: self } |
1350 | } |
1351 | } |
1352 | |
1353 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1354 | impl<T> Drop for Receiver<T> { |
1355 | fn drop(&mut self) { |
1356 | unsafe { |
1357 | match &self.flavor { |
1358 | ReceiverFlavor::Array(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect_receivers()), |
1359 | ReceiverFlavor::List(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect_receivers()), |
1360 | ReceiverFlavor::Zero(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect()), |
1361 | } |
1362 | } |
1363 | } |
1364 | } |
1365 | |
1366 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1367 | impl<T> Clone for Receiver<T> { |
1368 | fn clone(&self) -> Self { |
1369 | let flavor: ReceiverFlavor = match &self.flavor { |
1370 | ReceiverFlavor::Array(chan: &Receiver>) => ReceiverFlavor::Array(chan.acquire()), |
1371 | ReceiverFlavor::List(chan: &Receiver>) => ReceiverFlavor::List(chan.acquire()), |
1372 | ReceiverFlavor::Zero(chan: &Receiver>) => ReceiverFlavor::Zero(chan.acquire()), |
1373 | }; |
1374 | |
1375 | Receiver { flavor } |
1376 | } |
1377 | } |
1378 | |
1379 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
1380 | impl<T> fmt::Debug for Receiver<T> { |
1381 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1382 | f.pad("Receiver { .. }" ) |
1383 | } |
1384 | } |
1385 | |
1386 | #[cfg (test)] |
1387 | mod tests; |
1388 | |