1 | //! Multi-producer multi-consumer channels. |
2 | |
3 | // This module is not currently exposed publicly, but is used |
4 | // as the implementation for the channels in `sync::mpsc`. The |
5 | // implementation comes from the crossbeam-channel crate: |
6 | // |
7 | // Copyright (c) 2019 The Crossbeam Project Developers |
8 | // |
9 | // Permission is hereby granted, free of charge, to any |
10 | // person obtaining a copy of this software and associated |
11 | // documentation files (the "Software"), to deal in the |
12 | // Software without restriction, including without |
13 | // limitation the rights to use, copy, modify, merge, |
14 | // publish, distribute, sublicense, and/or sell copies of |
15 | // the Software, and to permit persons to whom the Software |
16 | // is furnished to do so, subject to the following |
17 | // conditions: |
18 | // |
19 | // The above copyright notice and this permission notice |
20 | // shall be included in all copies or substantial portions |
21 | // of the Software. |
22 | // |
23 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF |
24 | // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED |
25 | // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A |
26 | // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
27 | // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY |
28 | // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
29 | // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR |
30 | // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER |
31 | // DEALINGS IN THE SOFTWARE. |
32 | |
33 | mod array; |
34 | mod context; |
35 | mod counter; |
36 | mod error; |
37 | mod list; |
38 | mod select; |
39 | mod utils; |
40 | mod waker; |
41 | mod zero; |
42 | |
43 | use crate::fmt; |
44 | use crate::panic::{RefUnwindSafe, UnwindSafe}; |
45 | use crate::time::{Duration, Instant}; |
46 | pub use error::*; |
47 | |
48 | /// Creates a channel of unbounded capacity. |
49 | /// |
50 | /// This channel has a growable buffer that can hold any number of messages at a time. |
51 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
52 | let (s: Sender>, r: Receiver>) = counter::new(chan:list::Channel::new()); |
53 | let s: Sender = Sender { flavor: SenderFlavor::List(s) }; |
54 | let r: Receiver = Receiver { flavor: ReceiverFlavor::List(r) }; |
55 | (s, r) |
56 | } |
57 | |
58 | /// Creates a channel of bounded capacity. |
59 | /// |
60 | /// This channel has a buffer that can hold at most `cap` messages at a time. |
61 | /// |
62 | /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and |
63 | /// receive operations must appear at the same time in order to pair up and pass the message over. |
64 | pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) { |
65 | if cap == 0 { |
66 | let (s: Sender>, r: Receiver>) = counter::new(chan:zero::Channel::new()); |
67 | let s: Sender = Sender { flavor: SenderFlavor::Zero(s) }; |
68 | let r: Receiver = Receiver { flavor: ReceiverFlavor::Zero(r) }; |
69 | (s, r) |
70 | } else { |
71 | let (s: Sender>, r: Receiver>) = counter::new(chan:array::Channel::with_capacity(cap)); |
72 | let s: Sender = Sender { flavor: SenderFlavor::Array(s) }; |
73 | let r: Receiver = Receiver { flavor: ReceiverFlavor::Array(r) }; |
74 | (s, r) |
75 | } |
76 | } |
77 | |
/// The sending side of a channel.
///
/// A `Sender` is a thin handle over one of the channel flavors listed in [`SenderFlavor`];
/// every method dispatches on that flavor.
pub struct Sender<T> {
    // The concrete channel flavor this handle is attached to.
    flavor: SenderFlavor<T>,
}
82 | |
/// Sender flavors.
///
/// Each variant holds the `counter::Sender` handle for one channel implementation.
enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Sender<array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Sender<list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Sender<zero::Channel<T>>),
}
94 | |
// NOTE(review): these unsafe impls assert that a `Sender<T>` may be moved to, and shared
// between, threads whenever `T: Send`. Their soundness rests on the flavor implementations
// in `array`, `list` and `zero`, which are not visible from this file — confirm there.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

