1//! Multi-producer multi-consumer channels.
2
3// This module is not currently exposed publicly, but is used
4// as the implementation for the channels in `sync::mpsc`. The
5// implementation comes from the crossbeam-channel crate:
6//
7// Copyright (c) 2019 The Crossbeam Project Developers
8//
9// Permission is hereby granted, free of charge, to any
10// person obtaining a copy of this software and associated
11// documentation files (the "Software"), to deal in the
12// Software without restriction, including without
13// limitation the rights to use, copy, modify, merge,
14// publish, distribute, sublicense, and/or sell copies of
15// the Software, and to permit persons to whom the Software
16// is furnished to do so, subject to the following
17// conditions:
18//
19// The above copyright notice and this permission notice
20// shall be included in all copies or substantial portions
21// of the Software.
22//
23// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
24// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
25// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
26// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
27// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
28// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
29// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
30// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
31// DEALINGS IN THE SOFTWARE.
32
33mod array;
34mod context;
35mod counter;
36mod error;
37mod list;
38mod select;
39mod utils;
40mod waker;
41mod zero;
42
43use crate::fmt;
44use crate::panic::{RefUnwindSafe, UnwindSafe};
45use crate::time::{Duration, Instant};
46pub use error::*;
47
48/// Creates a channel of unbounded capacity.
49///
50/// This channel has a growable buffer that can hold any number of messages at a time.
51pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
52 let (s: Sender>, r: Receiver>) = counter::new(chan:list::Channel::new());
53 let s: Sender = Sender { flavor: SenderFlavor::List(s) };
54 let r: Receiver = Receiver { flavor: ReceiverFlavor::List(r) };
55 (s, r)
56}
57
58/// Creates a channel of bounded capacity.
59///
60/// This channel has a buffer that can hold at most `cap` messages at a time.
61///
62/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
63/// receive operations must appear at the same time in order to pair up and pass the message over.
64pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
65 if cap == 0 {
66 let (s: Sender>, r: Receiver>) = counter::new(chan:zero::Channel::new());
67 let s: Sender = Sender { flavor: SenderFlavor::Zero(s) };
68 let r: Receiver = Receiver { flavor: ReceiverFlavor::Zero(r) };
69 (s, r)
70 } else {
71 let (s: Sender>, r: Receiver>) = counter::new(chan:array::Channel::with_capacity(cap));
72 let s: Sender = Sender { flavor: SenderFlavor::Array(s) };
73 let r: Receiver = Receiver { flavor: ReceiverFlavor::Array(r) };
74 (s, r)
75 }
76}
77
78/// The sending side of a channel.
79pub struct Sender<T> {
80 flavor: SenderFlavor<T>,
81}
82
83/// Sender flavors.
84enum SenderFlavor<T> {
85 /// Bounded channel based on a preallocated array.
86 Array(counter::Sender<array::Channel<T>>),
87
88 /// Unbounded channel implemented as a linked list.
89 List(counter::Sender<list::Channel<T>>),
90
91 /// Zero-capacity channel.
92 Zero(counter::Sender<zero::Channel<T>>),
93}
94
95unsafe impl<T: Send> Send for Sender<T> {}
96unsafe impl<T: Send> Sync for Sender<T> {}
97
98impl<T> UnwindSafe for Sender<T> {}
99impl<T> RefUnwindSafe for Sender<T> {}
100
101impl<T> Sender<T> {
102 /// Attempts to send a message into the channel without blocking.
103 ///
104 /// This method will either send a message into the channel immediately or return an error if
105 /// the channel is full or disconnected. The returned error contains the original message.
106 ///
107 /// If called on a zero-capacity channel, this method will send the message only if there
108 /// happens to be a receive operation on the other side of the channel at the same time.
109 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
110 match &self.flavor {
111 SenderFlavor::Array(chan) => chan.try_send(msg),
112 SenderFlavor::List(chan) => chan.try_send(msg),
113 SenderFlavor::Zero(chan) => chan.try_send(msg),
114 }
115 }
116
117 /// Blocks the current thread until a message is sent or the channel is disconnected.
118 ///
119 /// If the channel is full and not disconnected, this call will block until the send operation
120 /// can proceed. If the channel becomes disconnected, this call will wake up and return an
121 /// error. The returned error contains the original message.
122 ///
123 /// If called on a zero-capacity channel, this method will wait for a receive operation to
124 /// appear on the other side of the channel.
125 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
126 match &self.flavor {
127 SenderFlavor::Array(chan) => chan.send(msg, None),
128 SenderFlavor::List(chan) => chan.send(msg, None),
129 SenderFlavor::Zero(chan) => chan.send(msg, None),
130 }
131 .map_err(|err| match err {
132 SendTimeoutError::Disconnected(msg) => SendError(msg),
133 SendTimeoutError::Timeout(_) => unreachable!(),
134 })
135 }
136}
137
138// The methods below are not used by `sync::mpsc`, but
139// are useful and we'll likely want to expose them
140// eventually
141#[allow(unused)]
142impl<T> Sender<T> {
143 /// Waits for a message to be sent into the channel, but only for a limited time.
144 ///
145 /// If the channel is full and not disconnected, this call will block until the send operation
146 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
147 /// wake up and return an error. The returned error contains the original message.
148 ///
149 /// If called on a zero-capacity channel, this method will wait for a receive operation to
150 /// appear on the other side of the channel.
151 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
152 match Instant::now().checked_add(timeout) {
153 Some(deadline) => self.send_deadline(msg, deadline),
154 // So far in the future that it's practically the same as waiting indefinitely.
155 None => self.send(msg).map_err(SendTimeoutError::from),
156 }
157 }
158
159 /// Waits for a message to be sent into the channel, but only until a given deadline.
160 ///
161 /// If the channel is full and not disconnected, this call will block until the send operation
162 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
163 /// wake up and return an error. The returned error contains the original message.
164 ///
165 /// If called on a zero-capacity channel, this method will wait for a receive operation to
166 /// appear on the other side of the channel.
167 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
168 match &self.flavor {
169 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
170 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
171 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
172 }
173 }
174
175 /// Returns `true` if the channel is empty.
176 ///
177 /// Note: Zero-capacity channels are always empty.
178 pub fn is_empty(&self) -> bool {
179 match &self.flavor {
180 SenderFlavor::Array(chan) => chan.is_empty(),
181 SenderFlavor::List(chan) => chan.is_empty(),
182 SenderFlavor::Zero(chan) => chan.is_empty(),
183 }
184 }
185
186 /// Returns `true` if the channel is full.
187 ///
188 /// Note: Zero-capacity channels are always full.
189 pub fn is_full(&self) -> bool {
190 match &self.flavor {
191 SenderFlavor::Array(chan) => chan.is_full(),
192 SenderFlavor::List(chan) => chan.is_full(),
193 SenderFlavor::Zero(chan) => chan.is_full(),
194 }
195 }
196
197 /// Returns the number of messages in the channel.
198 pub fn len(&self) -> usize {
199 match &self.flavor {
200 SenderFlavor::Array(chan) => chan.len(),
201 SenderFlavor::List(chan) => chan.len(),
202 SenderFlavor::Zero(chan) => chan.len(),
203 }
204 }
205
206 /// If the channel is bounded, returns its capacity.
207 pub fn capacity(&self) -> Option<usize> {
208 match &self.flavor {
209 SenderFlavor::Array(chan) => chan.capacity(),
210 SenderFlavor::List(chan) => chan.capacity(),
211 SenderFlavor::Zero(chan) => chan.capacity(),
212 }
213 }
214
215 /// Returns `true` if senders belong to the same channel.
216 pub fn same_channel(&self, other: &Sender<T>) -> bool {
217 match (&self.flavor, &other.flavor) {
218 (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
219 (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
220 (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
221 _ => false,
222 }
223 }
224}
225
226impl<T> Drop for Sender<T> {
227 fn drop(&mut self) {
228 unsafe {
229 match &self.flavor {
230 SenderFlavor::Array(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect_senders()),
231 SenderFlavor::List(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect_senders()),
232 SenderFlavor::Zero(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect()),
233 }
234 }
235 }
236}
237
238impl<T> Clone for Sender<T> {
239 fn clone(&self) -> Self {
240 let flavor: SenderFlavor = match &self.flavor {
241 SenderFlavor::Array(chan: &Sender>) => SenderFlavor::Array(chan.acquire()),
242 SenderFlavor::List(chan: &Sender>) => SenderFlavor::List(chan.acquire()),
243 SenderFlavor::Zero(chan: &Sender>) => SenderFlavor::Zero(chan.acquire()),
244 };
245
246 Sender { flavor }
247 }
248}
249
250impl<T> fmt::Debug for Sender<T> {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 f.pad("Sender { .. }")
253 }
254}
255
256/// The receiving side of a channel.
257pub struct Receiver<T> {
258 flavor: ReceiverFlavor<T>,
259}
260
261/// Receiver flavors.
262enum ReceiverFlavor<T> {
263 /// Bounded channel based on a preallocated array.
264 Array(counter::Receiver<array::Channel<T>>),
265
266 /// Unbounded channel implemented as a linked list.
267 List(counter::Receiver<list::Channel<T>>),
268
269 /// Zero-capacity channel.
270 Zero(counter::Receiver<zero::Channel<T>>),
271}
272
273unsafe impl<T: Send> Send for Receiver<T> {}
274unsafe impl<T: Send> Sync for Receiver<T> {}
275
276impl<T> UnwindSafe for Receiver<T> {}
277impl<T> RefUnwindSafe for Receiver<T> {}
278
279impl<T> Receiver<T> {
280 /// Attempts to receive a message from the channel without blocking.
281 ///
282 /// This method will either receive a message from the channel immediately or return an error
283 /// if the channel is empty.
284 ///
285 /// If called on a zero-capacity channel, this method will receive a message only if there
286 /// happens to be a send operation on the other side of the channel at the same time.
287 pub fn try_recv(&self) -> Result<T, TryRecvError> {
288 match &self.flavor {
289 ReceiverFlavor::Array(chan) => chan.try_recv(),
290 ReceiverFlavor::List(chan) => chan.try_recv(),
291 ReceiverFlavor::Zero(chan) => chan.try_recv(),
292 }
293 }
294
295 /// Blocks the current thread until a message is received or the channel is empty and
296 /// disconnected.
297 ///
298 /// If the channel is empty and not disconnected, this call will block until the receive
299 /// operation can proceed. If the channel is empty and becomes disconnected, this call will
300 /// wake up and return an error.
301 ///
302 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
303 /// on the other side of the channel.
304 pub fn recv(&self) -> Result<T, RecvError> {
305 match &self.flavor {
306 ReceiverFlavor::Array(chan) => chan.recv(None),
307 ReceiverFlavor::List(chan) => chan.recv(None),
308 ReceiverFlavor::Zero(chan) => chan.recv(None),
309 }
310 .map_err(|_| RecvError)
311 }
312
313 /// Waits for a message to be received from the channel, but only for a limited time.
314 ///
315 /// If the channel is empty and not disconnected, this call will block until the receive
316 /// operation can proceed or the operation times out. If the channel is empty and becomes
317 /// disconnected, this call will wake up and return an error.
318 ///
319 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
320 /// on the other side of the channel.
321 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
322 match Instant::now().checked_add(timeout) {
323 Some(deadline) => self.recv_deadline(deadline),
324 // So far in the future that it's practically the same as waiting indefinitely.
325 None => self.recv().map_err(RecvTimeoutError::from),
326 }
327 }
328
329 /// Waits for a message to be received from the channel, but only for a limited time.
330 ///
331 /// If the channel is empty and not disconnected, this call will block until the receive
332 /// operation can proceed or the operation times out. If the channel is empty and becomes
333 /// disconnected, this call will wake up and return an error.
334 ///
335 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
336 /// on the other side of the channel.
337 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
338 match &self.flavor {
339 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
340 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
341 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
342 }
343 }
344}
345
346// The methods below are not used by `sync::mpsc`, but
347// are useful and we'll likely want to expose them
348// eventually
349#[allow(unused)]
350impl<T> Receiver<T> {
351 /// Returns `true` if the channel is empty.
352 ///
353 /// Note: Zero-capacity channels are always empty.
354 pub fn is_empty(&self) -> bool {
355 match &self.flavor {
356 ReceiverFlavor::Array(chan) => chan.is_empty(),
357 ReceiverFlavor::List(chan) => chan.is_empty(),
358 ReceiverFlavor::Zero(chan) => chan.is_empty(),
359 }
360 }
361
362 /// Returns `true` if the channel is full.
363 ///
364 /// Note: Zero-capacity channels are always full.
365 pub fn is_full(&self) -> bool {
366 match &self.flavor {
367 ReceiverFlavor::Array(chan) => chan.is_full(),
368 ReceiverFlavor::List(chan) => chan.is_full(),
369 ReceiverFlavor::Zero(chan) => chan.is_full(),
370 }
371 }
372
373 /// Returns the number of messages in the channel.
374 pub fn len(&self) -> usize {
375 match &self.flavor {
376 ReceiverFlavor::Array(chan) => chan.len(),
377 ReceiverFlavor::List(chan) => chan.len(),
378 ReceiverFlavor::Zero(chan) => chan.len(),
379 }
380 }
381
382 /// If the channel is bounded, returns its capacity.
383 pub fn capacity(&self) -> Option<usize> {
384 match &self.flavor {
385 ReceiverFlavor::Array(chan) => chan.capacity(),
386 ReceiverFlavor::List(chan) => chan.capacity(),
387 ReceiverFlavor::Zero(chan) => chan.capacity(),
388 }
389 }
390
391 /// Returns `true` if receivers belong to the same channel.
392 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
393 match (&self.flavor, &other.flavor) {
394 (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
395 (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
396 (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
397 _ => false,
398 }
399 }
400}
401
402impl<T> Drop for Receiver<T> {
403 fn drop(&mut self) {
404 unsafe {
405 match &self.flavor {
406 ReceiverFlavor::Array(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect_receivers()),
407 ReceiverFlavor::List(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect_receivers()),
408 ReceiverFlavor::Zero(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect()),
409 }
410 }
411 }
412}
413
414impl<T> Clone for Receiver<T> {
415 fn clone(&self) -> Self {
416 let flavor: ReceiverFlavor = match &self.flavor {
417 ReceiverFlavor::Array(chan: &Receiver>) => ReceiverFlavor::Array(chan.acquire()),
418 ReceiverFlavor::List(chan: &Receiver>) => ReceiverFlavor::List(chan.acquire()),
419 ReceiverFlavor::Zero(chan: &Receiver>) => ReceiverFlavor::Zero(chan.acquire()),
420 };
421
422 Receiver { flavor }
423 }
424}
425
426impl<T> fmt::Debug for Receiver<T> {
427 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
428 f.pad("Receiver { .. }")
429 }
430}
431