/* -*- C++ -*-
    The Queue class in ThreadWeaver.

    SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org>

    SPDX-License-Identifier: LGPL-2.0-or-later
*/
| 8 | |
| 9 | #include <QCoreApplication> |
| 10 | #include <QList> |
| 11 | #include <QMutex> |
| 12 | |
| 13 | #include "queue.h" |
| 14 | #include "weaver.h" |
| 15 | |
| 16 | using namespace ThreadWeaver; |
| 17 | |
| 18 | namespace |
| 19 | { |
| 20 | static Queue::GlobalQueueFactory *globalQueueFactory; |
| 21 | } |
| 22 | |
| 23 | class Q_DECL_HIDDEN Queue::Private |
| 24 | { |
| 25 | public: |
| 26 | Private(Queue *q, QueueSignals *queue) |
| 27 | : implementation(queue) |
| 28 | { |
| 29 | Q_ASSERT_X(qApp != nullptr, Q_FUNC_INFO, "Cannot create global ThreadWeaver instance before QApplication!" ); |
| 30 | Q_ASSERT(queue); |
| 31 | queue->setParent(q); |
| 32 | q->connect(asender: implementation, SIGNAL(finished()), SIGNAL(finished())); |
| 33 | q->connect(asender: implementation, SIGNAL(suspended()), SIGNAL(suspended())); |
| 34 | } |
| 35 | |
| 36 | QueueSignals *implementation; |
| 37 | void init(QueueSignals *implementation); |
| 38 | }; |
| 39 | |
| 40 | /** @brief Construct a Queue. */ |
| 41 | Queue::Queue(QObject *parent) |
| 42 | : QueueSignals(parent) |
| 43 | , d(new Private(this, new Weaver)) |
| 44 | { |
| 45 | } |
| 46 | |
| 47 | /** @brief Construct a Queue, specifying the QueueSignals implementation to use. |
| 48 | * |
| 49 | * The QueueSignals instance is usually a Weaver object, which may be customized for specific |
| 50 | * application needs. The Weaver instance will take ownership of the implementation object and |
| 51 | * deletes it when destructed. |
| 52 | * @see Weaver |
| 53 | * @see GlobalQueueFactory |
| 54 | */ |
| 55 | Queue::Queue(QueueSignals *implementation, QObject *parent) |
| 56 | : QueueSignals(parent) |
| 57 | , d(new Private(this, implementation)) |
| 58 | { |
| 59 | } |
| 60 | |
| 61 | /** @brief Destruct the Queue object. |
| 62 | * |
| 63 | * If the queue is not already in Destructed state, the destructor will call shutDown() to make sure |
| 64 | * enqueued jobs are completed and the queue is idle. |
| 65 | * The queue implementation will be destroyed. |
| 66 | * @see shutDown() |
| 67 | * @see ThreadWeaver::Destructed |
| 68 | */ |
| 69 | Queue::~Queue() |
| 70 | { |
| 71 | if (d->implementation->state()->stateId() != Destructed) { |
| 72 | d->implementation->shutDown(); |
| 73 | } |
| 74 | delete d->implementation; |
| 75 | delete d; |
| 76 | } |
| 77 | |
| 78 | /** @brief Create a QueueStream to enqueue jobs into this queue. */ |
| 79 | QueueStream Queue::stream() |
| 80 | { |
| 81 | return QueueStream(this); |
| 82 | } |
| 83 | |
| 84 | void Queue::shutDown() |
| 85 | { |
| 86 | d->implementation->shutDown(); |
| 87 | } |
| 88 | |
| 89 | /** @brief Set the factory object that will create the global queue. |
| 90 | * |
| 91 | * Once set, the global queue factory will be deleted when the global ThreadWeaver pool is deleted. |
| 92 | * The factory object needs to be set before the global ThreadWeaver pool is instantiated. Call this |
| 93 | * method before Q(Core)Application is constructed. */ |
| 94 | void Queue::setGlobalQueueFactory(Queue::GlobalQueueFactory *factory) |
| 95 | { |
| 96 | if (globalQueueFactory) { |
| 97 | delete globalQueueFactory; |
| 98 | } |
| 99 | globalQueueFactory = factory; |
| 100 | } |
| 101 | |
| 102 | const State *Queue::state() const |
| 103 | { |
| 104 | return d->implementation->state(); |
| 105 | } |
| 106 | |
| 107 | namespace |
| 108 | { |
| 109 | class StaticThreadWeaverInstanceGuard : public QObject |
| 110 | { |
| 111 | Q_OBJECT |
| 112 | public: |
| 113 | explicit StaticThreadWeaverInstanceGuard(QAtomicPointer<Queue> &instance, QCoreApplication *app) |
| 114 | : QObject(app) |
| 115 | , instance_(instance) |
| 116 | { |
| 117 | Q_ASSERT_X(app != nullptr, Q_FUNC_INFO, "Calling ThreadWeaver::Weaver::instance() requires a QCoreApplication!" ); |
| 118 | QObject *impl = instance.loadRelaxed()->findChild<QueueSignals *>(); |
| 119 | Q_ASSERT(impl); |
| 120 | impl->setObjectName(QStringLiteral("GlobalQueue" )); |
| 121 | qAddPostRoutine(shutDownGlobalQueue); |
| 122 | } |
| 123 | |
| 124 | ~StaticThreadWeaverInstanceGuard() override |
| 125 | { |
| 126 | instance_.fetchAndStoreOrdered(newValue: nullptr); |
| 127 | delete globalQueueFactory; |
| 128 | globalQueueFactory = nullptr; |
| 129 | } |
| 130 | |
| 131 | private: |
| 132 | static void shutDownGlobalQueue() |
| 133 | { |
| 134 | Queue::instance()->shutDown(); |
| 135 | Q_ASSERT(Queue::instance()->state()->stateId() == Destructed); |
| 136 | } |
| 137 | |
| 138 | QAtomicPointer<Queue> &instance_; |
| 139 | }; |
| 140 | |
| 141 | } |
| 142 | |
| 143 | /** @brief Access the application-global Queue. |
| 144 | * |
| 145 | * In some cases, the global queue is sufficient for the applications purpose. The global queue will only be |
| 146 | * created if this method is actually called in the lifetime of the application. |
| 147 | * |
| 148 | * The Q(Core)Application object must exist when instance() is called for the first time. |
| 149 | * The global queue will be destroyed when Q(Core)Application is destructed. After that, the instance() method |
| 150 | * returns zero. |
| 151 | */ |
| 152 | Queue *Queue::instance() |
| 153 | { |
| 154 | static QAtomicPointer<Queue> s_instance(globalQueueFactory ? globalQueueFactory->create(qApp) : new Queue(qApp)); |
| 155 | // Order is of importance here: |
| 156 | // When s_instanceGuard is destructed (first, before s_instance), it sets the value of s_instance to zero. Next, qApp will delete |
| 157 | // the object s_instance pointed to. |
| 158 | static StaticThreadWeaverInstanceGuard *s_instanceGuard = new StaticThreadWeaverInstanceGuard(s_instance, qApp); |
| 159 | Q_UNUSED(s_instanceGuard); |
| 160 | Q_ASSERT_X(s_instance.loadRelaxed() == nullptr // |
| 161 | || s_instance.loadRelaxed()->thread() == QCoreApplication::instance()->thread(), |
| 162 | Q_FUNC_INFO, |
| 163 | "The global ThreadWeaver queue needs to be instantiated (accessed first) from the main thread!" ); |
| 164 | return s_instance.loadAcquire(); |
| 165 | } |
| 166 | |
| 167 | void Queue::enqueue(const QList<JobPointer> &jobs) |
| 168 | { |
| 169 | d->implementation->enqueue(jobs); |
| 170 | } |
| 171 | |
| 172 | void Queue::enqueue(const JobPointer &job) |
| 173 | { |
| 174 | enqueue(jobs: QList<JobPointer>() << job); |
| 175 | } |
| 176 | |
| 177 | bool Queue::dequeue(const JobPointer &job) |
| 178 | { |
| 179 | return d->implementation->dequeue(job); |
| 180 | } |
| 181 | |
| 182 | void Queue::dequeue() |
| 183 | { |
| 184 | return d->implementation->dequeue(); |
| 185 | } |
| 186 | |
| 187 | void Queue::finish() |
| 188 | { |
| 189 | return d->implementation->finish(); |
| 190 | } |
| 191 | |
| 192 | void Queue::suspend() |
| 193 | { |
| 194 | return d->implementation->suspend(); |
| 195 | } |
| 196 | |
| 197 | void Queue::resume() |
| 198 | { |
| 199 | return d->implementation->resume(); |
| 200 | } |
| 201 | |
| 202 | bool Queue::isEmpty() const |
| 203 | { |
| 204 | return d->implementation->isEmpty(); |
| 205 | } |
| 206 | |
| 207 | bool Queue::isIdle() const |
| 208 | { |
| 209 | return d->implementation->isIdle(); |
| 210 | } |
| 211 | |
| 212 | int Queue::queueLength() const |
| 213 | { |
| 214 | return d->implementation->queueLength(); |
| 215 | } |
| 216 | |
| 217 | void Queue::setMaximumNumberOfThreads(int cap) |
| 218 | { |
| 219 | d->implementation->setMaximumNumberOfThreads(cap); |
| 220 | } |
| 221 | |
| 222 | int Queue::currentNumberOfThreads() const |
| 223 | { |
| 224 | return d->implementation->currentNumberOfThreads(); |
| 225 | } |
| 226 | |
| 227 | int Queue::maximumNumberOfThreads() const |
| 228 | { |
| 229 | return d->implementation->maximumNumberOfThreads(); |
| 230 | } |
| 231 | |
| 232 | void Queue::requestAbort() |
| 233 | { |
| 234 | d->implementation->requestAbort(); |
| 235 | } |
| 236 | |
| 237 | void Queue::reschedule() |
| 238 | { |
| 239 | d->implementation->reschedule(); |
| 240 | } |
| 241 | |
| 242 | #include "moc_queue.cpp" |
| 243 | #include "queue.moc" |
| 244 | |