1 | //! A multi-producer, single-consumer queue for sending values across |
---|---|
2 | //! asynchronous tasks. |
3 | //! |
4 | //! Similarly to the `std`, channel creation provides [`Receiver`] and |
5 | //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to |
6 | //! read values out of the channel. If there is no message to read from the |
7 | //! channel, the current task will be notified when a new value is sent. |
8 | //! [`Sender`] implements the `Sink` trait and allows a task to send messages into |
9 | //! the channel. If the channel is at capacity, the send will be rejected and |
10 | //! the task will be notified when additional capacity is available. In other |
11 | //! words, the channel provides backpressure. |
12 | //! |
13 | //! Unbounded channels are also available using the `unbounded` constructor. |
14 | //! |
15 | //! # Disconnection |
16 | //! |
17 | //! When all [`Sender`] handles have been dropped, it is no longer |
18 | //! possible to send values into the channel. This is considered the termination |
19 | //! event of the stream. As such, [`Receiver::poll_next`] |
20 | //! will return `Ok(Ready(None))`. |
21 | //! |
22 | //! If the [`Receiver`] handle is dropped, then messages can no longer |
23 | //! be read out of the channel. In this case, all further attempts to send will |
24 | //! result in an error. |
25 | //! |
26 | //! # Clean Shutdown |
27 | //! |
28 | //! If the [`Receiver`] is simply dropped, then it is possible for |
29 | //! there to be messages still in the channel that will not be processed. As |
30 | //! such, it is usually desirable to perform a "clean" shutdown. To do this, the |
31 | //! receiver will first call `close`, which will prevent any further messages to |
32 | //! be sent into the channel. Then, the receiver consumes the channel to |
33 | //! completion, at which point the receiver can be dropped. |
34 | //! |
35 | //! [`Sender`]: struct.Sender.html |
36 | //! [`Receiver`]: struct.Receiver.html |
37 | //! [`Stream`]: ../../futures_core/stream/trait.Stream.html |
38 | //! [`Receiver::poll_next`]: |
39 | //! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next |
40 | |
41 | // At the core, the channel uses an atomic FIFO queue for message passing. This |
42 | // queue is used as the primary coordination primitive. In order to enforce |
43 | // capacity limits and handle back pressure, a secondary FIFO queue is used to |
44 | // send parked task handles. |
45 | // |
46 | // The general idea is that the channel is created with a `buffer` size of `n`. |
47 | // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" |
48 | // slot to hold a message. This allows `Sender` to know for a fact that a send |
49 | // will succeed *before* starting to do the actual work of sending the value. |
50 | // Since most of this work is lock-free, once the work starts, it is impossible |
51 | // to safely revert. |
52 | // |
53 | // If the sender is unable to process a send operation, then the current |
54 | // task is parked and the handle is sent on the parked task queue. |
55 | // |
56 | // Note that the implementation guarantees that the channel capacity will never |
57 | // exceed the configured limit, however there is no *strict* guarantee that the |
58 | // receiver will wake up a parked task *immediately* when a slot becomes |
59 | // available. However, it will almost always unpark a task when a slot becomes |
60 | // available and it is *guaranteed* that a sender will be unparked when the |
61 | // message that caused the sender to become parked is read out of the channel. |
62 | // |
63 | // The steps for sending a message are roughly: |
64 | // |
65 | // 1) Increment the channel message count |
66 | // 2) If the channel is at capacity, push the task handle onto the wait queue |
67 | // 3) Push the message onto the message queue. |
68 | // |
69 | // The steps for receiving a message are roughly: |
70 | // |
71 | // 1) Pop a message from the message queue |
72 | // 2) Pop a task handle from the wait queue |
73 | // 3) Decrement the channel message count. |
74 | // |
75 | // It's important for the order of operations on lock-free structures to happen |
76 | // in reverse order between the sender and receiver. This makes the message |
77 | // queue the primary coordination structure and establishes the necessary |
78 | // happens-before semantics required for the acquire / release semantics used |
79 | // by the queue structure. |
80 | |
81 | use futures_core::stream::{FusedStream, Stream}; |
82 | use futures_core::task::__internal::AtomicWaker; |
83 | use futures_core::task::{Context, Poll, Waker}; |
84 | use std::fmt; |
85 | use std::pin::Pin; |
86 | use std::sync::atomic::AtomicUsize; |
87 | use std::sync::atomic::Ordering::SeqCst; |
88 | use std::sync::{Arc, Mutex}; |
89 | use std::thread; |
90 | |
91 | use crate::mpsc::queue::Queue; |
92 | |
93 | mod queue; |
94 | #[cfg(feature = "sink")] |
95 | mod sink_impl; |
96 | |
97 | struct UnboundedSenderInner<T> { |
98 | // Channel state shared between the sender and receiver. |
99 | inner: Arc<UnboundedInner<T>>, |
100 | } |
101 | |
102 | struct BoundedSenderInner<T> { |
103 | // Channel state shared between the sender and receiver. |
104 | inner: Arc<BoundedInner<T>>, |
105 | |
106 | // Handle to the task that is blocked on this sender. This handle is sent |
107 | // to the receiver half in order to be notified when the sender becomes |
108 | // unblocked. |
109 | sender_task: Arc<Mutex<SenderTask>>, |
110 | |
111 | // `true` if the sender might be blocked. This is an optimization to avoid |
112 | // having to lock the mutex most of the time. |
113 | maybe_parked: bool, |
114 | } |
115 | |
116 | // We never project Pin<&mut SenderInner> to `Pin<&mut T>` |
117 | impl<T> Unpin for UnboundedSenderInner<T> {} |
118 | impl<T> Unpin for BoundedSenderInner<T> {} |
119 | |
120 | /// The transmission end of a bounded mpsc channel. |
121 | /// |
122 | /// This value is created by the [`channel`] function. |
123 | pub struct Sender<T>(Option<BoundedSenderInner<T>>); |
124 | |
125 | /// The transmission end of an unbounded mpsc channel. |
126 | /// |
127 | /// This value is created by the [`unbounded`] function. |
128 | pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>); |
129 | |
130 | #[allow(dead_code)] |
131 | trait AssertKinds: Send + Sync + Clone {} |
132 | impl AssertKinds for UnboundedSender<u32> {} |
133 | |
134 | /// The receiving end of a bounded mpsc channel. |
135 | /// |
136 | /// This value is created by the [`channel`] function. |
137 | pub struct Receiver<T> { |
138 | inner: Option<Arc<BoundedInner<T>>>, |
139 | } |
140 | |
141 | /// The receiving end of an unbounded mpsc channel. |
142 | /// |
143 | /// This value is created by the [`unbounded`] function. |
144 | pub struct UnboundedReceiver<T> { |
145 | inner: Option<Arc<UnboundedInner<T>>>, |
146 | } |
147 | |
148 | // `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>` |
149 | impl<T> Unpin for UnboundedReceiver<T> {} |
150 | |
151 | /// The error type for [`Sender`s](Sender) used as `Sink`s. |
152 | #[derive(Clone, Debug, PartialEq, Eq)] |
153 | pub struct SendError { |
154 | kind: SendErrorKind, |
155 | } |
156 | |
157 | /// The error type returned from [`try_send`](Sender::try_send). |
158 | #[derive(Clone, PartialEq, Eq)] |
159 | pub struct TrySendError<T> { |
160 | err: SendError, |
161 | val: T, |
162 | } |
163 | |
164 | #[derive(Clone, Debug, PartialEq, Eq)] |
165 | enum SendErrorKind { |
166 | Full, |
167 | Disconnected, |
168 | } |
169 | |
170 | /// The error type returned from [`try_next`](Receiver::try_next). |
171 | pub struct TryRecvError { |
172 | _priv: (), |
173 | } |
174 | |
175 | impl fmt::Display for SendError { |
176 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
177 | if self.is_full() { |
178 | write!(f, "send failed because channel is full") |
179 | } else { |
180 | write!(f, "send failed because receiver is gone") |
181 | } |
182 | } |
183 | } |
184 | |
185 | impl std::error::Error for SendError {} |
186 | |
187 | impl SendError { |
188 | /// Returns `true` if this error is a result of the channel being full. |
189 | pub fn is_full(&self) -> bool { |
190 | match self.kind { |
191 | SendErrorKind::Full => true, |
192 | _ => false, |
193 | } |
194 | } |
195 | |
196 | /// Returns `true` if this error is a result of the receiver being dropped. |
197 | pub fn is_disconnected(&self) -> bool { |
198 | match self.kind { |
199 | SendErrorKind::Disconnected => true, |
200 | _ => false, |
201 | } |
202 | } |
203 | } |
204 | |
205 | impl<T> fmt::Debug for TrySendError<T> { |
206 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
207 | f.debug_struct("TrySendError").field(name: "kind", &self.err.kind).finish() |
208 | } |
209 | } |
210 | |
211 | impl<T> fmt::Display for TrySendError<T> { |
212 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
213 | if self.is_full() { |
214 | write!(f, "send failed because channel is full") |
215 | } else { |
216 | write!(f, "send failed because receiver is gone") |
217 | } |
218 | } |
219 | } |
220 | |
221 | impl<T: core::any::Any> std::error::Error for TrySendError<T> {} |
222 | |
223 | impl<T> TrySendError<T> { |
224 | /// Returns `true` if this error is a result of the channel being full. |
225 | pub fn is_full(&self) -> bool { |
226 | self.err.is_full() |
227 | } |
228 | |
229 | /// Returns `true` if this error is a result of the receiver being dropped. |
230 | pub fn is_disconnected(&self) -> bool { |
231 | self.err.is_disconnected() |
232 | } |
233 | |
234 | /// Returns the message that was attempted to be sent but failed. |
235 | pub fn into_inner(self) -> T { |
236 | self.val |
237 | } |
238 | |
239 | /// Drops the message and converts into a `SendError`. |
240 | pub fn into_send_error(self) -> SendError { |
241 | self.err |
242 | } |
243 | } |
244 | |
245 | impl fmt::Debug for TryRecvError { |
246 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
247 | f.debug_tuple(name:"TryRecvError").finish() |
248 | } |
249 | } |
250 | |
251 | impl fmt::Display for TryRecvError { |
252 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
253 | write!(f, "receiver channel is empty") |
254 | } |
255 | } |
256 | |
257 | impl std::error::Error for TryRecvError {} |
258 | |
259 | struct UnboundedInner<T> { |
260 | // Internal channel state. Consists of the number of messages stored in the |
261 | // channel as well as a flag signalling that the channel is closed. |
262 | state: AtomicUsize, |
263 | |
264 | // Atomic, FIFO queue used to send messages to the receiver |
265 | message_queue: Queue<T>, |
266 | |
267 | // Number of senders in existence |
268 | num_senders: AtomicUsize, |
269 | |
270 | // Handle to the receiver's task. |
271 | recv_task: AtomicWaker, |
272 | } |
273 | |
274 | struct BoundedInner<T> { |
275 | // Max buffer size of the channel. If `None` then the channel is unbounded. |
276 | buffer: usize, |
277 | |
278 | // Internal channel state. Consists of the number of messages stored in the |
279 | // channel as well as a flag signalling that the channel is closed. |
280 | state: AtomicUsize, |
281 | |
282 | // Atomic, FIFO queue used to send messages to the receiver |
283 | message_queue: Queue<T>, |
284 | |
285 | // Atomic, FIFO queue used to send parked task handles to the receiver. |
286 | parked_queue: Queue<Arc<Mutex<SenderTask>>>, |
287 | |
288 | // Number of senders in existence |
289 | num_senders: AtomicUsize, |
290 | |
291 | // Handle to the receiver's task. |
292 | recv_task: AtomicWaker, |
293 | } |
294 | |
295 | // Struct representation of `Inner::state`. |
296 | #[derive(Clone, Copy)] |
297 | struct State { |
298 | // `true` when the channel is open |
299 | is_open: bool, |
300 | |
301 | // Number of messages in the channel |
302 | num_messages: usize, |
303 | } |
304 | |
305 | // The `is_open` flag is stored in the left-most bit of `Inner::state` |
306 | const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1); |
307 | |
308 | // When a new channel is created, it is created in the open state with no |
309 | // pending messages. |
310 | const INIT_STATE: usize = OPEN_MASK; |
311 | |
312 | // The maximum number of messages that a channel can track is `usize::MAX >> 1` |
313 | const MAX_CAPACITY: usize = !(OPEN_MASK); |
314 | |
315 | // The maximum requested buffer size must be less than the maximum capacity of |
316 | // a channel. This is because each sender gets a guaranteed slot. |
317 | const MAX_BUFFER: usize = MAX_CAPACITY >> 1; |
318 | |
319 | // Sent to the consumer to wake up blocked producers |
320 | struct SenderTask { |
321 | task: Option<Waker>, |
322 | is_parked: bool, |
323 | } |
324 | |
325 | impl SenderTask { |
326 | fn new() -> Self { |
327 | Self { task: None, is_parked: false } |
328 | } |
329 | |
330 | fn notify(&mut self) { |
331 | self.is_parked = false; |
332 | |
333 | if let Some(task: Waker) = self.task.take() { |
334 | task.wake(); |
335 | } |
336 | } |
337 | } |
338 | |
339 | /// Creates a bounded mpsc channel for communicating between asynchronous tasks. |
340 | /// |
341 | /// Being bounded, this channel provides backpressure to ensure that the sender |
342 | /// outpaces the receiver by only a limited amount. The channel's capacity is |
343 | /// equal to `buffer + num-senders`. In other words, each sender gets a |
344 | /// guaranteed slot in the channel capacity, and on top of that there are |
345 | /// `buffer` "first come, first serve" slots available to all senders. |
346 | /// |
347 | /// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`] |
348 | /// implements `Sink`. |
349 | pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { |
350 | // Check that the requested buffer size does not exceed the maximum buffer |
351 | // size permitted by the system. |
352 | assert!(buffer < MAX_BUFFER, "requested buffer size too large"); |
353 | |
354 | let inner: Arc |
355 | buffer, |
356 | state: AtomicUsize::new(INIT_STATE), |
357 | message_queue: Queue::new(), |
358 | parked_queue: Queue::new(), |
359 | num_senders: AtomicUsize::new(1), |
360 | recv_task: AtomicWaker::new(), |
361 | }); |
362 | |
363 | let tx: BoundedSenderInner |
364 | inner: inner.clone(), |
365 | sender_task: Arc::new(data:Mutex::new(SenderTask::new())), |
366 | maybe_parked: false, |
367 | }; |
368 | |
369 | let rx: Receiver |
370 | |
371 | (Sender(Some(tx)), rx) |
372 | } |
373 | |
374 | /// Creates an unbounded mpsc channel for communicating between asynchronous |
375 | /// tasks. |
376 | /// |
377 | /// A `send` on this channel will always succeed as long as the receive half has |
378 | /// not been closed. If the receiver falls behind, messages will be arbitrarily |
379 | /// buffered. |
380 | /// |
381 | /// **Note** that the amount of available system memory is an implicit bound to |
382 | /// the channel. Using an `unbounded` channel has the ability of causing the |
383 | /// process to run out of memory. In this case, the process will be aborted. |
384 | pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { |
385 | let inner: Arc |
386 | state: AtomicUsize::new(INIT_STATE), |
387 | message_queue: Queue::new(), |
388 | num_senders: AtomicUsize::new(1), |
389 | recv_task: AtomicWaker::new(), |
390 | }); |
391 | |
392 | let tx: UnboundedSenderInner |
393 | |
394 | let rx: UnboundedReceiver |
395 | |
396 | (UnboundedSender(Some(tx)), rx) |
397 | } |
398 | |
399 | /* |
400 | * |
401 | * ===== impl Sender ===== |
402 | * |
403 | */ |
404 | |
405 | impl<T> UnboundedSenderInner<T> { |
406 | fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> { |
407 | let state = decode_state(self.inner.state.load(SeqCst)); |
408 | if state.is_open { |
409 | Poll::Ready(Ok(())) |
410 | } else { |
411 | Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })) |
412 | } |
413 | } |
414 | |
415 | // Push message to the queue and signal to the receiver |
416 | fn queue_push_and_signal(&self, msg: T) { |
417 | // Push the message onto the message queue |
418 | self.inner.message_queue.push(msg); |
419 | |
420 | // Signal to the receiver that a message has been enqueued. If the |
421 | // receiver is parked, this will unpark the task. |
422 | self.inner.recv_task.wake(); |
423 | } |
424 | |
425 | // Increment the number of queued messages. Returns the resulting number. |
426 | fn inc_num_messages(&self) -> Option<usize> { |
427 | let mut curr = self.inner.state.load(SeqCst); |
428 | |
429 | loop { |
430 | let mut state = decode_state(curr); |
431 | |
432 | // The receiver end closed the channel. |
433 | if !state.is_open { |
434 | return None; |
435 | } |
436 | |
437 | // This probably is never hit? Odds are the process will run out of |
438 | // memory first. It may be worth to return something else in this |
439 | // case? |
440 | assert!( |
441 | state.num_messages < MAX_CAPACITY, |
442 | "buffer space \ |
443 | exhausted; sending this messages would overflow the state" |
444 | ); |
445 | |
446 | state.num_messages += 1; |
447 | |
448 | let next = encode_state(&state); |
449 | match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { |
450 | Ok(_) => return Some(state.num_messages), |
451 | Err(actual) => curr = actual, |
452 | } |
453 | } |
454 | } |
455 | |
456 | /// Returns whether the senders send to the same receiver. |
457 | fn same_receiver(&self, other: &Self) -> bool { |
458 | Arc::ptr_eq(&self.inner, &other.inner) |
459 | } |
460 | |
461 | /// Returns whether the sender send to this receiver. |
462 | fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool { |
463 | Arc::ptr_eq(&self.inner, inner) |
464 | } |
465 | |
466 | /// Returns pointer to the Arc containing sender |
467 | /// |
468 | /// The returned pointer is not referenced and should be only used for hashing! |
469 | fn ptr(&self) -> *const UnboundedInner<T> { |
470 | &*self.inner |
471 | } |
472 | |
473 | /// Returns whether this channel is closed without needing a context. |
474 | fn is_closed(&self) -> bool { |
475 | !decode_state(self.inner.state.load(SeqCst)).is_open |
476 | } |
477 | |
478 | /// Closes this channel from the sender side, preventing any new messages. |
479 | fn close_channel(&self) { |
480 | // There's no need to park this sender, its dropping, |
481 | // and we don't want to check for capacity, so skip |
482 | // that stuff from `do_send`. |
483 | |
484 | self.inner.set_closed(); |
485 | self.inner.recv_task.wake(); |
486 | } |
487 | } |
488 | |
489 | impl<T> BoundedSenderInner<T> { |
490 | /// Attempts to send a message on this `Sender`, returning the message |
491 | /// if there was an error. |
492 | fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
493 | // If the sender is currently blocked, reject the message |
494 | if !self.poll_unparked(None).is_ready() { |
495 | return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg }); |
496 | } |
497 | |
498 | // The channel has capacity to accept the message, so send it |
499 | self.do_send_b(msg) |
500 | } |
501 | |
502 | // Do the send without failing. |
503 | // Can be called only by bounded sender. |
504 | fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
505 | // Anyone calling do_send *should* make sure there is room first, |
506 | // but assert here for tests as a sanity check. |
507 | debug_assert!(self.poll_unparked(None).is_ready()); |
508 | |
509 | // First, increment the number of messages contained by the channel. |
510 | // This operation will also atomically determine if the sender task |
511 | // should be parked. |
512 | // |
513 | // `None` is returned in the case that the channel has been closed by the |
514 | // receiver. This happens when `Receiver::close` is called or the |
515 | // receiver is dropped. |
516 | let park_self = match self.inc_num_messages() { |
517 | Some(num_messages) => { |
518 | // Block if the current number of pending messages has exceeded |
519 | // the configured buffer size |
520 | num_messages > self.inner.buffer |
521 | } |
522 | None => { |
523 | return Err(TrySendError { |
524 | err: SendError { kind: SendErrorKind::Disconnected }, |
525 | val: msg, |
526 | }) |
527 | } |
528 | }; |
529 | |
530 | // If the channel has reached capacity, then the sender task needs to |
531 | // be parked. This will send the task handle on the parked task queue. |
532 | // |
533 | // However, when `do_send` is called while dropping the `Sender`, |
534 | // `task::current()` can't be called safely. In this case, in order to |
535 | // maintain internal consistency, a blank message is pushed onto the |
536 | // parked task queue. |
537 | if park_self { |
538 | self.park(); |
539 | } |
540 | |
541 | self.queue_push_and_signal(msg); |
542 | |
543 | Ok(()) |
544 | } |
545 | |
546 | // Push message to the queue and signal to the receiver |
547 | fn queue_push_and_signal(&self, msg: T) { |
548 | // Push the message onto the message queue |
549 | self.inner.message_queue.push(msg); |
550 | |
551 | // Signal to the receiver that a message has been enqueued. If the |
552 | // receiver is parked, this will unpark the task. |
553 | self.inner.recv_task.wake(); |
554 | } |
555 | |
556 | // Increment the number of queued messages. Returns the resulting number. |
557 | fn inc_num_messages(&self) -> Option<usize> { |
558 | let mut curr = self.inner.state.load(SeqCst); |
559 | |
560 | loop { |
561 | let mut state = decode_state(curr); |
562 | |
563 | // The receiver end closed the channel. |
564 | if !state.is_open { |
565 | return None; |
566 | } |
567 | |
568 | // This probably is never hit? Odds are the process will run out of |
569 | // memory first. It may be worth to return something else in this |
570 | // case? |
571 | assert!( |
572 | state.num_messages < MAX_CAPACITY, |
573 | "buffer space \ |
574 | exhausted; sending this messages would overflow the state" |
575 | ); |
576 | |
577 | state.num_messages += 1; |
578 | |
579 | let next = encode_state(&state); |
580 | match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { |
581 | Ok(_) => return Some(state.num_messages), |
582 | Err(actual) => curr = actual, |
583 | } |
584 | } |
585 | } |
586 | |
587 | fn park(&mut self) { |
588 | { |
589 | let mut sender = self.sender_task.lock().unwrap(); |
590 | sender.task = None; |
591 | sender.is_parked = true; |
592 | } |
593 | |
594 | // Send handle over queue |
595 | let t = self.sender_task.clone(); |
596 | self.inner.parked_queue.push(t); |
597 | |
598 | // Check to make sure we weren't closed after we sent our task on the |
599 | // queue |
600 | let state = decode_state(self.inner.state.load(SeqCst)); |
601 | self.maybe_parked = state.is_open; |
602 | } |
603 | |
604 | /// Polls the channel to determine if there is guaranteed capacity to send |
605 | /// at least one item without waiting. |
606 | /// |
607 | /// # Return value |
608 | /// |
609 | /// This method returns: |
610 | /// |
611 | /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; |
612 | /// - `Poll::Pending` if the channel may not have |
613 | /// capacity, in which case the current task is queued to be notified once |
614 | /// capacity is available; |
615 | /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. |
616 | fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
617 | let state = decode_state(self.inner.state.load(SeqCst)); |
618 | if !state.is_open { |
619 | return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected })); |
620 | } |
621 | |
622 | self.poll_unparked(Some(cx)).map(Ok) |
623 | } |
624 | |
625 | /// Returns whether the senders send to the same receiver. |
626 | fn same_receiver(&self, other: &Self) -> bool { |
627 | Arc::ptr_eq(&self.inner, &other.inner) |
628 | } |
629 | |
630 | /// Returns whether the sender send to this receiver. |
631 | fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool { |
632 | Arc::ptr_eq(&self.inner, receiver) |
633 | } |
634 | |
635 | /// Returns pointer to the Arc containing sender |
636 | /// |
637 | /// The returned pointer is not referenced and should be only used for hashing! |
638 | fn ptr(&self) -> *const BoundedInner<T> { |
639 | &*self.inner |
640 | } |
641 | |
642 | /// Returns whether this channel is closed without needing a context. |
643 | fn is_closed(&self) -> bool { |
644 | !decode_state(self.inner.state.load(SeqCst)).is_open |
645 | } |
646 | |
647 | /// Closes this channel from the sender side, preventing any new messages. |
648 | fn close_channel(&self) { |
649 | // There's no need to park this sender, its dropping, |
650 | // and we don't want to check for capacity, so skip |
651 | // that stuff from `do_send`. |
652 | |
653 | self.inner.set_closed(); |
654 | self.inner.recv_task.wake(); |
655 | } |
656 | |
657 | fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> { |
658 | // First check the `maybe_parked` variable. This avoids acquiring the |
659 | // lock in most cases |
660 | if self.maybe_parked { |
661 | // Get a lock on the task handle |
662 | let mut task = self.sender_task.lock().unwrap(); |
663 | |
664 | if !task.is_parked { |
665 | self.maybe_parked = false; |
666 | return Poll::Ready(()); |
667 | } |
668 | |
669 | // At this point, an unpark request is pending, so there will be an |
670 | // unpark sometime in the future. We just need to make sure that |
671 | // the correct task will be notified. |
672 | // |
673 | // Update the task in case the `Sender` has been moved to another |
674 | // task |
675 | task.task = cx.map(|cx| cx.waker().clone()); |
676 | |
677 | Poll::Pending |
678 | } else { |
679 | Poll::Ready(()) |
680 | } |
681 | } |
682 | } |
683 | |
684 | impl<T> Sender<T> { |
685 | /// Attempts to send a message on this `Sender`, returning the message |
686 | /// if there was an error. |
687 | pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { |
688 | if let Some(inner) = &mut self.0 { |
689 | inner.try_send(msg) |
690 | } else { |
691 | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) |
692 | } |
693 | } |
694 | |
695 | /// Send a message on the channel. |
696 | /// |
697 | /// This function should only be called after |
698 | /// [`poll_ready`](Sender::poll_ready) has reported that the channel is |
699 | /// ready to receive a message. |
700 | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { |
701 | self.try_send(msg).map_err(|e| e.err) |
702 | } |
703 | |
704 | /// Polls the channel to determine if there is guaranteed capacity to send |
705 | /// at least one item without waiting. |
706 | /// |
707 | /// # Return value |
708 | /// |
709 | /// This method returns: |
710 | /// |
711 | /// - `Poll::Ready(Ok(_))` if there is sufficient capacity; |
712 | /// - `Poll::Pending` if the channel may not have |
713 | /// capacity, in which case the current task is queued to be notified once |
714 | /// capacity is available; |
715 | /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. |
716 | pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
717 | let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?; |
718 | inner.poll_ready(cx) |
719 | } |
720 | |
721 | /// Returns whether this channel is closed without needing a context. |
722 | pub fn is_closed(&self) -> bool { |
723 | self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true) |
724 | } |
725 | |
726 | /// Closes this channel from the sender side, preventing any new messages. |
727 | pub fn close_channel(&mut self) { |
728 | if let Some(inner) = &mut self.0 { |
729 | inner.close_channel(); |
730 | } |
731 | } |
732 | |
733 | /// Disconnects this sender from the channel, closing it if there are no more senders left. |
734 | pub fn disconnect(&mut self) { |
735 | self.0 = None; |
736 | } |
737 | |
738 | /// Returns whether the senders send to the same receiver. |
739 | pub fn same_receiver(&self, other: &Self) -> bool { |
740 | match (&self.0, &other.0) { |
741 | (Some(inner), Some(other)) => inner.same_receiver(other), |
742 | _ => false, |
743 | } |
744 | } |
745 | |
746 | /// Returns whether the sender send to this receiver. |
747 | pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { |
748 | match (&self.0, &receiver.inner) { |
749 | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), |
750 | _ => false, |
751 | } |
752 | } |
753 | |
754 | /// Hashes the receiver into the provided hasher |
755 | pub fn hash_receiver<H>(&self, hasher: &mut H) |
756 | where |
757 | H: std::hash::Hasher, |
758 | { |
759 | use std::hash::Hash; |
760 | |
761 | let ptr = self.0.as_ref().map(|inner| inner.ptr()); |
762 | ptr.hash(hasher); |
763 | } |
764 | } |
765 | |
766 | impl<T> UnboundedSender<T> { |
767 | /// Check if the channel is ready to receive a message. |
768 | pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> { |
769 | let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?; |
770 | inner.poll_ready_nb() |
771 | } |
772 | |
773 | /// Returns whether this channel is closed without needing a context. |
774 | pub fn is_closed(&self) -> bool { |
775 | self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true) |
776 | } |
777 | |
778 | /// Closes this channel from the sender side, preventing any new messages. |
779 | pub fn close_channel(&self) { |
780 | if let Some(inner) = &self.0 { |
781 | inner.close_channel(); |
782 | } |
783 | } |
784 | |
785 | /// Disconnects this sender from the channel, closing it if there are no more senders left. |
786 | pub fn disconnect(&mut self) { |
787 | self.0 = None; |
788 | } |
789 | |
790 | // Do the send without parking current task. |
791 | fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> { |
792 | if let Some(inner) = &self.0 { |
793 | if inner.inc_num_messages().is_some() { |
794 | inner.queue_push_and_signal(msg); |
795 | return Ok(()); |
796 | } |
797 | } |
798 | |
799 | Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg }) |
800 | } |
801 | |
802 | /// Send a message on the channel. |
803 | /// |
804 | /// This method should only be called after `poll_ready` has been used to |
805 | /// verify that the channel is ready to receive a message. |
806 | pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { |
807 | self.do_send_nb(msg).map_err(|e| e.err) |
808 | } |
809 | |
810 | /// Sends a message along this channel. |
811 | /// |
812 | /// This is an unbounded sender, so this function differs from `Sink::send` |
813 | /// by ensuring the return type reflects that the channel is always ready to |
814 | /// receive messages. |
815 | pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
816 | self.do_send_nb(msg) |
817 | } |
818 | |
819 | /// Returns whether the senders send to the same receiver. |
820 | pub fn same_receiver(&self, other: &Self) -> bool { |
821 | match (&self.0, &other.0) { |
822 | (Some(inner), Some(other)) => inner.same_receiver(other), |
823 | _ => false, |
824 | } |
825 | } |
826 | |
827 | /// Returns whether the sender send to this receiver. |
828 | pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool { |
829 | match (&self.0, &receiver.inner) { |
830 | (Some(inner), Some(receiver)) => inner.is_connected_to(receiver), |
831 | _ => false, |
832 | } |
833 | } |
834 | |
835 | /// Hashes the receiver into the provided hasher |
836 | pub fn hash_receiver<H>(&self, hasher: &mut H) |
837 | where |
838 | H: std::hash::Hasher, |
839 | { |
840 | use std::hash::Hash; |
841 | |
842 | let ptr = self.0.as_ref().map(|inner| inner.ptr()); |
843 | ptr.hash(hasher); |
844 | } |
845 | |
846 | /// Return the number of messages in the queue or 0 if channel is disconnected. |
847 | pub fn len(&self) -> usize { |
848 | if let Some(sender) = &self.0 { |
849 | decode_state(sender.inner.state.load(SeqCst)).num_messages |
850 | } else { |
851 | 0 |
852 | } |
853 | } |
854 | |
855 | /// Return false is channel has no queued messages, true otherwise. |
856 | pub fn is_empty(&self) -> bool { |
857 | self.len() == 0 |
858 | } |
859 | } |
860 | |
861 | impl<T> Clone for Sender<T> { |
862 | fn clone(&self) -> Self { |
863 | Self(self.0.clone()) |
864 | } |
865 | } |
866 | |
867 | impl<T> Clone for UnboundedSender<T> { |
868 | fn clone(&self) -> Self { |
869 | Self(self.0.clone()) |
870 | } |
871 | } |
872 | |
873 | impl<T> Clone for UnboundedSenderInner<T> { |
874 | fn clone(&self) -> Self { |
875 | // Since this atomic op isn't actually guarding any memory and we don't |
876 | // care about any orderings besides the ordering on the single atomic |
877 | // variable, a relaxed ordering is acceptable. |
878 | let mut curr = self.inner.num_senders.load(SeqCst); |
879 | |
880 | loop { |
881 | // If the maximum number of senders has been reached, then fail |
882 | if curr == MAX_BUFFER { |
883 | panic!("cannot clone `Sender` -- too many outstanding senders"); |
884 | } |
885 | |
886 | debug_assert!(curr < MAX_BUFFER); |
887 | |
888 | let next = curr + 1; |
889 | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { |
890 | Ok(_) => { |
891 | // The ABA problem doesn't matter here. We only care that the |
892 | // number of senders never exceeds the maximum. |
893 | return Self { inner: self.inner.clone() }; |
894 | } |
895 | Err(actual) => curr = actual, |
896 | } |
897 | } |
898 | } |
899 | } |
900 | |
901 | impl<T> Clone for BoundedSenderInner<T> { |
902 | fn clone(&self) -> Self { |
903 | // Since this atomic op isn't actually guarding any memory and we don't |
904 | // care about any orderings besides the ordering on the single atomic |
905 | // variable, a relaxed ordering is acceptable. |
906 | let mut curr = self.inner.num_senders.load(SeqCst); |
907 | |
908 | loop { |
909 | // If the maximum number of senders has been reached, then fail |
910 | if curr == self.inner.max_senders() { |
911 | panic!("cannot clone `Sender` -- too many outstanding senders"); |
912 | } |
913 | |
914 | debug_assert!(curr < self.inner.max_senders()); |
915 | |
916 | let next = curr + 1; |
917 | match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) { |
918 | Ok(_) => { |
919 | // The ABA problem doesn't matter here. We only care that the |
920 | // number of senders never exceeds the maximum. |
921 | return Self { |
922 | inner: self.inner.clone(), |
923 | sender_task: Arc::new(Mutex::new(SenderTask::new())), |
924 | maybe_parked: false, |
925 | }; |
926 | } |
927 | Err(actual) => curr = actual, |
928 | } |
929 | } |
930 | } |
931 | } |
932 | |
933 | impl<T> Drop for UnboundedSenderInner<T> { |
934 | fn drop(&mut self) { |
935 | // Ordering between variables don't matter here |
936 | let prev: usize = self.inner.num_senders.fetch_sub(val:1, order:SeqCst); |
937 | |
938 | if prev == 1 { |
939 | self.close_channel(); |
940 | } |
941 | } |
942 | } |
943 | |
944 | impl<T> Drop for BoundedSenderInner<T> { |
945 | fn drop(&mut self) { |
946 | // Ordering between variables don't matter here |
947 | let prev: usize = self.inner.num_senders.fetch_sub(val:1, order:SeqCst); |
948 | |
949 | if prev == 1 { |
950 | self.close_channel(); |
951 | } |
952 | } |
953 | } |
954 | |
955 | impl<T> fmt::Debug for Sender<T> { |
956 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
957 | f.debug_struct("Sender").field(name: "closed", &self.is_closed()).finish() |
958 | } |
959 | } |
960 | |
961 | impl<T> fmt::Debug for UnboundedSender<T> { |
962 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
963 | f.debug_struct("UnboundedSender").field(name: "closed", &self.is_closed()).finish() |
964 | } |
965 | } |
966 | |
967 | /* |
968 | * |
969 | * ===== impl Receiver ===== |
970 | * |
971 | */ |
972 | |
973 | impl<T> Receiver<T> { |
974 | /// Closes the receiving half of a channel, without dropping it. |
975 | /// |
976 | /// This prevents any further messages from being sent on the channel while |
977 | /// still enabling the receiver to drain messages that are buffered. |
978 | pub fn close(&mut self) { |
979 | if let Some(inner) = &mut self.inner { |
980 | inner.set_closed(); |
981 | |
982 | // Wake up any threads waiting as they'll see that we've closed the |
983 | // channel and will continue on their merry way. |
984 | while let Some(task) = unsafe { inner.parked_queue.pop_spin() } { |
985 | task.lock().unwrap().notify(); |
986 | } |
987 | } |
988 | } |
989 | |
990 | /// Tries to receive the next message without notifying a context if empty. |
991 | /// |
992 | /// It is not recommended to call this function from inside of a future, |
993 | /// only when you've otherwise arranged to be notified when the channel is |
994 | /// no longer empty. |
995 | /// |
996 | /// This function returns: |
997 | /// * `Ok(Some(t))` when message is fetched |
998 | /// * `Ok(None)` when channel is closed and no messages left in the queue |
999 | /// * `Err(e)` when there are no messages available, but channel is not yet closed |
1000 | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { |
1001 | match self.next_message() { |
1002 | Poll::Ready(msg) => Ok(msg), |
1003 | Poll::Pending => Err(TryRecvError { _priv: () }), |
1004 | } |
1005 | } |
1006 | |
1007 | fn next_message(&mut self) -> Poll<Option<T>> { |
1008 | let inner = match self.inner.as_mut() { |
1009 | None => return Poll::Ready(None), |
1010 | Some(inner) => inner, |
1011 | }; |
1012 | // Pop off a message |
1013 | match unsafe { inner.message_queue.pop_spin() } { |
1014 | Some(msg) => { |
1015 | // If there are any parked task handles in the parked queue, |
1016 | // pop one and unpark it. |
1017 | self.unpark_one(); |
1018 | |
1019 | // Decrement number of messages |
1020 | self.dec_num_messages(); |
1021 | |
1022 | Poll::Ready(Some(msg)) |
1023 | } |
1024 | None => { |
1025 | let state = decode_state(inner.state.load(SeqCst)); |
1026 | if state.is_closed() { |
1027 | // If closed flag is set AND there are no pending messages |
1028 | // it means end of stream |
1029 | self.inner = None; |
1030 | Poll::Ready(None) |
1031 | } else { |
1032 | // If queue is open, we need to return Pending |
1033 | // to be woken up when new messages arrive. |
1034 | // If queue is closed but num_messages is non-zero, |
1035 | // it means that senders updated the state, |
1036 | // but didn't put message to queue yet, |
1037 | // so we need to park until sender unparks the task |
1038 | // after queueing the message. |
1039 | Poll::Pending |
1040 | } |
1041 | } |
1042 | } |
1043 | } |
1044 | |
1045 | // Unpark a single task handle if there is one pending in the parked queue |
1046 | fn unpark_one(&mut self) { |
1047 | if let Some(inner) = &mut self.inner { |
1048 | if let Some(task) = unsafe { inner.parked_queue.pop_spin() } { |
1049 | task.lock().unwrap().notify(); |
1050 | } |
1051 | } |
1052 | } |
1053 | |
1054 | fn dec_num_messages(&self) { |
1055 | if let Some(inner) = &self.inner { |
1056 | // OPEN_MASK is highest bit, so it's unaffected by subtraction |
1057 | // unless there's underflow, and we know there's no underflow |
1058 | // because number of messages at this point is always > 0. |
1059 | inner.state.fetch_sub(1, SeqCst); |
1060 | } |
1061 | } |
1062 | } |
1063 | |
1064 | // The receiver does not ever take a Pin to the inner T |
1065 | impl<T> Unpin for Receiver<T> {} |
1066 | |
1067 | impl<T> FusedStream for Receiver<T> { |
1068 | fn is_terminated(&self) -> bool { |
1069 | self.inner.is_none() |
1070 | } |
1071 | } |
1072 | |
1073 | impl<T> Stream for Receiver<T> { |
1074 | type Item = T; |
1075 | |
1076 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
1077 | // Try to read a message off of the message queue. |
1078 | match self.next_message() { |
1079 | Poll::Ready(msg) => { |
1080 | if msg.is_none() { |
1081 | self.inner = None; |
1082 | } |
1083 | Poll::Ready(msg) |
1084 | } |
1085 | Poll::Pending => { |
1086 | // There are no messages to read, in this case, park. |
1087 | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); |
1088 | // Check queue again after parking to prevent race condition: |
1089 | // a message could be added to the queue after previous `next_message` |
1090 | // before `register` call. |
1091 | self.next_message() |
1092 | } |
1093 | } |
1094 | } |
1095 | |
1096 | fn size_hint(&self) -> (usize, Option<usize>) { |
1097 | if let Some(inner) = &self.inner { |
1098 | decode_state(inner.state.load(SeqCst)).size_hint() |
1099 | } else { |
1100 | (0, Some(0)) |
1101 | } |
1102 | } |
1103 | } |
1104 | |
1105 | impl<T> Drop for Receiver<T> { |
1106 | fn drop(&mut self) { |
1107 | // Drain the channel of all pending messages |
1108 | self.close(); |
1109 | if self.inner.is_some() { |
1110 | loop { |
1111 | match self.next_message() { |
1112 | Poll::Ready(Some(_)) => {} |
1113 | Poll::Ready(None) => break, |
1114 | Poll::Pending => { |
1115 | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); |
1116 | |
1117 | // If the channel is closed, then there is no need to park. |
1118 | if state.is_closed() { |
1119 | break; |
1120 | } |
1121 | |
1122 | // TODO: Spinning isn't ideal, it might be worth |
1123 | // investigating using a condvar or some other strategy |
1124 | // here. That said, if this case is hit, then another thread |
1125 | // is about to push the value into the queue and this isn't |
1126 | // the only spinlock in the impl right now. |
1127 | thread::yield_now(); |
1128 | } |
1129 | } |
1130 | } |
1131 | } |
1132 | } |
1133 | } |
1134 | |
1135 | impl<T> fmt::Debug for Receiver<T> { |
1136 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1137 | let closed: bool = if let Some(ref inner: &Arc |
1138 | decode_state(num:inner.state.load(order:SeqCst)).is_closed() |
1139 | } else { |
1140 | false |
1141 | }; |
1142 | |
1143 | f.debug_struct("Receiver").field(name: "closed", &closed).finish() |
1144 | } |
1145 | } |
1146 | |
1147 | impl<T> UnboundedReceiver<T> { |
1148 | /// Closes the receiving half of a channel, without dropping it. |
1149 | /// |
1150 | /// This prevents any further messages from being sent on the channel while |
1151 | /// still enabling the receiver to drain messages that are buffered. |
1152 | pub fn close(&mut self) { |
1153 | if let Some(inner) = &mut self.inner { |
1154 | inner.set_closed(); |
1155 | } |
1156 | } |
1157 | |
1158 | /// Tries to receive the next message without notifying a context if empty. |
1159 | /// |
1160 | /// It is not recommended to call this function from inside of a future, |
1161 | /// only when you've otherwise arranged to be notified when the channel is |
1162 | /// no longer empty. |
1163 | /// |
1164 | /// This function returns: |
1165 | /// * `Ok(Some(t))` when message is fetched |
1166 | /// * `Ok(None)` when channel is closed and no messages left in the queue |
1167 | /// * `Err(e)` when there are no messages available, but channel is not yet closed |
1168 | pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { |
1169 | match self.next_message() { |
1170 | Poll::Ready(msg) => Ok(msg), |
1171 | Poll::Pending => Err(TryRecvError { _priv: () }), |
1172 | } |
1173 | } |
1174 | |
1175 | fn next_message(&mut self) -> Poll<Option<T>> { |
1176 | let inner = match self.inner.as_mut() { |
1177 | None => return Poll::Ready(None), |
1178 | Some(inner) => inner, |
1179 | }; |
1180 | // Pop off a message |
1181 | match unsafe { inner.message_queue.pop_spin() } { |
1182 | Some(msg) => { |
1183 | // Decrement number of messages |
1184 | self.dec_num_messages(); |
1185 | |
1186 | Poll::Ready(Some(msg)) |
1187 | } |
1188 | None => { |
1189 | let state = decode_state(inner.state.load(SeqCst)); |
1190 | if state.is_closed() { |
1191 | // If closed flag is set AND there are no pending messages |
1192 | // it means end of stream |
1193 | self.inner = None; |
1194 | Poll::Ready(None) |
1195 | } else { |
1196 | // If queue is open, we need to return Pending |
1197 | // to be woken up when new messages arrive. |
1198 | // If queue is closed but num_messages is non-zero, |
1199 | // it means that senders updated the state, |
1200 | // but didn't put message to queue yet, |
1201 | // so we need to park until sender unparks the task |
1202 | // after queueing the message. |
1203 | Poll::Pending |
1204 | } |
1205 | } |
1206 | } |
1207 | } |
1208 | |
1209 | fn dec_num_messages(&self) { |
1210 | if let Some(inner) = &self.inner { |
1211 | // OPEN_MASK is highest bit, so it's unaffected by subtraction |
1212 | // unless there's underflow, and we know there's no underflow |
1213 | // because number of messages at this point is always > 0. |
1214 | inner.state.fetch_sub(1, SeqCst); |
1215 | } |
1216 | } |
1217 | } |
1218 | |
1219 | impl<T> FusedStream for UnboundedReceiver<T> { |
1220 | fn is_terminated(&self) -> bool { |
1221 | self.inner.is_none() |
1222 | } |
1223 | } |
1224 | |
1225 | impl<T> Stream for UnboundedReceiver<T> { |
1226 | type Item = T; |
1227 | |
1228 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { |
1229 | // Try to read a message off of the message queue. |
1230 | match self.next_message() { |
1231 | Poll::Ready(msg) => { |
1232 | if msg.is_none() { |
1233 | self.inner = None; |
1234 | } |
1235 | Poll::Ready(msg) |
1236 | } |
1237 | Poll::Pending => { |
1238 | // There are no messages to read, in this case, park. |
1239 | self.inner.as_ref().unwrap().recv_task.register(cx.waker()); |
1240 | // Check queue again after parking to prevent race condition: |
1241 | // a message could be added to the queue after previous `next_message` |
1242 | // before `register` call. |
1243 | self.next_message() |
1244 | } |
1245 | } |
1246 | } |
1247 | |
1248 | fn size_hint(&self) -> (usize, Option<usize>) { |
1249 | if let Some(inner) = &self.inner { |
1250 | decode_state(inner.state.load(SeqCst)).size_hint() |
1251 | } else { |
1252 | (0, Some(0)) |
1253 | } |
1254 | } |
1255 | } |
1256 | |
1257 | impl<T> Drop for UnboundedReceiver<T> { |
1258 | fn drop(&mut self) { |
1259 | // Drain the channel of all pending messages |
1260 | self.close(); |
1261 | if self.inner.is_some() { |
1262 | loop { |
1263 | match self.next_message() { |
1264 | Poll::Ready(Some(_)) => {} |
1265 | Poll::Ready(None) => break, |
1266 | Poll::Pending => { |
1267 | let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst)); |
1268 | |
1269 | // If the channel is closed, then there is no need to park. |
1270 | if state.is_closed() { |
1271 | break; |
1272 | } |
1273 | |
1274 | // TODO: Spinning isn't ideal, it might be worth |
1275 | // investigating using a condvar or some other strategy |
1276 | // here. That said, if this case is hit, then another thread |
1277 | // is about to push the value into the queue and this isn't |
1278 | // the only spinlock in the impl right now. |
1279 | thread::yield_now(); |
1280 | } |
1281 | } |
1282 | } |
1283 | } |
1284 | } |
1285 | } |
1286 | |
1287 | impl<T> fmt::Debug for UnboundedReceiver<T> { |
1288 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1289 | let closed: bool = if let Some(ref inner: &Arc |
1290 | decode_state(num:inner.state.load(order:SeqCst)).is_closed() |
1291 | } else { |
1292 | false |
1293 | }; |
1294 | |
1295 | f.debug_struct("Receiver").field(name: "closed", &closed).finish() |
1296 | } |
1297 | } |
1298 | |
1299 | /* |
1300 | * |
1301 | * ===== impl Inner ===== |
1302 | * |
1303 | */ |
1304 | |
1305 | impl<T> UnboundedInner<T> { |
1306 | // Clear `open` flag in the state, keep `num_messages` intact. |
1307 | fn set_closed(&self) { |
1308 | let curr: usize = self.state.load(order:SeqCst); |
1309 | if !decode_state(num:curr).is_open { |
1310 | return; |
1311 | } |
1312 | |
1313 | self.state.fetch_and(!OPEN_MASK, order:SeqCst); |
1314 | } |
1315 | } |
1316 | |
1317 | impl<T> BoundedInner<T> { |
1318 | // The return value is such that the total number of messages that can be |
1319 | // enqueued into the channel will never exceed MAX_CAPACITY |
1320 | fn max_senders(&self) -> usize { |
1321 | MAX_CAPACITY - self.buffer |
1322 | } |
1323 | |
1324 | // Clear `open` flag in the state, keep `num_messages` intact. |
1325 | fn set_closed(&self) { |
1326 | let curr: usize = self.state.load(order:SeqCst); |
1327 | if !decode_state(num:curr).is_open { |
1328 | return; |
1329 | } |
1330 | |
1331 | self.state.fetch_and(!OPEN_MASK, order:SeqCst); |
1332 | } |
1333 | } |
1334 | |
1335 | unsafe impl<T: Send> Send for UnboundedInner<T> {} |
1336 | unsafe impl<T: Send> Sync for UnboundedInner<T> {} |
1337 | |
1338 | unsafe impl<T: Send> Send for BoundedInner<T> {} |
1339 | unsafe impl<T: Send> Sync for BoundedInner<T> {} |
1340 | |
1341 | impl State { |
1342 | fn is_closed(&self) -> bool { |
1343 | !self.is_open && self.num_messages == 0 |
1344 | } |
1345 | |
1346 | fn size_hint(&self) -> (usize, Option<usize>) { |
1347 | if self.is_open { |
1348 | (self.num_messages, None) |
1349 | } else { |
1350 | (self.num_messages, Some(self.num_messages)) |
1351 | } |
1352 | } |
1353 | } |
1354 | |
1355 | /* |
1356 | * |
1357 | * ===== Helpers ===== |
1358 | * |
1359 | */ |
1360 | |
1361 | fn decode_state(num: usize) -> State { |
1362 | State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY } |
1363 | } |
1364 | |
1365 | fn encode_state(state: &State) -> usize { |
1366 | let mut num: usize = state.num_messages; |
1367 | |
1368 | if state.is_open { |
1369 | num |= OPEN_MASK; |
1370 | } |
1371 | |
1372 | num |
1373 | } |
1374 |
Definitions
- UnboundedSenderInner
- inner
- BoundedSenderInner
- inner
- sender_task
- maybe_parked
- Sender
- UnboundedSender
- AssertKinds
- Receiver
- inner
- UnboundedReceiver
- inner
- SendError
- kind
- TrySendError
- err
- val
- SendErrorKind
- Full
- Disconnected
- TryRecvError
- _priv
- fmt
- is_full
- is_disconnected
- fmt
- fmt
- is_full
- is_disconnected
- into_inner
- into_send_error
- fmt
- fmt
- UnboundedInner
- state
- message_queue
- num_senders
- recv_task
- BoundedInner
- buffer
- state
- message_queue
- parked_queue
- num_senders
- recv_task
- State
- is_open
- num_messages
- SenderTask
- task
- is_parked
- new
- notify
- channel
- unbounded
- poll_ready_nb
- queue_push_and_signal
- inc_num_messages
- same_receiver
- is_connected_to
- ptr
- is_closed
- close_channel
- try_send
- do_send_b
- queue_push_and_signal
- inc_num_messages
- park
- poll_ready
- same_receiver
- is_connected_to
- ptr
- is_closed
- close_channel
- poll_unparked
- try_send
- start_send
- poll_ready
- is_closed
- close_channel
- disconnect
- same_receiver
- is_connected_to
- hash_receiver
- poll_ready
- is_closed
- close_channel
- disconnect
- do_send_nb
- start_send
- unbounded_send
- same_receiver
- is_connected_to
- hash_receiver
- len
- is_empty
- clone
- clone
- clone
- clone
- drop
- drop
- fmt
- fmt
- close
- try_next
- next_message
- unpark_one
- dec_num_messages
- is_terminated
- Item
- poll_next
- size_hint
- drop
- fmt
- close
- try_next
- next_message
- dec_num_messages
- is_terminated
- Item
- poll_next
- size_hint
- drop
- fmt
- set_closed
- max_senders
- set_closed
- is_closed
- size_hint
- decode_state
Learn Rust with the experts
Find out more