1 | // Allow `unreachable_pub` warnings when sync is not enabled |
2 | // due to the usage of `Notify` within the `rt` feature set. |
3 | // When this module is compiled with `sync` enabled we will warn on |
4 | // this lint. When `rt` is enabled we use `pub(crate)` which |
5 | // triggers this warning but it is safe to ignore in this case. |
6 | #![cfg_attr (not(feature = "sync" ), allow(unreachable_pub, dead_code))] |
7 | |
8 | use crate::loom::cell::UnsafeCell; |
9 | use crate::loom::sync::atomic::AtomicUsize; |
10 | use crate::loom::sync::Mutex; |
11 | use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; |
12 | use crate::util::WakeList; |
13 | |
14 | use std::future::Future; |
15 | use std::marker::PhantomPinned; |
16 | use std::panic::{RefUnwindSafe, UnwindSafe}; |
17 | use std::pin::Pin; |
18 | use std::ptr::NonNull; |
19 | use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst}; |
20 | use std::task::{Context, Poll, Waker}; |
21 | |
// Intrusive list of `Waiter` entries; this is the list type kept behind the
// `waiters` mutex in `Notify`.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
// Guarded counterpart of `WaitList`. `NotifyWaitersList::new` produces one
// from a `WaitList` through `into_guarded`.
type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24 | |
25 | /// Notifies a single task to wake up. |
26 | /// |
27 | /// `Notify` provides a basic mechanism to notify a single task of an event. |
28 | /// `Notify` itself does not carry any data. Instead, it is to be used to signal |
29 | /// another task to perform an operation. |
30 | /// |
31 | /// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The |
32 | /// [`notified().await`] method waits for a permit to become available, and |
33 | /// [`notify_one()`] sets a permit **if there currently are no available |
34 | /// permits**. |
35 | /// |
36 | /// The synchronization details of `Notify` are similar to |
37 | /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] |
38 | /// value contains a single permit. [`notified().await`] waits for the permit to |
39 | /// be made available, consumes the permit, and resumes. [`notify_one()`] sets |
40 | /// the permit, waking a pending task if there is one. |
41 | /// |
42 | /// If `notify_one()` is called **before** `notified().await`, then the next |
43 | /// call to `notified().await` will complete immediately, consuming the permit. |
44 | /// Any subsequent calls to `notified().await` will wait for a new permit. |
45 | /// |
46 | /// If `notify_one()` is called **multiple** times before `notified().await`, |
47 | /// only a **single** permit is stored. The next call to `notified().await` will |
48 | /// complete immediately, but the one after will wait for a new permit. |
49 | /// |
50 | /// # Examples |
51 | /// |
52 | /// Basic usage. |
53 | /// |
54 | /// ``` |
55 | /// use tokio::sync::Notify; |
56 | /// use std::sync::Arc; |
57 | /// |
58 | /// #[tokio::main] |
59 | /// async fn main() { |
60 | /// let notify = Arc::new(Notify::new()); |
61 | /// let notify2 = notify.clone(); |
62 | /// |
63 | /// let handle = tokio::spawn(async move { |
64 | /// notify2.notified().await; |
65 | /// println!("received notification" ); |
66 | /// }); |
67 | /// |
68 | /// println!("sending notification" ); |
69 | /// notify.notify_one(); |
70 | /// |
71 | /// // Wait for task to receive notification. |
72 | /// handle.await.unwrap(); |
73 | /// } |
74 | /// ``` |
75 | /// |
76 | /// Unbound multi-producer single-consumer (mpsc) channel. |
77 | /// |
78 | /// No wakeups can be lost when using this channel because the call to |
79 | /// `notify_one()` will store a permit in the `Notify`, which the following call |
80 | /// to `notified()` will consume. |
81 | /// |
82 | /// ``` |
83 | /// use tokio::sync::Notify; |
84 | /// |
85 | /// use std::collections::VecDeque; |
86 | /// use std::sync::Mutex; |
87 | /// |
88 | /// struct Channel<T> { |
89 | /// values: Mutex<VecDeque<T>>, |
90 | /// notify: Notify, |
91 | /// } |
92 | /// |
93 | /// impl<T> Channel<T> { |
94 | /// pub fn send(&self, value: T) { |
95 | /// self.values.lock().unwrap() |
96 | /// .push_back(value); |
97 | /// |
98 | /// // Notify the consumer a value is available |
99 | /// self.notify.notify_one(); |
100 | /// } |
101 | /// |
102 | /// // This is a single-consumer channel, so several concurrent calls to |
103 | /// // `recv` are not allowed. |
104 | /// pub async fn recv(&self) -> T { |
105 | /// loop { |
106 | /// // Drain values |
107 | /// if let Some(value) = self.values.lock().unwrap().pop_front() { |
108 | /// return value; |
109 | /// } |
110 | /// |
111 | /// // Wait for values to be available |
112 | /// self.notify.notified().await; |
113 | /// } |
114 | /// } |
115 | /// } |
116 | /// ``` |
117 | /// |
118 | /// Unbound multi-producer multi-consumer (mpmc) channel. |
119 | /// |
120 | /// The call to [`enable`] is important because otherwise if you have two |
121 | /// calls to `recv` and two calls to `send` in parallel, the following could |
122 | /// happen: |
123 | /// |
124 | /// 1. Both calls to `try_recv` return `None`. |
125 | /// 2. Both new elements are added to the vector. |
126 | /// 3. The `notify_one` method is called twice, adding only a single |
127 | /// permit to the `Notify`. |
128 | /// 4. Both calls to `recv` reach the `Notified` future. One of them |
129 | /// consumes the permit, and the other sleeps forever. |
130 | /// |
131 | /// By adding the `Notified` futures to the list by calling `enable` before |
132 | /// `try_recv`, the `notify_one` calls in step three would remove the |
133 | /// futures from the list and mark them notified instead of adding a permit |
134 | /// to the `Notify`. This ensures that both futures are woken. |
135 | /// |
136 | /// Notice that this failure can only happen if there are two concurrent calls |
137 | /// to `recv`. This is why the mpsc example above does not require a call to |
138 | /// `enable`. |
139 | /// |
140 | /// ``` |
141 | /// use tokio::sync::Notify; |
142 | /// |
143 | /// use std::collections::VecDeque; |
144 | /// use std::sync::Mutex; |
145 | /// |
146 | /// struct Channel<T> { |
147 | /// messages: Mutex<VecDeque<T>>, |
148 | /// notify_on_sent: Notify, |
149 | /// } |
150 | /// |
151 | /// impl<T> Channel<T> { |
152 | /// pub fn send(&self, msg: T) { |
153 | /// let mut locked_queue = self.messages.lock().unwrap(); |
154 | /// locked_queue.push_back(msg); |
155 | /// drop(locked_queue); |
156 | /// |
157 | /// // Send a notification to one of the calls currently |
158 | /// // waiting in a call to `recv`. |
159 | /// self.notify_on_sent.notify_one(); |
160 | /// } |
161 | /// |
162 | /// pub fn try_recv(&self) -> Option<T> { |
163 | /// let mut locked_queue = self.messages.lock().unwrap(); |
164 | /// locked_queue.pop_front() |
165 | /// } |
166 | /// |
167 | /// pub async fn recv(&self) -> T { |
168 | /// let future = self.notify_on_sent.notified(); |
169 | /// tokio::pin!(future); |
170 | /// |
171 | /// loop { |
172 | /// // Make sure that no wakeup is lost if we get |
173 | /// // `None` from `try_recv`. |
174 | /// future.as_mut().enable(); |
175 | /// |
176 | /// if let Some(msg) = self.try_recv() { |
177 | /// return msg; |
178 | /// } |
179 | /// |
180 | /// // Wait for a call to `notify_one`. |
181 | /// // |
182 | /// // This uses `.as_mut()` to avoid consuming the future, |
183 | /// // which lets us call `Pin::set` below. |
184 | /// future.as_mut().await; |
185 | /// |
186 | /// // Reset the future in case another call to |
187 | /// // `try_recv` got the message before us. |
188 | /// future.set(self.notify_on_sent.notified()); |
189 | /// } |
190 | /// } |
191 | /// } |
192 | /// ``` |
193 | /// |
194 | /// [park]: std::thread::park |
195 | /// [unpark]: std::thread::Thread::unpark |
196 | /// [`notified().await`]: Notify::notified() |
197 | /// [`notify_one()`]: Notify::notify_one() |
198 | /// [`enable`]: Notified::enable() |
199 | /// [`Semaphore`]: crate::sync::Semaphore |
200 | #[derive (Debug)] |
201 | pub struct Notify { |
202 | // `state` uses 2 bits to store one of `EMPTY`, |
203 | // `WAITING` or `NOTIFIED`. The rest of the bits |
204 | // are used to store the number of times `notify_waiters` |
205 | // was called. |
206 | // |
207 | // Throughout the code there are two assumptions: |
208 | // - state can be transitioned *from* `WAITING` only if |
209 | // `waiters` lock is held |
210 | // - number of times `notify_waiters` was called can |
211 | // be modified only if `waiters` lock is held |
212 | state: AtomicUsize, |
213 | waiters: Mutex<WaitList>, |
214 | } |
215 | |
216 | #[derive (Debug)] |
217 | struct Waiter { |
218 | /// Intrusive linked-list pointers. |
219 | pointers: linked_list::Pointers<Waiter>, |
220 | |
221 | /// Waiting task's waker. Depending on the value of `notification`, |
222 | /// this field is either protected by the `waiters` lock in |
223 | /// `Notify`, or it is exclusively owned by the enclosing `Waiter`. |
224 | waker: UnsafeCell<Option<Waker>>, |
225 | |
226 | /// Notification for this waiter. Uses 2 bits to store if and how was |
227 | /// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and |
228 | /// the rest of it is unused. |
229 | /// * if it's `None`, then `waker` is protected by the `waiters` lock. |
230 | /// * if it's `Some`, then `waker` is exclusively owned by the |
231 | /// enclosing `Waiter` and can be accessed without locking. |
232 | notification: AtomicNotification, |
233 | |
234 | /// Should not be `Unpin`. |
235 | _p: PhantomPinned, |
236 | } |
237 | |
238 | impl Waiter { |
239 | fn new() -> Waiter { |
240 | Waiter { |
241 | pointers: linked_list::Pointers::new(), |
242 | waker: UnsafeCell::new(data:None), |
243 | notification: AtomicNotification::none(), |
244 | _p: PhantomPinned, |
245 | } |
246 | } |
247 | } |
248 | |
// Declares `Waiter::addr_of_pointers`, which takes a `NonNull<Waiter>` and
// yields a `NonNull` to its `pointers` field.
// NOTE(review): the actual expansion is defined by `generate_addr_of_methods!`
// elsewhere in the crate — presumably it projects the field without creating
// an intermediate reference; confirm against the macro definition.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
256 | |
// Raw encodings stored inside `AtomicNotification`.