// The unwind-safety markers are implemented unconditionally, for every `T`.
impl<T> UnwindSafe for Sender<T> {}
impl<T> RefUnwindSafe for Sender<T> {}
100 | |
101 | impl<T> Sender<T> { |
102 | /// Attempts to send a message into the channel without blocking. |
103 | /// |
104 | /// This method will either send a message into the channel immediately or return an error if |
105 | /// the channel is full or disconnected. The returned error contains the original message. |
106 | /// |
107 | /// If called on a zero-capacity channel, this method will send the message only if there |
108 | /// happens to be a receive operation on the other side of the channel at the same time. |
109 | pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
110 | match &self.flavor { |
111 | SenderFlavor::Array(chan) => chan.try_send(msg), |
112 | SenderFlavor::List(chan) => chan.try_send(msg), |
113 | SenderFlavor::Zero(chan) => chan.try_send(msg), |
114 | } |
115 | } |
116 | |
117 | /// Blocks the current thread until a message is sent or the channel is disconnected. |
118 | /// |
119 | /// If the channel is full and not disconnected, this call will block until the send operation |
120 | /// can proceed. If the channel becomes disconnected, this call will wake up and return an |
121 | /// error. The returned error contains the original message. |
122 | /// |
123 | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
124 | /// appear on the other side of the channel. |
125 | pub fn send(&self, msg: T) -> Result<(), SendError<T>> { |
126 | match &self.flavor { |
127 | SenderFlavor::Array(chan) => chan.send(msg, None), |
128 | SenderFlavor::List(chan) => chan.send(msg, None), |
129 | SenderFlavor::Zero(chan) => chan.send(msg, None), |
130 | } |
131 | .map_err(|err| match err { |
132 | SendTimeoutError::Disconnected(msg) => SendError(msg), |
133 | SendTimeoutError::Timeout(_) => unreachable!(), |
134 | }) |
135 | } |
136 | } |
137 | |
138 | // The methods below are not used by `sync::mpsc`, but |
139 | // are useful and we'll likely want to expose them |
140 | // eventually |
141 | #[allow (unused)] |
142 | impl<T> Sender<T> { |
143 | /// Waits for a message to be sent into the channel, but only for a limited time. |
144 | /// |
145 | /// If the channel is full and not disconnected, this call will block until the send operation |
146 | /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
147 | /// wake up and return an error. The returned error contains the original message. |
148 | /// |
149 | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
150 | /// appear on the other side of the channel. |
151 | pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { |
152 | match Instant::now().checked_add(timeout) { |
153 | Some(deadline) => self.send_deadline(msg, deadline), |
154 | // So far in the future that it's practically the same as waiting indefinitely. |
155 | None => self.send(msg).map_err(SendTimeoutError::from), |
156 | } |
157 | } |
158 | |
159 | /// Waits for a message to be sent into the channel, but only until a given deadline. |
160 | /// |
161 | /// If the channel is full and not disconnected, this call will block until the send operation |
162 | /// can proceed or the operation times out. If the channel becomes disconnected, this call will |
163 | /// wake up and return an error. The returned error contains the original message. |
164 | /// |
165 | /// If called on a zero-capacity channel, this method will wait for a receive operation to |
166 | /// appear on the other side of the channel. |
167 | pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { |
168 | match &self.flavor { |
169 | SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), |
170 | SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), |
171 | SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), |
172 | } |
173 | } |
174 | |
175 | /// Returns `true` if the channel is empty. |
176 | /// |
177 | /// Note: Zero-capacity channels are always empty. |
178 | pub fn is_empty(&self) -> bool { |
179 | match &self.flavor { |
180 | SenderFlavor::Array(chan) => chan.is_empty(), |
181 | SenderFlavor::List(chan) => chan.is_empty(), |
182 | SenderFlavor::Zero(chan) => chan.is_empty(), |
183 | } |
184 | } |
185 | |
186 | /// Returns `true` if the channel is full. |
187 | /// |
188 | /// Note: Zero-capacity channels are always full. |
189 | pub fn is_full(&self) -> bool { |
190 | match &self.flavor { |
191 | SenderFlavor::Array(chan) => chan.is_full(), |
192 | SenderFlavor::List(chan) => chan.is_full(), |
193 | SenderFlavor::Zero(chan) => chan.is_full(), |
194 | } |
195 | } |
196 | |
197 | /// Returns the number of messages in the channel. |
198 | pub fn len(&self) -> usize { |
199 | match &self.flavor { |
200 | SenderFlavor::Array(chan) => chan.len(), |
201 | SenderFlavor::List(chan) => chan.len(), |
202 | SenderFlavor::Zero(chan) => chan.len(), |
203 | } |
204 | } |
205 | |
206 | /// If the channel is bounded, returns its capacity. |
207 | pub fn capacity(&self) -> Option<usize> { |
208 | match &self.flavor { |
209 | SenderFlavor::Array(chan) => chan.capacity(), |
210 | SenderFlavor::List(chan) => chan.capacity(), |
211 | SenderFlavor::Zero(chan) => chan.capacity(), |
212 | } |
213 | } |
214 | |
215 | /// Returns `true` if senders belong to the same channel. |
216 | pub fn same_channel(&self, other: &Sender<T>) -> bool { |
217 | match (&self.flavor, &other.flavor) { |
218 | (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b, |
219 | (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b, |
220 | (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b, |
221 | _ => false, |
222 | } |
223 | } |
224 | } |
225 | |
226 | impl<T> Drop for Sender<T> { |
227 | fn drop(&mut self) { |
228 | unsafe { |
229 | match &self.flavor { |
230 | SenderFlavor::Array(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect_senders()), |
231 | SenderFlavor::List(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect_senders()), |
232 | SenderFlavor::Zero(chan: &Sender>) => chan.release(|c: &Channel| c.disconnect()), |
233 | } |
234 | } |
235 | } |
236 | } |
237 | |
238 | impl<T> Clone for Sender<T> { |
239 | fn clone(&self) -> Self { |
240 | let flavor: SenderFlavor = match &self.flavor { |
241 | SenderFlavor::Array(chan: &Sender>) => SenderFlavor::Array(chan.acquire()), |
242 | SenderFlavor::List(chan: &Sender>) => SenderFlavor::List(chan.acquire()), |
243 | SenderFlavor::Zero(chan: &Sender>) => SenderFlavor::Zero(chan.acquire()), |
244 | }; |
245 | |
246 | Sender { flavor } |
247 | } |
248 | } |
249 | |
250 | impl<T> fmt::Debug for Sender<T> { |
251 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
252 | f.pad("Sender { .. }" ) |
253 | } |
254 | } |
255 | |
/// The receiving side of a channel.
///
/// A `Receiver` is a thin handle over one of the channel flavors listed in
/// [`ReceiverFlavor`]; every method dispatches on that flavor.
pub struct Receiver<T> {
    // The concrete channel flavor this handle is attached to.
    flavor: ReceiverFlavor<T>,
}
260 | |
/// Receiver flavors.
///
/// Each variant holds the `counter::Receiver` handle for one channel implementation.
enum ReceiverFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Receiver<array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Receiver<list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Receiver<zero::Channel<T>>),
}
272 | |
// NOTE(review): these unsafe impls assert that a `Receiver<T>` may be moved to, and shared
// between, threads whenever `T: Send`. Their soundness rests on the flavor implementations
// in `array`, `list` and `zero`, which are not visible from this file — confirm there.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

