use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::runtime::park::CachedParkThread;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::{bounded, list, unbounded};
use crate::sync::notify::Notify;
use crate::util::cacheline::CachePadded;

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

/// Channel sender.
pub(crate) struct Tx<T, S> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Tx").field("inner", &self.inner).finish()
    }
}

/// Channel receiver.
pub(crate) struct Rx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Rx").field("inner", &self.inner).finish()
    }
}

/// Abstracts over the bounded and unbounded capacity strategies; both
/// implementations live at the bottom of this file.
pub(crate) trait Semaphore {
    /// Returns `true` when every permit has been returned, i.e. the channel
    /// holds no undelivered messages.
    fn is_idle(&self) -> bool;

    /// Returns a single permit to the channel (called once per received value).
    fn add_permit(&self);

    /// Returns `n` permits to the channel at once (used by `recv_many`).
    fn add_permits(&self, n: usize);

    /// Closes the semaphore; after this, `is_closed` returns `true`.
    fn close(&self);

    /// Returns `true` if the semaphore has been closed.
    fn is_closed(&self) -> bool;
}

pub(super) struct Chan<T, S> {
    /// Handle to the push half of the lock-free list.
    tx: CachePadded<list::Tx<T>>,

    /// Receiver waker. Notified when a value is pushed into the channel.
    rx_waker: CachePadded<AtomicWaker>,

    /// Notifies all tasks listening for the receiver being dropped.
    notify_rx_closed: Notify,

    /// Coordinates access to the channel's capacity.
    semaphore: S,

    /// Tracks the number of outstanding sender handles.
    ///
    /// When this drops to zero, the send half of the channel is closed.
    tx_count: AtomicUsize,

    /// Only accessed by the `Rx` handle.
    rx_fields: UnsafeCell<RxFields<T>>,
}

impl<T, S> fmt::Debug for Chan<T, S>
where
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Chan")
            .field("tx", &*self.tx)
            .field("semaphore", &self.semaphore)
            .field("rx_waker", &*self.rx_waker)
            .field("tx_count", &self.tx_count)
            .field("rx_fields", &"...")
            .finish()
    }
}

/// Fields only accessed by the `Rx` handle.
struct RxFields<T> {
    /// Channel receiver. This field is only accessed by the `Receiver` type.
    list: list::Rx<T>,

    /// `true` if `Rx::close` has been called.
    rx_closed: bool,
}

impl<T> fmt::Debug for RxFields<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("RxFields")
            .field("list", &self.list)
            .field("rx_closed", &self.rx_closed)
            .finish()
    }
}
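
// Safety: `Chan` is shared across threads behind an `Arc`. The `tx`,
// `rx_waker`, `semaphore`, and `tx_count` fields are all internally
// synchronized, and `rx_fields`, while an `UnsafeCell`, is only ever touched
// by the single `Rx` handle (and by `Chan::drop`, which has exclusive
// access), so no unsynchronized shared access occurs.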
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
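
// Construction sketch: the bounded and unbounded channel constructors (which
// live in `bounded.rs` and `unbounded.rs`; the call below is illustrative
// only) both funnel into `channel`, passing their respective `Semaphore`
// implementation:
//
//     let (tx, rx) = channel::<T, unbounded::Semaphore>(semaphore);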
pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
    let (tx, rx) = list::channel();

    let chan = Arc::new(Chan {
        notify_rx_closed: Notify::new(),
        tx: CachePadded::new(tx),
        semaphore,
        rx_waker: CachePadded::new(AtomicWaker::new()),
        tx_count: AtomicUsize::new(1),
        rx_fields: UnsafeCell::new(RxFields {
            list: rx,
            rx_closed: false,
        }),
    });

    (Tx::new(chan.clone()), Rx::new(chan))
}

// ===== impl Tx =====

impl<T, S> Tx<T, S> {
    fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
        Tx { inner: chan }
    }

    /// Clones the underlying channel reference without incrementing the
    /// sender count; `upgrade` turns it back into a full `Tx`.
    pub(super) fn downgrade(&self) -> Arc<Chan<T, S>> {
        self.inner.clone()
    }

    /// Returns the upgraded channel, or `None` if the upgrade failed because
    /// all senders have already been dropped.
    pub(super) fn upgrade(chan: Arc<Chan<T, S>>) -> Option<Self> {
        let mut tx_count = chan.tx_count.load(Acquire);

        loop {
            if tx_count == 0 {
                // The channel is closed; there is no sender count to join.
                return None;
            }

            // `compare_exchange_weak` may fail spuriously or under
            // contention, so retry with the freshly observed count.
            match chan
                .tx_count
                .compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
            {
                Ok(_) => return Some(Tx { inner: chan }),
                Err(prev_count) => tx_count = prev_count,
            }
        }
    }

    pub(super) fn semaphore(&self) -> &S {
        &self.inner.semaphore
    }

    /// Send a message and notify the receiver.
    pub(crate) fn send(&self, value: T) {
        self.inner.send(value);
    }

    /// Wake the receive half.
    pub(crate) fn wake_rx(&self) {
        self.inner.rx_waker.wake();
    }

    /// Returns `true` if both senders belong to the same channel.
    pub(crate) fn same_channel(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}

impl<T, S: Semaphore> Tx<T, S> {
    pub(crate) fn is_closed(&self) -> bool {
        self.inner.semaphore.is_closed()
    }

    pub(crate) async fn closed(&self) {
        // In order to avoid a race condition, we first request a notification,
        // **then** check whether the semaphore is closed. If the semaphore is
        // closed, the notification request is dropped.
        let notified = self.inner.notify_rx_closed.notified();

        if self.inner.semaphore.is_closed() {
            return;
        }
        notified.await;
    }
}

impl<T, S> Clone for Tx<T, S> {
    fn clone(&self) -> Tx<T, S> {
        // Using a Relaxed ordering here is sufficient as the caller holds a
        // strong ref to `self`, preventing a concurrent decrement to zero.
        self.inner.tx_count.fetch_add(1, Relaxed);

        Tx {
            inner: self.inner.clone(),
        }
    }
}

impl<T, S> Drop for Tx<T, S> {
    fn drop(&mut self) {
        // Only the last sender closes the channel. `AcqRel` ensures the
        // thread running the final drop observes everything the other
        // senders did before dropping their handles.
        if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
            return;
        }

        // Close the list, which sends a `Close` message.
        self.inner.tx.close();

        // Notify the receiver.
        self.wake_rx();
    }
}

// ===== impl Rx =====

impl<T, S: Semaphore> Rx<T, S> {
    fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
        Rx { inner: chan }
    }

    pub(crate) fn close(&mut self) {
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            if rx_fields.rx_closed {
                return;
            }

            rx_fields.rx_closed = true;
        });

        // The early `return` above only exits the `with_mut` closure; closing
        // the semaphore and notifying watchers again is harmless if `close`
        // was already called.
        self.inner.semaphore.close();
        self.inner.notify_rx_closed.notify_waiters();
    }

    /// Receive the next value.
    pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        use super::block::Read;

        ready!(crate::trace::trace_leaf(cx));

        // Keep track of task budget.
        let coop = ready!(crate::runtime::coop::poll_proceed(cx));

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.pop(&self.inner.tx) {
                        Some(Read::Value(value)) => {
                            self.inner.semaphore.add_permit();
                            coop.made_progress();
                            return Ready(Some(value));
                        }
                        Some(Read::Closed) => {
                            // TODO: This check may not be required as it most
                            // likely can only return `true` at this point. A
                            // channel is closed when all tx handles are
                            // dropped. Dropping a tx handle releases memory,
                            // which ensures that if dropping the tx handle is
                            // visible, then all messages sent are also visible.
                            assert!(self.inner.semaphore.is_idle());
                            coop.made_progress();
                            return Ready(None);
                        }
                        None => {} // fall through
                    }
                };
            }

            try_recv!();

            self.inner.rx_waker.register_by_ref(cx.waker());

            // It is possible that a value was pushed between attempting to read
            // and registering the task, so we have to check the channel a
            // second time here.
            try_recv!();

            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
                coop.made_progress();
                Ready(None)
            } else {
                Pending
            }
        })
    }

    /// Receives up to `limit` values into `buffer`.
    ///
    /// For `limit > 0`, receives up to `limit` values into `buffer`.
    /// For `limit == 0`, immediately returns `Ready(0)`.
    pub(crate) fn recv_many(
        &mut self,
        cx: &mut Context<'_>,
        buffer: &mut Vec<T>,
        limit: usize,
    ) -> Poll<usize> {
        use super::block::Read;

        ready!(crate::trace::trace_leaf(cx));

        // Keep track of task budget.
        let coop = ready!(crate::runtime::coop::poll_proceed(cx));

        if limit == 0 {
            coop.made_progress();
            return Ready(0usize);
        }

        let mut remaining = limit;
        let initial_length = buffer.len();

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };
            macro_rules! try_recv {
                () => {
                    while remaining > 0 {
                        match rx_fields.list.pop(&self.inner.tx) {
                            Some(Read::Value(value)) => {
                                remaining -= 1;
                                buffer.push(value);
                            }

                            Some(Read::Closed) => {
                                let number_added = buffer.len() - initial_length;
                                if number_added > 0 {
                                    self.inner.semaphore.add_permits(number_added);
                                }
                                // TODO: This check may not be required as it most
                                // likely can only return `true` at this point. A
                                // channel is closed when all tx handles are
                                // dropped. Dropping a tx handle releases memory,
                                // which ensures that if dropping the tx handle is
                                // visible, then all messages sent are also visible.
                                assert!(self.inner.semaphore.is_idle());
                                coop.made_progress();
                                return Ready(number_added);
                            }

                            None => {
                                break; // fall through
                            }
                        }
                    }
                    let number_added = buffer.len() - initial_length;
                    if number_added > 0 {
                        self.inner.semaphore.add_permits(number_added);
                        coop.made_progress();
                        return Ready(number_added);
                    }
                };
            }

            try_recv!();

            self.inner.rx_waker.register_by_ref(cx.waker());

            // It is possible that a value was pushed between attempting to read
            // and registering the task, so we have to check the channel a
            // second time here.
            try_recv!();

            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
                // Nothing was received in this call; `buffer` keeps its
                // initial contents (which the caller may have pre-filled).
                assert_eq!(buffer.len(), initial_length);
                coop.made_progress();
                Ready(0usize)
            } else {
                Pending
            }
        })
    }

    /// Try to receive the next value.
    pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
        use super::list::TryPopResult;

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.try_pop(&self.inner.tx) {
                        TryPopResult::Ok(value) => {
                            self.inner.semaphore.add_permit();
                            return Ok(value);
                        }
                        TryPopResult::Closed => return Err(TryRecvError::Disconnected),
                        TryPopResult::Empty => return Err(TryRecvError::Empty),
                        TryPopResult::Busy => {} // fall through
                    }
                };
            }

            try_recv!();

            // If a previous `poll_recv` call has set a waker, we wake it here.
            // This allows us to put our own `CachedParkThread` waker in the
            // `AtomicWaker` slot instead.
            //
            // This is not a spurious wakeup to `poll_recv` since we just got a
            // `Busy` from `try_pop`, which only happens if there are messages
            // in the queue.
            self.inner.rx_waker.wake();

            // Park the thread until the problematic send has completed.
            let mut park = CachedParkThread::new();
            let waker = park.waker().unwrap();
            loop {
                self.inner.rx_waker.register_by_ref(&waker);
                // It is possible that the problematic send has now completed,
                // so we have to check for messages again.
                try_recv!();
                park.park();
            }
        })
    }
}

impl<T, S: Semaphore> Drop for Rx<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        self.close();

        // Drain any values still in the channel so their destructors run and
        // the corresponding permits are returned to the semaphore.
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
                self.inner.semaphore.add_permit();
            }
        });
    }
}

// ===== impl Chan =====

impl<T, S> Chan<T, S> {
    fn send(&self, value: T) {
        // Push the value.
        self.tx.push(value);

        // Notify the rx task.
        self.rx_waker.wake();
    }
}

impl<T, S> Drop for Chan<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        // Safety: the only owner of the rx fields is Chan, and being
        // inside its own Drop means we're the last ones to touch it.
        self.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
            unsafe { rx_fields.list.free_blocks() };
        });
    }
}

// ===== impl Semaphore for bounded::Semaphore =====

impl Semaphore for bounded::Semaphore {
    fn add_permit(&self) {
        self.semaphore.release(1);
    }

    fn add_permits(&self, n: usize) {
        self.semaphore.release(n);
    }

    fn is_idle(&self) -> bool {
        // All permits are back with the semaphore, so no messages are in
        // flight.
        self.semaphore.available_permits() == self.bound
    }

    fn close(&self) {
        self.semaphore.close();
    }

    fn is_closed(&self) -> bool {
        self.semaphore.is_closed()
    }
}

// ===== impl Semaphore for unbounded::Semaphore =====
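
// The unbounded semaphore packs all of its state into one `usize`: bit 0 is
// the "closed" flag and the remaining bits count undelivered messages, so a
// single message corresponds to adding or subtracting 2. Illustrative
// encoding (assuming the sending side, in `unbounded.rs`, increments the
// counter by 2 per message):
//
//     0b000  open channel, no undelivered messages (idle)
//     0b100  open channel, two undelivered messages
//     0b101  closed channel, two undelivered messages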
impl Semaphore for unbounded::Semaphore {
    fn add_permit(&self) {
        let prev = self.0.fetch_sub(2, Release);

        if prev >> 1 == 0 {
            // The message count was already zero, so it has now underflowed.
            // This is a bug; abort rather than continue with corrupt state.
            process::abort();
        }
    }

    fn add_permits(&self, n: usize) {
        let prev = self.0.fetch_sub(n << 1, Release);

        if (prev >> 1) < n {
            // More permits were returned than messages were sent, so the
            // count has underflowed. This is a bug; abort rather than
            // continue with corrupt state.
            process::abort();
        }
    }

    fn is_idle(&self) -> bool {
        self.0.load(Acquire) >> 1 == 0
    }

    fn close(&self) {
        self.0.fetch_or(1, Release);
    }

    fn is_closed(&self) -> bool {
        self.0.load(Acquire) & 1 == 1
    }
}