// The waiter has not been notified.
const NOTIFICATION_NONE: usize = 0b000;

// The waiter was notified through `notify_one`.
const NOTIFICATION_ONE: usize = 0b001;

// The waiter was notified through `notify_last`: the `notify_one` pattern
// with the third bit set as well.
const NOTIFICATION_LAST: usize = NOTIFICATION_ONE | 0b100;

// The waiter was notified through `notify_waiters`.
const NOTIFICATION_ALL: usize = 0b010;
268 | |
269 | /// Notification for a `Waiter`. |
270 | /// This struct is equivalent to `Option<Notification>`, but uses |
271 | /// `AtomicUsize` inside for atomic operations. |
272 | #[derive (Debug)] |
273 | struct AtomicNotification(AtomicUsize); |
274 | |
275 | impl AtomicNotification { |
276 | fn none() -> Self { |
277 | AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE)) |
278 | } |
279 | |
280 | /// Store-release a notification. |
281 | /// This method should be called exactly once. |
282 | fn store_release(&self, notification: Notification) { |
283 | let data: usize = match notification { |
284 | Notification::All => NOTIFICATION_ALL, |
285 | Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE, |
286 | Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST, |
287 | }; |
288 | self.0.store(data, Release); |
289 | } |
290 | |
291 | fn load(&self, ordering: Ordering) -> Option<Notification> { |
292 | let data = self.0.load(ordering); |
293 | match data { |
294 | NOTIFICATION_NONE => None, |
295 | NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)), |
296 | NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)), |
297 | NOTIFICATION_ALL => Some(Notification::All), |
298 | _ => unreachable!(), |
299 | } |
300 | } |
301 | |
302 | /// Clears the notification. |
303 | /// This method is used by a `Notified` future to consume the |
304 | /// notification. It uses relaxed ordering and should be only |
305 | /// used once the atomic notification is no longer shared. |
306 | fn clear(&self) { |
307 | self.0.store(NOTIFICATION_NONE, Relaxed); |
308 | } |
309 | } |
310 | |
/// Which end of the wait queue a single-waiter notification takes from.
#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum NotifyOneStrategy {
    /// Strategy selected by `notify_one`.
    Fifo,
    /// Strategy selected by `notify_last`.
    Lifo,
}
317 | |
318 | #[derive (Debug, PartialEq, Eq)] |
319 | #[repr (usize)] |
320 | enum Notification { |
321 | One(NotifyOneStrategy), |
322 | All, |
323 | } |
324 | |
325 | /// List used in `Notify::notify_waiters`. It wraps a guarded linked list |
326 | /// and gates the access to it on `notify.waiters` mutex. It also empties |
327 | /// the list on drop. |
328 | struct NotifyWaitersList<'a> { |
329 | list: GuardedWaitList, |
330 | is_empty: bool, |
331 | notify: &'a Notify, |
332 | } |
333 | |
334 | impl<'a> NotifyWaitersList<'a> { |
335 | fn new( |
336 | unguarded_list: WaitList, |
337 | guard: Pin<&'a Waiter>, |
338 | notify: &'a Notify, |
339 | ) -> NotifyWaitersList<'a> { |
340 | let guard_ptr = NonNull::from(guard.get_ref()); |
341 | let list = unguarded_list.into_guarded(guard_ptr); |
342 | NotifyWaitersList { |
343 | list, |
344 | is_empty: false, |
345 | notify, |
346 | } |
347 | } |
348 | |
349 | /// Removes the last element from the guarded list. Modifying this list |
350 | /// requires an exclusive access to the main list in `Notify`. |
351 | fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> { |
352 | let result = self.list.pop_back(); |
353 | if result.is_none() { |
354 | // Save information about emptiness to avoid waiting for lock |
355 | // in the destructor. |
356 | self.is_empty = true; |
357 | } |
358 | result |
359 | } |
360 | } |
361 | |
362 | impl Drop for NotifyWaitersList<'_> { |
363 | fn drop(&mut self) { |
364 | // If the list is not empty, we unlink all waiters from it. |
365 | // We do not wake the waiters to avoid double panics. |
366 | if !self.is_empty { |
367 | let _lock_guard: MutexGuard<'_, LinkedList<…, …>> = self.notify.waiters.lock(); |
368 | while let Some(waiter: NonNull) = self.list.pop_back() { |
369 | // Safety: we never make mutable references to waiters. |
370 | let waiter: &Waiter = unsafe { waiter.as_ref() }; |
371 | waiter.notification.store_release(Notification::All); |
372 | } |
373 | } |
374 | } |
375 | } |
376 | |
377 | /// Future returned from [`Notify::notified()`]. |
378 | /// |
379 | /// This future is fused, so once it has completed, any future calls to poll |
380 | /// will immediately return `Poll::Ready`. |
381 | #[derive (Debug)] |
382 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
383 | pub struct Notified<'a> { |
384 | /// The `Notify` being received on. |
385 | notify: &'a Notify, |
386 | |
387 | /// The current state of the receiving process. |
388 | state: State, |
389 | |
390 | /// Number of calls to `notify_waiters` at the time of creation. |
391 | notify_waiters_calls: usize, |
392 | |
393 | /// Entry in the waiter `LinkedList`. |
394 | waiter: Waiter, |
395 | } |
396 | |
// Manual `Send`/`Sync` impls for `Notified`.
// NOTE(review): presumably sound because `waiter.waker` is only accessed
// either under the `waiters` lock or after the notification has been
// published (see the field docs on `Waiter`) — confirm before relying on it.
unsafe impl<'a> Send for Notified<'a> {}
unsafe impl<'a> Sync for Notified<'a> {}
399 | |
/// Progress of a `Notified` future.
#[derive(Debug)]
enum State {
    /// State assigned when the future is created by `Notify::notified`.
    Init,
    // NOTE(review): the transitions into the two states below are implemented
    // outside this excerpt — presumably `Waiting` means "registered in the
    // waiter list" and `Done` means "notification received"; confirm.
    Waiting,
    Done,
}
406 | |
/// Number of low bits of the state word that hold the state tag.
const NOTIFY_WAITERS_SHIFT: usize = 2;
/// Mask selecting the state tag.
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
/// Mask selecting the `notify_waiters` call counter (all remaining bits).
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;

