1// Copyright (C) 2020 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4// qfutureinterface.h included from qfuture.h
5#include "qfuture.h"
6#include "qfutureinterface_p.h"
7
8#include <QtCore/qatomic.h>
9#include <QtCore/qcoreapplication.h>
10#include <QtCore/qthread.h>
11#include <QtCore/qvarlengtharray.h>
12#include <private/qthreadpool_p.h>
13#include <private/qobject_p.h>
14
15// GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious
16// reason
17// warning: ‘unsigned int __atomic_or_fetch_4(volatile void*, unsigned int, int)’ writing 4 bytes into a region of size 0 overflows the destination [-Wstringop-overflow=]
18QT_WARNING_DISABLE_GCC("-Wstringop-overflow")
19
20QT_BEGIN_NAMESPACE
21
22enum {
23 MaxProgressEmitsPerSecond = 25
24};
25
26namespace {
27class ThreadPoolThreadReleaser {
28 QThreadPool *m_pool;
29public:
30 Q_NODISCARD_CTOR
31 explicit ThreadPoolThreadReleaser(QThreadPool *pool)
32 : m_pool(pool)
33 { if (pool) pool->releaseThread(); }
34 ~ThreadPoolThreadReleaser()
35 { if (m_pool) m_pool->reserveThread(); }
36};
37
38const auto suspendingOrSuspended =
39 QFutureInterfaceBase::Suspending | QFutureInterfaceBase::Suspended;
40
41} // unnamed namespace
42
43class QObjectContinuationWrapper : public QObject
44{
45 Q_OBJECT
46public:
47 explicit QObjectContinuationWrapper(QObject *parent = nullptr)
48 : QObject(parent)
49 {
50 }
51
52signals:
53 void run();
54};
55
56void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj,
57 QFutureInterfaceBase &fi)
58{
59 Q_ASSERT(context);
60 Q_ASSERT(slotObj);
61
62 auto slot = SlotObjUniquePtr(slotObj);
63
64 auto *watcher = new QObjectContinuationWrapper;
65 watcher->moveToThread(thread: context->thread());
66
67 // We need to protect acccess to the watcher. The context object (and in turn, the watcher)
68 // could be destroyed while the continuation that emits the signal is running. We have to
69 // prevent that.
70 // The mutex has to be recursive, because the continuation itself could delete the context
71 // object (and thus the watcher), which will try to lock the mutex from the same thread twice.
72 auto watcherMutex = std::make_shared<QRecursiveMutex>();
73 const auto destroyWatcher = [watcherMutex, watcher]() mutable {
74 QMutexLocker lock(watcherMutex.get());
75 delete watcher;
76 };
77
78 // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
79 QObject::connect(sender: watcher, signal: &QObjectContinuationWrapper::run,
80 // for the following, cf. QMetaObject::invokeMethodImpl():
81 // we know `slot` is a lambda returning `void`, so we can just
82 // `call()` with `obj` and `args[0]` set to `nullptr`:
83 context, slot: [slot = std::move(slot)] {
84 void *args[] = { nullptr }; // for `void` return value
85 slot->call(r: nullptr, a: args);
86 });
87 QObject::connect(sender: watcher, signal: &QObjectContinuationWrapper::run, context: watcher, slot: destroyWatcher);
88
89 // We need to connect to destroyWatcher here, instead of delete or deleteLater().
90 // If the continuation is called from a separate thread, emit watcher->run() can't detect that
91 // the watcher has been deleted in the separate thread, causing a race condition and potential
92 // heap-use-after-free issue inside QObject::doActivate. destroyWatcher forces the deletion of
93 // the watcher to occur after emit watcher->run() completes and prevents the race condition.
94 QObject::connect(sender: context, signal: &QObject::destroyed, context: watcher, slot: destroyWatcher);
95
96 fi.setContinuation([watcherMutex, watcher = QPointer(watcher)]
97 (const QFutureInterfaceBase &parentData)
98 {
99 Q_UNUSED(parentData);
100 QMutexLocker lock(watcherMutex.get());
101 if (watcher)
102 emit watcher->run();
103 });
104}
105
106QFutureCallOutInterface::~QFutureCallOutInterface()
107 = default;
108
109Q_IMPL_EVENT_COMMON(QFutureCallOutEvent)
110
111QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
112 : d(new QFutureInterfaceBasePrivate(initialState))
113{ }
114
115QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
116 : d(other.d)
117{
118 d->refCount.ref();
119}
120
121QFutureInterfaceBase::~QFutureInterfaceBase()
122{
123 if (d && !d->refCount.deref())
124 delete d;
125}
126
127static inline int switch_on(QAtomicInt &a, int which)
128{
129 return a.fetchAndOrRelaxed(valueToAdd: which) | which;
130}
131
132static inline int switch_off(QAtomicInt &a, int which)
133{
134 return a.fetchAndAndRelaxed(valueToAdd: ~which) & ~which;
135}
136
137static inline int switch_from_to(QAtomicInt &a, int from, int to)
138{
139 const auto adjusted = [&](int old) { return (old & ~from) | to; };
140 int value = a.loadRelaxed();
141 while (!a.testAndSetRelaxed(expectedValue: value, newValue: adjusted(value), currentValue&: value))
142 qYieldCpu();
143 return value;
144}
145
146void QFutureInterfaceBase::cancel()
147{
148 cancel(mode: CancelMode::CancelOnly);
149}
150
151void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode)
152{
153 QMutexLocker locker(&d->m_mutex);
154
155 const auto oldState = d->state.loadRelaxed();
156
157 switch (mode) {
158 case CancelMode::CancelAndFinish:
159 if ((oldState & Finished) && (oldState & Canceled))
160 return;
161 switch_from_to(a&: d->state, from: suspendingOrSuspended | Running, to: Canceled | Finished);
162 break;
163 case CancelMode::CancelOnly:
164 if (oldState & Canceled)
165 return;
166 switch_from_to(a&: d->state, from: suspendingOrSuspended, to: Canceled);
167 break;
168 }
169
170 // Cancel the continuations chain
171 QFutureInterfaceBasePrivate *next = d->continuationData;
172 while (next) {
173 next->continuationState = QFutureInterfaceBasePrivate::Canceled;
174 next = next->continuationData;
175 }
176
177 d->waitCondition.wakeAll();
178 d->pausedWaitCondition.wakeAll();
179
180 if (!(oldState & Canceled))
181 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
182 if (mode == CancelMode::CancelAndFinish && !(oldState & Finished))
183 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished));
184
185 d->isValid = false;
186}
187
188void QFutureInterfaceBase::setSuspended(bool suspend)
189{
190 QMutexLocker locker(&d->m_mutex);
191 if (suspend) {
192 switch_on(a&: d->state, which: Suspending);
193 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
194 } else {
195 switch_off(a&: d->state, which: suspendingOrSuspended);
196 d->pausedWaitCondition.wakeAll();
197 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
198 }
199}
200
201void QFutureInterfaceBase::toggleSuspended()
202{
203 QMutexLocker locker(&d->m_mutex);
204 if (d->state.loadRelaxed() & suspendingOrSuspended) {
205 switch_off(a&: d->state, which: suspendingOrSuspended);
206 d->pausedWaitCondition.wakeAll();
207 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
208 } else {
209 switch_on(a&: d->state, which: Suspending);
210 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
211 }
212}
213
214void QFutureInterfaceBase::reportSuspended() const
215{
216 // Needs to be called when pause is in effect,
217 // i.e. no more events will be reported.
218
219 QMutexLocker locker(&d->m_mutex);
220 const int state = d->state.loadRelaxed();
221 if (!(state & Suspending) || (state & Suspended))
222 return;
223
224 switch_from_to(a&: d->state, from: Suspending, to: Suspended);
225 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
226}
227
228void QFutureInterfaceBase::setThrottled(bool enable)
229{
230 QMutexLocker lock(&d->m_mutex);
231 if (enable) {
232 switch_on(a&: d->state, which: Throttled);
233 } else {
234 switch_off(a&: d->state, which: Throttled);
235 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
236 d->pausedWaitCondition.wakeAll();
237 }
238}
239
240
241bool QFutureInterfaceBase::isRunning() const
242{
243 return queryState(state: Running);
244}
245
246bool QFutureInterfaceBase::isStarted() const
247{
248 return queryState(state: Started);
249}
250
251bool QFutureInterfaceBase::isCanceled() const
252{
253 return queryState(state: Canceled);
254}
255
256bool QFutureInterfaceBase::isFinished() const
257{
258 return queryState(state: Finished);
259}
260
261bool QFutureInterfaceBase::isSuspending() const
262{
263 return queryState(state: Suspending);
264}
265
266#if QT_DEPRECATED_SINCE(6, 0)
267bool QFutureInterfaceBase::isPaused() const
268{
269 return queryState(state: static_cast<State>(suspendingOrSuspended));
270}
271#endif
272
273bool QFutureInterfaceBase::isSuspended() const
274{
275 return queryState(state: Suspended);
276}
277
278bool QFutureInterfaceBase::isThrottled() const
279{
280 return queryState(state: Throttled);
281}
282
283bool QFutureInterfaceBase::isResultReadyAt(int index) const
284{
285 QMutexLocker lock(&d->m_mutex);
286 return d->internal_isResultReadyAt(index);
287}
288
289bool QFutureInterfaceBase::isValid() const
290{
291 const QMutexLocker lock(&d->m_mutex);
292 return d->isValid;
293}
294
295bool QFutureInterfaceBase::isRunningOrPending() const
296{
297 return queryState(state: static_cast<State>(Running | Pending));
298}
299
300bool QFutureInterfaceBase::waitForNextResult()
301{
302 QMutexLocker lock(&d->m_mutex);
303 return d->internal_waitForNextResult();
304}
305
306void QFutureInterfaceBase::waitForResume()
307{
308 // return early if possible to avoid taking the mutex lock.
309 {
310 const int state = d->state.loadRelaxed();
311 if (!(state & suspendingOrSuspended) || (state & Canceled))
312 return;
313 }
314
315 QMutexLocker lock(&d->m_mutex);
316 const int state = d->state.loadRelaxed();
317 if (!(state & suspendingOrSuspended) || (state & Canceled))
318 return;
319
320 // decrease active thread count since this thread will wait.
321 const ThreadPoolThreadReleaser releaser(d->pool());
322
323 d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex);
324}
325
326void QFutureInterfaceBase::suspendIfRequested()
327{
328 const auto canSuspend = [] (int state) {
329 // can suspend only if 1) in any suspend-related state; 2) not canceled
330 return (state & suspendingOrSuspended) && !(state & Canceled);
331 };
332
333 // return early if possible to avoid taking the mutex lock.
334 {
335 const int state = d->state.loadRelaxed();
336 if (!canSuspend(state))
337 return;
338 }
339
340 QMutexLocker lock(&d->m_mutex);
341 const int state = d->state.loadRelaxed();
342 if (!canSuspend(state))
343 return;
344
345 // Note: expecting that Suspending and Suspended are mutually exclusive
346 if (!(state & Suspended)) {
347 // switch state in case this is the first invocation
348 switch_from_to(a&: d->state, from: Suspending, to: Suspended);
349 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
350 }
351
352 // decrease active thread count since this thread will wait.
353 const ThreadPoolThreadReleaser releaser(d->pool());
354 d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex);
355}
356
357int QFutureInterfaceBase::progressValue() const
358{
359 const QMutexLocker lock(&d->m_mutex);
360 return d->m_progressValue;
361}
362
363int QFutureInterfaceBase::progressMinimum() const
364{
365 const QMutexLocker lock(&d->m_mutex);
366 return d->m_progress ? d->m_progress->minimum : 0;
367}
368
369int QFutureInterfaceBase::progressMaximum() const
370{
371 const QMutexLocker lock(&d->m_mutex);
372 return d->m_progress ? d->m_progress->maximum : 0;
373}
374
375int QFutureInterfaceBase::resultCount() const
376{
377 QMutexLocker lock(&d->m_mutex);
378 return d->internal_resultCount();
379}
380
381QString QFutureInterfaceBase::progressText() const
382{
383 QMutexLocker locker(&d->m_mutex);
384 return d->m_progress ? d->m_progress->text : QString();
385}
386
387bool QFutureInterfaceBase::isProgressUpdateNeeded() const
388{
389 QMutexLocker locker(&d->m_mutex);
390 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
391}
392
393void QFutureInterfaceBase::reportStarted()
394{
395 QMutexLocker locker(&d->m_mutex);
396 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
397 return;
398 d->setState(State(Started | Running));
399 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Started));
400 d->isValid = true;
401}
402
403void QFutureInterfaceBase::reportCanceled()
404{
405 cancel();
406}
407
408#ifndef QT_NO_EXCEPTIONS
409void QFutureInterfaceBase::reportException(const QException &exception)
410{
411 try {
412 exception.raise();
413 } catch (...) {
414 reportException(e: std::current_exception());
415 }
416}
417
418#if QT_VERSION < QT_VERSION_CHECK(7, 0, 0)
419void QFutureInterfaceBase::reportException(std::exception_ptr exception)
420#else
421void QFutureInterfaceBase::reportException(const std::exception_ptr &exception)
422#endif
423{
424 QMutexLocker locker(&d->m_mutex);
425 if (d->state.loadRelaxed() & (Canceled|Finished))
426 return;
427
428 d->hasException = true;
429 d->data.setException(exception);
430 switch_on(a&: d->state, which: Canceled);
431 d->waitCondition.wakeAll();
432 d->pausedWaitCondition.wakeAll();
433 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
434}
435#endif
436
437void QFutureInterfaceBase::reportFinished()
438{
439 QMutexLocker locker(&d->m_mutex);
440 if (!isFinished()) {
441 switch_from_to(a&: d->state, from: Running, to: Finished);
442 d->waitCondition.wakeAll();
443 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished));
444 }
445}
446
447void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
448{
449 if (d->m_progress)
450 setProgressRange(minimum: 0, maximum: resultCount);
451 d->m_expectedResultCount = resultCount;
452}
453
454int QFutureInterfaceBase::expectedResultCount()
455{
456 return d->m_expectedResultCount;
457}
458
459bool QFutureInterfaceBase::queryState(State state) const
460{
461 return d->state.loadRelaxed() & state;
462}
463
464int QFutureInterfaceBase::loadState() const
465{
466 // Used from ~QPromise, so this check is needed
467 if (!d)
468 return QFutureInterfaceBase::State::NoState;
469 return d->state.loadRelaxed();
470}
471
472void QFutureInterfaceBase::waitForResult(int resultIndex)
473{
474 if (d->hasException)
475 d->data.m_exceptionStore.rethrowException();
476
477 QMutexLocker lock(&d->m_mutex);
478 if (!isRunningOrPending())
479 return;
480 lock.unlock();
481
482 // To avoid deadlocks and reduce the number of threads used, try to
483 // run the runnable in the current thread.
484 d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable);
485
486 lock.relock();
487
488 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
489 while (isRunningOrPending() && !d->internal_isResultReadyAt(index: waitIndex))
490 d->waitCondition.wait(lockedMutex: &d->m_mutex);
491
492 if (d->hasException)
493 d->data.m_exceptionStore.rethrowException();
494}
495
496void QFutureInterfaceBase::waitForFinished()
497{
498 QMutexLocker lock(&d->m_mutex);
499 const bool alreadyFinished = isFinished();
500 lock.unlock();
501
502 if (!alreadyFinished) {
503 d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable);
504
505 lock.relock();
506
507 while (!isFinished())
508 d->waitCondition.wait(lockedMutex: &d->m_mutex);
509 }
510
511 if (d->hasException)
512 d->data.m_exceptionStore.rethrowException();
513}
514
515void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
516{
517 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
518 return;
519
520 d->waitCondition.wakeAll();
521
522 if (!d->m_progress) {
523 if (d->internal_updateProgressValue(progress: d->m_progressValue + endIndex - beginIndex) == false) {
524 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
525 beginIndex,
526 endIndex));
527 return;
528 }
529
530 d->sendCallOuts(callOut1: QFutureCallOutEvent(QFutureCallOutEvent::Progress,
531 d->m_progressValue,
532 QString()),
533 callOut2: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
534 beginIndex,
535 endIndex));
536 return;
537 }
538 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
539}
540
541void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
542{
543 d->runnable = runnable;
544}
545
546void QFutureInterfaceBase::setThreadPool(QThreadPool *pool)
547{
548 d->m_pool = pool;
549}
550
551QThreadPool *QFutureInterfaceBase::threadPool() const
552{
553 return d->m_pool;
554}
555
556void QFutureInterfaceBase::setFilterMode(bool enable)
557{
558 QMutexLocker locker(&d->m_mutex);
559 if (!hasException())
560 resultStoreBase().setFilterMode(enable);
561}
562
563/*!
564 \internal
565 Sets the progress range's minimum and maximum values to \a minimum and
566 \a maximum respectively.
567
568 If \a maximum is smaller than \a minimum, \a minimum becomes the only
569 legal value.
570
571 The progress value is reset to be \a minimum.
572
573 The progress range usage can be disabled by using setProgressRange(0, 0).
574 In this case progress value is also reset to 0.
575
576 The behavior of this method is mostly inspired by
577 \l QProgressBar::setRange.
578*/
579void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
580{
581 QMutexLocker locker(&d->m_mutex);
582 if (!d->m_progress)
583 d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData());
584 d->m_progress->minimum = minimum;
585 d->m_progress->maximum = qMax(a: minimum, b: maximum);
586 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
587 d->m_progressValue = minimum;
588}
589
590void QFutureInterfaceBase::setProgressValue(int progressValue)
591{
592 setProgressValueAndText(progressValue, progressText: QString());
593}
594
595/*!
596 \internal
597 In case of the \a progressValue falling out of the progress range,
598 this method has no effect.
599 Such behavior is inspired by \l QProgressBar::setValue.
600*/
601void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
602 const QString &progressText)
603{
604 QMutexLocker locker(&d->m_mutex);
605 if (!d->m_progress)
606 d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData());
607
608 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
609 if (useProgressRange
610 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
611 return;
612 }
613
614 if (d->m_progressValue >= progressValue)
615 return;
616
617 if (d->state.loadRelaxed() & (Canceled|Finished))
618 return;
619
620 if (d->internal_updateProgress(progress: progressValue, progressText)) {
621 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Progress,
622 d->m_progressValue,
623 d->m_progress->text));
624 }
625}
626
627QMutex &QFutureInterfaceBase::mutex() const
628{
629 return d->m_mutex;
630}
631
632bool QFutureInterfaceBase::hasException() const
633{
634 return d->hasException;
635}
636
637QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore()
638{
639 Q_ASSERT(d->hasException);
640 return d->data.m_exceptionStore;
641}
642
643QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
644{
645 Q_ASSERT(!d->hasException);
646 return d->data.m_results;
647}
648
649const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
650{
651 Q_ASSERT(!d->hasException);
652 return d->data.m_results;
653}
654
655QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
656{
657 QFutureInterfaceBase copy(other);
658 swap(other&: copy);
659 return *this;
660}
661
662// ### Qt 7: inline
663void QFutureInterfaceBase::swap(QFutureInterfaceBase &other) noexcept
664{
665 qSwap(value1&: d, value2&: other.d);
666}
667
668bool QFutureInterfaceBase::refT() const noexcept
669{
670 return d->refCount.refT();
671}
672
673bool QFutureInterfaceBase::derefT() const noexcept
674{
675 // Called from ~QFutureInterface
676 return !d || d->refCount.derefT();
677}
678
679void QFutureInterfaceBase::reset()
680{
681 d->m_progressValue = 0;
682 d->m_progress.reset();
683 d->progressTime.invalidate();
684 d->isValid = false;
685}
686
687void QFutureInterfaceBase::rethrowPossibleException()
688{
689 if (hasException())
690 exceptionStore().rethrowException();
691}
692
693QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
694 : state(initialState)
695{
696 progressTime.invalidate();
697}
698
699QFutureInterfaceBasePrivate::~QFutureInterfaceBasePrivate()
700{
701 if (hasException)
702 data.m_exceptionStore.~ExceptionStore();
703 else
704 data.m_results.~ResultStoreBase();
705}
706
707int QFutureInterfaceBasePrivate::internal_resultCount() const
708{
709 return hasException ? 0 : data.m_results.count(); // ### subtract canceled results.
710}
711
712bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const
713{
714 return hasException ? false : (data.m_results.contains(index));
715}
716
717bool QFutureInterfaceBasePrivate::internal_waitForNextResult()
718{
719 if (hasException)
720 return false;
721
722 if (data.m_results.hasNextResult())
723 return true;
724
725 while ((state.loadRelaxed() & QFutureInterfaceBase::Running)
726 && data.m_results.hasNextResult() == false)
727 waitCondition.wait(lockedMutex: &m_mutex);
728
729 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled)
730 && data.m_results.hasNextResult();
731}
732
733bool QFutureInterfaceBasePrivate::internal_updateProgressValue(int progress)
734{
735 if (m_progressValue >= progress)
736 return false;
737
738 m_progressValue = progress;
739
740 if (progressTime.isValid() && m_progressValue != 0) // make sure the first and last steps are emitted.
741 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
742 return false;
743
744 progressTime.start();
745 return true;
746
747}
748
749bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress,
750 const QString &progressText)
751{
752 if (m_progressValue >= progress)
753 return false;
754
755 Q_ASSERT(m_progress);
756
757 m_progressValue = progress;
758 m_progress->text = progressText;
759
760 if (progressTime.isValid() && m_progressValue != m_progress->maximum) // make sure the first and last steps are emitted.
761 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
762 return false;
763
764 progressTime.start();
765 return true;
766}
767
768void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable)
769{
770 // bail out if we are not changing the state
771 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
772 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
773 return;
774
775 // change the state
776 if (enable) {
777 switch_on(a&: state, which: QFutureInterfaceBase::Throttled);
778 } else {
779 switch_off(a&: state, which: QFutureInterfaceBase::Throttled);
780 if (!(state.loadRelaxed() & suspendingOrSuspended))
781 pausedWaitCondition.wakeAll();
782 }
783}
784
785void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
786{
787 if (outputConnections.isEmpty())
788 return;
789
790 for (int i = 0; i < outputConnections.size(); ++i)
791 outputConnections.at(i)->postCallOutEvent(callOutEvent);
792}
793
794void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
795 const QFutureCallOutEvent &callOutEvent2)
796{
797 if (outputConnections.isEmpty())
798 return;
799
800 for (int i = 0; i < outputConnections.size(); ++i) {
801 QFutureCallOutInterface *iface = outputConnections.at(i);
802 iface->postCallOutEvent(callOutEvent1);
803 iface->postCallOutEvent(callOutEvent2);
804 }
805}
806
807// This function connects an output interface (for example a QFutureWatcher)
808// to this future. While holding the lock we check the state and ready results
809// and add the appropriate callouts to the queue.
810void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *iface)
811{
812 QMutexLocker locker(&m_mutex);
813
814 const auto currentState = state.loadRelaxed();
815 if (currentState & QFutureInterfaceBase::Started) {
816 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
817 if (m_progress) {
818 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
819 m_progress->minimum,
820 m_progress->maximum));
821 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
822 m_progressValue,
823 m_progress->text));
824 } else {
825 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
826 0,
827 0));
828 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
829 m_progressValue,
830 QString()));
831 }
832 }
833
834 if (!hasException) {
835 QtPrivate::ResultIteratorBase it = data.m_results.begin();
836 while (it != data.m_results.end()) {
837 const int begin = it.resultIndex();
838 const int end = begin + it.batchSize();
839 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
840 begin,
841 end));
842 it.batchedAdvance();
843 }
844 }
845
846 if (currentState & QFutureInterfaceBase::Suspended)
847 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
848 else if (currentState & QFutureInterfaceBase::Suspending)
849 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
850
851 if (currentState & QFutureInterfaceBase::Canceled)
852 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
853
854 if (currentState & QFutureInterfaceBase::Finished)
855 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
856
857 outputConnections.append(t: iface);
858}
859
860void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *iface)
861{
862 QMutexLocker lock(&m_mutex);
863 const qsizetype index = outputConnections.indexOf(t: iface);
864 if (index == -1)
865 return;
866 outputConnections.removeAt(i: index);
867
868 iface->callOutInterfaceDisconnected();
869}
870
871void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
872{
873 state.storeRelaxed(newValue: newState);
874}
875
876void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func)
877{
878 setContinuation(func: std::move(func), continuationFutureData: nullptr);
879}
880
881void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func,
882 QFutureInterfaceBasePrivate *continuationFutureData)
883{
884 QMutexLocker lock(&d->continuationMutex);
885
886 // If the state is ready, run continuation immediately,
887 // otherwise save it for later.
888 if (isFinished()) {
889 lock.unlock();
890 func(*this);
891 lock.relock();
892 }
893 // Unless the continuation has been cleaned earlier, we have to
894 // store the move-only continuation, to guarantee that the associated
895 // future's data stays alive.
896 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
897 if (d->continuation) {
898 qWarning() << "Adding a continuation to a future which already has a continuation. "
899 "The existing continuation is overwritten.";
900 }
901 d->continuation = std::move(func);
902 d->continuationData = continuationFutureData;
903 }
904}
905
906void QFutureInterfaceBase::cleanContinuation()
907{
908 if (!d)
909 return;
910
911 QMutexLocker lock(&d->continuationMutex);
912 d->continuation = nullptr;
913 d->continuationState = QFutureInterfaceBasePrivate::Cleaned;
914 d->continuationData = nullptr;
915}
916
917void QFutureInterfaceBase::runContinuation() const
918{
919 QMutexLocker lock(&d->continuationMutex);
920 if (d->continuation) {
921 // Save the continuation in a local function, to avoid calling
922 // a null std::function below, in case cleanContinuation() is
923 // called from some other thread right after unlock() below.
924 auto fn = std::move(d->continuation);
925 lock.unlock();
926 fn(*this);
927
928 lock.relock();
929 // Unless the continuation has been cleaned earlier, we have to
930 // store the move-only continuation, to guarantee that the associated
931 // future's data stays alive.
932 if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
933 d->continuation = std::move(fn);
934 }
935}
936
937bool QFutureInterfaceBase::isChainCanceled() const
938{
939 return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled;
940}
941
942void QFutureInterfaceBase::setLaunchAsync(bool value)
943{
944 d->launchAsync = value;
945}
946
947bool QFutureInterfaceBase::launchAsync() const
948{
949 return d->launchAsync;
950}
951
952namespace QtFuture {
953
954QFuture<void> makeReadyVoidFuture()
955{
956 QFutureInterface<void> promise;
957 promise.reportStarted();
958 promise.reportFinished();
959
960 return promise.future();
961}
962
963} // namespace QtFuture
964
965QT_END_NAMESPACE
966
967#include "qfutureinterface.moc"
968

Provided by KDAB

Privacy Policy
Learn Advanced QML with KDAB
Find out more

source code of qtbase/src/corelib/thread/qfutureinterface.cpp