| 1 | /* -*- C++ -*- |
| 2 | This file implements the WeaverImpl class. |
| 3 | |
| 4 | SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org> |
| 5 | |
| 6 | SPDX-License-Identifier: LGPL-2.0-or-later |
| 7 | |
| 8 | $Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $ |
| 9 | */ |
| 10 | |
| 11 | #include "weaver.h" |
| 12 | |
| 13 | #include <QCoreApplication> |
| 14 | #include <QDebug> |
| 15 | #include <QMutex> |
| 16 | #include <QDeadlineTimer> |
| 17 | #include "debuggingaids.h" |
| 18 | #include "destructedstate.h" |
| 19 | #include "exception.h" |
| 20 | #include "inconstructionstate.h" |
| 21 | #include "job.h" |
| 22 | #include "managedjobpointer.h" |
| 23 | #include "queuepolicy.h" |
| 24 | #include "shuttingdownstate.h" |
| 25 | #include "state.h" |
| 26 | #include "suspendedstate.h" |
| 27 | #include "suspendingstate.h" |
| 28 | #include "thread.h" |
| 29 | #include "threadweaver.h" |
| 30 | #include "weaver_p.h" |
| 31 | #include "workinghardstate.h" |
| 32 | |
| 33 | using namespace ThreadWeaver; |
| 34 | |
| 35 | /** @brief Constructs a Weaver object. */ |
| 36 | Weaver::Weaver(QObject *parent) |
| 37 | : QueueAPI(new Private::Weaver_Private(), parent) |
| 38 | { |
| 39 | qRegisterMetaType<ThreadWeaver::JobPointer>(typeName: "ThreadWeaver::JobPointer" ); |
| 40 | QMutexLocker l(d()->mutex); |
| 41 | Q_UNUSED(l); |
| 42 | // initialize state objects: |
| 43 | d()->states[InConstruction] = QSharedPointer<State>(new InConstructionState(this)); |
| 44 | setState_p(InConstruction); |
| 45 | d()->states[WorkingHard] = QSharedPointer<State>(new WorkingHardState(this)); |
| 46 | d()->states[Suspending] = QSharedPointer<State>(new SuspendingState(this)); |
| 47 | d()->states[Suspended] = QSharedPointer<State>(new SuspendedState(this)); |
| 48 | d()->states[ShuttingDown] = QSharedPointer<State>(new ShuttingDownState(this)); |
| 49 | d()->states[Destructed] = QSharedPointer<State>(new DestructedState(this)); |
| 50 | |
| 51 | setState_p(WorkingHard); |
| 52 | } |
| 53 | |
| 54 | /** @brief Destructs a Weaver object. */ |
| 55 | Weaver::~Weaver() |
| 56 | { |
| 57 | Q_ASSERT_X(state()->stateId() == Destructed, Q_FUNC_INFO, "shutDown() method was not called before Weaver destructor!" ); |
| 58 | } |
| 59 | |
| 60 | /** @brief Enter Destructed state. |
| 61 | * |
| 62 | * Once this method returns, it is save to delete this object. |
| 63 | */ |
| 64 | void Weaver::shutDown() |
| 65 | { |
| 66 | state()->shutDown(); |
| 67 | } |
| 68 | |
| 69 | void Weaver::shutDown_p() |
| 70 | { |
| 71 | // the constructor may only be called from the thread that owns this |
| 72 | // object (everything else would be what we professionals call "insane") |
| 73 | |
| 74 | REQUIRE(QThread::currentThread() == thread()); |
| 75 | TWDEBUG(3, "WeaverImpl::shutDown: destroying inventory.\n" ); |
| 76 | d()->semaphore.acquire(n: d()->createdThreads.loadAcquire()); |
| 77 | finish(); |
| 78 | suspend(); |
| 79 | setState(ShuttingDown); |
| 80 | reschedule(); |
| 81 | d()->jobFinished.wakeAll(); |
| 82 | |
| 83 | // problem: Some threads might not be asleep yet, just finding |
| 84 | // out if a job is available. Those threads will suspend |
| 85 | // waiting for their next job (a rare case, but not impossible). |
| 86 | // Therefore, if we encounter a thread that has not exited, we |
| 87 | // have to wake it again (which we do in the following for |
| 88 | // loop). |
| 89 | |
| 90 | for (;;) { |
| 91 | Thread *th = nullptr; |
| 92 | { |
| 93 | QMutexLocker l(d()->mutex); |
| 94 | Q_UNUSED(l); |
| 95 | if (d()->inventory.isEmpty()) { |
| 96 | break; |
| 97 | } |
| 98 | th = d()->inventory.takeFirst(); |
| 99 | } |
| 100 | if (!th->isFinished()) { |
| 101 | for (;;) { |
| 102 | Q_ASSERT(state()->stateId() == ShuttingDown); |
| 103 | reschedule(); |
| 104 | if (th->wait(time: 100)) { |
| 105 | break; |
| 106 | } |
| 107 | TWDEBUG(1, |
| 108 | "WeaverImpl::shutDown: thread %i did not exit as expected, " |
| 109 | "retrying.\n" , |
| 110 | th->id()); |
| 111 | } |
| 112 | } |
| 113 | Q_EMIT(threadExited(th)); |
| 114 | delete th; |
| 115 | } |
| 116 | Q_ASSERT(d()->inventory.isEmpty()); |
| 117 | TWDEBUG(3, "WeaverImpl::shutDown: done\n" ); |
| 118 | setState(Destructed); // Destructed ignores all calls into the queue API |
| 119 | } |
| 120 | |
| 121 | /** @brief Set the Weaver state. |
| 122 | * @see StateId |
| 123 | * @see WeaverImplState |
| 124 | * @see State |
| 125 | */ |
| 126 | void Weaver::setState(StateId id) |
| 127 | { |
| 128 | QMutexLocker l(d()->mutex); |
| 129 | Q_UNUSED(l); |
| 130 | setState_p(id); |
| 131 | } |
| 132 | |
| 133 | void Weaver::setState_p(StateId id) |
| 134 | { |
| 135 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 136 | State *newState = d()->states[id].data(); |
| 137 | State *previous = d()->state.fetchAndStoreOrdered(newValue: newState); |
| 138 | if (previous == nullptr || previous->stateId() != id) { |
| 139 | newState->activated(); |
| 140 | TWDEBUG(2, "WeaverImpl::setState: state changed to \"%s\".\n" , newState->stateName().toLatin1().constData()); |
| 141 | if (id == Suspended) { |
| 142 | Q_EMIT(suspended()); |
| 143 | } |
| 144 | Q_EMIT(stateChanged(newState)); |
| 145 | } |
| 146 | } |
| 147 | |
| 148 | const State *Weaver::state() const |
| 149 | { |
| 150 | return d()->state.loadAcquire(); |
| 151 | } |
| 152 | |
| 153 | State *Weaver::state() |
| 154 | { |
| 155 | return d()->state.loadAcquire(); |
| 156 | } |
| 157 | |
| 158 | void Weaver::setMaximumNumberOfThreads(int cap) |
| 159 | { |
| 160 | Q_ASSERT_X(cap >= 0, "Weaver Impl" , "Thread inventory size has to be larger than or equal to zero." ); |
| 161 | QMutexLocker l(d()->mutex); |
| 162 | Q_UNUSED(l); |
| 163 | state()->setMaximumNumberOfThreads(cap); |
| 164 | reschedule(); |
| 165 | } |
| 166 | |
| 167 | void Weaver::setMaximumNumberOfThreads_p(int cap) |
| 168 | { |
| 169 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 170 | const bool createInitialThread = (d()->inventoryMax == 0 && cap > 0); |
| 171 | d()->inventoryMax = cap; |
| 172 | if (createInitialThread) { |
| 173 | adjustInventory(noOfNewJobs: 1); |
| 174 | } |
| 175 | } |
| 176 | |
| 177 | int Weaver::maximumNumberOfThreads() const |
| 178 | { |
| 179 | QMutexLocker l(d()->mutex); |
| 180 | Q_UNUSED(l); |
| 181 | return state()->maximumNumberOfThreads(); |
| 182 | } |
| 183 | |
| 184 | int Weaver::maximumNumberOfThreads_p() const |
| 185 | { |
| 186 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 187 | return d()->inventoryMax; |
| 188 | } |
| 189 | |
| 190 | int Weaver::currentNumberOfThreads() const |
| 191 | { |
| 192 | QMutexLocker l(d()->mutex); |
| 193 | Q_UNUSED(l); |
| 194 | return state()->currentNumberOfThreads(); |
| 195 | } |
| 196 | |
| 197 | int Weaver::currentNumberOfThreads_p() const |
| 198 | { |
| 199 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 200 | return d()->inventory.count(); |
| 201 | } |
| 202 | |
| 203 | void Weaver::enqueue(const QList<JobPointer> &jobs) |
| 204 | { |
| 205 | QMutexLocker l(d()->mutex); |
| 206 | Q_UNUSED(l); |
| 207 | state()->enqueue(jobs); |
| 208 | } |
| 209 | |
| 210 | void Weaver::enqueue_p(const QList<JobPointer> &jobs) |
| 211 | { |
| 212 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 213 | if (jobs.isEmpty()) { |
| 214 | return; |
| 215 | } |
| 216 | for (const JobPointer &job : jobs) { |
| 217 | if (job) { |
| 218 | Q_ASSERT(job->status() == Job::Status_New); |
| 219 | adjustInventory(noOfNewJobs: jobs.size()); |
| 220 | TWDEBUG(3, "WeaverImpl::enqueue: queueing job %p.\n" , (void *)job.data()); |
| 221 | job->aboutToBeQueued(api: this); |
| 222 | // find position for insertion: |
| 223 | int i = d()->assignments.size(); |
| 224 | if (i > 0) { |
| 225 | while (i > 0 && d()->assignments.at(i: i - 1)->priority() < job->priority()) { |
| 226 | --i; |
| 227 | } |
| 228 | d()->assignments.insert(i, t: job); |
| 229 | } else { |
| 230 | d()->assignments.append(t: job); |
| 231 | } |
| 232 | job->setStatus(Job::Status_Queued); |
| 233 | reschedule(); |
| 234 | } |
| 235 | } |
| 236 | } |
| 237 | |
| 238 | bool Weaver::dequeue(const JobPointer &job) |
| 239 | { |
| 240 | QMutexLocker l(d()->mutex); |
| 241 | Q_UNUSED(l); |
| 242 | return state()->dequeue(job); |
| 243 | } |
| 244 | |
| 245 | bool Weaver::dequeue_p(JobPointer job) |
| 246 | { |
| 247 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 248 | int position = d()->assignments.indexOf(t: job); |
| 249 | if (position != -1) { |
| 250 | job->aboutToBeDequeued(api: this); |
| 251 | int newPosition = d()->assignments.indexOf(t: job); |
| 252 | JobPointer job = d()->assignments.takeAt(i: newPosition); |
| 253 | job->setStatus(Job::Status_New); |
| 254 | Q_ASSERT(!d()->assignments.contains(job)); |
| 255 | TWDEBUG(3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n" , (void *)job.data(), queueLength_p()); |
| 256 | // from the queues point of view, a job is just as finished if it gets dequeued: |
| 257 | d()->jobFinished.wakeAll(); |
| 258 | Q_ASSERT(!d()->assignments.contains(job)); |
| 259 | return true; |
| 260 | } else { |
| 261 | TWDEBUG(3, "WeaverImpl::dequeue: job %p not found in queue.\n" , (void *)job.data()); |
| 262 | return false; |
| 263 | } |
| 264 | } |
| 265 | |
| 266 | void Weaver::dequeue() |
| 267 | { |
| 268 | QMutexLocker l(d()->mutex); |
| 269 | Q_UNUSED(l); |
| 270 | state()->dequeue(); |
| 271 | } |
| 272 | |
| 273 | void Weaver::dequeue_p() |
| 274 | { |
| 275 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 276 | TWDEBUG(3, "WeaverImpl::dequeue: dequeueing all jobs.\n" ); |
| 277 | for (int index = 0; index < d()->assignments.size(); ++index) { |
| 278 | d()->assignments.at(i: index)->aboutToBeDequeued(api: this); |
| 279 | } |
| 280 | d()->assignments.clear(); |
| 281 | ENSURE(d()->assignments.isEmpty()); |
| 282 | } |
| 283 | |
| 284 | void Weaver::finish() |
| 285 | { |
| 286 | QMutexLocker l(d()->mutex); |
| 287 | Q_UNUSED(l); |
| 288 | state()->finish(); |
| 289 | } |
| 290 | |
| 291 | void Weaver::finish_p() |
| 292 | { |
| 293 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 294 | #ifdef QT_NO_DEBUG |
| 295 | const int MaxWaitMilliSeconds = 50; |
| 296 | #else |
| 297 | const int MaxWaitMilliSeconds = 500; |
| 298 | #endif |
| 299 | while (!isIdle_p()) { |
| 300 | Q_ASSERT_X(state()->stateId() == WorkingHard, Q_FUNC_INFO, qPrintable(state()->stateName())); |
| 301 | TWDEBUG(2, "WeaverImpl::finish: not done, waiting.\n" ); |
| 302 | if (d()->jobFinished.wait(lockedMutex: d()->mutex, deadline: QDeadlineTimer(MaxWaitMilliSeconds)) == false) { |
| 303 | TWDEBUG(2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n" , queueLength_p()); |
| 304 | reschedule(); |
| 305 | } |
| 306 | } |
| 307 | TWDEBUG(2, "WeaverImpl::finish: done.\n\n\n" ); |
| 308 | } |
| 309 | |
| 310 | void Weaver::suspend() |
| 311 | { |
| 312 | // FIXME? |
| 313 | // QMutexLocker l(m_mutex); Q_UNUSED(l); |
| 314 | state()->suspend(); |
| 315 | } |
| 316 | |
| 317 | void Weaver::suspend_p() |
| 318 | { |
| 319 | // FIXME ? |
| 320 | } |
| 321 | |
| 322 | void Weaver::resume() |
| 323 | { |
| 324 | // FIXME? |
| 325 | // QMutexLocker l(m_mutex); Q_UNUSED(l); |
| 326 | state()->resume(); |
| 327 | } |
| 328 | |
| 329 | void Weaver::resume_p() |
| 330 | { |
| 331 | // FIXME ? |
| 332 | } |
| 333 | |
| 334 | bool Weaver::isEmpty() const |
| 335 | { |
| 336 | QMutexLocker l(d()->mutex); |
| 337 | Q_UNUSED(l); |
| 338 | return state()->isEmpty(); |
| 339 | } |
| 340 | |
| 341 | bool Weaver::isEmpty_p() const |
| 342 | { |
| 343 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 344 | return d()->assignments.isEmpty(); |
| 345 | } |
| 346 | |
| 347 | bool Weaver::isIdle() const |
| 348 | { |
| 349 | QMutexLocker l(d()->mutex); |
| 350 | Q_UNUSED(l); |
| 351 | return state()->isIdle(); |
| 352 | } |
| 353 | |
| 354 | bool Weaver::isIdle_p() const |
| 355 | { |
| 356 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 357 | return isEmpty_p() && d()->active == 0; |
| 358 | } |
| 359 | |
| 360 | int Weaver::queueLength() const |
| 361 | { |
| 362 | QMutexLocker l(d()->mutex); |
| 363 | Q_UNUSED(l); |
| 364 | return state()->queueLength(); |
| 365 | } |
| 366 | |
| 367 | int Weaver::queueLength_p() const |
| 368 | { |
| 369 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 370 | return d()->assignments.count(); |
| 371 | } |
| 372 | |
| 373 | void Weaver::requestAbort() |
| 374 | { |
| 375 | QMutexLocker l(d()->mutex); |
| 376 | Q_UNUSED(l); |
| 377 | return state()->requestAbort(); |
| 378 | } |
| 379 | |
| 380 | void Weaver::reschedule() |
| 381 | { |
| 382 | d()->jobAvailable.wakeAll(); |
| 383 | } |
| 384 | |
| 385 | void Weaver::requestAbort_p() |
| 386 | { |
| 387 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 388 | for (int i = 0; i < d()->inventory.size(); ++i) { |
| 389 | d()->inventory[i]->requestAbort(); |
| 390 | } |
| 391 | } |
| 392 | |
| 393 | /** @brief Adjust the inventory size. |
| 394 | * |
| 395 | * Requires that the mutex is being held when called. |
| 396 | * |
| 397 | * This method creates threads on demand. Threads in the inventory |
| 398 | * are not created upon construction of the WeaverImpl object, but |
| 399 | * when jobs are queued. This avoids costly delays on the application |
| 400 | * startup time. Threads are created when the inventory size is under |
| 401 | * inventoryMin and new jobs are queued. |
| 402 | */ |
| 403 | void Weaver::adjustInventory(int numberOfNewJobs) |
| 404 | { |
| 405 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 406 | // number of threads that can be created: |
| 407 | const int reserve = d()->inventoryMax - d()->inventory.count(); |
| 408 | |
| 409 | if (reserve > 0) { |
| 410 | for (int i = 0; i < qMin(a: reserve, b: numberOfNewJobs); ++i) { |
| 411 | Thread *th = createThread(); |
| 412 | th->moveToThread(thread: th); // be sane from the start |
| 413 | d()->inventory.append(t: th); |
| 414 | th->start(); |
| 415 | d()->createdThreads.ref(); |
| 416 | TWDEBUG(2, |
| 417 | "WeaverImpl::adjustInventory: thread created, " |
| 418 | "%i threads in inventory.\n" , |
| 419 | currentNumberOfThreads_p()); |
| 420 | } |
| 421 | } |
| 422 | } |
| 423 | |
| 424 | Private::Weaver_Private *Weaver::d() |
| 425 | { |
| 426 | return reinterpret_cast<Private::Weaver_Private *>(QueueSignals::d()); |
| 427 | } |
| 428 | |
| 429 | const Private::Weaver_Private *Weaver::d() const |
| 430 | { |
| 431 | return reinterpret_cast<const Private::Weaver_Private *>(QueueSignals::d()); |
| 432 | } |
| 433 | |
| 434 | /** @brief Factory method to create the threads. |
| 435 | * |
| 436 | * Overload in adapted Weaver implementations. |
| 437 | */ |
| 438 | Thread *Weaver::createThread() |
| 439 | { |
| 440 | return new Thread(this); |
| 441 | } |
| 442 | |
| 443 | /** @brief Increment the count of active threads. */ |
| 444 | void Weaver::incActiveThreadCount() |
| 445 | { |
| 446 | adjustActiveThreadCount(diff: 1); |
| 447 | } |
| 448 | |
| 449 | /** brief Decrement the count of active threads. */ |
| 450 | void Weaver::decActiveThreadCount() |
| 451 | { |
| 452 | adjustActiveThreadCount(diff: -1); |
| 453 | // the done job could have freed another set of jobs, and we do not know how |
| 454 | // many - therefore we need to wake all threads: |
| 455 | d()->jobFinished.wakeAll(); |
| 456 | } |
| 457 | |
| 458 | /** @brief Adjust active thread count. |
| 459 | * |
| 460 | * This is a helper function for incActiveThreadCount and decActiveThreadCount. |
| 461 | */ |
| 462 | void Weaver::adjustActiveThreadCount(int diff) |
| 463 | { |
| 464 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 465 | d()->active += diff; |
| 466 | TWDEBUG(4, |
| 467 | "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs" |
| 468 | " in queue).\n" , |
| 469 | d()->active, |
| 470 | queueLength_p()); |
| 471 | |
| 472 | if (d()->assignments.isEmpty() && d()->active == 0) { |
| 473 | P_ASSERT(diff < 0); // cannot reach zero otherwise |
| 474 | Q_EMIT(finished()); |
| 475 | } |
| 476 | } |
| 477 | |
| 478 | /** @brief Returns the number of active threads. |
| 479 | * |
| 480 | * Threads are active if they process a job. Requires that the mutex is being held when called. |
| 481 | */ |
| 482 | int Weaver::activeThreadCount() |
| 483 | { |
| 484 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 485 | return d()->active; |
| 486 | } |
| 487 | |
| 488 | /** @brief Called from a new thread when entering the run method. */ |
| 489 | void Weaver::threadEnteredRun(Thread *thread) |
| 490 | { |
| 491 | d()->semaphore.release(n: 1); |
| 492 | Q_EMIT threadStarted(thread); |
| 493 | } |
| 494 | |
| 495 | /** @brief Take the first available job out of the queue and return it. |
| 496 | * |
| 497 | * The job will be removed from the queue (therefore, take). Only jobs that have no unresolved dependencies |
| 498 | * are considered available. If only jobs that depend on other unfinished jobs are in the queue, this method |
| 499 | * blocks on m_jobAvailable. |
| 500 | * |
| 501 | * This method will enter suspended state if the active thread count is now zero and |
| 502 | * suspendIfAllThreadsInactive is true. |
| 503 | * If justReturning is true, do not assign a new job, just process the completed previous one. |
| 504 | */ |
| 505 | JobPointer Weaver::takeFirstAvailableJobOrSuspendOrWait(Thread *th, bool threadWasBusy, bool suspendIfInactive, bool justReturning) |
| 506 | { |
| 507 | QMutexLocker l(d()->mutex); |
| 508 | Q_UNUSED(l); |
| 509 | Q_ASSERT(threadWasBusy == false || (threadWasBusy == true && d()->active > 0)); |
| 510 | TWDEBUG(3, "WeaverImpl::takeFirstAvailableJobOrWait: trying to assign new job to thread %i (%s state).\n" , th->id(), qPrintable(state()->stateName())); |
| 511 | TWDEBUG(5, |
| 512 | "WeaverImpl::takeFirstAvailableJobOrWait: %i active threads, was busy: %s, suspend: %s, assign new job: %s.\n" , |
| 513 | activeThreadCount(), |
| 514 | threadWasBusy ? "yes" : "no" , |
| 515 | suspendIfInactive ? "yes" : "no" , |
| 516 | !justReturning ? "yes" : "no" ); |
| 517 | d()->deleteExpiredThreads(); |
| 518 | adjustInventory(numberOfNewJobs: 1); |
| 519 | |
| 520 | if (threadWasBusy) { |
| 521 | // cleanup and send events: |
| 522 | decActiveThreadCount(); |
| 523 | } |
| 524 | Q_ASSERT(d()->active >= 0); |
| 525 | |
| 526 | if (suspendIfInactive && d()->active == 0 && state()->stateId() == Suspending) { |
| 527 | setState_p(Suspended); |
| 528 | return JobPointer(); |
| 529 | } |
| 530 | |
| 531 | if (state()->stateId() != WorkingHard || justReturning) { |
| 532 | return JobPointer(); |
| 533 | } |
| 534 | |
| 535 | if (state()->stateId() == WorkingHard && d()->inventory.size() > d()->inventoryMax) { |
| 536 | const int count = d()->inventory.removeAll(t: th); |
| 537 | Q_ASSERT(count == 1); |
| 538 | d()->expiredThreads.append(t: th); |
| 539 | throw AbortThread(QStringLiteral("Inventory size exceeded" )); |
| 540 | } |
| 541 | |
| 542 | JobPointer next; |
| 543 | for (int index = 0; index < d()->assignments.size(); ++index) { |
| 544 | const JobPointer &candidate = d()->assignments.at(i: index); |
| 545 | if (d()->canBeExecuted(candidate)) { |
| 546 | next = candidate; |
| 547 | d()->assignments.removeAt(i: index); |
| 548 | break; |
| 549 | } |
| 550 | } |
| 551 | if (next) { |
| 552 | incActiveThreadCount(); |
| 553 | TWDEBUG(3, |
| 554 | "WeaverImpl::takeFirstAvailableJobOrWait: job %p assigned to thread %i (%s state).\n" , |
| 555 | next.data(), |
| 556 | th->id(), |
| 557 | qPrintable(state()->stateName())); |
| 558 | return next; |
| 559 | } |
| 560 | |
| 561 | blockThreadUntilJobsAreBeingAssigned_locked(th); |
| 562 | return JobPointer(); |
| 563 | } |
| 564 | |
| 565 | /** @brief Assign a job to the calling thread. |
| 566 | * |
| 567 | * This is supposed to be called from the Thread objects in the inventory. Do not call this method from |
| 568 | * your code. |
| 569 | * Returns 0 if the weaver is shutting down, telling the calling thread to finish and exit. If no jobs are |
| 570 | * available and shut down is not in progress, the calling thread is suspended until either condition is met. |
| 571 | * @param wasBusy True if the thread is returning from processing a job |
| 572 | */ |
| 573 | JobPointer Weaver::applyForWork(Thread *th, bool wasBusy) |
| 574 | { |
| 575 | return state()->applyForWork(th, wasBusy); |
| 576 | } |
| 577 | |
| 578 | /** @brief Wait for a job to become available. */ |
| 579 | void Weaver::waitForAvailableJob(Thread *th) |
| 580 | { |
| 581 | state()->waitForAvailableJob(th); |
| 582 | } |
| 583 | |
| 584 | /** @brief Blocks the calling thread until jobs can be assigned. */ |
| 585 | void Weaver::blockThreadUntilJobsAreBeingAssigned(Thread *th) |
| 586 | { |
| 587 | QMutexLocker l(d()->mutex); |
| 588 | Q_UNUSED(l); |
| 589 | blockThreadUntilJobsAreBeingAssigned_locked(th); |
| 590 | } |
| 591 | |
| 592 | /** @brief Blocks the calling thread until jobs can be assigned. |
| 593 | * |
| 594 | * The mutex must be held when calling this method. |
| 595 | */ |
| 596 | void Weaver::blockThreadUntilJobsAreBeingAssigned_locked(Thread *th) |
| 597 | { |
| 598 | Q_ASSERT(!d()->mutex->tryLock()); // mutex has to be held when this method is called |
| 599 | TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i blocked (%s state).\n" , th->id(), qPrintable(state()->stateName())); |
| 600 | Q_EMIT threadSuspended(th); |
| 601 | d()->jobAvailable.wait(lockedMutex: d()->mutex); |
| 602 | TWDEBUG(4, "WeaverImpl::blockThreadUntilJobsAreBeingAssigned_locked: thread %i resumed (%s state).\n" , th->id(), qPrintable(state()->stateName())); |
| 603 | } |
| 604 | |
| 605 | #include "moc_weaver.cpp" |
| 606 | |