1// Copyright (C) 2022 The Qt Company Ltd.
2// Copyright (C) 2018 Intel Corporation.
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
4
5#include "qsemaphore.h"
6#include "qfutex_p.h"
7#include "qdeadlinetimer.h"
8#include "qdatetime.h"
9#include "qdebug.h"
10#include "qlocking_p.h"
11#include "qwaitcondition_p.h"
12
13#include <chrono>
14
15QT_BEGIN_NAMESPACE
16
17using namespace QtFutex;
18
19/*!
20 \class QSemaphore
21 \inmodule QtCore
22 \brief The QSemaphore class provides a general counting semaphore.
23
24 \threadsafe
25
26 \ingroup thread
27
28 A semaphore is a generalization of a mutex. While a mutex can
29 only be locked once, it's possible to acquire a semaphore
30 multiple times. Semaphores are typically used to protect a
31 certain number of identical resources.
32
33 Semaphores support two fundamental operations, acquire() and
34 release():
35
36 \list
37 \li acquire(\e{n}) tries to acquire \e n resources. If there aren't
38 that many resources available, the call will block until this
39 is the case.
40 \li release(\e{n}) releases \e n resources.
41 \endlist
42
43 There's also a tryAcquire() function that returns immediately if
44 it cannot acquire the resources, and an available() function that
45 returns the number of available resources at any time.
46
47 Example:
48
49 \snippet code/src_corelib_thread_qsemaphore.cpp 0
50
51 A typical application of semaphores is for controlling access to
52 a circular buffer shared by a producer thread and a consumer
53 thread. The \l{Producer and Consumer using Semaphores} example shows how
54 to use QSemaphore to solve that problem.
55
56 A non-computing example of a semaphore would be dining at a
57 restaurant. A semaphore is initialized with the number of chairs
58 in the restaurant. As people arrive, they want a seat. As seats
59 are filled, available() is decremented. As people leave, the
60 available() is incremented, allowing more people to enter. If a
61 party of 10 people want to be seated, but there are only 9 seats,
62 those 10 people will wait, but a party of 4 people would be
63 seated (taking the available seats to 5, making the party of 10
64 people wait longer).
65
66 \sa QSemaphoreReleaser, QMutex, QWaitCondition, QThread,
67 {Producer and Consumer using Semaphores}
68*/
69
70/*
71 QSemaphore futex operation
72
    QSemaphore stores a 32-bit integer with the counter of currently available
    tokens (value between 0 and INT_MAX). When a thread attempts to acquire n
    tokens and the counter is at least n, we perform a compare-and-swap with
    the new count. If that succeeds, the acquisition worked; if not, we loop
    again because the counter changed. If there were not enough tokens, we'll
    perform a futex-wait.
79
80 Before we do, we set the high bit in the futex to indicate that semaphore
81 is contended: that is, there's a thread waiting for more tokens. On
82 release() for n tokens, we perform a fetch-and-add of n and then check if
83 that high bit was set. If it was, then we clear that bit and perform a
84 futex-wake on the semaphore to indicate the waiting threads can wake up and
85 acquire tokens. Which ones get woken up is unspecified.
86
87 If the system has the ability to wake up a precise number of threads, has
88 Linux's FUTEX_WAKE_OP functionality, and is 64-bit, instead of using a
89 single bit indicating a contended semaphore, we'll store the number of
90 tokens *plus* total number of waiters in the high word. Additionally, all
91 multi-token waiters will be waiting on that high word. So when releasing n
92 tokens on those systems, we tell the kernel to wake up n single-token
93 threads and all of the multi-token ones. Which threads get woken up is
94 unspecified, but it's likely single-token threads will get woken up first.
95 */
96
97#if defined(FUTEX_OP) && QT_POINTER_SIZE > 4
98static constexpr bool futexHasWaiterCount = true;
99#else
100static constexpr bool futexHasWaiterCount = false;
101#endif
102
103static constexpr quintptr futexNeedsWakeAllBit = futexHasWaiterCount ?
104 (Q_UINT64_C(1) << (sizeof(quintptr) * CHAR_BIT - 1)) : 0x80000000U;
105
106static int futexAvailCounter(quintptr v)
107{
108 // the low 31 bits
109 if (futexHasWaiterCount) {
110 // the high bit of the low word isn't used
111 Q_ASSERT((v & 0x80000000U) == 0);
112
113 // so we can be a little faster
114 return int(unsigned(v));
115 }
116 return int(v & 0x7fffffffU);
117}
118
119static bool futexNeedsWake(quintptr v)
120{
121 // If we're counting waiters, the number of waiters plus value is stored in the
122 // low 31 bits of the high word (that is, bits 32-62). If we're not, then we only
123 // use futexNeedsWakeAllBit to indicate anyone is waiting.
124 if constexpr (futexHasWaiterCount)
125 return unsigned(quint64(v) >> 32) > unsigned(v);
126 return v >> 31;
127}
128
129static QBasicAtomicInteger<quint32> *futexLow32(QBasicAtomicInteger<quintptr> *ptr)
130{
131 auto result = reinterpret_cast<QBasicAtomicInteger<quint32> *>(ptr);
132#if Q_BYTE_ORDER == Q_BIG_ENDIAN && QT_POINTER_SIZE > 4
133 ++result;
134#endif
135 return result;
136}
137
138static QBasicAtomicInteger<quint32> *futexHigh32(QBasicAtomicInteger<quintptr> *ptr)
139{
140 Q_ASSERT(futexHasWaiterCount);
141 auto result = reinterpret_cast<QBasicAtomicInteger<quint32> *>(ptr);
142#if Q_BYTE_ORDER == Q_LITTLE_ENDIAN && QT_POINTER_SIZE > 4
143 ++result;
144#endif
145 return result;
146}
147
148template <bool IsTimed> bool
149futexSemaphoreTryAcquire_loop(QBasicAtomicInteger<quintptr> &u, quintptr curValue, quintptr nn,
150 QDeadlineTimer timer)
151{
152 using namespace std::chrono;
153 int n = int(unsigned(nn));
154
155 // we're called after one testAndSet, so start by waiting first
156 for (;;) {
157 // indicate we're waiting
158 auto ptr = futexLow32(ptr: &u);
159 if (n > 1 || !futexHasWaiterCount) {
160 u.fetchAndOrRelaxed(valueToAdd: futexNeedsWakeAllBit);
161 curValue |= futexNeedsWakeAllBit;
162 if constexpr (futexHasWaiterCount) {
163 Q_ASSERT(n > 1);
164 ptr = futexHigh32(ptr: &u);
165 curValue = quint64(curValue) >> 32;
166 }
167 }
168
169 if (IsTimed) {
170 bool timedout = !futexWait(futex&: *ptr, expectedValue: curValue, deadline: timer);
171 if (timedout)
172 return false;
173 } else {
174 futexWait(futex&: *ptr, expectedValue: curValue);
175 }
176
177 curValue = u.loadAcquire();
178
179 // try to acquire
180 while (futexAvailCounter(v: curValue) >= n) {
181 quintptr newValue = curValue - nn;
182 if (u.testAndSetOrdered(expectedValue: curValue, newValue, currentValue&: curValue))
183 return true; // succeeded!
184 }
185
186 // not enough tokens available, put us to wait
187 if (IsTimed && timer.hasExpired())
188 return false;
189 }
190}
191
192static constexpr QDeadlineTimer::ForeverConstant Expired =
193 QDeadlineTimer::ForeverConstant(1);
194
195template <typename T> bool
196futexSemaphoreTryAcquire(QBasicAtomicInteger<quintptr> &u, int n, T timeout)
197{
198 constexpr bool IsTimed = std::is_same_v<QDeadlineTimer, T>;
199 // Try to acquire without waiting (we still loop because the testAndSet
200 // call can fail).
201 quintptr nn = unsigned(n);
202 if (futexHasWaiterCount)
203 nn |= quint64(nn) << 32; // token count replicated in high word
204
205 quintptr curValue = u.loadAcquire();
206 while (futexAvailCounter(v: curValue) >= n) {
207 // try to acquire
208 quintptr newValue = curValue - nn;
209 if (u.testAndSetOrdered(expectedValue: curValue, newValue, currentValue&: curValue))
210 return true; // succeeded!
211 }
212 if constexpr (IsTimed) {
213 if (timeout.hasExpired())
214 return false;
215 } else {
216 if (timeout == Expired)
217 return false;
218 }
219
220 // we need to wait
221 constexpr quintptr oneWaiter = quintptr(Q_UINT64_C(1) << 32); // zero on 32-bit
222 if constexpr (futexHasWaiterCount) {
223 // We don't use the fetched value from above so futexWait() fails if
224 // it changed after the testAndSetOrdered above.
225 quint32 waiterCount = (quint64(curValue) >> 32) & 0x7fffffffU;
226 if (waiterCount == 0x7fffffffU) {
227 qCritical() << "Waiter count overflow in QSemaphore";
228 return false;
229 }
230
231 // increase the waiter count
232 u.fetchAndAddRelaxed(valueToAdd: oneWaiter);
233 curValue += oneWaiter;
234
235 // Also adjust nn to subtract oneWaiter when we succeed in acquiring.
236 nn += oneWaiter;
237 }
238
239 if (futexSemaphoreTryAcquire_loop<IsTimed>(u, curValue, nn, timeout))
240 return true;
241
242 Q_ASSERT(IsTimed);
243
244 if (futexHasWaiterCount) {
245 // decrement the number of threads waiting
246 Q_ASSERT(futexHigh32(&u)->loadRelaxed() & 0x7fffffffU);
247 u.fetchAndSubRelaxed(valueToAdd: oneWaiter);
248 }
249 return false;
250}
251
252namespace QtSemaphorePrivate {
253using namespace QtPrivate;
254struct Layout1
255{
256 alignas(IdealMutexAlignment) std::mutex mutex;
257 qsizetype avail = 0;
258 alignas(IdealMutexAlignment) std::condition_variable cond;
259};
260
261struct Layout2
262{
263 alignas(IdealMutexAlignment) std::mutex mutex;
264 alignas(IdealMutexAlignment) std::condition_variable cond;
265 qsizetype avail = 0;
266};
267
268// Choose Layout1 if it is smaller than Layout2. That happens for platforms
269// where sizeof(mutex) is 64.
270using Members = std::conditional_t<sizeof(Layout1) <= sizeof(Layout2), Layout1, Layout2>;
271} // namespace QtSemaphorePrivate
272
273class QSemaphorePrivate : public QtSemaphorePrivate::Members
274{
275public:
276 explicit QSemaphorePrivate(qsizetype n) { avail = n; }
277};
278
279/*!
280 Creates a new semaphore and initializes the number of resources
281 it guards to \a n (by default, 0).
282
283 \sa release(), available()
284*/
285QSemaphore::QSemaphore(int n)
286{
287 Q_ASSERT_X(n >= 0, "QSemaphore", "parameter 'n' must be non-negative");
288 if (futexAvailable()) {
289 quintptr nn = unsigned(n);
290 if (futexHasWaiterCount)
291 nn |= quint64(nn) << 32; // token count replicated in high word
292 u.storeRelaxed(newValue: nn);
293 } else {
294 d = new QSemaphorePrivate(n);
295 }
296}
297
298/*!
299 Destroys the semaphore.
300
301 \warning Destroying a semaphore that is in use may result in
302 undefined behavior.
303*/
304QSemaphore::~QSemaphore()
305{
306 if (!futexAvailable())
307 delete d;
308}
309
310/*!
311 Tries to acquire \c n resources guarded by the semaphore. If \a n
312 > available(), this call will block until enough resources are
313 available.
314
315 \sa release(), available(), tryAcquire()
316*/
317void QSemaphore::acquire(int n)
318{
319#if QT_VERSION >= QT_VERSION_CHECK(7, 0, 0)
320# warning "Move the Q_ASSERT to inline code, make QSemaphore have wide contract, " \
321 "and mark noexcept where futexes are in use."
322#else
323 Q_ASSERT_X(n >= 0, "QSemaphore::acquire", "parameter 'n' must be non-negative");
324#endif
325
326 if (futexAvailable()) {
327 futexSemaphoreTryAcquire(u&: u, n, timeout: QDeadlineTimer::Forever);
328 return;
329 }
330
331 const auto sufficientResourcesAvailable = [this, n] { return d->avail >= n; };
332
333 auto locker = qt_unique_lock(mutex&: d->mutex);
334 d->cond.wait(lock&: locker, p: sufficientResourcesAvailable);
335 d->avail -= n;
336}
337
338/*!
339 Releases \a n resources guarded by the semaphore.
340
341 This function can be used to "create" resources as well. For
342 example:
343
344 \snippet code/src_corelib_thread_qsemaphore.cpp 1
345
346 QSemaphoreReleaser is a \l{http://en.cppreference.com/w/cpp/language/raii}{RAII}
347 wrapper around this function.
348
349 \sa acquire(), available(), QSemaphoreReleaser
350*/
351void QSemaphore::release(int n)
352{
353 Q_ASSERT_X(n >= 0, "QSemaphore::release", "parameter 'n' must be non-negative");
354
355 if (futexAvailable()) {
356 quintptr nn = unsigned(n);
357 if (futexHasWaiterCount)
358 nn |= quint64(nn) << 32; // token count replicated in high word
359 quintptr prevValue = u.loadRelaxed();
360 quintptr newValue;
361 do { // loop just to ensure the operations are done atomically
362 newValue = prevValue + nn;
363 newValue &= (futexNeedsWakeAllBit - 1);
364 } while (!u.testAndSetRelease(expectedValue: prevValue, newValue, currentValue&: prevValue));
365 if (futexNeedsWake(v: prevValue)) {
366#ifdef FUTEX_OP
367 if (futexHasWaiterCount) {
368 /*
369 On 64-bit systems, the single-token waiters wait on the low half
370 and the multi-token waiters wait on the upper half. So we ask
371 the kernel to wake up n single-token waiters and all multi-token
372 waiters (if any), and clear the multi-token wait bit.
373
374 atomic {
375 int oldval = *upper;
376 *upper = oldval | 0;
377 futexWake(lower, n);
378 if (oldval != 0) // always true
379 futexWake(upper, INT_MAX);
380 }
381 */
382 quint32 op = FUTEX_OP_OR;
383 quint32 oparg = 0;
384 quint32 cmp = FUTEX_OP_CMP_NE;
385 quint32 cmparg = 0;
386 futexWakeOp(futex1&: *futexLow32(ptr: &u), wake1: n, INT_MAX, futex2&: *futexHigh32(ptr: &u), FUTEX_OP(op, oparg, cmp, cmparg));
387 return;
388 }
389#endif
390 // Unset the bit and wake everyone. There are two possibilities
391 // under which a thread can set the bit between the AND and the
392 // futexWake:
393 // 1) it did see the new counter value, but it wasn't enough for
394 // its acquisition anyway, so it has to wait;
395 // 2) it did not see the new counter value, in which case its
396 // futexWait will fail.
397 futexWakeAll(futex&: *futexLow32(ptr: &u));
398 if (futexHasWaiterCount)
399 futexWakeAll(futex&: *futexHigh32(ptr: &u));
400 }
401 return;
402 }
403
404 // Keep mutex locked until after notify_all() lest another thread acquire()s
405 // the semaphore once d->avail == 0 and then destroys it, leaving `d` dangling.
406 const auto locker = qt_scoped_lock(mutex&: d->mutex);
407 d->avail += n;
408 d->cond.notify_all();
409}
410
411/*!
412 Returns the number of resources currently available to the
413 semaphore. This number can never be negative.
414
415 \sa acquire(), release()
416*/
417int QSemaphore::available() const
418{
419 if (futexAvailable())
420 return futexAvailCounter(v: u.loadRelaxed());
421
422 const auto locker = qt_scoped_lock(mutex&: d->mutex);
423 return d->avail;
424}
425
426/*!
427 Tries to acquire \c n resources guarded by the semaphore and
428 returns \c true on success. If available() < \a n, this call
429 immediately returns \c false without acquiring any resources.
430
431 Example:
432
433 \snippet code/src_corelib_thread_qsemaphore.cpp 2
434
435 \sa acquire()
436*/
437bool QSemaphore::tryAcquire(int n)
438{
439 Q_ASSERT_X(n >= 0, "QSemaphore::tryAcquire", "parameter 'n' must be non-negative");
440
441 if (futexAvailable())
442 return futexSemaphoreTryAcquire(u&: u, n, timeout: Expired);
443
444 const auto locker = qt_scoped_lock(mutex&: d->mutex);
445 if (n > d->avail)
446 return false;
447 d->avail -= n;
448 return true;
449}
450
451/*!
452 \fn QSemaphore::tryAcquire(int n, int timeout)
453
454 Tries to acquire \c n resources guarded by the semaphore and
455 returns \c true on success. If available() < \a n, this call will
456 wait for at most \a timeout milliseconds for resources to become
457 available.
458
459 Note: Passing a negative number as the \a timeout is equivalent to
460 calling acquire(), i.e. this function will wait forever for
461 resources to become available if \a timeout is negative.
462
463 Example:
464
465 \snippet code/src_corelib_thread_qsemaphore.cpp 3
466
467 \sa acquire()
468*/
469
470/*!
471 \since 6.6
472
473 Tries to acquire \c n resources guarded by the semaphore and returns \c
474 true on success. If available() < \a n, this call will wait until \a timer
475 expires for resources to become available.
476
477 Example:
478
479 \snippet code/src_corelib_thread_qsemaphore.cpp tryAcquire-QDeadlineTimer
480
481 \sa acquire()
482*/
483bool QSemaphore::tryAcquire(int n, QDeadlineTimer timer)
484{
485 if (timer.isForever()) {
486 acquire(n);
487 return true;
488 }
489
490 if (timer.hasExpired())
491 return tryAcquire(n);
492
493 Q_ASSERT_X(n >= 0, "QSemaphore::tryAcquire", "parameter 'n' must be non-negative");
494
495 if (futexAvailable())
496 return futexSemaphoreTryAcquire(u&: u, n, timeout: timer);
497
498 using namespace std::chrono;
499 const auto sufficientResourcesAvailable = [this, n] { return d->avail >= n; };
500
501 auto locker = qt_unique_lock(mutex&: d->mutex);
502 if (!d->cond.wait_until(lock&: locker, atime: timer.deadline<steady_clock>(), p: sufficientResourcesAvailable))
503 return false;
504 d->avail -= n;
505 return true;
506}
507
508/*!
509 \fn template <typename Rep, typename Period> QSemaphore::tryAcquire(int n, std::chrono::duration<Rep, Period> timeout)
510 \overload
511 \since 6.3
512*/
513
514/*!
515 \fn bool QSemaphore::try_acquire()
516 \since 6.3
517
518 This function is provided for \c{std::counting_semaphore} compatibility.
519
520 It is equivalent to calling \c{tryAcquire(1)}, where the function returns
521 \c true on acquiring the resource successfully.
522
523 \sa tryAcquire(), try_acquire_for(), try_acquire_until()
524*/
525
526/*!
527 \fn template <typename Rep, typename Period> bool QSemaphore::try_acquire_for(const std::chrono::duration<Rep, Period> &timeout)
528 \since 6.3
529
530 This function is provided for \c{std::counting_semaphore} compatibility.
531
532 It is equivalent to calling \c{tryAcquire(1, timeout)}, where the call
533 times out on the given \a timeout value. The function returns \c true
534 on acquiring the resource successfully.
535
536 \sa tryAcquire(), try_acquire(), try_acquire_until()
537*/
538
539/*!
540 \fn template <typename Clock, typename Duration> bool QSemaphore::try_acquire_until(const std::chrono::time_point<Clock, Duration> &tp)
541 \since 6.3
542
543 This function is provided for \c{std::counting_semaphore} compatibility.
544
545 It is equivalent to calling \c{tryAcquire(1, tp - Clock::now())},
546 which means that the \a tp (time point) is recorded, ignoring the
547 adjustments to \c{Clock} while waiting. The function returns \c true
548 on acquiring the resource successfully.
549
550 \sa tryAcquire(), try_acquire(), try_acquire_for()
551*/
552
553/*!
554 \class QSemaphoreReleaser
555 \brief The QSemaphoreReleaser class provides exception-safe deferral of a QSemaphore::release() call.
556 \since 5.10
557 \ingroup thread
558 \inmodule QtCore
559
560 \reentrant
561
562 QSemaphoreReleaser can be used wherever you would otherwise use
563 QSemaphore::release(). Constructing a QSemaphoreReleaser defers the
564 release() call on the semaphore until the QSemaphoreReleaser is
565 destroyed (see
566 \l{http://en.cppreference.com/w/cpp/language/raii}{RAII pattern}).
567
568 You can use this to reliably release a semaphore to avoid dead-lock
569 in the face of exceptions or early returns:
570
571 \snippet code/src_corelib_thread_qsemaphore.cpp 4
572
573 If an early return is taken or an exception is thrown before the
574 \c{sem.release()} call is reached, the semaphore is not released,
575 possibly preventing the thread waiting in the corresponding
576 \c{sem.acquire()} call from ever continuing execution.
577
578 When using RAII instead:
579
580 \snippet code/src_corelib_thread_qsemaphore.cpp 5
581
582 this can no longer happen, because the compiler will make sure that
583 the QSemaphoreReleaser destructor is always called, and therefore
584 the semaphore is always released.
585
586 QSemaphoreReleaser is move-enabled and can therefore be returned
587 from functions to transfer responsibility for releasing a semaphore
588 out of a function or a scope:
589
590 \snippet code/src_corelib_thread_qsemaphore.cpp 6
591
592 A QSemaphoreReleaser can be canceled by a call to cancel(). A canceled
593 semaphore releaser will no longer call QSemaphore::release() in its
594 destructor.
595
596 \sa QMutexLocker
597*/
598
599/*!
600 \fn QSemaphoreReleaser::QSemaphoreReleaser()
601
602 Default constructor. Creates a QSemaphoreReleaser that does nothing.
603*/
604
605/*!
606 \fn QSemaphoreReleaser::QSemaphoreReleaser(QSemaphore &sem, int n)
607
608 Constructor. Stores the arguments and calls \a{sem}.release(\a{n})
609 in the destructor.
610*/
611
612/*!
613 \fn QSemaphoreReleaser::QSemaphoreReleaser(QSemaphore *sem, int n)
614
615 Constructor. Stores the arguments and calls \a{sem}->release(\a{n})
616 in the destructor.
617*/
618
619/*!
620 \fn QSemaphoreReleaser::QSemaphoreReleaser(QSemaphoreReleaser &&other)
621
622 Move constructor. Takes over responsibility to call QSemaphore::release()
623 from \a other, which in turn is canceled.
624
625 \sa cancel()
626*/
627
628/*!
629 \fn QSemaphoreReleaser::operator=(QSemaphoreReleaser &&other)
630
631 Move assignment operator. Takes over responsibility to call QSemaphore::release()
632 from \a other, which in turn is canceled.
633
634 If this semaphore releaser had the responsibility to call some QSemaphore::release()
635 itself, it performs the call before taking over from \a other.
636
637 \sa cancel()
638*/
639
640/*!
641 \fn QSemaphoreReleaser::~QSemaphoreReleaser()
642
643 Unless canceled, calls QSemaphore::release() with the arguments provided
644 to the constructor, or by the last move assignment.
645*/
646
647/*!
648 \fn QSemaphoreReleaser::swap(QSemaphoreReleaser &other)
649
650 Exchanges the responsibilities of \c{*this} and \a other.
651
652 Unlike move assignment, neither of the two objects ever releases its
653 semaphore, if any, as a consequence of swapping.
654
655 Therefore this function is very fast and never fails.
656*/
657
658/*!
659 \fn QSemaphoreReleaser::semaphore() const
660
661 Returns a pointer to the QSemaphore object provided to the constructor,
662 or by the last move assignment, if any. Otherwise, returns \nullptr.
663*/
664
665/*!
666 \fn QSemaphoreReleaser::cancel()
667
668 Cancels this QSemaphoreReleaser such that the destructor will no longer
669 call \c{semaphore()->release()}. Returns the value of semaphore()
670 before this call. After this call, semaphore() will return \nullptr.
671
672 To enable again, assign a new QSemaphoreReleaser:
673
674 \snippet code/src_corelib_thread_qsemaphore.cpp 7
675*/
676
677
678QT_END_NAMESPACE
679

// Provided by KDAB
//
// Privacy Policy
// Learn Advanced QML with KDAB
// Find out more
//
// source code of qtbase/src/corelib/thread/qsemaphore.cpp