1 | //! Multi-producer multi-consumer channels for message passing. |
2 | //! |
3 | //! This crate is an alternative to [`std::sync::mpsc`] with more features and better performance. |
4 | //! |
5 | //! # Hello, world! |
6 | //! |
7 | //! ``` |
8 | //! use crossbeam_channel::unbounded; |
9 | //! |
10 | //! // Create a channel of unbounded capacity. |
11 | //! let (s, r) = unbounded(); |
12 | //! |
13 | //! // Send a message into the channel. |
14 | //! s.send("Hello, world!").unwrap(); |
15 | //! |
16 | //! // Receive the message from the channel. |
17 | //! assert_eq!(r.recv(), Ok("Hello, world!")); |
18 | //! ``` |
19 | //! |
20 | //! # Channel types |
21 | //! |
22 | //! Channels can be created using two functions: |
23 | //! |
24 | //! * [`bounded`] creates a channel of bounded capacity, i.e. there is a limit to how many messages |
25 | //! it can hold at a time. |
26 | //! |
27 | //! * [`unbounded`] creates a channel of unbounded capacity, i.e. it can hold any number of |
28 | //! messages at a time. |
29 | //! |
30 | //! Both functions return a [`Sender`] and a [`Receiver`], which represent the two opposite sides |
31 | //! of a channel. |
32 | //! |
33 | //! Creating a bounded channel: |
34 | //! |
35 | //! ``` |
36 | //! use crossbeam_channel::bounded; |
37 | //! |
38 | //! // Create a channel that can hold at most 5 messages at a time. |
39 | //! let (s, r) = bounded(5); |
40 | //! |
41 | //! // Can send only 5 messages without blocking. |
42 | //! for i in 0..5 { |
43 | //!     s.send(i).unwrap(); |
44 | //! } |
45 | //! |
46 | //! // Another call to `send` would block because the channel is full. |
47 | //! // s.send(5).unwrap(); |
48 | //! ``` |
49 | //! |
50 | //! Creating an unbounded channel: |
51 | //! |
52 | //! ``` |
53 | //! use crossbeam_channel::unbounded; |
54 | //! |
55 | //! // Create an unbounded channel. |
56 | //! let (s, r) = unbounded(); |
57 | //! |
58 | //! // Can send any number of messages into the channel without blocking. |
59 | //! for i in 0..1000 { |
60 | //!     s.send(i).unwrap(); |
61 | //! } |
62 | //! ``` |
63 | //! |
64 | //! A special case is a zero-capacity channel, which cannot hold any messages. Instead, send and |
65 | //! receive operations must appear at the same time in order to pair up and pass the message over: |
66 | //! |
67 | //! ``` |
68 | //! use std::thread; |
69 | //! use crossbeam_channel::bounded; |
70 | //! |
71 | //! // Create a zero-capacity channel. |
72 | //! let (s, r) = bounded(0); |
73 | //! |
74 | //! // Sending blocks until a receive operation appears on the other side. |
75 | //! thread::spawn(move || s.send("Hi!").unwrap()); |
76 | //! |
77 | //! // Receiving blocks until a send operation appears on the other side. |
78 | //! assert_eq!(r.recv(), Ok("Hi!")); |
79 | //! ``` |
80 | //! |
81 | //! # Sharing channels |
82 | //! |
83 | //! Senders and receivers can be cloned and sent to other threads: |
84 | //! |
85 | //! ``` |
86 | //! use std::thread; |
87 | //! use crossbeam_channel::bounded; |
88 | //! |
89 | //! let (s1, r1) = bounded(0); |
90 | //! let (s2, r2) = (s1.clone(), r1.clone()); |
91 | //! |
92 | //! // Spawn a thread that receives a message and then sends one. |
93 | //! thread::spawn(move || { |
94 | //!     r2.recv().unwrap(); |
95 | //!     s2.send(2).unwrap(); |
96 | //! }); |
97 | //! |
98 | //! // Send a message and then receive one. |
99 | //! s1.send(1).unwrap(); |
100 | //! r1.recv().unwrap(); |
101 | //! ``` |
102 | //! |
103 | //! Note that cloning only creates a new handle to the same sending or receiving side. It does not |
104 | //! create a separate stream of messages in any way: |
105 | //! |
106 | //! ``` |
107 | //! use crossbeam_channel::unbounded; |
108 | //! |
109 | //! let (s1, r1) = unbounded(); |
110 | //! let (s2, r2) = (s1.clone(), r1.clone()); |
111 | //! let (s3, r3) = (s2.clone(), r2.clone()); |
112 | //! |
113 | //! s1.send(10).unwrap(); |
114 | //! s2.send(20).unwrap(); |
115 | //! s3.send(30).unwrap(); |
116 | //! |
117 | //! assert_eq!(r3.recv(), Ok(10)); |
118 | //! assert_eq!(r1.recv(), Ok(20)); |
119 | //! assert_eq!(r2.recv(), Ok(30)); |
120 | //! ``` |
121 | //! |
122 | //! It's also possible to share senders and receivers by reference: |
123 | //! |
124 | //! ``` |
125 | //! use crossbeam_channel::bounded; |
126 | //! use crossbeam_utils::thread::scope; |
127 | //! |
128 | //! let (s, r) = bounded(0); |
129 | //! |
130 | //! scope(|scope| { |
131 | //!     // Spawn a thread that receives a message and then sends one. |
132 | //!     scope.spawn(|_| { |
133 | //!         r.recv().unwrap(); |
134 | //!         s.send(2).unwrap(); |
135 | //!     }); |
136 | //! |
137 | //!     // Send a message and then receive one. |
138 | //!     s.send(1).unwrap(); |
139 | //!     r.recv().unwrap(); |
140 | //! }).unwrap(); |
141 | //! ``` |
142 | //! |
143 | //! # Disconnection |
144 | //! |
145 | //! When all senders or all receivers associated with a channel get dropped, the channel becomes |
146 | //! disconnected. No more messages can be sent, but any remaining messages can still be received. |
147 | //! Send and receive operations on a disconnected channel never block. |
148 | //! |
149 | //! ``` |
150 | //! use crossbeam_channel::{unbounded, RecvError}; |
151 | //! |
152 | //! let (s, r) = unbounded(); |
153 | //! s.send(1).unwrap(); |
154 | //! s.send(2).unwrap(); |
155 | //! s.send(3).unwrap(); |
156 | //! |
157 | //! // The only sender is dropped, disconnecting the channel. |
158 | //! drop(s); |
159 | //! |
160 | //! // The remaining messages can be received. |
161 | //! assert_eq!(r.recv(), Ok(1)); |
162 | //! assert_eq!(r.recv(), Ok(2)); |
163 | //! assert_eq!(r.recv(), Ok(3)); |
164 | //! |
165 | //! // There are no more messages in the channel. |
166 | //! assert!(r.is_empty()); |
167 | //! |
168 | //! // Note that calling `r.recv()` does not block. |
169 | //! // Instead, `Err(RecvError)` is returned immediately. |
170 | //! assert_eq!(r.recv(), Err(RecvError)); |
171 | //! ``` |
172 | //! |
173 | //! # Blocking operations |
174 | //! |
175 | //! Send and receive operations come in three flavors: |
176 | //! |
177 | //! * Non-blocking (returns immediately with success or failure). |
178 | //! * Blocking (waits until the operation succeeds or the channel becomes disconnected). |
179 | //! * Blocking with a timeout (blocks only for a certain duration of time). |
180 | //! |
181 | //! A simple example showing the difference between non-blocking and blocking operations: |
182 | //! |
183 | //! ``` |
184 | //! use crossbeam_channel::{bounded, RecvError, TryRecvError}; |
185 | //! |
186 | //! let (s, r) = bounded(1); |
187 | //! |
188 | //! // Send a message into the channel. |
189 | //! s.send("foo").unwrap(); |
190 | //! |
191 | //! // This call would block because the channel is full. |
192 | //! // s.send("bar").unwrap(); |
193 | //! |
194 | //! // Receive the message. |
195 | //! assert_eq!(r.recv(), Ok("foo")); |
196 | //! |
197 | //! // This call would block because the channel is empty. |
198 | //! // r.recv(); |
199 | //! |
200 | //! // Try receiving a message without blocking. |
201 | //! assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); |
202 | //! |
203 | //! // Disconnect the channel. |
204 | //! drop(s); |
205 | //! |
206 | //! // This call doesn't block because the channel is now disconnected. |
207 | //! assert_eq!(r.recv(), Err(RecvError)); |
208 | //! ``` |
209 | //! |
210 | //! # Iteration |
211 | //! |
212 | //! Receivers can be used as iterators. For example, method [`iter`] creates an iterator that |
213 | //! receives messages until the channel becomes empty and disconnected. Note that iteration may |
214 | //! block waiting for the next message to arrive. |
215 | //! |
216 | //! ``` |
217 | //! use std::thread; |
218 | //! use crossbeam_channel::unbounded; |
219 | //! |
220 | //! let (s, r) = unbounded(); |
221 | //! |
222 | //! thread::spawn(move || { |
223 | //!     s.send(1).unwrap(); |
224 | //!     s.send(2).unwrap(); |
225 | //!     s.send(3).unwrap(); |
226 | //!     drop(s); // Disconnect the channel. |
227 | //! }); |
228 | //! |
229 | //! // Collect all messages from the channel. |
230 | //! // Note that the call to `collect` blocks until the sender is dropped. |
231 | //! let v: Vec<_> = r.iter().collect(); |
232 | //! |
233 | //! assert_eq!(v, [1, 2, 3]); |
234 | //! ``` |
235 | //! |
236 | //! A non-blocking iterator can be created using [`try_iter`], which receives all available |
237 | //! messages without blocking: |
238 | //! |
239 | //! ``` |
240 | //! use crossbeam_channel::unbounded; |
241 | //! |
242 | //! let (s, r) = unbounded(); |
243 | //! s.send(1).unwrap(); |
244 | //! s.send(2).unwrap(); |
245 | //! s.send(3).unwrap(); |
246 | //! // No need to drop the sender. |
247 | //! |
248 | //! // Receive all messages currently in the channel. |
249 | //! let v: Vec<_> = r.try_iter().collect(); |
250 | //! |
251 | //! assert_eq!(v, [1, 2, 3]); |
252 | //! ``` |
253 | //! |
254 | //! # Selection |
255 | //! |
256 | //! The [`select!`] macro allows you to define a set of channel operations, wait until any one of |
257 | //! them becomes ready, and finally execute it. If multiple operations are ready at the same time, |
258 | //! a random one among them is selected. |
259 | //! |
260 | //! It is also possible to define a `default` case that gets executed if none of the operations are |
261 | //! ready, either right away or for a certain duration of time. |
262 | //! |
263 | //! An operation is considered to be ready if it doesn't have to block. Note that it is ready even |
264 | //! when it will simply return an error because the channel is disconnected. |
265 | //! |
266 | //! An example of receiving a message from two channels: |
267 | //! |
268 | //! ``` |
269 | //! use std::thread; |
270 | //! use std::time::Duration; |
271 | //! use crossbeam_channel::{select, unbounded}; |
272 | //! |
273 | //! let (s1, r1) = unbounded(); |
274 | //! let (s2, r2) = unbounded(); |
275 | //! |
276 | //! thread::spawn(move || s1.send(10).unwrap()); |
277 | //! thread::spawn(move || s2.send(20).unwrap()); |
278 | //! |
279 | //! // At most one of these two receive operations will be executed. |
280 | //! select! { |
281 | //!     recv(r1) -> msg => assert_eq!(msg, Ok(10)), |
282 | //!     recv(r2) -> msg => assert_eq!(msg, Ok(20)), |
283 | //!     default(Duration::from_secs(1)) => println!("timed out"), |
284 | //! } |
285 | //! ``` |
286 | //! |
287 | //! If you need to select over a dynamically created list of channel operations, use [`Select`] |
288 | //! instead. The [`select!`] macro is just a convenience wrapper around [`Select`]. |
289 | //! |
290 | //! # Extra channels |
291 | //! |
292 | //! Three functions can create special kinds of channels, all of which return just a [`Receiver`] |
293 | //! handle: |
294 | //! |
295 | //! * [`after`] creates a channel that delivers a single message after a certain duration of time. |
296 | //! * [`tick`] creates a channel that delivers messages periodically. |
297 | //! * [`never`](never()) creates a channel that never delivers messages. |
298 | //! |
299 | //! These channels are very efficient because messages get lazily generated on receive operations. |
300 | //! |
301 | //! An example that prints elapsed time every 50 milliseconds for the duration of 1 second: |
302 | //! |
303 | //! ``` |
304 | //! use std::time::{Duration, Instant}; |
305 | //! use crossbeam_channel::{after, select, tick}; |
306 | //! |
307 | //! let start = Instant::now(); |
308 | //! let ticker = tick(Duration::from_millis(50)); |
309 | //! let timeout = after(Duration::from_secs(1)); |
310 | //! |
311 | //! loop { |
312 | //!     select! { |
313 | //!         recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()), |
314 | //!         recv(timeout) -> _ => break, |
315 | //!     } |
316 | //! } |
317 | //! ``` |
318 | //! |
319 | //! [`send`]: Sender::send |
320 | //! [`recv`]: Receiver::recv |
321 | //! [`iter`]: Receiver::iter |
322 | //! [`try_iter`]: Receiver::try_iter |
323 | |
// The crate never links `std` implicitly; `std` is pulled in by an explicit `extern crate`
// declaration that is gated on the `std` feature.
#![no_std]
// Doctest configuration: rustdoc does not inject `extern crate crossbeam_channel;` into the
// examples, and every example is compiled with the lint attributes listed in `attr(...)`.
#![doc(test(
    no_crate_inject,
    attr(
        deny(warnings, rust_2018_idioms),
        allow(dead_code, unused_assignments, unused_variables)
    )
))]
// Lints reported as warnings throughout the crate.
#![warn(
    missing_docs,
    missing_debug_implementations,
    rust_2018_idioms,
    unreachable_pub
)]
338 | |
// `#![no_std]` removes the implicit dependency on `std`, so it is declared by hand whenever the
// `std` feature is enabled.
#[cfg(feature = "std")]
extern crate std;

