1 | // Copyright (C) 2020 The Qt Company Ltd. |
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
3 | |
4 | // qfutureinterface.h included from qfuture.h |
5 | #include "qfuture.h" |
6 | #include "qfutureinterface_p.h" |
7 | |
8 | #include <QtCore/qatomic.h> |
9 | #include <QtCore/qcoreapplication.h> |
10 | #include <QtCore/qthread.h> |
11 | #include <QtCore/qvarlengtharray.h> |
12 | #include <private/qthreadpool_p.h> |
13 | #include <private/qobject_p.h> |
14 | |
15 | #include <climits> // For INT_MAX |
16 | |
17 | // GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious |
18 | // reason |
19 | // warning: ‘unsigned int __atomic_or_fetch_4(volatile void*, unsigned int, int)’ writing 4 bytes into a region of size 0 overflows the destination [-Wstringop-overflow=] |
20 | QT_WARNING_DISABLE_GCC("-Wstringop-overflow" ) |
21 | |
22 | QT_BEGIN_NAMESPACE |
23 | |
24 | enum { |
25 | MaxProgressEmitsPerSecond = 25 |
26 | }; |
27 | |
28 | namespace { |
29 | class ThreadPoolThreadReleaser { |
30 | QThreadPool *m_pool; |
31 | public: |
32 | Q_NODISCARD_CTOR |
33 | explicit ThreadPoolThreadReleaser(QThreadPool *pool) |
34 | : m_pool(pool) |
35 | { if (pool) pool->releaseThread(); } |
36 | ~ThreadPoolThreadReleaser() |
37 | { if (m_pool) m_pool->reserveThread(); } |
38 | }; |
39 | |
40 | const auto suspendingOrSuspended = |
41 | QFutureInterfaceBase::Suspending | QFutureInterfaceBase::Suspended; |
42 | |
43 | } // unnamed namespace |
44 | |
45 | class QObjectContinuationWrapper : public QObject |
46 | { |
47 | Q_OBJECT |
48 | public: |
49 | explicit QObjectContinuationWrapper(QObject *parent = nullptr) |
50 | : QObject(parent) |
51 | { |
52 | } |
53 | |
54 | signals: |
55 | void run(); |
56 | }; |
57 | |
58 | void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj, |
59 | QFutureInterfaceBase &fi) |
60 | { |
61 | Q_ASSERT(context); |
62 | Q_ASSERT(slotObj); |
63 | |
64 | auto slot = SlotObjUniquePtr(slotObj); |
65 | |
66 | auto *watcher = new QObjectContinuationWrapper; |
67 | watcher->moveToThread(thread: context->thread()); |
68 | |
69 | // We need to protect acccess to the watcher. The context object (and in turn, the watcher) |
70 | // could be destroyed while the continuation that emits the signal is running. We have to |
71 | // prevent that. |
72 | // The mutex has to be recursive, because the continuation itself could delete the context |
73 | // object (and thus the watcher), which will try to lock the mutex from the same thread twice. |
74 | auto watcherMutex = std::make_shared<QRecursiveMutex>(); |
75 | const auto destroyWatcher = [watcherMutex, watcher]() mutable { |
76 | QMutexLocker lock(watcherMutex.get()); |
77 | delete watcher; |
78 | }; |
79 | |
80 | // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`... |
81 | QObject::connect(sender: watcher, signal: &QObjectContinuationWrapper::run, |
82 | // for the following, cf. QMetaObject::invokeMethodImpl(): |
83 | // we know `slot` is a lambda returning `void`, so we can just |
84 | // `call()` with `obj` and `args[0]` set to `nullptr`: |
85 | context, slot: [slot = std::move(slot)] { |
86 | void *args[] = { nullptr }; // for `void` return value |
87 | slot->call(r: nullptr, a: args); |
88 | }); |
89 | QObject::connect(sender: watcher, signal: &QObjectContinuationWrapper::run, context: watcher, slot: destroyWatcher); |
90 | |
91 | // We need to connect to destroyWatcher here, instead of delete or deleteLater(). |
92 | // If the continuation is called from a separate thread, emit watcher->run() can't detect that |
93 | // the watcher has been deleted in the separate thread, causing a race condition and potential |
94 | // heap-use-after-free issue inside QObject::doActivate. destroyWatcher forces the deletion of |
95 | // the watcher to occur after emit watcher->run() completes and prevents the race condition. |
96 | QObject::connect(sender: context, signal: &QObject::destroyed, context: watcher, slot: destroyWatcher); |
97 | |
98 | fi.setContinuation([watcherMutex, watcher = QPointer(watcher)] |
99 | (const QFutureInterfaceBase &parentData) |
100 | { |
101 | Q_UNUSED(parentData); |
102 | QMutexLocker lock(watcherMutex.get()); |
103 | if (watcher) |
104 | emit watcher->run(); |
105 | }); |
106 | } |
107 | |
108 | QFutureCallOutInterface::~QFutureCallOutInterface() |
109 | = default; |
110 | |
111 | Q_IMPL_EVENT_COMMON(QFutureCallOutEvent) |
112 | |
113 | QFutureInterfaceBase::QFutureInterfaceBase(State initialState) |
114 | : d(new QFutureInterfaceBasePrivate(initialState)) |
115 | { } |
116 | |
117 | QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other) |
118 | : d(other.d) |
119 | { |
120 | d->refCount.ref(); |
121 | } |
122 | |
123 | QFutureInterfaceBase::~QFutureInterfaceBase() |
124 | { |
125 | if (d && !d->refCount.deref()) |
126 | delete d; |
127 | } |
128 | |
129 | static inline int switch_on(QAtomicInt &a, int which) |
130 | { |
131 | return a.fetchAndOrRelaxed(valueToAdd: which) | which; |
132 | } |
133 | |
134 | static inline int switch_off(QAtomicInt &a, int which) |
135 | { |
136 | return a.fetchAndAndRelaxed(valueToAdd: ~which) & ~which; |
137 | } |
138 | |
139 | static inline int switch_from_to(QAtomicInt &a, int from, int to) |
140 | { |
141 | const auto adjusted = [&](int old) { return (old & ~from) | to; }; |
142 | int value = a.loadRelaxed(); |
143 | while (!a.testAndSetRelaxed(expectedValue: value, newValue: adjusted(value), currentValue&: value)) |
144 | qYieldCpu(); |
145 | return value; |
146 | } |
147 | |
148 | void QFutureInterfaceBase::cancel() |
149 | { |
150 | cancel(mode: CancelMode::CancelOnly); |
151 | } |
152 | |
153 | void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode) |
154 | { |
155 | QMutexLocker locker(&d->m_mutex); |
156 | |
157 | const auto oldState = d->state.loadRelaxed(); |
158 | |
159 | switch (mode) { |
160 | case CancelMode::CancelAndFinish: |
161 | if ((oldState & Finished) && (oldState & Canceled)) |
162 | return; |
163 | switch_from_to(a&: d->state, from: suspendingOrSuspended | Running, to: Canceled | Finished); |
164 | break; |
165 | case CancelMode::CancelOnly: |
166 | if (oldState & Canceled) |
167 | return; |
168 | switch_from_to(a&: d->state, from: suspendingOrSuspended, to: Canceled); |
169 | break; |
170 | } |
171 | |
172 | // Cancel the continuations chain |
173 | QFutureInterfaceBasePrivate *next = d->continuationData; |
174 | while (next) { |
175 | next->continuationState = QFutureInterfaceBasePrivate::Canceled; |
176 | next = next->continuationData; |
177 | } |
178 | |
179 | d->waitCondition.wakeAll(); |
180 | d->pausedWaitCondition.wakeAll(); |
181 | |
182 | if (!(oldState & Canceled)) |
183 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); |
184 | if (mode == CancelMode::CancelAndFinish && !(oldState & Finished)) |
185 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished)); |
186 | |
187 | d->isValid = false; |
188 | } |
189 | |
190 | void QFutureInterfaceBase::setSuspended(bool suspend) |
191 | { |
192 | QMutexLocker locker(&d->m_mutex); |
193 | if (suspend) { |
194 | switch_on(a&: d->state, which: Suspending); |
195 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending)); |
196 | } else { |
197 | switch_off(a&: d->state, which: suspendingOrSuspended); |
198 | d->pausedWaitCondition.wakeAll(); |
199 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed)); |
200 | } |
201 | } |
202 | |
203 | void QFutureInterfaceBase::toggleSuspended() |
204 | { |
205 | QMutexLocker locker(&d->m_mutex); |
206 | if (d->state.loadRelaxed() & suspendingOrSuspended) { |
207 | switch_off(a&: d->state, which: suspendingOrSuspended); |
208 | d->pausedWaitCondition.wakeAll(); |
209 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed)); |
210 | } else { |
211 | switch_on(a&: d->state, which: Suspending); |
212 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspending)); |
213 | } |
214 | } |
215 | |
216 | void QFutureInterfaceBase::reportSuspended() const |
217 | { |
218 | // Needs to be called when pause is in effect, |
219 | // i.e. no more events will be reported. |
220 | |
221 | QMutexLocker locker(&d->m_mutex); |
222 | const int state = d->state.loadRelaxed(); |
223 | if (!(state & Suspending) || (state & Suspended)) |
224 | return; |
225 | |
226 | switch_from_to(a&: d->state, from: Suspending, to: Suspended); |
227 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended)); |
228 | } |
229 | |
230 | void QFutureInterfaceBase::setThrottled(bool enable) |
231 | { |
232 | QMutexLocker lock(&d->m_mutex); |
233 | if (enable) { |
234 | switch_on(a&: d->state, which: Throttled); |
235 | } else { |
236 | switch_off(a&: d->state, which: Throttled); |
237 | if (!(d->state.loadRelaxed() & suspendingOrSuspended)) |
238 | d->pausedWaitCondition.wakeAll(); |
239 | } |
240 | } |
241 | |
242 | |
243 | bool QFutureInterfaceBase::isRunning() const |
244 | { |
245 | return queryState(state: Running); |
246 | } |
247 | |
248 | bool QFutureInterfaceBase::isStarted() const |
249 | { |
250 | return queryState(state: Started); |
251 | } |
252 | |
253 | bool QFutureInterfaceBase::isCanceled() const |
254 | { |
255 | return queryState(state: Canceled); |
256 | } |
257 | |
258 | bool QFutureInterfaceBase::isFinished() const |
259 | { |
260 | return queryState(state: Finished); |
261 | } |
262 | |
263 | bool QFutureInterfaceBase::isSuspending() const |
264 | { |
265 | return queryState(state: Suspending); |
266 | } |
267 | |
268 | #if QT_DEPRECATED_SINCE(6, 0) |
269 | bool QFutureInterfaceBase::isPaused() const |
270 | { |
271 | return queryState(state: static_cast<State>(suspendingOrSuspended)); |
272 | } |
273 | #endif |
274 | |
275 | bool QFutureInterfaceBase::isSuspended() const |
276 | { |
277 | return queryState(state: Suspended); |
278 | } |
279 | |
280 | bool QFutureInterfaceBase::isThrottled() const |
281 | { |
282 | return queryState(state: Throttled); |
283 | } |
284 | |
285 | bool QFutureInterfaceBase::isResultReadyAt(int index) const |
286 | { |
287 | QMutexLocker lock(&d->m_mutex); |
288 | return d->internal_isResultReadyAt(index); |
289 | } |
290 | |
291 | bool QFutureInterfaceBase::isValid() const |
292 | { |
293 | const QMutexLocker lock(&d->m_mutex); |
294 | return d->isValid; |
295 | } |
296 | |
297 | bool QFutureInterfaceBase::isRunningOrPending() const |
298 | { |
299 | return queryState(state: static_cast<State>(Running | Pending)); |
300 | } |
301 | |
302 | bool QFutureInterfaceBase::waitForNextResult() |
303 | { |
304 | QMutexLocker lock(&d->m_mutex); |
305 | return d->internal_waitForNextResult(); |
306 | } |
307 | |
308 | void QFutureInterfaceBase::waitForResume() |
309 | { |
310 | // return early if possible to avoid taking the mutex lock. |
311 | { |
312 | const int state = d->state.loadRelaxed(); |
313 | if (!(state & suspendingOrSuspended) || (state & Canceled)) |
314 | return; |
315 | } |
316 | |
317 | QMutexLocker lock(&d->m_mutex); |
318 | const int state = d->state.loadRelaxed(); |
319 | if (!(state & suspendingOrSuspended) || (state & Canceled)) |
320 | return; |
321 | |
322 | // decrease active thread count since this thread will wait. |
323 | const ThreadPoolThreadReleaser releaser(d->pool()); |
324 | |
325 | d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex); |
326 | } |
327 | |
328 | void QFutureInterfaceBase::suspendIfRequested() |
329 | { |
330 | const auto canSuspend = [] (int state) { |
331 | // can suspend only if 1) in any suspend-related state; 2) not canceled |
332 | return (state & suspendingOrSuspended) && !(state & Canceled); |
333 | }; |
334 | |
335 | // return early if possible to avoid taking the mutex lock. |
336 | { |
337 | const int state = d->state.loadRelaxed(); |
338 | if (!canSuspend(state)) |
339 | return; |
340 | } |
341 | |
342 | QMutexLocker lock(&d->m_mutex); |
343 | const int state = d->state.loadRelaxed(); |
344 | if (!canSuspend(state)) |
345 | return; |
346 | |
347 | // Note: expecting that Suspending and Suspended are mutually exclusive |
348 | if (!(state & Suspended)) { |
349 | // switch state in case this is the first invocation |
350 | switch_from_to(a&: d->state, from: Suspending, to: Suspended); |
351 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Suspended)); |
352 | } |
353 | |
354 | // decrease active thread count since this thread will wait. |
355 | const ThreadPoolThreadReleaser releaser(d->pool()); |
356 | d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex); |
357 | } |
358 | |
359 | int QFutureInterfaceBase::progressValue() const |
360 | { |
361 | const QMutexLocker lock(&d->m_mutex); |
362 | return d->m_progressValue; |
363 | } |
364 | |
365 | int QFutureInterfaceBase::progressMinimum() const |
366 | { |
367 | const QMutexLocker lock(&d->m_mutex); |
368 | return d->m_progress ? d->m_progress->minimum : 0; |
369 | } |
370 | |
371 | int QFutureInterfaceBase::progressMaximum() const |
372 | { |
373 | const QMutexLocker lock(&d->m_mutex); |
374 | return d->m_progress ? d->m_progress->maximum : 0; |
375 | } |
376 | |
377 | int QFutureInterfaceBase::resultCount() const |
378 | { |
379 | QMutexLocker lock(&d->m_mutex); |
380 | return d->internal_resultCount(); |
381 | } |
382 | |
383 | QString QFutureInterfaceBase::progressText() const |
384 | { |
385 | QMutexLocker locker(&d->m_mutex); |
386 | return d->m_progress ? d->m_progress->text : QString(); |
387 | } |
388 | |
389 | bool QFutureInterfaceBase::isProgressUpdateNeeded() const |
390 | { |
391 | QMutexLocker locker(&d->m_mutex); |
392 | return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond)); |
393 | } |
394 | |
395 | void QFutureInterfaceBase::reportStarted() |
396 | { |
397 | QMutexLocker locker(&d->m_mutex); |
398 | if (d->state.loadRelaxed() & (Started|Canceled|Finished)) |
399 | return; |
400 | d->setState(State(Started | Running)); |
401 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Started)); |
402 | d->isValid = true; |
403 | } |
404 | |
405 | void QFutureInterfaceBase::reportCanceled() |
406 | { |
407 | cancel(); |
408 | } |
409 | |
410 | #ifndef QT_NO_EXCEPTIONS |
411 | void QFutureInterfaceBase::reportException(const QException &exception) |
412 | { |
413 | try { |
414 | exception.raise(); |
415 | } catch (...) { |
416 | reportException(e: std::current_exception()); |
417 | } |
418 | } |
419 | |
420 | #if QT_VERSION < QT_VERSION_CHECK(7, 0, 0) |
421 | void QFutureInterfaceBase::reportException(std::exception_ptr exception) |
422 | #else |
423 | void QFutureInterfaceBase::reportException(const std::exception_ptr &exception) |
424 | #endif |
425 | { |
426 | QMutexLocker locker(&d->m_mutex); |
427 | if (d->state.loadRelaxed() & (Canceled|Finished)) |
428 | return; |
429 | |
430 | d->hasException = true; |
431 | d->data.setException(exception); |
432 | switch_on(a&: d->state, which: Canceled); |
433 | d->waitCondition.wakeAll(); |
434 | d->pausedWaitCondition.wakeAll(); |
435 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); |
436 | } |
437 | #endif |
438 | |
439 | void QFutureInterfaceBase::reportFinished() |
440 | { |
441 | QMutexLocker locker(&d->m_mutex); |
442 | if (!isFinished()) { |
443 | switch_from_to(a&: d->state, from: Running, to: Finished); |
444 | d->waitCondition.wakeAll(); |
445 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished)); |
446 | } |
447 | } |
448 | |
449 | void QFutureInterfaceBase::setExpectedResultCount(int resultCount) |
450 | { |
451 | if (d->m_progress) |
452 | setProgressRange(minimum: 0, maximum: resultCount); |
453 | d->m_expectedResultCount = resultCount; |
454 | } |
455 | |
456 | int QFutureInterfaceBase::expectedResultCount() |
457 | { |
458 | return d->m_expectedResultCount; |
459 | } |
460 | |
461 | bool QFutureInterfaceBase::queryState(State state) const |
462 | { |
463 | return d->state.loadRelaxed() & state; |
464 | } |
465 | |
466 | int QFutureInterfaceBase::loadState() const |
467 | { |
468 | // Used from ~QPromise, so this check is needed |
469 | if (!d) |
470 | return QFutureInterfaceBase::State::NoState; |
471 | return d->state.loadRelaxed(); |
472 | } |
473 | |
474 | void QFutureInterfaceBase::waitForResult(int resultIndex) |
475 | { |
476 | if (d->hasException) |
477 | d->data.m_exceptionStore.rethrowException(); |
478 | |
479 | QMutexLocker lock(&d->m_mutex); |
480 | if (!isRunningOrPending()) |
481 | return; |
482 | lock.unlock(); |
483 | |
484 | // To avoid deadlocks and reduce the number of threads used, try to |
485 | // run the runnable in the current thread. |
486 | d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable); |
487 | |
488 | lock.relock(); |
489 | |
490 | const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex; |
491 | while (isRunningOrPending() && !d->internal_isResultReadyAt(index: waitIndex)) |
492 | d->waitCondition.wait(lockedMutex: &d->m_mutex); |
493 | |
494 | if (d->hasException) |
495 | d->data.m_exceptionStore.rethrowException(); |
496 | } |
497 | |
498 | void QFutureInterfaceBase::waitForFinished() |
499 | { |
500 | QMutexLocker lock(&d->m_mutex); |
501 | const bool alreadyFinished = isFinished(); |
502 | lock.unlock(); |
503 | |
504 | if (!alreadyFinished) { |
505 | d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable); |
506 | |
507 | lock.relock(); |
508 | |
509 | while (!isFinished()) |
510 | d->waitCondition.wait(lockedMutex: &d->m_mutex); |
511 | } |
512 | |
513 | if (d->hasException) |
514 | d->data.m_exceptionStore.rethrowException(); |
515 | } |
516 | |
517 | void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex) |
518 | { |
519 | if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished))) |
520 | return; |
521 | |
522 | d->waitCondition.wakeAll(); |
523 | |
524 | if (!d->m_progress) { |
525 | if (d->internal_updateProgressValue(progress: d->m_progressValue + endIndex - beginIndex) == false) { |
526 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, |
527 | beginIndex, |
528 | endIndex)); |
529 | return; |
530 | } |
531 | |
532 | d->sendCallOuts(callOut1: QFutureCallOutEvent(QFutureCallOutEvent::Progress, |
533 | d->m_progressValue, |
534 | QString()), |
535 | callOut2: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, |
536 | beginIndex, |
537 | endIndex)); |
538 | return; |
539 | } |
540 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex)); |
541 | } |
542 | |
543 | void QFutureInterfaceBase::setRunnable(QRunnable *runnable) |
544 | { |
545 | d->runnable = runnable; |
546 | } |
547 | |
548 | void QFutureInterfaceBase::setThreadPool(QThreadPool *pool) |
549 | { |
550 | d->m_pool = pool; |
551 | } |
552 | |
553 | QThreadPool *QFutureInterfaceBase::threadPool() const |
554 | { |
555 | return d->m_pool; |
556 | } |
557 | |
558 | void QFutureInterfaceBase::setFilterMode(bool enable) |
559 | { |
560 | QMutexLocker locker(&d->m_mutex); |
561 | if (!hasException()) |
562 | resultStoreBase().setFilterMode(enable); |
563 | } |
564 | |
565 | /*! |
566 | \internal |
567 | Sets the progress range's minimum and maximum values to \a minimum and |
568 | \a maximum respectively. |
569 | |
570 | If \a maximum is smaller than \a minimum, \a minimum becomes the only |
571 | legal value. |
572 | |
573 | The progress value is reset to be \a minimum. |
574 | |
575 | The progress range usage can be disabled by using setProgressRange(0, 0). |
576 | In this case progress value is also reset to 0. |
577 | |
578 | The behavior of this method is mostly inspired by |
579 | \l QProgressBar::setRange. |
580 | */ |
581 | void QFutureInterfaceBase::setProgressRange(int minimum, int maximum) |
582 | { |
583 | QMutexLocker locker(&d->m_mutex); |
584 | if (!d->m_progress) |
585 | d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData()); |
586 | d->m_progress->minimum = minimum; |
587 | d->m_progress->maximum = qMax(a: minimum, b: maximum); |
588 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum)); |
589 | d->m_progressValue = minimum; |
590 | } |
591 | |
592 | void QFutureInterfaceBase::setProgressValue(int progressValue) |
593 | { |
594 | setProgressValueAndText(progressValue, progressText: QString()); |
595 | } |
596 | |
597 | /*! |
598 | \internal |
599 | In case of the \a progressValue falling out of the progress range, |
600 | this method has no effect. |
601 | Such behavior is inspired by \l QProgressBar::setValue. |
602 | */ |
603 | void QFutureInterfaceBase::setProgressValueAndText(int progressValue, |
604 | const QString &progressText) |
605 | { |
606 | QMutexLocker locker(&d->m_mutex); |
607 | if (!d->m_progress) |
608 | d->m_progress.reset(other: new QFutureInterfaceBasePrivate::ProgressData()); |
609 | |
610 | const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0); |
611 | if (useProgressRange |
612 | && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) { |
613 | return; |
614 | } |
615 | |
616 | if (d->m_progressValue >= progressValue) |
617 | return; |
618 | |
619 | if (d->state.loadRelaxed() & (Canceled|Finished)) |
620 | return; |
621 | |
622 | if (d->internal_updateProgress(progress: progressValue, progressText)) { |
623 | d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Progress, |
624 | d->m_progressValue, |
625 | d->m_progress->text)); |
626 | } |
627 | } |
628 | |
629 | QMutex &QFutureInterfaceBase::mutex() const |
630 | { |
631 | return d->m_mutex; |
632 | } |
633 | |
634 | bool QFutureInterfaceBase::hasException() const |
635 | { |
636 | return d->hasException; |
637 | } |
638 | |
639 | QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore() |
640 | { |
641 | Q_ASSERT(d->hasException); |
642 | return d->data.m_exceptionStore; |
643 | } |
644 | |
645 | QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() |
646 | { |
647 | Q_ASSERT(!d->hasException); |
648 | return d->data.m_results; |
649 | } |
650 | |
651 | const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const |
652 | { |
653 | Q_ASSERT(!d->hasException); |
654 | return d->data.m_results; |
655 | } |
656 | |
657 | QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other) |
658 | { |
659 | QFutureInterfaceBase copy(other); |
660 | swap(other&: copy); |
661 | return *this; |
662 | } |
663 | |
664 | // ### Qt 7: inline |
665 | void QFutureInterfaceBase::swap(QFutureInterfaceBase &other) noexcept |
666 | { |
667 | qSwap(value1&: d, value2&: other.d); |
668 | } |
669 | |
670 | bool QFutureInterfaceBase::refT() const noexcept |
671 | { |
672 | return d->refCount.refT(); |
673 | } |
674 | |
675 | bool QFutureInterfaceBase::derefT() const noexcept |
676 | { |
677 | // Called from ~QFutureInterface |
678 | return !d || d->refCount.derefT(); |
679 | } |
680 | |
681 | void QFutureInterfaceBase::reset() |
682 | { |
683 | d->m_progressValue = 0; |
684 | d->m_progress.reset(); |
685 | d->progressTime.invalidate(); |
686 | d->isValid = false; |
687 | } |
688 | |
689 | void QFutureInterfaceBase::rethrowPossibleException() |
690 | { |
691 | if (hasException()) |
692 | exceptionStore().rethrowException(); |
693 | } |
694 | |
695 | QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState) |
696 | : state(initialState) |
697 | { |
698 | progressTime.invalidate(); |
699 | } |
700 | |
701 | QFutureInterfaceBasePrivate::~QFutureInterfaceBasePrivate() |
702 | { |
703 | if (hasException) |
704 | data.m_exceptionStore.~ExceptionStore(); |
705 | else |
706 | data.m_results.~ResultStoreBase(); |
707 | } |
708 | |
709 | int QFutureInterfaceBasePrivate::internal_resultCount() const |
710 | { |
711 | return hasException ? 0 : data.m_results.count(); // ### subtract canceled results. |
712 | } |
713 | |
714 | bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const |
715 | { |
716 | return hasException ? false : (data.m_results.contains(index)); |
717 | } |
718 | |
719 | bool QFutureInterfaceBasePrivate::internal_waitForNextResult() |
720 | { |
721 | if (hasException) |
722 | return false; |
723 | |
724 | if (data.m_results.hasNextResult()) |
725 | return true; |
726 | |
727 | while ((state.loadRelaxed() & QFutureInterfaceBase::Running) |
728 | && data.m_results.hasNextResult() == false) |
729 | waitCondition.wait(lockedMutex: &m_mutex); |
730 | |
731 | return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled) |
732 | && data.m_results.hasNextResult(); |
733 | } |
734 | |
735 | bool QFutureInterfaceBasePrivate::internal_updateProgressValue(int progress) |
736 | { |
737 | if (m_progressValue >= progress) |
738 | return false; |
739 | |
740 | m_progressValue = progress; |
741 | |
742 | if (progressTime.isValid() && m_progressValue != 0) // make sure the first and last steps are emitted. |
743 | if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond)) |
744 | return false; |
745 | |
746 | progressTime.start(); |
747 | return true; |
748 | |
749 | } |
750 | |
751 | bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress, |
752 | const QString &progressText) |
753 | { |
754 | if (m_progressValue >= progress) |
755 | return false; |
756 | |
757 | Q_ASSERT(m_progress); |
758 | |
759 | m_progressValue = progress; |
760 | m_progress->text = progressText; |
761 | |
762 | if (progressTime.isValid() && m_progressValue != m_progress->maximum) // make sure the first and last steps are emitted. |
763 | if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond)) |
764 | return false; |
765 | |
766 | progressTime.start(); |
767 | return true; |
768 | } |
769 | |
770 | void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable) |
771 | { |
772 | // bail out if we are not changing the state |
773 | if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled)) |
774 | || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled))) |
775 | return; |
776 | |
777 | // change the state |
778 | if (enable) { |
779 | switch_on(a&: state, which: QFutureInterfaceBase::Throttled); |
780 | } else { |
781 | switch_off(a&: state, which: QFutureInterfaceBase::Throttled); |
782 | if (!(state.loadRelaxed() & suspendingOrSuspended)) |
783 | pausedWaitCondition.wakeAll(); |
784 | } |
785 | } |
786 | |
787 | void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent) |
788 | { |
789 | if (outputConnections.isEmpty()) |
790 | return; |
791 | |
792 | for (int i = 0; i < outputConnections.size(); ++i) |
793 | outputConnections.at(i)->postCallOutEvent(callOutEvent); |
794 | } |
795 | |
796 | void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1, |
797 | const QFutureCallOutEvent &callOutEvent2) |
798 | { |
799 | if (outputConnections.isEmpty()) |
800 | return; |
801 | |
802 | for (int i = 0; i < outputConnections.size(); ++i) { |
803 | QFutureCallOutInterface *iface = outputConnections.at(i); |
804 | iface->postCallOutEvent(callOutEvent1); |
805 | iface->postCallOutEvent(callOutEvent2); |
806 | } |
807 | } |
808 | |
809 | // This function connects an output interface (for example a QFutureWatcher) |
810 | // to this future. While holding the lock we check the state and ready results |
811 | // and add the appropriate callouts to the queue. |
812 | void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *iface) |
813 | { |
814 | QMutexLocker locker(&m_mutex); |
815 | |
816 | const auto currentState = state.loadRelaxed(); |
817 | if (currentState & QFutureInterfaceBase::Started) { |
818 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started)); |
819 | if (m_progress) { |
820 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, |
821 | m_progress->minimum, |
822 | m_progress->maximum)); |
823 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress, |
824 | m_progressValue, |
825 | m_progress->text)); |
826 | } else { |
827 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, |
828 | 0, |
829 | 0)); |
830 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress, |
831 | m_progressValue, |
832 | QString())); |
833 | } |
834 | } |
835 | |
836 | if (!hasException) { |
837 | QtPrivate::ResultIteratorBase it = data.m_results.begin(); |
838 | while (it != data.m_results.end()) { |
839 | const int begin = it.resultIndex(); |
840 | const int end = begin + it.batchSize(); |
841 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, |
842 | begin, |
843 | end)); |
844 | it.batchedAdvance(); |
845 | } |
846 | } |
847 | |
848 | if (currentState & QFutureInterfaceBase::Suspended) |
849 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended)); |
850 | else if (currentState & QFutureInterfaceBase::Suspending) |
851 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending)); |
852 | |
853 | if (currentState & QFutureInterfaceBase::Canceled) |
854 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); |
855 | |
856 | if (currentState & QFutureInterfaceBase::Finished) |
857 | iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished)); |
858 | |
859 | outputConnections.append(t: iface); |
860 | } |
861 | |
862 | void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *iface) |
863 | { |
864 | QMutexLocker lock(&m_mutex); |
865 | const qsizetype index = outputConnections.indexOf(t: iface); |
866 | if (index == -1) |
867 | return; |
868 | outputConnections.removeAt(i: index); |
869 | |
870 | iface->callOutInterfaceDisconnected(); |
871 | } |
872 | |
873 | void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState) |
874 | { |
875 | state.storeRelaxed(newValue: newState); |
876 | } |
877 | |
878 | void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func) |
879 | { |
880 | setContinuation(func: std::move(func), continuationFutureData: nullptr); |
881 | } |
882 | |
883 | void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInterfaceBase &)> func, |
884 | QFutureInterfaceBasePrivate *continuationFutureData) |
885 | { |
886 | QMutexLocker lock(&d->continuationMutex); |
887 | |
888 | // If the state is ready, run continuation immediately, |
889 | // otherwise save it for later. |
890 | if (isFinished()) { |
891 | d->continuationExecuted = true; |
892 | lock.unlock(); |
893 | func(*this); |
894 | lock.relock(); |
895 | } |
896 | // Unless the continuation has been cleaned earlier, we have to |
897 | // store the move-only continuation, to guarantee that the associated |
898 | // future's data stays alive. |
899 | if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) { |
900 | if (d->continuation) { |
901 | qWarning() << "Adding a continuation to a future which already has a continuation. " |
902 | "The existing continuation is overwritten." ; |
903 | } |
904 | d->continuation = std::move(func); |
905 | d->continuationData = continuationFutureData; |
906 | } |
907 | } |
908 | |
909 | void QFutureInterfaceBase::cleanContinuation() |
910 | { |
911 | if (!d) |
912 | return; |
913 | |
914 | QMutexLocker lock(&d->continuationMutex); |
915 | d->continuation = nullptr; |
916 | d->continuationState = QFutureInterfaceBasePrivate::Cleaned; |
917 | d->continuationData = nullptr; |
918 | } |
919 | |
920 | void QFutureInterfaceBase::runContinuation() const |
921 | { |
922 | QMutexLocker lock(&d->continuationMutex); |
923 | if (d->continuation && !d->continuationExecuted) { |
924 | // Save the continuation in a local function, to avoid calling |
925 | // a null std::function below, in case cleanContinuation() is |
926 | // called from some other thread right after unlock() below. |
927 | d->continuationExecuted = true; |
928 | auto fn = std::move(d->continuation); |
929 | lock.unlock(); |
930 | fn(*this); |
931 | |
932 | lock.relock(); |
933 | // Unless the continuation has been cleaned earlier, we have to |
934 | // store the move-only continuation, to guarantee that the associated |
935 | // future's data stays alive. |
936 | if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) |
937 | d->continuation = std::move(fn); |
938 | } |
939 | } |
940 | |
941 | bool QFutureInterfaceBase::isChainCanceled() const |
942 | { |
943 | return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled; |
944 | } |
945 | |
946 | void QFutureInterfaceBase::setLaunchAsync(bool value) |
947 | { |
948 | d->launchAsync = value; |
949 | } |
950 | |
951 | bool QFutureInterfaceBase::launchAsync() const |
952 | { |
953 | return d->launchAsync; |
954 | } |
955 | |
956 | namespace QtFuture { |
957 | |
958 | QFuture<void> makeReadyVoidFuture() |
959 | { |
960 | QFutureInterface<void> promise; |
961 | promise.reportStarted(); |
962 | promise.reportFinished(); |
963 | |
964 | return promise.future(); |
965 | } |
966 | |
967 | } // namespace QtFuture |
968 | |
969 | QT_END_NAMESPACE |
970 | |
971 | #include "qfutureinterface.moc" |
972 | |