1 | // Copyright (C) 2020 The Qt Company Ltd. |
---|---|
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
3 | |
4 | // qfutureinterface.h included from qfuture.h |
5 | #include "qfuture.h" |
6 | #include "qfutureinterface_p.h" |
7 | |
8 | #include <QtCore/qatomic.h> |
9 | #include <QtCore/qcoreapplication.h> |
10 | #include <QtCore/qthread.h> |
11 | #include <QtCore/qvarlengtharray.h> |
12 | #include <private/qthreadpool_p.h> |
13 | #include <private/qobject_p.h> |
14 | |
15 | // GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious |
16 | // reason |
17 | // warning: ‘unsigned int __atomic_or_fetch_4(volatile void*, unsigned int, int)’ writing 4 bytes into a region of size 0 overflows the destination [-Wstringop-overflow=] |
18 | QT_WARNING_DISABLE_GCC("-Wstringop-overflow") |
19 | |
20 | QT_BEGIN_NAMESPACE |
21 | |
// Cap on progress notifications per second. The progress helpers in this file
// compare elapsed milliseconds against (1000 / MaxProgressEmitsPerSecond).
enum {
    MaxProgressEmitsPerSecond = 25
};
25 | |
26 | namespace { |
27 | class ThreadPoolThreadReleaser { |
28 | QThreadPool *m_pool; |
29 | public: |
30 | Q_NODISCARD_CTOR |
31 | explicit ThreadPoolThreadReleaser(QThreadPool *pool) |
32 | : m_pool(pool) |
33 | { if (pool) pool->releaseThread(); } |
34 | ~ThreadPoolThreadReleaser() |
35 | { if (m_pool) m_pool->reserveThread(); } |
36 | }; |
37 | |
38 | const auto suspendingOrSuspended = |
39 | QFutureInterfaceBase::Suspending | QFutureInterfaceBase::Suspended; |
40 | |
41 | } // unnamed namespace |
42 | |
43 | class QObjectContinuationWrapper : public QObject |
44 | { |
45 | Q_OBJECT |
46 | public: |
47 | explicit QObjectContinuationWrapper(QObject *parent = nullptr) |
48 | : QObject(parent) |
49 | { |
50 | } |
51 | |
52 | signals: |
53 | void run(); |
54 | }; |
55 | |
56 | void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj, |
57 | QFutureInterfaceBase &fi) |
58 | { |
59 | Q_ASSERT(context); |
60 | Q_ASSERT(slotObj); |
61 | |
62 | auto slot = SlotObjUniquePtr(slotObj); |
63 | |
64 | auto *watcher = new QObjectContinuationWrapper; |
65 | watcher->moveToThread(thread: context->thread()); |
66 | |
67 | // We need to protect acccess to the watcher. The context object (and in turn, the watcher) |
68 | // could be destroyed while the continuation that emits the signal is running. We have to |
69 | // prevent that. |
70 | // The mutex has to be recursive, because the continuation itself could delete the context |
71 | // object (and thus the watcher), which will try to lock the mutex from the same thread twice. |
72 | auto watcherMutex = std::make_shared<QRecursiveMutex>(); |
73 | const auto destroyWatcher = [watcherMutex, watcher]() mutable { |
74 | QMutexLocker lock(watcherMutex.get()); |
75 | delete watcher; |
76 | }; |
77 | |
78 | // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`... |
79 | QObject::connect(sender: watcher, signal: &QObjectContinuationWrapper::run, |
80 | // for the following, cf. QMetaObject::invokeMethodImpl(): |
81 | // we know `slot` is a lambda returning `void`, so we can just |
82 | // `call()` with `obj` and `args[0]` set to `nullptr`: |
83 | context, slot: [slot = std::move(slot)] { |
84 | void *args[] = { nullptr }; // for `void` return value |
85 | slot->call(r: nullptr, a: args); |
86 | }); |
87 | QObject::connect(sender: watcher, signal: &QObjectContinuationWrapper::run, context: watcher, slot: destroyWatcher); |
88 | |
89 | // We need to connect to destroyWatcher here, instead of delete or deleteLater(). |
90 | // If the continuation is called from a separate thread, emit watcher->run() can't detect that |
91 | // the watcher has been deleted in the separate thread, causing a race condition and potential |
92 | // heap-use-after-free issue inside QObject::doActivate. destroyWatcher forces the deletion of |
93 | // the watcher to occur after emit watcher->run() completes and prevents the race condition. |
94 | QObject::connect(sender: context, signal: &QObject::destroyed, context: watcher, slot: destroyWatcher); |
95 | |
96 | fi.setContinuation([watcherMutex, watcher = QPointer(watcher)] |
97 | (const QFutureInterfaceBase &parentData) |
98 | { |
99 | Q_UNUSED(parentData); |
100 | QMutexLocker lock(watcherMutex.get()); |
101 | if (watcher) |
102 | emit watcher->run(); |
103 | }); |
104 | } |
105 | |
106 | QFutureCallOutInterface::~QFutureCallOutInterface() |
107 | = default; |
108 | |
109 | Q_IMPL_EVENT_COMMON(QFutureCallOutEvent) |
110 | |
111 | QFutureInterfaceBase::QFutureInterfaceBase(State initialState) |
112 | : d(new QFutureInterfaceBasePrivate(initialState)) |
113 | { } |
114 | |
115 | QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other) |
116 | : d(other.d) |
117 | { |
118 | d->refCount.ref(); |
119 | } |
120 | |
121 | QFutureInterfaceBase::~QFutureInterfaceBase() |
122 | { |
123 | if (d && !d->refCount.deref()) |
124 | delete d; |
125 | } |
126 | |
127 | static inline int switch_on(QAtomicInt &a, int which) |
128 | { |
129 | return a.fetchAndOrRelaxed(valueToAdd: which) | which; |
130 | } |
131 | |
132 | static inline int switch_off(QAtomicInt &a, int which) |
133 | { |
134 | return a.fetchAndAndRelaxed(valueToAdd: ~which) & ~which; |
135 | } |
136 | |
137 | static inline int switch_from_to(QAtomicInt &a, int from, int to) |
138 | { |
139 | const auto adjusted = [&](int old) { return (old & ~from) | to; }; |
140 | int value = a.loadRelaxed(); |
141 | while (!a.testAndSetRelaxed(expectedValue: value, newValue: adjusted(value), currentValue&: value)) |
142 | qYieldCpu(); |
143 | return value; |
144 | } |
145 | |
146 | void QFutureInterfaceBase::cancel() |
147 | { |
148 | cancel(mode: CancelMode::CancelOnly); |
149 | } |
150 | |
151 | void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode) |
152 | { |
153 | QMutexLocker locker(&d->m_mutex); |
154 | |
155 | const auto oldState = d->state.loadRelaxed(); |
156 | |
157 | switch (mode) { |
158 | case CancelMode::CancelAndFinish: |
159 | if ((oldState & Finished) && (oldState & Canceled)) |
160 | return; |
161 | switch_from_to(a&: d->state, from: suspendingOrSuspended | Running, to: Canceled | Finished); |
162 | break; |
163 | case CancelMode::CancelOnly: |
164 | if (oldState & Canceled) |
165 | return; |
166 | switch_from_to(a&: d->state, from: suspendingOrSuspended, to: Canceled); |
167 | break; |
168 | } |
169 | |
170 | // Cancel the continuations chain |
171 | QFutureInterfaceBasePrivate *next = d->continuationData; |
172 | while (next) { |
173 | next->continuationState = QFutureInterfaceBasePrivate::Canceled; |
174 | next = next->continuationData; |
175 | } |
176 | |
177 | d->waitCondition.wakeAll(); |
178 | d->pausedWaitCondition.wakeAll(); |
179 | |
180 | if (!(oldState & Canceled)) |
181 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); |
182 | if (mode == CancelMode::CancelAndFinish && !(oldState & Finished)) |
183 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished)); |
184 | |
185 | d->isValid = false; |
186 | } |
187 | |
188 | void QFutureInterfaceBase::setSuspended(bool suspend) |
189 | { |
190 | QMutexLocker locker(&d->m_mutex); |
191 | if (suspend) { |
192 | switch_on(a&: d->state, which: Suspending); |
193 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending)); |
194 | } else { |
195 | switch_off(a&: d->state, which: suspendingOrSuspended); |
196 | d->pausedWaitCondition.wakeAll(); |
197 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed)); |
198 | } |
199 | } |
200 | |
201 | void QFutureInterfaceBase::toggleSuspended() |
202 | { |
203 | QMutexLocker locker(&d->m_mutex); |
204 | if (d->state.loadRelaxed() & suspendingOrSuspended) { |
205 | switch_off(a&: d->state, which: suspendingOrSuspended); |
206 | d->pausedWaitCondition.wakeAll(); |
207 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed)); |
208 | } else { |
209 | switch_on(a&: d->state, which: Suspending); |
210 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending)); |
211 | } |
212 | } |
213 | |
214 | void QFutureInterfaceBase::reportSuspended() const |
215 | { |
216 | // Needs to be called when pause is in effect, |
217 | // i.e. no more events will be reported. |
218 | |
219 | QMutexLocker locker(&d->m_mutex); |
220 | const int state = d->state.loadRelaxed(); |
221 | if (!(state & Suspending) || (state & Suspended)) |
222 | return; |
223 | |
224 | switch_from_to(a&: d->state, from: Suspending, to: Suspended); |
225 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended)); |
226 | } |
227 | |
228 | void QFutureInterfaceBase::setThrottled(bool enable) |
229 | { |
230 | QMutexLocker lock(&d->m_mutex); |
231 | if (enable) { |
232 | switch_on(a&: d->state, which: Throttled); |
233 | } else { |
234 | switch_off(a&: d->state, which: Throttled); |
235 | if (!(d->state.loadRelaxed() & suspendingOrSuspended)) |
236 | d->pausedWaitCondition.wakeAll(); |
237 | } |
238 | } |
239 | |
240 | |
241 | bool QFutureInterfaceBase::isRunning() const |
242 | { |
243 | return queryState(state: Running); |
244 | } |
245 | |
246 | bool QFutureInterfaceBase::isStarted() const |
247 | { |
248 | return queryState(state: Started); |
249 | } |
250 | |
251 | bool QFutureInterfaceBase::isCanceled() const |
252 | { |
253 | return queryState(state: Canceled); |
254 | } |
255 | |
256 | bool QFutureInterfaceBase::isFinished() const |
257 | { |
258 | return queryState(state: Finished); |
259 | } |
260 | |
261 | bool QFutureInterfaceBase::isSuspending() const |
262 | { |
263 | return queryState(state: Suspending); |
264 | } |
265 | |
266 | #if QT_DEPRECATED_SINCE(6, 0) |
267 | bool QFutureInterfaceBase::isPaused() const |
268 | { |
269 | return queryState(state: static_cast<State>(suspendingOrSuspended)); |
270 | } |
271 | #endif |
272 | |
273 | bool QFutureInterfaceBase::isSuspended() const |
274 | { |
275 | return queryState(state: Suspended); |
276 | } |
277 | |
278 | bool QFutureInterfaceBase::isThrottled() const |
279 | { |
280 | return queryState(state: Throttled); |
281 | } |
282 | |
283 | bool QFutureInterfaceBase::isResultReadyAt(int index) const |
284 | { |
285 | QMutexLocker lock(&d->m_mutex); |
286 | return d->internal_isResultReadyAt(index); |
287 | } |
288 | |
289 | bool QFutureInterfaceBase::isValid() const |
290 | { |
291 | const QMutexLocker lock(&d->m_mutex); |
292 | return d->isValid; |
293 | } |
294 | |
295 | bool QFutureInterfaceBase::isRunningOrPending() const |
296 | { |
297 | return queryState(state: static_cast<State>(Running | Pending)); |
298 | } |
299 | |
300 | bool QFutureInterfaceBase::waitForNextResult() |
301 | { |
302 | QMutexLocker lock(&d->m_mutex); |
303 | return d->internal_waitForNextResult(); |
304 | } |
305 | |
306 | void QFutureInterfaceBase::waitForResume() |
307 | { |
308 | // return early if possible to avoid taking the mutex lock. |
309 | { |
310 | const int state = d->state.loadRelaxed(); |
311 | if (!(state & suspendingOrSuspended) || (state & Canceled)) |
312 | return; |
313 | } |
314 | |
315 | QMutexLocker lock(&d->m_mutex); |
316 | const int state = d->state.loadRelaxed(); |
317 | if (!(state & suspendingOrSuspended) || (state & Canceled)) |
318 | return; |
319 | |
320 | // decrease active thread count since this thread will wait. |
321 | const ThreadPoolThreadReleaser releaser(d->pool()); |
322 | |
323 | d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex); |
324 | } |
325 | |
326 | void QFutureInterfaceBase::suspendIfRequested() |
327 | { |
328 | const auto canSuspend = [] (int state) { |
329 | // can suspend only if 1) in any suspend-related state; 2) not canceled |
330 | return (state & suspendingOrSuspended) && !(state & Canceled); |
331 | }; |
332 | |
333 | // return early if possible to avoid taking the mutex lock. |
334 | { |
335 | const int state = d->state.loadRelaxed(); |
336 | if (!canSuspend(state)) |
337 | return; |
338 | } |
339 | |
340 | QMutexLocker lock(&d->m_mutex); |
341 | const int state = d->state.loadRelaxed(); |
342 | if (!canSuspend(state)) |
343 | return; |
344 | |
345 | // Note: expecting that Suspending and Suspended are mutually exclusive |
346 | if (!(state & Suspended)) { |
347 | // switch state in case this is the first invocation |
348 | switch_from_to(a&: d->state, from: Suspending, to: Suspended); |
349 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended)); |
350 | } |
351 | |
352 | // decrease active thread count since this thread will wait. |
353 | const ThreadPoolThreadReleaser releaser(d->pool()); |
354 | d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex); |
355 | } |
356 | |
357 | int QFutureInterfaceBase::progressValue() const |
358 | { |
359 | const QMutexLocker lock(&d->m_mutex); |
360 | return d->m_progressValue; |
361 | } |
362 | |
363 | int QFutureInterfaceBase::progressMinimum() const |
364 | { |
365 | const QMutexLocker lock(&d->m_mutex); |
366 | return d->m_progress ? d->m_progress->minimum : 0; |
367 | } |
368 | |
369 | int QFutureInterfaceBase::progressMaximum() const |
370 | { |
371 | const QMutexLocker lock(&d->m_mutex); |
372 | return d->m_progress ? d->m_progress->maximum : 0; |
373 | } |
374 | |
375 | int QFutureInterfaceBase::resultCount() const |
376 | { |
377 | QMutexLocker lock(&d->m_mutex); |
378 | return d->internal_resultCount(); |
379 | } |
380 | |
381 | QString QFutureInterfaceBase::progressText() const |
382 | { |
383 | QMutexLocker locker(&d->m_mutex); |
384 | return d->m_progress ? d->m_progress->text : QString(); |
385 | } |
386 | |
387 | bool QFutureInterfaceBase::isProgressUpdateNeeded() const |
388 | { |
389 | QMutexLocker locker(&d->m_mutex); |
390 | return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond)); |
391 | } |
392 | |
393 | void QFutureInterfaceBase::reportStarted() |
394 | { |
395 | QMutexLocker locker(&d->m_mutex); |
396 | if (d->state.loadRelaxed() & (Started|Canceled|Finished)) |
397 | return; |
398 | d->setState(State(Started | Running)); |
399 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Started)); |
400 | d->isValid = true; |
401 | } |
402 | |
403 | void QFutureInterfaceBase::reportCanceled() |
404 | { |
405 | cancel(); |
406 | } |
407 | |
408 | #ifndef QT_NO_EXCEPTIONS |
409 | void QFutureInterfaceBase::reportException(const QException &exception) |
410 | { |
411 | try { |
412 | exception.raise(); |
413 | } catch (...) { |
414 | reportException(e: std::current_exception()); |
415 | } |
416 | } |
417 | |
418 | #if QT_VERSION < QT_VERSION_CHECK(7, 0, 0) |
419 | void QFutureInterfaceBase::reportException(std::exception_ptr exception) |
420 | #else |
421 | void QFutureInterfaceBase::reportException(const std::exception_ptr &exception) |
422 | #endif |
423 | { |
424 | QMutexLocker locker(&d->m_mutex); |
425 | if (d->state.loadRelaxed() & (Canceled|Finished)) |
426 | return; |
427 | |
428 | d->hasException = true; |
429 | d->data.setException(exception); |
430 | switch_on(a&: d->state, which: Canceled); |
431 | d->waitCondition.wakeAll(); |
432 | d->pausedWaitCondition.wakeAll(); |
433 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); |
434 | } |
435 | #endif |
436 | |
437 | void QFutureInterfaceBase::reportFinished() |
438 | { |
439 | QMutexLocker locker(&d->m_mutex); |
440 | if (!isFinished()) { |
441 | switch_from_to(a&: d->state, from: Running, to: Finished); |
442 | d->waitCondition.wakeAll(); |
443 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished)); |
444 | } |
445 | } |
446 | |
447 | void QFutureInterfaceBase::setExpectedResultCount(int resultCount) |
448 | { |
449 | if (d->m_progress) |
450 | setProgressRange(minimum: 0, maximum: resultCount); |
451 | d->m_expectedResultCount = resultCount; |
452 | } |
453 | |
454 | int QFutureInterfaceBase::expectedResultCount() |
455 | { |
456 | return d->m_expectedResultCount; |
457 | } |
458 | |
459 | bool QFutureInterfaceBase::queryState(State state) const |
460 | { |
461 | return d->state.loadRelaxed() & state; |
462 | } |
463 | |
464 | int QFutureInterfaceBase::loadState() const |
465 | { |
466 | // Used from ~QPromise, so this check is needed |
467 | if (!d) |
468 | return QFutureInterfaceBase::State::NoState; |
469 | return d->state.loadRelaxed(); |
470 | } |
471 | |
472 | void QFutureInterfaceBase::waitForResult(int resultIndex) |
473 | { |
474 | if (d->hasException) |
475 | d->data.m_exceptionStore.rethrowException(); |
476 | |
477 | QMutexLocker lock(&d->m_mutex); |
478 | if (!isRunningOrPending()) |
479 | return; |
480 | lock.unlock(); |
481 | |
482 | // To avoid deadlocks and reduce the number of threads used, try to |
483 | // run the runnable in the current thread. |
484 | d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable); |
485 | |
486 | lock.relock(); |
487 | |
488 | const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex; |
489 | while (isRunningOrPending() && !d->internal_isResultReadyAt(index: waitIndex)) |
490 | d->waitCondition.wait(lockedMutex: &d->m_mutex); |
491 | |
492 | if (d->hasException) |
493 | d->data.m_exceptionStore.rethrowException(); |
494 | } |
495 | |
496 | void QFutureInterfaceBase::waitForFinished() |
497 | { |
498 | QMutexLocker lock(&d->m_mutex); |
499 | const bool alreadyFinished = isFinished(); |
500 | lock.unlock(); |
501 | |
502 | if (!alreadyFinished) { |
503 | d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable); |
504 | |
505 | lock.relock(); |
506 | |
507 | while (!isFinished()) |
508 | d->waitCondition.wait(lockedMutex: &d->m_mutex); |
509 | } |
510 | |
511 | if (d->hasException) |
512 | d->data.m_exceptionStore.rethrowException(); |
513 | } |
514 | |
515 | void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex) |
516 | { |
517 | if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished))) |
518 | return; |
519 | |
520 | d->waitCondition.wakeAll(); |
521 | |
522 | if (!d->m_progress) { |
523 | if (d->internal_updateProgressValue(progress: d->m_progressValue + endIndex - beginIndex) == false) { |
524 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, |
525 | beginIndex, |
526 | endIndex)); |
527 | return; |
528 | } |
529 | |
530 | d->sendCallOuts(callOut1: QFutureCallOutEvent(QFutureCallOutEvent::Progress, |
531 | d->m_progressValue, |
532 | QString()), |
533 | callOut2: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, |
534 | beginIndex, |
535 | endIndex)); |
536 | return; |
537 | } |
538 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex)); |
539 | } |
540 | |
541 | void QFutureInterfaceBase::setRunnable(QRunnable *runnable) |
542 | { |
543 | d->runnable = runnable; |
544 | } |
545 | |
546 | void QFutureInterfaceBase::setThreadPool(QThreadPool *pool) |
547 | { |
548 | d->m_pool = pool; |
549 | } |
550 | |
551 | QThreadPool *QFutureInterfaceBase::threadPool() const |
552 | { |
553 | return d->m_pool; |
554 | } |
555 | |
556 | void QFutureInterfaceBase::setFilterMode(bool enable) |
557 | { |
558 | QMutexLocker locker(&d->m_mutex); |
559 | if (!hasException()) |
560 | resultStoreBase().setFilterMode(enable); |
561 | } |
562 | |
563 | /*! |
564 | \internal |
565 | Sets the progress range's minimum and maximum values to \a minimum and |
566 | \a maximum respectively. |
567 | |
568 | If \a maximum is smaller than \a minimum, \a minimum becomes the only |
569 | legal value. |
570 | |
571 | The progress value is reset to be \a minimum. |
572 | |
573 | The progress range usage can be disabled by using setProgressRange(0, 0). |
574 | In this case progress value is also reset to 0. |
575 | |
576 | The behavior of this method is mostly inspired by |
577 | \l QProgressBar::setRange. |
578 | */ |
579 | void QFutureInterfaceBase::setProgressRange(int minimum, int maximum) |
580 | { |
581 | QMutexLocker locker(&d->m_mutex); |
582 | if (!d->m_progress) |
583 | d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData()); |
584 | d->m_progress->minimum = minimum; |
585 | d->m_progress->maximum = qMax(a: minimum, b: maximum); |
586 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum)); |
587 | d->m_progressValue = minimum; |
588 | } |
589 | |
590 | void QFutureInterfaceBase::setProgressValue(int progressValue) |
591 | { |
592 | setProgressValueAndText(progressValue, progressText: QString()); |
593 | } |
594 | |
595 | /*! |
596 | \internal |
597 | In case of the \a progressValue falling out of the progress range, |
598 | this method has no effect. |
599 | Such behavior is inspired by \l QProgressBar::setValue. |
600 | */ |
601 | void QFutureInterfaceBase::setProgressValueAndText(int progressValue, |
602 | const QString &progressText) |
603 | { |
604 | QMutexLocker locker(&d->m_mutex); |
605 | if (!d->m_progress) |
606 | d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData()); |
607 | |
608 | const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0); |
609 | if (useProgressRange |
610 | && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) { |
611 | return; |
612 | } |
613 | |
614 | if (d->m_progressValue >= progressValue) |
615 | return; |
616 | |
617 | if (d->state.loadRelaxed() & (Canceled|Finished)) |
618 | return; |
619 | |
620 | if (d->internal_updateProgress(progress: progressValue, progressText)) { |
621 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Progress, |
622 | d->m_progressValue, |
623 | d->m_progress->text)); |
624 | } |
625 | } |
626 | |
627 | QMutex &QFutureInterfaceBase::mutex() const |
628 | { |
629 | return d->m_mutex; |
630 | } |
631 | |
632 | bool QFutureInterfaceBase::hasException() const |
633 | { |
634 | return d->hasException; |
635 | } |
636 | |
637 | QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore() |
638 | { |
639 | Q_ASSERT(d->hasException); |
640 | return d->data.m_exceptionStore; |
641 | } |
642 | |
643 | QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() |
644 | { |
645 | Q_ASSERT(!d->hasException); |
646 | return d->data.m_results; |
647 | } |
648 | |
649 | const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const |
650 | { |
651 | Q_ASSERT(!d->hasException); |
652 | return d->data.m_results; |
653 | } |
654 | |
655 | QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other) |
656 | { |
657 | QFutureInterfaceBase copy(other); |
658 | swap(other&: copy); |
659 | return *this; |
660 | } |
661 | |
662 | // ### Qt 7: inline |
663 | void QFutureInterfaceBase::swap(QFutureInterfaceBase &other) noexcept |
664 | { |
665 | qSwap(value1&: d, value2&: other.d); |
666 | } |
667 | |
668 | bool QFutureInterfaceBase::refT() const noexcept |
669 | { |
670 | return d->refCount.refT(); |
671 | } |
672 | |
673 | bool QFutureInterfaceBase::derefT() const noexcept |
674 | { |
675 | // Called from ~QFutureInterface |
676 | return !d || d->refCount.derefT(); |
677 | } |
678 | |
679 | void QFutureInterfaceBase::reset() |
680 | { |
681 | d->m_progressValue = 0; |
682 | d->m_progress.reset(); |
683 | d->progressTime.invalidate(); |
684 | d->isValid = false; |
685 | } |
686 | |
687 | void QFutureInterfaceBase::rethrowPossibleException() |
688 | { |
689 | if (hasException()) |
690 | exceptionStore().rethrowException(); |
691 | } |
692 | |
693 | QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState) |
694 | : state(initialState) |
695 | { |
696 | progressTime.invalidate(); |
697 | } |
698 | |
699 | QFutureInterfaceBasePrivate::~QFutureInterfaceBasePrivate() |
700 | { |
701 | if (hasException) |
702 | data.m_exceptionStore.~ExceptionStore(); |
703 | else |
704 | data.m_results.~ResultStoreBase(); |
705 | } |
706 | |
707 | int QFutureInterfaceBasePrivate::internal_resultCount() const |
708 | { |
709 | return hasException ? 0 : data.m_results.count(); // ### subtract canceled results. |
710 | } |
711 | |
712 | bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const |
713 | { |
714 | return hasException ? false : (data.m_results.contains(index)); |
715 | } |
716 | |
717 | bool QFutureInterfaceBasePrivate::internal_waitForNextResult() |
718 | { |
719 | if (hasException) |
720 | return false; |
721 | |
722 | if (data.m_results.hasNextResult()) |
723 | return true; |
724 | |
725 | while ((state.loadRelaxed() & QFutureInterfaceBase::Running) |
726 | && data.m_results.hasNextResult() == false) |
727 | waitCondition.wait(lockedMutex: &m_mutex); |
728 | |
729 | return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled) |
730 | && data.m_results.hasNextResult(); |
731 | } |
732 | |
733 | bool QFutureInterfaceBasePrivate::internal_updateProgressValue(int progress) |
734 | { |
735 | if (m_progressValue >= progress) |
736 | return false; |
737 | |
738 | m_progressValue = progress; |
739 | |
740 | if (progressTime.isValid() && m_progressValue != 0) // make sure the first and last steps are emitted. |
741 | if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond)) |
742 | return false; |
743 | |
744 | progressTime.start(); |
745 | return true; |
746 | |
747 | } |
748 | |
749 | bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress, |
750 | const QString &progressText) |
751 | { |
752 | if (m_progressValue >= progress) |
753 | return false; |
754 | |
755 | Q_ASSERT(m_progress); |
756 | |
757 | m_progressValue = progress; |
758 | m_progress->text = progressText; |
759 | |
760 | if (progressTime.isValid() && m_progressValue != m_progress->maximum) // make sure the first and last steps are emitted. |
761 | if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond)) |
762 | return false; |
763 | |
764 | progressTime.start(); |
765 | return true; |
766 | } |
767 | |
768 | void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable) |
769 | { |
770 | // bail out if we are not changing the state |
771 | if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled)) |
772 | || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled))) |
773 | return; |
774 | |
775 | // change the state |
776 | if (enable) { |
777 | switch_on(a&: state, which: QFutureInterfaceBase::Throttled); |
778 | } else { |
779 | switch_off(a&: state, which: QFutureInterfaceBase::Throttled); |
780 | if (!(state.loadRelaxed() & suspendingOrSuspended)) |
781 | pausedWaitCondition.wakeAll(); |
782 | } |
783 | } |
784 | |
785 | void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent) |
786 | { |
787 | if (outputConnections.isEmpty()) |
788 | return; |
789 | |
790 | for (int i = 0; i < outputConnections.size(); ++i) |
791 | outputConnections.at(i)->postCallOutEvent(callOutEvent); |
792 | } |
793 | |
794 | void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1, |
795 | const QFutureCallOutEvent &callOutEvent2) |
796 | { |
797 | if (outputConnections.isEmpty()) |
798 | return; |
799 | |
800 | for (int i = 0; i < outputConnections.size(); ++i) { |
801 | QFutureCallOutInterface *iface = outputConnections.at(i); |
802 | iface->postCallOutEvent(callOutEvent1); |
803 | iface->postCallOutEvent(callOutEvent2); |
804 | } |
805 | } |
806 | |
807 | // This function connects an output interface (for example a QFutureWatcher) |
808 | // to this future. While holding the lock we check the state and ready results |
809 | // and add the appropriate callouts to the queue. |
810 | void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *iface) |
811 | { |
812 | QMutexLocker locker(&m_mutex); |
813 | |
814 | const auto currentState = state.loadRelaxed(); |
815 | if (currentState & QFutureInterfaceBase::Started) { |
816 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started)); |
817 | if (m_progress) { |
818 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, |
819 | m_progress->minimum, |
820 | m_progress->maximum)); |
821 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress, |
822 | m_progressValue, |
823 | m_progress->text)); |
824 | } else { |
825 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, |
826 | 0, |
827 | 0)); |
828 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress, |
829 | m_progressValue, |
830 | QString())); |
831 | } |
832 | } |
833 | |
834 | if (!hasException) { |
835 | QtPrivate::ResultIteratorBase it = data.m_results.begin(); |
836 | while (it != data.m_results.end()) { |
837 | const int begin = it.resultIndex(); |
838 | const int end = begin + it.batchSize(); |
839 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, |
840 | begin, |
841 | end)); |
842 | it.batchedAdvance(); |
843 | } |
844 | } |
845 | |
846 | if (currentState & QFutureInterfaceBase::Suspended) |
847 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended)); |
848 | else if (currentState & QFutureInterfaceBase::Suspending) |
849 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending)); |
850 | |
851 | if (currentState & QFutureInterfaceBase::Canceled) |
852 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); |
853 | |
854 | if (currentState & QFutureInterfaceBase::Finished) |
855 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished)); |
856 | |
857 | outputConnections.append(t: iface); |
858 | } |
859 | |
860 | void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *iface) |
861 | { |
862 | QMutexLocker lock(&m_mutex); |
863 | const qsizetype index = outputConnections.indexOf(t: iface); |
864 | if (index == -1) |
865 | return; |
866 | outputConnections.removeAt(i: index); |
867 | |
868 | iface->callOutInterfaceDisconnected(); |
869 | } |
870 | |
871 | void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState) |
872 | { |
873 | state.storeRelaxed(newValue: newState); |
874 | } |
875 | |
876 | void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func) |
877 | { |
878 | setContinuation(func: std::move(func), continuationFutureData: nullptr); |
879 | } |
880 | |
881 | void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func, |
882 | QFutureInterfaceBasePrivate *continuationFutureData) |
883 | { |
884 | QMutexLocker lock(&d->continuationMutex); |
885 | |
886 | // If the state is ready, run continuation immediately, |
887 | // otherwise save it for later. |
888 | if (isFinished()) { |
889 | lock.unlock(); |
890 | func(*this); |
891 | lock.relock(); |
892 | } |
893 | // Unless the continuation has been cleaned earlier, we have to |
894 | // store the move-only continuation, to guarantee that the associated |
895 | // future's data stays alive. |
896 | if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) { |
897 | if (d->continuation) { |
898 | qWarning() << "Adding a continuation to a future which already has a continuation. " |
899 | "The existing continuation is overwritten."; |
900 | } |
901 | d->continuation = std::move(func); |
902 | d->continuationData = continuationFutureData; |
903 | } |
904 | } |
905 | |
906 | void QFutureInterfaceBase::cleanContinuation() |
907 | { |
908 | if (!d) |
909 | return; |
910 | |
911 | QMutexLocker lock(&d->continuationMutex); |
912 | d->continuation = nullptr; |
913 | d->continuationState = QFutureInterfaceBasePrivate::Cleaned; |
914 | d->continuationData = nullptr; |
915 | } |
916 | |
917 | void QFutureInterfaceBase::runContinuation() const |
918 | { |
919 | QMutexLocker lock(&d->continuationMutex); |
920 | if (d->continuation) { |
921 | // Save the continuation in a local function, to avoid calling |
922 | // a null std::function below, in case cleanContinuation() is |
923 | // called from some other thread right after unlock() below. |
924 | auto fn = std::move(d->continuation); |
925 | lock.unlock(); |
926 | fn(*this); |
927 | |
928 | lock.relock(); |
929 | // Unless the continuation has been cleaned earlier, we have to |
930 | // store the move-only continuation, to guarantee that the associated |
931 | // future's data stays alive. |
932 | if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) |
933 | d->continuation = std::move(fn); |
934 | } |
935 | } |
936 | |
937 | bool QFutureInterfaceBase::isChainCanceled() const |
938 | { |
939 | return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled; |
940 | } |
941 | |
942 | void QFutureInterfaceBase::setLaunchAsync(bool value) |
943 | { |
944 | d->launchAsync = value; |
945 | } |
946 | |
947 | bool QFutureInterfaceBase::launchAsync() const |
948 | { |
949 | return d->launchAsync; |
950 | } |
951 | |
952 | namespace QtFuture { |
953 | |
954 | QFuture<void> makeReadyVoidFuture() |
955 | { |
956 | QFutureInterface<void> promise; |
957 | promise.reportStarted(); |
958 | promise.reportFinished(); |
959 | |
960 | return promise.future(); |
961 | } |
962 | |
963 | } // namespace QtFuture |
964 | |
965 | QT_END_NAMESPACE |
966 | |
967 | #include "qfutureinterface.moc" |
968 |
Definitions
- ThreadPoolThreadReleaser
- ThreadPoolThreadReleaser
- ~ThreadPoolThreadReleaser
- suspendingOrSuspended
- QObjectContinuationWrapper
- QObjectContinuationWrapper
- watchContinuationImpl
- ~QFutureCallOutInterface
- QFutureCallOutEvent
- QFutureInterfaceBase
- QFutureInterfaceBase
- ~QFutureInterfaceBase
- switch_on
- switch_off
- switch_from_to
- cancel
- cancel
- setSuspended
- toggleSuspended
- reportSuspended
- setThrottled
- isRunning
- isStarted
- isCanceled
- isFinished
- isSuspending
- isPaused
- isSuspended
- isThrottled
- isResultReadyAt
- isValid
- isRunningOrPending
- waitForNextResult
- waitForResume
- suspendIfRequested
- progressValue
- progressMinimum
- progressMaximum
- resultCount
- progressText
- isProgressUpdateNeeded
- reportStarted
- reportCanceled
- reportException
- reportException
- reportFinished
- setExpectedResultCount
- expectedResultCount
- queryState
- loadState
- waitForResult
- waitForFinished
- reportResultsReady
- setRunnable
- setThreadPool
- threadPool
- setFilterMode
- setProgressRange
- setProgressValue
- setProgressValueAndText
- mutex
- hasException
- exceptionStore
- resultStoreBase
- resultStoreBase
- operator=
- swap
- refT
- derefT
- reset
- rethrowPossibleException
- QFutureInterfaceBasePrivate
- ~QFutureInterfaceBasePrivate
- internal_resultCount
- internal_isResultReadyAt
- internal_waitForNextResult
- internal_updateProgressValue
- internal_updateProgress
- internal_setThrottled
- sendCallOut
- sendCallOuts
- connectOutputInterface
- disconnectOutputInterface
- setState
- setContinuation
- setContinuation
- cleanContinuation
- runContinuation
- isChainCanceled
- setLaunchAsync
- launchAsync
Learn Advanced QML with KDAB
Find out more