1 | //! A multi-producer, single-consumer queue for sending values across |
2 | //! asynchronous tasks. |
3 | //! |
4 | //! Similarly to the `std`, channel creation provides [`Receiver`] and |
5 | //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to |
6 | //! read values out of the channel. If there is no message to read from the |
7 | //! channel, the current task will be notified when a new value is sent. |
8 | //! [`Sender`] implements the `Sink` trait and allows a task to send messages into |
9 | //! the channel. If the channel is at capacity, the send will be rejected and |
10 | //! the task will be notified when additional capacity is available. In other |
11 | //! words, the channel provides backpressure. |
12 | //! |
13 | //! Unbounded channels are also available using the `unbounded` constructor. |
14 | //! |
15 | //! # Disconnection |
16 | //! |
17 | //! When all [`Sender`] handles have been dropped, it is no longer |
18 | //! possible to send values into the channel. This is considered the termination |
//! event of the stream. As such, [`Receiver::poll_next`]
//! will return `Poll::Ready(None)`.
21 | //! |
22 | //! If the [`Receiver`] handle is dropped, then messages can no longer |
23 | //! be read out of the channel. In this case, all further attempts to send will |
24 | //! result in an error. |
25 | //! |
26 | //! # Clean Shutdown |
27 | //! |
28 | //! If the [`Receiver`] is simply dropped, then it is possible for |
29 | //! there to be messages still in the channel that will not be processed. As |
30 | //! such, it is usually desirable to perform a "clean" shutdown. To do this, the |
//! receiver will first call `close`, which will prevent any further messages
//! from being sent into the channel. Then, the receiver consumes the channel to
33 | //! completion, at which point the receiver can be dropped. |
34 | //! |
35 | //! [`Sender`]: struct.Sender.html |
36 | //! [`Receiver`]: struct.Receiver.html |
37 | //! [`Stream`]: ../../futures_core/stream/trait.Stream.html |
38 | //! [`Receiver::poll_next`]: |
39 | //! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next |
40 | |
41 | // At the core, the channel uses an atomic FIFO queue for message passing. This |
42 | // queue is used as the primary coordination primitive. In order to enforce |
43 | // capacity limits and handle back pressure, a secondary FIFO queue is used to |
44 | // send parked task handles. |
45 | // |
46 | // The general idea is that the channel is created with a `buffer` size of `n`. |
47 | // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" |
48 | // slot to hold a message. This allows `Sender` to know for a fact that a send |
49 | // will succeed *before* starting to do the actual work of sending the value. |
50 | // Since most of this work is lock-free, once the work starts, it is impossible |
51 | // to safely revert. |
52 | // |
53 | // If the sender is unable to process a send operation, then the current |
54 | // task is parked and the handle is sent on the parked task queue. |
55 | // |
56 | // Note that the implementation guarantees that the channel capacity will never |
57 | // exceed the configured limit, however there is no *strict* guarantee that the |
58 | // receiver will wake up a parked task *immediately* when a slot becomes |
59 | // available. However, it will almost always unpark a task when a slot becomes |
60 | // available and it is *guaranteed* that a sender will be unparked when the |
61 | // message that caused the sender to become parked is read out of the channel. |
62 | // |
63 | // The steps for sending a message are roughly: |
64 | // |
65 | // 1) Increment the channel message count |
66 | // 2) If the channel is at capacity, push the task handle onto the wait queue |
67 | // 3) Push the message onto the message queue. |
68 | // |
69 | // The steps for receiving a message are roughly: |
70 | // |
71 | // 1) Pop a message from the message queue |
72 | // 2) Pop a task handle from the wait queue |
73 | // 3) Decrement the channel message count. |
74 | // |
75 | // It's important for the order of operations on lock-free structures to happen |
76 | // in reverse order between the sender and receiver. This makes the message |
77 | // queue the primary coordination structure and establishes the necessary |
78 | // happens-before semantics required for the acquire / release semantics used |
79 | // by the queue structure. |
80 | |
81 | use futures_core::stream::{FusedStream, Stream}; |
82 | use futures_core::task::__internal::AtomicWaker; |
83 | use futures_core::task::{Context, Poll, Waker}; |
84 | use std::fmt; |
85 | use std::pin::Pin; |
86 | use std::sync::atomic::AtomicUsize; |
87 | use std::sync::atomic::Ordering::SeqCst; |
88 | use std::sync::{Arc, Mutex}; |
89 | use std::thread; |
90 | |
91 | use crate::mpsc::queue::Queue; |
92 | |
93 | mod queue; |
94 | #[cfg (feature = "sink" )] |
95 | mod sink_impl; |
96 | |
// Sender-side state for an unbounded channel. `UnboundedSender` wraps this in
// an `Option`.
struct UnboundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<UnboundedInner<T>>,
}
101 | |
// Sender-side state for a bounded channel. `Sender` wraps this in an `Option`.
struct BoundedSenderInner<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<BoundedInner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_task: Arc<Mutex<SenderTask>>,

    // `true` if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time: `poll_unparked` only locks
    // `sender_task` when this flag is set.
    maybe_parked: bool,
}
115 | |
// We never project `Pin<&mut SenderInner>` to `Pin<&mut T>`, so both sender
// inner types are declared `Unpin` for every `T`.
impl<T> Unpin for UnboundedSenderInner<T> {}
impl<T> Unpin for BoundedSenderInner<T> {}
119 | |
/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
// The wrapped state becomes `None` once `disconnect` has been called.
pub struct Sender<T>(Option<BoundedSenderInner<T>>);
124 | |
/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
// The wrapped state becomes `None` once `disconnect` has been called.
pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
129 | |
// Compile-time assertion: the impl below only type-checks if
// `UnboundedSender<u32>` is `Send + Sync + Clone`.
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
132 | |
/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
pub struct Receiver<T> {
    // Channel state shared with the senders. `channel` initializes this to
    // `Some`.
    inner: Option<Arc<BoundedInner<T>>>,
}
139 | |
/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
pub struct UnboundedReceiver<T> {
    // Channel state shared with the senders. `unbounded` initializes this to
    // `Some`.
    inner: Option<Arc<UnboundedInner<T>>>,
}
146 | |
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`, so the
// receiver is declared `Unpin` for every `T`.
impl<T> Unpin for UnboundedReceiver<T> {}
149 | |
/// The error type for [`Sender`s](Sender) used as `Sink`s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    // Why the send failed.
    kind: SendErrorKind,
}

/// The error type returned from [`try_send`](Sender::try_send).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    // Why the send failed.
    err: SendError,
    // The message that could not be sent; handed back via `into_inner`.
    val: T,
}

// The reasons a send can fail.
#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    // The channel had no capacity for the message.
    Full,
    // The receiver is gone or has closed the channel.
    Disconnected,
}

/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    // Private field so the error cannot be constructed outside this module.
    _priv: (),
}

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.is_full() {
            write!(f, "send failed because channel is full")
        } else {
            write!(f, "send failed because receiver is gone")
        }
    }
}

impl std::error::Error for SendError {}

impl SendError {
    /// Returns `true` if this error is a result of the channel being full.
    pub fn is_full(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self.kind {
            SendErrorKind::Full => true,
            SendErrorKind::Disconnected => false,
        }
    }

    /// Returns `true` if this error is a result of the receiver being dropped.
    pub fn is_disconnected(&self) -> bool {
        match self.kind {
            SendErrorKind::Disconnected => true,
            SendErrorKind::Full => false,
        }
    }
}

// Hand-written so that `Debug` does not require `T: Debug`; only the failure
// kind is printed, never the rejected message.
impl<T> fmt::Debug for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same wording as the wrapped `SendError`; delegate so the two
        // messages cannot drift apart.
        fmt::Display::fmt(&self.err, f)
    }
}

impl<T: core::any::Any> std::error::Error for TrySendError<T> {}

impl<T> TrySendError<T> {
    /// Returns `true` if this error is a result of the channel being full.
    pub fn is_full(&self) -> bool {
        self.err.is_full()
    }

    /// Returns `true` if this error is a result of the receiver being dropped.
    pub fn is_disconnected(&self) -> bool {
        self.err.is_disconnected()
    }

    /// Returns the message that was attempted to be sent but failed.
    pub fn into_inner(self) -> T {
        self.val
    }

    /// Drops the message and converts into a `SendError`.
    pub fn into_send_error(self) -> SendError {
        self.err
    }
}

impl fmt::Debug for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("TryRecvError").finish()
    }
}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "receiver channel is empty")
    }
}

impl std::error::Error for TryRecvError {}
257 | |
// State shared (through an `Arc`) between all `UnboundedSender` handles and
// the `UnboundedReceiver`.
struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
272 | |
// State shared (through an `Arc`) between all `Sender` handles and the
// `Receiver`.
struct BoundedInner<T> {
    // Max buffer size of the channel. A sender parks itself when a send pushes
    // the message count above this value.
    buffer: usize,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<T>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: AtomicWaker,
}
293 | |
// Struct representation of `Inner::state`. Values are converted from and to
// the packed `usize` with `decode_state` / `encode_state`.
#[derive(Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}
303 | |
// The `is_open` flag is stored in the left-most (most significant) bit of
// `Inner::state`.
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);

// When a new channel is created, it is created in the open state with no
// pending messages.
const INIT_STATE: usize = OPEN_MASK;

// The maximum number of messages that a channel can track is `usize::MAX >> 1`,
// i.e. every bit except the one used by `OPEN_MASK`.
const MAX_CAPACITY: usize = !(OPEN_MASK);

// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
317 | |
// Sent to the consumer to wake up blocked producers
struct SenderTask {
    // Waker of the task waiting on this sender, if one has been registered.
    task: Option<Waker>,
    // `true` while the sender is parked and waiting to be notified.
    is_parked: bool,
}

impl SenderTask {
    // Creates a task handle that is not parked and has no waker registered.
    fn new() -> Self {
        Self { task: None, is_parked: false }
    }

    // Marks the sender as no longer parked and wakes the registered task, if
    // any. The waker is consumed, so a second `notify` without a new
    // registration wakes nothing.
    fn notify(&mut self) {
        self.is_parked = false;

        if let Some(task) = self.task.take() {
            task.wake();
        }
    }
}
337 | |
338 | /// Creates a bounded mpsc channel for communicating between asynchronous tasks. |
339 | /// |
340 | /// Being bounded, this channel provides backpressure to ensure that the sender |
341 | /// outpaces the receiver by only a limited amount. The channel's capacity is |
342 | /// equal to `buffer + num-senders`. In other words, each sender gets a |
343 | /// guaranteed slot in the channel capacity, and on top of that there are |
344 | /// `buffer` "first come, first serve" slots available to all senders. |
345 | /// |
346 | /// The [`Receiver`](Receiver) returned implements the |
347 | /// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements |
348 | /// `Sink`. |
349 | pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { |
350 | // Check that the requested buffer size does not exceed the maximum buffer |
351 | // size permitted by the system. |
352 | assert!(buffer < MAX_BUFFER, "requested buffer size too large" ); |
353 | |
354 | let inner: Arc> = Arc::new(data:BoundedInner { |
355 | buffer, |
356 | state: AtomicUsize::new(INIT_STATE), |
357 | message_queue: Queue::new(), |
358 | parked_queue: Queue::new(), |
359 | num_senders: AtomicUsize::new(1), |
360 | recv_task: AtomicWaker::new(), |
361 | }); |
362 | |
363 | let tx: BoundedSenderInner = BoundedSenderInner { |
364 | inner: inner.clone(), |
365 | sender_task: Arc::new(data:Mutex::new(SenderTask::new())), |
366 | maybe_parked: false, |
367 | }; |
368 | |
369 | let rx: Receiver = Receiver { inner: Some(inner) }; |
370 | |
371 | (Sender(Some(tx)), rx) |
372 | } |
373 | |
374 | /// Creates an unbounded mpsc channel for communicating between asynchronous |
375 | /// tasks. |
376 | /// |
377 | /// A `send` on this channel will always succeed as long as the receive half has |
378 | /// not been closed. If the receiver falls behind, messages will be arbitrarily |
379 | /// buffered. |
380 | /// |
381 | /// **Note** that the amount of available system memory is an implicit bound to |
382 | /// the channel. Using an `unbounded` channel has the ability of causing the |
383 | /// process to run out of memory. In this case, the process will be aborted. |
384 | pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { |
385 | let inner: Arc> = Arc::new(data:UnboundedInner { |
386 | state: AtomicUsize::new(INIT_STATE), |
387 | message_queue: Queue::new(), |
388 | num_senders: AtomicUsize::new(1), |
389 | recv_task: AtomicWaker::new(), |
390 | }); |
391 | |
392 | let tx: UnboundedSenderInner = UnboundedSenderInner { inner: inner.clone() }; |
393 | |
394 | let rx: UnboundedReceiver = UnboundedReceiver { inner: Some(inner) }; |
395 | |
396 | (UnboundedSender(Some(tx)), rx) |
397 | } |
398 | |
399 | /* |
400 | * |
401 | * ===== impl Sender ===== |
402 | * |
403 | */ |
404 | |
405 | impl<T> UnboundedSenderInner<T> { |
406 | fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> { |
407 | let state = decode_state(self.inner.state.load(SeqCst)); |
408 | if state.is_open { |
409 | Poll::Ready(Ok(())) |
410 | } else { |
411 | Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) |
412 | } |
413 | } |
414 | |
415 | // Push message to the queue and signal to the receiver |
416 | fn queue_push_and_signal(&self, msg: T) { |
417 | // Push the message onto the message queue |
418 | self.inner.message_queue.push(msg); |
419 | |
420 | // Signal to the receiver that a message has been enqueued. If the |
421 | // receiver is parked, this will unpark the task. |
422 | self.inner.recv_task.wake(); |
423 | } |
424 | |
425 | // Increment the number of queued messages. Returns the resulting number. |
426 | fn inc_num_messages(&self) -> Option<usize> { |
427 | let mut curr = self.inner.state.load(SeqCst); |
428 | |
429 | loop { |
430 | let mut state = decode_state(curr); |
431 | |
432 | // The receiver end closed the channel. |
433 | if !state.is_open { |
434 | return None; |
435 | } |
436 | |
437 | // This probably is never hit? Odds are the process will run out of |
438 | // memory first. It may be worth to return something else in this |
439 | // case? |
440 | assert!( |
441 | state.num_messages < MAX_CAPACITY, |
442 | "buffer space \ |
443 | exhausted; sending this messages would overflow the state" |
444 | ); |
445 | |
446 | state.num_messages += 1; |
447 | |
448 | let next = encode_state(&state); |
449 | match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { |
450 | Ok(_) => return Some(state.num_messages), |
451 | Err(actual) => curr = actual, |
452 | } |
453 | } |
454 | } |
455 | |
456 | /// Returns whether the senders send to the same receiver. |
457 | fn same_receiver(&self, other: &Self) -> bool { |
458 | Arc::ptr_eq(&self.inner, &other.inner) |
459 | } |
460 | |
461 | /// Returns whether the sender send to this receiver. |
462 | fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool { |
463 | Arc::ptr_eq(&self.inner, inner) |
464 | } |
465 | |
466 | /// Returns pointer to the Arc containing sender |
467 | /// |
468 | /// The returned pointer is not referenced and should be only used for hashing! |
469 | fn ptr(&self) -> *const UnboundedInner<T> { |
470 | &*self.inner |
471 | } |
472 | |
473 | /// Returns whether this channel is closed without needing a context. |
474 | fn is_closed(&self) -> bool { |
475 | !decode_state(self.inner.state.load(SeqCst)).is_open |
476 | } |
477 | |
478 | /// Closes this channel from the sender side, preventing any new messages. |
479 | fn close_channel(&self) { |
480 | // There's no need to park this sender, its dropping, |
481 | // and we don't want to check for capacity, so skip |
482 | // that stuff from `do_send`. |
483 | |
484 | self.inner.set_closed(); |
485 | self.inner.recv_task.wake(); |
486 | } |
487 | } |
488 | |
impl<T> BoundedSenderInner<T> {
    /// Attempts to send a message on this `Sender`, returning the message
    /// if there was an error.
    ///
    /// Fails with a "full" error while the sender is still parked, and with a
    /// "disconnected" error if the channel has been closed.
    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // If the sender is currently blocked, reject the message
        if !self.poll_unparked(None).is_ready() {
            return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
        }

        // The channel has capacity to accept the message, so send it
        self.do_send_b(msg)
    }

    // Do the send without checking for capacity; the only error returned is
    // "disconnected", when the channel is no longer open.
    // Can be called only by bounded sender.
    fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // Anyone calling do_send *should* make sure there is room first,
        // but assert here for tests as a sanity check.
        debug_assert!(self.poll_unparked(None).is_ready());

        // First, increment the number of messages contained by the channel.
        // This operation will also atomically determine if the sender task
        // should be parked.
        //
        // `None` is returned in the case that the channel has been closed by the
        // receiver. This happens when `Receiver::close` is called or the
        // receiver is dropped.
        let park_self = match self.inc_num_messages() {
            Some(num_messages) => {
                // Block if the current number of pending messages has exceeded
                // the configured buffer size
                num_messages > self.inner.buffer
            }
            None => {
                return Err(TrySendError {
                    err: SendError { kind: SendErrorKind::Disconnected },
                    val: msg,
                })
            }
        };

        // If the channel has reached capacity, then the sender task needs to
        // be parked. This will send the task handle on the parked task queue.
        //
        // No waker is stored at this point (`park` clears it); `poll_unparked`
        // registers one later if the sender is polled while still parked.
        if park_self {
            self.park();
        }

        self.queue_push_and_signal(msg);

        Ok(())
    }

    // Push message to the queue and signal to the receiver
    fn queue_push_and_signal(&self, msg: T) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        self.inner.recv_task.wake();
    }

    // Increment the number of queued messages. Returns the resulting number,
    // or `None` if the channel is no longer open.
    fn inc_num_messages(&self) -> Option<usize> {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth to return something else in this
            // case?
            assert!(
                state.num_messages < MAX_CAPACITY,
                "buffer space \
                 exhausted; sending this messages would overflow the state"
            );

            state.num_messages += 1;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => return Some(state.num_messages),
                // The state changed underneath us; retry with the value that
                // was actually observed.
                Err(actual) => curr = actual,
            }
        }
    }

    // Marks this sender as parked (clearing any stored waker) and pushes its
    // task handle onto the parked task queue.
    fn park(&mut self) {
        {
            // Scoped so the lock is released before the handle is queued.
            let mut sender = self.sender_task.lock().unwrap();
            sender.task = None;
            sender.is_parked = true;
        }

        // Send handle over queue
        let t = self.sender_task.clone();
        self.inner.parked_queue.push(t);

        // Check to make sure we weren't closed after we sent our task on the
        // queue
        let state = decode_state(self.inner.state.load(SeqCst));
        self.maybe_parked = state.is_open;
    }

    /// Polls the channel to determine if there is guaranteed capacity to send
    /// at least one item without waiting.
    ///
    /// # Return value
    ///
    /// This method returns:
    ///
    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
    /// - `Poll::Pending` if the channel may not have
    ///   capacity, in which case the current task is queued to be notified once
    ///   capacity is available;
    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if !state.is_open {
            return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
        }

        self.poll_unparked(Some(cx)).map(Ok)
    }

    /// Returns whether the senders send to the same receiver.
    fn same_receiver(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Returns whether the sender sends to this receiver.
    fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
        Arc::ptr_eq(&self.inner, receiver)
    }

    /// Returns pointer to the Arc containing sender
    ///
    /// The returned pointer is not referenced and should be only used for hashing!
    fn ptr(&self) -> *const BoundedInner<T> {
        &*self.inner
    }

    /// Returns whether this channel is closed without needing a context.
    fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    fn close_channel(&self) {
        // There's no need to park this sender, it's dropping,
        // and we don't want to check for capacity, so skip
        // that stuff from `do_send`.

        self.inner.set_closed();
        self.inner.recv_task.wake();
    }

    // Returns `Ready` if this sender is not parked. Otherwise replaces the
    // stored waker with the one from `cx` (or clears it when `cx` is `None`)
    // and returns `Pending`.
    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
        // First check the `maybe_parked` variable. This avoids acquiring the
        // lock in most cases
        if self.maybe_parked {
            // Get a lock on the task handle
            let mut task = self.sender_task.lock().unwrap();

            if !task.is_parked {
                self.maybe_parked = false;
                return Poll::Ready(());
            }

            // At this point, an unpark request is pending, so there will be an
            // unpark sometime in the future. We just need to make sure that
            // the correct task will be notified.
            //
            // Update the task in case the `Sender` has been moved to another
            // task
            task.task = cx.map(|cx| cx.waker().clone());

            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
683 | |
684 | impl<T> Sender<T> { |
685 | /// Attempts to send a message on this `Sender`, returning the message |
686 | /// if there was an error. |
687 | pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
688 | if let Some(inner) = &mut self.0 { |
689 | inner.try_send(msg) |
690 | } else { |
691 | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) |
692 | } |
693 | } |
694 | |
695 | /// Send a message on the channel. |
696 | /// |
697 | /// This function should only be called after |
698 | /// [`poll_ready`](Sender::poll_ready) has reported that the channel is |
699 | /// ready to receive a message. |
700 | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { |
701 | self.try_send(msg).map_err(|e| e.err) |
702 | } |
703 | |
704 | /// Polls the channel to determine if there is guaranteed capacity to send |
705 | /// at least one item without waiting. |
706 | /// |
707 | /// # Return value |
708 | /// |
709 | /// This method returns: |
710 | /// |
711 | /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; |
712 | /// - `Poll::Pending` if the channel may not have |
713 | /// capacity, in which case the current task is queued to be notified once |
714 | /// capacity is available; |
715 | /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. |
716 | pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
717 | let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; |
718 | inner.poll_ready(cx) |
719 | } |
720 | |
721 | /// Returns whether this channel is closed without needing a context. |
722 | pub fn is_closed(&self) -> bool { |
723 | self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true) |
724 | } |
725 | |
726 | /// Closes this channel from the sender side, preventing any new messages. |
727 | pub fn close_channel(&mut self) { |
728 | if let Some(inner) = &mut self.0 { |
729 | inner.close_channel(); |
730 | } |
731 | } |
732 | |
733 | /// Disconnects this sender from the channel, closing it if there are no more senders left. |
734 | pub fn disconnect(&mut self) { |
735 | self.0 = None; |
736 | } |
737 | |
738 | /// Returns whether the senders send to the same receiver. |
739 | pub fn same_receiver(&self, other: &Self) -> bool { |
740 | match (&self.0, &other.0) { |
741 | (Some(inner), Some(other)) => inner.same_receiver(other), |
742 | _ => false, |
743 | } |
744 | } |
745 | |
746 | /// Returns whether the sender send to this receiver. |
747 | pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { |
748 | match (&self.0, &receiver.inner) { |
749 | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), |
750 | _ => false, |
751 | } |
752 | } |
753 | |
754 | /// Hashes the receiver into the provided hasher |
755 | pub fn hash_receiver<H>(&self, hasher: &mut H) |
756 | where |
757 | H: std::hash::Hasher, |
758 | { |
759 | use std::hash::Hash; |
760 | |
761 | let ptr = self.0.as_ref().map(|inner| inner.ptr()); |
762 | ptr.hash(hasher); |
763 | } |
764 | } |
765 | |
766 | impl<T> UnboundedSender<T> { |
767 | /// Check if the channel is ready to receive a message. |
768 | pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
769 | let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; |
770 | inner.poll_ready_nb() |
771 | } |
772 | |
773 | /// Returns whether this channel is closed without needing a context. |
774 | pub fn is_closed(&self) -> bool { |
775 | self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true) |
776 | } |
777 | |
778 | /// Closes this channel from the sender side, preventing any new messages. |
779 | pub fn close_channel(&self) { |
780 | if let Some(inner) = &self.0 { |
781 | inner.close_channel(); |
782 | } |
783 | } |
784 | |
785 | /// Disconnects this sender from the channel, closing it if there are no more senders left. |
786 | pub fn disconnect(&mut self) { |
787 | self.0 = None; |
788 | } |
789 | |
790 | // Do the send without parking current task. |
791 | fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> { |
792 | if let Some(inner) = &self.0 { |
793 | if inner.inc_num_messages().is_some() { |
794 | inner.queue_push_and_signal(msg); |
795 | return Ok(()); |
796 | } |
797 | } |
798 | |
799 | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) |
800 | } |
801 | |
802 | /// Send a message on the channel. |
803 | /// |
804 | /// This method should only be called after `poll_ready` has been used to |
805 | /// verify that the channel is ready to receive a message. |
806 | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { |
807 | self.do_send_nb(msg).map_err(|e| e.err) |
808 | } |
809 | |
810 | /// Sends a message along this channel. |
811 | /// |
812 | /// This is an unbounded sender, so this function differs from `Sink::send` |
813 | /// by ensuring the return type reflects that the channel is always ready to |
814 | /// receive messages. |
815 | pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
816 | self.do_send_nb(msg) |
817 | } |
818 | |
819 | /// Returns whether the senders send to the same receiver. |
820 | pub fn same_receiver(&self, other: &Self) -> bool { |
821 | match (&self.0, &other.0) { |
822 | (Some(inner), Some(other)) => inner.same_receiver(other), |
823 | _ => false, |
824 | } |
825 | } |
826 | |
827 | /// Returns whether the sender send to this receiver. |
828 | pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool { |
829 | match (&self.0, &receiver.inner) { |
830 | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), |
831 | _ => false, |
832 | } |
833 | } |
834 | |
835 | /// Hashes the receiver into the provided hasher |
836 | pub fn hash_receiver<H>(&self, hasher: &mut H) |
837 | where |
838 | H: std::hash::Hasher, |
839 | { |
840 | use std::hash::Hash; |
841 | |
842 | let ptr = self.0.as_ref().map(|inner| inner.ptr()); |
843 | ptr.hash(hasher); |
844 | } |
845 | } |
846 | |
847 | impl<T> Clone for Sender<T> { |
848 | fn clone(&self) -> Self { |
849 | Self(self.0.clone()) |
850 | } |
851 | } |
852 | |
853 | impl<T> Clone for UnboundedSender<T> { |
854 | fn clone(&self) -> Self { |
855 | Self(self.0.clone()) |
856 | } |
857 | } |
858 | |
859 | impl<T> Clone for UnboundedSenderInner<T> { |
860 | fn clone(&self) -> Self { |
861 | // Since this atomic op isn't actually guarding any memory and we don't |
862 | // care about any orderings besides the ordering on the single atomic |
863 | // variable, a relaxed ordering is acceptable. |
864 | let mut curr = self.inner.num_senders.load(SeqCst); |
865 | |
866 | loop { |
867 | // If the maximum number of senders has been reached, then fail |
868 | if curr == MAX_BUFFER { |
869 | panic!("cannot clone `Sender` -- too many outstanding senders" ); |
870 | } |
871 | |
872 | debug_assert!(curr < MAX_BUFFER); |
873 | |
874 | let next = curr + 1; |
875 | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { |
876 | Ok(_) => { |
877 | // The ABA problem doesn't matter here. We only care that the |
878 | // number of senders never exceeds the maximum. |
879 | return Self { inner: self.inner.clone() }; |
880 | } |
881 | Err(actual) => curr = actual, |
882 | } |
883 | } |
884 | } |
885 | } |
886 | |
887 | impl<T> Clone for BoundedSenderInner<T> { |
888 | fn clone(&self) -> Self { |
889 | // Since this atomic op isn't actually guarding any memory and we don't |
890 | // care about any orderings besides the ordering on the single atomic |
891 | // variable, a relaxed ordering is acceptable. |
892 | let mut curr = self.inner.num_senders.load(SeqCst); |
893 | |
894 | loop { |
895 | // If the maximum number of senders has been reached, then fail |
896 | if curr == self.inner.max_senders() { |
897 | panic!("cannot clone `Sender` -- too many outstanding senders" ); |
898 | } |
899 | |
900 | debug_assert!(curr < self.inner.max_senders()); |
901 | |
902 | let next = curr + 1; |
903 | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { |
904 | Ok(_) => { |
905 | // The ABA problem doesn't matter here. We only care that the |
906 | // number of senders never exceeds the maximum. |
907 | return Self { |
908 | inner: self.inner.clone(), |
909 | sender_task: Arc::new(Mutex::new(SenderTask::new())), |
910 | maybe_parked: false, |
911 | }; |
912 | } |
913 | Err(actual) => curr = actual, |
914 | } |
915 | } |
916 | } |
917 | } |
918 | |
919 | impl<T> Drop for UnboundedSenderInner<T> { |
920 | fn drop(&mut self) { |
921 | // Ordering between variables don't matter here |
922 | let prev: usize = self.inner.num_senders.fetch_sub(val:1, order:SeqCst); |
923 | |
924 | if prev == 1 { |
925 | self.close_channel(); |
926 | } |
927 | } |
928 | } |
929 | |
930 | impl<T> Drop for BoundedSenderInner<T> { |
931 | fn drop(&mut self) { |
932 | // Ordering between variables don't matter here |
933 | let prev: usize = self.inner.num_senders.fetch_sub(val:1, order:SeqCst); |
934 | |
935 | if prev == 1 { |
936 | self.close_channel(); |
937 | } |
938 | } |
939 | } |
940 | |
941 | impl<T> fmt::Debug for Sender<T> { |
942 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
943 | f.debug_struct("Sender" ).field(name:"closed" , &self.is_closed()).finish() |
944 | } |
945 | } |
946 | |
947 | impl<T> fmt::Debug for UnboundedSender<T> { |
948 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
949 | f.debug_struct("UnboundedSender" ).field(name:"closed" , &self.is_closed()).finish() |
950 | } |
951 | } |
952 | |
953 | /* |
954 | * |
955 | * ===== impl Receiver ===== |
956 | * |
957 | */ |
958 | |
959 | impl<T> Receiver<T> { |
960 | /// Closes the receiving half of a channel, without dropping it. |
961 | /// |
962 | /// This prevents any further messages from being sent on the channel while |
963 | /// still enabling the receiver to drain messages that are buffered. |
964 | pub fn close(&mut self) { |
965 | if let Some(inner) = &mut self.inner { |
966 | inner.set_closed(); |
967 | |
968 | // Wake up any threads waiting as they'll see that we've closed the |
969 | // channel and will continue on their merry way. |
970 | while let Some(task) = unsafe { inner.parked_queue.pop_spin() } { |
971 | task.lock().unwrap().notify(); |
972 | } |
973 | } |
974 | } |
975 | |
976 | /// Tries to receive the next message without notifying a context if empty. |
977 | /// |
978 | /// It is not recommended to call this function from inside of a future, |
979 | /// only when you've otherwise arranged to be notified when the channel is |
980 | /// no longer empty. |
981 | /// |
982 | /// This function returns: |
983 | /// * `Ok(Some(t))` when message is fetched |
984 | /// * `Ok(None)` when channel is closed and no messages left in the queue |
985 | /// * `Err(e)` when there are no messages available, but channel is not yet closed |
986 | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { |
987 | match self.next_message() { |
988 | Poll::Ready(msg) => Ok(msg), |
989 | Poll::Pending => Err(TryRecvError { _priv: () }), |
990 | } |
991 | } |
992 | |
993 | fn next_message(&mut self) -> Poll<Option<T>> { |
994 | let inner = match self.inner.as_mut() { |
995 | None => return Poll::Ready(None), |
996 | Some(inner) => inner, |
997 | }; |
998 | // Pop off a message |
999 | match unsafe { inner.message_queue.pop_spin() } { |
1000 | Some(msg) => { |
1001 | // If there are any parked task handles in the parked queue, |
1002 | // pop one and unpark it. |
1003 | self.unpark_one(); |
1004 | |
1005 | // Decrement number of messages |
1006 | self.dec_num_messages(); |
1007 | |
1008 | Poll::Ready(Some(msg)) |
1009 | } |
1010 | None => { |
1011 | let state = decode_state(inner.state.load(SeqCst)); |
1012 | if state.is_closed() { |
1013 | // If closed flag is set AND there are no pending messages |
1014 | // it means end of stream |
1015 | self.inner = None; |
1016 | Poll::Ready(None) |
1017 | } else { |
1018 | // If queue is open, we need to return Pending |
1019 | // to be woken up when new messages arrive. |
1020 | // If queue is closed but num_messages is non-zero, |
1021 | // it means that senders updated the state, |
1022 | // but didn't put message to queue yet, |
1023 | // so we need to park until sender unparks the task |
1024 | // after queueing the message. |
1025 | Poll::Pending |
1026 | } |
1027 | } |
1028 | } |
1029 | } |
1030 | |
1031 | // Unpark a single task handle if there is one pending in the parked queue |
1032 | fn unpark_one(&mut self) { |
1033 | if let Some(inner) = &mut self.inner { |
1034 | if let Some(task) = unsafe { inner.parked_queue.pop_spin() } { |
1035 | task.lock().unwrap().notify(); |
1036 | } |
1037 | } |
1038 | } |
1039 | |
1040 | fn dec_num_messages(&self) { |
1041 | if let Some(inner) = &self.inner { |
1042 | // OPEN_MASK is highest bit, so it's unaffected by subtraction |
1043 | // unless there's underflow, and we know there's no underflow |
1044 | // because number of messages at this point is always > 0. |
1045 | inner.state.fetch_sub(1, SeqCst); |
1046 | } |
1047 | } |
1048 | } |
1049 | |
// `Receiver` never takes a `Pin` to the `T` values it carries, so it is
// declared `Unpin` for every `T`, whether or not `T` itself is `Unpin`.
impl<T> Unpin for Receiver<T> {}
1052 | |
1053 | impl<T> FusedStream for Receiver<T> { |
1054 | fn is_terminated(&self) -> bool { |
1055 | self.inner.is_none() |
1056 | } |
1057 | } |
1058 | |
1059 | impl<T> Stream for Receiver<T> { |
1060 | type Item = T; |
1061 | |
1062 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
1063 | // Try to read a message off of the message queue. |
1064 | match self.next_message() { |
1065 | Poll::Ready(msg) => { |
1066 | if msg.is_none() { |
1067 | self.inner = None; |
1068 | } |
1069 | Poll::Ready(msg) |
1070 | } |
1071 | Poll::Pending => { |
1072 | // There are no messages to read, in this case, park. |
1073 | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); |
1074 | // Check queue again after parking to prevent race condition: |
1075 | // a message could be added to the queue after previous `next_message` |
1076 | // before `register` call. |
1077 | self.next_message() |
1078 | } |
1079 | } |
1080 | } |
1081 | |
1082 | fn size_hint(&self) -> (usize, Option<usize>) { |
1083 | if let Some(inner) = &self.inner { |
1084 | decode_state(inner.state.load(SeqCst)).size_hint() |
1085 | } else { |
1086 | (0, Some(0)) |
1087 | } |
1088 | } |
1089 | } |
1090 | |
1091 | impl<T> Drop for Receiver<T> { |
1092 | fn drop(&mut self) { |
1093 | // Drain the channel of all pending messages |
1094 | self.close(); |
1095 | if self.inner.is_some() { |
1096 | loop { |
1097 | match self.next_message() { |
1098 | Poll::Ready(Some(_)) => {} |
1099 | Poll::Ready(None) => break, |
1100 | Poll::Pending => { |
1101 | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); |
1102 | |
1103 | // If the channel is closed, then there is no need to park. |
1104 | if state.is_closed() { |
1105 | break; |
1106 | } |
1107 | |
1108 | // TODO: Spinning isn't ideal, it might be worth |
1109 | // investigating using a condvar or some other strategy |
1110 | // here. That said, if this case is hit, then another thread |
1111 | // is about to push the value into the queue and this isn't |
1112 | // the only spinlock in the impl right now. |
1113 | thread::yield_now(); |
1114 | } |
1115 | } |
1116 | } |
1117 | } |
1118 | } |
1119 | } |
1120 | |
1121 | impl<T> fmt::Debug for Receiver<T> { |
1122 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1123 | let closed: bool = if let Some(ref inner: &Arc>) = self.inner { |
1124 | decode_state(num:inner.state.load(order:SeqCst)).is_closed() |
1125 | } else { |
1126 | false |
1127 | }; |
1128 | |
1129 | f.debug_struct("Receiver" ).field(name:"closed" , &closed).finish() |
1130 | } |
1131 | } |
1132 | |
1133 | impl<T> UnboundedReceiver<T> { |
1134 | /// Closes the receiving half of a channel, without dropping it. |
1135 | /// |
1136 | /// This prevents any further messages from being sent on the channel while |
1137 | /// still enabling the receiver to drain messages that are buffered. |
1138 | pub fn close(&mut self) { |
1139 | if let Some(inner) = &mut self.inner { |
1140 | inner.set_closed(); |
1141 | } |
1142 | } |
1143 | |
1144 | /// Tries to receive the next message without notifying a context if empty. |
1145 | /// |
1146 | /// It is not recommended to call this function from inside of a future, |
1147 | /// only when you've otherwise arranged to be notified when the channel is |
1148 | /// no longer empty. |
1149 | /// |
1150 | /// This function returns: |
1151 | /// * `Ok(Some(t))` when message is fetched |
1152 | /// * `Ok(None)` when channel is closed and no messages left in the queue |
1153 | /// * `Err(e)` when there are no messages available, but channel is not yet closed |
1154 | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { |
1155 | match self.next_message() { |
1156 | Poll::Ready(msg) => Ok(msg), |
1157 | Poll::Pending => Err(TryRecvError { _priv: () }), |
1158 | } |
1159 | } |
1160 | |
1161 | fn next_message(&mut self) -> Poll<Option<T>> { |
1162 | let inner = match self.inner.as_mut() { |
1163 | None => return Poll::Ready(None), |
1164 | Some(inner) => inner, |
1165 | }; |
1166 | // Pop off a message |
1167 | match unsafe { inner.message_queue.pop_spin() } { |
1168 | Some(msg) => { |
1169 | // Decrement number of messages |
1170 | self.dec_num_messages(); |
1171 | |
1172 | Poll::Ready(Some(msg)) |
1173 | } |
1174 | None => { |
1175 | let state = decode_state(inner.state.load(SeqCst)); |
1176 | if state.is_closed() { |
1177 | // If closed flag is set AND there are no pending messages |
1178 | // it means end of stream |
1179 | self.inner = None; |
1180 | Poll::Ready(None) |
1181 | } else { |
1182 | // If queue is open, we need to return Pending |
1183 | // to be woken up when new messages arrive. |
1184 | // If queue is closed but num_messages is non-zero, |
1185 | // it means that senders updated the state, |
1186 | // but didn't put message to queue yet, |
1187 | // so we need to park until sender unparks the task |
1188 | // after queueing the message. |
1189 | Poll::Pending |
1190 | } |
1191 | } |
1192 | } |
1193 | } |
1194 | |
1195 | fn dec_num_messages(&self) { |
1196 | if let Some(inner) = &self.inner { |
1197 | // OPEN_MASK is highest bit, so it's unaffected by subtraction |
1198 | // unless there's underflow, and we know there's no underflow |
1199 | // because number of messages at this point is always > 0. |
1200 | inner.state.fetch_sub(1, SeqCst); |
1201 | } |
1202 | } |
1203 | } |
1204 | |
1205 | impl<T> FusedStream for UnboundedReceiver<T> { |
1206 | fn is_terminated(&self) -> bool { |
1207 | self.inner.is_none() |
1208 | } |
1209 | } |
1210 | |
1211 | impl<T> Stream for UnboundedReceiver<T> { |
1212 | type Item = T; |
1213 | |
1214 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
1215 | // Try to read a message off of the message queue. |
1216 | match self.next_message() { |
1217 | Poll::Ready(msg) => { |
1218 | if msg.is_none() { |
1219 | self.inner = None; |
1220 | } |
1221 | Poll::Ready(msg) |
1222 | } |
1223 | Poll::Pending => { |
1224 | // There are no messages to read, in this case, park. |
1225 | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); |
1226 | // Check queue again after parking to prevent race condition: |
1227 | // a message could be added to the queue after previous `next_message` |
1228 | // before `register` call. |
1229 | self.next_message() |
1230 | } |
1231 | } |
1232 | } |
1233 | |
1234 | fn size_hint(&self) -> (usize, Option<usize>) { |
1235 | if let Some(inner) = &self.inner { |
1236 | decode_state(inner.state.load(SeqCst)).size_hint() |
1237 | } else { |
1238 | (0, Some(0)) |
1239 | } |
1240 | } |
1241 | } |
1242 | |
1243 | impl<T> Drop for UnboundedReceiver<T> { |
1244 | fn drop(&mut self) { |
1245 | // Drain the channel of all pending messages |
1246 | self.close(); |
1247 | if self.inner.is_some() { |
1248 | loop { |
1249 | match self.next_message() { |
1250 | Poll::Ready(Some(_)) => {} |
1251 | Poll::Ready(None) => break, |
1252 | Poll::Pending => { |
1253 | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); |
1254 | |
1255 | // If the channel is closed, then there is no need to park. |
1256 | if state.is_closed() { |
1257 | break; |
1258 | } |
1259 | |
1260 | // TODO: Spinning isn't ideal, it might be worth |
1261 | // investigating using a condvar or some other strategy |
1262 | // here. That said, if this case is hit, then another thread |
1263 | // is about to push the value into the queue and this isn't |
1264 | // the only spinlock in the impl right now. |
1265 | thread::yield_now(); |
1266 | } |
1267 | } |
1268 | } |
1269 | } |
1270 | } |
1271 | } |
1272 | |
1273 | impl<T> fmt::Debug for UnboundedReceiver<T> { |
1274 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1275 | let closed: bool = if let Some(ref inner: &Arc>) = self.inner { |
1276 | decode_state(num:inner.state.load(order:SeqCst)).is_closed() |
1277 | } else { |
1278 | false |
1279 | }; |
1280 | |
1281 | f.debug_struct("Receiver" ).field(name:"closed" , &closed).finish() |
1282 | } |
1283 | } |
1284 | |
1285 | /* |
1286 | * |
1287 | * ===== impl Inner ===== |
1288 | * |
1289 | */ |
1290 | |
1291 | impl<T> UnboundedInner<T> { |
1292 | // Clear `open` flag in the state, keep `num_messages` intact. |
1293 | fn set_closed(&self) { |
1294 | let curr: usize = self.state.load(order:SeqCst); |
1295 | if !decode_state(num:curr).is_open { |
1296 | return; |
1297 | } |
1298 | |
1299 | self.state.fetch_and(!OPEN_MASK, order:SeqCst); |
1300 | } |
1301 | } |
1302 | |
1303 | impl<T> BoundedInner<T> { |
1304 | // The return value is such that the total number of messages that can be |
1305 | // enqueued into the channel will never exceed MAX_CAPACITY |
1306 | fn max_senders(&self) -> usize { |
1307 | MAX_CAPACITY - self.buffer |
1308 | } |
1309 | |
1310 | // Clear `open` flag in the state, keep `num_messages` intact. |
1311 | fn set_closed(&self) { |
1312 | let curr: usize = self.state.load(order:SeqCst); |
1313 | if !decode_state(num:curr).is_open { |
1314 | return; |
1315 | } |
1316 | |
1317 | self.state.fetch_and(!OPEN_MASK, order:SeqCst); |
1318 | } |
1319 | } |
1320 | |
// The shared channel state is declared `Send` and `Sync` whenever the message
// type `T` is `Send`.
// NOTE(review): the fields that make these impls sound are declared outside
// this section — confirm the invariants against the struct definitions.
unsafe impl<T: Send> Send for UnboundedInner<T> {}
unsafe impl<T: Send> Sync for UnboundedInner<T> {}

unsafe impl<T: Send> Send for BoundedInner<T> {}
unsafe impl<T: Send> Sync for BoundedInner<T> {}
1326 | |
1327 | impl State { |
1328 | fn is_closed(&self) -> bool { |
1329 | !self.is_open && self.num_messages == 0 |
1330 | } |
1331 | |
1332 | fn size_hint(&self) -> (usize, Option<usize>) { |
1333 | if self.is_open { |
1334 | (self.num_messages, None) |
1335 | } else { |
1336 | (self.num_messages, Some(self.num_messages)) |
1337 | } |
1338 | } |
1339 | } |
1340 | |
1341 | /* |
1342 | * |
1343 | * ===== Helpers ===== |
1344 | * |
1345 | */ |
1346 | |
1347 | fn decode_state(num: usize) -> State { |
1348 | State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } |
1349 | } |
1350 | |
1351 | fn encode_state(state: &State) -> usize { |
1352 | let mut num: usize = state.num_messages; |
1353 | |
1354 | if state.is_open { |
1355 | num |= OPEN_MASK; |
1356 | } |
1357 | |
1358 | num |
1359 | } |
1360 | |