1/* -*- C++ -*-
2 This file implements the WeaverImpl class.
3
4 SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org>
5
6 SPDX-License-Identifier: LGPL-2.0-or-later
7
8 $Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $
9*/
10
11#include "weaver.h"
12
13#include <QCoreApplication>
14#include <QDebug>
15#include <QMutex>
16#include <QDeadlineTimer>
17#include "debuggingaids.h"
18#include "destructedstate.h"
19#include "exception.h"
20#include "inconstructionstate.h"
21#include "job.h"
22#include "managedjobpointer.h"
23#include "queuepolicy.h"
24#include "shuttingdownstate.h"
25#include "state.h"
26#include "suspendedstate.h"
27#include "suspendingstate.h"
28#include "thread.h"
29#include "threadweaver.h"
30#include "weaver_p.h"
31#include "workinghardstate.h"
32
33using namespace ThreadWeaver;
34
35/** @brief Constructs a Weaver object. */
36Weaver::Weaver(QObject *parent)
37 : QueueAPI(new Private::Weaver_Private(), parent)
38{
39 qRegisterMetaType<ThreadWeaver::JobPointer>(typeName: "ThreadWeaver::JobPointer");
40 QMutexLocker l(d()->mutex);
41 Q_UNUSED(l);
42 // initialize state objects:
43 d()->states[InConstruction] = QSharedPointer<State>(new InConstructionState(this));
44 setState_p(InConstruction);
45 d()->states[WorkingHard] = QSharedPointer<State>(new WorkingHardState(this));
46 d()->states[Suspending] = QSharedPointer<State>(new SuspendingState(this));
47 d()->states[Suspended] = QSharedPointer<State>(new SuspendedState(this));
48 d()->states[ShuttingDown] = QSharedPointer<State>(new ShuttingDownState(this));
49 d()->states[Destructed] = QSharedPointer<State>(new DestructedState(this));
50
51 setState_p(WorkingHard);
52}
53
54/** @brief Destructs a Weaver object. */
55Weaver::~Weaver()
56{
57 Q_ASSERT_X(state()->stateId() == Destructed, Q_FUNC_INFO, "shutDown() method was not called before Weaver destructor!");
58}
59
60/** @brief Enter Destructed state.
61 *
62 * Once this method returns, it is save to delete this object.
63 */
64void Weaver::shutDown()
65{
66 state()->shutDown();
67}
68
69void Weaver::shutDown_p()
70{
71 // the constructor may only be called from the thread that owns this
72 // object (everything else would be what we professionals call "insane")
73
74 REQUIRE(QThread::currentThread() == thread());
75 TWDEBUG(3, "WeaverImpl::shutDown: destroying inventory.\n");
76 d()->semaphore.acquire(n: d()->createdThreads.loadAcquire());
77 finish();
78 suspend();
79 setState(ShuttingDown);
80 reschedule();
81 d()->jobFinished.wakeAll();
82
83 // problem: Some threads might not be asleep yet, just finding
84 // out if a job is available. Those threads will suspend
85 // waiting for their next job (a rare case, but not impossible).
86 // Therefore, if we encounter a thread that has not exited, we
87 // have to wake it again (which we do in the following for
88 // loop).
89
90 for (;;) {
91 Thread *th = nullptr;
92 {
93 QMutexLocker l(d()->mutex);
94 Q_UNUSED(l);
95 if (d()->inventory.isEmpty()) {
96 break;
97 }
98 th = d()->inventory.takeFirst();
99 }
100 if (!th->isFinished()) {
101 for (;;) {
102 Q_ASSERT(state()->stateId() == ShuttingDown);
103 reschedule();
104 if (th->wait(time: 100)) {
105 break;
106 }
107 TWDEBUG(1,
108 "WeaverImpl::shutDown: thread %i did not exit as expected, "
109 "retrying.\n",
110 th->id());
111 }
112 }
113 Q_EMIT(threadExited(th));
114 delete th;
115 }
116 Q_ASSERT(d()->inventory.isEmpty());
117 TWDEBUG(3, "WeaverImpl::shutDown: done\n");
118 setState(Destructed); // Destructed ignores all calls into the queue API
119}
120
121/** @brief Set the Weaver state.
122 * @see StateId
123 * @see WeaverImplState
124 * @see State
125 */
126void Weaver::setState(StateId id)
127{
128 QMutexLocker l(d()->mutex);
129 Q_UNUSED(l);
130 setState_p(id);
131}
132
133void Weaver::setState_p(StateId id)
134{
135 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
136 State *newState = d()->states[id].data();
137 State *previous = d()->state.fetchAndStoreOrdered(newValue: newState);
138 if (previous == nullptr || previous->stateId() != id) {
139 newState->activated();
140 TWDEBUG(2, "WeaverImpl::setState: state changed to \"%s\".\n", newState->stateName().toLatin1().constData());
141 if (id == Suspended) {
142 Q_EMIT(suspended());
143 }
144 Q_EMIT(stateChanged(newState));
145 }
146}
147
148const State *Weaver::state() const
149{
150 return d()->state.loadAcquire();
151}
152
153State *Weaver::state()
154{
155 return d()->state.loadAcquire();
156}
157
158void Weaver::setMaximumNumberOfThreads(int cap)
159{
160 Q_ASSERT_X(cap >= 0, "Weaver Impl", "Thread inventory size has to be larger than or equal to zero.");
161 QMutexLocker l(d()->mutex);
162 Q_UNUSED(l);
163 state()->setMaximumNumberOfThreads(cap);
164 reschedule();
165}
166
167void Weaver::setMaximumNumberOfThreads_p(int cap)
168{
169 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
170 const bool createInitialThread = (d()->inventoryMax == 0 && cap > 0);
171 d()->inventoryMax = cap;
172 if (createInitialThread) {
173 adjustInventory(noOfNewJobs: 1);
174 }
175}
176
177int Weaver::maximumNumberOfThreads() const
178{
179 QMutexLocker l(d()->mutex);
180 Q_UNUSED(l);
181 return state()->maximumNumberOfThreads();
182}
183
184int Weaver::maximumNumberOfThreads_p() const
185{
186 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
187 return d()->inventoryMax;
188}
189
190int Weaver::currentNumberOfThreads() const
191{
192 QMutexLocker l(d()->mutex);
193 Q_UNUSED(l);
194 return state()->currentNumberOfThreads();
195}
196
197int Weaver::currentNumberOfThreads_p() const
198{
199 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
200 return d()->inventory.count();
201}
202
203void Weaver::enqueue(const QList<JobPointer> &jobs)
204{
205 QMutexLocker l(d()->mutex);
206 Q_UNUSED(l);
207 state()->enqueue(jobs);
208}
209
210void Weaver::enqueue_p(const QList<JobPointer> &jobs)
211{
212 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
213 if (jobs.isEmpty()) {
214 return;
215 }
216 for (const JobPointer &job : jobs) {
217 if (job) {
218 Q_ASSERT(job->status() == Job::Status_New);
219 adjustInventory(noOfNewJobs: jobs.size());
220 TWDEBUG(3, "WeaverImpl::enqueue: queueing job %p.\n", (void *)job.data());
221 job->aboutToBeQueued(api: this);
222 // find position for insertion:
223 int i = d()->assignments.size();
224 if (i > 0) {
225 while (i > 0 && d()->assignments.at(i: i - 1)->priority() < job->priority()) {
226 --i;
227 }
228 d()->assignments.insert(i, t: job);
229 } else {
230 d()->assignments.append(t: job);
231 }
232 job->setStatus(Job::Status_Queued);
233 reschedule();
234 }
235 }
236}
237
238bool Weaver::dequeue(const JobPointer &job)
239{
240 QMutexLocker l(d()->mutex);
241 Q_UNUSED(l);
242 return state()->dequeue(job);
243}
244
245bool Weaver::dequeue_p(JobPointer job)
246{
247 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
248 int position = d()->assignments.indexOf(t: job);
249 if (position != -1) {
250 job->aboutToBeDequeued(api: this);
251 int newPosition = d()->assignments.indexOf(t: job);
252 JobPointer job = d()->assignments.takeAt(i: newPosition);
253 job->setStatus(Job::Status_New);
254 Q_ASSERT(!d()->assignments.contains(job));
255 TWDEBUG(3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n", (void *)job.data(), queueLength_p());
256 // from the queues point of view, a job is just as finished if it gets dequeued:
257 d()->jobFinished.wakeAll();
258 Q_ASSERT(!d()->assignments.contains(job));
259 return true;
260 } else {
261 TWDEBUG(3, "WeaverImpl::dequeue: job %p not found in queue.\n", (void *)job.data());
262 return false;
263 }
264}
265
266void Weaver::dequeue()
267{
268 QMutexLocker l(d()->mutex);
269 Q_UNUSED(l);
270 state()->dequeue();
271}
272
273void Weaver::dequeue_p()
274{
275 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
276 TWDEBUG(3, "WeaverImpl::dequeue: dequeueing all jobs.\n");
277 for (int index = 0; index < d()->assignments.size(); ++index) {
278 d()->assignments.at(i: index)->aboutToBeDequeued(api: this);
279 }
280 d()->assignments.clear();
281 ENSURE(d()->assignments.isEmpty());
282}
283
284void Weaver::finish()
285{
286 QMutexLocker l(d()->mutex);
287 Q_UNUSED(l);
288 state()->finish();
289}
290
291void Weaver::finish_p()
292{
293 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
294#ifdef QT_NO_DEBUG
295 const int MaxWaitMilliSeconds = 50;
296#else
297 const int MaxWaitMilliSeconds = 500;
298#endif
299 while (!isIdle_p()) {
300 Q_ASSERT_X(state()->stateId() == WorkingHard, Q_FUNC_INFO, qPrintable(state()->stateName()));
301 TWDEBUG(2, "WeaverImpl::finish: not done, waiting.\n");
302 if (d()->jobFinished.wait(lockedMutex: d()->mutex, deadline: QDeadlineTimer(MaxWaitMilliSeconds)) == false) {
303 TWDEBUG(2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n", queueLength_p());
304 reschedule();
305 }
306 }
307 TWDEBUG(2, "WeaverImpl::finish: done.\n\n\n");
308}
309
310void Weaver::suspend()
311{
312 // FIXME?
313 // QMutexLocker l(m_mutex); Q_UNUSED(l);
314 state()->suspend();
315}
316
317void Weaver::suspend_p()
318{
319 // FIXME ?
320}
321
322void Weaver::resume()
323{
324 // FIXME?
325 // QMutexLocker l(m_mutex); Q_UNUSED(l);
326 state()->resume();
327}
328
329void Weaver::resume_p()
330{
331 // FIXME ?
332}
333
334bool Weaver::isEmpty() const
335{
336 QMutexLocker l(d()->mutex);
337 Q_UNUSED(l);
338 return state()->isEmpty();
339}
340
341bool Weaver::isEmpty_p() const
342{
343 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
344 return d()->assignments.isEmpty();
345}
346
347bool Weaver::isIdle() const
348{
349 QMutexLocker l(d()->mutex);
350 Q_UNUSED(l);
351 return state()->isIdle();
352}
353
354bool Weaver::isIdle_p() const
355{
356 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
357 return isEmpty_p() && d()->active == 0;
358}
359
360int Weaver::queueLength() const
361{
362 QMutexLocker l(d()->mutex);
363 Q_UNUSED(l);
364 return state()->queueLength();
365}
366
367int Weaver::queueLength_p() const
368{
369 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
370 return d()->assignments.count();
371}
372
373void Weaver::requestAbort()
374{
375 QMutexLocker l(d()->mutex);
376 Q_UNUSED(l);
377 return state()->requestAbort();
378}
379
380void Weaver::reschedule()
381{
382 d()->jobAvailable.wakeAll();
383}
384
385void Weaver::requestAbort_p()
386{
387 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
388 for (int i = 0; i < d()->inventory.size(); ++i) {
389 d()->inventory[i]->requestAbort();
390 }
391}
392
393/** @brief Adjust the inventory size.
394 *
395 * Requires that the mutex is being held when called.
396 *
397 * This method creates threads on demand. Threads in the inventory
398 * are not created upon construction of the WeaverImpl object, but
399 * when jobs are queued. This avoids costly delays on the application
400 * startup time. Threads are created when the inventory size is under
401 * inventoryMin and new jobs are queued.
402 */
403void Weaver::adjustInventory(int numberOfNewJobs)
404{
405 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
406 // number of threads that can be created:
407 const int reserve = d()->inventoryMax - d()->inventory.count();
408
409 if (reserve > 0) {
410 for (int i = 0; i < qMin(a: reserve, b: numberOfNewJobs); ++i) {
411 Thread *th = createThread();
412 th->moveToThread(thread: th); // be sane from the start
413 d()->inventory.append(t: th);
414 th->start();
415 d()->createdThreads.ref();
416 TWDEBUG(2,
417 "WeaverImpl::adjustInventory: thread created, "
418 "%i threads in inventory.\n",
419 currentNumberOfThreads_p());
420 }
421 }
422}
423
424Private::Weaver_Private *Weaver::d()
425{
426 return reinterpret_cast<Private::Weaver_Private *>(QueueSignals::d());
427}
428
429const Private::Weaver_Private *Weaver::d() const
430{
431 return reinterpret_cast<const Private::Weaver_Private *>(QueueSignals::d());
432}
433
434/** @brief Factory method to create the threads.
435 *
436 * Overload in adapted Weaver implementations.
437 */
438Thread *Weaver::createThread()
439{
440 return new Thread(this);
441}
442
443/** @brief Increment the count of active threads. */
444void Weaver::incActiveThreadCount()
445{
446 adjustActiveThreadCount(diff: 1);
447}
448
449/** brief Decrement the count of active threads. */
450void Weaver::decActiveThreadCount()
451{
452 adjustActiveThreadCount(diff: -1);
453 // the done job could have freed another set of jobs, and we do not know how
454 // many - therefore we need to wake all threads:
455 d()->jobFinished.wakeAll();
456}
457
458/** @brief Adjust active thread count.
459 *
460 * This is a helper function for incActiveThreadCount and decActiveThreadCount.
461 */
462void Weaver::adjustActiveThreadCount(int diff)
463{
464 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
465 d()->active += diff;
466 TWDEBUG(4,
467 "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs"
468 " in queue).\n",
469 d()->active,
470 queueLength_p());
471
472 if (d()->assignments.isEmpty() && d()->active == 0) {
473 P_ASSERT(diff < 0); // cannot reach zero otherwise
474 Q_EMIT(finished());
475 }
476}
477
478/** @brief Returns the number of active threads.
479 *
480 * Threads are active if they process a job. Requires that the mutex is being held when called.
481 */
482int Weaver::activeThreadCount()
483{
484 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
485 return d()->active;
486}
487
488/** @brief Called from a new thread when entering the run method. */
489void Weaver::threadEnteredRun(Thread *thread)
490{
491 d()->semaphore.release(n: 1);
492 Q_EMIT threadStarted(thread);
493}
494
495/** @brief Take the first available job out of the queue and return it.
496 *
497 * The job will be removed from the queue (therefore, take). Only jobs that have no unresolved dependencies
498 * are considered available. If only jobs that depend on other unfinished jobs are in the queue, this method
499 * blocks on m_jobAvailable.
500 *
501 * This method will enter suspended state if the active thread count is now zero and
502 * suspendIfAllThreadsInactive is true.
503 * If justReturning is true, do not assign a new job, just process the completed previous one.
504 */
505JobPointer Weaver::takeFirstAvailableJobOrSuspendOrWait(Thread *th, bool threadWasBusy, bool suspendIfInactive, bool justReturning)
506{
507 QMutexLocker l(d()->mutex);
508 Q_UNUSED(l);
509 Q_ASSERT(threadWasBusy == false || (threadWasBusy == true && d()->active > 0));
510 TWDEBUG(3, "WeaverImpl::takeFirstAvailableJobOrWait: trying to assign new job to thread %i (%s state).\n", th->id(), qPrintable(state()->stateName()));
511 TWDEBUG(5,
512 "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, was busy: %s, suspend: %s, assign new job: %s.\n",
513 activeThreadCount(),
514 threadWasBusy ? "yes" : "no",
515 suspendIfInactive ? "yes" : "no",
516 !justReturning ? "yes" : "no");
517 d()->deleteExpiredThreads();
518 adjustInventory(numberOfNewJobs: 1);
519
520 if (threadWasBusy) {
521 // cleanup and send events:
522 decActiveThreadCount();
523 }
524 Q_ASSERT(d()->active >= 0);
525
526 if (suspendIfInactive && d()->active == 0 && state()->stateId() == Suspending) {
527 setState_p(Suspended);
528 return JobPointer();
529 }
530
531 if (state()->stateId() != WorkingHard || justReturning) {
532 return JobPointer();
533 }
534
535 if (state()->stateId() == WorkingHard && d()->inventory.size() > d()->inventoryMax) {
536 const int count = d()->inventory.removeAll(t: th);
537 Q_ASSERT(count == 1);
538 d()->expiredThreads.append(t: th);
539 throw AbortThread(QStringLiteral("Inventory size exceeded"));
540 }
541
542 JobPointer next;
543 for (int index = 0; index < d()->assignments.size(); ++index) {
544 const JobPointer &candidate = d()->assignments.at(i: index);
545 if (d()->canBeExecuted(candidate)) {
546 next = candidate;
547 d()->assignments.removeAt(i: index);
548 break;
549 }
550 }
551 if (next) {
552 incActiveThreadCount();
553 TWDEBUG(3,
554 "WeaverImpl::takeFirstAvailableJobOrWait: job %p assigned to thread %i (%s state).\n",
555 next.data(),
556 th->id(),
557 qPrintable(state()->stateName()));
558 return next;
559 }
560
561 blockThreadUntilJobsAreBeingAssigned_locked(th);
562 return JobPointer();
563}
564
565/** @brief Assign a job to the calling thread.
566 *
567 * This is supposed to be called from the Thread objects in the inventory. Do not call this method from
568 * your code.
569 * Returns 0 if the weaver is shutting down, telling the calling thread to finish and exit. If no jobs are
570 * available and shut down is not in progress, the calling thread is suspended until either condition is met.
571 * @param wasBusy True if the thread is returning from processing a job
572 */
573JobPointer Weaver::applyForWork(Thread *th, bool wasBusy)
574{
575 return state()->applyForWork(th, wasBusy);
576}
577
578/** @brief Wait for a job to become available. */
579void Weaver::waitForAvailableJob(Thread *th)
580{
581 state()->waitForAvailableJob(th);
582}
583
584/** @brief Blocks the calling thread until jobs can be assigned. */
585void Weaver::blockThreadUntilJobsAreBeingAssigned(Thread *th)
586{
587 QMutexLocker l(d()->mutex);
588 Q_UNUSED(l);
589 blockThreadUntilJobsAreBeingAssigned_locked(th);
590}
591
592/** @brief Blocks the calling thread until jobs can be assigned.
593 *
594 * The mutex must be held when calling this method.
595 */
596void Weaver::blockThreadUntilJobsAreBeingAssigned_locked(Thread *th)
597{
598 Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called
599 TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i blocked (%s state).\n", th->id(), qPrintable(state()->stateName()));
600 Q_EMIT threadSuspended(th);
601 d()->jobAvailable.wait(lockedMutex: d()->mutex);
602 TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i resumed (%s state).\n", th->id(), qPrintable(state()->stateName()));
603}
604
605#include "moc_weaver.cpp"
606

source code of threadweaver/src/weaver.cpp