1// Allow `unreachable_pub` warnings when sync is not enabled
2// due to the usage of `Notify` within the `rt` feature set.
3// When this module is compiled with `sync` enabled we will warn on
4// this lint. When `rt` is enabled we use `pub(crate)` which
5// triggers this warning but it is safe to ignore in this case.
6#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8use crate::loom::cell::UnsafeCell;
9use crate::loom::sync::atomic::AtomicUsize;
10use crate::loom::sync::Mutex;
11use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12use crate::util::WakeList;
13
14use std::future::Future;
15use std::marker::PhantomPinned;
16use std::panic::{RefUnwindSafe, UnwindSafe};
17use std::pin::Pin;
18use std::ptr::NonNull;
19use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20use std::task::{Context, Poll, Waker};
21
22type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
23type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24
25/// Notifies a single task to wake up.
26///
27/// `Notify` provides a basic mechanism to notify a single task of an event.
28/// `Notify` itself does not carry any data. Instead, it is to be used to signal
29/// another task to perform an operation.
30///
31/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
32/// [`notified().await`] method waits for a permit to become available, and
33/// [`notify_one()`] sets a permit **if there currently are no available
34/// permits**.
35///
36/// The synchronization details of `Notify` are similar to
37/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
38/// value contains a single permit. [`notified().await`] waits for the permit to
39/// be made available, consumes the permit, and resumes. [`notify_one()`] sets
40/// the permit, waking a pending task if there is one.
41///
42/// If `notify_one()` is called **before** `notified().await`, then the next
43/// call to `notified().await` will complete immediately, consuming the permit.
44/// Any subsequent calls to `notified().await` will wait for a new permit.
45///
46/// If `notify_one()` is called **multiple** times before `notified().await`,
47/// only a **single** permit is stored. The next call to `notified().await` will
48/// complete immediately, but the one after will wait for a new permit.
49///
50/// # Examples
51///
52/// Basic usage.
53///
54/// ```
55/// use tokio::sync::Notify;
56/// use std::sync::Arc;
57///
58/// #[tokio::main]
59/// async fn main() {
60/// let notify = Arc::new(Notify::new());
61/// let notify2 = notify.clone();
62///
63/// let handle = tokio::spawn(async move {
64/// notify2.notified().await;
65/// println!("received notification");
66/// });
67///
68/// println!("sending notification");
69/// notify.notify_one();
70///
71/// // Wait for task to receive notification.
72/// handle.await.unwrap();
73/// }
74/// ```
75///
76/// Unbound multi-producer single-consumer (mpsc) channel.
77///
78/// No wakeups can be lost when using this channel because the call to
79/// `notify_one()` will store a permit in the `Notify`, which the following call
80/// to `notified()` will consume.
81///
82/// ```
83/// use tokio::sync::Notify;
84///
85/// use std::collections::VecDeque;
86/// use std::sync::Mutex;
87///
88/// struct Channel<T> {
89/// values: Mutex<VecDeque<T>>,
90/// notify: Notify,
91/// }
92///
93/// impl<T> Channel<T> {
94/// pub fn send(&self, value: T) {
95/// self.values.lock().unwrap()
96/// .push_back(value);
97///
98/// // Notify the consumer a value is available
99/// self.notify.notify_one();
100/// }
101///
102/// // This is a single-consumer channel, so several concurrent calls to
103/// // `recv` are not allowed.
104/// pub async fn recv(&self) -> T {
105/// loop {
106/// // Drain values
107/// if let Some(value) = self.values.lock().unwrap().pop_front() {
108/// return value;
109/// }
110///
111/// // Wait for values to be available
112/// self.notify.notified().await;
113/// }
114/// }
115/// }
116/// ```
117///
118/// Unbound multi-producer multi-consumer (mpmc) channel.
119///
120/// The call to [`enable`] is important because otherwise if you have two
121/// calls to `recv` and two calls to `send` in parallel, the following could
122/// happen:
123///
124/// 1. Both calls to `try_recv` return `None`.
125/// 2. Both new elements are added to the vector.
126/// 3. The `notify_one` method is called twice, adding only a single
127/// permit to the `Notify`.
128/// 4. Both calls to `recv` reach the `Notified` future. One of them
129/// consumes the permit, and the other sleeps forever.
130///
131/// By adding the `Notified` futures to the list by calling `enable` before
132/// `try_recv`, the `notify_one` calls in step three would remove the
133/// futures from the list and mark them notified instead of adding a permit
134/// to the `Notify`. This ensures that both futures are woken.
135///
136/// Notice that this failure can only happen if there are two concurrent calls
137/// to `recv`. This is why the mpsc example above does not require a call to
138/// `enable`.
139///
140/// ```
141/// use tokio::sync::Notify;
142///
143/// use std::collections::VecDeque;
144/// use std::sync::Mutex;
145///
146/// struct Channel<T> {
147/// messages: Mutex<VecDeque<T>>,
148/// notify_on_sent: Notify,
149/// }
150///
151/// impl<T> Channel<T> {
152/// pub fn send(&self, msg: T) {
153/// let mut locked_queue = self.messages.lock().unwrap();
154/// locked_queue.push_back(msg);
155/// drop(locked_queue);
156///
157/// // Send a notification to one of the calls currently
158/// // waiting in a call to `recv`.
159/// self.notify_on_sent.notify_one();
160/// }
161///
162/// pub fn try_recv(&self) -> Option<T> {
163/// let mut locked_queue = self.messages.lock().unwrap();
164/// locked_queue.pop_front()
165/// }
166///
167/// pub async fn recv(&self) -> T {
168/// let future = self.notify_on_sent.notified();
169/// tokio::pin!(future);
170///
171/// loop {
172/// // Make sure that no wakeup is lost if we get
173/// // `None` from `try_recv`.
174/// future.as_mut().enable();
175///
176/// if let Some(msg) = self.try_recv() {
177/// return msg;
178/// }
179///
180/// // Wait for a call to `notify_one`.
181/// //
182/// // This uses `.as_mut()` to avoid consuming the future,
183/// // which lets us call `Pin::set` below.
184/// future.as_mut().await;
185///
186/// // Reset the future in case another call to
187/// // `try_recv` got the message before us.
188/// future.set(self.notify_on_sent.notified());
189/// }
190/// }
191/// }
192/// ```
193///
194/// [park]: std::thread::park
195/// [unpark]: std::thread::Thread::unpark
196/// [`notified().await`]: Notify::notified()
197/// [`notify_one()`]: Notify::notify_one()
198/// [`enable`]: Notified::enable()
199/// [`Semaphore`]: crate::sync::Semaphore
200#[derive(Debug)]
201pub struct Notify {
202 // `state` uses 2 bits to store one of `EMPTY`,
203 // `WAITING` or `NOTIFIED`. The rest of the bits
204 // are used to store the number of times `notify_waiters`
205 // was called.
206 //
207 // Throughout the code there are two assumptions:
208 // - state can be transitioned *from* `WAITING` only if
209 // `waiters` lock is held
210 // - number of times `notify_waiters` was called can
211 // be modified only if `waiters` lock is held
212 state: AtomicUsize,
213 waiters: Mutex<WaitList>,
214}
215
216#[derive(Debug)]
217struct Waiter {
218 /// Intrusive linked-list pointers.
219 pointers: linked_list::Pointers<Waiter>,
220
221 /// Waiting task's waker. Depending on the value of `notification`,
222 /// this field is either protected by the `waiters` lock in
223 /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
224 waker: UnsafeCell<Option<Waker>>,
225
226 /// Notification for this waiter.
227 /// * if it's `None`, then `waker` is protected by the `waiters` lock.
228 /// * if it's `Some`, then `waker` is exclusively owned by the
229 /// enclosing `Waiter` and can be accessed without locking.
230 notification: AtomicNotification,
231
232 /// Should not be `Unpin`.
233 _p: PhantomPinned,
234}
235
236impl Waiter {
237 fn new() -> Waiter {
238 Waiter {
239 pointers: linked_list::Pointers::new(),
240 waker: UnsafeCell::new(data:None),
241 notification: AtomicNotification::none(),
242 _p: PhantomPinned,
243 }
244 }
245}
246
247generate_addr_of_methods! {
248 impl<> Waiter {
249 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
250 &self.pointers
251 }
252 }
253}
254
// Raw values stored inside `AtomicNotification`.