// Private implementation modules. Each one is compiled only when the `std` feature is enabled.
#[cfg(feature = "std")]
mod channel;
#[cfg(feature = "std")]
mod context;
#[cfg(feature = "std")]
mod counter;
#[cfg(feature = "std")]
mod err;
#[cfg(feature = "std")]
mod flavors;
#[cfg(feature = "std")]
mod select;
#[cfg(feature = "std")]
mod select_macro;
#[cfg(feature = "std")]
mod utils;
#[cfg(feature = "std")]
mod waker;
360 | |
/// Crate internals used by the `select!` macro.
///
/// The module is excluded from the rendered documentation (`#[doc(hidden)]`) and is compiled
/// only when the `std` feature is enabled. It re-exports items from the private `select` module.
#[doc(hidden)]
#[cfg(feature = "std")]
pub mod internal {
    pub use crate::select::{select, select_timeout, try_select, SelectHandle};
}
367 | |
// Public API of the crate, re-exported from the private `channel`, `err` and `select` modules.
// Like those modules, the re-exports exist only when the `std` feature is enabled.
#[cfg(feature = "std")]
pub use crate::{
    channel::{
        after, at, bounded, never, tick, unbounded, IntoIter, Iter, Receiver, Sender, TryIter,
    },
    err::{
        ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError,
        SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError,
    },
    select::{Select, SelectedOperation},
};
379 | |