1 | /* -*- C++ -*- |
2 | The Queue class in ThreadWeaver. |
3 | |
4 | SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org> |
5 | |
6 | SPDX-License-Identifier: LGPL-2.0-or-later |
7 | */ |
8 | |
9 | #include <QCoreApplication> |
10 | #include <QList> |
11 | #include <QMutex> |
12 | |
13 | #include "queue.h" |
14 | #include "weaver.h" |
15 | |
16 | using namespace ThreadWeaver; |
17 | |
18 | namespace |
19 | { |
20 | static Queue::GlobalQueueFactory *globalQueueFactory; |
21 | } |
22 | |
23 | class Q_DECL_HIDDEN Queue::Private |
24 | { |
25 | public: |
26 | Private(Queue *q, QueueSignals *queue) |
27 | : implementation(queue) |
28 | { |
29 | Q_ASSERT_X(qApp != nullptr, Q_FUNC_INFO, "Cannot create global ThreadWeaver instance before QApplication!" ); |
30 | Q_ASSERT(queue); |
31 | queue->setParent(q); |
32 | q->connect(asender: implementation, SIGNAL(finished()), SIGNAL(finished())); |
33 | q->connect(asender: implementation, SIGNAL(suspended()), SIGNAL(suspended())); |
34 | } |
35 | |
36 | QueueSignals *implementation; |
37 | void init(QueueSignals *implementation); |
38 | }; |
39 | |
40 | /** @brief Construct a Queue. */ |
41 | Queue::Queue(QObject *parent) |
42 | : QueueSignals(parent) |
43 | , d(new Private(this, new Weaver)) |
44 | { |
45 | } |
46 | |
47 | /** @brief Construct a Queue, specifying the QueueSignals implementation to use. |
48 | * |
49 | * The QueueSignals instance is usually a Weaver object, which may be customized for specific |
50 | * application needs. The Weaver instance will take ownership of the implementation object and |
51 | * deletes it when destructed. |
52 | * @see Weaver |
53 | * @see GlobalQueueFactory |
54 | */ |
55 | Queue::Queue(QueueSignals *implementation, QObject *parent) |
56 | : QueueSignals(parent) |
57 | , d(new Private(this, implementation)) |
58 | { |
59 | } |
60 | |
61 | /** @brief Destruct the Queue object. |
62 | * |
63 | * If the queue is not already in Destructed state, the destructor will call shutDown() to make sure |
64 | * enqueued jobs are completed and the queue is idle. |
65 | * The queue implementation will be destroyed. |
66 | * @see shutDown() |
67 | * @see ThreadWeaver::Destructed |
68 | */ |
69 | Queue::~Queue() |
70 | { |
71 | if (d->implementation->state()->stateId() != Destructed) { |
72 | d->implementation->shutDown(); |
73 | } |
74 | delete d->implementation; |
75 | delete d; |
76 | } |
77 | |
78 | /** @brief Create a QueueStream to enqueue jobs into this queue. */ |
79 | QueueStream Queue::stream() |
80 | { |
81 | return QueueStream(this); |
82 | } |
83 | |
84 | void Queue::shutDown() |
85 | { |
86 | d->implementation->shutDown(); |
87 | } |
88 | |
89 | /** @brief Set the factory object that will create the global queue. |
90 | * |
91 | * Once set, the global queue factory will be deleted when the global ThreadWeaver pool is deleted. |
92 | * The factory object needs to be set before the global ThreadWeaver pool is instantiated. Call this |
93 | * method before Q(Core)Application is constructed. */ |
94 | void Queue::setGlobalQueueFactory(Queue::GlobalQueueFactory *factory) |
95 | { |
96 | if (globalQueueFactory) { |
97 | delete globalQueueFactory; |
98 | } |
99 | globalQueueFactory = factory; |
100 | } |
101 | |
102 | const State *Queue::state() const |
103 | { |
104 | return d->implementation->state(); |
105 | } |
106 | |
107 | namespace |
108 | { |
109 | class StaticThreadWeaverInstanceGuard : public QObject |
110 | { |
111 | Q_OBJECT |
112 | public: |
113 | explicit StaticThreadWeaverInstanceGuard(QAtomicPointer<Queue> &instance, QCoreApplication *app) |
114 | : QObject(app) |
115 | , instance_(instance) |
116 | { |
117 | Q_ASSERT_X(app != nullptr, Q_FUNC_INFO, "Calling ThreadWeaver::Weaver::instance() requires a QCoreApplication!" ); |
118 | QObject *impl = instance.loadRelaxed()->findChild<QueueSignals *>(); |
119 | Q_ASSERT(impl); |
120 | impl->setObjectName(QStringLiteral("GlobalQueue" )); |
121 | qAddPostRoutine(shutDownGlobalQueue); |
122 | } |
123 | |
124 | ~StaticThreadWeaverInstanceGuard() override |
125 | { |
126 | instance_.fetchAndStoreOrdered(newValue: nullptr); |
127 | delete globalQueueFactory; |
128 | globalQueueFactory = nullptr; |
129 | } |
130 | |
131 | private: |
132 | static void shutDownGlobalQueue() |
133 | { |
134 | Queue::instance()->shutDown(); |
135 | Q_ASSERT(Queue::instance()->state()->stateId() == Destructed); |
136 | } |
137 | |
138 | QAtomicPointer<Queue> &instance_; |
139 | }; |
140 | |
141 | } |
142 | |
143 | /** @brief Access the application-global Queue. |
144 | * |
145 | * In some cases, the global queue is sufficient for the applications purpose. The global queue will only be |
146 | * created if this method is actually called in the lifetime of the application. |
147 | * |
148 | * The Q(Core)Application object must exist when instance() is called for the first time. |
149 | * The global queue will be destroyed when Q(Core)Application is destructed. After that, the instance() method |
150 | * returns zero. |
151 | */ |
152 | Queue *Queue::instance() |
153 | { |
154 | static QAtomicPointer<Queue> s_instance(globalQueueFactory ? globalQueueFactory->create(qApp) : new Queue(qApp)); |
155 | // Order is of importance here: |
156 | // When s_instanceGuard is destructed (first, before s_instance), it sets the value of s_instance to zero. Next, qApp will delete |
157 | // the object s_instance pointed to. |
158 | static StaticThreadWeaverInstanceGuard *s_instanceGuard = new StaticThreadWeaverInstanceGuard(s_instance, qApp); |
159 | Q_UNUSED(s_instanceGuard); |
160 | Q_ASSERT_X(s_instance.loadRelaxed() == nullptr // |
161 | || s_instance.loadRelaxed()->thread() == QCoreApplication::instance()->thread(), |
162 | Q_FUNC_INFO, |
163 | "The global ThreadWeaver queue needs to be instantiated (accessed first) from the main thread!" ); |
164 | return s_instance.loadAcquire(); |
165 | } |
166 | |
167 | void Queue::enqueue(const QList<JobPointer> &jobs) |
168 | { |
169 | d->implementation->enqueue(jobs); |
170 | } |
171 | |
172 | void Queue::enqueue(const JobPointer &job) |
173 | { |
174 | enqueue(jobs: QList<JobPointer>() << job); |
175 | } |
176 | |
177 | bool Queue::dequeue(const JobPointer &job) |
178 | { |
179 | return d->implementation->dequeue(job); |
180 | } |
181 | |
182 | void Queue::dequeue() |
183 | { |
184 | return d->implementation->dequeue(); |
185 | } |
186 | |
187 | void Queue::finish() |
188 | { |
189 | return d->implementation->finish(); |
190 | } |
191 | |
192 | void Queue::suspend() |
193 | { |
194 | return d->implementation->suspend(); |
195 | } |
196 | |
197 | void Queue::resume() |
198 | { |
199 | return d->implementation->resume(); |
200 | } |
201 | |
202 | bool Queue::isEmpty() const |
203 | { |
204 | return d->implementation->isEmpty(); |
205 | } |
206 | |
207 | bool Queue::isIdle() const |
208 | { |
209 | return d->implementation->isIdle(); |
210 | } |
211 | |
212 | int Queue::queueLength() const |
213 | { |
214 | return d->implementation->queueLength(); |
215 | } |
216 | |
217 | void Queue::setMaximumNumberOfThreads(int cap) |
218 | { |
219 | d->implementation->setMaximumNumberOfThreads(cap); |
220 | } |
221 | |
222 | int Queue::currentNumberOfThreads() const |
223 | { |
224 | return d->implementation->currentNumberOfThreads(); |
225 | } |
226 | |
227 | int Queue::maximumNumberOfThreads() const |
228 | { |
229 | return d->implementation->maximumNumberOfThreads(); |
230 | } |
231 | |
232 | void Queue::requestAbort() |
233 | { |
234 | d->implementation->requestAbort(); |
235 | } |
236 | |
237 | void Queue::reschedule() |
238 | { |
239 | d->implementation->reschedule(); |
240 | } |
241 | |
242 | #include "moc_queue.cpp" |
243 | #include "queue.moc" |
244 | |