1 | //! A multi-producer, single-consumer queue for sending values across |
2 | //! asynchronous tasks. |
3 | //! |
4 | //! Similarly to the `std`, channel creation provides [`Receiver`] and |
5 | //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to |
6 | //! read values out of the channel. If there is no message to read from the |
7 | //! channel, the current task will be notified when a new value is sent. |
8 | //! [`Sender`] implements the `Sink` trait and allows a task to send messages into |
9 | //! the channel. If the channel is at capacity, the send will be rejected and |
10 | //! the task will be notified when additional capacity is available. In other |
11 | //! words, the channel provides backpressure. |
12 | //! |
13 | //! Unbounded channels are also available using the `unbounded` constructor. |
14 | //! |
15 | //! # Disconnection |
16 | //! |
17 | //! When all [`Sender`] handles have been dropped, it is no longer |
18 | //! possible to send values into the channel. This is considered the termination |
19 | //! event of the stream. As such, [`Receiver::poll_next`] |
20 | //! will return `Ok(Ready(None))`. |
21 | //! |
22 | //! If the [`Receiver`] handle is dropped, then messages can no longer |
23 | //! be read out of the channel. In this case, all further attempts to send will |
24 | //! result in an error. |
25 | //! |
26 | //! # Clean Shutdown |
27 | //! |
28 | //! If the [`Receiver`] is simply dropped, then it is possible for |
29 | //! there to be messages still in the channel that will not be processed. As |
30 | //! such, it is usually desirable to perform a "clean" shutdown. To do this, the |
31 | //! receiver will first call `close`, which will prevent any further messages to |
32 | //! be sent into the channel. Then, the receiver consumes the channel to |
33 | //! completion, at which point the receiver can be dropped. |
34 | //! |
35 | //! [`Sender`]: struct.Sender.html |
36 | //! [`Receiver`]: struct.Receiver.html |
37 | //! [`Stream`]: ../../futures_core/stream/trait.Stream.html |
38 | //! [`Receiver::poll_next`]: |
39 | //! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next |
40 | |
41 | // At the core, the channel uses an atomic FIFO queue for message passing. This |
42 | // queue is used as the primary coordination primitive. In order to enforce |
43 | // capacity limits and handle back pressure, a secondary FIFO queue is used to |
44 | // send parked task handles. |
45 | // |
46 | // The general idea is that the channel is created with a `buffer` size of `n`. |
47 | // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" |
48 | // slot to hold a message. This allows `Sender` to know for a fact that a send |
49 | // will succeed *before* starting to do the actual work of sending the value. |
50 | // Since most of this work is lock-free, once the work starts, it is impossible |
51 | // to safely revert. |
52 | // |
53 | // If the sender is unable to process a send operation, then the current |
54 | // task is parked and the handle is sent on the parked task queue. |
55 | // |
56 | // Note that the implementation guarantees that the channel capacity will never |
57 | // exceed the configured limit, however there is no *strict* guarantee that the |
58 | // receiver will wake up a parked task *immediately* when a slot becomes |
59 | // available. However, it will almost always unpark a task when a slot becomes |
60 | // available and it is *guaranteed* that a sender will be unparked when the |
61 | // message that caused the sender to become parked is read out of the channel. |
62 | // |
63 | // The steps for sending a message are roughly: |
64 | // |
65 | // 1) Increment the channel message count |
66 | // 2) If the channel is at capacity, push the task handle onto the wait queue |
67 | // 3) Push the message onto the message queue. |
68 | // |
69 | // The steps for receiving a message are roughly: |
70 | // |
71 | // 1) Pop a message from the message queue |
72 | // 2) Pop a task handle from the wait queue |
73 | // 3) Decrement the channel message count. |
74 | // |
75 | // It's important for the order of operations on lock-free structures to happen |
76 | // in reverse order between the sender and receiver. This makes the message |
77 | // queue the primary coordination structure and establishes the necessary |
78 | // happens-before semantics required for the acquire / release semantics used |
79 | // by the queue structure. |
80 | |
81 | use futures_core::stream::{FusedStream, Stream}; |
82 | use futures_core::task::__internal::AtomicWaker; |
83 | use futures_core::task::{Context, Poll, Waker}; |
84 | use std::fmt; |
85 | use std::pin::Pin; |
86 | use std::sync::atomic::AtomicUsize; |
87 | use std::sync::atomic::Ordering::SeqCst; |
88 | use std::sync::{Arc, Mutex}; |
89 | use std::thread; |
90 | |
91 | use crate::mpsc::queue::Queue; |
92 | |
93 | mod queue; |
94 | #[cfg (feature = "sink" )] |
95 | mod sink_impl; |
96 | |
97 | struct UnboundedSenderInner<T> { |
98 | // Channel state shared between the sender and receiver. |
99 | inner: Arc<UnboundedInner<T>>, |
100 | } |
101 | |
102 | struct BoundedSenderInner<T> { |
103 | // Channel state shared between the sender and receiver. |
104 | inner: Arc<BoundedInner<T>>, |
105 | |
106 | // Handle to the task that is blocked on this sender. This handle is sent |
107 | // to the receiver half in order to be notified when the sender becomes |
108 | // unblocked. |
109 | sender_task: Arc<Mutex<SenderTask>>, |
110 | |
111 | // `true` if the sender might be blocked. This is an optimization to avoid |
112 | // having to lock the mutex most of the time. |
113 | maybe_parked: bool, |
114 | } |
115 | |
116 | // We never project Pin<&mut SenderInner> to `Pin<&mut T>` |
117 | impl<T> Unpin for UnboundedSenderInner<T> {} |
118 | impl<T> Unpin for BoundedSenderInner<T> {} |
119 | |
120 | /// The transmission end of a bounded mpsc channel. |
121 | /// |
122 | /// This value is created by the [`channel`] function. |
123 | pub struct Sender<T>(Option<BoundedSenderInner<T>>); |
124 | |
125 | /// The transmission end of an unbounded mpsc channel. |
126 | /// |
127 | /// This value is created by the [`unbounded`] function. |
128 | pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>); |
129 | |
130 | trait AssertKinds: Send + Sync + Clone {} |
131 | impl AssertKinds for UnboundedSender<u32> {} |
132 | |
133 | /// The receiving end of a bounded mpsc channel. |
134 | /// |
135 | /// This value is created by the [`channel`] function. |
136 | pub struct Receiver<T> { |
137 | inner: Option<Arc<BoundedInner<T>>>, |
138 | } |
139 | |
140 | /// The receiving end of an unbounded mpsc channel. |
141 | /// |
142 | /// This value is created by the [`unbounded`] function. |
143 | pub struct UnboundedReceiver<T> { |
144 | inner: Option<Arc<UnboundedInner<T>>>, |
145 | } |
146 | |
147 | // `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>` |
148 | impl<T> Unpin for UnboundedReceiver<T> {} |
149 | |
150 | /// The error type for [`Sender`s](Sender) used as `Sink`s. |
151 | #[derive(Clone, Debug, PartialEq, Eq)] |
152 | pub struct SendError { |
153 | kind: SendErrorKind, |
154 | } |
155 | |
156 | /// The error type returned from [`try_send`](Sender::try_send). |
157 | #[derive(Clone, PartialEq, Eq)] |
158 | pub struct TrySendError<T> { |
159 | err: SendError, |
160 | val: T, |
161 | } |
162 | |
163 | #[derive(Clone, Debug, PartialEq, Eq)] |
164 | enum SendErrorKind { |
165 | Full, |
166 | Disconnected, |
167 | } |
168 | |
169 | /// The error type returned from [`try_next`](Receiver::try_next). |
170 | pub struct TryRecvError { |
171 | _priv: (), |
172 | } |
173 | |
174 | impl fmt::Display for SendError { |
175 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
176 | if self.is_full() { |
177 | write!(f, "send failed because channel is full" ) |
178 | } else { |
179 | write!(f, "send failed because receiver is gone" ) |
180 | } |
181 | } |
182 | } |
183 | |
184 | impl std::error::Error for SendError {} |
185 | |
186 | impl SendError { |
187 | /// Returns `true` if this error is a result of the channel being full. |
188 | pub fn is_full(&self) -> bool { |
189 | match self.kind { |
190 | SendErrorKind::Full => true, |
191 | _ => false, |
192 | } |
193 | } |
194 | |
195 | /// Returns `true` if this error is a result of the receiver being dropped. |
196 | pub fn is_disconnected(&self) -> bool { |
197 | match self.kind { |
198 | SendErrorKind::Disconnected => true, |
199 | _ => false, |
200 | } |
201 | } |
202 | } |
203 | |
204 | impl<T> fmt::Debug for TrySendError<T> { |
205 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
206 | f.debug_struct("TrySendError" ).field("kind" , &self.err.kind).finish() |
207 | } |
208 | } |
209 | |
210 | impl<T> fmt::Display for TrySendError<T> { |
211 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
212 | if self.is_full() { |
213 | write!(f, "send failed because channel is full" ) |
214 | } else { |
215 | write!(f, "send failed because receiver is gone" ) |
216 | } |
217 | } |
218 | } |
219 | |
220 | impl<T: core::any::Any> std::error::Error for TrySendError<T> {} |
221 | |
222 | impl<T> TrySendError<T> { |
223 | /// Returns `true` if this error is a result of the channel being full. |
224 | pub fn is_full(&self) -> bool { |
225 | self.err.is_full() |
226 | } |
227 | |
228 | /// Returns `true` if this error is a result of the receiver being dropped. |
229 | pub fn is_disconnected(&self) -> bool { |
230 | self.err.is_disconnected() |
231 | } |
232 | |
233 | /// Returns the message that was attempted to be sent but failed. |
234 | pub fn into_inner(self) -> T { |
235 | self.val |
236 | } |
237 | |
238 | /// Drops the message and converts into a `SendError`. |
239 | pub fn into_send_error(self) -> SendError { |
240 | self.err |
241 | } |
242 | } |
243 | |
244 | impl fmt::Debug for TryRecvError { |
245 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
246 | f.debug_tuple("TryRecvError" ).finish() |
247 | } |
248 | } |
249 | |
250 | impl fmt::Display for TryRecvError { |
251 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
252 | write!(f, "receiver channel is empty" ) |
253 | } |
254 | } |
255 | |
256 | impl std::error::Error for TryRecvError {} |
257 | |
258 | struct UnboundedInner<T> { |
259 | // Internal channel state. Consists of the number of messages stored in the |
260 | // channel as well as a flag signalling that the channel is closed. |
261 | state: AtomicUsize, |
262 | |
263 | // Atomic, FIFO queue used to send messages to the receiver |
264 | message_queue: Queue<T>, |
265 | |
266 | // Number of senders in existence |
267 | num_senders: AtomicUsize, |
268 | |
269 | // Handle to the receiver's task. |
270 | recv_task: AtomicWaker, |
271 | } |
272 | |
273 | struct BoundedInner<T> { |
274 | // Max buffer size of the channel. If `None` then the channel is unbounded. |
275 | buffer: usize, |
276 | |
277 | // Internal channel state. Consists of the number of messages stored in the |
278 | // channel as well as a flag signalling that the channel is closed. |
279 | state: AtomicUsize, |
280 | |
281 | // Atomic, FIFO queue used to send messages to the receiver |
282 | message_queue: Queue<T>, |
283 | |
284 | // Atomic, FIFO queue used to send parked task handles to the receiver. |
285 | parked_queue: Queue<Arc<Mutex<SenderTask>>>, |
286 | |
287 | // Number of senders in existence |
288 | num_senders: AtomicUsize, |
289 | |
290 | // Handle to the receiver's task. |
291 | recv_task: AtomicWaker, |
292 | } |
293 | |
294 | // Struct representation of `Inner::state`. |
295 | #[derive(Clone, Copy)] |
296 | struct State { |
297 | // `true` when the channel is open |
298 | is_open: bool, |
299 | |
300 | // Number of messages in the channel |
301 | num_messages: usize, |
302 | } |
303 | |
304 | // The `is_open` flag is stored in the left-most bit of `Inner::state` |
305 | const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1); |
306 | |
307 | // When a new channel is created, it is created in the open state with no |
308 | // pending messages. |
309 | const INIT_STATE: usize = OPEN_MASK; |
310 | |
311 | // The maximum number of messages that a channel can track is `usize::max_value() >> 1` |
312 | const MAX_CAPACITY: usize = !(OPEN_MASK); |
313 | |
314 | // The maximum requested buffer size must be less than the maximum capacity of |
315 | // a channel. This is because each sender gets a guaranteed slot. |
316 | const MAX_BUFFER: usize = MAX_CAPACITY >> 1; |
317 | |
318 | // Sent to the consumer to wake up blocked producers |
319 | struct SenderTask { |
320 | task: Option<Waker>, |
321 | is_parked: bool, |
322 | } |
323 | |
324 | impl SenderTask { |
325 | fn new() -> Self { |
326 | Self { task: None, is_parked: false } |
327 | } |
328 | |
329 | fn notify(&mut self) { |
330 | self.is_parked = false; |
331 | |
332 | if let Some(task) = self.task.take() { |
333 | task.wake(); |
334 | } |
335 | } |
336 | } |
337 | |
338 | /// Creates a bounded mpsc channel for communicating between asynchronous tasks. |
339 | /// |
340 | /// Being bounded, this channel provides backpressure to ensure that the sender |
341 | /// outpaces the receiver by only a limited amount. The channel's capacity is |
342 | /// equal to `buffer + num-senders`. In other words, each sender gets a |
343 | /// guaranteed slot in the channel capacity, and on top of that there are |
344 | /// `buffer` "first come, first serve" slots available to all senders. |
345 | /// |
346 | /// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`] |
347 | /// implements `Sink`. |
348 | pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { |
349 | // Check that the requested buffer size does not exceed the maximum buffer |
350 | // size permitted by the system. |
351 | assert!(buffer < MAX_BUFFER, "requested buffer size too large" ); |
352 | |
353 | let inner = Arc::new(BoundedInner { |
354 | buffer, |
355 | state: AtomicUsize::new(INIT_STATE), |
356 | message_queue: Queue::new(), |
357 | parked_queue: Queue::new(), |
358 | num_senders: AtomicUsize::new(1), |
359 | recv_task: AtomicWaker::new(), |
360 | }); |
361 | |
362 | let tx = BoundedSenderInner { |
363 | inner: inner.clone(), |
364 | sender_task: Arc::new(Mutex::new(SenderTask::new())), |
365 | maybe_parked: false, |
366 | }; |
367 | |
368 | let rx = Receiver { inner: Some(inner) }; |
369 | |
370 | (Sender(Some(tx)), rx) |
371 | } |
372 | |
373 | /// Creates an unbounded mpsc channel for communicating between asynchronous |
374 | /// tasks. |
375 | /// |
376 | /// A `send` on this channel will always succeed as long as the receive half has |
377 | /// not been closed. If the receiver falls behind, messages will be arbitrarily |
378 | /// buffered. |
379 | /// |
380 | /// **Note** that the amount of available system memory is an implicit bound to |
381 | /// the channel. Using an `unbounded` channel has the ability of causing the |
382 | /// process to run out of memory. In this case, the process will be aborted. |
383 | pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { |
384 | let inner = Arc::new(UnboundedInner { |
385 | state: AtomicUsize::new(INIT_STATE), |
386 | message_queue: Queue::new(), |
387 | num_senders: AtomicUsize::new(1), |
388 | recv_task: AtomicWaker::new(), |
389 | }); |
390 | |
391 | let tx = UnboundedSenderInner { inner: inner.clone() }; |
392 | |
393 | let rx = UnboundedReceiver { inner: Some(inner) }; |
394 | |
395 | (UnboundedSender(Some(tx)), rx) |
396 | } |
397 | |
398 | /* |
399 | * |
400 | * ===== impl Sender ===== |
401 | * |
402 | */ |
403 | |
404 | impl<T> UnboundedSenderInner<T> { |
405 | fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> { |
406 | let state = decode_state(self.inner.state.load(SeqCst)); |
407 | if state.is_open { |
408 | Poll::Ready(Ok(())) |
409 | } else { |
410 | Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) |
411 | } |
412 | } |
413 | |
414 | // Push message to the queue and signal to the receiver |
415 | fn queue_push_and_signal(&self, msg: T) { |
416 | // Push the message onto the message queue |
417 | self.inner.message_queue.push(msg); |
418 | |
419 | // Signal to the receiver that a message has been enqueued. If the |
420 | // receiver is parked, this will unpark the task. |
421 | self.inner.recv_task.wake(); |
422 | } |
423 | |
424 | // Increment the number of queued messages. Returns the resulting number. |
425 | fn inc_num_messages(&self) -> Option<usize> { |
426 | let mut curr = self.inner.state.load(SeqCst); |
427 | |
428 | loop { |
429 | let mut state = decode_state(curr); |
430 | |
431 | // The receiver end closed the channel. |
432 | if !state.is_open { |
433 | return None; |
434 | } |
435 | |
436 | // This probably is never hit? Odds are the process will run out of |
437 | // memory first. It may be worth to return something else in this |
438 | // case? |
439 | assert!( |
440 | state.num_messages < MAX_CAPACITY, |
441 | "buffer space \ |
442 | exhausted; sending this messages would overflow the state" |
443 | ); |
444 | |
445 | state.num_messages += 1; |
446 | |
447 | let next = encode_state(&state); |
448 | match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { |
449 | Ok(_) => return Some(state.num_messages), |
450 | Err(actual) => curr = actual, |
451 | } |
452 | } |
453 | } |
454 | |
455 | /// Returns whether the senders send to the same receiver. |
456 | fn same_receiver(&self, other: &Self) -> bool { |
457 | Arc::ptr_eq(&self.inner, &other.inner) |
458 | } |
459 | |
460 | /// Returns whether the sender send to this receiver. |
461 | fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool { |
462 | Arc::ptr_eq(&self.inner, inner) |
463 | } |
464 | |
465 | /// Returns pointer to the Arc containing sender |
466 | /// |
467 | /// The returned pointer is not referenced and should be only used for hashing! |
468 | fn ptr(&self) -> *const UnboundedInner<T> { |
469 | &*self.inner |
470 | } |
471 | |
472 | /// Returns whether this channel is closed without needing a context. |
473 | fn is_closed(&self) -> bool { |
474 | !decode_state(self.inner.state.load(SeqCst)).is_open |
475 | } |
476 | |
477 | /// Closes this channel from the sender side, preventing any new messages. |
478 | fn close_channel(&self) { |
479 | // There's no need to park this sender, its dropping, |
480 | // and we don't want to check for capacity, so skip |
481 | // that stuff from `do_send`. |
482 | |
483 | self.inner.set_closed(); |
484 | self.inner.recv_task.wake(); |
485 | } |
486 | } |
487 | |
488 | impl<T> BoundedSenderInner<T> { |
489 | /// Attempts to send a message on this `Sender`, returning the message |
490 | /// if there was an error. |
491 | fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
492 | // If the sender is currently blocked, reject the message |
493 | if !self.poll_unparked(None).is_ready() { |
494 | return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); |
495 | } |
496 | |
497 | // The channel has capacity to accept the message, so send it |
498 | self.do_send_b(msg) |
499 | } |
500 | |
501 | // Do the send without failing. |
502 | // Can be called only by bounded sender. |
503 | fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
504 | // Anyone calling do_send *should* make sure there is room first, |
505 | // but assert here for tests as a sanity check. |
506 | debug_assert!(self.poll_unparked(None).is_ready()); |
507 | |
508 | // First, increment the number of messages contained by the channel. |
509 | // This operation will also atomically determine if the sender task |
510 | // should be parked. |
511 | // |
512 | // `None` is returned in the case that the channel has been closed by the |
513 | // receiver. This happens when `Receiver::close` is called or the |
514 | // receiver is dropped. |
515 | let park_self = match self.inc_num_messages() { |
516 | Some(num_messages) => { |
517 | // Block if the current number of pending messages has exceeded |
518 | // the configured buffer size |
519 | num_messages > self.inner.buffer |
520 | } |
521 | None => { |
522 | return Err(TrySendError { |
523 | err: SendError { kind: SendErrorKind::Disconnected }, |
524 | val: msg, |
525 | }) |
526 | } |
527 | }; |
528 | |
529 | // If the channel has reached capacity, then the sender task needs to |
530 | // be parked. This will send the task handle on the parked task queue. |
531 | // |
532 | // However, when `do_send` is called while dropping the `Sender`, |
533 | // `task::current()` can't be called safely. In this case, in order to |
534 | // maintain internal consistency, a blank message is pushed onto the |
535 | // parked task queue. |
536 | if park_self { |
537 | self.park(); |
538 | } |
539 | |
540 | self.queue_push_and_signal(msg); |
541 | |
542 | Ok(()) |
543 | } |
544 | |
545 | // Push message to the queue and signal to the receiver |
546 | fn queue_push_and_signal(&self, msg: T) { |
547 | // Push the message onto the message queue |
548 | self.inner.message_queue.push(msg); |
549 | |
550 | // Signal to the receiver that a message has been enqueued. If the |
551 | // receiver is parked, this will unpark the task. |
552 | self.inner.recv_task.wake(); |
553 | } |
554 | |
555 | // Increment the number of queued messages. Returns the resulting number. |
556 | fn inc_num_messages(&self) -> Option<usize> { |
557 | let mut curr = self.inner.state.load(SeqCst); |
558 | |
559 | loop { |
560 | let mut state = decode_state(curr); |
561 | |
562 | // The receiver end closed the channel. |
563 | if !state.is_open { |
564 | return None; |
565 | } |
566 | |
567 | // This probably is never hit? Odds are the process will run out of |
568 | // memory first. It may be worth to return something else in this |
569 | // case? |
570 | assert!( |
571 | state.num_messages < MAX_CAPACITY, |
572 | "buffer space \ |
573 | exhausted; sending this messages would overflow the state" |
574 | ); |
575 | |
576 | state.num_messages += 1; |
577 | |
578 | let next = encode_state(&state); |
579 | match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { |
580 | Ok(_) => return Some(state.num_messages), |
581 | Err(actual) => curr = actual, |
582 | } |
583 | } |
584 | } |
585 | |
586 | fn park(&mut self) { |
587 | { |
588 | let mut sender = self.sender_task.lock().unwrap(); |
589 | sender.task = None; |
590 | sender.is_parked = true; |
591 | } |
592 | |
593 | // Send handle over queue |
594 | let t = self.sender_task.clone(); |
595 | self.inner.parked_queue.push(t); |
596 | |
597 | // Check to make sure we weren't closed after we sent our task on the |
598 | // queue |
599 | let state = decode_state(self.inner.state.load(SeqCst)); |
600 | self.maybe_parked = state.is_open; |
601 | } |
602 | |
603 | /// Polls the channel to determine if there is guaranteed capacity to send |
604 | /// at least one item without waiting. |
605 | /// |
606 | /// # Return value |
607 | /// |
608 | /// This method returns: |
609 | /// |
610 | /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; |
611 | /// - `Poll::Pending` if the channel may not have |
612 | /// capacity, in which case the current task is queued to be notified once |
613 | /// capacity is available; |
614 | /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. |
615 | fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
616 | let state = decode_state(self.inner.state.load(SeqCst)); |
617 | if !state.is_open { |
618 | return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); |
619 | } |
620 | |
621 | self.poll_unparked(Some(cx)).map(Ok) |
622 | } |
623 | |
624 | /// Returns whether the senders send to the same receiver. |
625 | fn same_receiver(&self, other: &Self) -> bool { |
626 | Arc::ptr_eq(&self.inner, &other.inner) |
627 | } |
628 | |
629 | /// Returns whether the sender send to this receiver. |
630 | fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool { |
631 | Arc::ptr_eq(&self.inner, receiver) |
632 | } |
633 | |
634 | /// Returns pointer to the Arc containing sender |
635 | /// |
636 | /// The returned pointer is not referenced and should be only used for hashing! |
637 | fn ptr(&self) -> *const BoundedInner<T> { |
638 | &*self.inner |
639 | } |
640 | |
641 | /// Returns whether this channel is closed without needing a context. |
642 | fn is_closed(&self) -> bool { |
643 | !decode_state(self.inner.state.load(SeqCst)).is_open |
644 | } |
645 | |
646 | /// Closes this channel from the sender side, preventing any new messages. |
647 | fn close_channel(&self) { |
648 | // There's no need to park this sender, its dropping, |
649 | // and we don't want to check for capacity, so skip |
650 | // that stuff from `do_send`. |
651 | |
652 | self.inner.set_closed(); |
653 | self.inner.recv_task.wake(); |
654 | } |
655 | |
656 | fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> { |
657 | // First check the `maybe_parked` variable. This avoids acquiring the |
658 | // lock in most cases |
659 | if self.maybe_parked { |
660 | // Get a lock on the task handle |
661 | let mut task = self.sender_task.lock().unwrap(); |
662 | |
663 | if !task.is_parked { |
664 | self.maybe_parked = false; |
665 | return Poll::Ready(()); |
666 | } |
667 | |
668 | // At this point, an unpark request is pending, so there will be an |
669 | // unpark sometime in the future. We just need to make sure that |
670 | // the correct task will be notified. |
671 | // |
672 | // Update the task in case the `Sender` has been moved to another |
673 | // task |
674 | task.task = cx.map(|cx| cx.waker().clone()); |
675 | |
676 | Poll::Pending |
677 | } else { |
678 | Poll::Ready(()) |
679 | } |
680 | } |
681 | } |
682 | |
683 | impl<T> Sender<T> { |
684 | /// Attempts to send a message on this `Sender`, returning the message |
685 | /// if there was an error. |
686 | pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
687 | if let Some(inner) = &mut self.0 { |
688 | inner.try_send(msg) |
689 | } else { |
690 | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) |
691 | } |
692 | } |
693 | |
694 | /// Send a message on the channel. |
695 | /// |
696 | /// This function should only be called after |
697 | /// [`poll_ready`](Sender::poll_ready) has reported that the channel is |
698 | /// ready to receive a message. |
699 | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { |
700 | self.try_send(msg).map_err(|e| e.err) |
701 | } |
702 | |
703 | /// Polls the channel to determine if there is guaranteed capacity to send |
704 | /// at least one item without waiting. |
705 | /// |
706 | /// # Return value |
707 | /// |
708 | /// This method returns: |
709 | /// |
710 | /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; |
711 | /// - `Poll::Pending` if the channel may not have |
712 | /// capacity, in which case the current task is queued to be notified once |
713 | /// capacity is available; |
714 | /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. |
715 | pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
716 | let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; |
717 | inner.poll_ready(cx) |
718 | } |
719 | |
720 | /// Returns whether this channel is closed without needing a context. |
721 | pub fn is_closed(&self) -> bool { |
722 | self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true) |
723 | } |
724 | |
725 | /// Closes this channel from the sender side, preventing any new messages. |
726 | pub fn close_channel(&mut self) { |
727 | if let Some(inner) = &mut self.0 { |
728 | inner.close_channel(); |
729 | } |
730 | } |
731 | |
732 | /// Disconnects this sender from the channel, closing it if there are no more senders left. |
733 | pub fn disconnect(&mut self) { |
734 | self.0 = None; |
735 | } |
736 | |
737 | /// Returns whether the senders send to the same receiver. |
738 | pub fn same_receiver(&self, other: &Self) -> bool { |
739 | match (&self.0, &other.0) { |
740 | (Some(inner), Some(other)) => inner.same_receiver(other), |
741 | _ => false, |
742 | } |
743 | } |
744 | |
745 | /// Returns whether the sender send to this receiver. |
746 | pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { |
747 | match (&self.0, &receiver.inner) { |
748 | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), |
749 | _ => false, |
750 | } |
751 | } |
752 | |
753 | /// Hashes the receiver into the provided hasher |
754 | pub fn hash_receiver<H>(&self, hasher: &mut H) |
755 | where |
756 | H: std::hash::Hasher, |
757 | { |
758 | use std::hash::Hash; |
759 | |
760 | let ptr = self.0.as_ref().map(|inner| inner.ptr()); |
761 | ptr.hash(hasher); |
762 | } |
763 | } |
764 | |
765 | impl<T> UnboundedSender<T> { |
766 | /// Check if the channel is ready to receive a message. |
767 | pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
768 | let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; |
769 | inner.poll_ready_nb() |
770 | } |
771 | |
772 | /// Returns whether this channel is closed without needing a context. |
773 | pub fn is_closed(&self) -> bool { |
774 | self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true) |
775 | } |
776 | |
777 | /// Closes this channel from the sender side, preventing any new messages. |
778 | pub fn close_channel(&self) { |
779 | if let Some(inner) = &self.0 { |
780 | inner.close_channel(); |
781 | } |
782 | } |
783 | |
784 | /// Disconnects this sender from the channel, closing it if there are no more senders left. |
785 | pub fn disconnect(&mut self) { |
786 | self.0 = None; |
787 | } |
788 | |
789 | // Do the send without parking current task. |
790 | fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> { |
791 | if let Some(inner) = &self.0 { |
792 | if inner.inc_num_messages().is_some() { |
793 | inner.queue_push_and_signal(msg); |
794 | return Ok(()); |
795 | } |
796 | } |
797 | |
798 | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) |
799 | } |
800 | |
801 | /// Send a message on the channel. |
802 | /// |
803 | /// This method should only be called after `poll_ready` has been used to |
804 | /// verify that the channel is ready to receive a message. |
805 | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { |
806 | self.do_send_nb(msg).map_err(|e| e.err) |
807 | } |
808 | |
809 | /// Sends a message along this channel. |
810 | /// |
811 | /// This is an unbounded sender, so this function differs from `Sink::send` |
812 | /// by ensuring the return type reflects that the channel is always ready to |
813 | /// receive messages. |
814 | pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
815 | self.do_send_nb(msg) |
816 | } |
817 | |
818 | /// Returns whether the senders send to the same receiver. |
819 | pub fn same_receiver(&self, other: &Self) -> bool { |
820 | match (&self.0, &other.0) { |
821 | (Some(inner), Some(other)) => inner.same_receiver(other), |
822 | _ => false, |
823 | } |
824 | } |
825 | |
826 | /// Returns whether the sender send to this receiver. |
827 | pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool { |
828 | match (&self.0, &receiver.inner) { |
829 | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), |
830 | _ => false, |
831 | } |
832 | } |
833 | |
834 | /// Hashes the receiver into the provided hasher |
835 | pub fn hash_receiver<H>(&self, hasher: &mut H) |
836 | where |
837 | H: std::hash::Hasher, |
838 | { |
839 | use std::hash::Hash; |
840 | |
841 | let ptr = self.0.as_ref().map(|inner| inner.ptr()); |
842 | ptr.hash(hasher); |
843 | } |
844 | |
845 | /// Return the number of messages in the queue or 0 if channel is disconnected. |
846 | pub fn len(&self) -> usize { |
847 | if let Some(sender) = &self.0 { |
848 | decode_state(sender.inner.state.load(SeqCst)).num_messages |
849 | } else { |
850 | 0 |
851 | } |
852 | } |
853 | |
854 | /// Return false is channel has no queued messages, true otherwise. |
855 | pub fn is_empty(&self) -> bool { |
856 | self.len() == 0 |
857 | } |
858 | } |
859 | |
860 | impl<T> Clone for Sender<T> { |
861 | fn clone(&self) -> Self { |
862 | Self(self.0.clone()) |
863 | } |
864 | } |
865 | |
866 | impl<T> Clone for UnboundedSender<T> { |
867 | fn clone(&self) -> Self { |
868 | Self(self.0.clone()) |
869 | } |
870 | } |
871 | |
872 | impl<T> Clone for UnboundedSenderInner<T> { |
873 | fn clone(&self) -> Self { |
874 | // Since this atomic op isn't actually guarding any memory and we don't |
875 | // care about any orderings besides the ordering on the single atomic |
876 | // variable, a relaxed ordering is acceptable. |
877 | let mut curr = self.inner.num_senders.load(SeqCst); |
878 | |
879 | loop { |
880 | // If the maximum number of senders has been reached, then fail |
881 | if curr == MAX_BUFFER { |
882 | panic!("cannot clone `Sender` -- too many outstanding senders" ); |
883 | } |
884 | |
885 | debug_assert!(curr < MAX_BUFFER); |
886 | |
887 | let next = curr + 1; |
888 | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { |
889 | Ok(_) => { |
890 | // The ABA problem doesn't matter here. We only care that the |
891 | // number of senders never exceeds the maximum. |
892 | return Self { inner: self.inner.clone() }; |
893 | } |
894 | Err(actual) => curr = actual, |
895 | } |
896 | } |
897 | } |
898 | } |
899 | |
900 | impl<T> Clone for BoundedSenderInner<T> { |
901 | fn clone(&self) -> Self { |
902 | // Since this atomic op isn't actually guarding any memory and we don't |
903 | // care about any orderings besides the ordering on the single atomic |
904 | // variable, a relaxed ordering is acceptable. |
905 | let mut curr = self.inner.num_senders.load(SeqCst); |
906 | |
907 | loop { |
908 | // If the maximum number of senders has been reached, then fail |
909 | if curr == self.inner.max_senders() { |
910 | panic!("cannot clone `Sender` -- too many outstanding senders" ); |
911 | } |
912 | |
913 | debug_assert!(curr < self.inner.max_senders()); |
914 | |
915 | let next = curr + 1; |
916 | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { |
917 | Ok(_) => { |
918 | // The ABA problem doesn't matter here. We only care that the |
919 | // number of senders never exceeds the maximum. |
920 | return Self { |
921 | inner: self.inner.clone(), |
922 | sender_task: Arc::new(Mutex::new(SenderTask::new())), |
923 | maybe_parked: false, |
924 | }; |
925 | } |
926 | Err(actual) => curr = actual, |
927 | } |
928 | } |
929 | } |
930 | } |
931 | |
932 | impl<T> Drop for UnboundedSenderInner<T> { |
933 | fn drop(&mut self) { |
934 | // Ordering between variables don't matter here |
935 | let prev = self.inner.num_senders.fetch_sub(1, SeqCst); |
936 | |
937 | if prev == 1 { |
938 | self.close_channel(); |
939 | } |
940 | } |
941 | } |
942 | |
943 | impl<T> Drop for BoundedSenderInner<T> { |
944 | fn drop(&mut self) { |
945 | // Ordering between variables don't matter here |
946 | let prev = self.inner.num_senders.fetch_sub(1, SeqCst); |
947 | |
948 | if prev == 1 { |
949 | self.close_channel(); |
950 | } |
951 | } |
952 | } |
953 | |
954 | impl<T> fmt::Debug for Sender<T> { |
955 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
956 | f.debug_struct("Sender" ).field("closed" , &self.is_closed()).finish() |
957 | } |
958 | } |
959 | |
960 | impl<T> fmt::Debug for UnboundedSender<T> { |
961 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
962 | f.debug_struct("UnboundedSender" ).field("closed" , &self.is_closed()).finish() |
963 | } |
964 | } |
965 | |
966 | /* |
967 | * |
968 | * ===== impl Receiver ===== |
969 | * |
970 | */ |
971 | |
972 | impl<T> Receiver<T> { |
973 | /// Closes the receiving half of a channel, without dropping it. |
974 | /// |
975 | /// This prevents any further messages from being sent on the channel while |
976 | /// still enabling the receiver to drain messages that are buffered. |
977 | pub fn close(&mut self) { |
978 | if let Some(inner) = &mut self.inner { |
979 | inner.set_closed(); |
980 | |
981 | // Wake up any threads waiting as they'll see that we've closed the |
982 | // channel and will continue on their merry way. |
983 | while let Some(task) = unsafe { inner.parked_queue.pop_spin() } { |
984 | task.lock().unwrap().notify(); |
985 | } |
986 | } |
987 | } |
988 | |
989 | /// Tries to receive the next message without notifying a context if empty. |
990 | /// |
991 | /// It is not recommended to call this function from inside of a future, |
992 | /// only when you've otherwise arranged to be notified when the channel is |
993 | /// no longer empty. |
994 | /// |
995 | /// This function returns: |
996 | /// * `Ok(Some(t))` when message is fetched |
997 | /// * `Ok(None)` when channel is closed and no messages left in the queue |
998 | /// * `Err(e)` when there are no messages available, but channel is not yet closed |
999 | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { |
1000 | match self.next_message() { |
1001 | Poll::Ready(msg) => Ok(msg), |
1002 | Poll::Pending => Err(TryRecvError { _priv: () }), |
1003 | } |
1004 | } |
1005 | |
1006 | fn next_message(&mut self) -> Poll<Option<T>> { |
1007 | let inner = match self.inner.as_mut() { |
1008 | None => return Poll::Ready(None), |
1009 | Some(inner) => inner, |
1010 | }; |
1011 | // Pop off a message |
1012 | match unsafe { inner.message_queue.pop_spin() } { |
1013 | Some(msg) => { |
1014 | // If there are any parked task handles in the parked queue, |
1015 | // pop one and unpark it. |
1016 | self.unpark_one(); |
1017 | |
1018 | // Decrement number of messages |
1019 | self.dec_num_messages(); |
1020 | |
1021 | Poll::Ready(Some(msg)) |
1022 | } |
1023 | None => { |
1024 | let state = decode_state(inner.state.load(SeqCst)); |
1025 | if state.is_closed() { |
1026 | // If closed flag is set AND there are no pending messages |
1027 | // it means end of stream |
1028 | self.inner = None; |
1029 | Poll::Ready(None) |
1030 | } else { |
1031 | // If queue is open, we need to return Pending |
1032 | // to be woken up when new messages arrive. |
1033 | // If queue is closed but num_messages is non-zero, |
1034 | // it means that senders updated the state, |
1035 | // but didn't put message to queue yet, |
1036 | // so we need to park until sender unparks the task |
1037 | // after queueing the message. |
1038 | Poll::Pending |
1039 | } |
1040 | } |
1041 | } |
1042 | } |
1043 | |
1044 | // Unpark a single task handle if there is one pending in the parked queue |
1045 | fn unpark_one(&mut self) { |
1046 | if let Some(inner) = &mut self.inner { |
1047 | if let Some(task) = unsafe { inner.parked_queue.pop_spin() } { |
1048 | task.lock().unwrap().notify(); |
1049 | } |
1050 | } |
1051 | } |
1052 | |
1053 | fn dec_num_messages(&self) { |
1054 | if let Some(inner) = &self.inner { |
1055 | // OPEN_MASK is highest bit, so it's unaffected by subtraction |
1056 | // unless there's underflow, and we know there's no underflow |
1057 | // because number of messages at this point is always > 0. |
1058 | inner.state.fetch_sub(1, SeqCst); |
1059 | } |
1060 | } |
1061 | } |
1062 | |
1063 | // The receiver does not ever take a Pin to the inner T |
1064 | impl<T> Unpin for Receiver<T> {} |
1065 | |
1066 | impl<T> FusedStream for Receiver<T> { |
1067 | fn is_terminated(&self) -> bool { |
1068 | self.inner.is_none() |
1069 | } |
1070 | } |
1071 | |
1072 | impl<T> Stream for Receiver<T> { |
1073 | type Item = T; |
1074 | |
1075 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
1076 | // Try to read a message off of the message queue. |
1077 | match self.next_message() { |
1078 | Poll::Ready(msg) => { |
1079 | if msg.is_none() { |
1080 | self.inner = None; |
1081 | } |
1082 | Poll::Ready(msg) |
1083 | } |
1084 | Poll::Pending => { |
1085 | // There are no messages to read, in this case, park. |
1086 | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); |
1087 | // Check queue again after parking to prevent race condition: |
1088 | // a message could be added to the queue after previous `next_message` |
1089 | // before `register` call. |
1090 | self.next_message() |
1091 | } |
1092 | } |
1093 | } |
1094 | |
1095 | fn size_hint(&self) -> (usize, Option<usize>) { |
1096 | if let Some(inner) = &self.inner { |
1097 | decode_state(inner.state.load(SeqCst)).size_hint() |
1098 | } else { |
1099 | (0, Some(0)) |
1100 | } |
1101 | } |
1102 | } |
1103 | |
1104 | impl<T> Drop for Receiver<T> { |
1105 | fn drop(&mut self) { |
1106 | // Drain the channel of all pending messages |
1107 | self.close(); |
1108 | if self.inner.is_some() { |
1109 | loop { |
1110 | match self.next_message() { |
1111 | Poll::Ready(Some(_)) => {} |
1112 | Poll::Ready(None) => break, |
1113 | Poll::Pending => { |
1114 | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); |
1115 | |
1116 | // If the channel is closed, then there is no need to park. |
1117 | if state.is_closed() { |
1118 | break; |
1119 | } |
1120 | |
1121 | // TODO: Spinning isn't ideal, it might be worth |
1122 | // investigating using a condvar or some other strategy |
1123 | // here. That said, if this case is hit, then another thread |
1124 | // is about to push the value into the queue and this isn't |
1125 | // the only spinlock in the impl right now. |
1126 | thread::yield_now(); |
1127 | } |
1128 | } |
1129 | } |
1130 | } |
1131 | } |
1132 | } |
1133 | |
1134 | impl<T> fmt::Debug for Receiver<T> { |
1135 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1136 | let closed = if let Some(ref inner) = self.inner { |
1137 | decode_state(inner.state.load(SeqCst)).is_closed() |
1138 | } else { |
1139 | false |
1140 | }; |
1141 | |
1142 | f.debug_struct("Receiver" ).field("closed" , &closed).finish() |
1143 | } |
1144 | } |
1145 | |
1146 | impl<T> UnboundedReceiver<T> { |
1147 | /// Closes the receiving half of a channel, without dropping it. |
1148 | /// |
1149 | /// This prevents any further messages from being sent on the channel while |
1150 | /// still enabling the receiver to drain messages that are buffered. |
1151 | pub fn close(&mut self) { |
1152 | if let Some(inner) = &mut self.inner { |
1153 | inner.set_closed(); |
1154 | } |
1155 | } |
1156 | |
1157 | /// Tries to receive the next message without notifying a context if empty. |
1158 | /// |
1159 | /// It is not recommended to call this function from inside of a future, |
1160 | /// only when you've otherwise arranged to be notified when the channel is |
1161 | /// no longer empty. |
1162 | /// |
1163 | /// This function returns: |
1164 | /// * `Ok(Some(t))` when message is fetched |
1165 | /// * `Ok(None)` when channel is closed and no messages left in the queue |
1166 | /// * `Err(e)` when there are no messages available, but channel is not yet closed |
1167 | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { |
1168 | match self.next_message() { |
1169 | Poll::Ready(msg) => Ok(msg), |
1170 | Poll::Pending => Err(TryRecvError { _priv: () }), |
1171 | } |
1172 | } |
1173 | |
1174 | fn next_message(&mut self) -> Poll<Option<T>> { |
1175 | let inner = match self.inner.as_mut() { |
1176 | None => return Poll::Ready(None), |
1177 | Some(inner) => inner, |
1178 | }; |
1179 | // Pop off a message |
1180 | match unsafe { inner.message_queue.pop_spin() } { |
1181 | Some(msg) => { |
1182 | // Decrement number of messages |
1183 | self.dec_num_messages(); |
1184 | |
1185 | Poll::Ready(Some(msg)) |
1186 | } |
1187 | None => { |
1188 | let state = decode_state(inner.state.load(SeqCst)); |
1189 | if state.is_closed() { |
1190 | // If closed flag is set AND there are no pending messages |
1191 | // it means end of stream |
1192 | self.inner = None; |
1193 | Poll::Ready(None) |
1194 | } else { |
1195 | // If queue is open, we need to return Pending |
1196 | // to be woken up when new messages arrive. |
1197 | // If queue is closed but num_messages is non-zero, |
1198 | // it means that senders updated the state, |
1199 | // but didn't put message to queue yet, |
1200 | // so we need to park until sender unparks the task |
1201 | // after queueing the message. |
1202 | Poll::Pending |
1203 | } |
1204 | } |
1205 | } |
1206 | } |
1207 | |
1208 | fn dec_num_messages(&self) { |
1209 | if let Some(inner) = &self.inner { |
1210 | // OPEN_MASK is highest bit, so it's unaffected by subtraction |
1211 | // unless there's underflow, and we know there's no underflow |
1212 | // because number of messages at this point is always > 0. |
1213 | inner.state.fetch_sub(1, SeqCst); |
1214 | } |
1215 | } |
1216 | } |
1217 | |
1218 | impl<T> FusedStream for UnboundedReceiver<T> { |
1219 | fn is_terminated(&self) -> bool { |
1220 | self.inner.is_none() |
1221 | } |
1222 | } |
1223 | |
1224 | impl<T> Stream for UnboundedReceiver<T> { |
1225 | type Item = T; |
1226 | |
1227 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
1228 | // Try to read a message off of the message queue. |
1229 | match self.next_message() { |
1230 | Poll::Ready(msg) => { |
1231 | if msg.is_none() { |
1232 | self.inner = None; |
1233 | } |
1234 | Poll::Ready(msg) |
1235 | } |
1236 | Poll::Pending => { |
1237 | // There are no messages to read, in this case, park. |
1238 | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); |
1239 | // Check queue again after parking to prevent race condition: |
1240 | // a message could be added to the queue after previous `next_message` |
1241 | // before `register` call. |
1242 | self.next_message() |
1243 | } |
1244 | } |
1245 | } |
1246 | |
1247 | fn size_hint(&self) -> (usize, Option<usize>) { |
1248 | if let Some(inner) = &self.inner { |
1249 | decode_state(inner.state.load(SeqCst)).size_hint() |
1250 | } else { |
1251 | (0, Some(0)) |
1252 | } |
1253 | } |
1254 | } |
1255 | |
1256 | impl<T> Drop for UnboundedReceiver<T> { |
1257 | fn drop(&mut self) { |
1258 | // Drain the channel of all pending messages |
1259 | self.close(); |
1260 | if self.inner.is_some() { |
1261 | loop { |
1262 | match self.next_message() { |
1263 | Poll::Ready(Some(_)) => {} |
1264 | Poll::Ready(None) => break, |
1265 | Poll::Pending => { |
1266 | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); |
1267 | |
1268 | // If the channel is closed, then there is no need to park. |
1269 | if state.is_closed() { |
1270 | break; |
1271 | } |
1272 | |
1273 | // TODO: Spinning isn't ideal, it might be worth |
1274 | // investigating using a condvar or some other strategy |
1275 | // here. That said, if this case is hit, then another thread |
1276 | // is about to push the value into the queue and this isn't |
1277 | // the only spinlock in the impl right now. |
1278 | thread::yield_now(); |
1279 | } |
1280 | } |
1281 | } |
1282 | } |
1283 | } |
1284 | } |
1285 | |
1286 | impl<T> fmt::Debug for UnboundedReceiver<T> { |
1287 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1288 | let closed = if let Some(ref inner) = self.inner { |
1289 | decode_state(inner.state.load(SeqCst)).is_closed() |
1290 | } else { |
1291 | false |
1292 | }; |
1293 | |
1294 | f.debug_struct("Receiver" ).field("closed" , &closed).finish() |
1295 | } |
1296 | } |
1297 | |
1298 | /* |
1299 | * |
1300 | * ===== impl Inner ===== |
1301 | * |
1302 | */ |
1303 | |
1304 | impl<T> UnboundedInner<T> { |
1305 | // Clear `open` flag in the state, keep `num_messages` intact. |
1306 | fn set_closed(&self) { |
1307 | let curr = self.state.load(SeqCst); |
1308 | if !decode_state(curr).is_open { |
1309 | return; |
1310 | } |
1311 | |
1312 | self.state.fetch_and(!OPEN_MASK, SeqCst); |
1313 | } |
1314 | } |
1315 | |
1316 | impl<T> BoundedInner<T> { |
1317 | // The return value is such that the total number of messages that can be |
1318 | // enqueued into the channel will never exceed MAX_CAPACITY |
1319 | fn max_senders(&self) -> usize { |
1320 | MAX_CAPACITY - self.buffer |
1321 | } |
1322 | |
1323 | // Clear `open` flag in the state, keep `num_messages` intact. |
1324 | fn set_closed(&self) { |
1325 | let curr = self.state.load(SeqCst); |
1326 | if !decode_state(curr).is_open { |
1327 | return; |
1328 | } |
1329 | |
1330 | self.state.fetch_and(!OPEN_MASK, SeqCst); |
1331 | } |
1332 | } |
1333 | |
1334 | unsafe impl<T: Send> Send for UnboundedInner<T> {} |
1335 | unsafe impl<T: Send> Sync for UnboundedInner<T> {} |
1336 | |
1337 | unsafe impl<T: Send> Send for BoundedInner<T> {} |
1338 | unsafe impl<T: Send> Sync for BoundedInner<T> {} |
1339 | |
1340 | impl State { |
1341 | fn is_closed(&self) -> bool { |
1342 | !self.is_open && self.num_messages == 0 |
1343 | } |
1344 | |
1345 | fn size_hint(&self) -> (usize, Option<usize>) { |
1346 | if self.is_open { |
1347 | (self.num_messages, None) |
1348 | } else { |
1349 | (self.num_messages, Some(self.num_messages)) |
1350 | } |
1351 | } |
1352 | } |
1353 | |
1354 | /* |
1355 | * |
1356 | * ===== Helpers ===== |
1357 | * |
1358 | */ |
1359 | |
1360 | fn decode_state(num: usize) -> State { |
1361 | State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } |
1362 | } |
1363 | |
1364 | fn encode_state(state: &State) -> usize { |
1365 | let mut num = state.num_messages; |
1366 | |
1367 | if state.is_open { |
1368 | num |= OPEN_MASK; |
1369 | } |
1370 | |
1371 | num |
1372 | } |
1373 | |