/// Returns `data` with its state tag replaced by `state`; the call counter
/// bits are left untouched.
fn set_state(data: usize, state: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
}

/// Extracts the state tag from a state word.
fn get_state(data: usize) -> usize {
    data & STATE_MASK
}

/// Extracts the `notify_waiters` call counter from a state word.
fn get_num_notify_waiters_calls(data: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
}

/// Returns `data` with the call counter advanced by one and the state tag
/// preserved.
///
/// The counter wraps around on overflow, exactly like the `fetch_add` used by
/// `atomic_inc_num_notify_waiters_calls`, so that debug and release builds
/// (and both increment paths) behave the same.
fn inc_num_notify_waiters_calls(data: usize) -> usize {
    data.wrapping_add(1 << NOTIFY_WAITERS_SHIFT)
}

/// Atomically advances the call counter stored in `data` by one, leaving the
/// state tag untouched. The counter wraps around on overflow.
fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
    data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
}
439 | |
440 | impl Notify { |
441 | /// Create a new `Notify`, initialized without a permit. |
442 | /// |
443 | /// # Examples |
444 | /// |
445 | /// ``` |
446 | /// use tokio::sync::Notify; |
447 | /// |
448 | /// let notify = Notify::new(); |
449 | /// ``` |
450 | pub fn new() -> Notify { |
451 | Notify { |
452 | state: AtomicUsize::new(0), |
453 | waiters: Mutex::new(LinkedList::new()), |
454 | } |
455 | } |
456 | |
457 | /// Create a new `Notify`, initialized without a permit. |
458 | /// |
459 | /// When using the `tracing` [unstable feature], a `Notify` created with |
460 | /// `const_new` will not be instrumented. As such, it will not be visible |
461 | /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create |
462 | /// an instrumented object if that is needed. |
463 | /// |
464 | /// # Examples |
465 | /// |
466 | /// ``` |
467 | /// use tokio::sync::Notify; |
468 | /// |
469 | /// static NOTIFY: Notify = Notify::const_new(); |
470 | /// ``` |
471 | /// |
472 | /// [`tokio-console`]: https://github.com/tokio-rs/console |
473 | /// [unstable feature]: crate#unstable-features |
474 | #[cfg (not(all(loom, test)))] |
475 | pub const fn const_new() -> Notify { |
476 | Notify { |
477 | state: AtomicUsize::new(0), |
478 | waiters: Mutex::const_new(LinkedList::new()), |
479 | } |
480 | } |
481 | |
482 | /// Wait for a notification. |
483 | /// |
484 | /// Equivalent to: |
485 | /// |
486 | /// ```ignore |
487 | /// async fn notified(&self); |
488 | /// ``` |
489 | /// |
490 | /// Each `Notify` value holds a single permit. If a permit is available from |
491 | /// an earlier call to [`notify_one()`], then `notified().await` will complete |
492 | /// immediately, consuming that permit. Otherwise, `notified().await` waits |
493 | /// for a permit to be made available by the next call to `notify_one()`. |
494 | /// |
495 | /// The `Notified` future is not guaranteed to receive wakeups from calls to |
496 | /// `notify_one()` if it has not yet been polled. See the documentation for |
497 | /// [`Notified::enable()`] for more details. |
498 | /// |
499 | /// The `Notified` future is guaranteed to receive wakeups from |
500 | /// `notify_waiters()` as soon as it has been created, even if it has not |
501 | /// yet been polled. |
502 | /// |
503 | /// [`notify_one()`]: Notify::notify_one |
504 | /// [`Notified::enable()`]: Notified::enable |
505 | /// |
506 | /// # Cancel safety |
507 | /// |
508 | /// This method uses a queue to fairly distribute notifications in the order |
509 | /// they were requested. Cancelling a call to `notified` makes you lose your |
510 | /// place in the queue. |
511 | /// |
512 | /// # Examples |
513 | /// |
514 | /// ``` |
515 | /// use tokio::sync::Notify; |
516 | /// use std::sync::Arc; |
517 | /// |
518 | /// #[tokio::main] |
519 | /// async fn main() { |
520 | /// let notify = Arc::new(Notify::new()); |
521 | /// let notify2 = notify.clone(); |
522 | /// |
523 | /// tokio::spawn(async move { |
524 | /// notify2.notified().await; |
525 | /// println!("received notification" ); |
526 | /// }); |
527 | /// |
528 | /// println!("sending notification" ); |
529 | /// notify.notify_one(); |
530 | /// } |
531 | /// ``` |
532 | pub fn notified(&self) -> Notified<'_> { |
533 | // we load the number of times notify_waiters |
534 | // was called and store that in the future. |
535 | let state = self.state.load(SeqCst); |
536 | Notified { |
537 | notify: self, |
538 | state: State::Init, |
539 | notify_waiters_calls: get_num_notify_waiters_calls(state), |
540 | waiter: Waiter::new(), |
541 | } |
542 | } |
543 | |
544 | /// Notifies the first waiting task. |
545 | /// |
546 | /// If a task is currently waiting, that task is notified. Otherwise, a |
547 | /// permit is stored in this `Notify` value and the **next** call to |
548 | /// [`notified().await`] will complete immediately consuming the permit made |
549 | /// available by this call to `notify_one()`. |
550 | /// |
551 | /// At most one permit may be stored by `Notify`. Many sequential calls to |
552 | /// `notify_one` will result in a single permit being stored. The next call to |
553 | /// `notified().await` will complete immediately, but the one after that |
554 | /// will wait. |
555 | /// |
556 | /// [`notified().await`]: Notify::notified() |
557 | /// |
558 | /// # Examples |
559 | /// |
560 | /// ``` |
561 | /// use tokio::sync::Notify; |
562 | /// use std::sync::Arc; |
563 | /// |
564 | /// #[tokio::main] |
565 | /// async fn main() { |
566 | /// let notify = Arc::new(Notify::new()); |
567 | /// let notify2 = notify.clone(); |
568 | /// |
569 | /// tokio::spawn(async move { |
570 | /// notify2.notified().await; |
571 | /// println!("received notification" ); |
572 | /// }); |
573 | /// |
574 | /// println!("sending notification" ); |
575 | /// notify.notify_one(); |
576 | /// } |
577 | /// ``` |
578 | // Alias for old name in 0.x |
579 | #[cfg_attr (docsrs, doc(alias = "notify" ))] |
580 | pub fn notify_one(&self) { |
581 | self.notify_with_strategy(NotifyOneStrategy::Fifo); |
582 | } |
583 | |
584 | /// Notifies the last waiting task. |
585 | /// |
586 | /// This function behaves similar to `notify_one`. The only difference is that it wakes |
587 | /// the most recently added waiter instead of the oldest waiter. |
588 | /// |
589 | /// Check the [`notify_one()`] documentation for more info and |
590 | /// examples. |
591 | /// |
592 | /// [`notify_one()`]: Notify::notify_one |
593 | pub fn notify_last(&self) { |
594 | self.notify_with_strategy(NotifyOneStrategy::Lifo); |
595 | } |
596 | |
597 | fn notify_with_strategy(&self, strategy: NotifyOneStrategy) { |
598 | // Load the current state |
599 | let mut curr = self.state.load(SeqCst); |
600 | |
601 | // If the state is `EMPTY`, transition to `NOTIFIED` and return. |
602 | while let EMPTY | NOTIFIED = get_state(curr) { |
603 | // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A |
604 | // happens-before synchronization must happen between this atomic |
605 | // operation and a task calling `notified().await`. |
606 | let new = set_state(curr, NOTIFIED); |
607 | let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst); |
608 | |
609 | match res { |
610 | // No waiters, no further work to do |
611 | Ok(_) => return, |
612 | Err(actual) => { |
613 | curr = actual; |
614 | } |
615 | } |
616 | } |
617 | |
618 | // There are waiters, the lock must be acquired to notify. |
619 | let mut waiters = self.waiters.lock(); |
620 | |
621 | // The state must be reloaded while the lock is held. The state may only |
622 | // transition out of WAITING while the lock is held. |
623 | curr = self.state.load(SeqCst); |
624 | |
625 | if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) { |
626 | drop(waiters); |
627 | waker.wake(); |
628 | } |
629 | } |
630 | |
631 | /// Notifies all waiting tasks. |
632 | /// |
/// Every task that is currently waiting is notified. Unlike with
634 | /// `notify_one()`, no permit is stored to be used by the next call to |
635 | /// `notified().await`. The purpose of this method is to notify all |
636 | /// already registered waiters. Registering for notification is done by |
637 | /// acquiring an instance of the `Notified` future via calling `notified()`. |
638 | /// |
639 | /// # Examples |
640 | /// |
641 | /// ``` |
642 | /// use tokio::sync::Notify; |
643 | /// use std::sync::Arc; |
644 | /// |
645 | /// #[tokio::main] |
646 | /// async fn main() { |
647 | /// let notify = Arc::new(Notify::new()); |
648 | /// let notify2 = notify.clone(); |
649 | /// |
650 | /// let notified1 = notify.notified(); |
651 | /// let notified2 = notify.notified(); |
652 | /// |
653 | /// let handle = tokio::spawn(async move { |
654 | /// println!("sending notifications" ); |
655 | /// notify2.notify_waiters(); |
656 | /// }); |
657 | /// |
658 | /// notified1.await; |
659 | /// notified2.await; |
660 | /// println!("received notifications" ); |
661 | /// } |
662 | /// ``` |
663 | pub fn notify_waiters(&self) { |
664 | let mut waiters = self.waiters.lock(); |
665 | |
666 | // The state must be loaded while the lock is held. The state may only |
667 | // transition out of WAITING while the lock is held. |
668 | let curr = self.state.load(SeqCst); |
669 | |
670 | if matches!(get_state(curr), EMPTY | NOTIFIED) { |
671 | // There are no waiting tasks. All we need to do is increment the |
672 | // number of times this method was called. |
673 | atomic_inc_num_notify_waiters_calls(&self.state); |
674 | return; |
675 | } |
676 | |
677 | // Increment the number of times this method was called |
678 | // and transition to empty. |
679 | let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY); |
680 | self.state.store(new_state, SeqCst); |
681 | |
682 | // It is critical for `GuardedLinkedList` safety that the guard node is |
683 | // pinned in memory and is not dropped until the guarded list is dropped. |
684 | let guard = Waiter::new(); |
685 | pin!(guard); |
686 | |
687 | // We move all waiters to a secondary list. It uses a `GuardedLinkedList` |
688 | // underneath to allow every waiter to safely remove itself from it. |
689 | // |
690 | // * This list will be still guarded by the `waiters` lock. |
691 | // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it. |
692 | // * This wrapper will empty the list on drop. It is critical for safety |
693 | // that we will not leave any list entry with a pointer to the local |
694 | // guard node after this function returns / panics. |
695 | let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self); |
696 | |
697 | let mut wakers = WakeList::new(); |
698 | 'outer: loop { |
699 | while wakers.can_push() { |
700 | match list.pop_back_locked(&mut waiters) { |
701 | Some(waiter) => { |
702 | // Safety: we never make mutable references to waiters. |
703 | let waiter = unsafe { waiter.as_ref() }; |
704 | |
705 | // Safety: we hold the lock, so we can access the waker. |
706 | if let Some(waker) = |
707 | unsafe { waiter.waker.with_mut(|waker| (*waker).take()) } |
708 | { |
709 | wakers.push(waker); |
710 | } |
711 | |
712 | // This waiter is unlinked and will not be shared ever again, release it. |
713 | waiter.notification.store_release(Notification::All); |
714 | } |
715 | None => { |
716 | break 'outer; |
717 | } |
718 | } |
719 | } |
720 | |
721 | // Release the lock before notifying. |
722 | drop(waiters); |
723 | |
724 | // One of the wakers may panic, but the remaining waiters will still |
725 | // be unlinked from the list in `NotifyWaitersList` destructor. |
726 | wakers.wake_all(); |
727 | |
728 | // Acquire the lock again. |
729 | waiters = self.waiters.lock(); |
730 | } |
731 | |
732 | // Release the lock before notifying |
733 | drop(waiters); |
734 | |
735 | wakers.wake_all(); |
736 | } |
737 | } |
738 | |
739 | impl Default for Notify { |
740 | fn default() -> Notify { |
741 | Notify::new() |
742 | } |
743 | } |
744 | |
// Marker impls declaring `Notify` unwind safe, both by value and behind a
// shared reference.
// NOTE(review): presumably required because the waiter list holds
// `UnsafeCell`s, which would otherwise opt the type out — confirm.
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}
747 | |
748 | fn notify_locked( |
749 | waiters: &mut WaitList, |
750 | state: &AtomicUsize, |
751 | curr: usize, |
752 | strategy: NotifyOneStrategy, |
753 | ) -> Option<Waker> { |
754 | match get_state(curr) { |
755 | EMPTY | NOTIFIED => { |
756 | let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst); |
757 | |
758 | match res { |
759 | Ok(_) => None, |
760 | Err(actual) => { |
761 | let actual_state = get_state(actual); |
762 | assert!(actual_state == EMPTY || actual_state == NOTIFIED); |
763 | state.store(set_state(actual, NOTIFIED), SeqCst); |
764 | None |
765 | } |
766 | } |
767 | } |
768 | WAITING => { |
769 | // At this point, it is guaranteed that the state will not |
770 | // concurrently change as holding the lock is required to |
771 | // transition **out** of `WAITING`. |
772 | // |
773 | // Get a pending waiter using one of the available dequeue strategies. |
774 | let waiter = match strategy { |
775 | NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(), |
776 | NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(), |
777 | }; |
778 | |
779 | // Safety: we never make mutable references to waiters. |
780 | let waiter = unsafe { waiter.as_ref() }; |
781 | |
782 | // Safety: we hold the lock, so we can access the waker. |
783 | let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; |
784 | |
785 | // This waiter is unlinked and will not be shared ever again, release it. |
786 | waiter |
787 | .notification |
788 | .store_release(Notification::One(strategy)); |
789 | |
790 | if waiters.is_empty() { |
791 | // As this the **final** waiter in the list, the state |
792 | // must be transitioned to `EMPTY`. As transitioning |
793 | // **from** `WAITING` requires the lock to be held, a |
794 | // `store` is sufficient. |
795 | state.store(set_state(curr, EMPTY), SeqCst); |
796 | } |
797 | waker |
798 | } |
799 | _ => unreachable!(), |
800 | } |
801 | } |
802 | |
803 | // ===== impl Notified ===== |
804 | |
805 | impl Notified<'_> { |
806 | /// Adds this future to the list of futures that are ready to receive |
807 | /// wakeups from calls to [`notify_one`]. |
808 | /// |
809 | /// Polling the future also adds it to the list, so this method should only |
810 | /// be used if you want to add the future to the list before the first call |
811 | /// to `poll`. (In fact, this method is equivalent to calling `poll` except |
812 | /// that no `Waker` is registered.) |
813 | /// |
814 | /// This has no effect on notifications sent using [`notify_waiters`], which |
815 | /// are received as long as they happen after the creation of the `Notified` |
816 | /// regardless of whether `enable` or `poll` has been called. |
817 | /// |
818 | /// This method returns true if the `Notified` is ready. This happens in the |
819 | /// following situations: |
820 | /// |
821 | /// 1. The `notify_waiters` method was called between the creation of the |
822 | /// `Notified` and the call to this method. |
823 | /// 2. This is the first call to `enable` or `poll` on this future, and the |
824 | /// `Notify` was holding a permit from a previous call to `notify_one`. |
825 | /// The call consumes the permit in that case. |
826 | /// 3. The future has previously been enabled or polled, and it has since |
827 | /// then been marked ready by either consuming a permit from the |
828 | /// `Notify`, or by a call to `notify_one` or `notify_waiters` that |
829 | /// removed it from the list of futures ready to receive wakeups. |
830 | /// |
831 | /// If this method returns true, any future calls to poll on the same future |
832 | /// will immediately return `Poll::Ready`. |
833 | /// |
834 | /// # Examples |
835 | /// |
836 | /// Unbound multi-producer multi-consumer (mpmc) channel. |
837 | /// |
838 | /// The call to `enable` is important because otherwise if you have two |
839 | /// calls to `recv` and two calls to `send` in parallel, the following could |
840 | /// happen: |
841 | /// |
842 | /// 1. Both calls to `try_recv` return `None`. |
843 | /// 2. Both new elements are added to the vector. |
844 | /// 3. The `notify_one` method is called twice, adding only a single |
845 | /// permit to the `Notify`. |
846 | /// 4. Both calls to `recv` reach the `Notified` future. One of them |
847 | /// consumes the permit, and the other sleeps forever. |
848 | /// |
849 | /// By adding the `Notified` futures to the list by calling `enable` before |
850 | /// `try_recv`, the `notify_one` calls in step three would remove the |
851 | /// futures from the list and mark them notified instead of adding a permit |
852 | /// to the `Notify`. This ensures that both futures are woken. |
853 | /// |
854 | /// ``` |
855 | /// use tokio::sync::Notify; |
856 | /// |
857 | /// use std::collections::VecDeque; |
858 | /// use std::sync::Mutex; |
859 | /// |
860 | /// struct Channel<T> { |
861 | /// messages: Mutex<VecDeque<T>>, |
862 | /// notify_on_sent: Notify, |
863 | /// } |
864 | /// |
865 | /// impl<T> Channel<T> { |
866 | /// pub fn send(&self, msg: T) { |
867 | /// let mut locked_queue = self.messages.lock().unwrap(); |
868 | /// locked_queue.push_back(msg); |
869 | /// drop(locked_queue); |
870 | /// |
871 | /// // Send a notification to one of the calls currently |
872 | /// // waiting in a call to `recv`. |
873 | /// self.notify_on_sent.notify_one(); |
874 | /// } |
875 | /// |
876 | /// pub fn try_recv(&self) -> Option<T> { |
877 | /// let mut locked_queue = self.messages.lock().unwrap(); |
878 | /// locked_queue.pop_front() |
879 | /// } |
880 | /// |
881 | /// pub async fn recv(&self) -> T { |
882 | /// let future = self.notify_on_sent.notified(); |
883 | /// tokio::pin!(future); |
884 | /// |
885 | /// loop { |
886 | /// // Make sure that no wakeup is lost if we get |
887 | /// // `None` from `try_recv`. |
888 | /// future.as_mut().enable(); |
889 | /// |
890 | /// if let Some(msg) = self.try_recv() { |
891 | /// return msg; |
892 | /// } |
893 | /// |
894 | /// // Wait for a call to `notify_one`. |
895 | /// // |
896 | /// // This uses `.as_mut()` to avoid consuming the future, |
897 | /// // which lets us call `Pin::set` below. |
898 | /// future.as_mut().await; |
899 | /// |
900 | /// // Reset the future in case another call to |
901 | /// // `try_recv` got the message before us. |
902 | /// future.set(self.notify_on_sent.notified()); |
903 | /// } |
904 | /// } |
905 | /// } |
906 | /// ``` |
907 | /// |
908 | /// [`notify_one`]: Notify::notify_one() |
909 | /// [`notify_waiters`]: Notify::notify_waiters() |
910 | pub fn enable(self: Pin<&mut Self>) -> bool { |
911 | self.poll_notified(None).is_ready() |
912 | } |
913 | |
914 | /// A custom `project` implementation is used in place of `pin-project-lite` |
915 | /// as a custom drop implementation is needed. |
916 | fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) { |
917 | unsafe { |
918 | // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`. |
919 | |
920 | is_unpin::<&Notify>(); |
921 | is_unpin::<State>(); |
922 | is_unpin::<usize>(); |
923 | |
924 | let me = self.get_unchecked_mut(); |
925 | ( |
926 | me.notify, |
927 | &mut me.state, |
928 | &me.notify_waiters_calls, |
929 | &me.waiter, |
930 | ) |
931 | } |
932 | } |
933 | |
    /// Advances the `Notified` state machine (`Init` -> `Waiting` -> `Done`).
    ///
    /// `waker` is `Some` when called from `poll` and `None` when called from
    /// `enable`. With `None`, the waiter is still pushed onto the wait list,
    /// but no waker is stored or replaced.
    ///
    /// Returns `Poll::Ready(())` once the state has reached `Done`, and
    /// `Poll::Pending` while the waiter is queued and not yet notified.
    fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
        let (notify, state, notify_waiters_calls, waiter) = self.project();

        // Each iteration dispatches on the current local state. Arms either
        // return, or update `*state` and loop again so that the `Done` arm
        // produces the final `Poll::Ready`.
        'outer_loop: loop {
            match *state {
                State::Init => {
                    let curr = notify.state.load(SeqCst);

                    // Optimistically try acquiring a pending notification
                    //
                    // The exchange only succeeds if the shared state is
                    // currently `NOTIFIED` (with the same remaining bits as
                    // `curr`); on success the permit is consumed and the
                    // shared state becomes `EMPTY`.
                    let res = notify.state.compare_exchange(
                        set_state(curr, NOTIFIED),
                        set_state(curr, EMPTY),
                        SeqCst,
                        SeqCst,
                    );

                    if res.is_ok() {
                        // Acquired the notification
                        *state = State::Done;
                        continue 'outer_loop;
                    }

                    // Clone the waker before locking, a waker clone can be
                    // triggering arbitrary code.
                    let waker = waker.cloned();

                    // Acquire the lock and attempt to transition to the waiting
                    // state.
                    let mut waiters = notify.waiters.lock();

                    // Reload the state with the lock held
                    let mut curr = notify.state.load(SeqCst);

                    // if notify_waiters has been called after the future
                    // was created, then we are done
                    //
                    // (`continue` leaves this arm's scope, which releases the
                    // lock guard before the `Done` arm runs.)
                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                        *state = State::Done;
                        continue 'outer_loop;
                    }

                    // Transition the state to WAITING.
                    //
                    // Retry loop: a failed exchange refreshes `curr` with the
                    // observed value and the match is evaluated again.
                    loop {
                        match get_state(curr) {
                            EMPTY => {
                                // Transition to WAITING
                                let res = notify.state.compare_exchange(
                                    set_state(curr, EMPTY),
                                    set_state(curr, WAITING),
                                    SeqCst,
                                    SeqCst,
                                );

                                if let Err(actual) = res {
                                    // The only state change tolerated here is
                                    // a switch to `NOTIFIED`; anything else
                                    // trips the assertion.
                                    assert_eq!(get_state(actual), NOTIFIED);
                                    curr = actual;
                                } else {
                                    break;
                                }
                            }
                            // Already in the waiting state; nothing to change.
                            WAITING => break,
                            NOTIFIED => {
                                // Try consuming the notification
                                let res = notify.state.compare_exchange(
                                    set_state(curr, NOTIFIED),
                                    set_state(curr, EMPTY),
                                    SeqCst,
                                    SeqCst,
                                );

                                match res {
                                    Ok(_) => {
                                        // Acquired the notification
                                        *state = State::Done;
                                        continue 'outer_loop;
                                    }
                                    Err(actual) => {
                                        // The only state change tolerated
                                        // here is a switch to `EMPTY`.
                                        assert_eq!(get_state(actual), EMPTY);
                                        curr = actual;
                                    }
                                }
                            }
                            _ => unreachable!(),
                        }
                    }

                    // Holds any previously stored waker so that it is dropped
                    // only after the lock has been released.
                    let mut old_waker = None;
                    if waker.is_some() {
                        // Safety: called while locked.
                        //
                        // The use of `old_waker` here is not necessary, as the field is always
                        // None when we reach this line.
                        unsafe {
                            old_waker =
                                waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
                        }
                    }

                    // Insert the waiter into the linked list
                    waiters.push_front(NonNull::from(waiter));

                    *state = State::Waiting;

                    // Release the lock first, then drop the old waker.
                    drop(waiters);
                    drop(old_waker);

                    return Poll::Pending;
                }
                State::Waiting => {
                    // Only compiled with `tokio_taskdump`; `ready!` returns
                    // early from this function if `trace_leaf` is pending.
                    #[cfg (tokio_taskdump)]
                    if let Some(waker) = waker {
                        let mut ctx = Context::from_waker(waker);
                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
                    }

                    // Fast path: check for a notification without taking the lock.
                    if waiter.notification.load(Acquire).is_some() {
                        // Safety: waiter is already unlinked and will not be shared again,
                        // so we have an exclusive access to `waker`.
                        drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });

                        waiter.notification.clear();
                        *state = State::Done;
                        return Poll::Ready(());
                    }

                    // Our waiter was not notified, implying it is still stored in a waiter
                    // list (guarded by `notify.waiters`). In order to access the waker
                    // fields, we must acquire the lock.

                    let mut old_waker = None;
                    let mut waiters = notify.waiters.lock();

                    // We hold the lock and notifications are set only with the lock held,
                    // so this can be relaxed, because the happens-before relationship is
                    // established through the mutex.
                    if waiter.notification.load(Relaxed).is_some() {
                        // Safety: waiter is already unlinked and will not be shared again,
                        // so we have an exclusive access to `waker`.
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                        waiter.notification.clear();

                        // Drop the old waker after releasing the lock.
                        drop(waiters);
                        drop(old_waker);

                        *state = State::Done;
                        return Poll::Ready(());
                    }

                    // Load the state with the lock held.
                    let curr = notify.state.load(SeqCst);

                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                        // Before we add a waiter to the list we check if these numbers are
                        // different while holding the lock. If these numbers are different now,
                        // it means that there is a call to `notify_waiters` in progress and this
                        // waiter must be contained by a guarded list used in `notify_waiters`.
                        // We can treat the waiter as notified and remove it from the list, as
                        // it would have been notified in the `notify_waiters` call anyways.

                        // Safety: we hold the lock, so we can modify the waker.
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                        // Safety: we hold the lock, so we have an exclusive access to the list.
                        // The list is used in `notify_waiters`, so it must be guarded.
                        unsafe { waiters.remove(NonNull::from(waiter)) };

                        // No return here: control falls through to the drops
                        // below and the next loop iteration hits `State::Done`.
                        *state = State::Done;
                    } else {
                        // Still waiting. Refresh the stored waker only when a
                        // waker was supplied and the stored one would not
                        // wake the same task.
                        // Safety: we hold the lock, so we can modify the waker.
                        unsafe {
                            waiter.waker.with_mut(|v| {
                                if let Some(waker) = waker {
                                    let should_update = match &*v {
                                        Some(current_waker) => !current_waker.will_wake(waker),
                                        None => true,
                                    };
                                    if should_update {
                                        old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
                                    }
                                }
                            });
                        }

                        // Drop the old waker after releasing the lock.
                        drop(waiters);
                        drop(old_waker);

                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope that the
                    // lock is held. Because holding the lock is required to
                    // ensure safe access to fields not held within the lock, it
                    // is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);

                    // Drop the old waker after releasing the lock.
                    drop(old_waker);
                }
                State::Done => {
                    // Only compiled with `tokio_taskdump`; `ready!` returns
                    // early from this function if `trace_leaf` is pending.
                    #[cfg (tokio_taskdump)]
                    if let Some(waker) = waker {
                        let mut ctx = Context::from_waker(waker);
                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
                    }
                    return Poll::Ready(());
                }
            }
        }
    }