// The unwind-safety markers are implemented unconditionally, for every `T`.
impl<T> UnwindSafe for Receiver<T> {}
impl<T> RefUnwindSafe for Receiver<T> {}
278 | |
279 | impl<T> Receiver<T> { |
280 | /// Attempts to receive a message from the channel without blocking. |
281 | /// |
282 | /// This method will either receive a message from the channel immediately or return an error |
283 | /// if the channel is empty. |
284 | /// |
285 | /// If called on a zero-capacity channel, this method will receive a message only if there |
286 | /// happens to be a send operation on the other side of the channel at the same time. |
287 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
288 | match &self.flavor { |
289 | ReceiverFlavor::Array(chan) => chan.try_recv(), |
290 | ReceiverFlavor::List(chan) => chan.try_recv(), |
291 | ReceiverFlavor::Zero(chan) => chan.try_recv(), |
292 | } |
293 | } |
294 | |
295 | /// Blocks the current thread until a message is received or the channel is empty and |
296 | /// disconnected. |
297 | /// |
298 | /// If the channel is empty and not disconnected, this call will block until the receive |
299 | /// operation can proceed. If the channel is empty and becomes disconnected, this call will |
300 | /// wake up and return an error. |
301 | /// |
302 | /// If called on a zero-capacity channel, this method will wait for a send operation to appear |
303 | /// on the other side of the channel. |
304 | pub fn recv(&self) -> Result<T, RecvError> { |
305 | match &self.flavor { |
306 | ReceiverFlavor::Array(chan) => chan.recv(None), |
307 | ReceiverFlavor::List(chan) => chan.recv(None), |
308 | ReceiverFlavor::Zero(chan) => chan.recv(None), |
309 | } |
310 | .map_err(|_| RecvError) |
311 | } |
312 | |
313 | /// Waits for a message to be received from the channel, but only for a limited time. |
314 | /// |
315 | /// If the channel is empty and not disconnected, this call will block until the receive |
316 | /// operation can proceed or the operation times out. If the channel is empty and becomes |
317 | /// disconnected, this call will wake up and return an error. |
318 | /// |
319 | /// If called on a zero-capacity channel, this method will wait for a send operation to appear |
320 | /// on the other side of the channel. |
321 | pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { |
322 | match Instant::now().checked_add(timeout) { |
323 | Some(deadline) => self.recv_deadline(deadline), |
324 | // So far in the future that it's practically the same as waiting indefinitely. |
325 | None => self.recv().map_err(RecvTimeoutError::from), |
326 | } |
327 | } |
328 | |
329 | /// Waits for a message to be received from the channel, but only for a limited time. |
330 | /// |
331 | /// If the channel is empty and not disconnected, this call will block until the receive |
332 | /// operation can proceed or the operation times out. If the channel is empty and becomes |
333 | /// disconnected, this call will wake up and return an error. |
334 | /// |
335 | /// If called on a zero-capacity channel, this method will wait for a send operation to appear |
336 | /// on the other side of the channel. |
337 | pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { |
338 | match &self.flavor { |
339 | ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), |
340 | ReceiverFlavor::List(chan) => chan.recv(Some(deadline)), |
341 | ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), |
342 | } |
343 | } |
344 | } |
345 | |
346 | // The methods below are not used by `sync::mpsc`, but |
347 | // are useful and we'll likely want to expose them |
348 | // eventually |
349 | #[allow (unused)] |
350 | impl<T> Receiver<T> { |
351 | /// Returns `true` if the channel is empty. |
352 | /// |
353 | /// Note: Zero-capacity channels are always empty. |
354 | pub fn is_empty(&self) -> bool { |
355 | match &self.flavor { |
356 | ReceiverFlavor::Array(chan) => chan.is_empty(), |
357 | ReceiverFlavor::List(chan) => chan.is_empty(), |
358 | ReceiverFlavor::Zero(chan) => chan.is_empty(), |
359 | } |
360 | } |
361 | |
362 | /// Returns `true` if the channel is full. |
363 | /// |
364 | /// Note: Zero-capacity channels are always full. |
365 | pub fn is_full(&self) -> bool { |
366 | match &self.flavor { |
367 | ReceiverFlavor::Array(chan) => chan.is_full(), |
368 | ReceiverFlavor::List(chan) => chan.is_full(), |
369 | ReceiverFlavor::Zero(chan) => chan.is_full(), |
370 | } |
371 | } |
372 | |
373 | /// Returns the number of messages in the channel. |
374 | pub fn len(&self) -> usize { |
375 | match &self.flavor { |
376 | ReceiverFlavor::Array(chan) => chan.len(), |
377 | ReceiverFlavor::List(chan) => chan.len(), |
378 | ReceiverFlavor::Zero(chan) => chan.len(), |
379 | } |
380 | } |
381 | |
382 | /// If the channel is bounded, returns its capacity. |
383 | pub fn capacity(&self) -> Option<usize> { |
384 | match &self.flavor { |
385 | ReceiverFlavor::Array(chan) => chan.capacity(), |
386 | ReceiverFlavor::List(chan) => chan.capacity(), |
387 | ReceiverFlavor::Zero(chan) => chan.capacity(), |
388 | } |
389 | } |
390 | |
391 | /// Returns `true` if receivers belong to the same channel. |
392 | pub fn same_channel(&self, other: &Receiver<T>) -> bool { |
393 | match (&self.flavor, &other.flavor) { |
394 | (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, |
395 | (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b, |
396 | (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b, |
397 | _ => false, |
398 | } |
399 | } |
400 | } |
401 | |
402 | impl<T> Drop for Receiver<T> { |
403 | fn drop(&mut self) { |
404 | unsafe { |
405 | match &self.flavor { |
406 | ReceiverFlavor::Array(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect_receivers()), |
407 | ReceiverFlavor::List(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect_receivers()), |
408 | ReceiverFlavor::Zero(chan: &Receiver>) => chan.release(|c: &Channel| c.disconnect()), |
409 | } |
410 | } |
411 | } |
412 | } |
413 | |
414 | impl<T> Clone for Receiver<T> { |
415 | fn clone(&self) -> Self { |
416 | let flavor: ReceiverFlavor = match &self.flavor { |
417 | ReceiverFlavor::Array(chan: &Receiver>) => ReceiverFlavor::Array(chan.acquire()), |
418 | ReceiverFlavor::List(chan: &Receiver>) => ReceiverFlavor::List(chan.acquire()), |
419 | ReceiverFlavor::Zero(chan: &Receiver>) => ReceiverFlavor::Zero(chan.acquire()), |
420 | }; |
421 | |
422 | Receiver { flavor } |
423 | } |
424 | } |
425 | |
426 | impl<T> fmt::Debug for Receiver<T> { |
427 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
428 | f.pad("Receiver { .. }" ) |
429 | } |
430 | } |
431 | |