// No notification.
const NOTIFICATION_NONE: usize = 0;

// Notification type used by `notify_one`.
const NOTIFICATION_ONE: usize = 1;

// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 2;
263
264/// Notification for a `Waiter`.
265/// This struct is equivalent to `Option<Notification>`, but uses
266/// `AtomicUsize` inside for atomic operations.
267#[derive(Debug)]
268struct AtomicNotification(AtomicUsize);
269
270impl AtomicNotification {
271 fn none() -> Self {
272 AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
273 }
274
275 /// Store-release a notification.
276 /// This method should be called exactly once.
277 fn store_release(&self, notification: Notification) {
278 self.0.store(notification as usize, Release);
279 }
280
281 fn load(&self, ordering: Ordering) -> Option<Notification> {
282 match self.0.load(ordering) {
283 NOTIFICATION_NONE => None,
284 NOTIFICATION_ONE => Some(Notification::One),
285 NOTIFICATION_ALL => Some(Notification::All),
286 _ => unreachable!(),
287 }
288 }
289
290 /// Clears the notification.
291 /// This method is used by a `Notified` future to consume the
292 /// notification. It uses relaxed ordering and should be only
293 /// used once the atomic notification is no longer shared.
294 fn clear(&self) {
295 self.0.store(NOTIFICATION_NONE, Relaxed);
296 }
297}
298
299#[derive(Debug, PartialEq, Eq)]
300#[repr(usize)]
301enum Notification {
302 One = NOTIFICATION_ONE,
303 All = NOTIFICATION_ALL,
304}
305
306/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
307/// and gates the access to it on `notify.waiters` mutex. It also empties
308/// the list on drop.
309struct NotifyWaitersList<'a> {
310 list: GuardedWaitList,
311 is_empty: bool,
312 notify: &'a Notify,
313}
314
315impl<'a> NotifyWaitersList<'a> {
316 fn new(
317 unguarded_list: WaitList,
318 guard: Pin<&'a Waiter>,
319 notify: &'a Notify,
320 ) -> NotifyWaitersList<'a> {
321 let guard_ptr = NonNull::from(guard.get_ref());
322 let list = unguarded_list.into_guarded(guard_ptr);
323 NotifyWaitersList {
324 list,
325 is_empty: false,
326 notify,
327 }
328 }
329
330 /// Removes the last element from the guarded list. Modifying this list
331 /// requires an exclusive access to the main list in `Notify`.
332 fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
333 let result = self.list.pop_back();
334 if result.is_none() {
335 // Save information about emptiness to avoid waiting for lock
336 // in the destructor.
337 self.is_empty = true;
338 }
339 result
340 }
341}
342
343impl Drop for NotifyWaitersList<'_> {
344 fn drop(&mut self) {
345 // If the list is not empty, we unlink all waiters from it.
346 // We do not wake the waiters to avoid double panics.
347 if !self.is_empty {
348 let _lock_guard: MutexGuard<'_, LinkedList<…, …>> = self.notify.waiters.lock();
349 while let Some(waiter: NonNull) = self.list.pop_back() {
350 // Safety: we never make mutable references to waiters.
351 let waiter: &Waiter = unsafe { waiter.as_ref() };
352 waiter.notification.store_release(Notification::All);
353 }
354 }
355 }
356}
357
358/// Future returned from [`Notify::notified()`].
359///
360/// This future is fused, so once it has completed, any future calls to poll
361/// will immediately return `Poll::Ready`.
362#[derive(Debug)]
363pub struct Notified<'a> {
364 /// The `Notify` being received on.
365 notify: &'a Notify,
366
367 /// The current state of the receiving process.
368 state: State,
369
370 /// Number of calls to `notify_waiters` at the time of creation.
371 notify_waiters_calls: usize,
372
373 /// Entry in the waiter `LinkedList`.
374 waiter: Waiter,
375}
376
377unsafe impl<'a> Send for Notified<'a> {}
378unsafe impl<'a> Sync for Notified<'a> {}
379
/// Progress of a `Notified` future.
#[derive(Debug)]
enum State {
    /// Initial state; `Notify::notified` creates the future in this state.
    Init,
    /// The waiter has been pushed onto the `Notify` wait list.
    Waiting,
    /// The future has completed.
    Done,
}
386
// Layout of the `Notify::state` word: the low `NOTIFY_WAITERS_SHIFT` bits hold
// one of `EMPTY` / `WAITING` / `NOTIFIED`, the remaining high bits count the
// calls to `notify_waiters`.
const NOTIFY_WAITERS_SHIFT: usize = 2;
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;

