1 | /* |
2 | This file is part of the KDE libraries |
3 | SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org> |
4 | SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org> |
5 | SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com> |
6 | |
7 | SPDX-License-Identifier: LGPL-2.0-only |
8 | */ |
9 | |
10 | #include "scheduler.h" |
11 | #include "scheduler_p.h" |
12 | |
13 | #include "job_p.h" |
14 | #include "worker_p.h" |
15 | #include "workerconfig.h" |
16 | |
17 | #include <kprotocolinfo.h> |
18 | #include <kprotocolmanager.h> |
19 | |
20 | #ifdef WITH_QTDBUS |
21 | #include <QDBusConnection> |
22 | #include <QDBusMessage> |
23 | #endif |
24 | #include <QHash> |
25 | #include <QThread> |
26 | #include <QThreadStorage> |
27 | |
28 | // Workers may be idle for a certain time (3 minutes) before they are killed. |
29 | static const int s_idleWorkerLifetime = 3 * 60; |
30 | |
31 | using namespace KIO; |
32 | |
33 | static inline Worker *jobSWorker(SimpleJob *job) |
34 | { |
35 | return SimpleJobPrivate::get(job)->m_worker; |
36 | } |
37 | |
38 | static inline void startJob(SimpleJob *job, Worker *worker) |
39 | { |
40 | SimpleJobPrivate::get(job)->start(worker); |
41 | } |
42 | |
43 | class KIO::SchedulerPrivate |
44 | { |
45 | public: |
46 | SchedulerPrivate() |
47 | : q(new Scheduler()) |
48 | { |
49 | } |
50 | |
51 | ~SchedulerPrivate() |
52 | { |
53 | removeWorkerOnHold(); |
54 | delete q; |
55 | q = nullptr; |
56 | qDeleteAll(c: m_protocols); // ~ProtoQueue will kill and delete all workers |
57 | } |
58 | |
59 | SchedulerPrivate(const SchedulerPrivate &) = delete; |
60 | SchedulerPrivate &operator=(const SchedulerPrivate &) = delete; |
61 | |
62 | Scheduler *q; |
63 | |
64 | Worker *m_workerOnHold = nullptr; |
65 | QUrl m_urlOnHold; |
66 | bool m_ignoreConfigReparse = false; |
67 | |
68 | void doJob(SimpleJob *job); |
69 | void cancelJob(SimpleJob *job); |
70 | void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker); |
71 | void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url); |
72 | void removeWorkerOnHold(); |
73 | Worker *heldWorkerForJob(KIO::SimpleJob *job); |
74 | bool isWorkerOnHoldFor(const QUrl &url); |
75 | void updateInternalMetaData(SimpleJob *job); |
76 | |
77 | MetaData metaDataFor(const QString &protocol, const QUrl &url); |
78 | void setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config = nullptr); |
79 | |
80 | void slotWorkerDied(KIO::Worker *worker); |
81 | |
82 | #ifdef WITH_QTDBUS |
83 | void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &); |
84 | #endif |
85 | |
86 | ProtoQueue *protoQ(const QString &protocol, const QString &host); |
87 | |
88 | private: |
89 | QHash<QString, ProtoQueue *> m_protocols; |
90 | }; |
91 | |
92 | static QThreadStorage<SchedulerPrivate *> s_storage; |
93 | static SchedulerPrivate *schedulerPrivate() |
94 | { |
95 | if (!s_storage.hasLocalData()) { |
96 | s_storage.setLocalData(new SchedulerPrivate); |
97 | } |
98 | return s_storage.localData(); |
99 | } |
100 | |
101 | Scheduler *Scheduler::self() |
102 | { |
103 | return schedulerPrivate()->q; |
104 | } |
105 | |
106 | SchedulerPrivate *Scheduler::d_func() |
107 | { |
108 | return schedulerPrivate(); |
109 | } |
110 | |
111 | // static |
112 | Scheduler *scheduler() |
113 | { |
114 | return schedulerPrivate()->q; |
115 | } |
116 | |
117 | //////////////////////////// |
118 | |
119 | WorkerManager::WorkerManager() |
120 | { |
121 | m_grimTimer.setSingleShot(true); |
122 | connect(sender: &m_grimTimer, signal: &QTimer::timeout, context: this, slot: &WorkerManager::grimReaper); |
123 | } |
124 | |
125 | WorkerManager::~WorkerManager() |
126 | { |
127 | grimReaper(); |
128 | } |
129 | |
130 | void WorkerManager::returnWorker(Worker *worker) |
131 | { |
132 | Q_ASSERT(worker); |
133 | worker->setIdle(); |
134 | m_idleWorkers.insert(key: worker->host(), value: worker); |
135 | scheduleGrimReaper(); |
136 | } |
137 | |
138 | Worker *WorkerManager::takeWorkerForJob(SimpleJob *job) |
139 | { |
140 | Worker *worker = schedulerPrivate()->heldWorkerForJob(job); |
141 | if (worker) { |
142 | return worker; |
143 | } |
144 | |
145 | QUrl url = SimpleJobPrivate::get(job)->m_url; |
146 | // TODO take port, username and password into account |
147 | QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(key: url.host()); |
148 | if (it == m_idleWorkers.end()) { |
149 | it = m_idleWorkers.begin(); |
150 | } |
151 | if (it == m_idleWorkers.end()) { |
152 | return nullptr; |
153 | } |
154 | worker = it.value(); |
155 | m_idleWorkers.erase(it); |
156 | return worker; |
157 | } |
158 | |
159 | bool WorkerManager::removeWorker(Worker *worker) |
160 | { |
161 | // ### performance not so great |
162 | QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin(); |
163 | for (; it != m_idleWorkers.end(); ++it) { |
164 | if (it.value() == worker) { |
165 | m_idleWorkers.erase(it); |
166 | return true; |
167 | } |
168 | } |
169 | return false; |
170 | } |
171 | |
172 | void WorkerManager::clear() |
173 | { |
174 | m_idleWorkers.clear(); |
175 | } |
176 | |
177 | QList<Worker *> WorkerManager::allWorkers() const |
178 | { |
179 | return m_idleWorkers.values(); |
180 | } |
181 | |
182 | void WorkerManager::scheduleGrimReaper() |
183 | { |
184 | if (!m_grimTimer.isActive()) { |
185 | m_grimTimer.start(msec: (s_idleWorkerLifetime / 2) * 1000); |
186 | } |
187 | } |
188 | |
189 | // private slot |
190 | void WorkerManager::grimReaper() |
191 | { |
192 | QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin(); |
193 | while (it != m_idleWorkers.end()) { |
194 | Worker *worker = it.value(); |
195 | if (worker->idleTime() >= s_idleWorkerLifetime) { |
196 | it = m_idleWorkers.erase(it); |
197 | if (worker->job()) { |
198 | // qDebug() << "Idle worker" << worker << "still has job" << worker->job(); |
199 | } |
200 | // avoid invoking slotWorkerDied() because its cleanup services are not needed |
201 | worker->kill(); |
202 | } else { |
203 | ++it; |
204 | } |
205 | } |
206 | if (!m_idleWorkers.isEmpty()) { |
207 | scheduleGrimReaper(); |
208 | } |
209 | } |
210 | |
211 | int HostQueue::lowestSerial() const |
212 | { |
213 | QMap<int, SimpleJob *>::ConstIterator first = m_queuedJobs.constBegin(); |
214 | if (first != m_queuedJobs.constEnd()) { |
215 | return first.key(); |
216 | } |
217 | return SerialPicker::maxSerial; |
218 | } |
219 | |
220 | void HostQueue::queueJob(SimpleJob *job) |
221 | { |
222 | const int serial = SimpleJobPrivate::get(job)->m_schedSerial; |
223 | Q_ASSERT(serial != 0); |
224 | Q_ASSERT(!m_queuedJobs.contains(serial)); |
225 | Q_ASSERT(!m_runningJobs.contains(job)); |
226 | m_queuedJobs.insert(key: serial, value: job); |
227 | } |
228 | |
229 | SimpleJob *HostQueue::takeFirstInQueue() |
230 | { |
231 | Q_ASSERT(!m_queuedJobs.isEmpty()); |
232 | QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin(); |
233 | SimpleJob *job = first.value(); |
234 | m_queuedJobs.erase(it: first); |
235 | m_runningJobs.insert(value: job); |
236 | return job; |
237 | } |
238 | |
239 | bool HostQueue::removeJob(SimpleJob *job) |
240 | { |
241 | const int serial = SimpleJobPrivate::get(job)->m_schedSerial; |
242 | if (m_runningJobs.remove(value: job)) { |
243 | Q_ASSERT(!m_queuedJobs.contains(serial)); |
244 | return true; |
245 | } |
246 | if (m_queuedJobs.remove(key: serial)) { |
247 | return true; |
248 | } |
249 | return false; |
250 | } |
251 | |
252 | QList<Worker *> HostQueue::allWorkers() const |
253 | { |
254 | QList<Worker *> ret; |
255 | ret.reserve(asize: m_runningJobs.size()); |
256 | for (SimpleJob *job : m_runningJobs) { |
257 | Worker *worker = jobSWorker(job); |
258 | Q_ASSERT(worker); |
259 | ret.append(t: worker); |
260 | } |
261 | return ret; |
262 | } |
263 | |
264 | static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial) |
265 | { |
266 | Q_UNUSED(queuesBySerial); |
267 | #ifdef SCHEDULER_DEBUG |
268 | // a host queue may *never* be in queuesBySerial twice. |
269 | QSet<HostQueue *> seen; |
270 | auto it = queuesBySerial->cbegin(); |
271 | for (; it != queuesBySerial->cend(); ++it) { |
272 | Q_ASSERT(!seen.contains(it.value())); |
273 | seen.insert(it.value()); |
274 | } |
275 | #endif |
276 | } |
277 | |
278 | static void verifyRunningJobsCount(const std::unordered_map<QString, HostQueue> *const queues, int runningJobsCount) |
279 | { |
280 | Q_UNUSED(queues); |
281 | Q_UNUSED(runningJobsCount); |
282 | #ifdef SCHEDULER_DEBUG |
283 | int realRunningJobsCount = 0; |
284 | auto it = queues->cbegin(); |
285 | for (; it != queues->cend(); ++it) { |
286 | realRunningJobsCount += it->second.runningJobsCount(); |
287 | } |
288 | Q_ASSERT(realRunningJobsCount == runningJobsCount); |
289 | |
290 | // ...and of course we may never run the same job twice! |
291 | QSet<SimpleJob *> seenJobs; |
292 | auto it2 = queues->cbegin(); |
293 | for (; it2 != queues->cend(); ++it2) { |
294 | for (SimpleJob *job : it2->second.runningJobs()) { |
295 | Q_ASSERT(!seenJobs.contains(job)); |
296 | seenJobs.insert(job); |
297 | } |
298 | } |
299 | #endif |
300 | } |
301 | |
302 | ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost) |
303 | : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers) |
304 | , m_maxConnectionsTotal(qMax(a: maxWorkers, b: maxWorkersPerHost)) |
305 | , m_runningJobsCount(0) |
306 | |
307 | { |
308 | /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal |
309 | << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/ |
310 | Q_ASSERT(m_maxConnectionsPerHost >= 1); |
311 | Q_ASSERT(maxWorkers >= maxWorkersPerHost); |
312 | m_startJobTimer.setSingleShot(true); |
313 | connect(sender: &m_startJobTimer, signal: &QTimer::timeout, context: this, slot: &ProtoQueue::startAJob); |
314 | } |
315 | |
316 | ProtoQueue::~ProtoQueue() |
317 | { |
318 | // Gather list of all workers first |
319 | const QList<Worker *> workers = allWorkers(); |
320 | // Clear the idle workers in the manager to avoid dangling pointers |
321 | m_workerManager.clear(); |
322 | for (Worker *worker : workers) { |
323 | // kill the worker process and remove the interface in our process |
324 | worker->kill(); |
325 | } |
326 | } |
327 | |
328 | void ProtoQueue::queueJob(SimpleJob *job) |
329 | { |
330 | QString hostname = SimpleJobPrivate::get(job)->m_url.host(); |
331 | HostQueue &hq = m_queuesByHostname[hostname]; |
332 | const int prevLowestSerial = hq.lowestSerial(); |
333 | Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); |
334 | |
335 | // nevert insert a job twice |
336 | Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0); |
337 | SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next(); |
338 | |
339 | const bool wasQueueEmpty = hq.isQueueEmpty(); |
340 | hq.queueJob(job); |
341 | // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too... |
342 | // the queue's lowest serial job may have changed, so update the ordered list of queues. |
343 | // however, we ignore all jobs that would cause more connections to a host than allowed. |
344 | if (prevLowestSerial != hq.lowestSerial()) { |
345 | if (hq.runningJobsCount() < m_maxConnectionsPerHost) { |
346 | // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs |
347 | if (m_queuesBySerial.remove(key: prevLowestSerial) == 0) { |
348 | Q_UNUSED(wasQueueEmpty); |
349 | Q_ASSERT(wasQueueEmpty); |
350 | } |
351 | m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq); |
352 | } else { |
353 | #ifdef SCHEDULER_DEBUG |
354 | // ### this assertion may fail if the limits were modified at runtime! |
355 | // if the per-host connection limit is already reached the host queue's lowest serial |
356 | // should not be queued. |
357 | Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial)); |
358 | #endif |
359 | } |
360 | } |
361 | // just in case; startAJob() will refuse to start a job if it shouldn't. |
362 | m_startJobTimer.start(); |
363 | |
364 | ensureNoDuplicates(queuesBySerial: &m_queuesBySerial); |
365 | } |
366 | |
367 | void ProtoQueue::removeJob(SimpleJob *job) |
368 | { |
369 | SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job); |
370 | HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()]; |
371 | const int prevLowestSerial = hq.lowestSerial(); |
372 | const int prevRunningJobs = hq.runningJobsCount(); |
373 | |
374 | Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); |
375 | |
376 | if (hq.removeJob(job)) { |
377 | if (hq.lowestSerial() != prevLowestSerial) { |
378 | // we have dequeued the not yet running job with the lowest serial |
379 | Q_ASSERT(!jobPriv->m_worker); |
380 | Q_ASSERT(prevRunningJobs == hq.runningJobsCount()); |
381 | if (m_queuesBySerial.remove(key: prevLowestSerial) == 0) { |
382 | // make sure that the queue was not scheduled for a good reason |
383 | Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost); |
384 | } |
385 | } else { |
386 | if (prevRunningJobs != hq.runningJobsCount()) { |
387 | // we have dequeued a previously running job |
388 | Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount()); |
389 | m_runningJobsCount--; |
390 | Q_ASSERT(m_runningJobsCount >= 0); |
391 | } |
392 | } |
393 | if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) { |
394 | // this may be a no-op, but it's faster than first checking if it's already in. |
395 | m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq); |
396 | } |
397 | |
398 | if (hq.isEmpty()) { |
399 | // no queued jobs, no running jobs. this destroys hq from above. |
400 | m_queuesByHostname.erase(x: jobPriv->m_url.host()); |
401 | } |
402 | |
403 | if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) { |
404 | m_workerManager.returnWorker(worker: jobPriv->m_worker); |
405 | } |
406 | // just in case; startAJob() will refuse to start a job if it shouldn't. |
407 | m_startJobTimer.start(); |
408 | } |
409 | |
410 | ensureNoDuplicates(queuesBySerial: &m_queuesBySerial); |
411 | } |
412 | |
413 | Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url) |
414 | { |
415 | int error; |
416 | QString errortext; |
417 | Worker *worker = Worker::createWorker(protocol, url, error, error_text&: errortext); |
418 | if (worker) { |
419 | connect(sender: worker, signal: &Worker::workerDied, context: scheduler(), slot: [](KIO::Worker *worker) { |
420 | schedulerPrivate()->slotWorkerDied(worker); |
421 | }); |
422 | } else { |
423 | qCWarning(KIO_CORE) << "couldn't create worker:" << errortext; |
424 | if (job) { |
425 | job->slotError(error, errortext); |
426 | } |
427 | } |
428 | return worker; |
429 | } |
430 | |
431 | bool ProtoQueue::removeWorker(KIO::Worker *worker) |
432 | { |
433 | const bool removed = m_workerManager.removeWorker(worker); |
434 | return removed; |
435 | } |
436 | |
437 | QList<Worker *> ProtoQueue::allWorkers() const |
438 | { |
439 | QList<Worker *> ret(m_workerManager.allWorkers()); |
440 | auto it = m_queuesByHostname.cbegin(); |
441 | for (; it != m_queuesByHostname.cend(); ++it) { |
442 | ret.append(other: it->second.allWorkers()); |
443 | } |
444 | |
445 | return ret; |
446 | } |
447 | |
448 | // private slot |
449 | void ProtoQueue::startAJob() |
450 | { |
451 | ensureNoDuplicates(queuesBySerial: &m_queuesBySerial); |
452 | verifyRunningJobsCount(queues: &m_queuesByHostname, runningJobsCount: m_runningJobsCount); |
453 | |
454 | #ifdef SCHEDULER_DEBUG |
455 | // qDebug() << "m_runningJobsCount:" << m_runningJobsCount; |
456 | auto it = m_queuesByHostname.cbegin(); |
457 | for (; it != m_queuesByHostname.cend(); ++it) { |
458 | const QList<KIO::SimpleJob *> list = it->second.runningJobs(); |
459 | for (SimpleJob *job : list) { |
460 | // qDebug() << SimpleJobPrivate::get(job)->m_url; |
461 | } |
462 | } |
463 | #endif |
464 | if (m_runningJobsCount >= m_maxConnectionsTotal) { |
465 | #ifdef SCHEDULER_DEBUG |
466 | // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached."; |
467 | #endif |
468 | return; |
469 | } |
470 | |
471 | QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin(); |
472 | if (first != m_queuesBySerial.end()) { |
473 | // pick a job and maintain the queue invariant: lower serials first |
474 | HostQueue *hq = first.value(); |
475 | const int prevLowestSerial = first.key(); |
476 | Q_UNUSED(prevLowestSerial); |
477 | Q_ASSERT(hq->lowestSerial() == prevLowestSerial); |
478 | // the following assertions should hold due to queueJob(), takeFirstInQueue() and |
479 | // removeJob() being correct |
480 | Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost); |
481 | SimpleJob *startingJob = hq->takeFirstInQueue(); |
482 | Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost); |
483 | Q_ASSERT(hq->lowestSerial() != prevLowestSerial); |
484 | |
485 | m_queuesBySerial.erase(it: first); |
486 | // we've increased hq's runningJobsCount() by calling nexStartingJob() |
487 | // so we need to check again. |
488 | if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) { |
489 | m_queuesBySerial.insert(key: hq->lowestSerial(), value: hq); |
490 | } |
491 | |
492 | // always increase m_runningJobsCount because it's correct if there is a worker and if there |
493 | // is no worker, removeJob() will balance the number again. removeJob() would decrease the |
494 | // number too much otherwise. |
495 | // Note that createWorker() can call slotError() on a job which in turn calls removeJob(), |
496 | // so increase the count here already. |
497 | m_runningJobsCount++; |
498 | |
499 | bool isNewWorker = false; |
500 | Worker *worker = m_workerManager.takeWorkerForJob(job: startingJob); |
501 | SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job: startingJob); |
502 | if (!worker) { |
503 | isNewWorker = true; |
504 | worker = createWorker(protocol: jobPriv->m_protocol, job: startingJob, url: jobPriv->m_url); |
505 | } |
506 | |
507 | if (worker) { |
508 | jobPriv->m_worker = worker; |
509 | schedulerPrivate()->setupWorker(worker, url: jobPriv->m_url, protocol: jobPriv->m_protocol, newWorker: isNewWorker); |
510 | startJob(job: startingJob, worker); |
511 | } else { |
512 | // dispose of our records about the job and mark the job as unknown |
513 | // (to prevent crashes later) |
514 | // note that the job's slotError() can have called removeJob() first, so check that |
515 | // it's not a ghost job with null serial already. |
516 | if (jobPriv->m_schedSerial) { |
517 | removeJob(job: startingJob); |
518 | jobPriv->m_schedSerial = 0; |
519 | } |
520 | } |
521 | } else { |
522 | #ifdef SCHEDULER_DEBUG |
523 | // qDebug() << "not starting any jobs because there is no queued job."; |
524 | #endif |
525 | } |
526 | |
527 | if (!m_queuesBySerial.isEmpty()) { |
528 | m_startJobTimer.start(); |
529 | } |
530 | } |
531 | |
532 | Scheduler::Scheduler() |
533 | { |
534 | setObjectName(QStringLiteral("scheduler" )); |
535 | |
536 | #ifdef WITH_QTDBUS |
537 | const QString dbusPath = QStringLiteral("/KIO/Scheduler" ); |
538 | const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler" ); |
539 | QDBusConnection dbus = QDBusConnection::sessionBus(); |
540 | // Not needed, right? We just want to emit two signals. |
541 | // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots | |
542 | // QDBusConnection::ExportScriptableSignals); |
543 | dbus.connect(service: QString(), |
544 | path: dbusPath, |
545 | interface: dbusInterface, |
546 | QStringLiteral("reparseSlaveConfiguration" ), |
547 | receiver: this, |
548 | SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage))); |
549 | #endif |
550 | } |
551 | |
552 | Scheduler::~Scheduler() |
553 | { |
554 | } |
555 | |
556 | void Scheduler::doJob(SimpleJob *job) |
557 | { |
558 | schedulerPrivate()->doJob(job); |
559 | } |
560 | |
561 | // static |
562 | void Scheduler::cancelJob(SimpleJob *job) |
563 | { |
564 | schedulerPrivate()->cancelJob(job); |
565 | } |
566 | |
567 | void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker) |
568 | { |
569 | schedulerPrivate()->jobFinished(job, worker); |
570 | } |
571 | |
572 | void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url) |
573 | { |
574 | schedulerPrivate()->putWorkerOnHold(job, url); |
575 | } |
576 | |
577 | void Scheduler::removeWorkerOnHold() |
578 | { |
579 | schedulerPrivate()->removeWorkerOnHold(); |
580 | } |
581 | |
582 | bool Scheduler::isWorkerOnHoldFor(const QUrl &url) |
583 | { |
584 | return schedulerPrivate()->isWorkerOnHoldFor(url); |
585 | } |
586 | |
587 | void Scheduler::updateInternalMetaData(SimpleJob *job) |
588 | { |
589 | schedulerPrivate()->updateInternalMetaData(job); |
590 | } |
591 | |
592 | void Scheduler::emitReparseSlaveConfiguration() |
593 | { |
594 | #ifdef WITH_QTDBUS |
595 | // Do it immediately in this process, otherwise we might send a request before reparsing |
596 | // (e.g. when changing useragent in the plugin) |
597 | schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage()); |
598 | #endif |
599 | |
600 | schedulerPrivate()->m_ignoreConfigReparse = true; |
601 | Q_EMIT self()->reparseSlaveConfiguration(QString()); |
602 | } |
603 | |
604 | #ifdef WITH_QTDBUS |
605 | void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &) |
606 | { |
607 | if (m_ignoreConfigReparse) { |
608 | // qDebug() << "Ignoring signal sent by myself"; |
609 | m_ignoreConfigReparse = false; |
610 | return; |
611 | } |
612 | |
613 | // qDebug() << "proto=" << proto; |
614 | KProtocolManager::reparseConfiguration(); |
615 | WorkerConfig::self()->reset(); |
616 | |
617 | QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(key: proto); |
618 | QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd(); |
619 | |
620 | // not found? |
621 | if (it == endIt) { |
622 | return; |
623 | } |
624 | |
625 | if (!proto.isEmpty()) { |
626 | endIt = it; |
627 | ++endIt; |
628 | } |
629 | |
630 | for (; it != endIt; ++it) { |
631 | const QList<KIO::Worker *> list = it.value()->allWorkers(); |
632 | for (Worker *worker : list) { |
633 | worker->send(cmd: CMD_REPARSECONFIGURATION); |
634 | worker->resetHost(); |
635 | } |
636 | } |
637 | } |
638 | #endif |
639 | |
640 | void SchedulerPrivate::doJob(SimpleJob *job) |
641 | { |
642 | // qDebug() << job; |
643 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
644 | jobPriv->m_protocol = job->url().scheme(); |
645 | |
646 | ProtoQueue *proto = protoQ(protocol: jobPriv->m_protocol, host: job->url().host()); |
647 | proto->queueJob(job); |
648 | } |
649 | |
650 | void SchedulerPrivate::cancelJob(SimpleJob *job) |
651 | { |
652 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
653 | // this method is called all over the place in job.cpp, so just do this check here to avoid |
654 | // much boilerplate in job code. |
655 | if (jobPriv->m_schedSerial == 0) { |
656 | // qDebug() << "Doing nothing because I don't know job" << job; |
657 | return; |
658 | } |
659 | Worker *worker = jobSWorker(job); |
660 | // qDebug() << job << worker; |
661 | jobFinished(job, worker); |
662 | if (worker) { |
663 | ProtoQueue *pq = m_protocols.value(key: jobPriv->m_protocol); |
664 | if (pq) { |
665 | pq->removeWorker(worker); |
666 | } |
667 | worker->kill(); // don't use worker after this! |
668 | } |
669 | } |
670 | |
671 | void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker) |
672 | { |
673 | // qDebug() << job << worker; |
674 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
675 | |
676 | // make sure that we knew about the job! |
677 | Q_ASSERT(jobPriv->m_schedSerial); |
678 | |
679 | ProtoQueue *pq = m_protocols.value(key: jobPriv->m_protocol); |
680 | if (pq) { |
681 | pq->removeJob(job); |
682 | } |
683 | |
684 | if (worker) { |
685 | // If we have internal meta-data, tell existing KIO workers to reload |
686 | // their configuration. |
687 | if (jobPriv->m_internalMetaData.count()) { |
688 | // qDebug() << "Updating KIO workers with new internal metadata information"; |
689 | ProtoQueue *queue = m_protocols.value(key: worker->protocol()); |
690 | if (queue) { |
691 | const QList<Worker *> workers = queue->allWorkers(); |
692 | for (auto *runningWorker : workers) { |
693 | if (worker->host() == runningWorker->host()) { |
694 | worker->setConfig(metaDataFor(protocol: worker->protocol(), url: job->url())); |
695 | /*qDebug() << "Updated configuration of" << worker->protocol() |
696 | << "KIO worker, pid=" << worker->worker_pid();*/ |
697 | } |
698 | } |
699 | } |
700 | } |
701 | worker->setJob(nullptr); |
702 | worker->disconnect(receiver: job); |
703 | } |
704 | jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again |
705 | jobPriv->m_worker = nullptr; |
706 | // Clear the values in the internal metadata container since they have |
707 | // already been taken care of above... |
708 | jobPriv->m_internalMetaData.clear(); |
709 | } |
710 | |
711 | MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QUrl &url) |
712 | { |
713 | const QString host = url.host(); |
714 | MetaData configData = WorkerConfig::self()->configData(protocol, host); |
715 | |
716 | return configData; |
717 | } |
718 | |
719 | void SchedulerPrivate::setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config) |
720 | { |
721 | int port = url.port(); |
722 | if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that. |
723 | port = 0; |
724 | } |
725 | const QString host = url.host(); |
726 | const QString user = url.userName(); |
727 | const QString passwd = url.password(); |
728 | |
729 | if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) { |
730 | MetaData configData = metaDataFor(protocol, url); |
731 | if (config) { |
732 | configData += *config; |
733 | } |
734 | |
735 | worker->setConfig(configData); |
736 | worker->setProtocol(url.scheme()); |
737 | worker->setHost(host, port, user, passwd); |
738 | } |
739 | } |
740 | |
741 | void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker) |
742 | { |
743 | // qDebug() << worker; |
744 | Q_ASSERT(worker); |
745 | Q_ASSERT(!worker->isAlive()); |
746 | ProtoQueue *pq = m_protocols.value(key: worker->protocol()); |
747 | if (pq) { |
748 | if (worker->job()) { |
749 | pq->removeJob(job: worker->job()); |
750 | } |
751 | // in case this was a connected worker... |
752 | pq->removeWorker(worker); |
753 | } |
754 | if (worker == m_workerOnHold) { |
755 | m_workerOnHold = nullptr; |
756 | m_urlOnHold.clear(); |
757 | } |
758 | // can't use worker->deref() here because we need to use deleteLater |
759 | worker->aboutToDelete(); |
760 | worker->deleteLater(); |
761 | } |
762 | |
763 | void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url) |
764 | { |
765 | Worker *worker = jobSWorker(job); |
766 | // qDebug() << job << url << worker; |
767 | worker->disconnect(receiver: job); |
768 | // prevent the fake death of the worker from trying to kill the job again; |
769 | // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold(). |
770 | worker->setJob(nullptr); |
771 | SimpleJobPrivate::get(job)->m_worker = nullptr; |
772 | |
773 | if (m_workerOnHold) { |
774 | m_workerOnHold->kill(); |
775 | } |
776 | m_workerOnHold = worker; |
777 | m_urlOnHold = url; |
778 | m_workerOnHold->suspend(); |
779 | } |
780 | |
781 | bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url) |
782 | { |
783 | if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) { |
784 | return true; |
785 | } |
786 | |
787 | return false; |
788 | } |
789 | |
790 | Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job) |
791 | { |
792 | Worker *worker = nullptr; |
793 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
794 | |
795 | if (m_workerOnHold) { |
796 | // Make sure that the job wants to do a GET or a POST, and with no offset |
797 | const int cmd = jobPriv->m_command; |
798 | bool canJobReuse = (cmd == CMD_GET); |
799 | |
800 | if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(object: job)) { |
801 | canJobReuse = (canJobReuse || cmd == CMD_SPECIAL); |
802 | if (canJobReuse) { |
803 | KIO::MetaData outgoing = tJob->outgoingMetaData(); |
804 | const QString resume = outgoing.value(QStringLiteral("resume" )); |
805 | const QString rangeStart = outgoing.value(QStringLiteral("range-start" )); |
806 | // qDebug() << "Resume metadata is" << resume; |
807 | canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0')); |
808 | } |
809 | } |
810 | |
811 | if (job->url() == m_urlOnHold) { |
812 | if (canJobReuse) { |
813 | // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")"; |
814 | worker = m_workerOnHold; |
815 | } else { |
816 | // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")"; |
817 | m_workerOnHold->kill(); |
818 | } |
819 | m_workerOnHold = nullptr; |
820 | m_urlOnHold.clear(); |
821 | } |
822 | } |
823 | |
824 | return worker; |
825 | } |
826 | |
827 | void SchedulerPrivate::removeWorkerOnHold() |
828 | { |
829 | // qDebug() << m_workerOnHold; |
830 | if (m_workerOnHold) { |
831 | m_workerOnHold->kill(); |
832 | } |
833 | m_workerOnHold = nullptr; |
834 | m_urlOnHold.clear(); |
835 | } |
836 | |
837 | ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host) |
838 | { |
839 | ProtoQueue *pq = m_protocols.value(key: protocol, defaultValue: nullptr); |
840 | if (!pq) { |
841 | // qDebug() << "creating ProtoQueue instance for" << protocol; |
842 | |
843 | const int maxWorkers = KProtocolInfo::maxWorkers(protocol); |
844 | int maxWorkersPerHost = -1; |
845 | if (!host.isEmpty()) { |
846 | bool ok = false; |
847 | const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections" )).toInt(ok: &ok); |
848 | if (ok) { |
849 | maxWorkersPerHost = value; |
850 | } |
851 | } |
852 | if (maxWorkersPerHost == -1) { |
853 | maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol); |
854 | } |
855 | // Never allow maxWorkersPerHost to exceed maxWorkers. |
856 | pq = new ProtoQueue(maxWorkers, qMin(a: maxWorkers, b: maxWorkersPerHost)); |
857 | m_protocols.insert(key: protocol, value: pq); |
858 | } |
859 | return pq; |
860 | } |
861 | |
862 | void SchedulerPrivate::updateInternalMetaData(SimpleJob *job) |
863 | { |
864 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
865 | // Preserve all internal meta-data so they can be sent back to the |
866 | // KIO workers as needed... |
867 | const QUrl jobUrl = job->url(); |
868 | |
869 | const QLatin1String currHostToken("{internal~currenthost}" ); |
870 | const QLatin1String allHostsToken("{internal~allhosts}" ); |
871 | // qDebug() << job << jobPriv->m_internalMetaData; |
872 | QMapIterator<QString, QString> it(jobPriv->m_internalMetaData); |
873 | while (it.hasNext()) { |
874 | it.next(); |
875 | if (it.key().startsWith(s: currHostToken, cs: Qt::CaseInsensitive)) { |
876 | WorkerConfig::self()->setConfigData(protocol: jobUrl.scheme(), host: jobUrl.host(), key: it.key().mid(position: currHostToken.size()), value: it.value()); |
877 | } else if (it.key().startsWith(s: allHostsToken, cs: Qt::CaseInsensitive)) { |
878 | WorkerConfig::self()->setConfigData(protocol: jobUrl.scheme(), host: QString(), key: it.key().mid(position: allHostsToken.size()), value: it.value()); |
879 | } |
880 | } |
881 | } |
882 | |
883 | #include "moc_scheduler.cpp" |
884 | #include "moc_scheduler_p.cpp" |
885 | |