1// Copyright (C) 2020 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4// qfutureinterface.h included from qfuture.h
5#include "qfuture.h"
6#include "qfutureinterface_p.h"
7
8#include <QtCore/qatomic.h>
9#include <QtCore/qcoreapplication.h>
10#include <QtCore/qthread.h>
11#include <QtCore/qvarlengtharray.h>
12#include <QtCore/private/qsimd_p.h> // for qYieldCpu()
13#include <private/qthreadpool_p.h>
14#include <private/qobject_p.h>
15
16#ifdef interface
17# undef interface
18#endif
19
20// GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious
21// reason
22// warning: ‘unsigned int __atomic_or_fetch_4(volatile void*, unsigned int, int)’ writing 4 bytes into a region of size 0 overflows the destination [-Wstringop-overflow=]
23QT_WARNING_DISABLE_GCC("-Wstringop-overflow")
24
25QT_BEGIN_NAMESPACE
26
27enum {
28 MaxProgressEmitsPerSecond = 25
29};
30
31namespace {
32class ThreadPoolThreadReleaser {
33 QThreadPool *m_pool;
34public:
35 Q_NODISCARD_CTOR
36 explicit ThreadPoolThreadReleaser(QThreadPool *pool)
37 : m_pool(pool)
38 { if (pool) pool->releaseThread(); }
39 ~ThreadPoolThreadReleaser()
40 { if (m_pool) m_pool->reserveThread(); }
41};
42
43const auto suspendingOrSuspended =
44 QFutureInterfaceBase::Suspending | QFutureInterfaceBase::Suspended;
45
46} // unnamed namespace
47
48class QBasicFutureWatcher : public QObject, QFutureCallOutInterface
49{
50 Q_OBJECT
51public:
52 explicit QBasicFutureWatcher(QObject *parent = nullptr);
53 ~QBasicFutureWatcher() override;
54
55 void setFuture(QFutureInterfaceBase &fi);
56
57 bool event(QEvent *event) override;
58
59Q_SIGNALS:
60 void finished();
61
62private:
63 QFutureInterfaceBase future;
64
65 void postCallOutEvent(const QFutureCallOutEvent &event) override;
66 void callOutInterfaceDisconnected() override;
67};
68
69void QBasicFutureWatcher::postCallOutEvent(const QFutureCallOutEvent &event)
70{
71 if (thread() == QThread::currentThread()) {
72 // If we are in the same thread, don't queue up anything.
73 std::unique_ptr<QFutureCallOutEvent> clonedEvent(event.clone());
74 QCoreApplication::sendEvent(receiver: this, event: clonedEvent.get());
75 } else {
76 QCoreApplication::postEvent(receiver: this, event: event.clone());
77 }
78}
79
80void QBasicFutureWatcher::callOutInterfaceDisconnected()
81{
82 QCoreApplication::removePostedEvents(receiver: this, eventType: QEvent::FutureCallOut);
83}
84
85/*
86 * QBasicFutureWatcher is a more lightweight version of QFutureWatcher for internal use
87 */
88QBasicFutureWatcher::QBasicFutureWatcher(QObject *parent)
89 : QObject(parent)
90{
91}
92
93QBasicFutureWatcher::~QBasicFutureWatcher()
94{
95 future.d->disconnectOutputInterface(iface: this);
96}
97
98void QBasicFutureWatcher::setFuture(QFutureInterfaceBase &fi)
99{
100 future = fi;
101 future.d->connectOutputInterface(iface: this);
102}
103
104bool QBasicFutureWatcher::event(QEvent *event)
105{
106 if (event->type() == QEvent::FutureCallOut) {
107 QFutureCallOutEvent *callOutEvent = static_cast<QFutureCallOutEvent *>(event);
108 if (callOutEvent->callOutType == QFutureCallOutEvent::Finished)
109 emit finished();
110 return true;
111 }
112 return QObject::event(event);
113}
114
115void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj,
116 QFutureInterfaceBase &fi)
117{
118 Q_ASSERT(context);
119 Q_ASSERT(slotObj);
120
121 auto slot = SlotObjUniquePtr(slotObj);
122
123 auto *watcher = new QBasicFutureWatcher;
124 watcher->moveToThread(thread: context->thread());
125 // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
126 QObject::connect(sender: watcher, signal: &QBasicFutureWatcher::finished,
127 // for the following, cf. QMetaObject::invokeMethodImpl():
128 // we know `slot` is a lambda returning `void`, so we can just
129 // `call()` with `obj` and `args[0]` set to `nullptr`:
130 context: watcher, slot: [slot = std::move(slot)] {
131 void *args[] = { nullptr }; // for `void` return value
132 slot->call(r: nullptr, a: args);
133 });
134 QObject::connect(sender: watcher, signal: &QBasicFutureWatcher::finished,
135 context: watcher, slot: &QObject::deleteLater);
136 QObject::connect(sender: context, signal: &QObject::destroyed,
137 context: watcher, slot: &QObject::deleteLater);
138 watcher->setFuture(fi);
139}
140
141QFutureCallOutInterface::~QFutureCallOutInterface()
142 = default;
143
144Q_IMPL_EVENT_COMMON(QFutureCallOutEvent)
145
146QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
147 : d(new QFutureInterfaceBasePrivate(initialState))
148{ }
149
150QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
151 : d(other.d)
152{
153 d->refCount.ref();
154}
155
156QFutureInterfaceBase::~QFutureInterfaceBase()
157{
158 if (d && !d->refCount.deref())
159 delete d;
160}
161
162static inline int switch_on(QAtomicInt &a, int which)
163{
164 return a.fetchAndOrRelaxed(valueToAdd: which) | which;
165}
166
167static inline int switch_off(QAtomicInt &a, int which)
168{
169 return a.fetchAndAndRelaxed(valueToAdd: ~which) & ~which;
170}
171
172static inline int switch_from_to(QAtomicInt &a, int from, int to)
173{
174 const auto adjusted = [&](int old) { return (old & ~from) | to; };
175 int value = a.loadRelaxed();
176 while (!a.testAndSetRelaxed(expectedValue: value, newValue: adjusted(value), currentValue&: value))
177 qYieldCpu();
178 return value;
179}
180
181void QFutureInterfaceBase::cancel()
182{
183 cancel(mode: CancelMode::CancelOnly);
184}
185
186void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode)
187{
188 QMutexLocker locker(&d->m_mutex);
189
190 const auto oldState = d->state.loadRelaxed();
191
192 switch (mode) {
193 case CancelMode::CancelAndFinish:
194 if ((oldState & Finished) && (oldState & Canceled))
195 return;
196 switch_from_to(a&: d->state, from: suspendingOrSuspended | Running, to: Canceled | Finished);
197 break;
198 case CancelMode::CancelOnly:
199 if (oldState & Canceled)
200 return;
201 switch_from_to(a&: d->state, from: suspendingOrSuspended, to: Canceled);
202 break;
203 }
204
205 // Cancel the continuations chain
206 QFutureInterfaceBasePrivate *next = d->continuationData;
207 while (next) {
208 next->continuationState = QFutureInterfaceBasePrivate::Canceled;
209 next = next->continuationData;
210 }
211
212 d->waitCondition.wakeAll();
213 d->pausedWaitCondition.wakeAll();
214
215 if (!(oldState & Canceled))
216 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
217 if (mode == CancelMode::CancelAndFinish && !(oldState & Finished))
218 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished));
219
220 d->isValid = false;
221}
222
223void QFutureInterfaceBase::setSuspended(bool suspend)
224{
225 QMutexLocker locker(&d->m_mutex);
226 if (suspend) {
227 switch_on(a&: d->state, which: Suspending);
228 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
229 } else {
230 switch_off(a&: d->state, which: suspendingOrSuspended);
231 d->pausedWaitCondition.wakeAll();
232 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
233 }
234}
235
236void QFutureInterfaceBase::toggleSuspended()
237{
238 QMutexLocker locker(&d->m_mutex);
239 if (d->state.loadRelaxed() & suspendingOrSuspended) {
240 switch_off(a&: d->state, which: suspendingOrSuspended);
241 d->pausedWaitCondition.wakeAll();
242 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
243 } else {
244 switch_on(a&: d->state, which: Suspending);
245 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
246 }
247}
248
249void QFutureInterfaceBase::reportSuspended() const
250{
251 // Needs to be called when pause is in effect,
252 // i.e. no more events will be reported.
253
254 QMutexLocker locker(&d->m_mutex);
255 const int state = d->state.loadRelaxed();
256 if (!(state & Suspending) || (state & Suspended))
257 return;
258
259 switch_from_to(a&: d->state, from: Suspending, to: Suspended);
260 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
261}
262
263void QFutureInterfaceBase::setThrottled(bool enable)
264{
265 QMutexLocker lock(&d->m_mutex);
266 if (enable) {
267 switch_on(a&: d->state, which: Throttled);
268 } else {
269 switch_off(a&: d->state, which: Throttled);
270 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
271 d->pausedWaitCondition.wakeAll();
272 }
273}
274
275
276bool QFutureInterfaceBase::isRunning() const
277{
278 return queryState(state: Running);
279}
280
281bool QFutureInterfaceBase::isStarted() const
282{
283 return queryState(state: Started);
284}
285
286bool QFutureInterfaceBase::isCanceled() const
287{
288 return queryState(state: Canceled);
289}
290
291bool QFutureInterfaceBase::isFinished() const
292{
293 return queryState(state: Finished);
294}
295
296bool QFutureInterfaceBase::isSuspending() const
297{
298 return queryState(state: Suspending);
299}
300
301#if QT_DEPRECATED_SINCE(6, 0)
302bool QFutureInterfaceBase::isPaused() const
303{
304 return queryState(state: static_cast<State>(suspendingOrSuspended));
305}
306#endif
307
308bool QFutureInterfaceBase::isSuspended() const
309{
310 return queryState(state: Suspended);
311}
312
313bool QFutureInterfaceBase::isThrottled() const
314{
315 return queryState(state: Throttled);
316}
317
318bool QFutureInterfaceBase::isResultReadyAt(int index) const
319{
320 QMutexLocker lock(&d->m_mutex);
321 return d->internal_isResultReadyAt(index);
322}
323
324bool QFutureInterfaceBase::isValid() const
325{
326 const QMutexLocker lock(&d->m_mutex);
327 return d->isValid;
328}
329
330bool QFutureInterfaceBase::isRunningOrPending() const
331{
332 return queryState(state: static_cast<State>(Running | Pending));
333}
334
335bool QFutureInterfaceBase::waitForNextResult()
336{
337 QMutexLocker lock(&d->m_mutex);
338 return d->internal_waitForNextResult();
339}
340
341void QFutureInterfaceBase::waitForResume()
342{
343 // return early if possible to avoid taking the mutex lock.
344 {
345 const int state = d->state.loadRelaxed();
346 if (!(state & suspendingOrSuspended) || (state & Canceled))
347 return;
348 }
349
350 QMutexLocker lock(&d->m_mutex);
351 const int state = d->state.loadRelaxed();
352 if (!(state & suspendingOrSuspended) || (state & Canceled))
353 return;
354
355 // decrease active thread count since this thread will wait.
356 const ThreadPoolThreadReleaser releaser(d->pool());
357
358 d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex);
359}
360
361void QFutureInterfaceBase::suspendIfRequested()
362{
363 const auto canSuspend = [] (int state) {
364 // can suspend only if 1) in any suspend-related state; 2) not canceled
365 return (state & suspendingOrSuspended) && !(state & Canceled);
366 };
367
368 // return early if possible to avoid taking the mutex lock.
369 {
370 const int state = d->state.loadRelaxed();
371 if (!canSuspend(state))
372 return;
373 }
374
375 QMutexLocker lock(&d->m_mutex);
376 const int state = d->state.loadRelaxed();
377 if (!canSuspend(state))
378 return;
379
380 // Note: expecting that Suspending and Suspended are mutually exclusive
381 if (!(state & Suspended)) {
382 // switch state in case this is the first invocation
383 switch_from_to(a&: d->state, from: Suspending, to: Suspended);
384 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
385 }
386
387 // decrease active thread count since this thread will wait.
388 const ThreadPoolThreadReleaser releaser(d->pool());
389 d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex);
390}
391
392int QFutureInterfaceBase::progressValue() const
393{
394 const QMutexLocker lock(&d->m_mutex);
395 return d->m_progressValue;
396}
397
398int QFutureInterfaceBase::progressMinimum() const
399{
400 const QMutexLocker lock(&d->m_mutex);
401 return d->m_progress ? d->m_progress->minimum : 0;
402}
403
404int QFutureInterfaceBase::progressMaximum() const
405{
406 const QMutexLocker lock(&d->m_mutex);
407 return d->m_progress ? d->m_progress->maximum : 0;
408}
409
410int QFutureInterfaceBase::resultCount() const
411{
412 QMutexLocker lock(&d->m_mutex);
413 return d->internal_resultCount();
414}
415
416QString QFutureInterfaceBase::progressText() const
417{
418 QMutexLocker locker(&d->m_mutex);
419 return d->m_progress ? d->m_progress->text : QString();
420}
421
422bool QFutureInterfaceBase::isProgressUpdateNeeded() const
423{
424 QMutexLocker locker(&d->m_mutex);
425 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
426}
427
428void QFutureInterfaceBase::reportStarted()
429{
430 QMutexLocker locker(&d->m_mutex);
431 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
432 return;
433 d->setState(State(Started | Running));
434 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Started));
435 d->isValid = true;
436}
437
438void QFutureInterfaceBase::reportCanceled()
439{
440 cancel();
441}
442
443#ifndef QT_NO_EXCEPTIONS
444void QFutureInterfaceBase::reportException(const QException &exception)
445{
446 try {
447 exception.raise();
448 } catch (...) {
449 reportException(e: std::current_exception());
450 }
451}
452
453#if QT_VERSION < QT_VERSION_CHECK(7, 0, 0)
454void QFutureInterfaceBase::reportException(std::exception_ptr exception)
455#else
456void QFutureInterfaceBase::reportException(const std::exception_ptr &exception)
457#endif
458{
459 QMutexLocker locker(&d->m_mutex);
460 if (d->state.loadRelaxed() & (Canceled|Finished))
461 return;
462
463 d->hasException = true;
464 d->data.setException(exception);
465 switch_on(a&: d->state, which: Canceled);
466 d->waitCondition.wakeAll();
467 d->pausedWaitCondition.wakeAll();
468 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
469}
470#endif
471
472void QFutureInterfaceBase::reportFinished()
473{
474 QMutexLocker locker(&d->m_mutex);
475 if (!isFinished()) {
476 switch_from_to(a&: d->state, from: Running, to: Finished);
477 d->waitCondition.wakeAll();
478 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished));
479 }
480}
481
482void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
483{
484 if (d->m_progress)
485 setProgressRange(minimum: 0, maximum: resultCount);
486 d->m_expectedResultCount = resultCount;
487}
488
489int QFutureInterfaceBase::expectedResultCount()
490{
491 return d->m_expectedResultCount;
492}
493
494bool QFutureInterfaceBase::queryState(State state) const
495{
496 return d->state.loadRelaxed() & state;
497}
498
499int QFutureInterfaceBase::loadState() const
500{
501 // Used from ~QPromise, so this check is needed
502 if (!d)
503 return QFutureInterfaceBase::State::NoState;
504 return d->state.loadRelaxed();
505}
506
507void QFutureInterfaceBase::waitForResult(int resultIndex)
508{
509 if (d->hasException)
510 d->data.m_exceptionStore.rethrowException();
511
512 QMutexLocker lock(&d->m_mutex);
513 if (!isRunningOrPending())
514 return;
515 lock.unlock();
516
517 // To avoid deadlocks and reduce the number of threads used, try to
518 // run the runnable in the current thread.
519 d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable);
520
521 lock.relock();
522
523 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
524 while (isRunningOrPending() && !d->internal_isResultReadyAt(index: waitIndex))
525 d->waitCondition.wait(lockedMutex: &d->m_mutex);
526
527 if (d->hasException)
528 d->data.m_exceptionStore.rethrowException();
529}
530
531void QFutureInterfaceBase::waitForFinished()
532{
533 QMutexLocker lock(&d->m_mutex);
534 const bool alreadyFinished = isFinished();
535 lock.unlock();
536
537 if (!alreadyFinished) {
538 d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable);
539
540 lock.relock();
541
542 while (!isFinished())
543 d->waitCondition.wait(lockedMutex: &d->m_mutex);
544 }
545
546 if (d->hasException)
547 d->data.m_exceptionStore.rethrowException();
548}
549
550void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
551{
552 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
553 return;
554
555 d->waitCondition.wakeAll();
556
557 if (!d->m_progress) {
558 if (d->internal_updateProgressValue(progress: d->m_progressValue + endIndex - beginIndex) == false) {
559 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
560 beginIndex,
561 endIndex));
562 return;
563 }
564
565 d->sendCallOuts(callOut1: QFutureCallOutEvent(QFutureCallOutEvent::Progress,
566 d->m_progressValue,
567 QString()),
568 callOut2: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
569 beginIndex,
570 endIndex));
571 return;
572 }
573 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
574}
575
576void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
577{
578 d->runnable = runnable;
579}
580
581void QFutureInterfaceBase::setThreadPool(QThreadPool *pool)
582{
583 d->m_pool = pool;
584}
585
586QThreadPool *QFutureInterfaceBase::threadPool() const
587{
588 return d->m_pool;
589}
590
591void QFutureInterfaceBase::setFilterMode(bool enable)
592{
593 QMutexLocker locker(&d->m_mutex);
594 if (!hasException())
595 resultStoreBase().setFilterMode(enable);
596}
597
598/*!
599 \internal
600 Sets the progress range's minimum and maximum values to \a minimum and
601 \a maximum respectively.
602
603 If \a maximum is smaller than \a minimum, \a minimum becomes the only
604 legal value.
605
606 The progress value is reset to be \a minimum.
607
608 The progress range usage can be disabled by using setProgressRange(0, 0).
609 In this case progress value is also reset to 0.
610
611 The behavior of this method is mostly inspired by
612 \l QProgressBar::setRange.
613*/
614void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
615{
616 QMutexLocker locker(&d->m_mutex);
617 if (!d->m_progress)
618 d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData());
619 d->m_progress->minimum = minimum;
620 d->m_progress->maximum = qMax(a: minimum, b: maximum);
621 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
622 d->m_progressValue = minimum;
623}
624
625void QFutureInterfaceBase::setProgressValue(int progressValue)
626{
627 setProgressValueAndText(progressValue, progressText: QString());
628}
629
630/*!
631 \internal
632 In case of the \a progressValue falling out of the progress range,
633 this method has no effect.
634 Such behavior is inspired by \l QProgressBar::setValue.
635*/
636void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
637 const QString &progressText)
638{
639 QMutexLocker locker(&d->m_mutex);
640 if (!d->m_progress)
641 d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData());
642
643 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
644 if (useProgressRange
645 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
646 return;
647 }
648
649 if (d->m_progressValue >= progressValue)
650 return;
651
652 if (d->state.loadRelaxed() & (Canceled|Finished))
653 return;
654
655 if (d->internal_updateProgress(progress: progressValue, progressText)) {
656 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Progress,
657 d->m_progressValue,
658 d->m_progress->text));
659 }
660}
661
662QMutex &QFutureInterfaceBase::mutex() const
663{
664 return d->m_mutex;
665}
666
667bool QFutureInterfaceBase::hasException() const
668{
669 return d->hasException;
670}
671
672QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore()
673{
674 Q_ASSERT(d->hasException);
675 return d->data.m_exceptionStore;
676}
677
678QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
679{
680 Q_ASSERT(!d->hasException);
681 return d->data.m_results;
682}
683
684const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
685{
686 Q_ASSERT(!d->hasException);
687 return d->data.m_results;
688}
689
690QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
691{
692 QFutureInterfaceBase copy(other);
693 swap(other&: copy);
694 return *this;
695}
696
697// ### Qt 7: inline
698void QFutureInterfaceBase::swap(QFutureInterfaceBase &other) noexcept
699{
700 qSwap(value1&: d, value2&: other.d);
701}
702
703bool QFutureInterfaceBase::refT() const noexcept
704{
705 return d->refCount.refT();
706}
707
708bool QFutureInterfaceBase::derefT() const noexcept
709{
710 // Called from ~QFutureInterface
711 return !d || d->refCount.derefT();
712}
713
714void QFutureInterfaceBase::reset()
715{
716 d->m_progressValue = 0;
717 d->m_progress.reset();
718 d->progressTime.invalidate();
719 d->isValid = false;
720}
721
722void QFutureInterfaceBase::rethrowPossibleException()
723{
724 if (hasException())
725 exceptionStore().rethrowException();
726}
727
728QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
729 : state(initialState)
730{
731 progressTime.invalidate();
732}
733
734QFutureInterfaceBasePrivate::~QFutureInterfaceBasePrivate()
735{
736 if (hasException)
737 data.m_exceptionStore.~ExceptionStore();
738 else
739 data.m_results.~ResultStoreBase();
740}
741
742int QFutureInterfaceBasePrivate::internal_resultCount() const
743{
744 return hasException ? 0 : data.m_results.count(); // ### subtract canceled results.
745}
746
747bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const
748{
749 return hasException ? false : (data.m_results.contains(index));
750}
751
752bool QFutureInterfaceBasePrivate::internal_waitForNextResult()
753{
754 if (hasException)
755 return false;
756
757 if (data.m_results.hasNextResult())
758 return true;
759
760 while ((state.loadRelaxed() & QFutureInterfaceBase::Running)
761 && data.m_results.hasNextResult() == false)
762 waitCondition.wait(lockedMutex: &m_mutex);
763
764 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled)
765 && data.m_results.hasNextResult();
766}
767
768bool QFutureInterfaceBasePrivate::internal_updateProgressValue(int progress)
769{
770 if (m_progressValue >= progress)
771 return false;
772
773 m_progressValue = progress;
774
775 if (progressTime.isValid() && m_progressValue != 0) // make sure the first and last steps are emitted.
776 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
777 return false;
778
779 progressTime.start();
780 return true;
781
782}
783
784bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress,
785 const QString &progressText)
786{
787 if (m_progressValue >= progress)
788 return false;
789
790 Q_ASSERT(m_progress);
791
792 m_progressValue = progress;
793 m_progress->text = progressText;
794
795 if (progressTime.isValid() && m_progressValue != m_progress->maximum) // make sure the first and last steps are emitted.
796 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
797 return false;
798
799 progressTime.start();
800 return true;
801}
802
803void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable)
804{
805 // bail out if we are not changing the state
806 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
807 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
808 return;
809
810 // change the state
811 if (enable) {
812 switch_on(a&: state, which: QFutureInterfaceBase::Throttled);
813 } else {
814 switch_off(a&: state, which: QFutureInterfaceBase::Throttled);
815 if (!(state.loadRelaxed() & suspendingOrSuspended))
816 pausedWaitCondition.wakeAll();
817 }
818}
819
820void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
821{
822 if (outputConnections.isEmpty())
823 return;
824
825 for (int i = 0; i < outputConnections.size(); ++i)
826 outputConnections.at(i)->postCallOutEvent(callOutEvent);
827}
828
829void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
830 const QFutureCallOutEvent &callOutEvent2)
831{
832 if (outputConnections.isEmpty())
833 return;
834
835 for (int i = 0; i < outputConnections.size(); ++i) {
836 QFutureCallOutInterface *interface = outputConnections.at(i);
837 interface->postCallOutEvent(callOutEvent1);
838 interface->postCallOutEvent(callOutEvent2);
839 }
840}
841
842// This function connects an output interface (for example a QFutureWatcher)
843// to this future. While holding the lock we check the state and ready results
844// and add the appropriate callouts to the queue. In order to avoid deadlocks,
845// the actual callouts are made at the end while not holding the lock.
846void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *interface)
847{
848 QMutexLocker locker(&m_mutex);
849
850 QVarLengthArray<std::unique_ptr<QFutureCallOutEvent>, 3> events;
851
852 const auto currentState = state.loadRelaxed();
853 if (currentState & QFutureInterfaceBase::Started) {
854 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::Started));
855 if (m_progress) {
856 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
857 m_progress->minimum,
858 m_progress->maximum));
859 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::Progress,
860 m_progressValue,
861 m_progress->text));
862 } else {
863 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
864 0,
865 0));
866 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::Progress,
867 m_progressValue,
868 QString()));
869 }
870 }
871
872 if (!hasException) {
873 QtPrivate::ResultIteratorBase it = data.m_results.begin();
874 while (it != data.m_results.end()) {
875 const int begin = it.resultIndex();
876 const int end = begin + it.batchSize();
877 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
878 begin,
879 end));
880 it.batchedAdvance();
881 }
882 }
883
884 if (currentState & QFutureInterfaceBase::Suspended)
885 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
886 else if (currentState & QFutureInterfaceBase::Suspending)
887 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
888
889 if (currentState & QFutureInterfaceBase::Canceled)
890 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
891
892 if (currentState & QFutureInterfaceBase::Finished)
893 events.emplace_back(args: new QFutureCallOutEvent(QFutureCallOutEvent::Finished));
894
895 outputConnections.append(t: interface);
896
897 locker.unlock();
898 for (auto &&event : events)
899 interface->postCallOutEvent(*event);
900}
901
902void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *interface)
903{
904 QMutexLocker lock(&m_mutex);
905 const qsizetype index = outputConnections.indexOf(t: interface);
906 if (index == -1)
907 return;
908 outputConnections.removeAt(i: index);
909
910 interface->callOutInterfaceDisconnected();
911}
912
913void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
914{
915 state.storeRelaxed(newValue: newState);
916}
917
918void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func)
919{
920 setContinuation(func: std::move(func), continuationFutureData: nullptr);
921}
922
923void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func,
924 QFutureInterfaceBasePrivate *continuationFutureData)
925{
926 QMutexLocker lock(&d->continuationMutex);
927
928 // If the state is ready, run continuation immediately,
929 // otherwise save it for later.
930 if (isFinished()) {
931 lock.unlock();
932 func(*this);
933 lock.relock();
934 }
935 // Unless the continuation has been cleaned earlier, we have to
936 // store the move-only continuation, to guarantee that the associated
937 // future's data stays alive.
938 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
939 if (d->continuation) {
940 qWarning() << "Adding a continuation to a future which already has a continuation. "
941 "The existing continuation is overwritten.";
942 }
943 d->continuation = std::move(func);
944 d->continuationData = continuationFutureData;
945 }
946}
947
948void QFutureInterfaceBase::cleanContinuation()
949{
950 if (!d)
951 return;
952
953 QMutexLocker lock(&d->continuationMutex);
954 d->continuation = nullptr;
955 d->continuationState = QFutureInterfaceBasePrivate::Cleaned;
956 d->continuationData = nullptr;
957}
958
959void QFutureInterfaceBase::runContinuation() const
960{
961 QMutexLocker lock(&d->continuationMutex);
962 if (d->continuation) {
963 // Save the continuation in a local function, to avoid calling
964 // a null std::function below, in case cleanContinuation() is
965 // called from some other thread right after unlock() below.
966 auto fn = std::move(d->continuation);
967 lock.unlock();
968 fn(*this);
969
970 lock.relock();
971 // Unless the continuation has been cleaned earlier, we have to
972 // store the move-only continuation, to guarantee that the associated
973 // future's data stays alive.
974 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
975 d->continuation = std::move(fn);
976 }
977}
978
979bool QFutureInterfaceBase::isChainCanceled() const
980{
981 return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled;
982}
983
984void QFutureInterfaceBase::setLaunchAsync(bool value)
985{
986 d->launchAsync = value;
987}
988
989bool QFutureInterfaceBase::launchAsync() const
990{
991 return d->launchAsync;
992}
993
994namespace QtFuture {
995
996QFuture<void> makeReadyVoidFuture()
997{
998 QFutureInterface<void> promise;
999 promise.reportStarted();
1000 promise.reportFinished();
1001
1002 return promise.future();
1003}
1004
1005} // namespace QtFuture
1006
1007QT_END_NAMESPACE
1008
1009#include "qfutureinterface.moc"
1010

source code of qtbase/src/corelib/thread/qfutureinterface.cpp