/// Returns `data` with its state bits replaced by `state`; the
/// `notify_waiters` call counter is left untouched.
fn set_state(data: usize, state: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
}

/// Extracts the state bits (`EMPTY`, `WAITING` or `NOTIFIED`) from `data`.
fn get_state(data: usize) -> usize {
    data & STATE_MASK
}

/// Extracts the `notify_waiters` call counter from `data`.
fn get_num_notify_waiters_calls(data: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
}

/// Returns `data` with the `notify_waiters` call counter incremented by one.
///
/// The counter wraps around on overflow, matching the behavior of
/// `atomic_inc_num_notify_waiters_calls` (`fetch_add` wraps). A plain `+`
/// would panic in builds with overflow checks enabled. The state bits are
/// unaffected because the increment is a multiple of `1 << NOTIFY_WAITERS_SHIFT`.
fn inc_num_notify_waiters_calls(data: usize) -> usize {
    data.wrapping_add(1 << NOTIFY_WAITERS_SHIFT)
}

/// Atomically increments the `notify_waiters` call counter stored in `data`,
/// leaving the state bits untouched.
fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
    data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
}
419
420impl Notify {
421 /// Create a new `Notify`, initialized without a permit.
422 ///
423 /// # Examples
424 ///
425 /// ```
426 /// use tokio::sync::Notify;
427 ///
428 /// let notify = Notify::new();
429 /// ```
430 pub fn new() -> Notify {
431 Notify {
432 state: AtomicUsize::new(0),
433 waiters: Mutex::new(LinkedList::new()),
434 }
435 }
436
437 /// Create a new `Notify`, initialized without a permit.
438 ///
439 /// # Examples
440 ///
441 /// ```
442 /// use tokio::sync::Notify;
443 ///
444 /// static NOTIFY: Notify = Notify::const_new();
445 /// ```
446 #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
447 #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
448 pub const fn const_new() -> Notify {
449 Notify {
450 state: AtomicUsize::new(0),
451 waiters: Mutex::const_new(LinkedList::new()),
452 }
453 }
454
455 /// Wait for a notification.
456 ///
457 /// Equivalent to:
458 ///
459 /// ```ignore
460 /// async fn notified(&self);
461 /// ```
462 ///
463 /// Each `Notify` value holds a single permit. If a permit is available from
464 /// an earlier call to [`notify_one()`], then `notified().await` will complete
465 /// immediately, consuming that permit. Otherwise, `notified().await` waits
466 /// for a permit to be made available by the next call to `notify_one()`.
467 ///
468 /// The `Notified` future is not guaranteed to receive wakeups from calls to
469 /// `notify_one()` if it has not yet been polled. See the documentation for
470 /// [`Notified::enable()`] for more details.
471 ///
472 /// The `Notified` future is guaranteed to receive wakeups from
473 /// `notify_waiters()` as soon as it has been created, even if it has not
474 /// yet been polled.
475 ///
476 /// [`notify_one()`]: Notify::notify_one
477 /// [`Notified::enable()`]: Notified::enable
478 ///
479 /// # Cancel safety
480 ///
481 /// This method uses a queue to fairly distribute notifications in the order
482 /// they were requested. Cancelling a call to `notified` makes you lose your
483 /// place in the queue.
484 ///
485 /// # Examples
486 ///
487 /// ```
488 /// use tokio::sync::Notify;
489 /// use std::sync::Arc;
490 ///
491 /// #[tokio::main]
492 /// async fn main() {
493 /// let notify = Arc::new(Notify::new());
494 /// let notify2 = notify.clone();
495 ///
496 /// tokio::spawn(async move {
497 /// notify2.notified().await;
498 /// println!("received notification");
499 /// });
500 ///
501 /// println!("sending notification");
502 /// notify.notify_one();
503 /// }
504 /// ```
505 pub fn notified(&self) -> Notified<'_> {
506 // we load the number of times notify_waiters
507 // was called and store that in the future.
508 let state = self.state.load(SeqCst);
509 Notified {
510 notify: self,
511 state: State::Init,
512 notify_waiters_calls: get_num_notify_waiters_calls(state),
513 waiter: Waiter::new(),
514 }
515 }
516
517 /// Notifies a waiting task.
518 ///
519 /// If a task is currently waiting, that task is notified. Otherwise, a
520 /// permit is stored in this `Notify` value and the **next** call to
521 /// [`notified().await`] will complete immediately consuming the permit made
522 /// available by this call to `notify_one()`.
523 ///
524 /// At most one permit may be stored by `Notify`. Many sequential calls to
525 /// `notify_one` will result in a single permit being stored. The next call to
526 /// `notified().await` will complete immediately, but the one after that
527 /// will wait.
528 ///
529 /// [`notified().await`]: Notify::notified()
530 ///
531 /// # Examples
532 ///
533 /// ```
534 /// use tokio::sync::Notify;
535 /// use std::sync::Arc;
536 ///
537 /// #[tokio::main]
538 /// async fn main() {
539 /// let notify = Arc::new(Notify::new());
540 /// let notify2 = notify.clone();
541 ///
542 /// tokio::spawn(async move {
543 /// notify2.notified().await;
544 /// println!("received notification");
545 /// });
546 ///
547 /// println!("sending notification");
548 /// notify.notify_one();
549 /// }
550 /// ```
551 // Alias for old name in 0.x
552 #[cfg_attr(docsrs, doc(alias = "notify"))]
553 pub fn notify_one(&self) {
554 // Load the current state
555 let mut curr = self.state.load(SeqCst);
556
557 // If the state is `EMPTY`, transition to `NOTIFIED` and return.
558 while let EMPTY | NOTIFIED = get_state(curr) {
559 // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
560 // happens-before synchronization must happen between this atomic
561 // operation and a task calling `notified().await`.
562 let new = set_state(curr, NOTIFIED);
563 let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
564
565 match res {
566 // No waiters, no further work to do
567 Ok(_) => return,
568 Err(actual) => {
569 curr = actual;
570 }
571 }
572 }
573
574 // There are waiters, the lock must be acquired to notify.
575 let mut waiters = self.waiters.lock();
576
577 // The state must be reloaded while the lock is held. The state may only
578 // transition out of WAITING while the lock is held.
579 curr = self.state.load(SeqCst);
580
581 if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
582 drop(waiters);
583 waker.wake();
584 }
585 }
586
587 /// Notifies all waiting tasks.
588 ///
589 /// If a task is currently waiting, that task is notified. Unlike with
590 /// `notify_one()`, no permit is stored to be used by the next call to
591 /// `notified().await`. The purpose of this method is to notify all
592 /// already registered waiters. Registering for notification is done by
593 /// acquiring an instance of the `Notified` future via calling `notified()`.
594 ///
595 /// # Examples
596 ///
597 /// ```
598 /// use tokio::sync::Notify;
599 /// use std::sync::Arc;
600 ///
601 /// #[tokio::main]
602 /// async fn main() {
603 /// let notify = Arc::new(Notify::new());
604 /// let notify2 = notify.clone();
605 ///
606 /// let notified1 = notify.notified();
607 /// let notified2 = notify.notified();
608 ///
609 /// let handle = tokio::spawn(async move {
610 /// println!("sending notifications");
611 /// notify2.notify_waiters();
612 /// });
613 ///
614 /// notified1.await;
615 /// notified2.await;
616 /// println!("received notifications");
617 /// }
618 /// ```
619 pub fn notify_waiters(&self) {
620 let mut waiters = self.waiters.lock();
621
622 // The state must be loaded while the lock is held. The state may only
623 // transition out of WAITING while the lock is held.
624 let curr = self.state.load(SeqCst);
625
626 if matches!(get_state(curr), EMPTY | NOTIFIED) {
627 // There are no waiting tasks. All we need to do is increment the
628 // number of times this method was called.
629 atomic_inc_num_notify_waiters_calls(&self.state);
630 return;
631 }
632
633 // Increment the number of times this method was called
634 // and transition to empty.
635 let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
636 self.state.store(new_state, SeqCst);
637
638 // It is critical for `GuardedLinkedList` safety that the guard node is
639 // pinned in memory and is not dropped until the guarded list is dropped.
640 let guard = Waiter::new();
641 pin!(guard);
642
643 // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
644 // underneath to allow every waiter to safely remove itself from it.
645 //
646 // * This list will be still guarded by the `waiters` lock.
647 // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
648 // * This wrapper will empty the list on drop. It is critical for safety
649 // that we will not leave any list entry with a pointer to the local
650 // guard node after this function returns / panics.
651 let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);
652
653 let mut wakers = WakeList::new();
654 'outer: loop {
655 while wakers.can_push() {
656 match list.pop_back_locked(&mut waiters) {
657 Some(waiter) => {
658 // Safety: we never make mutable references to waiters.
659 let waiter = unsafe { waiter.as_ref() };
660
661 // Safety: we hold the lock, so we can access the waker.
662 if let Some(waker) =
663 unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
664 {
665 wakers.push(waker);
666 }
667
668 // This waiter is unlinked and will not be shared ever again, release it.
669 waiter.notification.store_release(Notification::All);
670 }
671 None => {
672 break 'outer;
673 }
674 }
675 }
676
677 // Release the lock before notifying.
678 drop(waiters);
679
680 // One of the wakers may panic, but the remaining waiters will still
681 // be unlinked from the list in `NotifyWaitersList` destructor.
682 wakers.wake_all();
683
684 // Acquire the lock again.
685 waiters = self.waiters.lock();
686 }
687
688 // Release the lock before notifying
689 drop(waiters);
690
691 wakers.wake_all();
692 }
693}
694
695impl Default for Notify {
696 fn default() -> Notify {
697 Notify::new()
698 }
699}
700
701impl UnwindSafe for Notify {}
702impl RefUnwindSafe for Notify {}
703
704fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
705 loop {
706 match get_state(curr) {
707 EMPTY | NOTIFIED => {
708 let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
709
710 match res {
711 Ok(_) => return None,
712 Err(actual) => {
713 let actual_state = get_state(actual);
714 assert!(actual_state == EMPTY || actual_state == NOTIFIED);
715 state.store(set_state(actual, NOTIFIED), SeqCst);
716 return None;
717 }
718 }
719 }
720 WAITING => {
721 // At this point, it is guaranteed that the state will not
722 // concurrently change as holding the lock is required to
723 // transition **out** of `WAITING`.
724 //
725 // Get a pending waiter
726 let waiter = waiters.pop_back().unwrap();
727
728 // Safety: we never make mutable references to waiters.
729 let waiter = unsafe { waiter.as_ref() };
730
731 // Safety: we hold the lock, so we can access the waker.
732 let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
733
734 // This waiter is unlinked and will not be shared ever again, release it.
735 waiter.notification.store_release(Notification::One);
736
737 if waiters.is_empty() {
738 // As this the **final** waiter in the list, the state
739 // must be transitioned to `EMPTY`. As transitioning
740 // **from** `WAITING` requires the lock to be held, a
741 // `store` is sufficient.
742 state.store(set_state(curr, EMPTY), SeqCst);
743 }
744
745 return waker;
746 }
747 _ => unreachable!(),
748 }
749 }
750}
751
752// ===== impl Notified =====
753
754impl Notified<'_> {
755 /// Adds this future to the list of futures that are ready to receive
756 /// wakeups from calls to [`notify_one`].
757 ///
758 /// Polling the future also adds it to the list, so this method should only
759 /// be used if you want to add the future to the list before the first call
760 /// to `poll`. (In fact, this method is equivalent to calling `poll` except
761 /// that no `Waker` is registered.)
762 ///
763 /// This has no effect on notifications sent using [`notify_waiters`], which
764 /// are received as long as they happen after the creation of the `Notified`
765 /// regardless of whether `enable` or `poll` has been called.
766 ///
767 /// This method returns true if the `Notified` is ready. This happens in the
768 /// following situations:
769 ///
770 /// 1. The `notify_waiters` method was called between the creation of the
771 /// `Notified` and the call to this method.
772 /// 2. This is the first call to `enable` or `poll` on this future, and the
773 /// `Notify` was holding a permit from a previous call to `notify_one`.
774 /// The call consumes the permit in that case.
775 /// 3. The future has previously been enabled or polled, and it has since
776 /// then been marked ready by either consuming a permit from the
777 /// `Notify`, or by a call to `notify_one` or `notify_waiters` that
778 /// removed it from the list of futures ready to receive wakeups.
779 ///
780 /// If this method returns true, any future calls to poll on the same future
781 /// will immediately return `Poll::Ready`.
782 ///
783 /// # Examples
784 ///
785 /// Unbound multi-producer multi-consumer (mpmc) channel.
786 ///
787 /// The call to `enable` is important because otherwise if you have two
788 /// calls to `recv` and two calls to `send` in parallel, the following could
789 /// happen:
790 ///
791 /// 1. Both calls to `try_recv` return `None`.
792 /// 2. Both new elements are added to the vector.
793 /// 3. The `notify_one` method is called twice, adding only a single
794 /// permit to the `Notify`.
795 /// 4. Both calls to `recv` reach the `Notified` future. One of them
796 /// consumes the permit, and the other sleeps forever.
797 ///
798 /// By adding the `Notified` futures to the list by calling `enable` before
799 /// `try_recv`, the `notify_one` calls in step three would remove the
800 /// futures from the list and mark them notified instead of adding a permit
801 /// to the `Notify`. This ensures that both futures are woken.
802 ///
803 /// ```
804 /// use tokio::sync::Notify;
805 ///
806 /// use std::collections::VecDeque;
807 /// use std::sync::Mutex;
808 ///
809 /// struct Channel<T> {
810 /// messages: Mutex<VecDeque<T>>,
811 /// notify_on_sent: Notify,
812 /// }
813 ///
814 /// impl<T> Channel<T> {
815 /// pub fn send(&self, msg: T) {
816 /// let mut locked_queue = self.messages.lock().unwrap();
817 /// locked_queue.push_back(msg);
818 /// drop(locked_queue);
819 ///
820 /// // Send a notification to one of the calls currently
821 /// // waiting in a call to `recv`.
822 /// self.notify_on_sent.notify_one();
823 /// }
824 ///
825 /// pub fn try_recv(&self) -> Option<T> {
826 /// let mut locked_queue = self.messages.lock().unwrap();
827 /// locked_queue.pop_front()
828 /// }
829 ///
830 /// pub async fn recv(&self) -> T {
831 /// let future = self.notify_on_sent.notified();
832 /// tokio::pin!(future);
833 ///
834 /// loop {
835 /// // Make sure that no wakeup is lost if we get
836 /// // `None` from `try_recv`.
837 /// future.as_mut().enable();
838 ///
839 /// if let Some(msg) = self.try_recv() {
840 /// return msg;
841 /// }
842 ///
843 /// // Wait for a call to `notify_one`.
844 /// //
845 /// // This uses `.as_mut()` to avoid consuming the future,
846 /// // which lets us call `Pin::set` below.
847 /// future.as_mut().await;
848 ///
849 /// // Reset the future in case another call to
850 /// // `try_recv` got the message before us.
851 /// future.set(self.notify_on_sent.notified());
852 /// }
853 /// }
854 /// }
855 /// ```
856 ///
857 /// [`notify_one`]: Notify::notify_one()
858 /// [`notify_waiters`]: Notify::notify_waiters()
859 pub fn enable(self: Pin<&mut Self>) -> bool {
860 self.poll_notified(None).is_ready()
861 }
862
863 /// A custom `project` implementation is used in place of `pin-project-lite`
864 /// as a custom drop implementation is needed.
865 fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
866 unsafe {
867 // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
868
869 is_unpin::<&Notify>();
870 is_unpin::<State>();
871 is_unpin::<usize>();
872
873 let me = self.get_unchecked_mut();
874 (
875 me.notify,
876 &mut me.state,
877 &me.notify_waiters_calls,
878 &me.waiter,
879 )
880 }
881 }
882
883 fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
884 use State::*;
885
886 let (notify, state, notify_waiters_calls, waiter) = self.project();
887
888 'outer_loop: loop {
889 match *state {
890 Init => {
891 let curr = notify.state.load(SeqCst);
892
893 // Optimistically try acquiring a pending notification
894 let res = notify.state.compare_exchange(
895 set_state(curr, NOTIFIED),
896 set_state(curr, EMPTY),
897 SeqCst,
898 SeqCst,
899 );
900
901 if res.is_ok() {
902 // Acquired the notification
903 *state = Done;
904 continue 'outer_loop;
905 }
906
907 // Clone the waker before locking, a waker clone can be
908 // triggering arbitrary code.
909 let waker = waker.cloned();
910
911 // Acquire the lock and attempt to transition to the waiting
912 // state.
913 let mut waiters = notify.waiters.lock();
914
915 // Reload the state with the lock held
916 let mut curr = notify.state.load(SeqCst);
917
918 // if notify_waiters has been called after the future
919 // was created, then we are done
920 if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
921 *state = Done;
922 continue 'outer_loop;
923 }
924
925 // Transition the state to WAITING.
926 loop {
927 match get_state(curr) {
928 EMPTY => {
929 // Transition to WAITING
930 let res = notify.state.compare_exchange(
931 set_state(curr, EMPTY),
932 set_state(curr, WAITING),
933 SeqCst,
934 SeqCst,
935 );
936
937 if let Err(actual) = res {
938 assert_eq!(get_state(actual), NOTIFIED);
939 curr = actual;
940 } else {
941 break;
942 }
943 }
944 WAITING => break,
945 NOTIFIED => {
946 // Try consuming the notification
947 let res = notify.state.compare_exchange(
948 set_state(curr, NOTIFIED),
949 set_state(curr, EMPTY),
950 SeqCst,
951 SeqCst,
952 );
953
954 match res {
955 Ok(_) => {
956 // Acquired the notification
957 *state = Done;
958 continue 'outer_loop;
959 }
960 Err(actual) => {
961 assert_eq!(get_state(actual), EMPTY);
962 curr = actual;
963 }
964 }
965 }
966 _ => unreachable!(),
967 }
968 }
969
970 let mut old_waker = None;
971 if waker.is_some() {
972 // Safety: called while locked.
973 //
974 // The use of `old_waiter` here is not necessary, as the field is always
975 // None when we reach this line.
976 unsafe {
977 old_waker =
978 waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
979 }
980 }
981
982 // Insert the waiter into the linked list
983 waiters.push_front(NonNull::from(waiter));
984
985 *state = Waiting;
986
987 drop(waiters);
988 drop(old_waker);
989
990 return Poll::Pending;
991 }
992 Waiting => {
993 #[cfg(tokio_taskdump)]
994 if let Some(waker) = waker {
995 let mut ctx = Context::from_waker(waker);
996 ready!(crate::trace::trace_leaf(&mut ctx));
997 }
998
999 if waiter.notification.load(Acquire).is_some() {
1000 // Safety: waiter is already unlinked and will not be shared again,
1001 // so we have an exclusive access to `waker`.
1002 drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
1003
1004 waiter.notification.clear();
1005 *state = Done;
1006 return Poll::Ready(());
1007 }
1008
1009 // Our waiter was not notified, implying it is still stored in a waiter
1010 // list (guarded by `notify.waiters`). In order to access the waker
1011 // fields, we must acquire the lock.
1012
1013 let mut old_waker = None;
1014 let mut waiters = notify.waiters.lock();
1015
1016 // We hold the lock and notifications are set only with the lock held,
1017 // so this can be relaxed, because the happens-before relationship is
1018 // established through the mutex.
1019 if waiter.notification.load(Relaxed).is_some() {
1020 // Safety: waiter is already unlinked and will not be shared again,
1021 // so we have an exclusive access to `waker`.
1022 old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1023
1024 waiter.notification.clear();
1025
1026 // Drop the old waker after releasing the lock.
1027 drop(waiters);
1028 drop(old_waker);
1029
1030 *state = Done;
1031 return Poll::Ready(());
1032 }
1033
1034 // Load the state with the lock held.
1035 let curr = notify.state.load(SeqCst);
1036
1037 if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1038 // Before we add a waiter to the list we check if these numbers are
1039 // different while holding the lock. If these numbers are different now,
1040 // it means that there is a call to `notify_waiters` in progress and this
1041 // waiter must be contained by a guarded list used in `notify_waiters`.
1042 // We can treat the waiter as notified and remove it from the list, as
1043 // it would have been notified in the `notify_waiters` call anyways.
1044
1045 // Safety: we hold the lock, so we can modify the waker.
1046 old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1047
1048 // Safety: we hold the lock, so we have an exclusive access to the list.
1049 // The list is used in `notify_waiters`, so it must be guarded.
1050 unsafe { waiters.remove(NonNull::from(waiter)) };
1051
1052 *state = Done;
1053 } else {
1054 // Safety: we hold the lock, so we can modify the waker.
1055 unsafe {
1056 waiter.waker.with_mut(|v| {
1057 if let Some(waker) = waker {
1058 let should_update = match &*v {
1059 Some(current_waker) => !current_waker.will_wake(waker),
1060 None => true,
1061 };
1062 if should_update {
1063 old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
1064 }
1065 }
1066 });
1067 }
1068
1069 // Drop the old waker after releasing the lock.
1070 drop(waiters);
1071 drop(old_waker);
1072
1073 return Poll::Pending;
1074 }
1075
1076 // Explicit drop of the lock to indicate the scope that the
1077 // lock is held. Because holding the lock is required to
1078 // ensure safe access to fields not held within the lock, it
1079 // is helpful to visualize the scope of the critical
1080 // section.
1081 drop(waiters);
1082
1083 // Drop the old waker after releasing the lock.
1084 drop(old_waker);
1085 }
1086 Done => {
1087 #[cfg(tokio_taskdump)]
1088 if let Some(waker) = waker {
1089 let mut ctx = Context::from_waker(waker);
1090 ready!(crate::trace::trace_leaf(&mut ctx));
1091 }
1092 return Poll::Ready(());
1093 }
1094 }
1095 }
1096 }
1097}
1098
1099impl Future for Notified<'_> {
1100 type Output = ();
1101
1102 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1103 self.poll_notified(waker:Some(cx.waker()))
1104 }
1105}
1106
1107impl Drop for Notified<'_> {
1108 fn drop(&mut self) {
1109 use State::*;
1110
1111 // Safety: The type only transitions to a "Waiting" state when pinned.
1112 let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
1113
1114 // This is where we ensure safety. The `Notified` value is being
1115 // dropped, which means we must ensure that the waiter entry is no
1116 // longer stored in the linked list.
1117 if matches!(*state, Waiting) {
1118 let mut waiters = notify.waiters.lock();
1119 let mut notify_state = notify.state.load(SeqCst);
1120
1121 // We hold the lock, so this field is not concurrently accessed by
1122 // `notify_*` functions and we can use the relaxed ordering.
1123 let notification = waiter.notification.load(Relaxed);
1124
1125 // remove the entry from the list (if not already removed)
1126 //
1127 // Safety: we hold the lock, so we have an exclusive access to every list the
1128 // waiter may be contained in. If the node is not contained in the `waiters`
1129 // list, then it is contained by a guarded list used by `notify_waiters`.
1130 unsafe { waiters.remove(NonNull::from(waiter)) };
1131
1132 if waiters.is_empty() && get_state(notify_state) == WAITING {
1133 notify_state = set_state(notify_state, EMPTY);
1134 notify.state.store(notify_state, SeqCst);
1135 }
1136
1137 // See if the node was notified but not received. In this case, if
1138 // the notification was triggered via `notify_one`, it must be sent
1139 // to the next waiter.
1140 if notification == Some(Notification::One) {
1141 if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
1142 drop(waiters);
1143 waker.wake();
1144 }
1145 }
1146 }
1147 }
1148}
1149
1150/// # Safety
1151///
1152/// `Waiter` is forced to be !Unpin.
1153unsafe impl linked_list::Link for Waiter {
1154 type Handle = NonNull<Waiter>;
1155 type Target = Waiter;
1156
1157 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1158 *handle
1159 }
1160
1161 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1162 ptr
1163 }
1164
1165 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1166 Waiter::addr_of_pointers(me:target)
1167 }
1168}
1169
/// Compile-time assertion helper: this function can only be instantiated for
/// types that implement `Unpin`, and does nothing when called.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1171