/*
    This file is part of the KDE libraries
    SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org>
    SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org>
    SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com>

    SPDX-License-Identifier: LGPL-2.0-only
*/

#include "scheduler.h"
#include "scheduler_p.h"

#include "job_p.h"
#include "kiocoredebug.h" // declares the KIO_CORE logging category used by qCWarning() below
#include "worker_p.h"
#include "workerconfig.h"

#include <kprotocolinfo.h>
#include <kprotocolmanager.h>

#ifdef WITH_QTDBUS
#include <QDBusConnection>
#include <QDBusMessage>
#endif
#include <QHash>
#include <QThread>
#include <QThreadStorage>

// Workers may be idle for a certain time (3 minutes) before they are killed.
static const int s_idleWorkerLifetime = 3 * 60;

using namespace KIO;

static inline Worker *jobSWorker(SimpleJob *job)
{
    return SimpleJobPrivate::get(job)->m_worker;
}

static inline void startJob(SimpleJob *job, Worker *worker)
{
    SimpleJobPrivate::get(job)->start(worker);
}

class KIO::SchedulerPrivate
{
public:
    SchedulerPrivate()
        : q(new Scheduler())
    {
    }

    ~SchedulerPrivate()
    {
        removeWorkerOnHold();
        delete q;
        q = nullptr;
        qDeleteAll(m_protocols); // ~ProtoQueue will kill and delete all workers
    }

    SchedulerPrivate(const SchedulerPrivate &) = delete;
    SchedulerPrivate &operator=(const SchedulerPrivate &) = delete;

    Scheduler *q;

    Worker *m_workerOnHold = nullptr;
    QUrl m_urlOnHold;
    bool m_ignoreConfigReparse = false;

    void doJob(SimpleJob *job);
    void cancelJob(SimpleJob *job);
    void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker);
    void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url);
    void removeWorkerOnHold();
    Worker *heldWorkerForJob(KIO::SimpleJob *job);
    bool isWorkerOnHoldFor(const QUrl &url);
    void updateInternalMetaData(SimpleJob *job);

    MetaData metaDataFor(const QString &protocol, const QUrl &url);
    void setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config = nullptr);

    void slotWorkerDied(KIO::Worker *worker);

#ifdef WITH_QTDBUS
    void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &);
#endif

    ProtoQueue *protoQ(const QString &protocol, const QString &host);

private:
    QHash<QString, ProtoQueue *> m_protocols;
};

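// One SchedulerPrivate (and thus one Scheduler) per thread: QThreadStorage creates
// the data on first use in each thread and deletes it when the thread exits.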
static QThreadStorage<SchedulerPrivate *> s_storage;
static SchedulerPrivate *schedulerPrivate()
{
    if (!s_storage.hasLocalData()) {
        s_storage.setLocalData(new SchedulerPrivate);
    }
    return s_storage.localData();
}

Scheduler *Scheduler::self()
{
    return schedulerPrivate()->q;
}

SchedulerPrivate *Scheduler::d_func()
{
    return schedulerPrivate();
}

// static
Scheduler *scheduler()
{
    return schedulerPrivate()->q;
}

////////////////////////////

WorkerManager::WorkerManager()
{
    m_grimTimer.setSingleShot(true);
    connect(&m_grimTimer, &QTimer::timeout, this, &WorkerManager::grimReaper);
}

WorkerManager::~WorkerManager()
{
    grimReaper();
}

void WorkerManager::returnWorker(Worker *worker)
{
    Q_ASSERT(worker);
    worker->setIdle();
    m_idleWorkers.insert(worker->host(), worker);
    scheduleGrimReaper();
}

Worker *WorkerManager::takeWorkerForJob(SimpleJob *job)
{
    Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
    if (worker) {
        return worker;
    }

    QUrl url = SimpleJobPrivate::get(job)->m_url;
    // TODO take port, username and password into account
    QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(url.host());
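    // No idle worker for this host: fall back to any idle worker. setupWorker() will
    // reconfigure host, user and password before the worker is reused.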
    if (it == m_idleWorkers.end()) {
        it = m_idleWorkers.begin();
    }
    if (it == m_idleWorkers.end()) {
        return nullptr;
    }
    worker = it.value();
    m_idleWorkers.erase(it);
    return worker;
}

bool WorkerManager::removeWorker(Worker *worker)
{
    // ### performance not so great
    QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin();
    for (; it != m_idleWorkers.end(); ++it) {
        if (it.value() == worker) {
            m_idleWorkers.erase(it);
            return true;
        }
    }
    return false;
}

void WorkerManager::clear()
{
    m_idleWorkers.clear();
}

QList<Worker *> WorkerManager::allWorkers() const
{
    return m_idleWorkers.values();
}

void WorkerManager::scheduleGrimReaper()
{
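    // Poll at half the idle lifetime: a worker is then reaped between one and one
    // and a half lifetimes after it went idle, which is precise enough for cleanup.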
    if (!m_grimTimer.isActive()) {
        m_grimTimer.start((s_idleWorkerLifetime / 2) * 1000);
    }
}

// private slot
void WorkerManager::grimReaper()
{
    QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin();
    while (it != m_idleWorkers.end()) {
        Worker *worker = it.value();
        if (worker->idleTime() >= s_idleWorkerLifetime) {
            it = m_idleWorkers.erase(it);
            if (worker->job()) {
                // qDebug() << "Idle worker" << worker << "still has job" << worker->job();
            }
            // avoid invoking slotWorkerDied() because its cleanup services are not needed
            worker->kill();
        } else {
            ++it;
        }
    }
    if (!m_idleWorkers.isEmpty()) {
        scheduleGrimReaper();
    }
}

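// Reports the lowest job serial in the queue. An empty queue reports
// SerialPicker::maxSerial so that it always loses the "lowest serial first"
// ordering maintained in ProtoQueue::m_queuesBySerial.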
int HostQueue::lowestSerial() const
{
    QMap<int, SimpleJob *>::ConstIterator first = m_queuedJobs.constBegin();
    if (first != m_queuedJobs.constEnd()) {
        return first.key();
    }
    return SerialPicker::maxSerial;
}

void HostQueue::queueJob(SimpleJob *job)
{
    const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
    Q_ASSERT(serial != 0);
    Q_ASSERT(!m_queuedJobs.contains(serial));
    Q_ASSERT(!m_runningJobs.contains(job));
    m_queuedJobs.insert(serial, job);
}

SimpleJob *HostQueue::takeFirstInQueue()
{
    Q_ASSERT(!m_queuedJobs.isEmpty());
    QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin();
    SimpleJob *job = first.value();
    m_queuedJobs.erase(first);
    m_runningJobs.insert(job);
    return job;
}

bool HostQueue::removeJob(SimpleJob *job)
{
    const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
    if (m_runningJobs.remove(job)) {
        Q_ASSERT(!m_queuedJobs.contains(serial));
        return true;
    }
    if (m_queuedJobs.remove(serial)) {
        return true;
    }
    return false;
}

QList<Worker *> HostQueue::allWorkers() const
{
    QList<Worker *> ret;
    ret.reserve(m_runningJobs.size());
    for (SimpleJob *job : m_runningJobs) {
        Worker *worker = jobSWorker(job);
        Q_ASSERT(worker);
        ret.append(worker);
    }
    return ret;
}

static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial)
{
    Q_UNUSED(queuesBySerial);
#ifdef SCHEDULER_DEBUG
    // a host queue may *never* be in queuesBySerial twice.
    QSet<HostQueue *> seen;
    auto it = queuesBySerial->cbegin();
    for (; it != queuesBySerial->cend(); ++it) {
        Q_ASSERT(!seen.contains(it.value()));
        seen.insert(it.value());
    }
#endif
}

static void verifyRunningJobsCount(const std::unordered_map<QString, HostQueue> *const queues, int runningJobsCount)
{
    Q_UNUSED(queues);
    Q_UNUSED(runningJobsCount);
#ifdef SCHEDULER_DEBUG
    int realRunningJobsCount = 0;
    auto it = queues->cbegin();
    for (; it != queues->cend(); ++it) {
        realRunningJobsCount += it->second.runningJobsCount();
    }
    Q_ASSERT(realRunningJobsCount == runningJobsCount);

    // ...and of course we may never run the same job twice!
    QSet<SimpleJob *> seenJobs;
    auto it2 = queues->cbegin();
    for (; it2 != queues->cend(); ++it2) {
        for (SimpleJob *job : it2->second.runningJobs()) {
            Q_ASSERT(!seenJobs.contains(job));
            seenJobs.insert(job);
        }
    }
#endif
}

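// A per-host limit of 0 means "no per-host limit" and falls back to the total
// connection limit for the protocol.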
ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost)
    : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
    , m_maxConnectionsTotal(qMax(maxWorkers, maxWorkersPerHost))
    , m_runningJobsCount(0)
{
    /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal
               << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/
    Q_ASSERT(m_maxConnectionsPerHost >= 1);
    Q_ASSERT(maxWorkers >= maxWorkersPerHost);
    m_startJobTimer.setSingleShot(true);
    connect(&m_startJobTimer, &QTimer::timeout, this, &ProtoQueue::startAJob);
}

ProtoQueue::~ProtoQueue()
{
    // Gather list of all workers first
    const QList<Worker *> workers = allWorkers();
    // Clear the idle workers in the manager to avoid dangling pointers
    m_workerManager.clear();
    for (Worker *worker : workers) {
        // kill the worker process and remove the interface in our process
        worker->kill();
    }
}

void ProtoQueue::queueJob(SimpleJob *job)
{
    QString hostname = SimpleJobPrivate::get(job)->m_url.host();
    HostQueue &hq = m_queuesByHostname[hostname];
    const int prevLowestSerial = hq.lowestSerial();
    Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);

    // never insert a job twice
    Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
    SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();

    const bool wasQueueEmpty = hq.isQueueEmpty();
    hq.queueJob(job);
    // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too...
    // the queue's lowest serial job may have changed, so update the ordered list of queues.
    // however, we ignore all jobs that would cause more connections to a host than allowed.
    if (prevLowestSerial != hq.lowestSerial()) {
        if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
            // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs
            if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
                Q_UNUSED(wasQueueEmpty);
                Q_ASSERT(wasQueueEmpty);
            }
            m_queuesBySerial.insert(hq.lowestSerial(), &hq);
        } else {
#ifdef SCHEDULER_DEBUG
            // ### this assertion may fail if the limits were modified at runtime!
            // if the per-host connection limit is already reached the host queue's lowest serial
            // should not be queued.
            Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
#endif
        }
    }
    // just in case; startAJob() will refuse to start a job if it shouldn't.
    m_startJobTimer.start();

    ensureNoDuplicates(&m_queuesBySerial);
}

void ProtoQueue::removeJob(SimpleJob *job)
{
    SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
    HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
    const int prevLowestSerial = hq.lowestSerial();
    const int prevRunningJobs = hq.runningJobsCount();

    Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);

    if (hq.removeJob(job)) {
        if (hq.lowestSerial() != prevLowestSerial) {
            // we have dequeued the not yet running job with the lowest serial
            Q_ASSERT(!jobPriv->m_worker);
            Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
            if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
                // make sure that the queue was not scheduled for a good reason
                Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
            }
        } else {
            if (prevRunningJobs != hq.runningJobsCount()) {
                // we have dequeued a previously running job
                Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
                m_runningJobsCount--;
                Q_ASSERT(m_runningJobsCount >= 0);
            }
        }
        if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
            // this may be a no-op, but it's faster than first checking if it's already in.
            m_queuesBySerial.insert(hq.lowestSerial(), &hq);
        }

        if (hq.isEmpty()) {
            // no queued jobs, no running jobs. this destroys hq from above.
            m_queuesByHostname.erase(jobPriv->m_url.host());
        }

        if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
            m_workerManager.returnWorker(jobPriv->m_worker);
        }
        // just in case; startAJob() will refuse to start a job if it shouldn't.
        m_startJobTimer.start();
    }

    ensureNoDuplicates(&m_queuesBySerial);
}

Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url)
{
    int error;
    QString errortext;
    Worker *worker = Worker::createWorker(protocol, url, error, errortext);
    if (worker) {
        connect(worker, &Worker::workerDied, scheduler(), [](KIO::Worker *worker) {
            schedulerPrivate()->slotWorkerDied(worker);
        });
    } else {
        qCWarning(KIO_CORE) << "couldn't create worker:" << errortext;
        if (job) {
            job->slotError(error, errortext);
        }
    }
    return worker;
}

bool ProtoQueue::removeWorker(KIO::Worker *worker)
{
    return m_workerManager.removeWorker(worker);
}

QList<Worker *> ProtoQueue::allWorkers() const
{
    QList<Worker *> ret(m_workerManager.allWorkers());
    auto it = m_queuesByHostname.cbegin();
    for (; it != m_queuesByHostname.cend(); ++it) {
        ret.append(it->second.allWorkers());
    }

    return ret;
}

// private slot
void ProtoQueue::startAJob()
{
    ensureNoDuplicates(&m_queuesBySerial);
    verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount);

#ifdef SCHEDULER_DEBUG
    // qDebug() << "m_runningJobsCount:" << m_runningJobsCount;
    auto it = m_queuesByHostname.cbegin();
    for (; it != m_queuesByHostname.cend(); ++it) {
        const QList<KIO::SimpleJob *> list = it->second.runningJobs();
        for (SimpleJob *job : list) {
            // qDebug() << SimpleJobPrivate::get(job)->m_url;
        }
    }
#endif
    if (m_runningJobsCount >= m_maxConnectionsTotal) {
#ifdef SCHEDULER_DEBUG
        // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached.";
#endif
        return;
    }

    QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin();
    if (first != m_queuesBySerial.end()) {
        // pick a job and maintain the queue invariant: lower serials first
        HostQueue *hq = first.value();
        const int prevLowestSerial = first.key();
        Q_UNUSED(prevLowestSerial);
        Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
        // the following assertions should hold due to queueJob(), takeFirstInQueue() and
        // removeJob() being correct
        Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
        SimpleJob *startingJob = hq->takeFirstInQueue();
        Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
        Q_ASSERT(hq->lowestSerial() != prevLowestSerial);

        m_queuesBySerial.erase(first);
        // we've increased hq's runningJobsCount() by calling takeFirstInQueue(),
        // so we need to check again.
        if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
            m_queuesBySerial.insert(hq->lowestSerial(), hq);
        }

        // always increase m_runningJobsCount because it's correct if there is a worker and if there
        // is no worker, removeJob() will balance the number again. removeJob() would decrease the
        // number too much otherwise.
        // Note that createWorker() can call slotError() on a job which in turn calls removeJob(),
        // so increase the count here already.
        m_runningJobsCount++;

        bool isNewWorker = false;
        Worker *worker = m_workerManager.takeWorkerForJob(startingJob);
        SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob);
        if (!worker) {
            isNewWorker = true;
            worker = createWorker(jobPriv->m_protocol, startingJob, jobPriv->m_url);
        }

        if (worker) {
            jobPriv->m_worker = worker;
            schedulerPrivate()->setupWorker(worker, jobPriv->m_url, jobPriv->m_protocol, isNewWorker);
            startJob(startingJob, worker);
        } else {
            // dispose of our records about the job and mark the job as unknown
            // (to prevent crashes later)
            // note that the job's slotError() can have called removeJob() first, so check that
            // it's not a ghost job with null serial already.
            if (jobPriv->m_schedSerial) {
                removeJob(startingJob);
                jobPriv->m_schedSerial = 0;
            }
        }
    } else {
#ifdef SCHEDULER_DEBUG
        // qDebug() << "not starting any jobs because there is no queued job.";
#endif
    }

    if (!m_queuesBySerial.isEmpty()) {
        m_startJobTimer.start();
    }
}

Scheduler::Scheduler()
{
    setObjectName(QStringLiteral("scheduler"));

#ifdef WITH_QTDBUS
    const QString dbusPath = QStringLiteral("/KIO/Scheduler");
    const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler");
    QDBusConnection dbus = QDBusConnection::sessionBus();
    // Not needed, right? We just want to emit two signals.
    // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots |
    // QDBusConnection::ExportScriptableSignals);
    dbus.connect(QString(),
                 dbusPath,
                 dbusInterface,
                 QStringLiteral("reparseSlaveConfiguration"),
                 this,
                 SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage)));
#endif
}

Scheduler::~Scheduler()
{
}

void Scheduler::doJob(SimpleJob *job)
{
    schedulerPrivate()->doJob(job);
}

// static
void Scheduler::cancelJob(SimpleJob *job)
{
    schedulerPrivate()->cancelJob(job);
}

void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker)
{
    schedulerPrivate()->jobFinished(job, worker);
}

void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
{
    schedulerPrivate()->putWorkerOnHold(job, url);
}

void Scheduler::removeWorkerOnHold()
{
    schedulerPrivate()->removeWorkerOnHold();
}

bool Scheduler::isWorkerOnHoldFor(const QUrl &url)
{
    return schedulerPrivate()->isWorkerOnHoldFor(url);
}

void Scheduler::updateInternalMetaData(SimpleJob *job)
{
    schedulerPrivate()->updateInternalMetaData(job);
}

void Scheduler::emitReparseSlaveConfiguration()
{
#ifdef WITH_QTDBUS
    // Do it immediately in this process, otherwise we might send a request before reparsing
    // (e.g. when changing useragent in the plugin)
    schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage());
#endif

    schedulerPrivate()->m_ignoreConfigReparse = true;
    Q_EMIT self()->reparseSlaveConfiguration(QString());
}

#ifdef WITH_QTDBUS
void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &)
{
    if (m_ignoreConfigReparse) {
        // qDebug() << "Ignoring signal sent by myself";
        m_ignoreConfigReparse = false;
        return;
    }

    // qDebug() << "proto=" << proto;
    KProtocolManager::reparseConfiguration();
    WorkerConfig::self()->reset();

    QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(proto);
    QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd();

    // not found?
    if (it == endIt) {
        return;
    }

    if (!proto.isEmpty()) {
        endIt = it;
        ++endIt;
    }

    for (; it != endIt; ++it) {
        const QList<KIO::Worker *> list = it.value()->allWorkers();
        for (Worker *worker : list) {
            worker->send(CMD_REPARSECONFIGURATION);
            worker->resetHost();
        }
    }
}
#endif

void SchedulerPrivate::doJob(SimpleJob *job)
{
    // qDebug() << job;
    KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
    jobPriv->m_protocol = job->url().scheme();

    ProtoQueue *proto = protoQ(jobPriv->m_protocol, job->url().host());
    proto->queueJob(job);
}

void SchedulerPrivate::cancelJob(SimpleJob *job)
{
    KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
    // this method is called all over the place in job.cpp, so just do this check here to avoid
    // much boilerplate in job code.
    if (jobPriv->m_schedSerial == 0) {
        // qDebug() << "Doing nothing because I don't know job" << job;
        return;
    }
    Worker *worker = jobSWorker(job);
    // qDebug() << job << worker;
    jobFinished(job, worker);
    if (worker) {
        ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
        if (pq) {
            pq->removeWorker(worker);
        }
        worker->kill(); // don't use worker after this!
    }
}

void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker)
{
    // qDebug() << job << worker;
    KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);

    // make sure that we knew about the job!
    Q_ASSERT(jobPriv->m_schedSerial);

    ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol);
    if (pq) {
        pq->removeJob(job);
    }

    if (worker) {
        // If we have internal meta-data, tell existing KIO workers to reload
        // their configuration.
        if (jobPriv->m_internalMetaData.count()) {
            // qDebug() << "Updating KIO workers with new internal metadata information";
            ProtoQueue *queue = m_protocols.value(worker->protocol());
            if (queue) {
                const QList<Worker *> workers = queue->allWorkers();
                for (auto *runningWorker : workers) {
                    if (worker->host() == runningWorker->host()) {
                        runningWorker->setConfig(metaDataFor(worker->protocol(), job->url()));
                        /*qDebug() << "Updated configuration of" << runningWorker->protocol()
                                     << "KIO worker, pid=" << runningWorker->worker_pid();*/
                    }
                }
            }
        }
        worker->setJob(nullptr);
        worker->disconnect(job);
    }
    jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again
    jobPriv->m_worker = nullptr;
    // Clear the values in the internal metadata container since they have
    // already been taken care of above...
    jobPriv->m_internalMetaData.clear();
}

MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QUrl &url)
{
    return WorkerConfig::self()->configData(protocol, url.host());
}

void SchedulerPrivate::setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config)
{
    int port = url.port();
    if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that.
        port = 0;
    }
    const QString host = url.host();
    const QString user = url.userName();
    const QString passwd = url.password();

    if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
        MetaData configData = metaDataFor(protocol, url);
        if (config) {
            configData += *config;
        }

        worker->setConfig(configData);
        worker->setProtocol(url.scheme());
        worker->setHost(host, port, user, passwd);
    }
}

void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
{
    // qDebug() << worker;
    Q_ASSERT(worker);
    Q_ASSERT(!worker->isAlive());
    ProtoQueue *pq = m_protocols.value(worker->protocol());
    if (pq) {
        if (worker->job()) {
            pq->removeJob(worker->job());
        }
        // in case this was a connected worker...
        pq->removeWorker(worker);
    }
    if (worker == m_workerOnHold) {
        m_workerOnHold = nullptr;
        m_urlOnHold.clear();
    }
    // can't use worker->deref() here because we need to use deleteLater
    worker->aboutToDelete();
    worker->deleteLater();
}

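// A held worker is kept alive (suspended) together with the URL it was serving, so
// that an immediate follow-up job for the same URL, for example a transfer job
// re-issued once the MIME type is known, can take over the still-open connection
// instead of starting a fresh worker.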
void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
{
    Worker *worker = jobSWorker(job);
    // qDebug() << job << url << worker;
    worker->disconnect(job);
    // prevent the fake death of the worker from trying to kill the job again;
    // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold().
    worker->setJob(nullptr);
    SimpleJobPrivate::get(job)->m_worker = nullptr;

    if (m_workerOnHold) {
        m_workerOnHold->kill();
    }
    m_workerOnHold = worker;
    m_urlOnHold = url;
    m_workerOnHold->suspend();
}

bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url)
{
    return url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold;
}

Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job)
{
    Worker *worker = nullptr;
    KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);

    if (m_workerOnHold) {
        // Make sure that the job wants to do a GET or a POST (HTTP POST is issued as
        // CMD_SPECIAL), and with no offset.
        const int cmd = jobPriv->m_command;
        bool canJobReuse = (cmd == CMD_GET);

        if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(job)) {
            canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
            if (canJobReuse) {
                KIO::MetaData outgoing = tJob->outgoingMetaData();
                const QString resume = outgoing.value(QStringLiteral("resume"));
                const QString rangeStart = outgoing.value(QStringLiteral("range-start"));
                // qDebug() << "Resume metadata is" << resume;
                canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0'));
            }
        }

        if (job->url() == m_urlOnHold) {
            if (canJobReuse) {
                // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")";
                worker = m_workerOnHold;
            } else {
                // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")";
                m_workerOnHold->kill();
            }
            m_workerOnHold = nullptr;
            m_urlOnHold.clear();
        }
    }

    return worker;
}

void SchedulerPrivate::removeWorkerOnHold()
{
    // qDebug() << m_workerOnHold;
    if (m_workerOnHold) {
        m_workerOnHold->kill();
    }
    m_workerOnHold = nullptr;
    m_urlOnHold.clear();
}

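// One ProtoQueue exists per protocol, created lazily on first use. Its limits come
// from KProtocolInfo; the per-host limit can be overridden through the
// "MaxConnections" worker config key.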
ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host)
{
    ProtoQueue *pq = m_protocols.value(protocol, nullptr);
    if (!pq) {
        // qDebug() << "creating ProtoQueue instance for" << protocol;

        const int maxWorkers = KProtocolInfo::maxWorkers(protocol);
        int maxWorkersPerHost = -1;
        if (!host.isEmpty()) {
            bool ok = false;
            const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections")).toInt(&ok);
            if (ok) {
                maxWorkersPerHost = value;
            }
        }
        if (maxWorkersPerHost == -1) {
            maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol);
        }
        // Never allow maxWorkersPerHost to exceed maxWorkers.
        pq = new ProtoQueue(maxWorkers, qMin(maxWorkers, maxWorkersPerHost));
        m_protocols.insert(protocol, pq);
    }
    return pq;
}

void SchedulerPrivate::updateInternalMetaData(SimpleJob *job)
{
    KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
    // Preserve all internal meta-data so they can be sent back to the
    // KIO workers as needed...
    const QUrl jobUrl = job->url();

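    // Keys prefixed with "{internal~currenthost}" store the setting for the job's
    // host only; keys prefixed with "{internal~allhosts}" store it for every host
    // of the job's scheme.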
    const QLatin1String currHostToken("{internal~currenthost}");
    const QLatin1String allHostsToken("{internal~allhosts}");
    // qDebug() << job << jobPriv->m_internalMetaData;
    QMapIterator<QString, QString> it(jobPriv->m_internalMetaData);
    while (it.hasNext()) {
        it.next();
        if (it.key().startsWith(currHostToken, Qt::CaseInsensitive)) {
            WorkerConfig::self()->setConfigData(jobUrl.scheme(), jobUrl.host(), it.key().mid(currHostToken.size()), it.value());
        } else if (it.key().startsWith(allHostsToken, Qt::CaseInsensitive)) {
            WorkerConfig::self()->setConfigData(jobUrl.scheme(), QString(), it.key().mid(allHostsToken.size()), it.value());
        }
    }
}

#include "moc_scheduler.cpp"
#include "moc_scheduler_p.cpp"