| 1 | // Copyright (C) 2017 The Qt Company Ltd. |
| 2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only |
| 3 | |
| 4 | #include "qmqttconnection_p.h" |
| 5 | #include "qmqttconnectionproperties_p.h" |
| 6 | #include "qmqttcontrolpacket_p.h" |
| 7 | #include "qmqttmessage_p.h" |
| 8 | #include "qmqttpublishproperties_p.h" |
| 9 | #include "qmqttsubscription_p.h" |
| 10 | #include "qmqttclient_p.h" |
| 11 | |
| 12 | #include <QtCore/QLoggingCategory> |
| 13 | #include <QtNetwork/QSslSocket> |
| 14 | #include <QtNetwork/QTcpSocket> |
| 15 | |
| 16 | #include <limits> |
| 17 | #include <cstdint> |
| 18 | |
| 19 | QT_BEGIN_NAMESPACE |
| 20 | |
| 21 | Q_LOGGING_CATEGORY(lcMqttConnection, "qt.mqtt.connection" ) |
| 22 | Q_LOGGING_CATEGORY(lcMqttConnectionVerbose, "qt.mqtt.connection.verbose" ); |
| 23 | |
| 24 | template <typename T> |
| 25 | T QMqttConnection::readBufferTyped(qint64 *dataSize) |
| 26 | { |
| 27 | Q_STATIC_ASSERT(std::is_integral<T>::value); |
| 28 | |
| 29 | T result = 0; |
| 30 | if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(sizeof(result)))) { |
| 31 | qCWarning(lcMqttConnection) << "Attempt to read past the data" ; |
| 32 | return result; |
| 33 | } |
| 34 | if (readBuffer(data: reinterpret_cast<char *>(&result), size: sizeof(result)) && dataSize != nullptr) |
| 35 | *dataSize -= sizeof(result); |
| 36 | return qFromBigEndian(result); |
| 37 | } |
| 38 | |
| 39 | template<> |
| 40 | QByteArray QMqttConnection::readBufferTyped(qint64 *dataSize) |
| 41 | { |
| 42 | const quint16 size = readBufferTyped<quint16>(dataSize); |
| 43 | if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(size))) { |
| 44 | qCWarning(lcMqttConnection) << "Attempt to read past the data" ; |
| 45 | return QByteArray(); |
| 46 | } |
| 47 | QByteArray ba(int(size), Qt::Uninitialized); |
| 48 | if (readBuffer(data: ba.data(), size) && dataSize != nullptr) |
| 49 | *dataSize -= size; |
| 50 | return ba; |
| 51 | } |
| 52 | |
| 53 | template<> |
| 54 | QString QMqttConnection::readBufferTyped(qint64 *dataSize) |
| 55 | { |
| 56 | return QString::fromUtf8(ba: readBufferTyped<QByteArray>(dataSize)); |
| 57 | } |
| 58 | |
| 59 | QMqttConnection::QMqttConnection(QObject *parent) : QObject(parent) |
| 60 | { |
| 61 | } |
| 62 | |
| 63 | QMqttConnection::~QMqttConnection() |
| 64 | { |
| 65 | if (m_internalState == BrokerConnected) |
| 66 | sendControlDisconnect(); |
| 67 | |
| 68 | if (m_ownTransport && m_transport) |
| 69 | delete m_transport; |
| 70 | } |
| 71 | |
| 72 | void QMqttConnection::timerEvent(QTimerEvent *event) |
| 73 | { |
| 74 | if (Q_LIKELY(event->timerId() == m_pingTimer.timerId())) { |
| 75 | sendControlPingRequest(); |
| 76 | return; |
| 77 | } |
| 78 | |
| 79 | QObject::timerEvent(event); |
| 80 | } |
| 81 | |
| 82 | void QMqttConnection::setTransport(QIODevice *device, QMqttClient::TransportType transport) |
| 83 | { |
| 84 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << device << " Type:" << transport; |
| 85 | |
| 86 | if (m_transport) { |
| 87 | disconnect(sender: m_transport, signal: &QIODevice::aboutToClose, receiver: this, slot: &QMqttConnection::transportConnectionClosed); |
| 88 | disconnect(sender: m_transport, signal: &QIODevice::readyRead, receiver: this, slot: &QMqttConnection::transportReadyRead); |
| 89 | if (m_ownTransport) |
| 90 | delete m_transport; |
| 91 | } |
| 92 | |
| 93 | m_transport = device; |
| 94 | m_transportType = transport; |
| 95 | m_ownTransport = false; |
| 96 | |
| 97 | connect(sender: m_transport, signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 98 | connect(sender: m_transport, signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead); |
| 99 | } |
| 100 | |
| 101 | QIODevice *QMqttConnection::transport() const |
| 102 | { |
| 103 | return m_transport; |
| 104 | } |
| 105 | |
| 106 | bool QMqttConnection::ensureTransport(bool createSecureIfNeeded) |
| 107 | { |
| 108 | Q_UNUSED(createSecureIfNeeded); // QT_NO_SSL |
| 109 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transport; |
| 110 | |
| 111 | if (m_transport) { |
| 112 | if (m_ownTransport) |
| 113 | delete m_transport; |
| 114 | else |
| 115 | return true; |
| 116 | } |
| 117 | |
| 118 | // We are asked to create a transport layer |
| 119 | if (m_clientPrivate->m_hostname.isEmpty() || m_clientPrivate->m_port == 0) { |
| 120 | qCDebug(lcMqttConnection) << "No hostname specified, not able to create a transport layer." ; |
| 121 | return false; |
| 122 | } |
| 123 | auto socket = |
| 124 | #ifndef QT_NO_SSL |
| 125 | createSecureIfNeeded ? new QSslSocket() : |
| 126 | #endif |
| 127 | new QTcpSocket(); |
| 128 | m_transport = socket; |
| 129 | m_ownTransport = true; |
| 130 | m_transportType = |
| 131 | #ifndef QT_NO_SSL |
| 132 | createSecureIfNeeded ? QMqttClient::SecureSocket : |
| 133 | #endif |
| 134 | QMqttClient::AbstractSocket; |
| 135 | |
| 136 | #ifndef QT_NO_SSL |
| 137 | if (QSslSocket *sslSocket = qobject_cast<QSslSocket *>(object: socket)) |
| 138 | QObject::connect(sender: sslSocket, signal: &QSslSocket::encrypted, context: this, slot: &QMqttConnection::transportConnectionEstablished); |
| 139 | else |
| 140 | #endif |
| 141 | connect(sender: socket, signal: &QAbstractSocket::connected, context: this, slot: &QMqttConnection::transportConnectionEstablished); |
| 142 | connect(sender: socket, signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 143 | connect(sender: socket, signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError); |
| 144 | |
| 145 | connect(sender: m_transport, signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 146 | connect(sender: m_transport, signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead); |
| 147 | return true; |
| 148 | } |
| 149 | |
| 150 | bool QMqttConnection::ensureTransportOpen(const QString &sslPeerName) |
| 151 | { |
| 152 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transportType; |
| 153 | |
| 154 | if (m_transportType == QMqttClient::IODevice) { |
| 155 | if (m_transport->isOpen()) |
| 156 | return sendControlConnect(); |
| 157 | |
| 158 | if (!m_transport->open(mode: QIODevice::ReadWrite)) { |
| 159 | qCDebug(lcMqttConnection) << "Could not open Transport IO device." ; |
| 160 | m_internalState = BrokerDisconnected; |
| 161 | return false; |
| 162 | } |
| 163 | return sendControlConnect(); |
| 164 | } |
| 165 | |
| 166 | if (m_transportType == QMqttClient::AbstractSocket) { |
| 167 | auto socket = qobject_cast<QTcpSocket *>(object: m_transport); |
| 168 | Q_ASSERT(socket); |
| 169 | if (socket->state() == QAbstractSocket::ConnectedState) |
| 170 | return sendControlConnect(); |
| 171 | |
| 172 | m_internalState = BrokerConnecting; |
| 173 | socket->connectToHost(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port); |
| 174 | } |
| 175 | #ifndef QT_NO_SSL |
| 176 | else if (m_transportType == QMqttClient::SecureSocket) { |
| 177 | auto socket = qobject_cast<QSslSocket *>(object: m_transport); |
| 178 | Q_ASSERT(socket); |
| 179 | if (socket->state() == QAbstractSocket::ConnectedState) |
| 180 | return sendControlConnect(); |
| 181 | |
| 182 | m_internalState = BrokerConnecting; |
| 183 | if (!m_sslConfiguration.isNull()) |
| 184 | socket->setSslConfiguration(m_sslConfiguration); |
| 185 | socket->connectToHostEncrypted(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port, sslPeerName); |
| 186 | } |
| 187 | #else |
| 188 | Q_UNUSED(sslPeerName); |
| 189 | #endif |
| 190 | |
| 191 | return true; |
| 192 | } |
| 193 | |
| 194 | bool QMqttConnection::sendControlConnect() |
| 195 | { |
| 196 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
| 197 | |
| 198 | QMqttControlPacket packet(QMqttControlPacket::CONNECT); |
| 199 | |
| 200 | // Variable header |
| 201 | // 3.1.2.1 Protocol Name |
| 202 | // 3.1.2.2 Protocol Level |
| 203 | switch (m_clientPrivate->m_protocolVersion) { |
| 204 | case QMqttClient::MQTT_3_1: |
| 205 | packet.append(data: "MQIsdp" ); |
| 206 | packet.append(value: char(3)); // Version 3.1 |
| 207 | break; |
| 208 | case QMqttClient::MQTT_3_1_1: |
| 209 | packet.append(data: "MQTT" ); |
| 210 | packet.append(value: char(4)); // Version 3.1.1 |
| 211 | break; |
| 212 | case QMqttClient::MQTT_5_0: |
| 213 | packet.append(data: "MQTT" ); |
| 214 | packet.append(value: char(5)); // Version 5.0 |
| 215 | break; |
| 216 | } |
| 217 | |
| 218 | // 3.1.2.3 Connect Flags |
| 219 | quint8 flags = 0; |
| 220 | // Clean session |
| 221 | if (m_clientPrivate->m_cleanSession) |
| 222 | flags |= 1 << 1; |
| 223 | |
| 224 | if (!m_clientPrivate->m_willTopic.isEmpty()) { |
| 225 | flags |= 1 << 2; |
| 226 | if (m_clientPrivate->m_willQoS > 2) { |
| 227 | qCDebug(lcMqttConnection) << "Invalid Will QoS specified." ; |
| 228 | return false; |
| 229 | } |
| 230 | if (m_clientPrivate->m_willQoS == 1) |
| 231 | flags |= 1 << 3; |
| 232 | else if (m_clientPrivate->m_willQoS == 2) |
| 233 | flags |= 1 << 4; |
| 234 | if (m_clientPrivate->m_willRetain) |
| 235 | flags |= 1 << 5; |
| 236 | } |
| 237 | if (m_clientPrivate->m_username.size()) |
| 238 | flags |= 1 << 7; |
| 239 | |
| 240 | if (m_clientPrivate->m_password.size()) |
| 241 | flags |= 1 << 6; |
| 242 | |
| 243 | packet.append(value: char(flags)); |
| 244 | |
| 245 | // 3.1.2.10 Keep Alive |
| 246 | packet.append(value: m_clientPrivate->m_keepAlive); |
| 247 | |
| 248 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 249 | packet.appendRaw(data: writeConnectProperties()); |
| 250 | |
| 251 | // 3.1.3 Payload |
| 252 | // 3.1.3.1 Client Identifier |
| 253 | const QByteArray clientStringArray = m_clientPrivate->m_clientId.toUtf8(); |
| 254 | if (clientStringArray.size()) { |
| 255 | packet.append(data: clientStringArray); |
| 256 | } else { |
| 257 | packet.append(value: char(0)); |
| 258 | packet.append(value: char(0)); |
| 259 | } |
| 260 | |
| 261 | if (!m_clientPrivate->m_willTopic.isEmpty()) { |
| 262 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 263 | packet.appendRaw(data: writeLastWillProperties()); |
| 264 | |
| 265 | packet.append(data: m_clientPrivate->m_willTopic.toUtf8()); |
| 266 | packet.append(data: m_clientPrivate->m_willMessage); |
| 267 | } |
| 268 | |
| 269 | if (m_clientPrivate->m_username.size()) |
| 270 | packet.append(data: m_clientPrivate->m_username.toUtf8()); |
| 271 | |
| 272 | if (m_clientPrivate->m_password.size()) |
| 273 | packet.append(data: m_clientPrivate->m_password.toUtf8()); |
| 274 | |
| 275 | m_internalState = BrokerWaitForConnectAck; |
| 276 | m_missingData = 0; |
| 277 | |
| 278 | if (!writePacketToTransport(p: packet)) { |
| 279 | qCDebug(lcMqttConnection) << "Could not write CONNECT frame to transport." ; |
| 280 | return false; |
| 281 | } |
| 282 | return true; |
| 283 | } |
| 284 | |
| 285 | bool QMqttConnection::sendControlAuthenticate(const QMqttAuthenticationProperties &properties) |
| 286 | { |
| 287 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
| 288 | |
| 289 | QMqttControlPacket packet(QMqttControlPacket::AUTH); |
| 290 | |
| 291 | switch (m_internalState) { |
| 292 | case BrokerDisconnected: |
| 293 | case BrokerConnecting: |
| 294 | case ClientDestruction: |
| 295 | qCDebug(lcMqttConnection) << "Using AUTH while disconnected." ; |
| 296 | return false; |
| 297 | case BrokerWaitForConnectAck: |
| 298 | qCDebug(lcMqttConnection) << "AUTH while connecting, set continuation flag." ; |
| 299 | packet.append(value: char(QMqtt::ReasonCode::ContinueAuthentication)); |
| 300 | break; |
| 301 | case BrokerConnected: |
| 302 | qCDebug(lcMqttConnection) << "AUTH while connected, initiate re-authentication." ; |
| 303 | packet.append(value: char(QMqtt::ReasonCode::ReAuthenticate)); |
| 304 | break; |
| 305 | } |
| 306 | |
| 307 | packet.appendRaw(data: writeAuthenticationProperties(properties)); |
| 308 | |
| 309 | if (!writePacketToTransport(p: packet)) { |
| 310 | qCDebug(lcMqttConnection) << "Could not write AUTH frame to transport." ; |
| 311 | return false; |
| 312 | } |
| 313 | |
| 314 | return true; |
| 315 | } |
| 316 | |
| 317 | qint32 QMqttConnection::sendControlPublish(const QMqttTopicName &topic, |
| 318 | const QByteArray &message, |
| 319 | quint8 qos, |
| 320 | bool retain, |
| 321 | const QMqttPublishProperties &properties) |
| 322 | { |
| 323 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << topic << " Size:" << message.size() << " bytes." |
| 324 | << "QoS:" << qos << " Retain:" << retain; |
| 325 | |
| 326 | if (!topic.isValid()) |
| 327 | return -1; |
| 328 | |
| 329 | quint8 = QMqttControlPacket::PUBLISH; |
| 330 | if (qos == 1) |
| 331 | header |= 0x02; |
| 332 | else if (qos == 2) |
| 333 | header |= 0x04; |
| 334 | |
| 335 | if (retain) |
| 336 | header |= 0x01; |
| 337 | |
| 338 | QSharedPointer<QMqttControlPacket> packet(new QMqttControlPacket(header)); |
| 339 | // topic alias |
| 340 | QMqttPublishProperties publishProperties(properties); |
| 341 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 342 | // 3.3.4 A PUBLISH packet sent from a Client to a Server MUST NOT contain a Subscription Identifier |
| 343 | if (publishProperties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) { |
| 344 | qCWarning(lcMqttConnection) << "SubscriptionIdentifier must not be specified for publish." ; |
| 345 | return -1; |
| 346 | } |
| 347 | |
| 348 | const quint16 topicAlias = publishProperties.topicAlias(); |
| 349 | if (topicAlias > 0) { // User specified topic Alias |
| 350 | if (topicAlias > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) { |
| 351 | qCDebug(lcMqttConnection) << "TopicAlias publish: overflow." ; |
| 352 | return -1; |
| 353 | } |
| 354 | if (m_publishAliases.at(i: topicAlias - 1) != topic) { |
| 355 | qCDebug(lcMqttConnection) << "TopicAlias publish: Assign:" << topicAlias << ":" << topic; |
| 356 | m_publishAliases[topicAlias - 1] = topic; |
| 357 | packet->append(data: topic.name().toUtf8()); |
| 358 | } else { |
| 359 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Reuse:" << topicAlias; |
| 360 | packet->append(value: quint16(0)); |
| 361 | } |
| 362 | } else if (m_publishAliases.size() > 0) { // Automatic module alias assignment |
| 363 | int autoAlias = m_publishAliases.indexOf(t: topic); |
| 364 | if (autoAlias != -1) { |
| 365 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Use auto alias:" << autoAlias; |
| 366 | packet->append(value: quint16(0)); |
| 367 | publishProperties.setTopicAlias(quint16(autoAlias + 1)); |
| 368 | } else { |
| 369 | autoAlias = m_publishAliases.indexOf(t: QMqttTopicName()); |
| 370 | if (autoAlias != -1) { |
| 371 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: auto alias assignment:" << autoAlias; |
| 372 | m_publishAliases[autoAlias] = topic; |
| 373 | publishProperties.setTopicAlias(quint16(autoAlias) + 1); |
| 374 | } else |
| 375 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: alias storage full, using full topic" ; |
| 376 | packet->append(data: topic.name().toUtf8()); |
| 377 | } |
| 378 | } else { |
| 379 | packet->append(data: topic.name().toUtf8()); |
| 380 | } |
| 381 | } else { // ! MQTT_5_0 |
| 382 | packet->append(data: topic.name().toUtf8()); |
| 383 | } |
| 384 | quint16 identifier = 0; |
| 385 | if (qos > 0) { |
| 386 | identifier = unusedPacketIdentifier(); |
| 387 | packet->append(value: identifier); |
| 388 | m_pendingMessages.insert(key: identifier, value: packet); |
| 389 | } |
| 390 | |
| 391 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 392 | packet->appendRaw(data: writePublishProperties(properties: publishProperties)); |
| 393 | |
| 394 | packet->appendRaw(data: message); |
| 395 | |
| 396 | const bool written = writePacketToTransport(p: *packet.data()); |
| 397 | |
| 398 | if (!written && qos > 0) |
| 399 | m_pendingMessages.remove(key: identifier); |
| 400 | return written ? identifier : -1; |
| 401 | } |
| 402 | |
| 403 | bool QMqttConnection::sendControlPublishAcknowledge(quint16 id) |
| 404 | { |
| 405 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
| 406 | QMqttControlPacket packet(QMqttControlPacket::PUBACK); |
| 407 | packet.append(value: id); |
| 408 | return writePacketToTransport(p: packet); |
| 409 | } |
| 410 | |
| 411 | bool QMqttConnection::sendControlPublishRelease(quint16 id) |
| 412 | { |
| 413 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
| 414 | quint8 = QMqttControlPacket::PUBREL; |
| 415 | header |= 0x02; // MQTT-3.6.1-1 |
| 416 | |
| 417 | QMqttControlPacket packet(header); |
| 418 | packet.append(value: id); |
| 419 | return writePacketToTransport(p: packet); |
| 420 | } |
| 421 | |
| 422 | bool QMqttConnection::sendControlPublishReceive(quint16 id) |
| 423 | { |
| 424 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
| 425 | QMqttControlPacket packet(QMqttControlPacket::PUBREC); |
| 426 | packet.append(value: id); |
| 427 | return writePacketToTransport(p: packet); |
| 428 | } |
| 429 | |
| 430 | bool QMqttConnection::sendControlPublishComp(quint16 id) |
| 431 | { |
| 432 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
| 433 | QMqttControlPacket packet(QMqttControlPacket::PUBCOMP); |
| 434 | packet.append(value: id); |
| 435 | return writePacketToTransport(p: packet); |
| 436 | } |
| 437 | |
| 438 | QMqttSubscription *QMqttConnection::sendControlSubscribe(const QMqttTopicFilter &topic, |
| 439 | quint8 qos, |
| 440 | const QMqttSubscriptionProperties &properties) |
| 441 | { |
| 442 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic << " qos:" << qos; |
| 443 | |
| 444 | // Overflow protection |
| 445 | if (Q_UNLIKELY(!topic.isValid())) { |
| 446 | qCWarning(lcMqttConnection) << "Invalid subscription topic filter." ; |
| 447 | return nullptr; |
| 448 | } |
| 449 | |
| 450 | if (Q_UNLIKELY(qos > 2)) { |
| 451 | qCWarning(lcMqttConnection) << "Invalid subscription QoS." ; |
| 452 | return nullptr; |
| 453 | } |
| 454 | |
| 455 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 456 | const QString sharedSubscriptionName = topic.sharedSubscriptionName(); |
| 457 | if (!sharedSubscriptionName.isEmpty()) { |
| 458 | const QMqttTopicFilter filter(topic.filter().section(asep: QLatin1Char('/'), astart: 2)); |
| 459 | auto it = m_activeSubscriptions.constFind(key: filter); |
| 460 | if (it != m_activeSubscriptions.cend() && (*it)->sharedSubscriptionName() == sharedSubscriptionName) |
| 461 | return *it; |
| 462 | } else { |
| 463 | auto it = m_activeSubscriptions.constFind(key: topic); |
| 464 | if (it != m_activeSubscriptions.cend() && !(*it)->isSharedSubscription()) |
| 465 | return *it; |
| 466 | } |
| 467 | } else { |
| 468 | auto it = m_activeSubscriptions.constFind(key: topic); |
| 469 | if (it != m_activeSubscriptions.cend()) |
| 470 | return *it; |
| 471 | } |
| 472 | |
| 473 | // has to have 0010 as bits 3-0, maybe update SUBSCRIBE instead? |
| 474 | // MQTT-3.8.1-1 |
| 475 | const quint8 = QMqttControlPacket::SUBSCRIBE + 0x02; |
| 476 | QMqttControlPacket packet(header); |
| 477 | |
| 478 | // Add Packet Identifier |
| 479 | const quint16 identifier = unusedPacketIdentifier(); |
| 480 | |
| 481 | packet.append(value: identifier); |
| 482 | |
| 483 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 484 | packet.appendRaw(data: writeSubscriptionProperties(properties)); |
| 485 | |
| 486 | packet.append(data: topic.filter().toUtf8()); |
| 487 | char options = char(qos); |
| 488 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && properties.noLocal()) |
| 489 | options |= 1 << 2; |
| 490 | packet.append(value: options); |
| 491 | |
| 492 | auto result = new QMqttSubscription(this); |
| 493 | result->setTopic(topic); |
| 494 | result->setClient(m_clientPrivate->m_client); |
| 495 | result->setQos(qos); |
| 496 | result->setState(QMqttSubscription::SubscriptionPending); |
| 497 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && !topic.sharedSubscriptionName().isEmpty()) { |
| 498 | result->setSharedSubscriptionName(topic.sharedSubscriptionName()); |
| 499 | result->setSharedSubscription(true); |
| 500 | result->setTopic(topic.filter().section(asep: QLatin1Char('/'), astart: 2)); |
| 501 | } |
| 502 | |
| 503 | if (!writePacketToTransport(p: packet)) { |
| 504 | delete result; |
| 505 | return nullptr; |
| 506 | } |
| 507 | |
| 508 | // SUBACK must contain identifier MQTT-3.8.4-2 |
| 509 | m_pendingSubscriptionAck.insert(key: identifier, value: result); |
| 510 | m_activeSubscriptions.insert(key: result->topic(), value: result); |
| 511 | return result; |
| 512 | } |
| 513 | |
| 514 | bool QMqttConnection::sendControlUnsubscribe(const QMqttTopicFilter &topic, const QMqttUnsubscriptionProperties &properties) |
| 515 | { |
| 516 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic; |
| 517 | |
| 518 | // MQTT-3.10.3-2 |
| 519 | if (!topic.isValid()) |
| 520 | return false; |
| 521 | |
| 522 | if (!m_activeSubscriptions.contains(key: topic)) |
| 523 | return false; |
| 524 | |
| 525 | if (m_internalState != QMqttConnection::BrokerConnected) { |
| 526 | m_activeSubscriptions.remove(key: topic); |
| 527 | return true; |
| 528 | } |
| 529 | |
| 530 | // has to have 0010 as bits 3-0, maybe update UNSUBSCRIBE instead? |
| 531 | // MQTT-3.10.1-1 |
| 532 | const quint8 = QMqttControlPacket::UNSUBSCRIBE + 0x02; |
| 533 | QMqttControlPacket packet(header); |
| 534 | |
| 535 | // Add Packet Identifier |
| 536 | const quint16 identifier = unusedPacketIdentifier(); |
| 537 | |
| 538 | packet.append(value: identifier); |
| 539 | |
| 540 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 541 | packet.appendRaw(data: writeUnsubscriptionProperties(properties)); |
| 542 | } |
| 543 | |
| 544 | packet.append(data: topic.filter().toUtf8()); |
| 545 | auto sub = m_activeSubscriptions[topic]; |
| 546 | sub->setState(QMqttSubscription::UnsubscriptionPending); |
| 547 | |
| 548 | if (!writePacketToTransport(p: packet)) |
| 549 | return false; |
| 550 | |
| 551 | // Do not remove from m_activeSubscriptions as there might be QoS1/2 messages to still |
| 552 | // be sent before UNSUBSCRIBE is acknowledged. |
| 553 | m_pendingUnsubscriptions.insert(key: identifier, value: sub); |
| 554 | |
| 555 | return true; |
| 556 | } |
| 557 | |
| 558 | bool QMqttConnection::sendControlPingRequest(bool isAuto) |
| 559 | { |
| 560 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
| 561 | |
| 562 | if (m_internalState != QMqttConnection::BrokerConnected) |
| 563 | return false; |
| 564 | |
| 565 | |
| 566 | if (!isAuto && m_clientPrivate->m_autoKeepAlive) { |
| 567 | qCDebug(lcMqttConnection) << "Requesting a manual ping while autoKeepAlive is enabled " |
| 568 | << "is not allowed." ; |
| 569 | return false; |
| 570 | } |
| 571 | |
| 572 | // 3.1.2.10 If a Client does not receive a PINGRESP packet within a reasonable amount of time |
| 573 | // after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server |
| 574 | if (m_pingTimeout > 1) { |
| 575 | closeConnection(error: QMqttClient::ServerUnavailable); |
| 576 | return false; |
| 577 | } |
| 578 | |
| 579 | const QMqttControlPacket packet(QMqttControlPacket::PINGREQ); |
| 580 | if (!writePacketToTransport(p: packet)) { |
| 581 | qCDebug(lcMqttConnection) << "Failed to write PINGREQ to transport." ; |
| 582 | return false; |
| 583 | } |
| 584 | m_pingTimeout++; |
| 585 | return true; |
| 586 | } |
| 587 | |
| 588 | bool QMqttConnection::sendControlDisconnect() |
| 589 | { |
| 590 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
| 591 | |
| 592 | m_pingTimer.stop(); |
| 593 | m_pingTimeout = 0; |
| 594 | |
| 595 | m_activeSubscriptions.clear(); |
| 596 | |
| 597 | m_receiveAliases.clear(); |
| 598 | m_publishAliases.clear(); |
| 599 | |
| 600 | const QMqttControlPacket packet(QMqttControlPacket::DISCONNECT); |
| 601 | if (!writePacketToTransport(p: packet)) { |
| 602 | qCDebug(lcMqttConnection) << "Failed to write DISCONNECT to transport." ; |
| 603 | return false; |
| 604 | } |
| 605 | if (m_internalState != ClientDestruction) |
| 606 | m_internalState = BrokerDisconnected; |
| 607 | |
| 608 | if (m_transport->waitForBytesWritten(msecs: 30000)) { |
| 609 | // MQTT-3.14.4-1 must disconnect |
| 610 | m_transport->close(); |
| 611 | return true; |
| 612 | } |
| 613 | return false; |
| 614 | } |
| 615 | |
| 616 | void QMqttConnection::setClientPrivate(QMqttClientPrivate *clientPrivate) |
| 617 | { |
| 618 | m_clientPrivate = clientPrivate; |
| 619 | } |
| 620 | |
| 621 | quint16 QMqttConnection::unusedPacketIdentifier() const |
| 622 | { |
| 623 | // MQTT-2.3.1-1 Control Packets MUST contain a non-zero 16-bit Packet Identifier |
| 624 | static quint16 packetIdentifierCounter = 1; |
| 625 | const std::uint16_t u16max = std::numeric_limits<std::uint16_t>::max(); |
| 626 | |
| 627 | // MQTT-2.3.1-2 ...it MUST assign it a currently unused Packet Identifier |
| 628 | const quint16 lastIdentifier = packetIdentifierCounter; |
| 629 | do { |
| 630 | if (packetIdentifierCounter == u16max) |
| 631 | packetIdentifierCounter = 1; |
| 632 | else |
| 633 | packetIdentifierCounter++; |
| 634 | |
| 635 | if (lastIdentifier == packetIdentifierCounter) { |
| 636 | qCDebug(lcMqttConnection) << "Could not generate unique packet identifier." ; |
| 637 | break; |
| 638 | } |
| 639 | } while (m_pendingSubscriptionAck.contains(key: packetIdentifierCounter) |
| 640 | || m_pendingUnsubscriptions.contains(key: packetIdentifierCounter) |
| 641 | || m_pendingMessages.contains(key: packetIdentifierCounter) |
| 642 | || m_pendingReleaseMessages.contains(key: packetIdentifierCounter)); |
| 643 | return packetIdentifierCounter; |
| 644 | } |
| 645 | |
| 646 | void QMqttConnection::cleanSubscriptions() |
| 647 | { |
| 648 | for (auto item : m_pendingSubscriptionAck) |
| 649 | item->setState(QMqttSubscription::Unsubscribed); |
| 650 | m_pendingSubscriptionAck.clear(); |
| 651 | |
| 652 | for (auto item : m_pendingUnsubscriptions) |
| 653 | item->setState(QMqttSubscription::Unsubscribed); |
| 654 | m_pendingUnsubscriptions.clear(); |
| 655 | |
| 656 | for (auto item : m_activeSubscriptions) |
| 657 | item->setState(QMqttSubscription::Unsubscribed); |
| 658 | m_activeSubscriptions.clear(); |
| 659 | } |
| 660 | |
| 661 | void QMqttConnection::transportConnectionEstablished() |
| 662 | { |
| 663 | if (m_internalState != BrokerConnecting) { |
| 664 | qCWarning(lcMqttConnection) << "Connection established at an unexpected time" ; |
| 665 | return; |
| 666 | } |
| 667 | |
| 668 | if (!sendControlConnect()) { |
| 669 | qCDebug(lcMqttConnection) << "Failed to write CONNECT to transport." ; |
| 670 | // ### Who disconnects now? Connection or client? |
| 671 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid); |
| 672 | } |
| 673 | } |
| 674 | |
| 675 | void QMqttConnection::transportConnectionClosed() |
| 676 | { |
| 677 | m_readBuffer.clear(); |
| 678 | m_readPosition = 0; |
| 679 | m_pingTimer.stop(); |
| 680 | m_pingTimeout = 0; |
| 681 | if (m_internalState == ClientDestruction) |
| 682 | return; |
| 683 | if (m_internalState == BrokerDisconnected) // We manually disconnected |
| 684 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::NoError); |
| 685 | else |
| 686 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid); |
| 687 | } |
| 688 | |
| 689 | void QMqttConnection::transportReadyRead() |
| 690 | { |
| 691 | qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO; |
| 692 | m_readBuffer.append(a: m_transport->readAll()); |
| 693 | processData(); |
| 694 | } |
| 695 | |
| 696 | void QMqttConnection::transportError(QAbstractSocket::SocketError e) |
| 697 | { |
| 698 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << e; |
| 699 | closeConnection(error: QMqttClient::TransportInvalid); |
| 700 | } |
| 701 | |
| 702 | bool QMqttConnection::readBuffer(char *data, quint64 size) |
| 703 | { |
| 704 | if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) { |
| 705 | qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation" ; |
| 706 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 707 | return false; |
| 708 | } |
| 709 | memcpy(dest: data, src: m_readBuffer.constData() + m_readPosition, n: size); |
| 710 | m_readPosition += size; |
| 711 | return true; |
| 712 | } |
| 713 | |
| 714 | qint32 QMqttConnection::readVariableByteInteger(qint64 *dataSize) |
| 715 | { |
| 716 | quint32 multiplier = 1; |
| 717 | qint32 msgLength = 0; |
| 718 | quint8 b = 0; |
| 719 | quint8 iteration = 0; |
| 720 | do { |
| 721 | b = readBufferTyped<quint8>(dataSize); |
| 722 | msgLength += (b & 127) * multiplier; |
| 723 | multiplier *= 128; |
| 724 | iteration++; |
| 725 | if (iteration > 4) { |
| 726 | qCDebug(lcMqttConnection) << "Overflow trying to read variable integer." ; |
| 727 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 728 | return -1; |
| 729 | } |
| 730 | } while ((b & 128) != 0); |
| 731 | return msgLength; |
| 732 | } |
| 733 | |
| 734 | void QMqttConnection::closeConnection(QMqttClient::ClientError error) |
| 735 | { |
| 736 | m_readBuffer.clear(); |
| 737 | m_readPosition = 0; |
| 738 | m_pingTimer.stop(); |
| 739 | m_pingTimeout = 0; |
| 740 | m_activeSubscriptions.clear(); |
| 741 | m_internalState = BrokerDisconnected; |
| 742 | m_transport->disconnect(); |
| 743 | m_transport->close(); |
| 744 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: error); |
| 745 | } |
| 746 | |
| 747 | QByteArray QMqttConnection::readBuffer(quint64 size) |
| 748 | { |
| 749 | if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) { |
| 750 | qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation" ; |
| 751 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 752 | return QByteArray(); |
| 753 | } |
| 754 | QByteArray res(m_readBuffer.constData() + m_readPosition, int(size)); |
| 755 | m_readPosition += size; |
| 756 | return res; |
| 757 | } |
| 758 | |
| 759 | void QMqttConnection::readAuthProperties(QMqttAuthenticationProperties &properties) |
| 760 | { |
| 761 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 762 | m_missingData -= propertyLength; |
| 763 | |
| 764 | QMqttUserProperties userProperties; |
| 765 | while (propertyLength > 0) { |
| 766 | quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 767 | |
| 768 | switch (propertyId) { |
| 769 | case 0x15: { //3.15.2.2.2 Authentication Method |
| 770 | const QString method = readBufferTyped<QString>(dataSize: &propertyLength); |
| 771 | properties.setAuthenticationMethod(method); |
| 772 | break; |
| 773 | } |
| 774 | case 0x16: { // 3.15.2.2.3 Authentication Data |
| 775 | const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength); |
| 776 | properties.setAuthenticationData(data); |
| 777 | break; |
| 778 | } |
| 779 | case 0x1F: { // 3.15.2.2.4 Reason String |
| 780 | const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength); |
| 781 | properties.setReason(reasonString); |
| 782 | break; |
| 783 | } |
| 784 | case 0x26: { // 3.15.2.2.5 User property |
| 785 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 786 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 787 | |
| 788 | userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 789 | break; |
| 790 | } |
| 791 | default: |
| 792 | qCDebug(lcMqttConnection) << "Unknown property id in AUTH:" << propertyId; |
| 793 | break; |
| 794 | } |
| 795 | } |
| 796 | if (!userProperties.isEmpty()) |
| 797 | properties.setUserProperties(userProperties); |
| 798 | } |
| 799 | |
| 800 | void QMqttConnection::readConnackProperties(QMqttServerConnectionProperties &properties) |
| 801 | { |
| 802 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 803 | m_missingData -= propertyLength; |
| 804 | |
| 805 | properties.serverData->valid = true; |
| 806 | |
| 807 | while (propertyLength > 0) { |
| 808 | quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 809 | switch (propertyId) { |
| 810 | case 0x11: { // 3.2.2.3.2 Session Expiry Interval |
| 811 | const quint32 expiryInterval = readBufferTyped<quint32>(dataSize: &propertyLength); |
| 812 | properties.serverData->details |= QMqttServerConnectionProperties::SessionExpiryInterval; |
| 813 | properties.setSessionExpiryInterval(expiryInterval); |
| 814 | break; |
| 815 | } |
| 816 | case 0x21: { // 3.2.2.3.3 Receive Maximum |
| 817 | const quint16 receiveMaximum = readBufferTyped<quint16>(dataSize: &propertyLength); |
| 818 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumReceive; |
| 819 | properties.setMaximumReceive(receiveMaximum); |
| 820 | break; |
| 821 | } |
| 822 | case 0x24: { // 3.2.2.3.4 Maximum QoS Level |
| 823 | const quint8 maxQoS = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 824 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumQoS; |
| 825 | properties.serverData->maximumQoS = maxQoS; |
| 826 | break; |
| 827 | } |
| 828 | case 0x25: { // 3.2.2.3.5 Retain available |
| 829 | const quint8 retainAvailable = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 830 | properties.serverData->details |= QMqttServerConnectionProperties::RetainAvailable; |
| 831 | properties.serverData->retainAvailable = retainAvailable == 1; |
| 832 | break; |
| 833 | } |
| 834 | case 0x27: { // 3.2.2.3.6 Maximum packet size |
| 835 | const quint32 maxPacketSize = readBufferTyped<quint32>(dataSize: &propertyLength); |
| 836 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumPacketSize; |
| 837 | properties.setMaximumPacketSize(maxPacketSize); |
| 838 | break; |
| 839 | } |
| 840 | case 0x12: { // 3.2.2.3.7 Assigned clientId |
| 841 | const QString assignedClientId = readBufferTyped<QString>(dataSize: &propertyLength); |
| 842 | properties.serverData->details |= QMqttServerConnectionProperties::AssignedClientId; |
| 843 | m_clientPrivate->setClientId(assignedClientId); |
| 844 | break; |
| 845 | } |
| 846 | case 0x22: { // 3.2.2.3.8 Topic Alias Maximum |
| 847 | const quint16 topicAliasMaximum = readBufferTyped<quint16>(dataSize: &propertyLength); |
| 848 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumTopicAlias; |
| 849 | properties.setMaximumTopicAlias(topicAliasMaximum); |
| 850 | break; |
| 851 | } |
| 852 | case 0x1F: { // 3.2.2.3.9 Reason String |
| 853 | const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength); |
| 854 | properties.serverData->details |= QMqttServerConnectionProperties::ReasonString; |
| 855 | properties.serverData->reasonString = reasonString; |
| 856 | break; |
| 857 | } |
| 858 | case 0x26: { // 3.2.2.3.10 User property |
| 859 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 860 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 861 | |
| 862 | properties.serverData->details |= QMqttServerConnectionProperties::UserProperty; |
| 863 | properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 864 | break; |
| 865 | } |
| 866 | case 0x28: { // 3.2.2.3.11 Wildcard subscriptions available |
| 867 | const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 868 | properties.serverData->details |= QMqttServerConnectionProperties::WildCardSupported; |
| 869 | properties.serverData->wildcardSupported = available == 1; |
| 870 | break; |
| 871 | } |
| 872 | case 0x29: { // 3.2.2.3.12 Subscription identifiers available |
| 873 | const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 874 | properties.serverData->details |= QMqttServerConnectionProperties::SubscriptionIdentifierSupport; |
| 875 | properties.serverData->subscriptionIdentifierSupported = available == 1; |
| 876 | break; |
| 877 | } |
| 878 | case 0x2A: { // 3.2.2.3.13 Shared subscriptions available |
| 879 | const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 880 | properties.serverData->details |= QMqttServerConnectionProperties::SharedSubscriptionSupport; |
| 881 | properties.serverData->sharedSubscriptionSupported = available == 1; |
| 882 | break; |
| 883 | } |
| 884 | case 0x13: { // 3.2.2.3.14 Server Keep Alive |
| 885 | const quint16 serverKeepAlive = readBufferTyped<quint16>(dataSize: &propertyLength); |
| 886 | properties.serverData->details |= QMqttServerConnectionProperties::ServerKeepAlive; |
| 887 | m_clientPrivate->m_client->setKeepAlive(serverKeepAlive); |
| 888 | break; |
| 889 | } |
| 890 | case 0x1A: { // 3.2.2.3.15 Response information |
| 891 | const QString responseInfo = readBufferTyped<QString>(dataSize: &propertyLength); |
| 892 | properties.serverData->details |= QMqttServerConnectionProperties::ResponseInformation; |
| 893 | properties.serverData->responseInformation = responseInfo; |
| 894 | break; |
| 895 | } |
| 896 | case 0x1C: { // 3.2.2.3.16 Server reference |
| 897 | const QString serverReference = readBufferTyped<QString>(dataSize: &propertyLength); |
| 898 | properties.serverData->details |= QMqttServerConnectionProperties::ServerReference; |
| 899 | properties.serverData->serverReference = serverReference; |
| 900 | break; |
| 901 | } |
| 902 | case 0x15: { // 3.2.2.3.17 Authentication method |
| 903 | const QString method = readBufferTyped<QString>(dataSize: &propertyLength); |
| 904 | properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationMethod; |
| 905 | properties.data->authenticationMethod = method; |
| 906 | break; |
| 907 | } |
| 908 | case 0x16: { // 3.2.2.3.18 Authentication data |
| 909 | const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength); |
| 910 | properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationData; |
| 911 | properties.data->authenticationData = data; |
| 912 | break; |
| 913 | } |
| 914 | default: |
| 915 | qCDebug(lcMqttConnection) << "Unknown property id in CONNACK:" << int(propertyId); |
| 916 | break; |
| 917 | } |
| 918 | } |
| 919 | } |
| 920 | |
| 921 | void QMqttConnection::readMessageStatusProperties(QMqttMessageStatusProperties &properties) |
| 922 | { |
| 923 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 924 | m_missingData -= propertyLength; |
| 925 | |
| 926 | while (propertyLength > 0) { |
| 927 | const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 928 | switch (propertyId) { |
| 929 | case 0x1f: { // 3.4.2.2.2 Reason String |
| 930 | const QString content = readBufferTyped<QString>(dataSize: &propertyLength); |
| 931 | properties.data->reasonString = content; |
| 932 | break; |
| 933 | } |
| 934 | case 0x26: { // 3.4.2.2.3 User Properites |
| 935 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 936 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 937 | properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 938 | break; |
| 939 | } |
| 940 | default: |
| 941 | qCDebug(lcMqttConnection) << "Unknown subscription property received." ; |
| 942 | break; |
| 943 | } |
| 944 | } |
| 945 | } |
| 946 | |
| 947 | void QMqttConnection::readPublishProperties(QMqttPublishProperties &properties) |
| 948 | { |
| 949 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 950 | m_missingData -= propertyLength; |
| 951 | |
| 952 | QMqttUserProperties userProperties; |
| 953 | QList<quint32> subscriptionIds; |
| 954 | |
| 955 | while (propertyLength > 0) { |
| 956 | const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 957 | switch (propertyId) { |
| 958 | case 0x01: { // 3.3.2.3.2 Payload Format Indicator |
| 959 | const quint8 format = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 960 | if (format == 1) |
| 961 | properties.setPayloadFormatIndicator(QMqtt::PayloadFormatIndicator::UTF8Encoded); |
| 962 | break; |
| 963 | } |
| 964 | case 0x02: { // 3.3.2.3.3 Message Expiry Interval |
| 965 | const quint32 interval = readBufferTyped<quint32>(dataSize: &propertyLength); |
| 966 | properties.setMessageExpiryInterval(interval); |
| 967 | break; |
| 968 | } |
| 969 | case 0x23: { // 3.3.2.3.4 Topic alias |
| 970 | const quint16 alias = readBufferTyped<quint16>(dataSize: &propertyLength); |
| 971 | properties.setTopicAlias(alias); |
| 972 | break; |
| 973 | } |
| 974 | case 0x08: { // 3.3.2.3.5 Response Topic |
| 975 | const QString responseTopic = readBufferTyped<QString>(dataSize: &propertyLength); |
| 976 | properties.setResponseTopic(responseTopic); |
| 977 | break; |
| 978 | } |
| 979 | case 0x09: { // 3.3.2.3.6 Correlation Data |
| 980 | const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength); |
| 981 | properties.setCorrelationData(data); |
| 982 | break; |
| 983 | } |
| 984 | case 0x26: { // 3.3.2.3.7 User property |
| 985 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 986 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 987 | userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 988 | break; |
| 989 | } |
| 990 | case 0x0b: { // 3.3.2.3.8 Subscription Identifier |
| 991 | qint32 id = readVariableByteInteger(dataSize: &propertyLength); |
| 992 | if (id < 0) |
| 993 | return; // readVariableByteInteger closes connection |
| 994 | subscriptionIds.append(t: quint32(id)); |
| 995 | break; |
| 996 | } |
| 997 | case 0x03: { // 3.3.2.3.9 Content Type |
| 998 | const QString content = readBufferTyped<QString>(dataSize: &propertyLength); |
| 999 | properties.setContentType(content); |
| 1000 | break; |
| 1001 | } |
| 1002 | default: |
| 1003 | qCDebug(lcMqttConnection) << "Unknown publish property received." ; |
| 1004 | break; |
| 1005 | } |
| 1006 | } |
| 1007 | if (!userProperties.isEmpty()) |
| 1008 | properties.setUserProperties(userProperties); |
| 1009 | |
| 1010 | if (!subscriptionIds.isEmpty()) |
| 1011 | properties.setSubscriptionIdentifiers(subscriptionIds); |
| 1012 | } |
| 1013 | |
| 1014 | void QMqttConnection::readSubscriptionProperties(QMqttSubscription *sub) |
| 1015 | { |
| 1016 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 1017 | m_missingData -= propertyLength; |
| 1018 | |
| 1019 | while (propertyLength > 0) { |
| 1020 | const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 1021 | switch (propertyId) { |
| 1022 | case 0x1f: { // 3.9.2.1.2 Reason String |
| 1023 | const QString content = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1024 | sub->d_func()->m_reasonString = content; |
| 1025 | break; |
| 1026 | } |
| 1027 | case 0x26: { // 3.9.2.1.3 |
| 1028 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1029 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1030 | |
| 1031 | sub->d_func()->m_userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 1032 | break; |
| 1033 | } |
| 1034 | default: |
| 1035 | qCDebug(lcMqttConnection) << "Unknown subscription property received." ; |
| 1036 | break; |
| 1037 | } |
| 1038 | } |
| 1039 | } |
| 1040 | |
| 1041 | QByteArray QMqttConnection::writeConnectProperties() |
| 1042 | { |
| 1043 | QMqttControlPacket properties; |
| 1044 | |
| 1045 | // According to MQTT5 3.1.2.11 default values do not need to be included in the |
| 1046 | // connect statement. |
| 1047 | |
| 1048 | // 3.1.2.11.2 |
| 1049 | if (m_clientPrivate->m_connectionProperties.sessionExpiryInterval() != 0) { |
| 1050 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify sessionExpiryInterval" ; |
| 1051 | properties.append(value: char(0x11)); |
| 1052 | properties.append(value: m_clientPrivate->m_connectionProperties.sessionExpiryInterval()); |
| 1053 | } |
| 1054 | |
| 1055 | // 3.1.2.11.3 |
| 1056 | if (m_clientPrivate->m_connectionProperties.maximumReceive() != 65535) { |
| 1057 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumReceive" ; |
| 1058 | properties.append(value: char(0x21)); |
| 1059 | properties.append(value: m_clientPrivate->m_connectionProperties.maximumReceive()); |
| 1060 | } |
| 1061 | |
| 1062 | // 3.1.2.11.4 |
| 1063 | if (m_clientPrivate->m_connectionProperties.maximumPacketSize() != std::numeric_limits<quint32>::max()) { |
| 1064 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumPacketSize" ; |
| 1065 | properties.append(value: char(0x27)); |
| 1066 | properties.append(value: m_clientPrivate->m_connectionProperties.maximumPacketSize()); |
| 1067 | } |
| 1068 | |
| 1069 | // 3.1.2.11.5 |
| 1070 | if (m_clientPrivate->m_connectionProperties.maximumTopicAlias() != 0) { |
| 1071 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumTopicAlias" ; |
| 1072 | properties.append(value: char(0x22)); |
| 1073 | properties.append(value: m_clientPrivate->m_connectionProperties.maximumTopicAlias()); |
| 1074 | } |
| 1075 | |
| 1076 | // 3.1.2.11.6 |
| 1077 | if (m_clientPrivate->m_connectionProperties.requestResponseInformation()) { |
| 1078 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestResponseInformation" ; |
| 1079 | properties.append(value: char(0x19)); |
| 1080 | properties.append(value: char(1)); |
| 1081 | } |
| 1082 | |
| 1083 | // 3.1.2.11.7 |
| 1084 | if (!m_clientPrivate->m_connectionProperties.requestProblemInformation()) { |
| 1085 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestProblemInformation" ; |
| 1086 | properties.append(value: char(0x17)); |
| 1087 | properties.append(value: char(0)); |
| 1088 | } |
| 1089 | |
| 1090 | // 3.1.2.11.8 Add User properties |
| 1091 | auto userProperties = m_clientPrivate->m_connectionProperties.userProperties(); |
| 1092 | if (!userProperties.isEmpty()) { |
| 1093 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify user properties" ; |
| 1094 | for (const auto &prop : userProperties) { |
| 1095 | properties.append(value: char(0x26)); |
| 1096 | properties.append(data: prop.name().toUtf8()); |
| 1097 | properties.append(data: prop.value().toUtf8()); |
| 1098 | } |
| 1099 | } |
| 1100 | |
| 1101 | // 3.1.2.11.9 Add Authentication |
| 1102 | const QString authenticationMethod = m_clientPrivate->m_connectionProperties.authenticationMethod(); |
| 1103 | if (!authenticationMethod.isEmpty()) { |
| 1104 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify AuthenticationMethod:" ; |
| 1105 | qCDebug(lcMqttConnectionVerbose) << " " << authenticationMethod; |
| 1106 | properties.append(value: char(0x15)); |
| 1107 | properties.append(data: authenticationMethod.toUtf8()); |
| 1108 | // 3.1.2.11.10 |
| 1109 | const QByteArray authenticationData = m_clientPrivate->m_connectionProperties.authenticationData(); |
| 1110 | if (!authenticationData.isEmpty()) { |
| 1111 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: Authentication Data:" ; |
| 1112 | qCDebug(lcMqttConnectionVerbose) << " " << authenticationData; |
| 1113 | properties.append(value: char(0x16)); |
| 1114 | properties.append(data: authenticationData); |
| 1115 | } |
| 1116 | } |
| 1117 | |
| 1118 | return properties.serializePayload(); |
| 1119 | } |
| 1120 | |
| 1121 | QByteArray QMqttConnection::writeLastWillProperties() const |
| 1122 | { |
| 1123 | QMqttControlPacket properties; |
| 1124 | const QMqttLastWillProperties &lastWillProperties = m_clientPrivate->m_lastWillProperties; |
| 1125 | // Will Delay interval 3.1.3.2.2 |
| 1126 | if (lastWillProperties.willDelayInterval() > 0) { |
| 1127 | const quint32 delay = lastWillProperties.willDelayInterval(); |
| 1128 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify will delay interval:" |
| 1129 | << delay; |
| 1130 | properties.append(value: char(0x18)); |
| 1131 | properties.append(value: delay); |
| 1132 | } |
| 1133 | |
| 1134 | // Payload Format Indicator 3.1.3.2.3 |
| 1135 | if (lastWillProperties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) { |
| 1136 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: payload format indicator specified" ; |
| 1137 | properties.append(value: char(0x01)); |
| 1138 | properties.append(value: char(0x01)); // UTF8 |
| 1139 | } |
| 1140 | |
| 1141 | // Message Expiry Interval 3.1.3.2.4 |
| 1142 | if (lastWillProperties.messageExpiryInterval() > 0) { |
| 1143 | const quint32 interval = lastWillProperties.messageExpiryInterval(); |
| 1144 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Message Expiry interval:" |
| 1145 | << interval; |
| 1146 | properties.append(value: char(0x02)); |
| 1147 | properties.append(value: interval); |
| 1148 | } |
| 1149 | |
| 1150 | // Content Type 3.1.3.2.5 |
| 1151 | if (!lastWillProperties.contentType().isEmpty()) { |
| 1152 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Content Type:" |
| 1153 | << lastWillProperties.contentType(); |
| 1154 | properties.append(value: char(0x03)); |
| 1155 | properties.append(data: lastWillProperties.contentType().toUtf8()); |
| 1156 | } |
| 1157 | |
| 1158 | // Response Topic 3.1.3.2.6 |
| 1159 | if (!lastWillProperties.responseTopic().isEmpty()) { |
| 1160 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Response Topic:" |
| 1161 | << lastWillProperties.responseTopic(); |
| 1162 | properties.append(value: char(0x08)); |
| 1163 | properties.append(data: lastWillProperties.responseTopic().toUtf8()); |
| 1164 | } |
| 1165 | |
| 1166 | // Correlation Data 3.1.3.2.7 |
| 1167 | if (!lastWillProperties.correlationData().isEmpty()) { |
| 1168 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Correlation Data:" |
| 1169 | << lastWillProperties.correlationData(); |
| 1170 | properties.append(value: char(0x09)); |
| 1171 | properties.append(data: lastWillProperties.correlationData()); |
| 1172 | } |
| 1173 | |
| 1174 | // User Properties 3.1.3.2.8 |
| 1175 | if (!lastWillProperties.userProperties().isEmpty()) { |
| 1176 | auto userProperties = lastWillProperties.userProperties(); |
| 1177 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify user properties" ; |
| 1178 | for (const auto &prop : userProperties) { |
| 1179 | properties.append(value: char(0x26)); |
| 1180 | properties.append(data: prop.name().toUtf8()); |
| 1181 | properties.append(data: prop.value().toUtf8()); |
| 1182 | } |
| 1183 | } |
| 1184 | |
| 1185 | return properties.serializePayload(); |
| 1186 | } |
| 1187 | |
| 1188 | QByteArray QMqttConnection::writePublishProperties(const QMqttPublishProperties &properties) |
| 1189 | { |
| 1190 | QMqttControlPacket packet; |
| 1191 | |
| 1192 | // 3.3.2.3.2 Payload Indicator |
| 1193 | if (properties.availableProperties() & QMqttPublishProperties::PayloadFormatIndicator && |
| 1194 | properties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) { |
| 1195 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Payload Indicator:" |
| 1196 | << (properties.payloadFormatIndicator() == QMqtt::PayloadFormatIndicator::UTF8Encoded ? 1 : 0); |
| 1197 | packet.append(value: char(0x01)); |
| 1198 | switch (properties.payloadFormatIndicator()) { |
| 1199 | case QMqtt::PayloadFormatIndicator::UTF8Encoded: |
| 1200 | packet.append(value: char(0x01)); |
| 1201 | break; |
| 1202 | default: |
| 1203 | qCDebug(lcMqttConnection) << "Unknown payload indicator." ; |
| 1204 | break; |
| 1205 | } |
| 1206 | } |
| 1207 | |
| 1208 | // 3.3.2.3.3 Message Expiry |
| 1209 | if (properties.availableProperties() & QMqttPublishProperties::MessageExpiryInterval && |
| 1210 | properties.messageExpiryInterval() > 0) { |
| 1211 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Message Expiry :" |
| 1212 | << properties.messageExpiryInterval(); |
| 1213 | packet.append(value: char(0x02)); |
| 1214 | packet.append(value: properties.messageExpiryInterval()); |
| 1215 | } |
| 1216 | |
| 1217 | // 3.3.2.3.4 Topic alias |
| 1218 | if (properties.availableProperties() & QMqttPublishProperties::TopicAlias && |
| 1219 | properties.topicAlias() > 0) { |
| 1220 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Topic Alias :" |
| 1221 | << properties.topicAlias(); |
| 1222 | if (m_clientPrivate->m_serverConnectionProperties.availableProperties() & QMqttServerConnectionProperties::MaximumTopicAlias |
| 1223 | && properties.topicAlias() > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) { |
| 1224 | qCDebug(lcMqttConnection) << "Invalid topic alias specified: " << properties.topicAlias() |
| 1225 | << " Maximum by server is:" |
| 1226 | << m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias(); |
| 1227 | |
| 1228 | } else { |
| 1229 | packet.append(value: char(0x23)); |
| 1230 | packet.append(value: properties.topicAlias()); |
| 1231 | } |
| 1232 | } |
| 1233 | |
| 1234 | // 3.3.2.3.5 Response Topic |
| 1235 | if (properties.availableProperties() & QMqttPublishProperties::ResponseTopic && |
| 1236 | !properties.responseTopic().isEmpty()) { |
| 1237 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Response Topic :" |
| 1238 | << properties.responseTopic(); |
| 1239 | packet.append(value: char(0x08)); |
| 1240 | packet.append(data: properties.responseTopic().toUtf8()); |
| 1241 | } |
| 1242 | |
| 1243 | // 3.3.2.3.6 Correlation Data |
| 1244 | if (properties.availableProperties() & QMqttPublishProperties::CorrelationData && |
| 1245 | !properties.correlationData().isEmpty()) { |
| 1246 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Correlation Data :" |
| 1247 | << properties.correlationData(); |
| 1248 | packet.append(value: char(0x09)); |
| 1249 | packet.append(data: properties.correlationData()); |
| 1250 | } |
| 1251 | |
| 1252 | // 3.3.2.3.7 User Property |
| 1253 | if (properties.availableProperties() & QMqttPublishProperties::UserProperty) { |
| 1254 | auto userProperties = properties.userProperties(); |
| 1255 | if (!userProperties.isEmpty()) { |
| 1256 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: specify user properties" ; |
| 1257 | for (const auto &prop : userProperties) { |
| 1258 | packet.append(value: char(0x26)); |
| 1259 | packet.append(data: prop.name().toUtf8()); |
| 1260 | packet.append(data: prop.value().toUtf8()); |
| 1261 | } |
| 1262 | } |
| 1263 | } |
| 1264 | |
| 1265 | // 3.3.2.3.8 Subscription Identifier |
| 1266 | if (properties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) { |
| 1267 | for (auto id : properties.subscriptionIdentifiers()) { |
| 1268 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Subscription ID:" << id; |
| 1269 | packet.append(value: char(0x0b)); |
| 1270 | packet.appendRawVariableInteger(value: id); |
| 1271 | } |
| 1272 | } |
| 1273 | |
| 1274 | // 3.3.2.3.9 Content Type |
| 1275 | if (properties.availableProperties() & QMqttPublishProperties::ContentType && |
| 1276 | !properties.contentType().isEmpty()) { |
| 1277 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Content Type :" |
| 1278 | << properties.contentType(); |
| 1279 | packet.append(value: char(0x03)); |
| 1280 | packet.append(data: properties.contentType().toUtf8()); |
| 1281 | } |
| 1282 | |
| 1283 | return packet.serializePayload(); |
| 1284 | } |
| 1285 | |
| 1286 | QByteArray QMqttConnection::writeSubscriptionProperties(const QMqttSubscriptionProperties &properties) |
| 1287 | { |
| 1288 | QMqttControlPacket packet; |
| 1289 | |
| 1290 | // 3.8.2.1.2 Subscription Identifier |
| 1291 | if (properties.subscriptionIdentifier() > 0) { |
| 1292 | qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: Subscription Identifier" ; |
| 1293 | packet.append(value: char(0x0b)); |
| 1294 | packet.appendRawVariableInteger(value: properties.subscriptionIdentifier()); |
| 1295 | } |
| 1296 | |
| 1297 | // 3.8.2.1.3 User Property |
| 1298 | auto userProperties = properties.userProperties(); |
| 1299 | if (!userProperties.isEmpty()) { |
| 1300 | qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: specify user properties" ; |
| 1301 | for (const auto &prop : userProperties) { |
| 1302 | packet.append(value: char(0x26)); |
| 1303 | packet.append(data: prop.name().toUtf8()); |
| 1304 | packet.append(data: prop.value().toUtf8()); |
| 1305 | } |
| 1306 | } |
| 1307 | |
| 1308 | return packet.serializePayload(); |
| 1309 | } |
| 1310 | |
| 1311 | QByteArray QMqttConnection::writeUnsubscriptionProperties(const QMqttUnsubscriptionProperties &properties) |
| 1312 | { |
| 1313 | QMqttControlPacket packet; |
| 1314 | |
| 1315 | // 3.10.2.1.2 |
| 1316 | auto userProperties = properties.userProperties(); |
| 1317 | if (!userProperties.isEmpty()) { |
| 1318 | qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties" ; |
| 1319 | for (const auto &prop : userProperties) { |
| 1320 | packet.append(value: char(0x26)); |
| 1321 | packet.append(data: prop.name().toUtf8()); |
| 1322 | packet.append(data: prop.value().toUtf8()); |
| 1323 | } |
| 1324 | } |
| 1325 | |
| 1326 | return packet.serializePayload(); |
| 1327 | } |
| 1328 | |
| 1329 | QByteArray QMqttConnection::writeAuthenticationProperties(const QMqttAuthenticationProperties &properties) |
| 1330 | { |
| 1331 | QMqttControlPacket packet; |
| 1332 | |
| 1333 | // 3.15.2.2.2 |
| 1334 | if (!properties.authenticationMethod().isEmpty()) { |
| 1335 | packet.append(value: char(0x15)); |
| 1336 | packet.append(data: properties.authenticationMethod().toUtf8()); |
| 1337 | } |
| 1338 | // 3.15.2.2.3 |
| 1339 | if (!properties.authenticationData().isEmpty()) { |
| 1340 | packet.append(value: char(0x16)); |
| 1341 | packet.append(data: properties.authenticationData()); |
| 1342 | } |
| 1343 | |
| 1344 | // 3.15.2.2.4 |
| 1345 | if (!properties.reason().isEmpty()) { |
| 1346 | packet.append(value: char(0x1F)); |
| 1347 | packet.append(data: properties.reason().toUtf8()); |
| 1348 | } |
| 1349 | |
| 1350 | // 3.15.2.2.5 |
| 1351 | auto userProperties = properties.userProperties(); |
| 1352 | if (!userProperties.isEmpty()) { |
| 1353 | qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties" ; |
| 1354 | for (const auto &prop : userProperties) { |
| 1355 | packet.append(value: char(0x26)); |
| 1356 | packet.append(data: prop.name().toUtf8()); |
| 1357 | packet.append(data: prop.value().toUtf8()); |
| 1358 | } |
| 1359 | } |
| 1360 | |
| 1361 | return packet.serializePayload(); |
| 1362 | } |
| 1363 | |
| 1364 | void QMqttConnection::finalize_auth() |
| 1365 | { |
| 1366 | qCDebug(lcMqttConnectionVerbose) << "Finalize AUTH" ; |
| 1367 | |
| 1368 | quint8 authReason = 0; |
| 1369 | QMqttAuthenticationProperties authProperties; |
| 1370 | // 3.15.2.1 - The Reason Code and Property Length can be omitted if the Reason Code |
| 1371 | // is 0x00 (Success) and there are no Properties. In this case the AUTH has a |
| 1372 | // Remaining Length of 0. |
| 1373 | if (m_missingData > 0) { |
| 1374 | authReason = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1375 | readAuthProperties(properties&: authProperties); |
| 1376 | } |
| 1377 | |
| 1378 | // 3.15.2.1 |
| 1379 | switch (QMqtt::ReasonCode(authReason)) { |
| 1380 | case QMqtt::ReasonCode::Success: |
| 1381 | emit m_clientPrivate->m_client->authenticationFinished(p: authProperties); |
| 1382 | break; |
| 1383 | case QMqtt::ReasonCode::ContinueAuthentication: |
| 1384 | case QMqtt::ReasonCode::ReAuthenticate: |
| 1385 | emit m_clientPrivate->m_client->authenticationRequested(p: authProperties); |
| 1386 | break; |
| 1387 | default: |
| 1388 | qCDebug(lcMqttConnection) << "Received illegal AUTH reason code:" << authReason; |
| 1389 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1390 | break; |
| 1391 | } |
| 1392 | } |
| 1393 | |
| 1394 | void QMqttConnection::finalize_connack() |
| 1395 | { |
| 1396 | qCDebug(lcMqttConnectionVerbose) << "Finalize CONNACK" ; |
| 1397 | |
| 1398 | const quint8 ackFlags = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1399 | |
| 1400 | if (ackFlags > 1) { // MQTT-3.2.2.1 |
| 1401 | qCDebug(lcMqttConnection) << "Unexpected CONNACK Flags specified:" << QString::number(ackFlags); |
| 1402 | readBuffer(size: quint64(m_missingData)); |
| 1403 | m_missingData = 0; |
| 1404 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1405 | return; |
| 1406 | } |
| 1407 | bool sessionPresent = ackFlags == 1; |
| 1408 | |
| 1409 | // MQTT-3.2.2-1 & MQTT-3.2.2-2 |
| 1410 | if (sessionPresent) { |
| 1411 | emit m_clientPrivate->m_client->brokerSessionRestored(); |
| 1412 | if (m_clientPrivate->m_cleanSession) |
| 1413 | qCDebug(lcMqttConnection) << "Connected with a clean session, ack contains session present." ; |
| 1414 | } else { |
| 1415 | // MQTT-4.1.0.-1 MQTT-4.1.0-2 Session not stored on broker side |
| 1416 | // regardless whether cleanSession is false |
| 1417 | cleanSubscriptions(); |
| 1418 | } |
| 1419 | |
| 1420 | quint8 connectResultValue = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1421 | QMqttServerConnectionProperties serverProp; |
| 1422 | serverProp.serverData->reasonCode = QMqtt::ReasonCode(connectResultValue); |
| 1423 | m_clientPrivate->m_serverConnectionProperties = serverProp; |
| 1424 | if (connectResultValue != 0 && m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
| 1425 | qCDebug(lcMqttConnection) << "Connection has been rejected." ; |
| 1426 | closeConnection(error: static_cast<QMqttClient::ClientError>(connectResultValue)); |
| 1427 | return; |
| 1428 | } |
| 1429 | |
| 1430 | // MQTT 5.0 has variable part != 2 in the header |
| 1431 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 1432 | readConnackProperties(properties&: m_clientPrivate->m_serverConnectionProperties); |
| 1433 | m_receiveAliases.resize(size: m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()); |
| 1434 | m_publishAliases.resize(size: m_clientPrivate->m_connectionProperties.maximumTopicAlias()); |
| 1435 | |
| 1436 | // 3.2.2.2 |
| 1437 | switch (QMqtt::ReasonCode(connectResultValue)) { |
| 1438 | case QMqtt::ReasonCode::Success: |
| 1439 | break; |
| 1440 | case QMqtt::ReasonCode::MalformedPacket: |
| 1441 | case QMqtt::ReasonCode::ProtocolError: |
| 1442 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1443 | return; |
| 1444 | case QMqtt::ReasonCode::UnsupportedProtocolVersion: |
| 1445 | closeConnection(error: QMqttClient::InvalidProtocolVersion); |
| 1446 | return; |
| 1447 | case QMqtt::ReasonCode::InvalidClientId: |
| 1448 | closeConnection(error: QMqttClient::IdRejected); |
| 1449 | return; |
| 1450 | case QMqtt::ReasonCode::ServerNotAvailable: |
| 1451 | case QMqtt::ReasonCode::ServerBusy: |
| 1452 | case QMqtt::ReasonCode::UseAnotherServer: |
| 1453 | case QMqtt::ReasonCode::ServerMoved: |
| 1454 | closeConnection(error: QMqttClient::ServerUnavailable); |
| 1455 | return; |
| 1456 | case QMqtt::ReasonCode::InvalidUserNameOrPassword: |
| 1457 | closeConnection(error: QMqttClient::BadUsernameOrPassword); |
| 1458 | return; |
| 1459 | case QMqtt::ReasonCode::NotAuthorized: |
| 1460 | closeConnection(error: QMqttClient::NotAuthorized); |
| 1461 | return; |
| 1462 | case QMqtt::ReasonCode::UnspecifiedError: |
| 1463 | closeConnection(error: QMqttClient::UnknownError); |
| 1464 | return; |
| 1465 | case QMqtt::ReasonCode::ImplementationSpecificError: |
| 1466 | case QMqtt::ReasonCode::ClientBanned: |
| 1467 | case QMqtt::ReasonCode::InvalidAuthenticationMethod: |
| 1468 | case QMqtt::ReasonCode::InvalidTopicName: |
| 1469 | case QMqtt::ReasonCode::PacketTooLarge: |
| 1470 | case QMqtt::ReasonCode::QuotaExceeded: |
| 1471 | case QMqtt::ReasonCode::InvalidPayloadFormat: |
| 1472 | case QMqtt::ReasonCode::RetainNotSupported: |
| 1473 | case QMqtt::ReasonCode::QoSNotSupported: |
| 1474 | case QMqtt::ReasonCode::ExceededConnectionRate: |
| 1475 | closeConnection(error: QMqttClient::Mqtt5SpecificError); |
| 1476 | return; |
| 1477 | default: |
| 1478 | qCDebug(lcMqttConnection) << "Received illegal CONNACK reason code:" << connectResultValue; |
| 1479 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1480 | return; |
| 1481 | } |
| 1482 | } |
| 1483 | |
| 1484 | m_internalState = BrokerConnected; |
| 1485 | m_clientPrivate->setStateAndError(s: QMqttClient::Connected); |
| 1486 | |
| 1487 | if (m_clientPrivate->m_autoKeepAlive) |
| 1488 | m_pingTimer.start(msec: m_clientPrivate->m_keepAlive * 1000, obj: this); |
| 1489 | } |
| 1490 | |
| 1491 | void QMqttConnection::finalize_suback() |
| 1492 | { |
| 1493 | const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData); |
| 1494 | |
| 1495 | auto sub = m_pendingSubscriptionAck.take(key: id); |
| 1496 | if (Q_UNLIKELY(sub == nullptr)) { |
| 1497 | qCDebug(lcMqttConnection) << "Received SUBACK for unknown subscription request." ; |
| 1498 | return; |
| 1499 | } |
| 1500 | |
| 1501 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 1502 | readSubscriptionProperties(sub); |
| 1503 | |
| 1504 | // 3.9.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the SUBSCRIBE packet being acknowledged. |
| 1505 | // Whereas 3.8.3 states "The Payload MUST contain at least one Topic Filter and Subscription Options pair. A SUBSCRIBE packet with no Payload is a Protocol Error." |
| 1506 | do { |
| 1507 | quint8 reason = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1508 | |
| 1509 | sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reason); |
| 1510 | |
| 1511 | // 3.9.3 |
| 1512 | switch (QMqtt::ReasonCode(reason)) { |
| 1513 | case QMqtt::ReasonCode::SubscriptionQoSLevel0: |
| 1514 | case QMqtt::ReasonCode::SubscriptionQoSLevel1: |
| 1515 | case QMqtt::ReasonCode::SubscriptionQoSLevel2: |
| 1516 | qCDebug(lcMqttConnectionVerbose) << "Finalize SUBACK: id:" << id << "qos:" << reason; |
| 1517 | // The broker might have a different support level for QoS than what |
| 1518 | // the client requested |
| 1519 | if (reason != sub->qos()) { |
| 1520 | sub->setQos(reason); |
| 1521 | emit sub->qosChanged(reason); |
| 1522 | } |
| 1523 | sub->setState(QMqttSubscription::Subscribed); |
| 1524 | break; |
| 1525 | case QMqtt::ReasonCode::UnspecifiedError: |
| 1526 | qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason; |
| 1527 | sub->setState(QMqttSubscription::Error); |
| 1528 | break; |
| 1529 | case QMqtt::ReasonCode::ImplementationSpecificError: |
| 1530 | case QMqtt::ReasonCode::NotAuthorized: |
| 1531 | case QMqtt::ReasonCode::InvalidTopicFilter: |
| 1532 | case QMqtt::ReasonCode::MessageIdInUse: |
| 1533 | case QMqtt::ReasonCode::QuotaExceeded: |
| 1534 | case QMqtt::ReasonCode::SharedSubscriptionsNotSupported: |
| 1535 | case QMqtt::ReasonCode::SubscriptionIdsNotSupported: |
| 1536 | case QMqtt::ReasonCode::WildCardSubscriptionsNotSupported: |
| 1537 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 1538 | qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason; |
| 1539 | sub->setState(QMqttSubscription::Error); |
| 1540 | break; |
| 1541 | } |
| 1542 | Q_FALLTHROUGH(); |
| 1543 | default: |
| 1544 | qCWarning(lcMqttConnection) << "Received illegal SUBACK reason code:" << reason; |
| 1545 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1546 | break; |
| 1547 | } |
| 1548 | } while (m_missingData > 0); |
| 1549 | } |
| 1550 | |
| 1551 | void QMqttConnection::finalize_unsuback() |
| 1552 | { |
| 1553 | const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData); |
| 1554 | qCDebug(lcMqttConnectionVerbose) << "Finalize UNSUBACK: " << id; |
| 1555 | |
| 1556 | auto sub = m_pendingUnsubscriptions.take(key: id); |
| 1557 | if (Q_UNLIKELY(sub == nullptr)) { |
| 1558 | qCDebug(lcMqttConnection) << "Received UNSUBACK for unknown request." ; |
| 1559 | return; |
| 1560 | } |
| 1561 | |
| 1562 | m_activeSubscriptions.remove(key: sub->topic()); |
| 1563 | |
| 1564 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 1565 | readSubscriptionProperties(sub); |
| 1566 | } else { |
| 1567 | // 3.11.3 - The UNSUBACK Packet has no payload. |
| 1568 | // emulate successful unsubscription |
| 1569 | sub->d_func()->m_reasonCode = QMqtt::ReasonCode::Success; |
| 1570 | sub->setState(QMqttSubscription::Unsubscribed); |
| 1571 | return; |
| 1572 | } |
| 1573 | |
| 1574 | // 3.1.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the UNSUBSCRIBE packet being acknowledged. |
| 1575 | // Whereas 3.10.3 states "The Payload of an UNSUBSCRIBE packet MUST contain at least one Topic Filter. An UNSUBSCRIBE packet with no Payload is a Protocol Error." |
| 1576 | do { |
| 1577 | const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1578 | sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reasonCode); |
| 1579 | |
| 1580 | // 3.11.3 |
| 1581 | switch (QMqtt::ReasonCode(reasonCode)) { |
| 1582 | case QMqtt::ReasonCode::Success: |
| 1583 | sub->setState(QMqttSubscription::Unsubscribed); |
| 1584 | break; |
| 1585 | case QMqtt::ReasonCode::NoSubscriptionExisted: |
| 1586 | case QMqtt::ReasonCode::ImplementationSpecificError: |
| 1587 | case QMqtt::ReasonCode::NotAuthorized: |
| 1588 | case QMqtt::ReasonCode::InvalidTopicFilter: |
| 1589 | case QMqtt::ReasonCode::MessageIdInUse: |
| 1590 | case QMqtt::ReasonCode::UnspecifiedError: |
| 1591 | qCWarning(lcMqttConnection) << "Unsubscription for id " << id << " failed. Reason Code:" << reasonCode; |
| 1592 | sub->setState(QMqttSubscription::Error); |
| 1593 | break; |
| 1594 | default: |
| 1595 | qCWarning(lcMqttConnection) << "Received illegal UNSUBACK reason code:" << reasonCode; |
| 1596 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1597 | break; |
| 1598 | } |
| 1599 | } while (m_missingData > 0); |
| 1600 | } |
| 1601 | |
| 1602 | void QMqttConnection::finalize_publish() |
| 1603 | { |
| 1604 | // String topic |
| 1605 | QMqttTopicName topic = readBufferTyped<QString>(dataSize: &m_missingData); |
| 1606 | const int topicLength = topic.name().size(); |
| 1607 | |
| 1608 | quint16 id = 0; |
| 1609 | if (m_currentPublish.qos > 0) |
| 1610 | id = readBufferTyped<quint16>(dataSize: &m_missingData); |
| 1611 | |
| 1612 | QMqttPublishProperties publishProperties; |
| 1613 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 1614 | readPublishProperties(properties&: publishProperties); |
| 1615 | |
| 1616 | if (publishProperties.availableProperties() & QMqttPublishProperties::TopicAlias) { |
| 1617 | const quint16 topicAlias = publishProperties.topicAlias(); |
| 1618 | if (topicAlias == 0 || topicAlias > m_clientPrivate->m_connectionProperties.maximumTopicAlias()) { |
| 1619 | qCDebug(lcMqttConnection) << "TopicAlias receive: overflow." ; |
| 1620 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1621 | return; |
| 1622 | } |
| 1623 | if (topicLength == 0) { // New message on existing topic alias |
| 1624 | topic = m_receiveAliases.at(i: topicAlias - 1); |
| 1625 | if (topic.name().isEmpty()) { |
| 1626 | qCDebug(lcMqttConnection) << "TopicAlias receive: alias for unknown topic." ; |
| 1627 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1628 | return; |
| 1629 | } |
| 1630 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias receive: Using " << topicAlias; |
| 1631 | } else { // Resetting a topic alias |
| 1632 | qCDebug(lcMqttConnection) << "TopicAlias receive: Resetting:" << topic.name() << " : " << topicAlias; |
| 1633 | m_receiveAliases[topicAlias - 1] = topic; |
| 1634 | } |
| 1635 | } |
| 1636 | |
| 1637 | // message |
| 1638 | const quint64 payloadLength = quint64(m_missingData); |
| 1639 | const QByteArray message = readBuffer(size: payloadLength); |
| 1640 | m_missingData -= payloadLength; |
| 1641 | |
| 1642 | qCDebug(lcMqttConnectionVerbose) << "Finalize PUBLISH: topic:" << topic |
| 1643 | << " payloadLength:" << payloadLength; |
| 1644 | |
| 1645 | emit m_clientPrivate->m_client->messageReceived(message, topic); |
| 1646 | |
| 1647 | QMqttMessage qmsg(topic, message, id, m_currentPublish.qos, |
| 1648 | m_currentPublish.dup, m_currentPublish.retain); |
| 1649 | qmsg.d->m_publishProperties = publishProperties; |
| 1650 | |
| 1651 | if (id != 0) { |
| 1652 | QMqttMessageStatusProperties statusProp; |
| 1653 | statusProp.data->userProperties = publishProperties.userProperties(); |
| 1654 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Published, properties: statusProp); |
| 1655 | } |
| 1656 | |
| 1657 | // Store subscriptions in a temporary container as each messageReceived is allowed to subscribe |
| 1658 | // again and thus invalid the iterator of the loop. |
| 1659 | QList<QMqttSubscription *> subscribers; |
| 1660 | for (const auto [key, value] : m_activeSubscriptions.asKeyValueRange()) { |
| 1661 | if (key.match(name: topic)) |
| 1662 | subscribers.append(t: value); |
| 1663 | } |
| 1664 | for (const auto &s : subscribers) |
| 1665 | emit s->messageReceived(msg: qmsg); |
| 1666 | |
| 1667 | if (m_currentPublish.qos == 1) |
| 1668 | sendControlPublishAcknowledge(id); |
| 1669 | else if (m_currentPublish.qos == 2) |
| 1670 | sendControlPublishReceive(id); |
| 1671 | } |
| 1672 | |
| 1673 | void QMqttConnection::finalize_pubAckRecRelComp() |
| 1674 | { |
| 1675 | qCDebug(lcMqttConnectionVerbose) << "Finalize PUBACK/REC/REL/COMP" ; |
| 1676 | const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData); |
| 1677 | |
| 1678 | QMqttMessageStatusProperties properties; |
| 1679 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) { |
| 1680 | // Reason Code (1byte) |
| 1681 | const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1682 | properties.data->reasonCode = QMqtt::ReasonCode(reasonCode); |
| 1683 | |
| 1684 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBACK || (m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) { |
| 1685 | // 3.4.2.1, 3.5.2.1 |
| 1686 | switch (QMqtt::ReasonCode(reasonCode)) { |
| 1687 | case QMqtt::ReasonCode::Success: |
| 1688 | case QMqtt::ReasonCode::NoMatchingSubscriber: |
| 1689 | case QMqtt::ReasonCode::UnspecifiedError: |
| 1690 | case QMqtt::ReasonCode::ImplementationSpecificError: |
| 1691 | case QMqtt::ReasonCode::NotAuthorized: |
| 1692 | case QMqtt::ReasonCode::InvalidTopicName: |
| 1693 | case QMqtt::ReasonCode::MessageIdInUse: |
| 1694 | case QMqtt::ReasonCode::QuotaExceeded: |
| 1695 | case QMqtt::ReasonCode::InvalidPayloadFormat: |
| 1696 | break; |
| 1697 | default: |
| 1698 | qCWarning(lcMqttConnection) << "Received illegal PUBACK/REC reason code:" << reasonCode; |
| 1699 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1700 | return; |
| 1701 | } |
| 1702 | } else { |
| 1703 | // 3.6.2.1, 3.7.2.1 |
| 1704 | switch (QMqtt::ReasonCode(reasonCode)) { |
| 1705 | case QMqtt::ReasonCode::Success: |
| 1706 | case QMqtt::ReasonCode::MessageIdNotFound: |
| 1707 | break; |
| 1708 | default: |
| 1709 | qCWarning(lcMqttConnection) << "Received illegal PUBREL/COMP reason code:" << reasonCode; |
| 1710 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1711 | return; |
| 1712 | } |
| 1713 | } |
| 1714 | |
| 1715 | readMessageStatusProperties(properties); |
| 1716 | } |
| 1717 | |
| 1718 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREL) { |
| 1719 | qCDebug(lcMqttConnectionVerbose) << " PUBREL:" << id; |
| 1720 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Released, properties); |
| 1721 | sendControlPublishComp(id); |
| 1722 | return; |
| 1723 | } |
| 1724 | |
| 1725 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBCOMP) { |
| 1726 | qCDebug(lcMqttConnectionVerbose) << " PUBCOMP:" << id; |
| 1727 | auto pendingRelease = m_pendingReleaseMessages.take(key: id); |
| 1728 | if (!pendingRelease) |
| 1729 | qCDebug(lcMqttConnection) << "Received PUBCOMP for unknown released message." ; |
| 1730 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Completed, properties); |
| 1731 | emit m_clientPrivate->m_client->messageSent(id); |
| 1732 | return; |
| 1733 | } |
| 1734 | |
| 1735 | auto pendingMsg = m_pendingMessages.take(key: id); |
| 1736 | if (!pendingMsg) { |
| 1737 | qCDebug(lcMqttConnection) << "Received PUBACK for unknown message: " << id; |
| 1738 | return; |
| 1739 | } |
| 1740 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) { |
| 1741 | qCDebug(lcMqttConnectionVerbose) << " PUBREC:" << id; |
| 1742 | m_pendingReleaseMessages.insert(key: id, value: pendingMsg); |
| 1743 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Received, properties); |
| 1744 | sendControlPublishRelease(id); |
| 1745 | } else { |
| 1746 | qCDebug(lcMqttConnectionVerbose) << " PUBACK:" << id; |
| 1747 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Acknowledged, properties); |
| 1748 | emit m_clientPrivate->m_client->messageSent(id); |
| 1749 | } |
| 1750 | } |
| 1751 | |
| 1752 | void QMqttConnection::finalize_pingresp() |
| 1753 | { |
| 1754 | qCDebug(lcMqttConnectionVerbose) << "Finalize PINGRESP" ; |
| 1755 | const quint8 v = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1756 | |
| 1757 | if (v != 0) { |
| 1758 | qCDebug(lcMqttConnection) << "Received a PINGRESP including payload." ; |
| 1759 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1760 | return; |
| 1761 | } |
| 1762 | m_pingTimeout--; |
| 1763 | emit m_clientPrivate->m_client->pingResponseReceived(); |
| 1764 | } |
| 1765 | |
| 1766 | bool QMqttConnection::processDataHelper() |
| 1767 | { |
| 1768 | if (m_missingData > 0) { |
| 1769 | if ((m_readBuffer.size() - m_readPosition) < m_missingData) |
| 1770 | return false; |
| 1771 | |
| 1772 | switch (m_currentPacket & 0xF0) { |
| 1773 | case QMqttControlPacket::AUTH: |
| 1774 | finalize_auth(); |
| 1775 | break; |
| 1776 | case QMqttControlPacket::CONNACK: |
| 1777 | finalize_connack(); |
| 1778 | break; |
| 1779 | case QMqttControlPacket::SUBACK: |
| 1780 | finalize_suback(); |
| 1781 | break; |
| 1782 | case QMqttControlPacket::UNSUBACK: |
| 1783 | finalize_unsuback(); |
| 1784 | break; |
| 1785 | case QMqttControlPacket::PUBLISH: |
| 1786 | finalize_publish(); |
| 1787 | break; |
| 1788 | case QMqttControlPacket::PUBACK: |
| 1789 | case QMqttControlPacket::PUBREC: |
| 1790 | case QMqttControlPacket::PUBREL: |
| 1791 | case QMqttControlPacket::PUBCOMP: |
| 1792 | finalize_pubAckRecRelComp(); |
| 1793 | break; |
| 1794 | case QMqttControlPacket::PINGRESP: |
| 1795 | finalize_pingresp(); |
| 1796 | break; |
| 1797 | default: |
| 1798 | qCDebug(lcMqttConnection) << "Unknown packet to finalize." ; |
| 1799 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1800 | break; |
| 1801 | } |
| 1802 | |
| 1803 | if (m_internalState == BrokerDisconnected) |
| 1804 | return false; |
| 1805 | |
| 1806 | Q_ASSERT(m_missingData == 0); |
| 1807 | |
| 1808 | m_readBuffer = m_readBuffer.mid(index: m_readPosition); |
| 1809 | m_readPosition = 0; |
| 1810 | } |
| 1811 | |
| 1812 | // MQTT-2.2 A fixed header of a control packet must be at least 2 bytes. If the payload is |
| 1813 | // longer than 127 bytes the header can be up to 5 bytes long. |
| 1814 | switch (m_readBuffer.size()) { |
| 1815 | case 0: |
| 1816 | case 1: |
| 1817 | return false; |
| 1818 | case 2: |
| 1819 | if ((m_readBuffer.at(i: 1) & 128) != 0) |
| 1820 | return false; |
| 1821 | break; |
| 1822 | case 3: |
| 1823 | if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0) |
| 1824 | return false; |
| 1825 | break; |
| 1826 | case 4: |
| 1827 | if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0 && (m_readBuffer.at(i: 3) & 128) != 0) |
| 1828 | return false; |
| 1829 | break; |
| 1830 | default: |
| 1831 | break; |
| 1832 | } |
| 1833 | |
| 1834 | readBuffer(data: reinterpret_cast<char *>(&m_currentPacket), size: 1); |
| 1835 | |
| 1836 | switch (m_currentPacket & 0xF0) { |
| 1837 | case QMqttControlPacket::CONNACK: { |
| 1838 | qCDebug(lcMqttConnectionVerbose) << "Received CONNACK" ; |
| 1839 | if (m_internalState != BrokerWaitForConnectAck) { |
| 1840 | qCDebug(lcMqttConnection) << "Received CONNACK at unexpected time." ; |
| 1841 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1842 | return false; |
| 1843 | } |
| 1844 | |
| 1845 | qint32 payloadSize = readVariableByteInteger(); |
| 1846 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
| 1847 | if (payloadSize != 2) { |
| 1848 | qCDebug(lcMqttConnection) << "Unexpected FRAME size in CONNACK." ; |
| 1849 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1850 | return false; |
| 1851 | } |
| 1852 | } |
| 1853 | m_missingData = payloadSize; |
| 1854 | break; |
| 1855 | } |
| 1856 | case QMqttControlPacket::SUBACK: { |
| 1857 | qCDebug(lcMqttConnectionVerbose) << "Received SUBACK" ; |
| 1858 | const quint8 remaining = readBufferTyped<quint8>(); |
| 1859 | m_missingData = remaining; |
| 1860 | break; |
| 1861 | } |
| 1862 | case QMqttControlPacket::PUBLISH: { |
| 1863 | qCDebug(lcMqttConnectionVerbose) << "Received PUBLISH" ; |
| 1864 | m_currentPublish.dup = m_currentPacket & 0x08; |
| 1865 | m_currentPublish.qos = (m_currentPacket & 0x06) >> 1; |
| 1866 | m_currentPublish.retain = m_currentPacket & 0x01; |
| 1867 | if ((m_currentPublish.qos == 0 && m_currentPublish.dup != 0) |
| 1868 | || m_currentPublish.qos > 2) { |
| 1869 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1870 | return false; |
| 1871 | } |
| 1872 | |
| 1873 | m_missingData = readVariableByteInteger(); |
| 1874 | if (m_missingData == -1) |
| 1875 | return false; // Connection closed inside readVariableByteInteger |
| 1876 | break; |
| 1877 | } |
| 1878 | case QMqttControlPacket::PINGRESP: |
| 1879 | qCDebug(lcMqttConnectionVerbose) << "Received PINGRESP" ; |
| 1880 | m_missingData = 1; |
| 1881 | break; |
| 1882 | |
| 1883 | |
| 1884 | case QMqttControlPacket::PUBREL: { |
| 1885 | qCDebug(lcMqttConnectionVerbose) << "Received PUBREL" ; |
| 1886 | const quint8 remaining = readBufferTyped<quint8>(); |
| 1887 | if (remaining != 0x02) { |
| 1888 | qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length." ; |
| 1889 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1890 | return false; |
| 1891 | } |
| 1892 | if ((m_currentPacket & 0x0F) != 0x02) { |
| 1893 | qCDebug(lcMqttConnection) << "Malformed fixed header for PUBREL." ; |
| 1894 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1895 | return false; |
| 1896 | } |
| 1897 | m_missingData = 2; |
| 1898 | break; |
| 1899 | } |
| 1900 | |
| 1901 | case QMqttControlPacket::UNSUBACK: |
| 1902 | case QMqttControlPacket::PUBACK: |
| 1903 | case QMqttControlPacket::PUBREC: |
| 1904 | case QMqttControlPacket::PUBCOMP: { |
| 1905 | qCDebug(lcMqttConnectionVerbose) << "Received UNSUBACK/PUBACK/PUBREC/PUBCOMP" ; |
| 1906 | if ((m_currentPacket & 0x0F) != 0) { |
| 1907 | qCDebug(lcMqttConnection) << "Malformed fixed header for UNSUBACK/PUBACK/PUBREC/PUBCOMP." ; |
| 1908 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1909 | return false; |
| 1910 | } |
| 1911 | const quint8 remaining = readBufferTyped<quint8>(); |
| 1912 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0 && remaining != 0x02) { |
| 1913 | qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length." ; |
| 1914 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1915 | return false; |
| 1916 | } |
| 1917 | m_missingData = remaining; |
| 1918 | break; |
| 1919 | } |
| 1920 | case QMqttControlPacket::AUTH: |
| 1921 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
| 1922 | qCDebug(lcMqttConnection) << "Received unknown command." ; |
| 1923 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1924 | return false; |
| 1925 | } |
| 1926 | qCDebug(lcMqttConnectionVerbose) << "Received AUTH" ; |
| 1927 | if ((m_currentPacket & 0x0F) != 0) { |
| 1928 | qCDebug(lcMqttConnection) << "Malformed fixed header for AUTH." ; |
| 1929 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1930 | return false; |
| 1931 | } |
| 1932 | m_missingData = readVariableByteInteger(); |
| 1933 | if (m_missingData == -1) |
| 1934 | return false; // Connection closed inside readVariableByteInteger |
| 1935 | break; |
| 1936 | case QMqttControlPacket::DISCONNECT: |
| 1937 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
| 1938 | qCDebug(lcMqttConnection) << "Received unknown command." ; |
| 1939 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1940 | return false; |
| 1941 | } |
| 1942 | qCDebug(lcMqttConnectionVerbose) << "Received DISCONNECT" ; |
| 1943 | if ((m_currentPacket & 0x0F) != 0) { |
| 1944 | qCDebug(lcMqttConnection) << "Malformed fixed header for DISCONNECT." ; |
| 1945 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1946 | return false; |
| 1947 | } |
| 1948 | if (m_internalState != BrokerConnected) { |
| 1949 | qCDebug(lcMqttConnection) << "Received DISCONNECT at unexpected time." ; |
| 1950 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1951 | return false; |
| 1952 | } |
| 1953 | closeConnection(error: QMqttClient::NoError); |
| 1954 | return false; |
| 1955 | default: |
| 1956 | qCDebug(lcMqttConnection) << "Received unknown command." ; |
| 1957 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1958 | return false; |
| 1959 | } |
| 1960 | |
| 1961 | /* set current command CONNACK - PINGRESP */ |
| 1962 | /* read command size */ |
| 1963 | /* calculate missing_data */ |
| 1964 | return true; // reiterate. implicitly finishes and enqueues |
| 1965 | } |
| 1966 | |
| 1967 | void QMqttConnection::processData() |
| 1968 | { |
| 1969 | while (processDataHelper()) |
| 1970 | ; |
| 1971 | } |
| 1972 | |
| 1973 | bool QMqttConnection::writePacketToTransport(const QMqttControlPacket &p) |
| 1974 | { |
| 1975 | const QByteArray writeData = p.serialize(); |
| 1976 | qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO << " DataSize:" << writeData.size(); |
| 1977 | const qint64 res = m_transport->write(data: writeData.constData(), len: writeData.size()); |
| 1978 | if (Q_UNLIKELY(res == -1)) { |
| 1979 | qCDebug(lcMqttConnection) << "Could not write frame to transport." ; |
| 1980 | return false; |
| 1981 | } |
| 1982 | return true; |
| 1983 | } |
| 1984 | |
| 1985 | QT_END_NAMESPACE |
| 1986 | |