| 1 | //! Multi-producer, multi-consumer FIFO queue communication primitives. |
| 2 | //! |
| 3 | //! This module provides message-based communication over channels, concretely |
| 4 | //! defined by two types: |
| 5 | //! |
| 6 | //! * [`Sender`] |
| 7 | //! * [`Receiver`] |
| 8 | //! |
//! [`Sender`]s are used to send data to a set of [`Receiver`]s. Both
//! sender and receiver are cloneable, so many threads can send
//! simultaneously (multi-producer) to many receivers (multi-consumer).
| 12 | //! |
| 13 | //! These channels come in two flavors: |
| 14 | //! |
| 15 | //! 1. An asynchronous, infinitely buffered channel. The [`channel`] function |
| 16 | //! will return a `(Sender, Receiver)` tuple where all sends will be |
| 17 | //! **asynchronous** (they never block). The channel conceptually has an |
| 18 | //! infinite buffer. |
| 19 | //! |
| 20 | //! 2. A synchronous, bounded channel. The [`sync_channel`] function will |
| 21 | //! return a `(Sender, Receiver)` tuple where the storage for pending |
| 22 | //! messages is a pre-allocated buffer of a fixed size. All sends will be |
| 23 | //! **synchronous** by blocking until there is buffer space available. Note |
| 24 | //! that a bound of 0 is allowed, causing the channel to become a "rendezvous" |
| 25 | //! channel where each sender atomically hands off a message to a receiver. |
| 26 | //! |
| 27 | //! [`send`]: Sender::send |
| 28 | //! |
| 29 | //! ## Disconnection |
| 30 | //! |
| 31 | //! The send and receive operations on channels will all return a [`Result`] |
| 32 | //! indicating whether the operation succeeded or not. An unsuccessful operation |
| 33 | //! is normally indicative of the other half of a channel having "hung up" by |
| 34 | //! being dropped in its corresponding thread. |
| 35 | //! |
| 36 | //! Once half of a channel has been deallocated, most operations can no longer |
| 37 | //! continue to make progress, so [`Err`] will be returned. Many applications |
| 38 | //! will continue to [`unwrap`] the results returned from this module, |
| 39 | //! instigating a propagation of failure among threads if one unexpectedly dies. |
| 40 | //! |
| 41 | //! [`unwrap`]: Result::unwrap |
| 42 | //! |
| 43 | //! # Examples |
| 44 | //! |
| 45 | //! Simple usage: |
| 46 | //! |
| 47 | //! ``` |
| 48 | //! #![feature(mpmc_channel)] |
| 49 | //! |
| 50 | //! use std::thread; |
| 51 | //! use std::sync::mpmc::channel; |
| 52 | //! |
| 53 | //! // Create a simple streaming channel |
| 54 | //! let (tx, rx) = channel(); |
| 55 | //! thread::spawn(move || { |
| 56 | //! tx.send(10).unwrap(); |
| 57 | //! }); |
| 58 | //! assert_eq!(rx.recv().unwrap(), 10); |
| 59 | //! ``` |
| 60 | //! |
| 61 | //! Shared usage: |
| 62 | //! |
| 63 | //! ``` |
| 64 | //! #![feature(mpmc_channel)] |
| 65 | //! |
| 66 | //! use std::thread; |
| 67 | //! use std::sync::mpmc::channel; |
| 68 | //! |
| 69 | //! thread::scope(|s| { |
| 70 | //! // Create a shared channel that can be sent along from many threads |
| 71 | //! // where tx is the sending half (tx for transmission), and rx is the receiving |
| 72 | //! // half (rx for receiving). |
| 73 | //! let (tx, rx) = channel(); |
| 74 | //! for i in 0..10 { |
| 75 | //! let tx = tx.clone(); |
| 76 | //! s.spawn(move || { |
| 77 | //! tx.send(i).unwrap(); |
| 78 | //! }); |
| 79 | //! } |
| 80 | //! |
| 81 | //! for _ in 0..5 { |
| 82 | //! let rx1 = rx.clone(); |
| 83 | //! let rx2 = rx.clone(); |
| 84 | //! s.spawn(move || { |
| 85 | //! let j = rx1.recv().unwrap(); |
| 86 | //! assert!(0 <= j && j < 10); |
| 87 | //! }); |
| 88 | //! s.spawn(move || { |
| 89 | //! let j = rx2.recv().unwrap(); |
| 90 | //! assert!(0 <= j && j < 10); |
| 91 | //! }); |
| 92 | //! } |
| 93 | //! }) |
| 94 | //! ``` |
| 95 | //! |
//! Detecting a hung-up channel:
| 97 | //! |
| 98 | //! ``` |
| 99 | //! #![feature(mpmc_channel)] |
| 100 | //! |
| 101 | //! use std::sync::mpmc::channel; |
| 102 | //! |
| 103 | //! // The call to recv() will return an error because the channel has already |
| 104 | //! // hung up (or been deallocated) |
| 105 | //! let (tx, rx) = channel::<i32>(); |
| 106 | //! drop(tx); |
| 107 | //! assert!(rx.recv().is_err()); |
| 108 | //! ``` |
| 109 | |
| 110 | // This module is used as the implementation for the channels in `sync::mpsc`. |
| 111 | // The implementation comes from the crossbeam-channel crate: |
| 112 | // |
| 113 | // Copyright (c) 2019 The Crossbeam Project Developers |
| 114 | // |
| 115 | // Permission is hereby granted, free of charge, to any |
| 116 | // person obtaining a copy of this software and associated |
| 117 | // documentation files (the "Software"), to deal in the |
| 118 | // Software without restriction, including without |
| 119 | // limitation the rights to use, copy, modify, merge, |
| 120 | // publish, distribute, sublicense, and/or sell copies of |
| 121 | // the Software, and to permit persons to whom the Software |
| 122 | // is furnished to do so, subject to the following |
| 123 | // conditions: |
| 124 | // |
| 125 | // The above copyright notice and this permission notice |
| 126 | // shall be included in all copies or substantial portions |
| 127 | // of the Software. |
| 128 | // |
| 129 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF |
| 130 | // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED |
| 131 | // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A |
| 132 | // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 133 | // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY |
| 134 | // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
| 135 | // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR |
| 136 | // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER |
| 137 | // DEALINGS IN THE SOFTWARE. |
| 138 | |
| 139 | mod array; |
| 140 | mod context; |
| 141 | mod counter; |
| 142 | mod error; |
| 143 | mod list; |
| 144 | mod select; |
| 145 | mod utils; |
| 146 | mod waker; |
| 147 | mod zero; |
| 148 | |
| 149 | pub use error::*; |
| 150 | |
| 151 | use crate::fmt; |
| 152 | use crate::panic::{RefUnwindSafe, UnwindSafe}; |
| 153 | use crate::time::{Duration, Instant}; |
| 154 | |
| 155 | /// Creates a new asynchronous channel, returning the sender/receiver halves. |
| 156 | /// |
| 157 | /// All data sent on the [`Sender`] will become available on the [`Receiver`] in |
| 158 | /// the same order as it was sent, and no [`send`] will block the calling thread |
| 159 | /// (this channel has an "infinite buffer", unlike [`sync_channel`], which will |
| 160 | /// block after its buffer limit is reached). [`recv`] will block until a message |
| 161 | /// is available while there is at least one [`Sender`] alive (including clones). |
| 162 | /// |
| 163 | /// The [`Sender`] can be cloned to [`send`] to the same channel multiple times. |
| 164 | /// The [`Receiver`] also can be cloned to have multi receivers. |
| 165 | /// |
| 166 | /// If the [`Receiver`] is disconnected while trying to [`send`] with the |
| 167 | /// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the |
| 168 | /// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will |
| 169 | /// return a [`RecvError`]. |
| 170 | /// |
| 171 | /// [`send`]: Sender::send |
| 172 | /// [`recv`]: Receiver::recv |
| 173 | /// |
| 174 | /// # Examples |
| 175 | /// |
| 176 | /// ``` |
| 177 | /// #![feature(mpmc_channel)] |
| 178 | /// |
| 179 | /// use std::sync::mpmc::channel; |
| 180 | /// use std::thread; |
| 181 | /// |
| 182 | /// let (sender, receiver) = channel(); |
| 183 | /// |
| 184 | /// // Spawn off an expensive computation |
| 185 | /// thread::spawn(move || { |
| 186 | /// # fn expensive_computation() {} |
| 187 | /// sender.send(expensive_computation()).unwrap(); |
| 188 | /// }); |
| 189 | /// |
| 190 | /// // Do some useful work for awhile |
| 191 | /// |
| 192 | /// // Let's see what that answer was |
| 193 | /// println!("{:?}" , receiver.recv().unwrap()); |
| 194 | /// ``` |
| 195 | #[must_use ] |
| 196 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 197 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
| 198 | let (s: Sender>, r: Receiver>) = counter::new(chan:list::Channel::new()); |
| 199 | let s: Sender = Sender { flavor: SenderFlavor::List(s) }; |
| 200 | let r: Receiver = Receiver { flavor: ReceiverFlavor::List(r) }; |
| 201 | (s, r) |
| 202 | } |
| 203 | |
| 204 | /// Creates a new synchronous, bounded channel. |
| 205 | /// |
| 206 | /// All data sent on the [`Sender`] will become available on the [`Receiver`] |
| 207 | /// in the same order as it was sent. Like asynchronous [`channel`]s, the |
| 208 | /// [`Receiver`] will block until a message becomes available. `sync_channel` |
| 209 | /// differs greatly in the semantics of the sender, however. |
| 210 | /// |
| 211 | /// This channel has an internal buffer on which messages will be queued. |
| 212 | /// `bound` specifies the buffer size. When the internal buffer becomes full, |
| 213 | /// future sends will *block* waiting for the buffer to open up. Note that a |
| 214 | /// buffer size of 0 is valid, in which case this becomes "rendezvous channel" |
| 215 | /// where each [`send`] will not return until a [`recv`] is paired with it. |
| 216 | /// |
| 217 | /// The [`Sender`] can be cloned to [`send`] to the same channel multiple |
| 218 | /// times. The [`Receiver`] also can be cloned to have multi receivers. |
| 219 | /// |
| 220 | /// Like asynchronous channels, if the [`Receiver`] is disconnected while trying |
| 221 | /// to [`send`] with the [`Sender`], the [`send`] method will return a |
| 222 | /// [`SendError`]. Similarly, If the [`Sender`] is disconnected while trying |
| 223 | /// to [`recv`], the [`recv`] method will return a [`RecvError`]. |
| 224 | /// |
| 225 | /// [`send`]: Sender::send |
| 226 | /// [`recv`]: Receiver::recv |
| 227 | /// |
| 228 | /// # Examples |
| 229 | /// |
| 230 | /// ``` |
| 231 | /// use std::sync::mpsc::sync_channel; |
| 232 | /// use std::thread; |
| 233 | /// |
| 234 | /// let (sender, receiver) = sync_channel(1); |
| 235 | /// |
| 236 | /// // this returns immediately |
| 237 | /// sender.send(1).unwrap(); |
| 238 | /// |
| 239 | /// thread::spawn(move || { |
| 240 | /// // this will block until the previous message has been received |
| 241 | /// sender.send(2).unwrap(); |
| 242 | /// }); |
| 243 | /// |
| 244 | /// assert_eq!(receiver.recv().unwrap(), 1); |
| 245 | /// assert_eq!(receiver.recv().unwrap(), 2); |
| 246 | /// ``` |
| 247 | #[must_use ] |
| 248 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 249 | pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) { |
| 250 | if cap == 0 { |
| 251 | let (s: Sender>, r: Receiver>) = counter::new(chan:zero::Channel::new()); |
| 252 | let s: Sender = Sender { flavor: SenderFlavor::Zero(s) }; |
| 253 | let r: Receiver = Receiver { flavor: ReceiverFlavor::Zero(r) }; |
| 254 | (s, r) |
| 255 | } else { |
| 256 | let (s: Sender>, r: Receiver>) = counter::new(chan:array::Channel::with_capacity(cap)); |
| 257 | let s: Sender = Sender { flavor: SenderFlavor::Array(s) }; |
| 258 | let r: Receiver = Receiver { flavor: ReceiverFlavor::Array(r) }; |
| 259 | (s, r) |
| 260 | } |
| 261 | } |
| 262 | |
/// The sending half of Rust's [`channel`] (or [`sync_channel`]) type.
///
/// Messages can be sent through this channel with [`send`].
///
/// Note: all senders (the original and its clones) need to be dropped for the receiver
/// to stop blocking to receive messages with [`Receiver::recv`].
///
/// [`send`]: Sender::send
///
/// # Examples
///
/// ```rust
/// #![feature(mpmc_channel)]
///
/// use std::sync::mpmc::channel;
/// use std::thread;
///
/// let (sender, receiver) = channel();
/// let sender2 = sender.clone();
///
/// // First thread owns sender
/// thread::spawn(move || {
///     sender.send(1).unwrap();
/// });
///
/// // Second thread owns sender2
/// thread::spawn(move || {
///     sender2.send(2).unwrap();
/// });
///
/// // The two messages may arrive in either order, so only their sum is checked.
/// let msg = receiver.recv().unwrap();
/// let msg2 = receiver.recv().unwrap();
///
/// assert_eq!(3, msg + msg2);
/// ```
#[unstable(feature = "mpmc_channel", issue = "126840")]
pub struct Sender<T> {
    // Which channel implementation this handle sends into.
    flavor: SenderFlavor<T>,
}
| 302 | |
/// Sender flavors.
///
/// Each variant wraps the `counter::Sender` handle for one of the three
/// channel implementations; the variant is fixed when the channel is created.
enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array.
    ///
    /// Built by [`sync_channel`] when the requested capacity is non-zero.
    Array(counter::Sender<array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    ///
    /// Built by [`channel`].
    List(counter::Sender<list::Channel<T>>),

    /// Zero-capacity channel.
    ///
    /// Built by [`sync_channel`] when the requested capacity is 0.
    Zero(counter::Sender<zero::Channel<T>>),
}
| 314 | |
// `Sender<T>` is declared both `Send` and `Sync` whenever the message type `T`
// is `Send`.
// NOTE(review): these are `unsafe impl`s; their soundness rests on the channel
// flavors in `array`/`list`/`zero`, which are not visible here — confirm there.
#[unstable(feature = "mpmc_channel", issue = "126840")]
unsafe impl<T: Send> Send for Sender<T> {}
#[unstable(feature = "mpmc_channel", issue = "126840")]
unsafe impl<T: Send> Sync for Sender<T> {}
| 319 | |
// `Sender<T>` is declared unwind-safe, both by value and by shared reference,
// for every `T` (no bound on the message type).
#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> UnwindSafe for Sender<T> {}
#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> RefUnwindSafe for Sender<T> {}
| 324 | |
| 325 | impl<T> Sender<T> { |
| 326 | /// Attempts to send a message into the channel without blocking. |
| 327 | /// |
| 328 | /// This method will either send a message into the channel immediately or return an error if |
| 329 | /// the channel is full or disconnected. The returned error contains the original message. |
| 330 | /// |
| 331 | /// If called on a zero-capacity channel, this method will send the message only if there |
| 332 | /// happens to be a receive operation on the other side of the channel at the same time. |
| 333 | /// |
| 334 | /// # Examples |
| 335 | /// |
| 336 | /// ```rust |
| 337 | /// #![feature(mpmc_channel)] |
| 338 | /// |
| 339 | /// use std::sync::mpmc::{channel, Receiver, Sender}; |
| 340 | /// |
| 341 | /// let (sender, _receiver): (Sender<i32>, Receiver<i32>) = channel(); |
| 342 | /// |
| 343 | /// assert!(sender.try_send(1).is_ok()); |
| 344 | /// ``` |
| 345 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 346 | pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
| 347 | match &self.flavor { |
| 348 | SenderFlavor::Array(chan) => chan.try_send(msg), |
| 349 | SenderFlavor::List(chan) => chan.try_send(msg), |
| 350 | SenderFlavor::Zero(chan) => chan.try_send(msg), |
| 351 | } |
| 352 | } |
| 353 | |
| 354 | /// Attempts to send a value on this channel, returning it back if it could |
| 355 | /// not be sent. |
| 356 | /// |
| 357 | /// A successful send occurs when it is determined that the other end of |
| 358 | /// the channel has not hung up already. An unsuccessful send would be one |
| 359 | /// where the corresponding receiver has already been deallocated. Note |
| 360 | /// that a return value of [`Err`] means that the data will never be |
| 361 | /// received, but a return value of [`Ok`] does *not* mean that the data |
| 362 | /// will be received. It is possible for the corresponding receiver to |
| 363 | /// hang up immediately after this function returns [`Ok`]. However, if |
| 364 | /// the channel is zero-capacity, it acts as a rendezvous channel and a |
| 365 | /// return value of [`Ok`] means that the data has been received. |
| 366 | /// |
| 367 | /// If the channel is full and not disconnected, this call will block until |
| 368 | /// the send operation can proceed. If the channel becomes disconnected, |
| 369 | /// this call will wake up and return an error. The returned error contains |
| 370 | /// the original message. |
| 371 | /// |
| 372 | /// If called on a zero-capacity channel, this method will wait for a receive |
| 373 | /// operation to appear on the other side of the channel. |
| 374 | /// |
| 375 | /// # Examples |
| 376 | /// |
| 377 | /// ``` |
| 378 | /// #![feature(mpmc_channel)] |
| 379 | /// |
| 380 | /// use std::sync::mpmc::channel; |
| 381 | /// |
| 382 | /// let (tx, rx) = channel(); |
| 383 | /// |
| 384 | /// // This send is always successful |
| 385 | /// tx.send(1).unwrap(); |
| 386 | /// |
| 387 | /// // This send will fail because the receiver is gone |
| 388 | /// drop(rx); |
| 389 | /// assert!(tx.send(1).is_err()); |
| 390 | /// ``` |
| 391 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 392 | pub fn send(&self, msg: T) -> Result<(), SendError<T>> { |
| 393 | match &self.flavor { |
| 394 | SenderFlavor::Array(chan) => chan.send(msg, None), |
| 395 | SenderFlavor::List(chan) => chan.send(msg, None), |
| 396 | SenderFlavor::Zero(chan) => chan.send(msg, None), |
| 397 | } |
| 398 | .map_err(|err| match err { |
| 399 | SendTimeoutError::Disconnected(msg) => SendError(msg), |
| 400 | SendTimeoutError::Timeout(_) => unreachable!(), |
| 401 | }) |
| 402 | } |
| 403 | } |
| 404 | |
| 405 | impl<T> Sender<T> { |
| 406 | /// Waits for a message to be sent into the channel, but only for a limited time. |
| 407 | /// |
| 408 | /// If the channel is full and not disconnected, this call will block until the send operation |
| 409 | /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
| 410 | /// wake up and return an error. The returned error contains the original message. |
| 411 | /// |
| 412 | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
| 413 | /// appear on the other side of the channel. |
| 414 | /// |
| 415 | /// # Examples |
| 416 | /// |
| 417 | /// ``` |
| 418 | /// #![feature(mpmc_channel)] |
| 419 | /// |
| 420 | /// use std::sync::mpmc::channel; |
| 421 | /// use std::time::Duration; |
| 422 | /// |
| 423 | /// let (tx, rx) = channel(); |
| 424 | /// |
| 425 | /// tx.send_timeout(1, Duration::from_millis(400)).unwrap(); |
| 426 | /// ``` |
| 427 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 428 | pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { |
| 429 | match Instant::now().checked_add(timeout) { |
| 430 | Some(deadline) => self.send_deadline(msg, deadline), |
| 431 | // So far in the future that it's practically the same as waiting indefinitely. |
| 432 | None => self.send(msg).map_err(SendTimeoutError::from), |
| 433 | } |
| 434 | } |
| 435 | |
| 436 | /// Waits for a message to be sent into the channel, but only until a given deadline. |
| 437 | /// |
| 438 | /// If the channel is full and not disconnected, this call will block until the send operation |
| 439 | /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
| 440 | /// wake up and return an error. The returned error contains the original message. |
| 441 | /// |
| 442 | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
| 443 | /// appear on the other side of the channel. |
| 444 | /// |
| 445 | /// # Examples |
| 446 | /// |
| 447 | /// ``` |
| 448 | /// #![feature(mpmc_channel)] |
| 449 | /// |
| 450 | /// use std::sync::mpmc::channel; |
| 451 | /// use std::time::{Duration, Instant}; |
| 452 | /// |
| 453 | /// let (tx, rx) = channel(); |
| 454 | /// |
| 455 | /// let t = Instant::now() + Duration::from_millis(400); |
| 456 | /// tx.send_deadline(1, t).unwrap(); |
| 457 | /// ``` |
| 458 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 459 | pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { |
| 460 | match &self.flavor { |
| 461 | SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), |
| 462 | SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), |
| 463 | SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), |
| 464 | } |
| 465 | } |
| 466 | |
| 467 | /// Returns `true` if the channel is empty. |
| 468 | /// |
| 469 | /// Note: Zero-capacity channels are always empty. |
| 470 | /// |
| 471 | /// # Examples |
| 472 | /// |
| 473 | /// ``` |
| 474 | /// #![feature(mpmc_channel)] |
| 475 | /// |
| 476 | /// use std::sync::mpmc; |
| 477 | /// use std::thread; |
| 478 | /// |
| 479 | /// let (send, _recv) = mpmc::channel(); |
| 480 | /// |
| 481 | /// let tx1 = send.clone(); |
| 482 | /// let tx2 = send.clone(); |
| 483 | /// |
| 484 | /// assert!(tx1.is_empty()); |
| 485 | /// |
| 486 | /// let handle = thread::spawn(move || { |
| 487 | /// tx2.send(1u8).unwrap(); |
| 488 | /// }); |
| 489 | /// |
| 490 | /// handle.join().unwrap(); |
| 491 | /// |
| 492 | /// assert!(!tx1.is_empty()); |
| 493 | /// ``` |
| 494 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 495 | pub fn is_empty(&self) -> bool { |
| 496 | match &self.flavor { |
| 497 | SenderFlavor::Array(chan) => chan.is_empty(), |
| 498 | SenderFlavor::List(chan) => chan.is_empty(), |
| 499 | SenderFlavor::Zero(chan) => chan.is_empty(), |
| 500 | } |
| 501 | } |
| 502 | |
| 503 | /// Returns `true` if the channel is full. |
| 504 | /// |
| 505 | /// Note: Zero-capacity channels are always full. |
| 506 | /// |
| 507 | /// # Examples |
| 508 | /// |
| 509 | /// ``` |
| 510 | /// #![feature(mpmc_channel)] |
| 511 | /// |
| 512 | /// use std::sync::mpmc; |
| 513 | /// use std::thread; |
| 514 | /// |
| 515 | /// let (send, _recv) = mpmc::sync_channel(1); |
| 516 | /// |
| 517 | /// let (tx1, tx2) = (send.clone(), send.clone()); |
| 518 | /// assert!(!tx1.is_full()); |
| 519 | /// |
| 520 | /// let handle = thread::spawn(move || { |
| 521 | /// tx2.send(1u8).unwrap(); |
| 522 | /// }); |
| 523 | /// |
| 524 | /// handle.join().unwrap(); |
| 525 | /// |
| 526 | /// assert!(tx1.is_full()); |
| 527 | /// ``` |
| 528 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 529 | pub fn is_full(&self) -> bool { |
| 530 | match &self.flavor { |
| 531 | SenderFlavor::Array(chan) => chan.is_full(), |
| 532 | SenderFlavor::List(chan) => chan.is_full(), |
| 533 | SenderFlavor::Zero(chan) => chan.is_full(), |
| 534 | } |
| 535 | } |
| 536 | |
| 537 | /// Returns the number of messages in the channel. |
| 538 | /// |
| 539 | /// # Examples |
| 540 | /// |
| 541 | /// ``` |
| 542 | /// #![feature(mpmc_channel)] |
| 543 | /// |
| 544 | /// use std::sync::mpmc; |
| 545 | /// use std::thread; |
| 546 | /// |
| 547 | /// let (send, _recv) = mpmc::channel(); |
| 548 | /// let (tx1, tx2) = (send.clone(), send.clone()); |
| 549 | /// |
| 550 | /// assert_eq!(tx1.len(), 0); |
| 551 | /// |
| 552 | /// let handle = thread::spawn(move || { |
| 553 | /// tx2.send(1u8).unwrap(); |
| 554 | /// }); |
| 555 | /// |
| 556 | /// handle.join().unwrap(); |
| 557 | /// |
| 558 | /// assert_eq!(tx1.len(), 1); |
| 559 | /// ``` |
| 560 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 561 | pub fn len(&self) -> usize { |
| 562 | match &self.flavor { |
| 563 | SenderFlavor::Array(chan) => chan.len(), |
| 564 | SenderFlavor::List(chan) => chan.len(), |
| 565 | SenderFlavor::Zero(chan) => chan.len(), |
| 566 | } |
| 567 | } |
| 568 | |
| 569 | /// If the channel is bounded, returns its capacity. |
| 570 | /// |
| 571 | /// # Examples |
| 572 | /// |
| 573 | /// ``` |
| 574 | /// #![feature(mpmc_channel)] |
| 575 | /// |
| 576 | /// use std::sync::mpmc; |
| 577 | /// use std::thread; |
| 578 | /// |
| 579 | /// let (send, _recv) = mpmc::sync_channel(3); |
| 580 | /// let (tx1, tx2) = (send.clone(), send.clone()); |
| 581 | /// |
| 582 | /// assert_eq!(tx1.capacity(), Some(3)); |
| 583 | /// |
| 584 | /// let handle = thread::spawn(move || { |
| 585 | /// tx2.send(1u8).unwrap(); |
| 586 | /// }); |
| 587 | /// |
| 588 | /// handle.join().unwrap(); |
| 589 | /// |
| 590 | /// assert_eq!(tx1.capacity(), Some(3)); |
| 591 | /// ``` |
| 592 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 593 | pub fn capacity(&self) -> Option<usize> { |
| 594 | match &self.flavor { |
| 595 | SenderFlavor::Array(chan) => chan.capacity(), |
| 596 | SenderFlavor::List(chan) => chan.capacity(), |
| 597 | SenderFlavor::Zero(chan) => chan.capacity(), |
| 598 | } |
| 599 | } |
| 600 | |
| 601 | /// Returns `true` if senders belong to the same channel. |
| 602 | /// |
| 603 | /// # Examples |
| 604 | /// |
| 605 | /// ``` |
| 606 | /// #![feature(mpmc_channel)] |
| 607 | /// |
| 608 | /// use std::sync::mpmc; |
| 609 | /// |
| 610 | /// let (tx1, _) = mpmc::channel::<i32>(); |
| 611 | /// let (tx2, _) = mpmc::channel::<i32>(); |
| 612 | /// |
| 613 | /// assert!(tx1.same_channel(&tx1)); |
| 614 | /// assert!(!tx1.same_channel(&tx2)); |
| 615 | /// ``` |
| 616 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 617 | pub fn same_channel(&self, other: &Sender<T>) -> bool { |
| 618 | match (&self.flavor, &other.flavor) { |
| 619 | (SenderFlavor::Array(a), SenderFlavor::Array(b)) => a == b, |
| 620 | (SenderFlavor::List(a), SenderFlavor::List(b)) => a == b, |
| 621 | (SenderFlavor::Zero(a), SenderFlavor::Zero(b)) => a == b, |
| 622 | _ => false, |
| 623 | } |
| 624 | } |
| 625 | } |
| 626 | |
| 627 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 628 | impl<T> Drop for Sender<T> { |
| 629 | fn drop(&mut self) { |
| 630 | unsafe { |
| 631 | match &self.flavor { |
| 632 | SenderFlavor::Array(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect_senders()), |
| 633 | SenderFlavor::List(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect_senders()), |
| 634 | SenderFlavor::Zero(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect()), |
| 635 | } |
| 636 | } |
| 637 | } |
| 638 | } |
| 639 | |
| 640 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 641 | impl<T> Clone for Sender<T> { |
| 642 | fn clone(&self) -> Self { |
| 643 | let flavor: SenderFlavor = match &self.flavor { |
| 644 | SenderFlavor::Array(chan: &Sender>) => SenderFlavor::Array(chan.acquire()), |
| 645 | SenderFlavor::List(chan: &Sender>) => SenderFlavor::List(chan.acquire()), |
| 646 | SenderFlavor::Zero(chan: &Sender>) => SenderFlavor::Zero(chan.acquire()), |
| 647 | }; |
| 648 | |
| 649 | Sender { flavor } |
| 650 | } |
| 651 | } |
| 652 | |
| 653 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 654 | impl<T> fmt::Debug for Sender<T> { |
| 655 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 656 | f.pad("Sender { .. }" ) |
| 657 | } |
| 658 | } |
| 659 | |
/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
/// Different threads can share this [`Receiver`] by cloning it.
///
/// Messages sent to the channel can be retrieved using [`recv`].
///
/// [`recv`]: Receiver::recv
///
/// # Examples
///
/// ```rust
/// #![feature(mpmc_channel)]
///
/// use std::sync::mpmc::channel;
/// use std::thread;
/// use std::time::Duration;
///
/// let (send, recv) = channel();
///
/// let tx_thread = thread::spawn(move || {
///     send.send("Hello world!").unwrap();
///     thread::sleep(Duration::from_secs(2)); // block for two seconds
///     send.send("Delayed for 2 seconds").unwrap();
/// });
///
/// // Either receiver thread may get the first message; the other one then
/// // waits for the delayed message.
/// let (rx1, rx2) = (recv.clone(), recv.clone());
/// let rx_thread_1 = thread::spawn(move || {
///     println!("{}", rx1.recv().unwrap());
/// });
/// let rx_thread_2 = thread::spawn(move || {
///     println!("{}", rx2.recv().unwrap());
/// });
///
/// tx_thread.join().unwrap();
/// rx_thread_1.join().unwrap();
/// rx_thread_2.join().unwrap();
/// ```
#[unstable(feature = "mpmc_channel", issue = "126840")]
pub struct Receiver<T> {
    // Which channel implementation this handle receives from.
    flavor: ReceiverFlavor<T>,
}
| 700 | |
/// An iterator over messages on a [`Receiver`], created by [`iter`].
///
/// This iterator will block whenever [`next`] is called,
/// waiting for a new message, and [`None`] will be returned
/// when the corresponding channel has hung up.
///
/// [`iter`]: Receiver::iter
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```rust
/// #![feature(mpmc_channel)]
///
/// use std::sync::mpmc::channel;
/// use std::thread;
///
/// let (send, recv) = channel();
///
/// thread::spawn(move || {
///     send.send(1u8).unwrap();
///     send.send(2u8).unwrap();
///     send.send(3u8).unwrap();
/// });
///
/// // The loop ends once the spawned thread has dropped `send`.
/// for x in recv.iter() {
///     println!("Got: {x}");
/// }
/// ```
#[unstable(feature = "mpmc_channel", issue = "126840")]
#[derive(Debug)]
pub struct Iter<'a, T: 'a> {
    // Borrowed receiver that `next` calls `recv` on.
    rx: &'a Receiver<T>,
}
| 735 | |
/// An iterator that attempts to yield all pending values for a [`Receiver`],
/// created by [`try_iter`].
///
/// [`None`] will be returned when there are no pending values remaining or
/// if the corresponding channel has hung up.
///
/// This iterator will never block the caller in order to wait for data to
/// become available. Instead, it will return [`None`].
///
/// [`try_iter`]: Receiver::try_iter
///
/// # Examples
///
/// ```rust
/// #![feature(mpmc_channel)]
///
/// use std::sync::mpmc::channel;
/// use std::thread;
/// use std::time::Duration;
///
/// let (sender, receiver) = channel();
///
/// // Nothing is in the buffer yet
/// assert!(receiver.try_iter().next().is_none());
/// println!("Nothing in the buffer...");
///
/// thread::spawn(move || {
///     sender.send(1).unwrap();
///     sender.send(2).unwrap();
///     sender.send(3).unwrap();
/// });
///
/// println!("Going to sleep...");
/// thread::sleep(Duration::from_secs(2)); // block for two seconds
///
/// for x in receiver.try_iter() {
///     println!("Got: {x}");
/// }
/// ```
#[unstable(feature = "mpmc_channel", issue = "126840")]
#[derive(Debug)]
pub struct TryIter<'a, T: 'a> {
    // Borrowed receiver that `next` calls `try_recv` on.
    rx: &'a Receiver<T>,
}
| 780 | |
/// An owning iterator over messages on a [`Receiver`],
/// created by [`into_iter`].
///
/// This iterator will block whenever [`next`]
/// is called, waiting for a new message, and [`None`] will be
/// returned if the corresponding channel has hung up.
///
/// [`into_iter`]: Receiver::into_iter
/// [`next`]: Iterator::next
///
/// # Examples
///
/// ```rust
/// #![feature(mpmc_channel)]
///
/// use std::sync::mpmc::channel;
/// use std::thread;
///
/// let (send, recv) = channel();
///
/// thread::spawn(move || {
///     send.send(1u8).unwrap();
///     send.send(2u8).unwrap();
///     send.send(3u8).unwrap();
/// });
///
/// // `recv` is moved into the iterator here.
/// for x in recv.into_iter() {
///     println!("Got: {x}");
/// }
/// ```
#[unstable(feature = "mpmc_channel", issue = "126840")]
#[derive(Debug)]
pub struct IntoIter<T> {
    // Owned receiver that `next` calls `recv` on.
    rx: Receiver<T>,
}
| 816 | |
| 817 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 818 | impl<'a, T> Iterator for Iter<'a, T> { |
| 819 | type Item = T; |
| 820 | |
| 821 | fn next(&mut self) -> Option<T> { |
| 822 | self.rx.recv().ok() |
| 823 | } |
| 824 | } |
| 825 | |
| 826 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 827 | impl<'a, T> Iterator for TryIter<'a, T> { |
| 828 | type Item = T; |
| 829 | |
| 830 | fn next(&mut self) -> Option<T> { |
| 831 | self.rx.try_recv().ok() |
| 832 | } |
| 833 | } |
| 834 | |
| 835 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 836 | impl<'a, T> IntoIterator for &'a Receiver<T> { |
| 837 | type Item = T; |
| 838 | type IntoIter = Iter<'a, T>; |
| 839 | |
| 840 | fn into_iter(self) -> Iter<'a, T> { |
| 841 | self.iter() |
| 842 | } |
| 843 | } |
| 844 | |
| 845 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 846 | impl<T> Iterator for IntoIter<T> { |
| 847 | type Item = T; |
| 848 | fn next(&mut self) -> Option<T> { |
| 849 | self.rx.recv().ok() |
| 850 | } |
| 851 | } |
| 852 | |
| 853 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 854 | impl<T> IntoIterator for Receiver<T> { |
| 855 | type Item = T; |
| 856 | type IntoIter = IntoIter<T>; |
| 857 | |
| 858 | fn into_iter(self) -> IntoIter<T> { |
| 859 | IntoIter { rx: self } |
| 860 | } |
| 861 | } |
| 862 | |
/// Receiver flavors.
///
/// Identifies which channel implementation backs a `Receiver`. The
/// `Receiver` methods match on this enum and forward the call to the
/// wrapped, flavor-specific handle.
enum ReceiverFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Receiver<array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Receiver<list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Receiver<zero::Channel<T>>),
}
| 874 | |
// A `Receiver<T>` may be moved to another thread whenever `T: Send`.
// NOTE(review): the justification for these `unsafe impl`s is not visible in
// this part of the file; presumably the channel flavors synchronize all
// access to queued messages internally. Confirm against the flavor modules.
#[unstable(feature = "mpmc_channel", issue = "126840")]
unsafe impl<T: Send> Send for Receiver<T> {}
// A `&Receiver<T>` may be shared between threads as well; only `T: Send` is
// required here, not `T: Sync`.
#[unstable(feature = "mpmc_channel", issue = "126840")]
unsafe impl<T: Send> Sync for Receiver<T> {}
| 879 | |
// Receivers are declared unwind-safe, both by value and by reference, for
// every message type `T` (no bound is placed on `T`).
#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> UnwindSafe for Receiver<T> {}
#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> RefUnwindSafe for Receiver<T> {}
| 884 | |
| 885 | impl<T> Receiver<T> { |
| 886 | /// Attempts to receive a message from the channel without blocking. |
| 887 | /// |
| 888 | /// This method will never block the caller in order to wait for data to |
| 889 | /// become available. Instead, this will always return immediately with a |
| 890 | /// possible option of pending data on the channel. |
| 891 | /// |
| 892 | /// If called on a zero-capacity channel, this method will receive a message only if there |
| 893 | /// happens to be a send operation on the other side of the channel at the same time. |
| 894 | /// |
| 895 | /// This is useful for a flavor of "optimistic check" before deciding to |
| 896 | /// block on a receiver. |
| 897 | /// |
| 898 | /// Compared with [`recv`], this function has two failure cases instead of one |
| 899 | /// (one for disconnection, one for an empty buffer). |
| 900 | /// |
| 901 | /// [`recv`]: Self::recv |
| 902 | /// |
| 903 | /// # Examples |
| 904 | /// |
| 905 | /// ```rust |
| 906 | /// #![feature(mpmc_channel)] |
| 907 | /// |
| 908 | /// use std::sync::mpmc::{Receiver, channel}; |
| 909 | /// |
| 910 | /// let (_, receiver): (_, Receiver<i32>) = channel(); |
| 911 | /// |
| 912 | /// assert!(receiver.try_recv().is_err()); |
| 913 | /// ``` |
| 914 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 915 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
| 916 | match &self.flavor { |
| 917 | ReceiverFlavor::Array(chan) => chan.try_recv(), |
| 918 | ReceiverFlavor::List(chan) => chan.try_recv(), |
| 919 | ReceiverFlavor::Zero(chan) => chan.try_recv(), |
| 920 | } |
| 921 | } |
| 922 | |
| 923 | /// Attempts to wait for a value on this receiver, returning an error if the |
| 924 | /// corresponding channel has hung up. |
| 925 | /// |
| 926 | /// This function will always block the current thread if there is no data |
| 927 | /// available and it's possible for more data to be sent (at least one sender |
| 928 | /// still exists). Once a message is sent to the corresponding [`Sender`], |
| 929 | /// this receiver will wake up and return that message. |
| 930 | /// |
| 931 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
| 932 | /// this call is blocking, this call will wake up and return [`Err`] to |
| 933 | /// indicate that no more messages can ever be received on this channel. |
| 934 | /// However, since channels are buffered, messages sent before the disconnect |
| 935 | /// will still be properly received. |
| 936 | /// |
| 937 | /// # Examples |
| 938 | /// |
| 939 | /// ``` |
| 940 | /// #![feature(mpmc_channel)] |
| 941 | /// |
| 942 | /// use std::sync::mpmc; |
| 943 | /// use std::thread; |
| 944 | /// |
| 945 | /// let (send, recv) = mpmc::channel(); |
| 946 | /// let handle = thread::spawn(move || { |
| 947 | /// send.send(1u8).unwrap(); |
| 948 | /// }); |
| 949 | /// |
| 950 | /// handle.join().unwrap(); |
| 951 | /// |
| 952 | /// assert_eq!(Ok(1), recv.recv()); |
| 953 | /// ``` |
| 954 | /// |
| 955 | /// Buffering behavior: |
| 956 | /// |
| 957 | /// ``` |
| 958 | /// #![feature(mpmc_channel)] |
| 959 | /// |
| 960 | /// use std::sync::mpmc; |
| 961 | /// use std::thread; |
| 962 | /// use std::sync::mpmc::RecvError; |
| 963 | /// |
| 964 | /// let (send, recv) = mpmc::channel(); |
| 965 | /// let handle = thread::spawn(move || { |
| 966 | /// send.send(1u8).unwrap(); |
| 967 | /// send.send(2).unwrap(); |
| 968 | /// send.send(3).unwrap(); |
| 969 | /// drop(send); |
| 970 | /// }); |
| 971 | /// |
| 972 | /// // wait for the thread to join so we ensure the sender is dropped |
| 973 | /// handle.join().unwrap(); |
| 974 | /// |
| 975 | /// assert_eq!(Ok(1), recv.recv()); |
| 976 | /// assert_eq!(Ok(2), recv.recv()); |
| 977 | /// assert_eq!(Ok(3), recv.recv()); |
| 978 | /// assert_eq!(Err(RecvError), recv.recv()); |
| 979 | /// ``` |
| 980 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 981 | pub fn recv(&self) -> Result<T, RecvError> { |
| 982 | match &self.flavor { |
| 983 | ReceiverFlavor::Array(chan) => chan.recv(None), |
| 984 | ReceiverFlavor::List(chan) => chan.recv(None), |
| 985 | ReceiverFlavor::Zero(chan) => chan.recv(None), |
| 986 | } |
| 987 | .map_err(|_| RecvError) |
| 988 | } |
| 989 | |
| 990 | /// Attempts to wait for a value on this receiver, returning an error if the |
| 991 | /// corresponding channel has hung up, or if it waits more than `timeout`. |
| 992 | /// |
| 993 | /// This function will always block the current thread if there is no data |
| 994 | /// available and it's possible for more data to be sent (at least one sender |
| 995 | /// still exists). Once a message is sent to the corresponding [`Sender`], |
| 996 | /// this receiver will wake up and return that message. |
| 997 | /// |
| 998 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
| 999 | /// this call is blocking, this call will wake up and return [`Err`] to |
| 1000 | /// indicate that no more messages can ever be received on this channel. |
| 1001 | /// However, since channels are buffered, messages sent before the disconnect |
| 1002 | /// will still be properly received. |
| 1003 | /// |
| 1004 | /// # Examples |
| 1005 | /// |
| 1006 | /// Successfully receiving value before encountering timeout: |
| 1007 | /// |
| 1008 | /// ```no_run |
| 1009 | /// #![feature(mpmc_channel)] |
| 1010 | /// |
| 1011 | /// use std::thread; |
| 1012 | /// use std::time::Duration; |
| 1013 | /// use std::sync::mpmc; |
| 1014 | /// |
| 1015 | /// let (send, recv) = mpmc::channel(); |
| 1016 | /// |
| 1017 | /// thread::spawn(move || { |
| 1018 | /// send.send('a' ).unwrap(); |
| 1019 | /// }); |
| 1020 | /// |
| 1021 | /// assert_eq!( |
| 1022 | /// recv.recv_timeout(Duration::from_millis(400)), |
| 1023 | /// Ok('a' ) |
| 1024 | /// ); |
| 1025 | /// ``` |
| 1026 | /// |
| 1027 | /// Receiving an error upon reaching timeout: |
| 1028 | /// |
| 1029 | /// ```no_run |
| 1030 | /// #![feature(mpmc_channel)] |
| 1031 | /// |
| 1032 | /// use std::thread; |
| 1033 | /// use std::time::Duration; |
| 1034 | /// use std::sync::mpmc; |
| 1035 | /// |
| 1036 | /// let (send, recv) = mpmc::channel(); |
| 1037 | /// |
| 1038 | /// thread::spawn(move || { |
| 1039 | /// thread::sleep(Duration::from_millis(800)); |
| 1040 | /// send.send('a' ).unwrap(); |
| 1041 | /// }); |
| 1042 | /// |
| 1043 | /// assert_eq!( |
| 1044 | /// recv.recv_timeout(Duration::from_millis(400)), |
| 1045 | /// Err(mpmc::RecvTimeoutError::Timeout) |
| 1046 | /// ); |
| 1047 | /// ``` |
| 1048 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1049 | pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { |
| 1050 | match Instant::now().checked_add(timeout) { |
| 1051 | Some(deadline) => self.recv_deadline(deadline), |
| 1052 | // So far in the future that it's practically the same as waiting indefinitely. |
| 1053 | None => self.recv().map_err(RecvTimeoutError::from), |
| 1054 | } |
| 1055 | } |
| 1056 | |
| 1057 | /// Attempts to wait for a value on this receiver, returning an error if the |
| 1058 | /// corresponding channel has hung up, or if `deadline` is reached. |
| 1059 | /// |
| 1060 | /// This function will always block the current thread if there is no data |
| 1061 | /// available and it's possible for more data to be sent. Once a message is |
| 1062 | /// sent to the corresponding [`Sender`], then this receiver will wake up |
| 1063 | /// and return that message. |
| 1064 | /// |
| 1065 | /// If the corresponding [`Sender`] has disconnected, or it disconnects while |
| 1066 | /// this call is blocking, this call will wake up and return [`Err`] to |
| 1067 | /// indicate that no more messages can ever be received on this channel. |
| 1068 | /// However, since channels are buffered, messages sent before the disconnect |
| 1069 | /// will still be properly received. |
| 1070 | /// |
| 1071 | /// # Examples |
| 1072 | /// |
| 1073 | /// Successfully receiving value before reaching deadline: |
| 1074 | /// |
| 1075 | /// ```no_run |
| 1076 | /// #![feature(mpmc_channel)] |
| 1077 | /// |
| 1078 | /// use std::thread; |
| 1079 | /// use std::time::{Duration, Instant}; |
| 1080 | /// use std::sync::mpmc; |
| 1081 | /// |
| 1082 | /// let (send, recv) = mpmc::channel(); |
| 1083 | /// |
| 1084 | /// thread::spawn(move || { |
| 1085 | /// send.send('a' ).unwrap(); |
| 1086 | /// }); |
| 1087 | /// |
| 1088 | /// assert_eq!( |
| 1089 | /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), |
| 1090 | /// Ok('a' ) |
| 1091 | /// ); |
| 1092 | /// ``` |
| 1093 | /// |
| 1094 | /// Receiving an error upon reaching deadline: |
| 1095 | /// |
| 1096 | /// ```no_run |
| 1097 | /// #![feature(mpmc_channel)] |
| 1098 | /// |
| 1099 | /// use std::thread; |
| 1100 | /// use std::time::{Duration, Instant}; |
| 1101 | /// use std::sync::mpmc; |
| 1102 | /// |
| 1103 | /// let (send, recv) = mpmc::channel(); |
| 1104 | /// |
| 1105 | /// thread::spawn(move || { |
| 1106 | /// thread::sleep(Duration::from_millis(800)); |
| 1107 | /// send.send('a' ).unwrap(); |
| 1108 | /// }); |
| 1109 | /// |
| 1110 | /// assert_eq!( |
| 1111 | /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), |
| 1112 | /// Err(mpmc::RecvTimeoutError::Timeout) |
| 1113 | /// ); |
| 1114 | /// ``` |
| 1115 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1116 | pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { |
| 1117 | match &self.flavor { |
| 1118 | ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), |
| 1119 | ReceiverFlavor::List(chan) => chan.recv(Some(deadline)), |
| 1120 | ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), |
| 1121 | } |
| 1122 | } |
| 1123 | |
| 1124 | /// Returns an iterator that will attempt to yield all pending values. |
| 1125 | /// It will return `None` if there are no more pending values or if the |
| 1126 | /// channel has hung up. The iterator will never [`panic!`] or block the |
| 1127 | /// user by waiting for values. |
| 1128 | /// |
| 1129 | /// # Examples |
| 1130 | /// |
| 1131 | /// ```no_run |
| 1132 | /// #![feature(mpmc_channel)] |
| 1133 | /// |
| 1134 | /// use std::sync::mpmc::channel; |
| 1135 | /// use std::thread; |
| 1136 | /// use std::time::Duration; |
| 1137 | /// |
| 1138 | /// let (sender, receiver) = channel(); |
| 1139 | /// |
| 1140 | /// // nothing is in the buffer yet |
| 1141 | /// assert!(receiver.try_iter().next().is_none()); |
| 1142 | /// |
| 1143 | /// thread::spawn(move || { |
| 1144 | /// thread::sleep(Duration::from_secs(1)); |
| 1145 | /// sender.send(1).unwrap(); |
| 1146 | /// sender.send(2).unwrap(); |
| 1147 | /// sender.send(3).unwrap(); |
| 1148 | /// }); |
| 1149 | /// |
| 1150 | /// // nothing is in the buffer yet |
| 1151 | /// assert!(receiver.try_iter().next().is_none()); |
| 1152 | /// |
| 1153 | /// // block for two seconds |
| 1154 | /// thread::sleep(Duration::from_secs(2)); |
| 1155 | /// |
| 1156 | /// let mut iter = receiver.try_iter(); |
| 1157 | /// assert_eq!(iter.next(), Some(1)); |
| 1158 | /// assert_eq!(iter.next(), Some(2)); |
| 1159 | /// assert_eq!(iter.next(), Some(3)); |
| 1160 | /// assert_eq!(iter.next(), None); |
| 1161 | /// ``` |
| 1162 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1163 | pub fn try_iter(&self) -> TryIter<'_, T> { |
| 1164 | TryIter { rx: self } |
| 1165 | } |
| 1166 | } |
| 1167 | |
| 1168 | impl<T> Receiver<T> { |
| 1169 | /// Returns `true` if the channel is empty. |
| 1170 | /// |
| 1171 | /// Note: Zero-capacity channels are always empty. |
| 1172 | /// |
| 1173 | /// # Examples |
| 1174 | /// |
| 1175 | /// ``` |
| 1176 | /// #![feature(mpmc_channel)] |
| 1177 | /// |
| 1178 | /// use std::sync::mpmc; |
| 1179 | /// use std::thread; |
| 1180 | /// |
| 1181 | /// let (send, recv) = mpmc::channel(); |
| 1182 | /// |
| 1183 | /// assert!(recv.is_empty()); |
| 1184 | /// |
| 1185 | /// let handle = thread::spawn(move || { |
| 1186 | /// send.send(1u8).unwrap(); |
| 1187 | /// }); |
| 1188 | /// |
| 1189 | /// handle.join().unwrap(); |
| 1190 | /// |
| 1191 | /// assert!(!recv.is_empty()); |
| 1192 | /// ``` |
| 1193 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1194 | pub fn is_empty(&self) -> bool { |
| 1195 | match &self.flavor { |
| 1196 | ReceiverFlavor::Array(chan) => chan.is_empty(), |
| 1197 | ReceiverFlavor::List(chan) => chan.is_empty(), |
| 1198 | ReceiverFlavor::Zero(chan) => chan.is_empty(), |
| 1199 | } |
| 1200 | } |
| 1201 | |
| 1202 | /// Returns `true` if the channel is full. |
| 1203 | /// |
| 1204 | /// Note: Zero-capacity channels are always full. |
| 1205 | /// |
| 1206 | /// # Examples |
| 1207 | /// |
| 1208 | /// ``` |
| 1209 | /// #![feature(mpmc_channel)] |
| 1210 | /// |
| 1211 | /// use std::sync::mpmc; |
| 1212 | /// use std::thread; |
| 1213 | /// |
| 1214 | /// let (send, recv) = mpmc::sync_channel(1); |
| 1215 | /// |
| 1216 | /// assert!(!recv.is_full()); |
| 1217 | /// |
| 1218 | /// let handle = thread::spawn(move || { |
| 1219 | /// send.send(1u8).unwrap(); |
| 1220 | /// }); |
| 1221 | /// |
| 1222 | /// handle.join().unwrap(); |
| 1223 | /// |
| 1224 | /// assert!(recv.is_full()); |
| 1225 | /// ``` |
| 1226 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1227 | pub fn is_full(&self) -> bool { |
| 1228 | match &self.flavor { |
| 1229 | ReceiverFlavor::Array(chan) => chan.is_full(), |
| 1230 | ReceiverFlavor::List(chan) => chan.is_full(), |
| 1231 | ReceiverFlavor::Zero(chan) => chan.is_full(), |
| 1232 | } |
| 1233 | } |
| 1234 | |
| 1235 | /// Returns the number of messages in the channel. |
| 1236 | /// |
| 1237 | /// # Examples |
| 1238 | /// |
| 1239 | /// ``` |
| 1240 | /// #![feature(mpmc_channel)] |
| 1241 | /// |
| 1242 | /// use std::sync::mpmc; |
| 1243 | /// use std::thread; |
| 1244 | /// |
| 1245 | /// let (send, recv) = mpmc::channel(); |
| 1246 | /// |
| 1247 | /// assert_eq!(recv.len(), 0); |
| 1248 | /// |
| 1249 | /// let handle = thread::spawn(move || { |
| 1250 | /// send.send(1u8).unwrap(); |
| 1251 | /// }); |
| 1252 | /// |
| 1253 | /// handle.join().unwrap(); |
| 1254 | /// |
| 1255 | /// assert_eq!(recv.len(), 1); |
| 1256 | /// ``` |
| 1257 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1258 | pub fn len(&self) -> usize { |
| 1259 | match &self.flavor { |
| 1260 | ReceiverFlavor::Array(chan) => chan.len(), |
| 1261 | ReceiverFlavor::List(chan) => chan.len(), |
| 1262 | ReceiverFlavor::Zero(chan) => chan.len(), |
| 1263 | } |
| 1264 | } |
| 1265 | |
| 1266 | /// If the channel is bounded, returns its capacity. |
| 1267 | /// |
| 1268 | /// # Examples |
| 1269 | /// |
| 1270 | /// ``` |
| 1271 | /// #![feature(mpmc_channel)] |
| 1272 | /// |
| 1273 | /// use std::sync::mpmc; |
| 1274 | /// use std::thread; |
| 1275 | /// |
| 1276 | /// let (send, recv) = mpmc::sync_channel(3); |
| 1277 | /// |
| 1278 | /// assert_eq!(recv.capacity(), Some(3)); |
| 1279 | /// |
| 1280 | /// let handle = thread::spawn(move || { |
| 1281 | /// send.send(1u8).unwrap(); |
| 1282 | /// }); |
| 1283 | /// |
| 1284 | /// handle.join().unwrap(); |
| 1285 | /// |
| 1286 | /// assert_eq!(recv.capacity(), Some(3)); |
| 1287 | /// ``` |
| 1288 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1289 | pub fn capacity(&self) -> Option<usize> { |
| 1290 | match &self.flavor { |
| 1291 | ReceiverFlavor::Array(chan) => chan.capacity(), |
| 1292 | ReceiverFlavor::List(chan) => chan.capacity(), |
| 1293 | ReceiverFlavor::Zero(chan) => chan.capacity(), |
| 1294 | } |
| 1295 | } |
| 1296 | |
| 1297 | /// Returns `true` if receivers belong to the same channel. |
| 1298 | /// |
| 1299 | /// # Examples |
| 1300 | /// |
| 1301 | /// ``` |
| 1302 | /// #![feature(mpmc_channel)] |
| 1303 | /// |
| 1304 | /// use std::sync::mpmc; |
| 1305 | /// |
| 1306 | /// let (_, rx1) = mpmc::channel::<i32>(); |
| 1307 | /// let (_, rx2) = mpmc::channel::<i32>(); |
| 1308 | /// |
| 1309 | /// assert!(rx1.same_channel(&rx1)); |
| 1310 | /// assert!(!rx1.same_channel(&rx2)); |
| 1311 | /// ``` |
| 1312 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1313 | pub fn same_channel(&self, other: &Receiver<T>) -> bool { |
| 1314 | match (&self.flavor, &other.flavor) { |
| 1315 | (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, |
| 1316 | (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b, |
| 1317 | (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b, |
| 1318 | _ => false, |
| 1319 | } |
| 1320 | } |
| 1321 | |
| 1322 | /// Returns an iterator that will block waiting for messages, but never |
| 1323 | /// [`panic!`]. It will return [`None`] when the channel has hung up. |
| 1324 | /// |
| 1325 | /// # Examples |
| 1326 | /// |
| 1327 | /// ```rust |
| 1328 | /// #![feature(mpmc_channel)] |
| 1329 | /// |
| 1330 | /// use std::sync::mpmc::channel; |
| 1331 | /// use std::thread; |
| 1332 | /// |
| 1333 | /// let (send, recv) = channel(); |
| 1334 | /// |
| 1335 | /// thread::spawn(move || { |
| 1336 | /// send.send(1).unwrap(); |
| 1337 | /// send.send(2).unwrap(); |
| 1338 | /// send.send(3).unwrap(); |
| 1339 | /// }); |
| 1340 | /// |
| 1341 | /// let mut iter = recv.iter(); |
| 1342 | /// assert_eq!(iter.next(), Some(1)); |
| 1343 | /// assert_eq!(iter.next(), Some(2)); |
| 1344 | /// assert_eq!(iter.next(), Some(3)); |
| 1345 | /// assert_eq!(iter.next(), None); |
| 1346 | /// ``` |
| 1347 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1348 | pub fn iter(&self) -> Iter<'_, T> { |
| 1349 | Iter { rx: self } |
| 1350 | } |
| 1351 | } |
| 1352 | |
| 1353 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1354 | impl<T> Drop for Receiver<T> { |
| 1355 | fn drop(&mut self) { |
| 1356 | unsafe { |
| 1357 | match &self.flavor { |
| 1358 | ReceiverFlavor::Array(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect_receivers()), |
| 1359 | ReceiverFlavor::List(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect_receivers()), |
| 1360 | ReceiverFlavor::Zero(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect()), |
| 1361 | } |
| 1362 | } |
| 1363 | } |
| 1364 | } |
| 1365 | |
| 1366 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1367 | impl<T> Clone for Receiver<T> { |
| 1368 | fn clone(&self) -> Self { |
| 1369 | let flavor: ReceiverFlavor = match &self.flavor { |
| 1370 | ReceiverFlavor::Array(chan: &Receiver>) => ReceiverFlavor::Array(chan.acquire()), |
| 1371 | ReceiverFlavor::List(chan: &Receiver>) => ReceiverFlavor::List(chan.acquire()), |
| 1372 | ReceiverFlavor::Zero(chan: &Receiver>) => ReceiverFlavor::Zero(chan.acquire()), |
| 1373 | }; |
| 1374 | |
| 1375 | Receiver { flavor } |
| 1376 | } |
| 1377 | } |
| 1378 | |
| 1379 | #[unstable (feature = "mpmc_channel" , issue = "126840" )] |
| 1380 | impl<T> fmt::Debug for Receiver<T> { |
| 1381 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 1382 | f.pad("Receiver { .. }" ) |
| 1383 | } |
| 1384 | } |
| 1385 | |
// Tests for this module live in the separate `tests` module file and are
// compiled only for test builds.
#[cfg(test)]
mod tests;
| 1388 | |