1/****************************************************************************
2**
3** Copyright (C) 2016 The Qt Company Ltd.
4** Contact: https://www.qt.io/licensing/
5**
6** This file is part of the QtCore module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial License Usage
10** Licensees holding valid commercial Qt licenses may use this file in
11** accordance with the commercial license agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and The Qt Company. For licensing terms
14** and conditions see https://www.qt.io/terms-conditions. For further
15** information use the contact form at https://www.qt.io/contact-us.
16**
17** GNU Lesser General Public License Usage
18** Alternatively, this file may be used under the terms of the GNU Lesser
19** General Public License version 3 as published by the Free Software
20** Foundation and appearing in the file LICENSE.LGPL3 included in the
21** packaging of this file. Please review the following information to
22** ensure the GNU Lesser General Public License version 3 requirements
23** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24**
25** GNU General Public License Usage
26** Alternatively, this file may be used under the terms of the GNU
27** General Public License version 2.0 or (at your option) the GNU General
28** Public license version 3 or any later version approved by the KDE Free
29** Qt Foundation. The licenses are as published by the Free Software
30** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31** included in the packaging of this file. Please review the following
32** information to ensure the GNU General Public License requirements will
33** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34** https://www.gnu.org/licenses/gpl-3.0.html.
35**
36** $QT_END_LICENSE$
37**
38****************************************************************************/
39
40// qfutureinterface.h included from qfuture.h
41#include "qfuture.h"
42#include "qfutureinterface_p.h"
43
44#include <QtCore/qatomic.h>
45#include <QtCore/qthread.h>
46#include <private/qthreadpool_p.h>
47
48#ifdef interface
49# undef interface
50#endif
51
52QT_BEGIN_NAMESPACE
53
54enum {
55 MaxProgressEmitsPerSecond = 25
56};
57
58namespace {
59class ThreadPoolThreadReleaser {
60 QThreadPool *m_pool;
61public:
62 explicit ThreadPoolThreadReleaser(QThreadPool *pool)
63 : m_pool(pool)
64 { if (pool) pool->releaseThread(); }
65 ~ThreadPoolThreadReleaser()
66 { if (m_pool) m_pool->reserveThread(); }
67};
68} // unnamed namespace
69
70
71QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
72 : d(new QFutureInterfaceBasePrivate(initialState))
73{ }
74
75QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
76 : d(other.d)
77{
78 d->refCount.ref();
79}
80
81QFutureInterfaceBase::~QFutureInterfaceBase()
82{
83 if (!d->refCount.deref())
84 delete d;
85}
86
87static inline int switch_on(QAtomicInt &a, int which)
88{
89 return a.fetchAndOrRelaxed(valueToAdd: which) | which;
90}
91
92static inline int switch_off(QAtomicInt &a, int which)
93{
94 return a.fetchAndAndRelaxed(valueToAdd: ~which) & ~which;
95}
96
97static inline int switch_from_to(QAtomicInt &a, int from, int to)
98{
99 int newValue;
100 int expected = a.loadRelaxed();
101 do {
102 newValue = (expected & ~from) | to;
103 } while (!a.testAndSetRelaxed(expectedValue: expected, newValue, currentValue&: expected));
104 return newValue;
105}
106
107void QFutureInterfaceBase::cancel()
108{
109 QMutexLocker locker(&d->m_mutex);
110 if (d->state.loadRelaxed() & Canceled)
111 return;
112
113 switch_from_to(a&: d->state, from: Paused, to: Canceled);
114 d->waitCondition.wakeAll();
115 d->pausedWaitCondition.wakeAll();
116 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
117}
118
119void QFutureInterfaceBase::setPaused(bool paused)
120{
121 QMutexLocker locker(&d->m_mutex);
122 if (paused) {
123 switch_on(a&: d->state, which: Paused);
124 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Paused));
125 } else {
126 switch_off(a&: d->state, which: Paused);
127 d->pausedWaitCondition.wakeAll();
128 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
129 }
130}
131
132void QFutureInterfaceBase::togglePaused()
133{
134 QMutexLocker locker(&d->m_mutex);
135 if (d->state.loadRelaxed() & Paused) {
136 switch_off(a&: d->state, which: Paused);
137 d->pausedWaitCondition.wakeAll();
138 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
139 } else {
140 switch_on(a&: d->state, which: Paused);
141 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Paused));
142 }
143}
144
145void QFutureInterfaceBase::setThrottled(bool enable)
146{
147 QMutexLocker lock(&d->m_mutex);
148 if (enable) {
149 switch_on(a&: d->state, which: Throttled);
150 } else {
151 switch_off(a&: d->state, which: Throttled);
152 if (!(d->state.loadRelaxed() & Paused))
153 d->pausedWaitCondition.wakeAll();
154 }
155}
156
157
158bool QFutureInterfaceBase::isRunning() const
159{
160 return queryState(state: Running);
161}
162
163bool QFutureInterfaceBase::isStarted() const
164{
165 return queryState(state: Started);
166}
167
168bool QFutureInterfaceBase::isCanceled() const
169{
170 return queryState(state: Canceled);
171}
172
173bool QFutureInterfaceBase::isFinished() const
174{
175 return queryState(state: Finished);
176}
177
178bool QFutureInterfaceBase::isPaused() const
179{
180 return queryState(state: Paused);
181}
182
183bool QFutureInterfaceBase::isThrottled() const
184{
185 return queryState(state: Throttled);
186}
187
188bool QFutureInterfaceBase::isResultReadyAt(int index) const
189{
190 QMutexLocker lock(&d->m_mutex);
191 return d->internal_isResultReadyAt(index);
192}
193
194bool QFutureInterfaceBase::waitForNextResult()
195{
196 QMutexLocker lock(&d->m_mutex);
197 return d->internal_waitForNextResult();
198}
199
200void QFutureInterfaceBase::waitForResume()
201{
202 // return early if possible to avoid taking the mutex lock.
203 {
204 const int state = d->state.loadRelaxed();
205 if (!(state & Paused) || (state & Canceled))
206 return;
207 }
208
209 QMutexLocker lock(&d->m_mutex);
210 const int state = d->state.loadRelaxed();
211 if (!(state & Paused) || (state & Canceled))
212 return;
213
214 // decrease active thread count since this thread will wait.
215 const ThreadPoolThreadReleaser releaser(d->pool());
216
217 d->pausedWaitCondition.wait(lockedMutex: &d->m_mutex);
218}
219
220int QFutureInterfaceBase::progressValue() const
221{
222 const QMutexLocker lock(&d->m_mutex);
223 return d->m_progressValue;
224}
225
226int QFutureInterfaceBase::progressMinimum() const
227{
228 const QMutexLocker lock(&d->m_mutex);
229 return d->m_progressMinimum;
230}
231
232int QFutureInterfaceBase::progressMaximum() const
233{
234 const QMutexLocker lock(&d->m_mutex);
235 return d->m_progressMaximum;
236}
237
238int QFutureInterfaceBase::resultCount() const
239{
240 QMutexLocker lock(&d->m_mutex);
241 return d->internal_resultCount();
242}
243
244QString QFutureInterfaceBase::progressText() const
245{
246 QMutexLocker locker(&d->m_mutex);
247 return d->m_progressText;
248}
249
250bool QFutureInterfaceBase::isProgressUpdateNeeded() const
251{
252 QMutexLocker locker(&d->m_mutex);
253 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
254}
255
256void QFutureInterfaceBase::reportStarted()
257{
258 QMutexLocker locker(&d->m_mutex);
259 if (d->state.loadRelaxed() & (Started|Canceled|Finished))
260 return;
261
262 d->setState(State(Started | Running));
263 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Started));
264}
265
266void QFutureInterfaceBase::reportCanceled()
267{
268 cancel();
269}
270
271#ifndef QT_NO_EXCEPTIONS
272void QFutureInterfaceBase::reportException(const QException &exception)
273{
274 QMutexLocker locker(&d->m_mutex);
275 if (d->state.loadRelaxed() & (Canceled|Finished))
276 return;
277
278 d->m_exceptionStore.setException(exception);
279 switch_on(a&: d->state, which: Canceled);
280 d->waitCondition.wakeAll();
281 d->pausedWaitCondition.wakeAll();
282 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
283}
284#endif
285
286void QFutureInterfaceBase::reportFinished()
287{
288 QMutexLocker locker(&d->m_mutex);
289 if (!isFinished()) {
290 switch_from_to(a&: d->state, from: Running, to: Finished);
291 d->waitCondition.wakeAll();
292 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Finished));
293 }
294}
295
296void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
297{
298 if (d->manualProgress == false)
299 setProgressRange(minimum: 0, maximum: resultCount);
300 d->m_expectedResultCount = resultCount;
301}
302
303int QFutureInterfaceBase::expectedResultCount()
304{
305 return d->m_expectedResultCount;
306}
307
308bool QFutureInterfaceBase::queryState(State state) const
309{
310 return d->state.loadRelaxed() & state;
311}
312
313void QFutureInterfaceBase::waitForResult(int resultIndex)
314{
315 d->m_exceptionStore.throwPossibleException();
316
317 QMutexLocker lock(&d->m_mutex);
318 if (!isRunning())
319 return;
320 lock.unlock();
321
322 // To avoid deadlocks and reduce the number of threads used, try to
323 // run the runnable in the current thread.
324 d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable);
325
326 lock.relock();
327
328 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
329 while (isRunning() && !d->internal_isResultReadyAt(index: waitIndex))
330 d->waitCondition.wait(lockedMutex: &d->m_mutex);
331
332 d->m_exceptionStore.throwPossibleException();
333}
334
335void QFutureInterfaceBase::waitForFinished()
336{
337 QMutexLocker lock(&d->m_mutex);
338 const bool alreadyFinished = !isRunning();
339 lock.unlock();
340
341 if (!alreadyFinished) {
342 d->pool()->d_func()->stealAndRunRunnable(runnable: d->runnable);
343
344 lock.relock();
345
346 while (isRunning())
347 d->waitCondition.wait(lockedMutex: &d->m_mutex);
348 }
349
350 d->m_exceptionStore.throwPossibleException();
351}
352
353void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
354{
355 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
356 return;
357
358 d->waitCondition.wakeAll();
359
360 if (d->manualProgress == false) {
361 if (d->internal_updateProgress(progress: d->m_progressValue + endIndex - beginIndex) == false) {
362 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
363 beginIndex,
364 endIndex));
365 return;
366 }
367
368 d->sendCallOuts(callOut1: QFutureCallOutEvent(QFutureCallOutEvent::Progress,
369 d->m_progressValue,
370 d->m_progressText),
371 callOut2: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
372 beginIndex,
373 endIndex));
374 return;
375 }
376 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
377}
378
379void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
380{
381 d->runnable = runnable;
382}
383
384void QFutureInterfaceBase::setThreadPool(QThreadPool *pool)
385{
386 d->m_pool = pool;
387}
388
389void QFutureInterfaceBase::setFilterMode(bool enable)
390{
391 QMutexLocker locker(&d->m_mutex);
392 resultStoreBase().setFilterMode(enable);
393}
394
395void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
396{
397 QMutexLocker locker(&d->m_mutex);
398 d->m_progressMinimum = minimum;
399 d->m_progressMaximum = maximum;
400 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
401}
402
403void QFutureInterfaceBase::setProgressValue(int progressValue)
404{
405 setProgressValueAndText(progressValue, progressText: QString());
406}
407
408void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
409 const QString &progressText)
410{
411 QMutexLocker locker(&d->m_mutex);
412 if (d->manualProgress == false)
413 d->manualProgress = true;
414 if (d->m_progressValue >= progressValue)
415 return;
416
417 if (d->state.loadRelaxed() & (Canceled|Finished))
418 return;
419
420 if (d->internal_updateProgress(progress: progressValue, progressText)) {
421 d->sendCallOut(callOut: QFutureCallOutEvent(QFutureCallOutEvent::Progress,
422 d->m_progressValue,
423 d->m_progressText));
424 }
425}
426
427QMutex *QFutureInterfaceBase::mutex() const
428{
429 return &d->m_mutex;
430}
431
432QMutex &QFutureInterfaceBase::mutex(int) const
433{
434 return d->m_mutex;
435}
436
437QtPrivate::ExceptionStore &QFutureInterfaceBase::exceptionStore()
438{
439 return d->m_exceptionStore;
440}
441
442QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
443{
444 return d->m_results;
445}
446
447const QtPrivate::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
448{
449 return d->m_results;
450}
451
452QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
453{
454 other.d->refCount.ref();
455 if (!d->refCount.deref())
456 delete d;
457 d = other.d;
458 return *this;
459}
460
461bool QFutureInterfaceBase::refT() const
462{
463 return d->refCount.refT();
464}
465
466bool QFutureInterfaceBase::derefT() const
467{
468 return d->refCount.derefT();
469}
470
471QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
472 : refCount(1), m_progressValue(0), m_progressMinimum(0), m_progressMaximum(0),
473 state(initialState),
474 manualProgress(false), m_expectedResultCount(0), runnable(nullptr), m_pool(nullptr)
475{
476 progressTime.invalidate();
477}
478
479int QFutureInterfaceBasePrivate::internal_resultCount() const
480{
481 return m_results.count(); // ### subtract canceled results.
482}
483
484bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const
485{
486 return (m_results.contains(index));
487}
488
489bool QFutureInterfaceBasePrivate::internal_waitForNextResult()
490{
491 if (m_results.hasNextResult())
492 return true;
493
494 while ((state.loadRelaxed() & QFutureInterfaceBase::Running) && m_results.hasNextResult() == false)
495 waitCondition.wait(lockedMutex: &m_mutex);
496
497 return !(state.loadRelaxed() & QFutureInterfaceBase::Canceled) && m_results.hasNextResult();
498}
499
500bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress,
501 const QString &progressText)
502{
503 if (m_progressValue >= progress)
504 return false;
505
506 m_progressValue = progress;
507 m_progressText = progressText;
508
509 if (progressTime.isValid() && m_progressValue != m_progressMaximum) // make sure the first and last steps are emitted.
510 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
511 return false;
512
513 progressTime.start();
514 return true;
515}
516
517void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable)
518{
519 // bail out if we are not changing the state
520 if ((enable && (state.loadRelaxed() & QFutureInterfaceBase::Throttled))
521 || (!enable && !(state.loadRelaxed() & QFutureInterfaceBase::Throttled)))
522 return;
523
524 // change the state
525 if (enable) {
526 switch_on(a&: state, which: QFutureInterfaceBase::Throttled);
527 } else {
528 switch_off(a&: state, which: QFutureInterfaceBase::Throttled);
529 if (!(state.loadRelaxed() & QFutureInterfaceBase::Paused))
530 pausedWaitCondition.wakeAll();
531 }
532}
533
534void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
535{
536 if (outputConnections.isEmpty())
537 return;
538
539 for (int i = 0; i < outputConnections.count(); ++i)
540 outputConnections.at(i)->postCallOutEvent(callOutEvent);
541}
542
543void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
544 const QFutureCallOutEvent &callOutEvent2)
545{
546 if (outputConnections.isEmpty())
547 return;
548
549 for (int i = 0; i < outputConnections.count(); ++i) {
550 QFutureCallOutInterface *interface = outputConnections.at(i);
551 interface->postCallOutEvent(callOutEvent1);
552 interface->postCallOutEvent(callOutEvent2);
553 }
554}
555
556// This function connects an output interface (for example a QFutureWatcher)
557// to this future. While holding the lock we check the state and ready results
558// and add the appropriate callouts to the queue. In order to avoid deadlocks,
559// the actual callouts are made at the end while not holding the lock.
560void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *interface)
561{
562 QMutexLocker locker(&m_mutex);
563
564 if (state.loadRelaxed() & QFutureInterfaceBase::Started) {
565 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
566 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
567 m_progressMinimum,
568 m_progressMaximum));
569 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
570 m_progressValue,
571 m_progressText));
572 }
573
574 QtPrivate::ResultIteratorBase it = m_results.begin();
575 while (it != m_results.end()) {
576 const int begin = it.resultIndex();
577 const int end = begin + it.batchSize();
578 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
579 begin,
580 end));
581 it.batchedAdvance();
582 }
583
584 if (state.loadRelaxed() & QFutureInterfaceBase::Paused)
585 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
586
587 if (state.loadRelaxed() & QFutureInterfaceBase::Canceled)
588 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
589
590 if (state.loadRelaxed() & QFutureInterfaceBase::Finished)
591 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
592
593 outputConnections.append(t: interface);
594}
595
596void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *interface)
597{
598 QMutexLocker lock(&m_mutex);
599 const int index = outputConnections.indexOf(t: interface);
600 if (index == -1)
601 return;
602 outputConnections.removeAt(i: index);
603
604 interface->callOutInterfaceDisconnected();
605}
606
607void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
608{
609 state.storeRelaxed(newValue: newState);
610}
611
612QT_END_NAMESPACE
613

source code of qtbase/src/corelib/thread/qfutureinterface.cpp