1146 | } |
1147 | |
1148 | impl Future for Notified<'_> { |
1149 | type Output = (); |
1150 | |
1151 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
1152 | self.poll_notified(waker:Some(cx.waker())) |
1153 | } |
1154 | } |
1155 | |
1156 | impl Drop for Notified<'_> { |
1157 | fn drop(&mut self) { |
1158 | // Safety: The type only transitions to a "Waiting" state when pinned. |
1159 | let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() }; |
1160 | |
1161 | // This is where we ensure safety. The `Notified` value is being |
1162 | // dropped, which means we must ensure that the waiter entry is no |
1163 | // longer stored in the linked list. |
1164 | if matches!(*state, State::Waiting) { |
1165 | let mut waiters = notify.waiters.lock(); |
1166 | let mut notify_state = notify.state.load(SeqCst); |
1167 | |
1168 | // We hold the lock, so this field is not concurrently accessed by |
1169 | // `notify_*` functions and we can use the relaxed ordering. |
1170 | let notification = waiter.notification.load(Relaxed); |
1171 | |
1172 | // remove the entry from the list (if not already removed) |
1173 | // |
1174 | // Safety: we hold the lock, so we have an exclusive access to every list the |
1175 | // waiter may be contained in. If the node is not contained in the `waiters` |
1176 | // list, then it is contained by a guarded list used by `notify_waiters`. |
1177 | unsafe { waiters.remove(NonNull::from(waiter)) }; |
1178 | |
1179 | if waiters.is_empty() && get_state(notify_state) == WAITING { |
1180 | notify_state = set_state(notify_state, EMPTY); |
1181 | notify.state.store(notify_state, SeqCst); |
1182 | } |
1183 | |
1184 | // See if the node was notified but not received. In this case, if |
1185 | // the notification was triggered via `notify_one`, it must be sent |
1186 | // to the next waiter. |
1187 | if let Some(Notification::One(strategy)) = notification { |
1188 | if let Some(waker) = |
1189 | notify_locked(&mut waiters, ¬ify.state, notify_state, strategy) |
1190 | { |
1191 | drop(waiters); |
1192 | waker.wake(); |
1193 | } |
1194 | } |
1195 | } |
1196 | } |
1197 | } |
1198 | |
1199 | /// # Safety |
1200 | /// |
1201 | /// `Waiter` is forced to be !Unpin. |
1202 | unsafe impl linked_list::Link for Waiter { |
1203 | type Handle = NonNull<Waiter>; |
1204 | type Target = Waiter; |
1205 | |
1206 | fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { |
1207 | *handle |
1208 | } |
1209 | |
1210 | unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { |
1211 | ptr |
1212 | } |
1213 | |
1214 | unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { |
1215 | Waiter::addr_of_pointers(me:target) |
1216 | } |
1217 | } |
1218 | |
/// Compile-time assertion that `T` implements `Unpin`.
///
/// The body is empty; a call only type-checks when the bound holds.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1220 | |