| 1 | // Copyright (C) 2017 The Qt Company Ltd. |
| 2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only |
| 3 | // Qt-Security score:critical reason:network-protocol |
| 4 | |
| 5 | #include "qmqttconnection_p.h" |
| 6 | #include "qmqttconnectionproperties_p.h" |
| 7 | #include "qmqttcontrolpacket_p.h" |
| 8 | #include "qmqttmessage_p.h" |
| 9 | #include "qmqttpublishproperties_p.h" |
| 10 | #include "qmqttsubscription_p.h" |
| 11 | #include "qmqttclient_p.h" |
| 12 | #include "transportLayers/mqtt_websocket_io_p.h" |
| 13 | #include "transportLayers/mqtt_secure_websocket_io_p.h" |
| 14 | |
| 15 | #include <QtCore/QLoggingCategory> |
| 16 | #include <QtNetwork/QSslSocket> |
| 17 | #include <QtNetwork/QTcpSocket> |
| 18 | |
| 19 | #include <limits> |
| 20 | #include <cstdint> |
| 21 | |
| 22 | QT_BEGIN_NAMESPACE |
| 23 | |
| 24 | Q_LOGGING_CATEGORY(lcMqttConnection, "qt.mqtt.connection" ) |
| 25 | Q_STATIC_LOGGING_CATEGORY(lcMqttConnectionVerbose, "qt.mqtt.connection.verbose" ); |
| 26 | |
| 27 | template <typename T> |
| 28 | T QMqttConnection::readBufferTyped(qint64 *dataSize) |
| 29 | { |
| 30 | Q_STATIC_ASSERT(std::is_integral<T>::value); |
| 31 | |
| 32 | T result = 0; |
| 33 | if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(sizeof(result)))) { |
| 34 | qCWarning(lcMqttConnection) << "Attempt to read past the data" ; |
| 35 | return result; |
| 36 | } |
| 37 | if (readBuffer(data: reinterpret_cast<char *>(&result), size: sizeof(result)) && dataSize != nullptr) |
| 38 | *dataSize -= sizeof(result); |
| 39 | return qFromBigEndian(result); |
| 40 | } |
| 41 | |
| 42 | template<> |
| 43 | QByteArray QMqttConnection::readBufferTyped(qint64 *dataSize) |
| 44 | { |
| 45 | const quint16 size = readBufferTyped<quint16>(dataSize); |
| 46 | if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(size))) { |
| 47 | qCWarning(lcMqttConnection) << "Attempt to read past the data" ; |
| 48 | return QByteArray(); |
| 49 | } |
| 50 | QByteArray ba(int(size), Qt::Uninitialized); |
| 51 | if (readBuffer(data: ba.data(), size) && dataSize != nullptr) |
| 52 | *dataSize -= size; |
| 53 | return ba; |
| 54 | } |
| 55 | |
| 56 | template<> |
| 57 | QString QMqttConnection::readBufferTyped(qint64 *dataSize) |
| 58 | { |
| 59 | return QString::fromUtf8(ba: readBufferTyped<QByteArray>(dataSize)); |
| 60 | } |
| 61 | |
| 62 | bool QMqttConnection::isPendingUnsubscribe(QMqttSubscription *sub) const |
| 63 | { |
| 64 | for (const auto [key, value] : m_pendingUnsubscriptions.asKeyValueRange()) { |
| 65 | if (value == sub) |
| 66 | return true; |
| 67 | } |
| 68 | return false; |
| 69 | } |
| 70 | |
| 71 | QMqttSubscription *QMqttConnection::findActiveSubscription(const QMqttTopicFilter &topic) const |
| 72 | { |
| 73 | const auto range = m_activeSubscriptions.equal_range(key: topic); |
| 74 | for (auto it = range.first; it != range.second; ++it) { |
| 75 | if (!isPendingUnsubscribe(sub: it.value())) |
| 76 | return it.value(); |
| 77 | } |
| 78 | return nullptr; |
| 79 | } |
| 80 | |
| 81 | QMqttConnection::QMqttConnection(QObject *parent) : QObject(parent) |
| 82 | { |
| 83 | } |
| 84 | |
| 85 | QMqttConnection::~QMqttConnection() |
| 86 | { |
| 87 | if (m_internalState == BrokerConnected) |
| 88 | sendControlDisconnect(); |
| 89 | m_transport = nullptr; |
| 90 | } |
| 91 | |
| 92 | void QMqttConnection::timerEvent(QTimerEvent *event) |
| 93 | { |
| 94 | if (Q_LIKELY(event->timerId() == m_pingTimer.timerId())) { |
| 95 | sendControlPingRequest(); |
| 96 | return; |
| 97 | } |
| 98 | |
| 99 | QObject::timerEvent(event); |
| 100 | } |
| 101 | |
| 102 | void QMqttConnection::disconnectAndResetTransport() |
| 103 | { |
| 104 | if (m_transport) { |
| 105 | if (m_connectedTransport) { |
| 106 | // |
| 107 | // Looks weird. We know that m_transport is not set from |
| 108 | // the outside because m_connectedTransport is only set |
| 109 | // by us for the websocket cases, or in ensureTransport. |
| 110 | // |
| 111 | // Thus we can disconnect everything. |
| 112 | // |
| 113 | disconnect(receiver: m_transport.get()); |
| 114 | } else { |
| 115 | // |
| 116 | // m_transport is set from outside with setTransport, |
| 117 | // there might be other connections that shall not |
| 118 | // be removed. |
| 119 | // |
| 120 | disconnect(sender: m_transport.get(), signal: &QIODevice::aboutToClose, receiver: this, slot: &QMqttConnection::transportConnectionClosed); |
| 121 | disconnect(sender: m_transport.get(), signal: &QIODevice::readyRead, receiver: this, slot: &QMqttConnection::transportReadyRead); |
| 122 | } |
| 123 | } |
| 124 | m_transport = nullptr; |
| 125 | m_transportType = QMqttClient::TransportType::IODevice; |
| 126 | } |
| 127 | |
| 128 | void QMqttConnection::setTransport(QIODevice *device, QMqttClient::TransportType transport) |
| 129 | { |
| 130 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << device << " Type:" << transport; |
| 131 | |
| 132 | disconnectAndResetTransport(); |
| 133 | |
| 134 | m_transport = std::shared_ptr<QIODevice>(device, [](QIODevice *) { ; }); |
| 135 | m_transportType = transport; |
| 136 | m_connectedTransport = false; |
| 137 | m_transportIsSet = true; |
| 138 | |
| 139 | connect(sender: m_transport.get(), signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 140 | connect(sender: m_transport.get(), signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead); |
| 141 | } |
| 142 | |
| 143 | void QMqttConnection::connectTransport(QMqttClient::TransportType transportType, const std::shared_ptr<QIODevice> &transport) |
| 144 | { |
| 145 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Type:" << transportType; |
| 146 | |
| 147 | disconnectAndResetTransport(); |
| 148 | |
| 149 | m_transportType = transportType; |
| 150 | m_transport = transport; |
| 151 | m_transportIsSet = true; |
| 152 | m_connectedTransport = true; |
| 153 | |
| 154 | if (!m_transport) { |
| 155 | qWarning(catFunc: lcMqttConnection) << " No transport created for " << transportType; |
| 156 | return; |
| 157 | } |
| 158 | |
| 159 | QObject::connect(sender: m_transport.get(), signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 160 | QObject::connect(sender: m_transport.get(), signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead); |
| 161 | |
| 162 | switch (m_transportType) { |
| 163 | case QMqttClient::TransportType::IODevice: |
| 164 | break; |
| 165 | case QMqttClient::TransportType::AbstractSocket: |
| 166 | QObject::connect(sender: qobject_cast<QAbstractSocket *>(object: m_transport.get()), signal: &QAbstractSocket::connected, context: this, slot: &QMqttConnection::transportConnectionEstablished); |
| 167 | QObject::connect(sender: qobject_cast<QAbstractSocket *>(object: m_transport.get()), signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 168 | QObject::connect(sender: qobject_cast<QAbstractSocket *>(object: m_transport.get()), signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError); |
| 169 | break; |
| 170 | case QMqttClient::TransportType::WebSocket: |
| 171 | #ifdef QT_MQTT_WITH_WEBSOCKETS |
| 172 | QObject::connect(qobject_cast<QMqttWebSocketIO *>(m_transport.get()), &QMqttWebSocketIO::connected, this, &QMqttConnection::transportConnectionEstablished); |
| 173 | QObject::connect(qobject_cast<QMqttWebSocketIO *>(m_transport.get()), &QMqttWebSocketIO::disconnected, this, &QMqttConnection::transportConnectionClosed); |
| 174 | QObject::connect(qobject_cast<QMqttWebSocketIO *>(m_transport.get()), &QMqttWebSocketIO::errorOccurred, this, &QMqttConnection::transportError); |
| 175 | #endif |
| 176 | break; |
| 177 | |
| 178 | case QMqttClient::TransportType::SecureSocket: |
| 179 | #ifndef QT_NO_SSL |
| 180 | QObject::connect(sender: qobject_cast<QSslSocket *>(object: m_transport.get()), signal: &QSslSocket::encrypted, context: this, slot: &QMqttConnection::transportConnectionEstablished); |
| 181 | QObject::connect(sender: qobject_cast<QSslSocket *>(object: m_transport.get()), signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 182 | QObject::connect(sender: qobject_cast<QSslSocket *>(object: m_transport.get()), signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError); |
| 183 | #endif |
| 184 | break; |
| 185 | |
| 186 | case QMqttClient::TransportType::SecureWebSocket: |
| 187 | #ifdef QT_MQTT_WITH_WEBSOCKETS |
| 188 | QObject::connect(qobject_cast<QMqttSecureWebSocketIO *>(m_transport.get()), &QMqttSecureWebSocketIO::encrypted, this, &QMqttConnection::transportConnectionEstablished); |
| 189 | QObject::connect(qobject_cast<QMqttSecureWebSocketIO *>(m_transport.get()), &QMqttSecureWebSocketIO::disconnected, this, &QMqttConnection::transportConnectionClosed); |
| 190 | QObject::connect(qobject_cast<QMqttSecureWebSocketIO *>(m_transport.get()), &QMqttSecureWebSocketIO::errorOccurred, this, &QMqttConnection::transportError); |
| 191 | #endif |
| 192 | break; |
| 193 | |
| 194 | default: |
| 195 | Q_ASSERT(false); |
| 196 | break; |
| 197 | } // end switch |
| 198 | } |
| 199 | |
| 200 | QIODevice *QMqttConnection::transport() const |
| 201 | { |
| 202 | return m_transport.get(); |
| 203 | } |
| 204 | |
| 205 | bool QMqttConnection::ensureTransport(bool createSecureIfNeeded) |
| 206 | { |
| 207 | Q_UNUSED(createSecureIfNeeded); // QT_NO_SSL |
| 208 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transport.get(); |
| 209 | |
| 210 | if (m_transportIsSet) |
| 211 | return true; // transport set by setTransport |
| 212 | |
| 213 | disconnectAndResetTransport(); |
| 214 | |
| 215 | // We are asked to create a transport layer |
| 216 | if (m_clientPrivate->m_hostname.isEmpty() || m_clientPrivate->m_port == 0) { |
| 217 | qCDebug(lcMqttConnection) << "No hostname specified, or port is 0. Not able to create a transport layer." ; |
| 218 | return false; |
| 219 | } |
| 220 | |
| 221 | #ifdef Q_OS_WASM |
| 222 | qCWarning(lcMqttConnection) << "For WebAssembly, use connectToHostWebSocket[Encrypted]" ; |
| 223 | return false; |
| 224 | #endif |
| 225 | |
| 226 | #ifndef QT_NO_SSL |
| 227 | if (createSecureIfNeeded) { |
| 228 | auto socket = std::make_shared<QSslSocket>(); |
| 229 | m_transport = socket; |
| 230 | m_connectedTransport = true; |
| 231 | m_transportType = QMqttClient::SecureSocket; |
| 232 | |
| 233 | QObject::connect(sender: socket.get(), signal: &QSslSocket::encrypted, context: this, slot: &QMqttConnection::transportConnectionEstablished); |
| 234 | QObject::connect(sender: socket.get(), signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 235 | QObject::connect(sender: socket.get(), signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError); |
| 236 | } |
| 237 | #endif |
| 238 | |
| 239 | if (!m_transport) { |
| 240 | auto socket = std::make_shared<QTcpSocket>(); |
| 241 | m_transport = socket; |
| 242 | m_connectedTransport = true; |
| 243 | m_transportType = QMqttClient::AbstractSocket; |
| 244 | |
| 245 | QObject::connect(sender: socket.get(), signal: &QAbstractSocket::connected, context: this, slot: &QMqttConnection::transportConnectionEstablished); |
| 246 | QObject::connect(sender: socket.get(), signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 247 | QObject::connect(sender: socket.get(), signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError); |
| 248 | } |
| 249 | QObject::connect(sender: m_transport.get(), signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed); |
| 250 | QObject::connect(sender: m_transport.get(), signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead); |
| 251 | |
| 252 | return true; |
| 253 | } |
| 254 | |
| 255 | bool QMqttConnection::ensureTransportOpen(const QString &sslPeerName) |
| 256 | { |
| 257 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transportType; |
| 258 | |
| 259 | if (m_transportType == QMqttClient::IODevice) { |
| 260 | if (m_transport->isOpen()) |
| 261 | return sendControlConnect(); |
| 262 | |
| 263 | if (!m_transport->open(mode: QIODevice::ReadWrite)) { |
| 264 | qCDebug(lcMqttConnection) << "Could not open Transport IO device." ; |
| 265 | m_internalState = BrokerDisconnected; |
| 266 | return false; |
| 267 | } |
| 268 | return sendControlConnect(); |
| 269 | } |
| 270 | |
| 271 | if (m_transportType == QMqttClient::AbstractSocket) { |
| 272 | auto socket = qobject_cast<QTcpSocket *>(object: m_transport.get()); |
| 273 | Q_ASSERT(socket); |
| 274 | if (socket->state() == QAbstractSocket::ConnectedState) |
| 275 | return sendControlConnect(); |
| 276 | |
| 277 | m_internalState = BrokerConnecting; |
| 278 | socket->connectToHost(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port); |
| 279 | } |
| 280 | else if (m_transportType == QMqttClient::WebSocket) { |
| 281 | #ifdef QT_MQTT_WITH_WEBSOCKETS |
| 282 | auto socket = qobject_cast<QMqttWebSocketIO *>(m_transport.get()); |
| 283 | Q_ASSERT(socket); |
| 284 | if (socket->state() == QAbstractSocket::ConnectedState) |
| 285 | return sendControlConnect(); |
| 286 | |
| 287 | m_internalState = BrokerConnecting; |
| 288 | socket->connectToHost(m_clientPrivate->m_hostname, m_clientPrivate->m_port, m_clientPrivate->m_protocolVersion); |
| 289 | #endif |
| 290 | } |
| 291 | else if (m_transportType == QMqttClient::SecureWebSocket) { |
| 292 | #ifdef QT_MQTT_WITH_WEBSOCKETS |
| 293 | auto socket = qobject_cast<QMqttSecureWebSocketIO *>(m_transport.get()); |
| 294 | Q_ASSERT(socket); |
| 295 | if (socket->state() == QAbstractSocket::ConnectedState) |
| 296 | return sendControlConnect(); |
| 297 | |
| 298 | m_internalState = BrokerConnecting; |
| 299 | socket->connectToHostEncrypted(m_clientPrivate->m_hostname, m_clientPrivate->m_port, m_clientPrivate->m_protocolVersion); |
| 300 | #endif |
| 301 | } |
| 302 | #ifndef QT_NO_SSL |
| 303 | else if (m_transportType == QMqttClient::SecureSocket) { |
| 304 | auto socket = qobject_cast<QSslSocket *>(object: m_transport.get()); |
| 305 | Q_ASSERT(socket); |
| 306 | if (socket->state() == QAbstractSocket::ConnectedState) |
| 307 | return sendControlConnect(); |
| 308 | |
| 309 | m_internalState = BrokerConnecting; |
| 310 | if (!m_sslConfiguration.isNull()) |
| 311 | socket->setSslConfiguration(m_sslConfiguration); |
| 312 | socket->connectToHostEncrypted(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port, sslPeerName); |
| 313 | } |
| 314 | #endif |
| 315 | Q_UNUSED(sslPeerName); |
| 316 | return true; |
| 317 | } |
| 318 | |
| 319 | bool QMqttConnection::sendControlConnect() |
| 320 | { |
| 321 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
| 322 | |
| 323 | QMqttControlPacket packet(QMqttControlPacket::CONNECT); |
| 324 | |
| 325 | // Variable header |
| 326 | // 3.1.2.1 Protocol Name |
| 327 | // 3.1.2.2 Protocol Level |
| 328 | switch (m_clientPrivate->m_protocolVersion) { |
| 329 | case QMqttClient::MQTT_3_1: |
| 330 | packet.append(data: "MQIsdp" ); |
| 331 | packet.append(value: char(3)); // Version 3.1 |
| 332 | break; |
| 333 | case QMqttClient::MQTT_3_1_1: |
| 334 | packet.append(data: "MQTT" ); |
| 335 | packet.append(value: char(4)); // Version 3.1.1 |
| 336 | break; |
| 337 | case QMqttClient::MQTT_5_0: |
| 338 | packet.append(data: "MQTT" ); |
| 339 | packet.append(value: char(5)); // Version 5.0 |
| 340 | break; |
| 341 | } |
| 342 | |
| 343 | // 3.1.2.3 Connect Flags |
| 344 | quint8 flags = 0; |
| 345 | // Clean session |
| 346 | if (m_clientPrivate->m_cleanSession) |
| 347 | flags |= 1 << 1; |
| 348 | |
| 349 | if (!m_clientPrivate->m_willTopic.isEmpty()) { |
| 350 | flags |= 1 << 2; |
| 351 | if (m_clientPrivate->m_willQoS > 2) { |
| 352 | qCDebug(lcMqttConnection) << "Invalid Will QoS specified." ; |
| 353 | return false; |
| 354 | } |
| 355 | if (m_clientPrivate->m_willQoS == 1) |
| 356 | flags |= 1 << 3; |
| 357 | else if (m_clientPrivate->m_willQoS == 2) |
| 358 | flags |= 1 << 4; |
| 359 | if (m_clientPrivate->m_willRetain) |
| 360 | flags |= 1 << 5; |
| 361 | } |
| 362 | if (m_clientPrivate->m_username.size()) |
| 363 | flags |= 1 << 7; |
| 364 | |
| 365 | if (m_clientPrivate->m_password.size()) |
| 366 | flags |= 1 << 6; |
| 367 | |
| 368 | packet.append(value: char(flags)); |
| 369 | |
| 370 | // 3.1.2.10 Keep Alive |
| 371 | packet.append(value: m_clientPrivate->m_keepAlive); |
| 372 | |
| 373 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 374 | packet.appendRaw(data: writeConnectProperties()); |
| 375 | |
| 376 | // 3.1.3 Payload |
| 377 | // 3.1.3.1 Client Identifier |
| 378 | const QByteArray clientStringArray = m_clientPrivate->m_clientId.toUtf8(); |
| 379 | if (clientStringArray.size()) { |
| 380 | packet.append(data: clientStringArray); |
| 381 | } else { |
| 382 | packet.append(value: char(0)); |
| 383 | packet.append(value: char(0)); |
| 384 | } |
| 385 | |
| 386 | if (!m_clientPrivate->m_willTopic.isEmpty()) { |
| 387 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 388 | packet.appendRaw(data: writeLastWillProperties()); |
| 389 | |
| 390 | packet.append(data: m_clientPrivate->m_willTopic.toUtf8()); |
| 391 | packet.append(data: m_clientPrivate->m_willMessage); |
| 392 | } |
| 393 | |
| 394 | if (m_clientPrivate->m_username.size()) |
| 395 | packet.append(data: m_clientPrivate->m_username.toUtf8()); |
| 396 | |
| 397 | if (m_clientPrivate->m_password.size()) |
| 398 | packet.append(data: m_clientPrivate->m_password.toUtf8()); |
| 399 | |
| 400 | m_internalState = BrokerWaitForConnectAck; |
| 401 | m_missingData = 0; |
| 402 | |
| 403 | if (!writePacketToTransport(p: packet)) { |
| 404 | qCDebug(lcMqttConnection) << "Could not write CONNECT frame to transport." ; |
| 405 | return false; |
| 406 | } |
| 407 | return true; |
| 408 | } |
| 409 | |
| 410 | bool QMqttConnection::sendControlAuthenticate(const QMqttAuthenticationProperties &properties) |
| 411 | { |
| 412 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
| 413 | |
| 414 | QMqttControlPacket packet(QMqttControlPacket::AUTH); |
| 415 | |
| 416 | switch (m_internalState) { |
| 417 | case BrokerDisconnected: |
| 418 | case BrokerConnecting: |
| 419 | case ClientDestruction: |
| 420 | qCDebug(lcMqttConnection) << "Using AUTH while disconnected." ; |
| 421 | return false; |
| 422 | case BrokerWaitForConnectAck: |
| 423 | qCDebug(lcMqttConnection) << "AUTH while connecting, set continuation flag." ; |
| 424 | packet.append(value: char(QMqtt::ReasonCode::ContinueAuthentication)); |
| 425 | break; |
| 426 | case BrokerConnected: |
| 427 | qCDebug(lcMqttConnection) << "AUTH while connected, initiate re-authentication." ; |
| 428 | packet.append(value: char(QMqtt::ReasonCode::ReAuthenticate)); |
| 429 | break; |
| 430 | } |
| 431 | |
| 432 | packet.appendRaw(data: writeAuthenticationProperties(properties)); |
| 433 | |
| 434 | if (!writePacketToTransport(p: packet)) { |
| 435 | qCDebug(lcMqttConnection) << "Could not write AUTH frame to transport." ; |
| 436 | return false; |
| 437 | } |
| 438 | |
| 439 | return true; |
| 440 | } |
| 441 | |
| 442 | qint32 QMqttConnection::sendControlPublish(const QMqttTopicName &topic, |
| 443 | const QByteArray &message, |
| 444 | quint8 qos, |
| 445 | bool retain, |
| 446 | const QMqttPublishProperties &properties) |
| 447 | { |
| 448 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << topic << " Size:" << message.size() << " bytes." |
| 449 | << "QoS:" << qos << " Retain:" << retain; |
| 450 | |
| 451 | if (!topic.isValid()) |
| 452 | return -1; |
| 453 | |
| 454 | quint8 = QMqttControlPacket::PUBLISH; |
| 455 | if (qos == 1) |
| 456 | header |= 0x02; |
| 457 | else if (qos == 2) |
| 458 | header |= 0x04; |
| 459 | |
| 460 | if (retain) |
| 461 | header |= 0x01; |
| 462 | |
| 463 | QSharedPointer<QMqttControlPacket> packet(new QMqttControlPacket(header)); |
| 464 | // topic alias |
| 465 | QMqttPublishProperties publishProperties(properties); |
| 466 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 467 | // 3.3.4 A PUBLISH packet sent from a Client to a Server MUST NOT contain a Subscription Identifier |
| 468 | if (publishProperties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) { |
| 469 | qCWarning(lcMqttConnection) << "SubscriptionIdentifier must not be specified for publish." ; |
| 470 | return -1; |
| 471 | } |
| 472 | |
| 473 | const quint16 topicAlias = publishProperties.topicAlias(); |
| 474 | if (topicAlias > 0) { // User specified topic Alias |
| 475 | if (topicAlias > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) { |
| 476 | qCDebug(lcMqttConnection) << "TopicAlias publish: overflow." ; |
| 477 | return -1; |
| 478 | } |
| 479 | if (m_publishAliases.at(i: topicAlias - 1) != topic) { |
| 480 | qCDebug(lcMqttConnection) << "TopicAlias publish: Assign:" << topicAlias << ":" << topic; |
| 481 | m_publishAliases[topicAlias - 1] = topic; |
| 482 | packet->append(data: topic.name().toUtf8()); |
| 483 | } else { |
| 484 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Reuse:" << topicAlias; |
| 485 | packet->append(value: quint16(0)); |
| 486 | } |
| 487 | } else if (m_publishAliases.size() > 0) { // Automatic module alias assignment |
| 488 | int autoAlias = m_publishAliases.indexOf(t: topic); |
| 489 | if (autoAlias != -1) { |
| 490 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Use auto alias:" << autoAlias; |
| 491 | packet->append(value: quint16(0)); |
| 492 | publishProperties.setTopicAlias(quint16(autoAlias + 1)); |
| 493 | } else { |
| 494 | autoAlias = m_publishAliases.indexOf(t: QMqttTopicName()); |
| 495 | if (autoAlias != -1) { |
| 496 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: auto alias assignment:" << autoAlias; |
| 497 | m_publishAliases[autoAlias] = topic; |
| 498 | publishProperties.setTopicAlias(quint16(autoAlias) + 1); |
| 499 | } else |
| 500 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: alias storage full, using full topic" ; |
| 501 | packet->append(data: topic.name().toUtf8()); |
| 502 | } |
| 503 | } else { |
| 504 | packet->append(data: topic.name().toUtf8()); |
| 505 | } |
| 506 | } else { // ! MQTT_5_0 |
| 507 | packet->append(data: topic.name().toUtf8()); |
| 508 | } |
| 509 | quint16 identifier = 0; |
| 510 | if (qos > 0) { |
| 511 | identifier = unusedPacketIdentifier(); |
| 512 | packet->append(value: identifier); |
| 513 | m_pendingMessages.insert(key: identifier, value: packet); |
| 514 | } |
| 515 | |
| 516 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 517 | packet->appendRaw(data: writePublishProperties(properties: publishProperties)); |
| 518 | |
| 519 | packet->appendRaw(data: message); |
| 520 | |
| 521 | const bool written = writePacketToTransport(p: *packet.data()); |
| 522 | |
| 523 | if (!written && qos > 0) |
| 524 | m_pendingMessages.remove(key: identifier); |
| 525 | return written ? identifier : -1; |
| 526 | } |
| 527 | |
| 528 | bool QMqttConnection::sendControlPublishAcknowledge(quint16 id) |
| 529 | { |
| 530 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
| 531 | QMqttControlPacket packet(QMqttControlPacket::PUBACK); |
| 532 | packet.append(value: id); |
| 533 | return writePacketToTransport(p: packet); |
| 534 | } |
| 535 | |
| 536 | bool QMqttConnection::sendControlPublishRelease(quint16 id) |
| 537 | { |
| 538 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
| 539 | quint8 = QMqttControlPacket::PUBREL; |
| 540 | header |= 0x02; // MQTT-3.6.1-1 |
| 541 | |
| 542 | QMqttControlPacket packet(header); |
| 543 | packet.append(value: id); |
| 544 | return writePacketToTransport(p: packet); |
| 545 | } |
| 546 | |
| 547 | bool QMqttConnection::sendControlPublishReceive(quint16 id) |
| 548 | { |
| 549 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
| 550 | QMqttControlPacket packet(QMqttControlPacket::PUBREC); |
| 551 | packet.append(value: id); |
| 552 | return writePacketToTransport(p: packet); |
| 553 | } |
| 554 | |
| 555 | bool QMqttConnection::sendControlPublishComp(quint16 id) |
| 556 | { |
| 557 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
| 558 | QMqttControlPacket packet(QMqttControlPacket::PUBCOMP); |
| 559 | packet.append(value: id); |
| 560 | return writePacketToTransport(p: packet); |
| 561 | } |
| 562 | |
| 563 | QMqttSubscription *QMqttConnection::sendControlSubscribe(const QMqttTopicFilter &topic, |
| 564 | quint8 qos, |
| 565 | const QMqttSubscriptionProperties &properties) |
| 566 | { |
| 567 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic << " qos:" << qos; |
| 568 | |
| 569 | // Overflow protection |
| 570 | if (Q_UNLIKELY(!topic.isValid())) { |
| 571 | qCWarning(lcMqttConnection) << "Invalid subscription topic filter." ; |
| 572 | return nullptr; |
| 573 | } |
| 574 | |
| 575 | if (Q_UNLIKELY(qos > 2)) { |
| 576 | qCWarning(lcMqttConnection) << "Invalid subscription QoS." ; |
| 577 | return nullptr; |
| 578 | } |
| 579 | |
| 580 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 581 | const QString sharedSubscriptionName = topic.sharedSubscriptionName(); |
| 582 | if (!sharedSubscriptionName.isEmpty()) { |
| 583 | const QMqttTopicFilter filter(topic.filter().section(asep: QLatin1Char('/'), astart: 2)); |
| 584 | auto sub = findActiveSubscription(topic: filter); |
| 585 | if (sub && (sub->sharedSubscriptionName() == sharedSubscriptionName)) |
| 586 | return sub; |
| 587 | } else { |
| 588 | auto sub = findActiveSubscription(topic); |
| 589 | if (sub && !sub->isSharedSubscription()) |
| 590 | return sub; |
| 591 | } |
| 592 | } else { |
| 593 | auto sub = findActiveSubscription(topic); |
| 594 | if (sub) |
| 595 | return sub; |
| 596 | } |
| 597 | |
| 598 | // has to have 0010 as bits 3-0, maybe update SUBSCRIBE instead? |
| 599 | // MQTT-3.8.1-1 |
| 600 | const quint8 = QMqttControlPacket::SUBSCRIBE + 0x02; |
| 601 | QMqttControlPacket packet(header); |
| 602 | |
| 603 | // Add Packet Identifier |
| 604 | const quint16 identifier = unusedPacketIdentifier(); |
| 605 | |
| 606 | packet.append(value: identifier); |
| 607 | |
| 608 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 609 | packet.appendRaw(data: writeSubscriptionProperties(properties)); |
| 610 | |
| 611 | packet.append(data: topic.filter().toUtf8()); |
| 612 | char options = char(qos); |
| 613 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && properties.noLocal()) |
| 614 | options |= 1 << 2; |
| 615 | packet.append(value: options); |
| 616 | |
| 617 | auto result = new QMqttSubscription(this); |
| 618 | result->setTopic(topic); |
| 619 | result->setClient(m_clientPrivate->m_client); |
| 620 | result->setQos(qos); |
| 621 | result->setState(QMqttSubscription::SubscriptionPending); |
| 622 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && !topic.sharedSubscriptionName().isEmpty()) { |
| 623 | result->setSharedSubscriptionName(topic.sharedSubscriptionName()); |
| 624 | result->setSharedSubscription(true); |
| 625 | result->setTopic(topic.filter().section(asep: QLatin1Char('/'), astart: 2)); |
| 626 | } |
| 627 | |
| 628 | if (!writePacketToTransport(p: packet)) { |
| 629 | delete result; |
| 630 | return nullptr; |
| 631 | } |
| 632 | |
| 633 | // SUBACK must contain identifier MQTT-3.8.4-2 |
| 634 | m_pendingSubscriptionAck.insert(key: identifier, value: result); |
| 635 | m_activeSubscriptions.insert(key: result->topic(), value: result); |
| 636 | return result; |
| 637 | } |
| 638 | |
| 639 | bool QMqttConnection::sendControlUnsubscribe(const QMqttTopicFilter &topic, const QMqttUnsubscriptionProperties &properties) |
| 640 | { |
| 641 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic; |
| 642 | |
| 643 | // MQTT-3.10.3-2 |
| 644 | if (!topic.isValid()) |
| 645 | return false; |
| 646 | |
| 647 | auto sub = findActiveSubscription(topic); |
| 648 | if (!sub) |
| 649 | // Already unsubscribed |
| 650 | return false; |
| 651 | |
| 652 | if (m_internalState != QMqttConnection::BrokerConnected) { |
| 653 | m_activeSubscriptions.remove(key: topic, value: sub); |
| 654 | return true; |
| 655 | } |
| 656 | |
| 657 | // has to have 0010 as bits 3-0, maybe update UNSUBSCRIBE instead? |
| 658 | // MQTT-3.10.1-1 |
| 659 | const quint8 = QMqttControlPacket::UNSUBSCRIBE + 0x02; |
| 660 | QMqttControlPacket packet(header); |
| 661 | |
| 662 | // Add Packet Identifier |
| 663 | const quint16 identifier = unusedPacketIdentifier(); |
| 664 | |
| 665 | packet.append(value: identifier); |
| 666 | |
| 667 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 668 | packet.appendRaw(data: writeUnsubscriptionProperties(properties)); |
| 669 | } |
| 670 | |
| 671 | packet.append(data: topic.filter().toUtf8()); |
| 672 | sub->setState(QMqttSubscription::UnsubscriptionPending); |
| 673 | |
| 674 | if (!writePacketToTransport(p: packet)) |
| 675 | return false; |
| 676 | |
| 677 | // Do not remove from m_activeSubscriptions as there might be QoS1/2 messages to still |
| 678 | // be sent before UNSUBSCRIBE is acknowledged. |
| 679 | m_pendingUnsubscriptions.insert(key: identifier, value: sub); |
| 680 | |
| 681 | return true; |
| 682 | } |
| 683 | |
| 684 | bool QMqttConnection::sendControlPingRequest(bool isAuto) |
| 685 | { |
| 686 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
| 687 | |
| 688 | if (m_internalState != QMqttConnection::BrokerConnected) |
| 689 | return false; |
| 690 | |
| 691 | |
| 692 | if (!isAuto && m_clientPrivate->m_autoKeepAlive) { |
| 693 | qCDebug(lcMqttConnection) << "Requesting a manual ping while autoKeepAlive is enabled " |
| 694 | << "is not allowed." ; |
| 695 | return false; |
| 696 | } |
| 697 | |
| 698 | // 3.1.2.10 If a Client does not receive a PINGRESP packet within a reasonable amount of time |
| 699 | // after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server |
| 700 | if (m_pingTimeout > 1) { |
| 701 | closeConnection(error: QMqttClient::ServerUnavailable); |
| 702 | return false; |
| 703 | } |
| 704 | |
| 705 | const QMqttControlPacket packet(QMqttControlPacket::PINGREQ); |
| 706 | if (!writePacketToTransport(p: packet)) { |
| 707 | qCDebug(lcMqttConnection) << "Failed to write PINGREQ to transport." ; |
| 708 | return false; |
| 709 | } |
| 710 | m_pingTimeout++; |
| 711 | return true; |
| 712 | } |
| 713 | |
| 714 | bool QMqttConnection::sendControlDisconnect() |
| 715 | { |
| 716 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
| 717 | |
| 718 | m_pingTimer.stop(); |
| 719 | m_pingTimeout = 0; |
| 720 | |
| 721 | m_activeSubscriptions.clear(); |
| 722 | |
| 723 | m_receiveAliases.clear(); |
| 724 | m_publishAliases.clear(); |
| 725 | |
| 726 | const QMqttControlPacket packet(QMqttControlPacket::DISCONNECT); |
| 727 | if (!writePacketToTransport(p: packet)) { |
| 728 | qCDebug(lcMqttConnection) << "Failed to write DISCONNECT to transport." ; |
| 729 | return false; |
| 730 | } |
| 731 | if (m_internalState != ClientDestruction) |
| 732 | m_internalState = BrokerDisconnected; |
| 733 | |
| 734 | if (m_transport->waitForBytesWritten(msecs: 30000)) { |
| 735 | // MQTT-3.14.4-1 must disconnect |
| 736 | m_transport->close(); |
| 737 | return true; |
| 738 | } |
| 739 | return false; |
| 740 | } |
| 741 | |
| 742 | void QMqttConnection::setClientPrivate(QMqttClientPrivate *clientPrivate) |
| 743 | { |
| 744 | m_clientPrivate = clientPrivate; |
| 745 | } |
| 746 | |
| 747 | quint16 QMqttConnection::unusedPacketIdentifier() const |
| 748 | { |
| 749 | // MQTT-2.3.1-1 Control Packets MUST contain a non-zero 16-bit Packet Identifier |
| 750 | static quint16 packetIdentifierCounter = 1; |
| 751 | const std::uint16_t u16max = std::numeric_limits<std::uint16_t>::max(); |
| 752 | |
| 753 | // MQTT-2.3.1-2 ...it MUST assign it a currently unused Packet Identifier |
| 754 | const quint16 lastIdentifier = packetIdentifierCounter; |
| 755 | do { |
| 756 | if (packetIdentifierCounter == u16max) |
| 757 | packetIdentifierCounter = 1; |
| 758 | else |
| 759 | packetIdentifierCounter++; |
| 760 | |
| 761 | if (lastIdentifier == packetIdentifierCounter) { |
| 762 | qCDebug(lcMqttConnection) << "Could not generate unique packet identifier." ; |
| 763 | break; |
| 764 | } |
| 765 | } while (m_pendingSubscriptionAck.contains(key: packetIdentifierCounter) |
| 766 | || m_pendingUnsubscriptions.contains(key: packetIdentifierCounter) |
| 767 | || m_pendingMessages.contains(key: packetIdentifierCounter) |
| 768 | || m_pendingReleaseMessages.contains(key: packetIdentifierCounter)); |
| 769 | return packetIdentifierCounter; |
| 770 | } |
| 771 | |
| 772 | void QMqttConnection::cleanSubscriptions() |
| 773 | { |
| 774 | for (auto item : m_pendingSubscriptionAck) |
| 775 | item->setState(QMqttSubscription::Unsubscribed); |
| 776 | m_pendingSubscriptionAck.clear(); |
| 777 | |
| 778 | for (auto item : m_pendingUnsubscriptions) |
| 779 | item->setState(QMqttSubscription::Unsubscribed); |
| 780 | m_pendingUnsubscriptions.clear(); |
| 781 | |
| 782 | for (auto item : m_activeSubscriptions) |
| 783 | item->setState(QMqttSubscription::Unsubscribed); |
| 784 | m_activeSubscriptions.clear(); |
| 785 | } |
| 786 | |
| 787 | void QMqttConnection::transportConnectionEstablished() |
| 788 | { |
| 789 | if (m_internalState != BrokerConnecting) { |
| 790 | qCWarning(lcMqttConnection) << "Connection established at an unexpected time" ; |
| 791 | return; |
| 792 | } |
| 793 | |
| 794 | if (!sendControlConnect()) { |
| 795 | qCDebug(lcMqttConnection) << "Failed to write CONNECT to transport." ; |
| 796 | // ### Who disconnects now? Connection or client? |
| 797 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid); |
| 798 | } |
| 799 | } |
| 800 | |
| 801 | void QMqttConnection::transportConnectionClosed() |
| 802 | { |
| 803 | m_readBuffer.clear(); |
| 804 | m_readPosition = 0; |
| 805 | m_pingTimer.stop(); |
| 806 | m_pingTimeout = 0; |
| 807 | if (m_internalState == ClientDestruction) |
| 808 | return; |
| 809 | if (m_internalState == BrokerDisconnected) // We manually disconnected |
| 810 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::NoError); |
| 811 | else |
| 812 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid); |
| 813 | } |
| 814 | |
| 815 | void QMqttConnection::transportReadyRead() |
| 816 | { |
| 817 | qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO; |
| 818 | m_readBuffer.append(a: m_transport->readAll()); |
| 819 | processData(); |
| 820 | } |
| 821 | |
| 822 | void QMqttConnection::transportError(QAbstractSocket::SocketError e) |
| 823 | { |
| 824 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << e; |
| 825 | closeConnection(error: QMqttClient::TransportInvalid); |
| 826 | } |
| 827 | |
| 828 | bool QMqttConnection::readBuffer(char *data, quint64 size) |
| 829 | { |
| 830 | if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) { |
| 831 | qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation" ; |
| 832 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 833 | return false; |
| 834 | } |
| 835 | memcpy(dest: data, src: m_readBuffer.constData() + m_readPosition, n: size); |
| 836 | m_readPosition += size; |
| 837 | return true; |
| 838 | } |
| 839 | |
| 840 | qint32 QMqttConnection::readVariableByteInteger(qint64 *dataSize) |
| 841 | { |
| 842 | quint32 multiplier = 1; |
| 843 | qint32 msgLength = 0; |
| 844 | quint8 b = 0; |
| 845 | quint8 iteration = 0; |
| 846 | do { |
| 847 | b = readBufferTyped<quint8>(dataSize); |
| 848 | msgLength += (b & 127) * multiplier; |
| 849 | multiplier *= 128; |
| 850 | iteration++; |
| 851 | if (iteration > 4) { |
| 852 | qCDebug(lcMqttConnection) << "Overflow trying to read variable integer." ; |
| 853 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 854 | return -1; |
| 855 | } |
| 856 | } while ((b & 128) != 0); |
| 857 | return msgLength; |
| 858 | } |
| 859 | |
| 860 | void QMqttConnection::closeConnection(QMqttClient::ClientError error) |
| 861 | { |
| 862 | m_readBuffer.clear(); |
| 863 | m_readPosition = 0; |
| 864 | m_pingTimer.stop(); |
| 865 | m_pingTimeout = 0; |
| 866 | m_activeSubscriptions.clear(); |
| 867 | m_internalState = BrokerDisconnected; |
| 868 | m_transport->disconnect(); |
| 869 | m_transport->close(); |
| 870 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: error); |
| 871 | } |
| 872 | |
| 873 | QByteArray QMqttConnection::readBuffer(quint64 size) |
| 874 | { |
| 875 | if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) { |
| 876 | qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation" ; |
| 877 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 878 | return QByteArray(); |
| 879 | } |
| 880 | QByteArray res(m_readBuffer.constData() + m_readPosition, int(size)); |
| 881 | m_readPosition += size; |
| 882 | return res; |
| 883 | } |
| 884 | |
| 885 | void QMqttConnection::readAuthProperties(QMqttAuthenticationProperties &properties) |
| 886 | { |
| 887 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 888 | m_missingData -= propertyLength; |
| 889 | |
| 890 | QMqttUserProperties userProperties; |
| 891 | while (propertyLength > 0) { |
| 892 | quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 893 | |
| 894 | switch (propertyId) { |
| 895 | case 0x15: { //3.15.2.2.2 Authentication Method |
| 896 | const QString method = readBufferTyped<QString>(dataSize: &propertyLength); |
| 897 | properties.setAuthenticationMethod(method); |
| 898 | break; |
| 899 | } |
| 900 | case 0x16: { // 3.15.2.2.3 Authentication Data |
| 901 | const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength); |
| 902 | properties.setAuthenticationData(data); |
| 903 | break; |
| 904 | } |
| 905 | case 0x1F: { // 3.15.2.2.4 Reason String |
| 906 | const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength); |
| 907 | properties.setReason(reasonString); |
| 908 | break; |
| 909 | } |
| 910 | case 0x26: { // 3.15.2.2.5 User property |
| 911 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 912 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 913 | |
| 914 | userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 915 | break; |
| 916 | } |
| 917 | default: |
| 918 | qCDebug(lcMqttConnection) << "Unknown property id in AUTH:" << propertyId; |
| 919 | break; |
| 920 | } |
| 921 | } |
| 922 | if (!userProperties.isEmpty()) |
| 923 | properties.setUserProperties(userProperties); |
| 924 | } |
| 925 | |
| 926 | void QMqttConnection::readConnackProperties(QMqttServerConnectionProperties &properties) |
| 927 | { |
| 928 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 929 | m_missingData -= propertyLength; |
| 930 | |
| 931 | properties.serverData->valid = true; |
| 932 | |
| 933 | while (propertyLength > 0) { |
| 934 | quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 935 | switch (propertyId) { |
| 936 | case 0x11: { // 3.2.2.3.2 Session Expiry Interval |
| 937 | const quint32 expiryInterval = readBufferTyped<quint32>(dataSize: &propertyLength); |
| 938 | properties.serverData->details |= QMqttServerConnectionProperties::SessionExpiryInterval; |
| 939 | properties.setSessionExpiryInterval(expiryInterval); |
| 940 | break; |
| 941 | } |
| 942 | case 0x21: { // 3.2.2.3.3 Receive Maximum |
| 943 | const quint16 receiveMaximum = readBufferTyped<quint16>(dataSize: &propertyLength); |
| 944 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumReceive; |
| 945 | properties.setMaximumReceive(receiveMaximum); |
| 946 | break; |
| 947 | } |
| 948 | case 0x24: { // 3.2.2.3.4 Maximum QoS Level |
| 949 | const quint8 maxQoS = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 950 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumQoS; |
| 951 | properties.serverData->maximumQoS = maxQoS; |
| 952 | break; |
| 953 | } |
| 954 | case 0x25: { // 3.2.2.3.5 Retain available |
| 955 | const quint8 retainAvailable = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 956 | properties.serverData->details |= QMqttServerConnectionProperties::RetainAvailable; |
| 957 | properties.serverData->retainAvailable = retainAvailable == 1; |
| 958 | break; |
| 959 | } |
| 960 | case 0x27: { // 3.2.2.3.6 Maximum packet size |
| 961 | const quint32 maxPacketSize = readBufferTyped<quint32>(dataSize: &propertyLength); |
| 962 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumPacketSize; |
| 963 | properties.setMaximumPacketSize(maxPacketSize); |
| 964 | break; |
| 965 | } |
| 966 | case 0x12: { // 3.2.2.3.7 Assigned clientId |
| 967 | const QString assignedClientId = readBufferTyped<QString>(dataSize: &propertyLength); |
| 968 | properties.serverData->details |= QMqttServerConnectionProperties::AssignedClientId; |
| 969 | m_clientPrivate->setClientId(assignedClientId); |
| 970 | break; |
| 971 | } |
| 972 | case 0x22: { // 3.2.2.3.8 Topic Alias Maximum |
| 973 | const quint16 topicAliasMaximum = readBufferTyped<quint16>(dataSize: &propertyLength); |
| 974 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumTopicAlias; |
| 975 | properties.setMaximumTopicAlias(topicAliasMaximum); |
| 976 | break; |
| 977 | } |
| 978 | case 0x1F: { // 3.2.2.3.9 Reason String |
| 979 | const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength); |
| 980 | properties.serverData->details |= QMqttServerConnectionProperties::ReasonString; |
| 981 | properties.serverData->reasonString = reasonString; |
| 982 | break; |
| 983 | } |
| 984 | case 0x26: { // 3.2.2.3.10 User property |
| 985 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 986 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 987 | |
| 988 | properties.serverData->details |= QMqttServerConnectionProperties::UserProperty; |
| 989 | properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 990 | break; |
| 991 | } |
| 992 | case 0x28: { // 3.2.2.3.11 Wildcard subscriptions available |
| 993 | const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 994 | properties.serverData->details |= QMqttServerConnectionProperties::WildCardSupported; |
| 995 | properties.serverData->wildcardSupported = available == 1; |
| 996 | break; |
| 997 | } |
| 998 | case 0x29: { // 3.2.2.3.12 Subscription identifiers available |
| 999 | const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 1000 | properties.serverData->details |= QMqttServerConnectionProperties::SubscriptionIdentifierSupport; |
| 1001 | properties.serverData->subscriptionIdentifierSupported = available == 1; |
| 1002 | break; |
| 1003 | } |
| 1004 | case 0x2A: { // 3.2.2.3.13 Shared subscriptions available |
| 1005 | const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 1006 | properties.serverData->details |= QMqttServerConnectionProperties::SharedSubscriptionSupport; |
| 1007 | properties.serverData->sharedSubscriptionSupported = available == 1; |
| 1008 | break; |
| 1009 | } |
| 1010 | case 0x13: { // 3.2.2.3.14 Server Keep Alive |
| 1011 | const quint16 serverKeepAlive = readBufferTyped<quint16>(dataSize: &propertyLength); |
| 1012 | properties.serverData->details |= QMqttServerConnectionProperties::ServerKeepAlive; |
| 1013 | m_clientPrivate->m_client->setKeepAlive(serverKeepAlive); |
| 1014 | break; |
| 1015 | } |
| 1016 | case 0x1A: { // 3.2.2.3.15 Response information |
| 1017 | const QString responseInfo = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1018 | properties.serverData->details |= QMqttServerConnectionProperties::ResponseInformation; |
| 1019 | properties.serverData->responseInformation = responseInfo; |
| 1020 | break; |
| 1021 | } |
| 1022 | case 0x1C: { // 3.2.2.3.16 Server reference |
| 1023 | const QString serverReference = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1024 | properties.serverData->details |= QMqttServerConnectionProperties::ServerReference; |
| 1025 | properties.serverData->serverReference = serverReference; |
| 1026 | break; |
| 1027 | } |
| 1028 | case 0x15: { // 3.2.2.3.17 Authentication method |
| 1029 | const QString method = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1030 | properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationMethod; |
| 1031 | properties.data->authenticationMethod = method; |
| 1032 | break; |
| 1033 | } |
| 1034 | case 0x16: { // 3.2.2.3.18 Authentication data |
| 1035 | const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength); |
| 1036 | properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationData; |
| 1037 | properties.data->authenticationData = data; |
| 1038 | break; |
| 1039 | } |
| 1040 | default: |
| 1041 | qCDebug(lcMqttConnection) << "Unknown property id in CONNACK:" << int(propertyId); |
| 1042 | break; |
| 1043 | } |
| 1044 | } |
| 1045 | } |
| 1046 | |
| 1047 | void QMqttConnection::readMessageStatusProperties(QMqttMessageStatusProperties &properties) |
| 1048 | { |
| 1049 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 1050 | m_missingData -= propertyLength; |
| 1051 | |
| 1052 | while (propertyLength > 0) { |
| 1053 | const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 1054 | switch (propertyId) { |
| 1055 | case 0x1f: { // 3.4.2.2.2 Reason String |
| 1056 | const QString content = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1057 | properties.data->reasonString = content; |
| 1058 | break; |
| 1059 | } |
| 1060 | case 0x26: { // 3.4.2.2.3 User Properites |
| 1061 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1062 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1063 | properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 1064 | break; |
| 1065 | } |
| 1066 | default: |
| 1067 | qCDebug(lcMqttConnection) << "Unknown subscription property received." ; |
| 1068 | break; |
| 1069 | } |
| 1070 | } |
| 1071 | } |
| 1072 | |
| 1073 | void QMqttConnection::readPublishProperties(QMqttPublishProperties &properties) |
| 1074 | { |
| 1075 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 1076 | m_missingData -= propertyLength; |
| 1077 | |
| 1078 | QMqttUserProperties userProperties; |
| 1079 | QList<quint32> subscriptionIds; |
| 1080 | |
| 1081 | while (propertyLength > 0) { |
| 1082 | const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 1083 | switch (propertyId) { |
| 1084 | case 0x01: { // 3.3.2.3.2 Payload Format Indicator |
| 1085 | const quint8 format = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 1086 | if (format == 1) |
| 1087 | properties.setPayloadFormatIndicator(QMqtt::PayloadFormatIndicator::UTF8Encoded); |
| 1088 | break; |
| 1089 | } |
| 1090 | case 0x02: { // 3.3.2.3.3 Message Expiry Interval |
| 1091 | const quint32 interval = readBufferTyped<quint32>(dataSize: &propertyLength); |
| 1092 | properties.setMessageExpiryInterval(interval); |
| 1093 | break; |
| 1094 | } |
| 1095 | case 0x23: { // 3.3.2.3.4 Topic alias |
| 1096 | const quint16 alias = readBufferTyped<quint16>(dataSize: &propertyLength); |
| 1097 | properties.setTopicAlias(alias); |
| 1098 | break; |
| 1099 | } |
| 1100 | case 0x08: { // 3.3.2.3.5 Response Topic |
| 1101 | const QString responseTopic = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1102 | properties.setResponseTopic(responseTopic); |
| 1103 | break; |
| 1104 | } |
| 1105 | case 0x09: { // 3.3.2.3.6 Correlation Data |
| 1106 | const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength); |
| 1107 | properties.setCorrelationData(data); |
| 1108 | break; |
| 1109 | } |
| 1110 | case 0x26: { // 3.3.2.3.7 User property |
| 1111 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1112 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1113 | userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 1114 | break; |
| 1115 | } |
| 1116 | case 0x0b: { // 3.3.2.3.8 Subscription Identifier |
| 1117 | qint32 id = readVariableByteInteger(dataSize: &propertyLength); |
| 1118 | if (id < 0) |
| 1119 | return; // readVariableByteInteger closes connection |
| 1120 | subscriptionIds.append(t: quint32(id)); |
| 1121 | break; |
| 1122 | } |
| 1123 | case 0x03: { // 3.3.2.3.9 Content Type |
| 1124 | const QString content = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1125 | properties.setContentType(content); |
| 1126 | break; |
| 1127 | } |
| 1128 | default: |
| 1129 | qCDebug(lcMqttConnection) << "Unknown publish property received." ; |
| 1130 | break; |
| 1131 | } |
| 1132 | } |
| 1133 | if (!userProperties.isEmpty()) |
| 1134 | properties.setUserProperties(userProperties); |
| 1135 | |
| 1136 | if (!subscriptionIds.isEmpty()) |
| 1137 | properties.setSubscriptionIdentifiers(subscriptionIds); |
| 1138 | } |
| 1139 | |
| 1140 | void QMqttConnection::readSubscriptionProperties(QMqttSubscription *sub) |
| 1141 | { |
| 1142 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
| 1143 | m_missingData -= propertyLength; |
| 1144 | |
| 1145 | while (propertyLength > 0) { |
| 1146 | const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
| 1147 | switch (propertyId) { |
| 1148 | case 0x1f: { // 3.9.2.1.2 Reason String |
| 1149 | const QString content = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1150 | sub->d_func()->m_reasonString = content; |
| 1151 | break; |
| 1152 | } |
| 1153 | case 0x26: { // 3.9.2.1.3 |
| 1154 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1155 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
| 1156 | |
| 1157 | sub->d_func()->m_userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
| 1158 | break; |
| 1159 | } |
| 1160 | default: |
| 1161 | qCDebug(lcMqttConnection) << "Unknown subscription property received." ; |
| 1162 | break; |
| 1163 | } |
| 1164 | } |
| 1165 | } |
| 1166 | |
| 1167 | QByteArray QMqttConnection::writeConnectProperties() |
| 1168 | { |
| 1169 | QMqttControlPacket properties; |
| 1170 | |
| 1171 | // According to MQTT5 3.1.2.11 default values do not need to be included in the |
| 1172 | // connect statement. |
| 1173 | |
| 1174 | // 3.1.2.11.2 |
| 1175 | if (m_clientPrivate->m_connectionProperties.sessionExpiryInterval() != 0) { |
| 1176 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify sessionExpiryInterval" ; |
| 1177 | properties.append(value: char(0x11)); |
| 1178 | properties.append(value: m_clientPrivate->m_connectionProperties.sessionExpiryInterval()); |
| 1179 | } |
| 1180 | |
| 1181 | // 3.1.2.11.3 |
| 1182 | if (m_clientPrivate->m_connectionProperties.maximumReceive() != 65535) { |
| 1183 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumReceive" ; |
| 1184 | properties.append(value: char(0x21)); |
| 1185 | properties.append(value: m_clientPrivate->m_connectionProperties.maximumReceive()); |
| 1186 | } |
| 1187 | |
| 1188 | // 3.1.2.11.4 |
| 1189 | if (m_clientPrivate->m_connectionProperties.maximumPacketSize() != std::numeric_limits<quint32>::max()) { |
| 1190 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumPacketSize" ; |
| 1191 | properties.append(value: char(0x27)); |
| 1192 | properties.append(value: m_clientPrivate->m_connectionProperties.maximumPacketSize()); |
| 1193 | } |
| 1194 | |
| 1195 | // 3.1.2.11.5 |
| 1196 | if (m_clientPrivate->m_connectionProperties.maximumTopicAlias() != 0) { |
| 1197 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumTopicAlias" ; |
| 1198 | properties.append(value: char(0x22)); |
| 1199 | properties.append(value: m_clientPrivate->m_connectionProperties.maximumTopicAlias()); |
| 1200 | } |
| 1201 | |
| 1202 | // 3.1.2.11.6 |
| 1203 | if (m_clientPrivate->m_connectionProperties.requestResponseInformation()) { |
| 1204 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestResponseInformation" ; |
| 1205 | properties.append(value: char(0x19)); |
| 1206 | properties.append(value: char(1)); |
| 1207 | } |
| 1208 | |
| 1209 | // 3.1.2.11.7 |
| 1210 | if (!m_clientPrivate->m_connectionProperties.requestProblemInformation()) { |
| 1211 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestProblemInformation" ; |
| 1212 | properties.append(value: char(0x17)); |
| 1213 | properties.append(value: char(0)); |
| 1214 | } |
| 1215 | |
| 1216 | // 3.1.2.11.8 Add User properties |
| 1217 | auto userProperties = m_clientPrivate->m_connectionProperties.userProperties(); |
| 1218 | if (!userProperties.isEmpty()) { |
| 1219 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify user properties" ; |
| 1220 | for (const auto &prop : userProperties) { |
| 1221 | properties.append(value: char(0x26)); |
| 1222 | properties.append(data: prop.name().toUtf8()); |
| 1223 | properties.append(data: prop.value().toUtf8()); |
| 1224 | } |
| 1225 | } |
| 1226 | |
| 1227 | // 3.1.2.11.9 Add Authentication |
| 1228 | const QString authenticationMethod = m_clientPrivate->m_connectionProperties.authenticationMethod(); |
| 1229 | if (!authenticationMethod.isEmpty()) { |
| 1230 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify AuthenticationMethod:" ; |
| 1231 | qCDebug(lcMqttConnectionVerbose) << " " << authenticationMethod; |
| 1232 | properties.append(value: char(0x15)); |
| 1233 | properties.append(data: authenticationMethod.toUtf8()); |
| 1234 | // 3.1.2.11.10 |
| 1235 | const QByteArray authenticationData = m_clientPrivate->m_connectionProperties.authenticationData(); |
| 1236 | if (!authenticationData.isEmpty()) { |
| 1237 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: Authentication Data:" ; |
| 1238 | qCDebug(lcMqttConnectionVerbose) << " " << authenticationData; |
| 1239 | properties.append(value: char(0x16)); |
| 1240 | properties.append(data: authenticationData); |
| 1241 | } |
| 1242 | } |
| 1243 | |
| 1244 | return properties.serializePayload(); |
| 1245 | } |
| 1246 | |
| 1247 | QByteArray QMqttConnection::writeLastWillProperties() const |
| 1248 | { |
| 1249 | QMqttControlPacket properties; |
| 1250 | const QMqttLastWillProperties &lastWillProperties = m_clientPrivate->m_lastWillProperties; |
| 1251 | // Will Delay interval 3.1.3.2.2 |
| 1252 | if (lastWillProperties.willDelayInterval() > 0) { |
| 1253 | const quint32 delay = lastWillProperties.willDelayInterval(); |
| 1254 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify will delay interval:" |
| 1255 | << delay; |
| 1256 | properties.append(value: char(0x18)); |
| 1257 | properties.append(value: delay); |
| 1258 | } |
| 1259 | |
| 1260 | // Payload Format Indicator 3.1.3.2.3 |
| 1261 | if (lastWillProperties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) { |
| 1262 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: payload format indicator specified" ; |
| 1263 | properties.append(value: char(0x01)); |
| 1264 | properties.append(value: char(0x01)); // UTF8 |
| 1265 | } |
| 1266 | |
| 1267 | // Message Expiry Interval 3.1.3.2.4 |
| 1268 | if (lastWillProperties.messageExpiryInterval() > 0) { |
| 1269 | const quint32 interval = lastWillProperties.messageExpiryInterval(); |
| 1270 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Message Expiry interval:" |
| 1271 | << interval; |
| 1272 | properties.append(value: char(0x02)); |
| 1273 | properties.append(value: interval); |
| 1274 | } |
| 1275 | |
| 1276 | // Content Type 3.1.3.2.5 |
| 1277 | if (!lastWillProperties.contentType().isEmpty()) { |
| 1278 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Content Type:" |
| 1279 | << lastWillProperties.contentType(); |
| 1280 | properties.append(value: char(0x03)); |
| 1281 | properties.append(data: lastWillProperties.contentType().toUtf8()); |
| 1282 | } |
| 1283 | |
| 1284 | // Response Topic 3.1.3.2.6 |
| 1285 | if (!lastWillProperties.responseTopic().isEmpty()) { |
| 1286 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Response Topic:" |
| 1287 | << lastWillProperties.responseTopic(); |
| 1288 | properties.append(value: char(0x08)); |
| 1289 | properties.append(data: lastWillProperties.responseTopic().toUtf8()); |
| 1290 | } |
| 1291 | |
| 1292 | // Correlation Data 3.1.3.2.7 |
| 1293 | if (!lastWillProperties.correlationData().isEmpty()) { |
| 1294 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Correlation Data:" |
| 1295 | << lastWillProperties.correlationData(); |
| 1296 | properties.append(value: char(0x09)); |
| 1297 | properties.append(data: lastWillProperties.correlationData()); |
| 1298 | } |
| 1299 | |
| 1300 | // User Properties 3.1.3.2.8 |
| 1301 | if (!lastWillProperties.userProperties().isEmpty()) { |
| 1302 | auto userProperties = lastWillProperties.userProperties(); |
| 1303 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify user properties" ; |
| 1304 | for (const auto &prop : userProperties) { |
| 1305 | properties.append(value: char(0x26)); |
| 1306 | properties.append(data: prop.name().toUtf8()); |
| 1307 | properties.append(data: prop.value().toUtf8()); |
| 1308 | } |
| 1309 | } |
| 1310 | |
| 1311 | return properties.serializePayload(); |
| 1312 | } |
| 1313 | |
| 1314 | QByteArray QMqttConnection::writePublishProperties(const QMqttPublishProperties &properties) |
| 1315 | { |
| 1316 | QMqttControlPacket packet; |
| 1317 | |
| 1318 | // 3.3.2.3.2 Payload Indicator |
| 1319 | if (properties.availableProperties() & QMqttPublishProperties::PayloadFormatIndicator && |
| 1320 | properties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) { |
| 1321 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Payload Indicator:" |
| 1322 | << (properties.payloadFormatIndicator() == QMqtt::PayloadFormatIndicator::UTF8Encoded ? 1 : 0); |
| 1323 | packet.append(value: char(0x01)); |
| 1324 | switch (properties.payloadFormatIndicator()) { |
| 1325 | case QMqtt::PayloadFormatIndicator::UTF8Encoded: |
| 1326 | packet.append(value: char(0x01)); |
| 1327 | break; |
| 1328 | default: |
| 1329 | qCDebug(lcMqttConnection) << "Unknown payload indicator." ; |
| 1330 | break; |
| 1331 | } |
| 1332 | } |
| 1333 | |
| 1334 | // 3.3.2.3.3 Message Expiry |
| 1335 | if (properties.availableProperties() & QMqttPublishProperties::MessageExpiryInterval && |
| 1336 | properties.messageExpiryInterval() > 0) { |
| 1337 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Message Expiry :" |
| 1338 | << properties.messageExpiryInterval(); |
| 1339 | packet.append(value: char(0x02)); |
| 1340 | packet.append(value: properties.messageExpiryInterval()); |
| 1341 | } |
| 1342 | |
| 1343 | // 3.3.2.3.4 Topic alias |
| 1344 | if (properties.availableProperties() & QMqttPublishProperties::TopicAlias && |
| 1345 | properties.topicAlias() > 0) { |
| 1346 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Topic Alias :" |
| 1347 | << properties.topicAlias(); |
| 1348 | if (m_clientPrivate->m_serverConnectionProperties.availableProperties() & QMqttServerConnectionProperties::MaximumTopicAlias |
| 1349 | && properties.topicAlias() > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) { |
| 1350 | qCDebug(lcMqttConnection) << "Invalid topic alias specified: " << properties.topicAlias() |
| 1351 | << " Maximum by server is:" |
| 1352 | << m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias(); |
| 1353 | |
| 1354 | } else { |
| 1355 | packet.append(value: char(0x23)); |
| 1356 | packet.append(value: properties.topicAlias()); |
| 1357 | } |
| 1358 | } |
| 1359 | |
| 1360 | // 3.3.2.3.5 Response Topic |
| 1361 | if (properties.availableProperties() & QMqttPublishProperties::ResponseTopic && |
| 1362 | !properties.responseTopic().isEmpty()) { |
| 1363 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Response Topic :" |
| 1364 | << properties.responseTopic(); |
| 1365 | packet.append(value: char(0x08)); |
| 1366 | packet.append(data: properties.responseTopic().toUtf8()); |
| 1367 | } |
| 1368 | |
| 1369 | // 3.3.2.3.6 Correlation Data |
| 1370 | if (properties.availableProperties() & QMqttPublishProperties::CorrelationData && |
| 1371 | !properties.correlationData().isEmpty()) { |
| 1372 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Correlation Data :" |
| 1373 | << properties.correlationData(); |
| 1374 | packet.append(value: char(0x09)); |
| 1375 | packet.append(data: properties.correlationData()); |
| 1376 | } |
| 1377 | |
| 1378 | // 3.3.2.3.7 User Property |
| 1379 | if (properties.availableProperties() & QMqttPublishProperties::UserProperty) { |
| 1380 | auto userProperties = properties.userProperties(); |
| 1381 | if (!userProperties.isEmpty()) { |
| 1382 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: specify user properties" ; |
| 1383 | for (const auto &prop : userProperties) { |
| 1384 | packet.append(value: char(0x26)); |
| 1385 | packet.append(data: prop.name().toUtf8()); |
| 1386 | packet.append(data: prop.value().toUtf8()); |
| 1387 | } |
| 1388 | } |
| 1389 | } |
| 1390 | |
| 1391 | // 3.3.2.3.8 Subscription Identifier |
| 1392 | if (properties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) { |
| 1393 | for (auto id : properties.subscriptionIdentifiers()) { |
| 1394 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Subscription ID:" << id; |
| 1395 | packet.append(value: char(0x0b)); |
| 1396 | packet.appendRawVariableInteger(value: id); |
| 1397 | } |
| 1398 | } |
| 1399 | |
| 1400 | // 3.3.2.3.9 Content Type |
| 1401 | if (properties.availableProperties() & QMqttPublishProperties::ContentType && |
| 1402 | !properties.contentType().isEmpty()) { |
| 1403 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Content Type :" |
| 1404 | << properties.contentType(); |
| 1405 | packet.append(value: char(0x03)); |
| 1406 | packet.append(data: properties.contentType().toUtf8()); |
| 1407 | } |
| 1408 | |
| 1409 | return packet.serializePayload(); |
| 1410 | } |
| 1411 | |
| 1412 | QByteArray QMqttConnection::writeSubscriptionProperties(const QMqttSubscriptionProperties &properties) |
| 1413 | { |
| 1414 | QMqttControlPacket packet; |
| 1415 | |
| 1416 | // 3.8.2.1.2 Subscription Identifier |
| 1417 | if (properties.subscriptionIdentifier() > 0) { |
| 1418 | qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: Subscription Identifier" ; |
| 1419 | packet.append(value: char(0x0b)); |
| 1420 | packet.appendRawVariableInteger(value: properties.subscriptionIdentifier()); |
| 1421 | } |
| 1422 | |
| 1423 | // 3.8.2.1.3 User Property |
| 1424 | auto userProperties = properties.userProperties(); |
| 1425 | if (!userProperties.isEmpty()) { |
| 1426 | qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: specify user properties" ; |
| 1427 | for (const auto &prop : userProperties) { |
| 1428 | packet.append(value: char(0x26)); |
| 1429 | packet.append(data: prop.name().toUtf8()); |
| 1430 | packet.append(data: prop.value().toUtf8()); |
| 1431 | } |
| 1432 | } |
| 1433 | |
| 1434 | return packet.serializePayload(); |
| 1435 | } |
| 1436 | |
| 1437 | QByteArray QMqttConnection::writeUnsubscriptionProperties(const QMqttUnsubscriptionProperties &properties) |
| 1438 | { |
| 1439 | QMqttControlPacket packet; |
| 1440 | |
| 1441 | // 3.10.2.1.2 |
| 1442 | auto userProperties = properties.userProperties(); |
| 1443 | if (!userProperties.isEmpty()) { |
| 1444 | qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties" ; |
| 1445 | for (const auto &prop : userProperties) { |
| 1446 | packet.append(value: char(0x26)); |
| 1447 | packet.append(data: prop.name().toUtf8()); |
| 1448 | packet.append(data: prop.value().toUtf8()); |
| 1449 | } |
| 1450 | } |
| 1451 | |
| 1452 | return packet.serializePayload(); |
| 1453 | } |
| 1454 | |
| 1455 | QByteArray QMqttConnection::writeAuthenticationProperties(const QMqttAuthenticationProperties &properties) |
| 1456 | { |
| 1457 | QMqttControlPacket packet; |
| 1458 | |
| 1459 | // 3.15.2.2.2 |
| 1460 | if (!properties.authenticationMethod().isEmpty()) { |
| 1461 | packet.append(value: char(0x15)); |
| 1462 | packet.append(data: properties.authenticationMethod().toUtf8()); |
| 1463 | } |
| 1464 | // 3.15.2.2.3 |
| 1465 | if (!properties.authenticationData().isEmpty()) { |
| 1466 | packet.append(value: char(0x16)); |
| 1467 | packet.append(data: properties.authenticationData()); |
| 1468 | } |
| 1469 | |
| 1470 | // 3.15.2.2.4 |
| 1471 | if (!properties.reason().isEmpty()) { |
| 1472 | packet.append(value: char(0x1F)); |
| 1473 | packet.append(data: properties.reason().toUtf8()); |
| 1474 | } |
| 1475 | |
| 1476 | // 3.15.2.2.5 |
| 1477 | auto userProperties = properties.userProperties(); |
| 1478 | if (!userProperties.isEmpty()) { |
| 1479 | qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties" ; |
| 1480 | for (const auto &prop : userProperties) { |
| 1481 | packet.append(value: char(0x26)); |
| 1482 | packet.append(data: prop.name().toUtf8()); |
| 1483 | packet.append(data: prop.value().toUtf8()); |
| 1484 | } |
| 1485 | } |
| 1486 | |
| 1487 | return packet.serializePayload(); |
| 1488 | } |
| 1489 | |
| 1490 | void QMqttConnection::finalize_auth() |
| 1491 | { |
| 1492 | qCDebug(lcMqttConnectionVerbose) << "Finalize AUTH" ; |
| 1493 | |
| 1494 | quint8 authReason = 0; |
| 1495 | QMqttAuthenticationProperties authProperties; |
| 1496 | // 3.15.2.1 - The Reason Code and Property Length can be omitted if the Reason Code |
| 1497 | // is 0x00 (Success) and there are no Properties. In this case the AUTH has a |
| 1498 | // Remaining Length of 0. |
| 1499 | if (m_missingData > 0) { |
| 1500 | authReason = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1501 | readAuthProperties(properties&: authProperties); |
| 1502 | } |
| 1503 | |
| 1504 | // 3.15.2.1 |
| 1505 | switch (QMqtt::ReasonCode(authReason)) { |
| 1506 | case QMqtt::ReasonCode::Success: |
| 1507 | emit m_clientPrivate->m_client->authenticationFinished(p: authProperties); |
| 1508 | break; |
| 1509 | case QMqtt::ReasonCode::ContinueAuthentication: |
| 1510 | case QMqtt::ReasonCode::ReAuthenticate: |
| 1511 | emit m_clientPrivate->m_client->authenticationRequested(p: authProperties); |
| 1512 | break; |
| 1513 | default: |
| 1514 | qCDebug(lcMqttConnection) << "Received illegal AUTH reason code:" << authReason; |
| 1515 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1516 | break; |
| 1517 | } |
| 1518 | } |
| 1519 | |
| 1520 | void QMqttConnection::finalize_connack() |
| 1521 | { |
| 1522 | qCDebug(lcMqttConnectionVerbose) << "Finalize CONNACK" ; |
| 1523 | |
| 1524 | const quint8 ackFlags = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1525 | |
| 1526 | if (ackFlags > 1) { // MQTT-3.2.2.1 |
| 1527 | qCDebug(lcMqttConnection) << "Unexpected CONNACK Flags specified:" << QString::number(ackFlags); |
| 1528 | readBuffer(size: quint64(m_missingData)); |
| 1529 | m_missingData = 0; |
| 1530 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1531 | return; |
| 1532 | } |
| 1533 | bool sessionPresent = ackFlags == 1; |
| 1534 | |
| 1535 | // MQTT-3.2.2-1 & MQTT-3.2.2-2 |
| 1536 | if (sessionPresent) { |
| 1537 | emit m_clientPrivate->m_client->brokerSessionRestored(); |
| 1538 | if (m_clientPrivate->m_cleanSession) |
| 1539 | qCDebug(lcMqttConnection) << "Connected with a clean session, ack contains session present." ; |
| 1540 | } else { |
| 1541 | // MQTT-4.1.0.-1 MQTT-4.1.0-2 Session not stored on broker side |
| 1542 | // regardless whether cleanSession is false |
| 1543 | cleanSubscriptions(); |
| 1544 | } |
| 1545 | |
| 1546 | quint8 connectResultValue = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1547 | QMqttServerConnectionProperties serverProp; |
| 1548 | serverProp.serverData->reasonCode = QMqtt::ReasonCode(connectResultValue); |
| 1549 | m_clientPrivate->m_serverConnectionProperties = serverProp; |
| 1550 | if (connectResultValue != 0 && m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
| 1551 | qCDebug(lcMqttConnection) << "Connection has been rejected." ; |
| 1552 | closeConnection(error: static_cast<QMqttClient::ClientError>(connectResultValue)); |
| 1553 | return; |
| 1554 | } |
| 1555 | |
| 1556 | // MQTT 5.0 has variable part != 2 in the header |
| 1557 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 1558 | readConnackProperties(properties&: m_clientPrivate->m_serverConnectionProperties); |
| 1559 | m_receiveAliases.resize(size: m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()); |
| 1560 | m_publishAliases.resize(size: m_clientPrivate->m_connectionProperties.maximumTopicAlias()); |
| 1561 | |
| 1562 | // 3.2.2.2 |
| 1563 | switch (QMqtt::ReasonCode(connectResultValue)) { |
| 1564 | case QMqtt::ReasonCode::Success: |
| 1565 | break; |
| 1566 | case QMqtt::ReasonCode::MalformedPacket: |
| 1567 | case QMqtt::ReasonCode::ProtocolError: |
| 1568 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1569 | return; |
| 1570 | case QMqtt::ReasonCode::UnsupportedProtocolVersion: |
| 1571 | closeConnection(error: QMqttClient::InvalidProtocolVersion); |
| 1572 | return; |
| 1573 | case QMqtt::ReasonCode::InvalidClientId: |
| 1574 | closeConnection(error: QMqttClient::IdRejected); |
| 1575 | return; |
| 1576 | case QMqtt::ReasonCode::ServerNotAvailable: |
| 1577 | case QMqtt::ReasonCode::ServerBusy: |
| 1578 | case QMqtt::ReasonCode::UseAnotherServer: |
| 1579 | case QMqtt::ReasonCode::ServerMoved: |
| 1580 | closeConnection(error: QMqttClient::ServerUnavailable); |
| 1581 | return; |
| 1582 | case QMqtt::ReasonCode::InvalidUserNameOrPassword: |
| 1583 | closeConnection(error: QMqttClient::BadUsernameOrPassword); |
| 1584 | return; |
| 1585 | case QMqtt::ReasonCode::NotAuthorized: |
| 1586 | closeConnection(error: QMqttClient::NotAuthorized); |
| 1587 | return; |
| 1588 | case QMqtt::ReasonCode::UnspecifiedError: |
| 1589 | closeConnection(error: QMqttClient::UnknownError); |
| 1590 | return; |
| 1591 | case QMqtt::ReasonCode::ImplementationSpecificError: |
| 1592 | case QMqtt::ReasonCode::ClientBanned: |
| 1593 | case QMqtt::ReasonCode::InvalidAuthenticationMethod: |
| 1594 | case QMqtt::ReasonCode::InvalidTopicName: |
| 1595 | case QMqtt::ReasonCode::PacketTooLarge: |
| 1596 | case QMqtt::ReasonCode::QuotaExceeded: |
| 1597 | case QMqtt::ReasonCode::InvalidPayloadFormat: |
| 1598 | case QMqtt::ReasonCode::RetainNotSupported: |
| 1599 | case QMqtt::ReasonCode::QoSNotSupported: |
| 1600 | case QMqtt::ReasonCode::ExceededConnectionRate: |
| 1601 | closeConnection(error: QMqttClient::Mqtt5SpecificError); |
| 1602 | return; |
| 1603 | default: |
| 1604 | qCDebug(lcMqttConnection) << "Received illegal CONNACK reason code:" << connectResultValue; |
| 1605 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1606 | return; |
| 1607 | } |
| 1608 | } |
| 1609 | |
| 1610 | m_internalState = BrokerConnected; |
| 1611 | m_clientPrivate->setStateAndError(s: QMqttClient::Connected); |
| 1612 | |
| 1613 | if (m_clientPrivate->m_autoKeepAlive) |
| 1614 | m_pingTimer.start(msec: m_clientPrivate->m_keepAlive * 1000, obj: this); |
| 1615 | } |
| 1616 | |
| 1617 | void QMqttConnection::finalize_suback() |
| 1618 | { |
| 1619 | const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData); |
| 1620 | |
| 1621 | auto sub = m_pendingSubscriptionAck.take(key: id); |
| 1622 | if (Q_UNLIKELY(sub == nullptr)) { |
| 1623 | qCDebug(lcMqttConnection) << "Received SUBACK for unknown subscription request." ; |
| 1624 | return; |
| 1625 | } |
| 1626 | |
| 1627 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 1628 | readSubscriptionProperties(sub); |
| 1629 | |
| 1630 | // 3.9.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the SUBSCRIBE packet being acknowledged. |
| 1631 | // Whereas 3.8.3 states "The Payload MUST contain at least one Topic Filter and Subscription Options pair. A SUBSCRIBE packet with no Payload is a Protocol Error." |
| 1632 | do { |
| 1633 | quint8 reason = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1634 | |
| 1635 | sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reason); |
| 1636 | |
| 1637 | // 3.9.3 |
| 1638 | switch (QMqtt::ReasonCode(reason)) { |
| 1639 | case QMqtt::ReasonCode::SubscriptionQoSLevel0: |
| 1640 | case QMqtt::ReasonCode::SubscriptionQoSLevel1: |
| 1641 | case QMqtt::ReasonCode::SubscriptionQoSLevel2: |
| 1642 | qCDebug(lcMqttConnectionVerbose) << "Finalize SUBACK: id:" << id << "qos:" << reason; |
| 1643 | // The broker might have a different support level for QoS than what |
| 1644 | // the client requested |
| 1645 | if (reason != sub->qos()) { |
| 1646 | sub->setQos(reason); |
| 1647 | emit sub->qosChanged(reason); |
| 1648 | } |
| 1649 | sub->setState(QMqttSubscription::Subscribed); |
| 1650 | break; |
| 1651 | case QMqtt::ReasonCode::UnspecifiedError: |
| 1652 | qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason; |
| 1653 | sub->setState(QMqttSubscription::Error); |
| 1654 | break; |
| 1655 | case QMqtt::ReasonCode::ImplementationSpecificError: |
| 1656 | case QMqtt::ReasonCode::NotAuthorized: |
| 1657 | case QMqtt::ReasonCode::InvalidTopicFilter: |
| 1658 | case QMqtt::ReasonCode::MessageIdInUse: |
| 1659 | case QMqtt::ReasonCode::QuotaExceeded: |
| 1660 | case QMqtt::ReasonCode::SharedSubscriptionsNotSupported: |
| 1661 | case QMqtt::ReasonCode::SubscriptionIdsNotSupported: |
| 1662 | case QMqtt::ReasonCode::WildCardSubscriptionsNotSupported: |
| 1663 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 1664 | qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason; |
| 1665 | sub->setState(QMqttSubscription::Error); |
| 1666 | break; |
| 1667 | } |
| 1668 | Q_FALLTHROUGH(); |
| 1669 | default: |
| 1670 | qCWarning(lcMqttConnection) << "Received illegal SUBACK reason code:" << reason; |
| 1671 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1672 | break; |
| 1673 | } |
| 1674 | } while (m_missingData > 0); |
| 1675 | } |
| 1676 | |
| 1677 | void QMqttConnection::finalize_unsuback() |
| 1678 | { |
| 1679 | const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData); |
| 1680 | qCDebug(lcMqttConnectionVerbose) << "Finalize UNSUBACK: " << id; |
| 1681 | |
| 1682 | auto sub = m_pendingUnsubscriptions.take(key: id); |
| 1683 | if (Q_UNLIKELY(sub == nullptr)) { |
| 1684 | qCDebug(lcMqttConnection) << "Received UNSUBACK for unknown request." ; |
| 1685 | return; |
| 1686 | } |
| 1687 | |
| 1688 | m_activeSubscriptions.remove(key: sub->topic(), value: sub); |
| 1689 | |
| 1690 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
| 1691 | readSubscriptionProperties(sub); |
| 1692 | } else { |
| 1693 | // 3.11.3 - The UNSUBACK Packet has no payload. |
| 1694 | // emulate successful unsubscription |
| 1695 | sub->d_func()->m_reasonCode = QMqtt::ReasonCode::Success; |
| 1696 | sub->setState(QMqttSubscription::Unsubscribed); |
| 1697 | return; |
| 1698 | } |
| 1699 | |
| 1700 | // 3.1.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the UNSUBSCRIBE packet being acknowledged. |
| 1701 | // Whereas 3.10.3 states "The Payload of an UNSUBSCRIBE packet MUST contain at least one Topic Filter. An UNSUBSCRIBE packet with no Payload is a Protocol Error." |
| 1702 | do { |
| 1703 | const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1704 | sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reasonCode); |
| 1705 | |
| 1706 | // 3.11.3 |
| 1707 | switch (QMqtt::ReasonCode(reasonCode)) { |
| 1708 | case QMqtt::ReasonCode::Success: |
| 1709 | sub->setState(QMqttSubscription::Unsubscribed); |
| 1710 | break; |
| 1711 | case QMqtt::ReasonCode::NoSubscriptionExisted: |
| 1712 | case QMqtt::ReasonCode::ImplementationSpecificError: |
| 1713 | case QMqtt::ReasonCode::NotAuthorized: |
| 1714 | case QMqtt::ReasonCode::InvalidTopicFilter: |
| 1715 | case QMqtt::ReasonCode::MessageIdInUse: |
| 1716 | case QMqtt::ReasonCode::UnspecifiedError: |
| 1717 | qCWarning(lcMqttConnection) << "Unsubscription for id " << id << " failed. Reason Code:" << reasonCode; |
| 1718 | sub->setState(QMqttSubscription::Error); |
| 1719 | break; |
| 1720 | default: |
| 1721 | qCWarning(lcMqttConnection) << "Received illegal UNSUBACK reason code:" << reasonCode; |
| 1722 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1723 | break; |
| 1724 | } |
| 1725 | } while (m_missingData > 0); |
| 1726 | } |
| 1727 | |
| 1728 | void QMqttConnection::finalize_publish() |
| 1729 | { |
| 1730 | // String topic |
| 1731 | QMqttTopicName topic = readBufferTyped<QString>(dataSize: &m_missingData); |
| 1732 | const int topicLength = topic.name().size(); |
| 1733 | |
| 1734 | quint16 id = 0; |
| 1735 | if (m_currentPublish.qos > 0) |
| 1736 | id = readBufferTyped<quint16>(dataSize: &m_missingData); |
| 1737 | |
| 1738 | QMqttPublishProperties publishProperties; |
| 1739 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
| 1740 | readPublishProperties(properties&: publishProperties); |
| 1741 | |
| 1742 | if (publishProperties.availableProperties() & QMqttPublishProperties::TopicAlias) { |
| 1743 | const quint16 topicAlias = publishProperties.topicAlias(); |
| 1744 | if (topicAlias == 0 || topicAlias > m_clientPrivate->m_connectionProperties.maximumTopicAlias()) { |
| 1745 | qCDebug(lcMqttConnection) << "TopicAlias receive: overflow." ; |
| 1746 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1747 | return; |
| 1748 | } |
| 1749 | if (topicLength == 0) { // New message on existing topic alias |
| 1750 | topic = m_receiveAliases.at(i: topicAlias - 1); |
| 1751 | if (topic.name().isEmpty()) { |
| 1752 | qCDebug(lcMqttConnection) << "TopicAlias receive: alias for unknown topic." ; |
| 1753 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1754 | return; |
| 1755 | } |
| 1756 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias receive: Using " << topicAlias; |
| 1757 | } else { // Resetting a topic alias |
| 1758 | qCDebug(lcMqttConnection) << "TopicAlias receive: Resetting:" << topic.name() << " : " << topicAlias; |
| 1759 | m_receiveAliases[topicAlias - 1] = topic; |
| 1760 | } |
| 1761 | } |
| 1762 | |
| 1763 | // message |
| 1764 | const quint64 payloadLength = quint64(m_missingData); |
| 1765 | const QByteArray message = readBuffer(size: payloadLength); |
| 1766 | m_missingData -= payloadLength; |
| 1767 | |
| 1768 | qCDebug(lcMqttConnectionVerbose) << "Finalize PUBLISH: topic:" << topic |
| 1769 | << " payloadLength:" << payloadLength; |
| 1770 | |
| 1771 | emit m_clientPrivate->m_client->messageReceived(message, topic); |
| 1772 | |
| 1773 | QMqttMessage qmsg(topic, message, id, m_currentPublish.qos, |
| 1774 | m_currentPublish.dup, m_currentPublish.retain); |
| 1775 | qmsg.d->m_publishProperties = publishProperties; |
| 1776 | |
| 1777 | if (id != 0) { |
| 1778 | QMqttMessageStatusProperties statusProp; |
| 1779 | statusProp.data->userProperties = publishProperties.userProperties(); |
| 1780 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Published, properties: statusProp); |
| 1781 | } |
| 1782 | |
| 1783 | // Store subscriptions in a temporary container as each messageReceived is allowed to subscribe |
| 1784 | // again and thus invalid the iterator of the loop. |
| 1785 | QList<QMqttSubscription *> subscribers; |
| 1786 | for (const auto [key, value] : m_activeSubscriptions.asKeyValueRange()) { |
| 1787 | if (key.match(name: topic)) |
| 1788 | subscribers.append(t: value); |
| 1789 | } |
| 1790 | for (const auto &s : subscribers) |
| 1791 | emit s->messageReceived(msg: qmsg); |
| 1792 | |
| 1793 | if (m_currentPublish.qos == 1) |
| 1794 | sendControlPublishAcknowledge(id); |
| 1795 | else if (m_currentPublish.qos == 2) |
| 1796 | sendControlPublishReceive(id); |
| 1797 | } |
| 1798 | |
| 1799 | void QMqttConnection::finalize_pubAckRecRelComp() |
| 1800 | { |
| 1801 | qCDebug(lcMqttConnectionVerbose) << "Finalize PUBACK/REC/REL/COMP" ; |
| 1802 | const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData); |
| 1803 | |
| 1804 | QMqttMessageStatusProperties properties; |
| 1805 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) { |
| 1806 | // Reason Code (1byte) |
| 1807 | const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1808 | properties.data->reasonCode = QMqtt::ReasonCode(reasonCode); |
| 1809 | |
| 1810 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBACK || (m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) { |
| 1811 | // 3.4.2.1, 3.5.2.1 |
| 1812 | switch (QMqtt::ReasonCode(reasonCode)) { |
| 1813 | case QMqtt::ReasonCode::Success: |
| 1814 | case QMqtt::ReasonCode::NoMatchingSubscriber: |
| 1815 | case QMqtt::ReasonCode::UnspecifiedError: |
| 1816 | case QMqtt::ReasonCode::ImplementationSpecificError: |
| 1817 | case QMqtt::ReasonCode::NotAuthorized: |
| 1818 | case QMqtt::ReasonCode::InvalidTopicName: |
| 1819 | case QMqtt::ReasonCode::MessageIdInUse: |
| 1820 | case QMqtt::ReasonCode::QuotaExceeded: |
| 1821 | case QMqtt::ReasonCode::InvalidPayloadFormat: |
| 1822 | break; |
| 1823 | default: |
| 1824 | qCWarning(lcMqttConnection) << "Received illegal PUBACK/REC reason code:" << reasonCode; |
| 1825 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1826 | return; |
| 1827 | } |
| 1828 | } else { |
| 1829 | // 3.6.2.1, 3.7.2.1 |
| 1830 | switch (QMqtt::ReasonCode(reasonCode)) { |
| 1831 | case QMqtt::ReasonCode::Success: |
| 1832 | case QMqtt::ReasonCode::MessageIdNotFound: |
| 1833 | break; |
| 1834 | default: |
| 1835 | qCWarning(lcMqttConnection) << "Received illegal PUBREL/COMP reason code:" << reasonCode; |
| 1836 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1837 | return; |
| 1838 | } |
| 1839 | } |
| 1840 | |
| 1841 | readMessageStatusProperties(properties); |
| 1842 | } |
| 1843 | |
| 1844 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREL) { |
| 1845 | qCDebug(lcMqttConnectionVerbose) << " PUBREL:" << id; |
| 1846 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Released, properties); |
| 1847 | sendControlPublishComp(id); |
| 1848 | return; |
| 1849 | } |
| 1850 | |
| 1851 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBCOMP) { |
| 1852 | qCDebug(lcMqttConnectionVerbose) << " PUBCOMP:" << id; |
| 1853 | auto pendingRelease = m_pendingReleaseMessages.take(key: id); |
| 1854 | if (!pendingRelease) |
| 1855 | qCDebug(lcMqttConnection) << "Received PUBCOMP for unknown released message." ; |
| 1856 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Completed, properties); |
| 1857 | emit m_clientPrivate->m_client->messageSent(id); |
| 1858 | return; |
| 1859 | } |
| 1860 | |
| 1861 | auto pendingMsg = m_pendingMessages.take(key: id); |
| 1862 | if (!pendingMsg) { |
| 1863 | qCDebug(lcMqttConnection) << "Received PUBACK for unknown message: " << id; |
| 1864 | return; |
| 1865 | } |
| 1866 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) { |
| 1867 | qCDebug(lcMqttConnectionVerbose) << " PUBREC:" << id; |
| 1868 | m_pendingReleaseMessages.insert(key: id, value: pendingMsg); |
| 1869 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Received, properties); |
| 1870 | sendControlPublishRelease(id); |
| 1871 | } else { |
| 1872 | qCDebug(lcMqttConnectionVerbose) << " PUBACK:" << id; |
| 1873 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Acknowledged, properties); |
| 1874 | emit m_clientPrivate->m_client->messageSent(id); |
| 1875 | } |
| 1876 | } |
| 1877 | |
| 1878 | void QMqttConnection::finalize_pingresp() |
| 1879 | { |
| 1880 | qCDebug(lcMqttConnectionVerbose) << "Finalize PINGRESP" ; |
| 1881 | const quint8 v = readBufferTyped<quint8>(dataSize: &m_missingData); |
| 1882 | |
| 1883 | if (v != 0) { |
| 1884 | qCDebug(lcMqttConnection) << "Received a PINGRESP including payload." ; |
| 1885 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1886 | return; |
| 1887 | } |
| 1888 | m_pingTimeout--; |
| 1889 | emit m_clientPrivate->m_client->pingResponseReceived(); |
| 1890 | } |
| 1891 | |
| 1892 | bool QMqttConnection::processDataHelper() |
| 1893 | { |
| 1894 | if (m_missingData > 0) { |
| 1895 | if ((m_readBuffer.size() - m_readPosition) < m_missingData) |
| 1896 | return false; |
| 1897 | |
| 1898 | switch (m_currentPacket & 0xF0) { |
| 1899 | case QMqttControlPacket::AUTH: |
| 1900 | finalize_auth(); |
| 1901 | break; |
| 1902 | case QMqttControlPacket::CONNACK: |
| 1903 | finalize_connack(); |
| 1904 | break; |
| 1905 | case QMqttControlPacket::SUBACK: |
| 1906 | finalize_suback(); |
| 1907 | break; |
| 1908 | case QMqttControlPacket::UNSUBACK: |
| 1909 | finalize_unsuback(); |
| 1910 | break; |
| 1911 | case QMqttControlPacket::PUBLISH: |
| 1912 | finalize_publish(); |
| 1913 | break; |
| 1914 | case QMqttControlPacket::PUBACK: |
| 1915 | case QMqttControlPacket::PUBREC: |
| 1916 | case QMqttControlPacket::PUBREL: |
| 1917 | case QMqttControlPacket::PUBCOMP: |
| 1918 | finalize_pubAckRecRelComp(); |
| 1919 | break; |
| 1920 | case QMqttControlPacket::PINGRESP: |
| 1921 | finalize_pingresp(); |
| 1922 | break; |
| 1923 | default: |
| 1924 | qCDebug(lcMqttConnection) << "Unknown packet to finalize." ; |
| 1925 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1926 | break; |
| 1927 | } |
| 1928 | |
| 1929 | if (m_internalState == BrokerDisconnected) |
| 1930 | return false; |
| 1931 | |
| 1932 | Q_ASSERT(m_missingData == 0); |
| 1933 | |
| 1934 | m_readBuffer = m_readBuffer.mid(index: m_readPosition); |
| 1935 | m_readPosition = 0; |
| 1936 | } |
| 1937 | |
| 1938 | // MQTT-2.2 A fixed header of a control packet must be at least 2 bytes. If the payload is |
| 1939 | // longer than 127 bytes the header can be up to 5 bytes long. |
| 1940 | switch (m_readBuffer.size()) { |
| 1941 | case 0: |
| 1942 | case 1: |
| 1943 | return false; |
| 1944 | case 2: |
| 1945 | if ((m_readBuffer.at(i: 1) & 128) != 0) |
| 1946 | return false; |
| 1947 | break; |
| 1948 | case 3: |
| 1949 | if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0) |
| 1950 | return false; |
| 1951 | break; |
| 1952 | case 4: |
| 1953 | if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0 && (m_readBuffer.at(i: 3) & 128) != 0) |
| 1954 | return false; |
| 1955 | break; |
| 1956 | default: |
| 1957 | break; |
| 1958 | } |
| 1959 | |
| 1960 | readBuffer(data: reinterpret_cast<char *>(&m_currentPacket), size: 1); |
| 1961 | |
| 1962 | switch (m_currentPacket & 0xF0) { |
| 1963 | case QMqttControlPacket::CONNACK: { |
| 1964 | qCDebug(lcMqttConnectionVerbose) << "Received CONNACK" ; |
| 1965 | if (m_internalState != BrokerWaitForConnectAck) { |
| 1966 | qCDebug(lcMqttConnection) << "Received CONNACK at unexpected time." ; |
| 1967 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1968 | return false; |
| 1969 | } |
| 1970 | |
| 1971 | qint32 payloadSize = readVariableByteInteger(); |
| 1972 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
| 1973 | if (payloadSize != 2) { |
| 1974 | qCDebug(lcMqttConnection) << "Unexpected FRAME size in CONNACK." ; |
| 1975 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1976 | return false; |
| 1977 | } |
| 1978 | } |
| 1979 | m_missingData = payloadSize; |
| 1980 | break; |
| 1981 | } |
| 1982 | case QMqttControlPacket::SUBACK: { |
| 1983 | qCDebug(lcMqttConnectionVerbose) << "Received SUBACK" ; |
| 1984 | const quint8 remaining = readBufferTyped<quint8>(); |
| 1985 | m_missingData = remaining; |
| 1986 | break; |
| 1987 | } |
| 1988 | case QMqttControlPacket::PUBLISH: { |
| 1989 | qCDebug(lcMqttConnectionVerbose) << "Received PUBLISH" ; |
| 1990 | m_currentPublish.dup = m_currentPacket & 0x08; |
| 1991 | m_currentPublish.qos = (m_currentPacket & 0x06) >> 1; |
| 1992 | m_currentPublish.retain = m_currentPacket & 0x01; |
| 1993 | if ((m_currentPublish.qos == 0 && m_currentPublish.dup != 0) |
| 1994 | || m_currentPublish.qos > 2) { |
| 1995 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 1996 | return false; |
| 1997 | } |
| 1998 | |
| 1999 | m_missingData = readVariableByteInteger(); |
| 2000 | if (m_missingData == -1) |
| 2001 | return false; // Connection closed inside readVariableByteInteger |
| 2002 | break; |
| 2003 | } |
| 2004 | case QMqttControlPacket::PINGRESP: |
| 2005 | qCDebug(lcMqttConnectionVerbose) << "Received PINGRESP" ; |
| 2006 | m_missingData = 1; |
| 2007 | break; |
| 2008 | |
| 2009 | |
| 2010 | case QMqttControlPacket::PUBREL: { |
| 2011 | qCDebug(lcMqttConnectionVerbose) << "Received PUBREL" ; |
| 2012 | const quint8 remaining = readBufferTyped<quint8>(); |
| 2013 | if (remaining != 0x02) { |
| 2014 | qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length." ; |
| 2015 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2016 | return false; |
| 2017 | } |
| 2018 | if ((m_currentPacket & 0x0F) != 0x02) { |
| 2019 | qCDebug(lcMqttConnection) << "Malformed fixed header for PUBREL." ; |
| 2020 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2021 | return false; |
| 2022 | } |
| 2023 | m_missingData = 2; |
| 2024 | break; |
| 2025 | } |
| 2026 | |
| 2027 | case QMqttControlPacket::UNSUBACK: |
| 2028 | case QMqttControlPacket::PUBACK: |
| 2029 | case QMqttControlPacket::PUBREC: |
| 2030 | case QMqttControlPacket::PUBCOMP: { |
| 2031 | qCDebug(lcMqttConnectionVerbose) << "Received UNSUBACK/PUBACK/PUBREC/PUBCOMP" ; |
| 2032 | if ((m_currentPacket & 0x0F) != 0) { |
| 2033 | qCDebug(lcMqttConnection) << "Malformed fixed header for UNSUBACK/PUBACK/PUBREC/PUBCOMP." ; |
| 2034 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2035 | return false; |
| 2036 | } |
| 2037 | const quint8 remaining = readBufferTyped<quint8>(); |
| 2038 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0 && remaining != 0x02) { |
| 2039 | qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length." ; |
| 2040 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2041 | return false; |
| 2042 | } |
| 2043 | m_missingData = remaining; |
| 2044 | break; |
| 2045 | } |
| 2046 | case QMqttControlPacket::AUTH: |
| 2047 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
| 2048 | qCDebug(lcMqttConnection) << "Received unknown command." ; |
| 2049 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2050 | return false; |
| 2051 | } |
| 2052 | qCDebug(lcMqttConnectionVerbose) << "Received AUTH" ; |
| 2053 | if ((m_currentPacket & 0x0F) != 0) { |
| 2054 | qCDebug(lcMqttConnection) << "Malformed fixed header for AUTH." ; |
| 2055 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2056 | return false; |
| 2057 | } |
| 2058 | m_missingData = readVariableByteInteger(); |
| 2059 | if (m_missingData == -1) |
| 2060 | return false; // Connection closed inside readVariableByteInteger |
| 2061 | break; |
| 2062 | case QMqttControlPacket::DISCONNECT: |
| 2063 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
| 2064 | qCDebug(lcMqttConnection) << "Received unknown command." ; |
| 2065 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2066 | return false; |
| 2067 | } |
| 2068 | qCDebug(lcMqttConnectionVerbose) << "Received DISCONNECT" ; |
| 2069 | if ((m_currentPacket & 0x0F) != 0) { |
| 2070 | qCDebug(lcMqttConnection) << "Malformed fixed header for DISCONNECT." ; |
| 2071 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2072 | return false; |
| 2073 | } |
| 2074 | if (m_internalState != BrokerConnected) { |
| 2075 | qCDebug(lcMqttConnection) << "Received DISCONNECT at unexpected time." ; |
| 2076 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2077 | return false; |
| 2078 | } |
| 2079 | closeConnection(error: QMqttClient::NoError); |
| 2080 | return false; |
| 2081 | default: |
| 2082 | qCDebug(lcMqttConnection) << "Received unknown command." ; |
| 2083 | closeConnection(error: QMqttClient::ProtocolViolation); |
| 2084 | return false; |
| 2085 | } |
| 2086 | |
| 2087 | /* set current command CONNACK - PINGRESP */ |
| 2088 | /* read command size */ |
| 2089 | /* calculate missing_data */ |
| 2090 | return true; // reiterate. implicitly finishes and enqueues |
| 2091 | } |
| 2092 | |
| 2093 | void QMqttConnection::processData() |
| 2094 | { |
| 2095 | while (processDataHelper()) |
| 2096 | ; |
| 2097 | } |
| 2098 | |
| 2099 | bool QMqttConnection::writePacketToTransport(const QMqttControlPacket &p) |
| 2100 | { |
| 2101 | const QByteArray writeData = p.serialize(); |
| 2102 | qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO << " DataSize:" << writeData.size(); |
| 2103 | const qint64 res = m_transport->write(data: writeData.constData(), len: writeData.size()); |
| 2104 | if (Q_UNLIKELY(res == -1)) { |
| 2105 | qCDebug(lcMqttConnection) << "Could not write frame to transport." ; |
| 2106 | return false; |
| 2107 | } |
| 2108 | return true; |
| 2109 | } |
| 2110 | |
| 2111 | QT_END_NAMESPACE |
| 2112 | |