| 1 | //! Multi-producer multi-consumer channels for message passing. |
| 2 | //! |
| 3 | //! This crate is an alternative to [`std::sync::mpsc`] with more features and better performance. |
| 4 | //! |
| 5 | //! # Hello, world! |
| 6 | //! |
| 7 | //! ``` |
| 8 | //! use crossbeam_channel::unbounded; |
| 9 | //! |
| 10 | //! // Create a channel of unbounded capacity. |
| 11 | //! let (s, r) = unbounded(); |
| 12 | //! |
| 13 | //! // Send a message into the channel. |
| 14 | //! s.send("Hello, world!").unwrap(); |
| 15 | //! |
| 16 | //! // Receive the message from the channel. |
| 17 | //! assert_eq!(r.recv(), Ok("Hello, world!")); |
| 18 | //! ``` |
| 19 | //! |
| 20 | //! # Channel types |
| 21 | //! |
| 22 | //! Channels can be created using two functions: |
| 23 | //! |
| 24 | //! * [`bounded`] creates a channel of bounded capacity, i.e. there is a limit to how many messages |
| 25 | //! it can hold at a time. |
| 26 | //! |
| 27 | //! * [`unbounded`] creates a channel of unbounded capacity, i.e. it can hold any number of |
| 28 | //! messages at a time. |
| 29 | //! |
| 30 | //! Both functions return a [`Sender`] and a [`Receiver`], which represent the two opposite sides |
| 31 | //! of a channel. |
| 32 | //! |
| 33 | //! Creating a bounded channel: |
| 34 | //! |
| 35 | //! ``` |
| 36 | //! use crossbeam_channel::bounded; |
| 37 | //! |
| 38 | //! // Create a channel that can hold at most 5 messages at a time. |
| 39 | //! let (s, r) = bounded(5); |
| 40 | //! |
| 41 | //! // Can send only 5 messages without blocking. |
| 42 | //! for i in 0..5 { |
| 43 | //!     s.send(i).unwrap(); |
| 44 | //! } |
| 45 | //! |
| 46 | //! // Another call to `send` would block because the channel is full. |
| 47 | //! // s.send(5).unwrap(); |
| 48 | //! ``` |
| 49 | //! |
| 50 | //! Creating an unbounded channel: |
| 51 | //! |
| 52 | //! ``` |
| 53 | //! use crossbeam_channel::unbounded; |
| 54 | //! |
| 55 | //! // Create an unbounded channel. |
| 56 | //! let (s, r) = unbounded(); |
| 57 | //! |
| 58 | //! // Can send any number of messages into the channel without blocking. |
| 59 | //! for i in 0..1000 { |
| 60 | //!     s.send(i).unwrap(); |
| 61 | //! } |
| 62 | //! ``` |
| 63 | //! |
| 64 | //! A special case is a zero-capacity channel, which cannot hold any messages. Instead, send and |
| 65 | //! receive operations must appear at the same time in order to pair up and pass the message over: |
| 66 | //! |
| 67 | //! ``` |
| 68 | //! use std::thread; |
| 69 | //! use crossbeam_channel::bounded; |
| 70 | //! |
| 71 | //! // Create a zero-capacity channel. |
| 72 | //! let (s, r) = bounded(0); |
| 73 | //! |
| 74 | //! // Sending blocks until a receive operation appears on the other side. |
| 75 | //! thread::spawn(move || s.send("Hi!").unwrap()); |
| 76 | //! |
| 77 | //! // Receiving blocks until a send operation appears on the other side. |
| 78 | //! assert_eq!(r.recv(), Ok("Hi!")); |
| 79 | //! ``` |
| 80 | //! |
| 81 | //! # Sharing channels |
| 82 | //! |
| 83 | //! Senders and receivers can be cloned and sent to other threads: |
| 84 | //! |
| 85 | //! ``` |
| 86 | //! use std::thread; |
| 87 | //! use crossbeam_channel::bounded; |
| 88 | //! |
| 89 | //! let (s1, r1) = bounded(0); |
| 90 | //! let (s2, r2) = (s1.clone(), r1.clone()); |
| 91 | //! |
| 92 | //! // Spawn a thread that receives a message and then sends one. |
| 93 | //! thread::spawn(move || { |
| 94 | //!     r2.recv().unwrap(); |
| 95 | //!     s2.send(2).unwrap(); |
| 96 | //! }); |
| 97 | //! |
| 98 | //! // Send a message and then receive one. |
| 99 | //! s1.send(1).unwrap(); |
| 100 | //! r1.recv().unwrap(); |
| 101 | //! ``` |
| 102 | //! |
| 103 | //! Note that cloning only creates a new handle to the same sending or receiving side. It does not |
| 104 | //! create a separate stream of messages in any way: |
| 105 | //! |
| 106 | //! ``` |
| 107 | //! use crossbeam_channel::unbounded; |
| 108 | //! |
| 109 | //! let (s1, r1) = unbounded(); |
| 110 | //! let (s2, r2) = (s1.clone(), r1.clone()); |
| 111 | //! let (s3, r3) = (s2.clone(), r2.clone()); |
| 112 | //! |
| 113 | //! s1.send(10).unwrap(); |
| 114 | //! s2.send(20).unwrap(); |
| 115 | //! s3.send(30).unwrap(); |
| 116 | //! |
| 117 | //! assert_eq!(r3.recv(), Ok(10)); |
| 118 | //! assert_eq!(r1.recv(), Ok(20)); |
| 119 | //! assert_eq!(r2.recv(), Ok(30)); |
| 120 | //! ``` |
| 121 | //! |
| 122 | //! It's also possible to share senders and receivers by reference: |
| 123 | //! |
| 124 | //! ``` |
| 125 | //! use crossbeam_channel::bounded; |
| 126 | //! use crossbeam_utils::thread::scope; |
| 127 | //! |
| 128 | //! let (s, r) = bounded(0); |
| 129 | //! |
| 130 | //! scope(|scope| { |
| 131 | //!     // Spawn a thread that receives a message and then sends one. |
| 132 | //!     scope.spawn(|_| { |
| 133 | //!         r.recv().unwrap(); |
| 134 | //!         s.send(2).unwrap(); |
| 135 | //!     }); |
| 136 | //! |
| 137 | //!     // Send a message and then receive one. |
| 138 | //!     s.send(1).unwrap(); |
| 139 | //!     r.recv().unwrap(); |
| 140 | //! }).unwrap(); |
| 141 | //! ``` |
| 142 | //! |
| 143 | //! # Disconnection |
| 144 | //! |
| 145 | //! When all senders or all receivers associated with a channel get dropped, the channel becomes |
| 146 | //! disconnected. No more messages can be sent, but any remaining messages can still be received. |
| 147 | //! Send and receive operations on a disconnected channel never block. |
| 148 | //! |
| 149 | //! ``` |
| 150 | //! use crossbeam_channel::{unbounded, RecvError}; |
| 151 | //! |
| 152 | //! let (s, r) = unbounded(); |
| 153 | //! s.send(1).unwrap(); |
| 154 | //! s.send(2).unwrap(); |
| 155 | //! s.send(3).unwrap(); |
| 156 | //! |
| 157 | //! // The only sender is dropped, disconnecting the channel. |
| 158 | //! drop(s); |
| 159 | //! |
| 160 | //! // The remaining messages can be received. |
| 161 | //! assert_eq!(r.recv(), Ok(1)); |
| 162 | //! assert_eq!(r.recv(), Ok(2)); |
| 163 | //! assert_eq!(r.recv(), Ok(3)); |
| 164 | //! |
| 165 | //! // There are no more messages in the channel. |
| 166 | //! assert!(r.is_empty()); |
| 167 | //! |
| 168 | //! // Note that calling `r.recv()` does not block. |
| 169 | //! // Instead, `Err(RecvError)` is returned immediately. |
| 170 | //! assert_eq!(r.recv(), Err(RecvError)); |
| 171 | //! ``` |
| 172 | //! |
| 173 | //! # Blocking operations |
| 174 | //! |
| 175 | //! Send and receive operations come in three flavors: |
| 176 | //! |
| 177 | //! * Non-blocking (returns immediately with success or failure). |
| 178 | //! * Blocking (waits until the operation succeeds or the channel becomes disconnected). |
| 179 | //! * Blocking with a timeout (blocks only for a certain duration of time). |
| 180 | //! |
| 181 | //! A simple example showing the difference between non-blocking and blocking operations: |
| 182 | //! |
| 183 | //! ``` |
| 184 | //! use crossbeam_channel::{bounded, RecvError, TryRecvError}; |
| 185 | //! |
| 186 | //! let (s, r) = bounded(1); |
| 187 | //! |
| 188 | //! // Send a message into the channel. |
| 189 | //! s.send("foo").unwrap(); |
| 190 | //! |
| 191 | //! // This call would block because the channel is full. |
| 192 | //! // s.send("bar").unwrap(); |
| 193 | //! |
| 194 | //! // Receive the message. |
| 195 | //! assert_eq!(r.recv(), Ok("foo")); |
| 196 | //! |
| 197 | //! // This call would block because the channel is empty. |
| 198 | //! // r.recv(); |
| 199 | //! |
| 200 | //! // Try receiving a message without blocking. |
| 201 | //! assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); |
| 202 | //! |
| 203 | //! // Disconnect the channel. |
| 204 | //! drop(s); |
| 205 | //! |
| 206 | //! // This call doesn't block because the channel is now disconnected. |
| 207 | //! assert_eq!(r.recv(), Err(RecvError)); |
| 208 | //! ``` |
| 209 | //! |
| 210 | //! # Iteration |
| 211 | //! |
| 212 | //! Receivers can be used as iterators. For example, method [`iter`] creates an iterator that |
| 213 | //! receives messages until the channel becomes empty and disconnected. Note that iteration may |
| 214 | //! block waiting for the next message to arrive. |
| 215 | //! |
| 216 | //! ``` |
| 217 | //! use std::thread; |
| 218 | //! use crossbeam_channel::unbounded; |
| 219 | //! |
| 220 | //! let (s, r) = unbounded(); |
| 221 | //! |
| 222 | //! thread::spawn(move || { |
| 223 | //!     s.send(1).unwrap(); |
| 224 | //!     s.send(2).unwrap(); |
| 225 | //!     s.send(3).unwrap(); |
| 226 | //!     drop(s); // Disconnect the channel. |
| 227 | //! }); |
| 228 | //! |
| 229 | //! // Collect all messages from the channel. |
| 230 | //! // Note that the call to `collect` blocks until the sender is dropped. |
| 231 | //! let v: Vec<_> = r.iter().collect(); |
| 232 | //! |
| 233 | //! assert_eq!(v, [1, 2, 3]); |
| 234 | //! ``` |
| 235 | //! |
| 236 | //! A non-blocking iterator can be created using [`try_iter`], which receives all available |
| 237 | //! messages without blocking: |
| 238 | //! |
| 239 | //! ``` |
| 240 | //! use crossbeam_channel::unbounded; |
| 241 | //! |
| 242 | //! let (s, r) = unbounded(); |
| 243 | //! s.send(1).unwrap(); |
| 244 | //! s.send(2).unwrap(); |
| 245 | //! s.send(3).unwrap(); |
| 246 | //! // No need to drop the sender. |
| 247 | //! |
| 248 | //! // Receive all messages currently in the channel. |
| 249 | //! let v: Vec<_> = r.try_iter().collect(); |
| 250 | //! |
| 251 | //! assert_eq!(v, [1, 2, 3]); |
| 252 | //! ``` |
| 253 | //! |
| 254 | //! # Selection |
| 255 | //! |
| 256 | //! The [`select!`] macro allows you to define a set of channel operations, wait until any one of |
| 257 | //! them becomes ready, and finally execute it. If multiple operations are ready at the same time, |
| 258 | //! a random one among them is selected. |
| 259 | //! |
| 260 | //! It is also possible to define a `default` case that gets executed if none of the operations are |
| 261 | //! ready, either right away or for a certain duration of time. |
| 262 | //! |
| 263 | //! An operation is considered to be ready if it doesn't have to block. Note that it is ready even |
| 264 | //! when it will simply return an error because the channel is disconnected. |
| 265 | //! |
| 266 | //! An example of receiving a message from two channels: |
| 267 | //! |
| 268 | //! ``` |
| 269 | //! use std::thread; |
| 270 | //! use std::time::Duration; |
| 271 | //! use crossbeam_channel::{select, unbounded}; |
| 272 | //! |
| 273 | //! let (s1, r1) = unbounded(); |
| 274 | //! let (s2, r2) = unbounded(); |
| 275 | //! |
| 276 | //! thread::spawn(move || s1.send(10).unwrap()); |
| 277 | //! thread::spawn(move || s2.send(20).unwrap()); |
| 278 | //! |
| 279 | //! // At most one of these two receive operations will be executed. |
| 280 | //! select! { |
| 281 | //!     recv(r1) -> msg => assert_eq!(msg, Ok(10)), |
| 282 | //!     recv(r2) -> msg => assert_eq!(msg, Ok(20)), |
| 283 | //!     default(Duration::from_secs(1)) => println!("timed out"), |
| 284 | //! } |
| 285 | //! ``` |
| 286 | //! |
| 287 | //! If you need to select over a dynamically created list of channel operations, use [`Select`] |
| 288 | //! instead. The [`select!`] macro is just a convenience wrapper around [`Select`]. |
| 289 | //! |
| 290 | //! # Extra channels |
| 291 | //! |
| 292 | //! Three functions can create special kinds of channels, all of which return just a [`Receiver`] |
| 293 | //! handle: |
| 294 | //! |
| 295 | //! * [`after`] creates a channel that delivers a single message after a certain duration of time. |
| 296 | //! * [`tick`] creates a channel that delivers messages periodically. |
| 297 | //! * [`never`](never()) creates a channel that never delivers messages. |
| 298 | //! |
| 299 | //! These channels are very efficient because messages get lazily generated on receive operations. |
| 300 | //! |
| 301 | //! An example that prints elapsed time every 50 milliseconds for the duration of 1 second: |
| 302 | //! |
| 303 | //! ``` |
| 304 | //! use std::time::{Duration, Instant}; |
| 305 | //! use crossbeam_channel::{after, select, tick}; |
| 306 | //! |
| 307 | //! let start = Instant::now(); |
| 308 | //! let ticker = tick(Duration::from_millis(50)); |
| 309 | //! let timeout = after(Duration::from_secs(1)); |
| 310 | //! |
| 311 | //! loop { |
| 312 | //!     select! { |
| 313 | //!         recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()), |
| 314 | //!         recv(timeout) -> _ => break, |
| 315 | //!     } |
| 316 | //! } |
| 317 | //! ``` |
| 318 | //! |
| 319 | //! [`send`]: Sender::send |
| 320 | //! [`recv`]: Receiver::recv |
| 321 | //! [`iter`]: Receiver::iter |
| 322 | //! [`try_iter`]: Receiver::try_iter |
| 323 | |
| 324 | #![no_std ] |
| 325 | #![doc (test( |
| 326 | no_crate_inject, |
| 327 | attr( |
| 328 | deny(warnings, rust_2018_idioms), |
| 329 | allow(dead_code, unused_assignments, unused_variables) |
| 330 | ) |
| 331 | ))] |
| 332 | #![warn ( |
| 333 | missing_docs, |
| 334 | missing_debug_implementations, |
| 335 | rust_2018_idioms, |
| 336 | unreachable_pub |
| 337 | )] |
| 338 | |
| 339 | #[cfg (feature = "std" )] |
| 340 | extern crate std; |
| 341 | |
| 342 | #[cfg (feature = "std" )] |
| 343 | mod channel; |
| 344 | #[cfg (feature = "std" )] |
| 345 | mod context; |
| 346 | #[cfg (feature = "std" )] |
| 347 | mod counter; |
| 348 | #[cfg (feature = "std" )] |
| 349 | mod err; |
| 350 | #[cfg (feature = "std" )] |
| 351 | mod flavors; |
| 352 | #[cfg (feature = "std" )] |
| 353 | mod select; |
| 354 | #[cfg (feature = "std" )] |
| 355 | mod select_macro; |
| 356 | #[cfg (feature = "std" )] |
| 357 | mod utils; |
| 358 | #[cfg (feature = "std" )] |
| 359 | mod waker; |
| 360 | |
| 361 | /// Crate internals used by the `select!` macro: re-exports `select`, `select_timeout`, `try_select` and `SelectHandle` from `crate::select`. Hidden from the rendered docs and only compiled when the `std` feature is enabled. |
| 362 | #[doc (hidden)] |
| 363 | #[cfg (feature = "std" )] |
| 364 | pub mod internal { |
| 365 | pub use crate::select::{select, select_timeout, try_select, SelectHandle}; |
| 366 | } |
| 367 | |
| 368 | #[cfg (feature = "std" )] |
| 369 | pub use crate::{ |
| 370 | channel::{ |
| 371 | after, at, bounded, never, tick, unbounded, IntoIter, Iter, Receiver, Sender, TryIter, |
| 372 | }, |
| 373 | err::{ |
| 374 | ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError, |
| 375 | SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError, |
| 376 | }, |
| 377 | select::{Select, SelectedOperation}, |
| 378 | }; |
| 379 | |