use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::runtime::park::CachedParkThread;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::{bounded, list, unbounded};
use crate::sync::notify::Notify;

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

/// Channel sender.
pub(crate) struct Tx<T, S> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Tx").field("inner", &self.inner).finish()
    }
}

/// Channel receiver.
pub(crate) struct Rx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Rx").field("inner", &self.inner).finish()
    }
}

pub(crate) trait Semaphore {
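    /// Returns `true` when every sent message has been consumed, i.e. there
    /// are no outstanding messages in the channel.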
    fn is_idle(&self) -> bool;

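    /// Returns one unit of send capacity after a message has been removed
    /// from the channel.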
    fn add_permit(&self);

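    /// Closes the semaphore, causing subsequent capacity requests to fail.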
    fn close(&self);

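    /// Returns `true` if the semaphore has been closed.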
    fn is_closed(&self) -> bool;
}
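
// Two implementations live at the bottom of this file: `bounded::Semaphore`
// wraps a permit-based semaphore, while `unbounded::Semaphore` packs a
// message count and a closed flag into a single `AtomicUsize`.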

pub(super) struct Chan<T, S> {
    /// Notifies all tasks listening for the receiver being dropped.
    notify_rx_closed: Notify,

    /// Handle to the push half of the lock-free list.
    tx: list::Tx<T>,

    /// Coordinates access to the channel's capacity.
    semaphore: S,

    /// Receiver waker. Notified when a value is pushed into the channel.
    rx_waker: AtomicWaker,

    /// Tracks the number of outstanding sender handles.
    ///
    /// When this drops to zero, the send half of the channel is closed.
    tx_count: AtomicUsize,

    /// Only accessed by the `Rx` handle.
    rx_fields: UnsafeCell<RxFields<T>>,
}

impl<T, S> fmt::Debug for Chan<T, S>
where
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Chan")
            .field("tx", &self.tx)
            .field("semaphore", &self.semaphore)
            .field("rx_waker", &self.rx_waker)
            .field("tx_count", &self.tx_count)
            .field("rx_fields", &"...")
            .finish()
    }
}

/// Fields only accessed by the `Rx` handle.
struct RxFields<T> {
    /// Channel receiver. This field is only accessed by the `Receiver` type.
    list: list::Rx<T>,

    /// `true` if `Rx::close` has been called.
    rx_closed: bool,
}

impl<T> fmt::Debug for RxFields<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("RxFields")
            .field("list", &self.list)
            .field("rx_closed", &self.rx_closed)
            .finish()
    }
}

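// Safety: `rx_fields` sits in an `UnsafeCell`, but it is only ever accessed
// by the single `Rx` handle (and by `Chan::drop`, which has exclusive
// access), so sharing a `Chan` between threads is sound as long as its
// contents are `Send`.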
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}

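/// Creates both halves of a channel sharing a single `Chan` allocation.
/// The bounded and unbounded flavors are both built on this function,
/// differing only in the `Semaphore` implementation they supply.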
pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
    let (tx, rx) = list::channel();

    let chan = Arc::new(Chan {
        notify_rx_closed: Notify::new(),
        tx,
        semaphore,
        rx_waker: AtomicWaker::new(),
        tx_count: AtomicUsize::new(1),
        rx_fields: UnsafeCell::new(RxFields {
            list: rx,
            rx_closed: false,
        }),
    });

    (Tx::new(chan.clone()), Rx::new(chan))
}

// ===== impl Tx =====

impl<T, S> Tx<T, S> {
    fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
        Tx { inner: chan }
    }

    pub(super) fn downgrade(&self) -> Arc<Chan<T, S>> {
        self.inner.clone()
    }

    // Returns the upgraded channel, or `None` if the upgrade failed.
    pub(super) fn upgrade(chan: Arc<Chan<T, S>>) -> Option<Self> {
        let mut tx_count = chan.tx_count.load(Acquire);

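        // A compare-and-swap loop is used instead of `fetch_add` so that a
        // count that has already reached zero (i.e. all senders have been
        // dropped and the channel closed) is never incremented back to one.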
        loop {
            if tx_count == 0 {
                // Channel is closed.
                return None;
            }

            match chan
                .tx_count
                .compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
            {
                Ok(_) => return Some(Tx { inner: chan }),
                Err(prev_count) => tx_count = prev_count,
            }
        }
    }

    pub(super) fn semaphore(&self) -> &S {
        &self.inner.semaphore
    }

    /// Sends a message and notifies the receiver.
    pub(crate) fn send(&self, value: T) {
        self.inner.send(value);
    }

    /// Wakes the receive half.
    pub(crate) fn wake_rx(&self) {
        self.inner.rx_waker.wake();
    }

    /// Returns `true` if both senders belong to the same channel.
    pub(crate) fn same_channel(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}

impl<T, S: Semaphore> Tx<T, S> {
    pub(crate) fn is_closed(&self) -> bool {
        self.inner.semaphore.is_closed()
    }

    pub(crate) async fn closed(&self) {
        // To avoid a race condition, we first request a notification,
        // **then** check whether the semaphore is closed. If the semaphore
        // is already closed, the notification request is simply dropped.
        let notified = self.inner.notify_rx_closed.notified();

        if self.inner.semaphore.is_closed() {
            return;
        }
        notified.await;
    }
}

impl<T, S> Clone for Tx<T, S> {
    fn clone(&self) -> Tx<T, S> {
        // Using a Relaxed ordering here is sufficient, as the caller holds a
        // strong ref to `self`, preventing a concurrent decrement to zero.
        self.inner.tx_count.fetch_add(1, Relaxed);

        Tx {
            inner: self.inner.clone(),
        }
    }
}

impl<T, S> Drop for Tx<T, S> {
    fn drop(&mut self) {
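        // `AcqRel` makes each sender's operations visible to the thread that
        // performs the final decrement, so the last sender observes all
        // prior sends before closing the list below.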
        if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
            return;
        }

        // Close the list, which sends a `Close` message.
        self.inner.tx.close();

        // Notify the receiver.
        self.wake_rx();
    }
}

// ===== impl Rx =====

impl<T, S: Semaphore> Rx<T, S> {
    fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
        Rx { inner: chan }
    }

    pub(crate) fn close(&mut self) {
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            if rx_fields.rx_closed {
                return;
            }

            rx_fields.rx_closed = true;
        });

        self.inner.semaphore.close();
        self.inner.notify_rx_closed.notify_waiters();
    }

    /// Receives the next value.
    pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        use super::block::Read::*;

        ready!(crate::trace::trace_leaf(cx));

        // Keep track of the task budget.
        let coop = ready!(crate::runtime::coop::poll_proceed(cx));

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.pop(&self.inner.tx) {
                        Some(Value(value)) => {
                            self.inner.semaphore.add_permit();
                            coop.made_progress();
                            return Ready(Some(value));
                        }
                        Some(Closed) => {
                            // TODO: This check may not be required, as it most
                            // likely can only return `true` at this point. A
                            // channel is closed when all tx handles are
                            // dropped. Dropping a tx handle releases memory,
                            // which ensures that if dropping the tx handle is
                            // visible, then all messages sent are also visible.
                            assert!(self.inner.semaphore.is_idle());
                            coop.made_progress();
                            return Ready(None);
                        }
                        None => {} // fall through
                    }
                };
            }

            try_recv!();

            self.inner.rx_waker.register_by_ref(cx.waker());

            // It is possible that a value was pushed between attempting to
            // read and registering the task, so we have to check the channel
            // a second time here.
            try_recv!();

            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
                coop.made_progress();
                Ready(None)
            } else {
                Pending
            }
        })
    }

    /// Tries to receive the next value.
    pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
        use super::list::TryPopResult;

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.try_pop(&self.inner.tx) {
                        TryPopResult::Ok(value) => {
                            self.inner.semaphore.add_permit();
                            return Ok(value);
                        }
                        TryPopResult::Closed => return Err(TryRecvError::Disconnected),
                        TryPopResult::Empty => return Err(TryRecvError::Empty),
                        TryPopResult::Busy => {} // fall through
                    }
                };
            }

            try_recv!();

            // If a previous `poll_recv` call has set a waker, we wake it here.
            // This allows us to put our own `CachedParkThread` waker in the
            // `AtomicWaker` slot instead.
            //
            // This is not a spurious wakeup to `poll_recv`, since we just got
            // a `Busy` from `try_pop`, which only happens if there are
            // messages in the queue.
            self.inner.rx_waker.wake();

            // Park the thread until the problematic send has completed.
            let mut park = CachedParkThread::new();
            let waker = park.waker().unwrap();
            loop {
                self.inner.rx_waker.register_by_ref(&waker);
                // It is possible that the problematic send has now completed,
                // so we have to check for messages again.
                try_recv!();
                park.park();
            }
        })
    }
}

impl<T, S: Semaphore> Drop for Rx<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        self.close();

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
                self.inner.semaphore.add_permit();
            }
        })
    }
}

// ===== impl Chan =====

impl<T, S> Chan<T, S> {
    fn send(&self, value: T) {
        // Push the value.
        self.tx.push(value);

        // Notify the rx task.
        self.rx_waker.wake();
    }
}

impl<T, S> Drop for Chan<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        // Safety: the only owner of the rx fields is `Chan`, and being
        // inside its own `Drop` means we're the last ones to touch them.
        self.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
            unsafe { rx_fields.list.free_blocks() };
        });
    }
}

// ===== impl Semaphore for bounded::Semaphore =====

impl Semaphore for bounded::Semaphore {
    fn add_permit(&self) {
        self.semaphore.release(1)
    }

    fn is_idle(&self) -> bool {
        self.semaphore.available_permits() == self.bound
    }

    fn close(&self) {
        self.semaphore.close();
    }

    fn is_closed(&self) -> bool {
        self.semaphore.is_closed()
    }
}

// ===== impl Semaphore for unbounded::Semaphore =====

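// State layout (a reading of the code below, not documented elsewhere in
// this file): bit 0 of the `AtomicUsize` is a "closed" flag, and the
// remaining bits count outstanding messages. Each receive subtracts 2
// (`add_permit`), leaving the flag bit untouched; the matching increment
// on send lives in the `unbounded` module. For example, three outstanding
// messages on an open channel encode as 0b110, and closing the channel
// sets bit 0, giving 0b111.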
impl Semaphore for unbounded::Semaphore {
    fn add_permit(&self) {
        let prev = self.0.fetch_sub(2, Release);

        if prev >> 1 == 0 {
            // Something went wrong: the message count underflowed.
            process::abort();
        }
    }

    fn is_idle(&self) -> bool {
        self.0.load(Acquire) >> 1 == 0
    }

    fn close(&self) {
        self.0.fetch_or(1, Release);
    }

    fn is_closed(&self) -> bool {
        self.0.load(Acquire) & 1 == 1
    }
}