1// Copyright (C) 2020 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4// qfutureinterface.h included from qfuture.h
5#include "qfuture.h"
6#include "qfutureinterface_p.h"
7
8#include <QtCore/qatomic.h>
9#include <QtCore/qcoreapplication.h>
10#include <QtCore/qthread.h>
11#include <QtCore/qvarlengtharray.h>
12#include <private/qthreadpool_p.h>
13#include <private/qobject_p.h>
14
15#include <climits> // For INT_MAX
16
17// GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious
18// reason
19// warning: ‘unsigned int __atomic_or_fetch_4(volatile void*, unsigned int, int)’ writing 4 bytes into a region of size 0 overflows the destination [-Wstringop-overflow=]
20QT_WARNING_DISABLE_GCC("-Wstringop-overflow")
21
22QT_BEGIN_NAMESPACE
23
24enum {
25 MaxProgressEmitsPerSecond = 25
26};
27
28namespace {
29class ThreadPoolThreadReleaser {
30 QThreadPool *m_pool;
31public:
32 Q_NODISCARD_CTOR
33 explicit ThreadPoolThreadReleaser(QThreadPool *pool)
34 : m_pool(pool)
35 { if (pool) pool->releaseThread(); }
36 ~ThreadPoolThreadReleaser()
37 { if (m_pool) m_pool->reserveThread(); }
38};
39
40const auto suspendingOrSuspended =
41 QFutureInterfaceBase::Suspending | QFutureInterfaceBase::Suspended;
42
43} // unnamed namespace
44
45class QObjectContinuationWrapper : public QObject
46{
47 Q_OBJECT
48public:
49 explicit QObjectContinuationWrapper(QObject *parent = nullptr)
50 : QObject(parent)
51 {
52 }
53
54signals:
55 void run();
56};
57
58void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj,
59 QFutureInterfaceBase &fi)
60{
61 Q_ASSERT(context);
62 Q_ASSERT(slotObj);
63
64 auto slot = SlotObjUniquePtr(slotObj);
65
66 auto *watcher = new QObjectContinuationWrapper;
67 watcher->moveToThread(thread: context->thread());
68
69 // We need to protect acccess to the watcher. The context object (and in turn, the watcher)
70 // could be destroyed while the continuation that emits the signal is running. We have to
71 // prevent that.
72 // The mutex has to be recursive, because the continuation itself could delete the context
73 // object (and thus the watcher), which will try to lock the mutex from the same thread twice.
74 auto watcherMutex = std::make_shared<QRecursiveMutex>();
75 const auto destroyWatcher = [watcherMutex, watcher]() mutable {
76 QMutexLocker lock(watcherMutex.get());
77 delete watcher;
78 };
79
80 // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
81 QObject::connect(sender: watcher, signal: &QObjectContinuationWrapper::run,
82 // for the following, cf. QMetaObject::invokeMethodImpl():
83 // we know `slot` is a lambda returning `void`, so we can just
84 // `call()` with `obj` and `args[0]` set to `nullptr`:
85 context, slot: [slot = std::move(slot)] {
86 void *args[] = { nullptr }; // for `void` return value
87 slot->call(r: nullptr, a: args);
88 });
89 QObject::connect(sender: watcher, signal: &QObjectContinuationWrapper::run, context: watcher, slot: destroyWatcher);
90
91 // We need to connect to destroyWatcher here, instead of delete or deleteLater().
92 // If the continuation is called from a separate thread, emit watcher->run() can't detect that
93 // the watcher has been deleted in the separate thread, causing a race condition and potential
94 // heap-use-after-free issue inside QObject::doActivate. destroyWatcher forces the deletion of
95 // the watcher to occur after emit watcher->run() completes and prevents the race condition.
96 QObject::connect(sender: context, signal: &QObject::destroyed, context: watcher, slot: destroyWatcher);
97
98 fi.setContinuation([watcherMutex, watcher = QPointer(watcher)]
99 (const QFutureInterfaceBase &parentData)
100 {
101 Q_UNUSED(parentData);
102 QMutexLocker lock(watcherMutex.get());
103 if (watcher)
104 emit watcher->run();
105 });
106}
107
108QFutureCallOutInterface::~QFutureCallOutInterface()
109 = default;
110
111Q_IMPL_EVENT_COMMON(QFutureCallOutEvent)
112
113QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
114 : d(new QFutureInterfaceBasePrivate(initialState))
115{ }
116
117QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
118 : d(other.d)
119{
120 d->refCount.ref();
121}
122
123QFutureInterfaceBase::~QFutureInterfaceBase()
124{
125 if (d && !d->refCount.deref())
126 delete d;
127}
128
129static inline int switch_on(QAtomicInt &a, int which)
130{
131 return a.fetchAndOrRelaxed(valueToAdd: which) | which;
132}
133
134static inline int switch_off(QAtomicInt &a, int which)
135{
136 return a.fetchAndAndRelaxed(valueToAdd: ~which) & ~which;
137}
138
139static inline int switch_from_to(QAtomicInt &a, int from, int to)
140{
141 const auto adjusted = [&](int old) { return (old & ~from) | to; };
142 int value = a.loadRelaxed();
143 while (!a.testAndSetRelaxed(expectedValue: value, newValue: adjusted(value), currentValue&: value))
144 qYieldCpu();
145 return value;
146}
147
148void QFutureInterfaceBase::cancel()
149{
150 cancel(mode: CancelMode::CancelOnly);
151}
152
153void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode)
154{
155 QMutexLocker locker(&d->m_mutex);
156
157 const auto oldState = d->state.loadRelaxed();
158
159 switch (mode) {
160 case CancelMode::CancelAndFinish:
161 if ((oldState & Finished) && (oldState & Canceled))
162 return;
163 switch_from_to(a&: d->state, from: suspendingOrSuspended | Running, to: Canceled | Finished);
164 break;
165 case CancelMode::CancelOnly:
166 if (oldState & Canceled)
167 return;
168 switch_from_to(a&: d->state, from: suspendingOrSuspended, to: Canceled);
169 break;
170 }
171
172 // Cancel the continuations chain
173 QFutureInterfaceBasePrivate *next = d->continuationData;
174 while (next) {
175 next->continuationState = QFutureInterfaceBasePrivate::Canceled;
176 next = next->continuationData;
177 }
178
179 d->waitCondition.wakeAll();
180 d->pausedWaitCondition.wakeAll();
181
182 if (!(oldState & Canceled))
183 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
184 if (mode == CancelMode::CancelAndFinish && !(oldState & Finished))
185 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished));
186
187 d->isValid = false;
188}
189
190void QFutureInterfaceBase::setSuspended(bool suspend)
191{
192 QMutexLocker locker(&d->m_mutex);
193 if (suspend) {
194 switch_on(a&: d->state, which: Suspending);
195 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
196 } else {
197 switch_off(a&: d->state, which: suspendingOrSuspended);
198 d->pausedWaitCondition.wakeAll();
199 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
200 }
201}
202
203void QFutureInterfaceBase::toggleSuspended()
204{
205 QMutexLocker locker(&d->m_mutex);
206 if (d->state.loadRelaxed() & suspendingOrSuspended) {
207 switch_off(a&: d->state, which: suspendingOrSuspended);
208 d->pausedWaitCondition.wakeAll();
209 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
210 } else {
211 switch_on(a&: d->state, which: Suspending);
212 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
213 }
214}
215
216void QFutureInterfaceBase::reportSuspended() const
217{
218 // Needs to be called when pause is in effect,
219 // i.e. no more events will be reported.
220
221 QMutexLocker locker(&d->m_mutex);
222 const int state = d->state.loadRelaxed();
223 if (!(state & Suspending) || (state & Suspended))
224 return;
225
226 switch_from_to(a&: d->state, from: Suspending, to: Suspended);
227 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
228}
229
230void QFutureInterfaceBase::setThrottled(bool enable)
231{
232 QMutexLocker lock(&d->m_mutex);
233 if (enable) {
234 switch_on(a&: d->state, which: Throttled);
235 } else {
236 switch_off(a&: d->state, which: Throttled);
237 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
238 d->pausedWaitCondition.wakeAll();
239 }
240}
241
242
243bool QFutureInterfaceBase::isRunning() const
244{
245 return queryState(state: Running);
246}
247
248bool QFutureInterfaceBase::isStarted() const
249{
250 return queryState(state: Started);
251}
252
253bool QFutureInterfaceBase::isCanceled() const
254{
255 return queryState(state: Canceled);
256}
257
258bool QFutureInterfaceBase::isFinished() const
259{
260 return queryState(state: Finished);
261}
262
263bool QFutureInterfaceBase::isSuspending() const
264{
265 return queryState(state: Suspending);
266}
267
268#if QT_DEPRECATED_SINCE(6, 0)
269bool QFutureInterfaceBase::isPaused() const
270{
271 return queryState(state: static_cast<State>(suspendingOrSuspended));
272}
273#endif
274
275bool QFutureInterfaceBase::isSuspended() const
276{
277 return queryState(state: Suspended);
278}
279
280bool QFutureInterfaceBase::isThrottled() const
281{
282 return queryState(state: Throttled);
283}
284
285bool QFutureInterfaceBase::isResultReadyAt(int index) const
286{
287 QMutexLocker lock(&d->m_mutex);
288 return d->internal_isResultReadyAt(index);
289}
290
291bool QFutureInterfaceBase::isValid() const
292{
293 const QMutexLocker lock(&d->m_mutex);
294 return d->isValid;
295}
296
297bool QFutureInterfaceBase::isRunningOrPending() const
298{
299 return queryState(state: static_cast<State>(Running | Pending));
300}
301
302bool QFutureInterfaceBase::waitForNextResult()
303{
304 QMutexLocker lock(&d->m_mutex);
305 return d->internal_waitForNextResult();
306}
307
308void QFutureInterfaceBase::waitForResume()
309{
310 // return early if possible to avoid taking the mutex lock.
311 {
312 const int state = d->state.loadRelaxed();
313 if (!(state & suspendingOrSuspended) || (state & Canceled))
314 return;
315 }
316
317 QMutexLocker lock(&d->m_mutex);
318 const int state = d->state.loadRelaxed();
319 if (!(state & suspendingOrSuspended) || (state & Canceled))
320 return;
321
322 // decrease active thread count since this thread will wait.
323 const ThreadPoolThreadReleaser releaser(d->pool());
324
325 d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex);
326}
327
328void QFutureInterfaceBase::suspendIfRequested()
329{
330 const auto canSuspend = [] (int state) {
331 // can suspend only if 1) in any suspend-related state; 2) not canceled
332 return (state & suspendingOrSuspended) && !(state & Canceled);
333 };
334
335 // return early if possible to avoid taking the mutex lock.
336 {
337 const int state = d->state.loadRelaxed();
338 if (!canSuspend(state))
339 return;
340 }
341
342 QMutexLocker lock(&d->m_mutex);
343 const int state = d->state.loadRelaxed();
344 if (!canSuspend(state))
345 return;
346
347 // Note: expecting that Suspending and Suspended are mutually exclusive
348 if (!(state & Suspended)) {
349 // switch state in case this is the first invocation
350 switch_from_to(a&: d->state, from: Suspending, to: Suspended);
351 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
352 }
353
354 // decrease active thread count since this thread will wait.
355 const ThreadPoolThreadReleaser releaser(d->pool());
356 d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex);
357}
358
359int QFutureInterfaceBase::progressValue() const
360{
361 const QMutexLocker lock(&d->m_mutex);
362 return d->m_progressValue;
363}
364
365int QFutureInterfaceBase::progressMinimum() const
366{
367 const QMutexLocker lock(&d->m_mutex);
368 return d->m_progress ? d->m_progress->minimum : 0;
369}
370
371int QFutureInterfaceBase::progressMaximum() const
372{
373 const QMutexLocker lock(&d->m_mutex);
374 return d->m_progress ? d->m_progress->maximum : 0;
375}
376
377int QFutureInterfaceBase::resultCount() const
378{
379 QMutexLocker lock(&d->m_mutex);
380 return d->internal_resultCount();
381}
382
383QString QFutureInterfaceBase::progressText() const
384{
385 QMutexLocker locker(&d->m_mutex);
386 return d->m_progress ? d->m_progress->text : QString();
387}
388
389bool QFutureInterfaceBase::isProgressUpdateNeeded() const
390{
391 QMutexLocker locker(&d->m_mutex);
392 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
393}
394
395void QFutureInterfaceBase::reportStarted()
396{
397 QMutexLocker locker(&d->m_mutex);
398 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
399 return;
400 d->setState(State(Started | Running));
401 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Started));
402 d->isValid = true;
403}
404
405void QFutureInterfaceBase::reportCanceled()
406{
407 cancel();
408}
409
410#ifndef QT_NO_EXCEPTIONS
411void QFutureInterfaceBase::reportException(const QException &exception)
412{
413 try {
414 exception.raise();
415 } catch (...) {
416 reportException(e: std::current_exception());
417 }
418}
419
420#if QT_VERSION < QT_VERSION_CHECK(7, 0, 0)
421void QFutureInterfaceBase::reportException(std::exception_ptr exception)
422#else
423void QFutureInterfaceBase::reportException(const std::exception_ptr &exception)
424#endif
425{
426 QMutexLocker locker(&d->m_mutex);
427 if (d->state.loadRelaxed() & (Canceled|Finished))
428 return;
429
430 d->hasException = true;
431 d->data.setException(exception);
432 switch_on(a&: d->state, which: Canceled);
433 d->waitCondition.wakeAll();
434 d->pausedWaitCondition.wakeAll();
435 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
436}
437#endif
438
439void QFutureInterfaceBase::reportFinished()
440{
441 QMutexLocker locker(&d->m_mutex);
442 if (!isFinished()) {
443 switch_from_to(a&: d->state, from: Running, to: Finished);
444 d->waitCondition.wakeAll();
445 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished));
446 }
447}
448
449void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
450{
451 if (d->m_progress)
452 setProgressRange(minimum: 0, maximum: resultCount);
453 d->m_expectedResultCount = resultCount;
454}
455
456int QFutureInterfaceBase::expectedResultCount()
457{
458 return d->m_expectedResultCount;
459}
460
461bool QFutureInterfaceBase::queryState(State state) const
462{
463 return d->state.loadRelaxed() & state;
464}
465
466int QFutureInterfaceBase::loadState() const
467{
468 // Used from ~QPromise, so this check is needed
469 if (!d)
470 return QFutureInterfaceBase::State::NoState;
471 return d->state.loadRelaxed();
472}
473
474void QFutureInterfaceBase::waitForResult(int resultIndex)
475{
476 if (d->hasException)
477 d->data.m_exceptionStore.rethrowException();
478
479 QMutexLocker lock(&d->m_mutex);
480 if (!isRunningOrPending())
481 return;
482 lock.unlock();
483
484 // To avoid deadlocks and reduce the number of threads used, try to
485 // run the runnable in the current thread.
486 d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable);
487
488 lock.relock();
489
490 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
491 while (isRunningOrPending() && !d->internal_isResultReadyAt(index: waitIndex))
492 d->waitCondition.wait(lockedMutex: &d->m_mutex);
493
494 if (d->hasException)
495 d->data.m_exceptionStore.rethrowException();
496}
497
498void QFutureInterfaceBase::waitForFinished()
499{
500 QMutexLocker lock(&d->m_mutex);
501 const bool alreadyFinished = isFinished();
502 lock.unlock();
503
504 if (!alreadyFinished) {
505 d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable);
506
507 lock.relock();
508
509 while (!isFinished())
510 d->waitCondition.wait(lockedMutex: &d->m_mutex);
511 }
512
513 if (d->hasException)
514 d->data.m_exceptionStore.rethrowException();
515}
516
517void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
518{
519 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
520 return;
521
522 d->waitCondition.wakeAll();
523
524 if (!d->m_progress) {
525 if (d->internal_updateProgressValue(progress: d->m_progressValue + endIndex - beginIndex) == false) {
526 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
527 beginIndex,
528 endIndex));
529 return;
530 }
531
532 d->sendCallOuts(callOut1: QFutureCallOutEvent(QFutureCallOutEvent::Progress,
533 d->m_progressValue,
534 QString()),
535 callOut2: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
536 beginIndex,
537 endIndex));
538 return;
539 }
540 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
541}
542
543void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
544{
545 d->runnable = runnable;
546}
547
548void QFutureInterfaceBase::setThreadPool(QThreadPool *pool)
549{
550 d->m_pool = pool;
551}
552
553QThreadPool *QFutureInterfaceBase::threadPool() const
554{
555 return d->m_pool;
556}
557
558void QFutureInterfaceBase::setFilterMode(bool enable)
559{
560 QMutexLocker locker(&d->m_mutex);
561 if (!hasException())
562 resultStoreBase().setFilterMode(enable);
563}
564
565/*!
566 \internal
567 Sets the progress range's minimum and maximum values to \a minimum and
568 \a maximum respectively.
569
570 If \a maximum is smaller than \a minimum, \a minimum becomes the only
571 legal value.
572
573 The progress value is reset to be \a minimum.
574
575 The progress range usage can be disabled by using setProgressRange(0, 0).
576 In this case progress value is also reset to 0.
577
578 The behavior of this method is mostly inspired by
579 \l QProgressBar::setRange.
580*/
581void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
582{
583 QMutexLocker locker(&d->m_mutex);
584 if (!d->m_progress)
585 d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData());
586 d->m_progress->minimum = minimum;
587 d->m_progress->maximum = qMax(a: minimum, b: maximum);
588 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
589 d->m_progressValue = minimum;
590}
591
592void QFutureInterfaceBase::setProgressValue(int progressValue)
593{
594 setProgressValueAndText(progressValue, progressText: QString());
595}
596
597/*!
598 \internal
599 In case of the \a progressValue falling out of the progress range,
600 this method has no effect.
601 Such behavior is inspired by \l QProgressBar::setValue.
602*/
603void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
604 const QString &progressText)
605{
606 QMutexLocker locker(&d->m_mutex);
607 if (!d->m_progress)
608 d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData());
609
610 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
611 if (useProgressRange
612 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
613 return;
614 }
615
616 if (d->m_progressValue >= progressValue)
617 return;
618
619 if (d->state.loadRelaxed() & (Canceled|Finished))
620 return;
621
622 if (d->internal_updateProgress(progress: progressValue, progressText)) {
623 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Progress,
624 d->m_progressValue,
625 d->m_progress->text));
626 }
627}
628
629QMutex &QFutureInterfaceBase::mutex() const
630{
631 return d->m_mutex;
632}
633
634bool QFutureInterfaceBase::hasException() const
635{
636 return d->hasException;
637}
638
639QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore()
640{
641 Q_ASSERT(d->hasException);
642 return d->data.m_exceptionStore;
643}
644
645QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
646{
647 Q_ASSERT(!d->hasException);
648 return d->data.m_results;
649}
650
651const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
652{
653 Q_ASSERT(!d->hasException);
654 return d->data.m_results;
655}
656
657QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
658{
659 QFutureInterfaceBase copy(other);
660 swap(other&: copy);
661 return *this;
662}
663
664// ### Qt 7: inline
665void QFutureInterfaceBase::swap(QFutureInterfaceBase &other) noexcept
666{
667 qSwap(value1&: d, value2&: other.d);
668}
669
670bool QFutureInterfaceBase::refT() const noexcept
671{
672 return d->refCount.refT();
673}
674
675bool QFutureInterfaceBase::derefT() const noexcept
676{
677 // Called from ~QFutureInterface
678 return !d || d->refCount.derefT();
679}
680
681void QFutureInterfaceBase::reset()
682{
683 d->m_progressValue = 0;
684 d->m_progress.reset();
685 d->progressTime.invalidate();
686 d->isValid = false;
687}
688
689void QFutureInterfaceBase::rethrowPossibleException()
690{
691 if (hasException())
692 exceptionStore().rethrowException();
693}
694
695QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
696 : state(initialState)
697{
698 progressTime.invalidate();
699}
700
701QFutureInterfaceBasePrivate::~QFutureInterfaceBasePrivate()
702{
703 if (hasException)
704 data.m_exceptionStore.~ExceptionStore();
705 else
706 data.m_results.~ResultStoreBase();
707}
708
709int QFutureInterfaceBasePrivate::internal_resultCount() const
710{
711 return hasException ? 0 : data.m_results.count(); // ### subtract canceled results.
712}
713
714bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const
715{
716 return hasException ? false : (data.m_results.contains(index));
717}
718
719bool QFutureInterfaceBasePrivate::internal_waitForNextResult()
720{
721 if (hasException)
722 return false;
723
724 if (data.m_results.hasNextResult())
725 return true;
726
727 while ((state.loadRelaxed() & QFutureInterfaceBase::Running)
728 && data.m_results.hasNextResult() == false)
729 waitCondition.wait(lockedMutex: &m_mutex);
730
731 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled)
732 && data.m_results.hasNextResult();
733}
734
735bool QFutureInterfaceBasePrivate::internal_updateProgressValue(int progress)
736{
737 if (m_progressValue >= progress)
738 return false;
739
740 m_progressValue = progress;
741
742 if (progressTime.isValid() && m_progressValue != 0) // make sure the first and last steps are emitted.
743 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
744 return false;
745
746 progressTime.start();
747 return true;
748
749}
750
751bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress,
752 const QString &progressText)
753{
754 if (m_progressValue >= progress)
755 return false;
756
757 Q_ASSERT(m_progress);
758
759 m_progressValue = progress;
760 m_progress->text = progressText;
761
762 if (progressTime.isValid() && m_progressValue != m_progress->maximum) // make sure the first and last steps are emitted.
763 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
764 return false;
765
766 progressTime.start();
767 return true;
768}
769
770void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable)
771{
772 // bail out if we are not changing the state
773 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
774 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
775 return;
776
777 // change the state
778 if (enable) {
779 switch_on(a&: state, which: QFutureInterfaceBase::Throttled);
780 } else {
781 switch_off(a&: state, which: QFutureInterfaceBase::Throttled);
782 if (!(state.loadRelaxed() & suspendingOrSuspended))
783 pausedWaitCondition.wakeAll();
784 }
785}
786
787void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
788{
789 if (outputConnections.isEmpty())
790 return;
791
792 for (int i = 0; i < outputConnections.size(); ++i)
793 outputConnections.at(i)->postCallOutEvent(callOutEvent);
794}
795
796void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
797 const QFutureCallOutEvent &callOutEvent2)
798{
799 if (outputConnections.isEmpty())
800 return;
801
802 for (int i = 0; i < outputConnections.size(); ++i) {
803 QFutureCallOutInterface *iface = outputConnections.at(i);
804 iface->postCallOutEvent(callOutEvent1);
805 iface->postCallOutEvent(callOutEvent2);
806 }
807}
808
809// This function connects an output interface (for example a QFutureWatcher)
810// to this future. While holding the lock we check the state and ready results
811// and add the appropriate callouts to the queue.
812void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *iface)
813{
814 QMutexLocker locker(&m_mutex);
815
816 const auto currentState = state.loadRelaxed();
817 if (currentState & QFutureInterfaceBase::Started) {
818 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
819 if (m_progress) {
820 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
821 m_progress->minimum,
822 m_progress->maximum));
823 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
824 m_progressValue,
825 m_progress->text));
826 } else {
827 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
828 0,
829 0));
830 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
831 m_progressValue,
832 QString()));
833 }
834 }
835
836 if (!hasException) {
837 QtPrivate::ResultIteratorBase it = data.m_results.begin();
838 while (it != data.m_results.end()) {
839 const int begin = it.resultIndex();
840 const int end = begin + it.batchSize();
841 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
842 begin,
843 end));
844 it.batchedAdvance();
845 }
846 }
847
848 if (currentState & QFutureInterfaceBase::Suspended)
849 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
850 else if (currentState & QFutureInterfaceBase::Suspending)
851 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
852
853 if (currentState & QFutureInterfaceBase::Canceled)
854 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
855
856 if (currentState & QFutureInterfaceBase::Finished)
857 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
858
859 outputConnections.append(t: iface);
860}
861
862void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *iface)
863{
864 QMutexLocker lock(&m_mutex);
865 const qsizetype index = outputConnections.indexOf(t: iface);
866 if (index == -1)
867 return;
868 outputConnections.removeAt(i: index);
869
870 iface->callOutInterfaceDisconnected();
871}
872
873void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
874{
875 state.storeRelaxed(newValue: newState);
876}
877
878void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func)
879{
880 setContinuation(func: std::move(func), continuationFutureData: nullptr);
881}
882
883void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func,
884 QFutureInterfaceBasePrivate *continuationFutureData)
885{
886 QMutexLocker lock(&d->continuationMutex);
887
888 // If the state is ready, run continuation immediately,
889 // otherwise save it for later.
890 if (isFinished()) {
891 d->continuationExecuted = true;
892 lock.unlock();
893 func(*this);
894 lock.relock();
895 }
896 // Unless the continuation has been cleaned earlier, we have to
897 // store the move-only continuation, to guarantee that the associated
898 // future's data stays alive.
899 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
900 if (d->continuation) {
901 qWarning() << "Adding a continuation to a future which already has a continuation. "
902 "The existing continuation is overwritten.";
903 }
904 d->continuation = std::move(func);
905 d->continuationData = continuationFutureData;
906 }
907}
908
909void QFutureInterfaceBase::cleanContinuation()
910{
911 if (!d)
912 return;
913
914 QMutexLocker lock(&d->continuationMutex);
915 d->continuation = nullptr;
916 d->continuationState = QFutureInterfaceBasePrivate::Cleaned;
917 d->continuationData = nullptr;
918}
919
920void QFutureInterfaceBase::runContinuation() const
921{
922 QMutexLocker lock(&d->continuationMutex);
923 if (d->continuation && !d->continuationExecuted) {
924 // Save the continuation in a local function, to avoid calling
925 // a null std::function below, in case cleanContinuation() is
926 // called from some other thread right after unlock() below.
927 d->continuationExecuted = true;
928 auto fn = std::move(d->continuation);
929 lock.unlock();
930 fn(*this);
931
932 lock.relock();
933 // Unless the continuation has been cleaned earlier, we have to
934 // store the move-only continuation, to guarantee that the associated
935 // future's data stays alive.
936 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
937 d->continuation = std::move(fn);
938 }
939}
940
941bool QFutureInterfaceBase::isChainCanceled() const
942{
943 return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled;
944}
945
946void QFutureInterfaceBase::setLaunchAsync(bool value)
947{
948 d->launchAsync = value;
949}
950
951bool QFutureInterfaceBase::launchAsync() const
952{
953 return d->launchAsync;
954}
955
956namespace QtFuture {
957
958QFuture<void> makeReadyVoidFuture()
959{
960 QFutureInterface<void> promise;
961 promise.reportStarted();
962 promise.reportFinished();
963
964 return promise.future();
965}
966
967} // namespace QtFuture
968
969QT_END_NAMESPACE
970
971#include "qfutureinterface.moc"
972

source code of qtbase/src/corelib/thread/qfutureinterface.cpp