1// Copyright (C) 2017 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
3// Qt-Security score:critical reason:network-protocol
4
5#include "qmqttconnection_p.h"
6#include "qmqttconnectionproperties_p.h"
7#include "qmqttcontrolpacket_p.h"
8#include "qmqttmessage_p.h"
9#include "qmqttpublishproperties_p.h"
10#include "qmqttsubscription_p.h"
11#include "qmqttclient_p.h"
12#include "transportLayers/mqtt_websocket_io_p.h"
13#include "transportLayers/mqtt_secure_websocket_io_p.h"
14
15#include <QtCore/QLoggingCategory>
16#include <QtNetwork/QSslSocket>
17#include <QtNetwork/QTcpSocket>
18
19#include <limits>
20#include <cstdint>
21
22QT_BEGIN_NAMESPACE
23
24Q_LOGGING_CATEGORY(lcMqttConnection, "qt.mqtt.connection")
25Q_STATIC_LOGGING_CATEGORY(lcMqttConnectionVerbose, "qt.mqtt.connection.verbose");
26
27template <typename T>
28T QMqttConnection::readBufferTyped(qint64 *dataSize)
29{
30 Q_STATIC_ASSERT(std::is_integral<T>::value);
31
32 T result = 0;
33 if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(sizeof(result)))) {
34 qCWarning(lcMqttConnection) << "Attempt to read past the data";
35 return result;
36 }
37 if (readBuffer(data: reinterpret_cast<char *>(&result), size: sizeof(result)) && dataSize != nullptr)
38 *dataSize -= sizeof(result);
39 return qFromBigEndian(result);
40}
41
42template<>
43QByteArray QMqttConnection::readBufferTyped(qint64 *dataSize)
44{
45 const quint16 size = readBufferTyped<quint16>(dataSize);
46 if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(size))) {
47 qCWarning(lcMqttConnection) << "Attempt to read past the data";
48 return QByteArray();
49 }
50 QByteArray ba(int(size), Qt::Uninitialized);
51 if (readBuffer(data: ba.data(), size) && dataSize != nullptr)
52 *dataSize -= size;
53 return ba;
54}
55
56template<>
57QString QMqttConnection::readBufferTyped(qint64 *dataSize)
58{
59 return QString::fromUtf8(ba: readBufferTyped<QByteArray>(dataSize));
60}
61
62bool QMqttConnection::isPendingUnsubscribe(QMqttSubscription *sub) const
63{
64 for (const auto [key, value] : m_pendingUnsubscriptions.asKeyValueRange()) {
65 if (value == sub)
66 return true;
67 }
68 return false;
69}
70
71QMqttSubscription *QMqttConnection::findActiveSubscription(const QMqttTopicFilter &topic) const
72{
73 const auto range = m_activeSubscriptions.equal_range(key: topic);
74 for (auto it = range.first; it != range.second; ++it) {
75 if (!isPendingUnsubscribe(sub: it.value()))
76 return it.value();
77 }
78 return nullptr;
79}
80
81QMqttConnection::QMqttConnection(QObject *parent) : QObject(parent)
82{
83}
84
85QMqttConnection::~QMqttConnection()
86{
87 if (m_internalState == BrokerConnected)
88 sendControlDisconnect();
89 m_transport = nullptr;
90}
91
92void QMqttConnection::timerEvent(QTimerEvent *event)
93{
94 if (Q_LIKELY(event->timerId() == m_pingTimer.timerId())) {
95 sendControlPingRequest();
96 return;
97 }
98
99 QObject::timerEvent(event);
100}
101
102void QMqttConnection::disconnectAndResetTransport()
103{
104 if (m_transport) {
105 if (m_connectedTransport) {
106 //
107 // Looks weird. We know that m_transport is not set from
108 // the outside because m_connectedTransport is only set
109 // by us for the websocket cases, or in ensureTransport.
110 //
111 // Thus we can disconnect everything.
112 //
113 disconnect(receiver: m_transport.get());
114 } else {
115 //
116 // m_transport is set from outside with setTransport,
117 // there might be other connections that shall not
118 // be removed.
119 //
120 disconnect(sender: m_transport.get(), signal: &QIODevice::aboutToClose, receiver: this, slot: &QMqttConnection::transportConnectionClosed);
121 disconnect(sender: m_transport.get(), signal: &QIODevice::readyRead, receiver: this, slot: &QMqttConnection::transportReadyRead);
122 }
123 }
124 m_transport = nullptr;
125 m_transportType = QMqttClient::TransportType::IODevice;
126}
127
128void QMqttConnection::setTransport(QIODevice *device, QMqttClient::TransportType transport)
129{
130 qCDebug(lcMqttConnection) << Q_FUNC_INFO << device << " Type:" << transport;
131
132 disconnectAndResetTransport();
133
134 m_transport = std::shared_ptr<QIODevice>(device, [](QIODevice *) { ; });
135 m_transportType = transport;
136 m_connectedTransport = false;
137 m_transportIsSet = true;
138
139 connect(sender: m_transport.get(), signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed);
140 connect(sender: m_transport.get(), signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead);
141}
142
143void QMqttConnection::connectTransport(QMqttClient::TransportType transportType, const std::shared_ptr<QIODevice> &transport)
144{
145 qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Type:" << transportType;
146
147 disconnectAndResetTransport();
148
149 m_transportType = transportType;
150 m_transport = transport;
151 m_transportIsSet = true;
152 m_connectedTransport = true;
153
154 if (!m_transport) {
155 qWarning(catFunc: lcMqttConnection) << " No transport created for " << transportType;
156 return;
157 }
158
159 QObject::connect(sender: m_transport.get(), signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed);
160 QObject::connect(sender: m_transport.get(), signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead);
161
162 switch (m_transportType) {
163 case QMqttClient::TransportType::IODevice:
164 break;
165 case QMqttClient::TransportType::AbstractSocket:
166 QObject::connect(sender: qobject_cast<QAbstractSocket *>(object: m_transport.get()), signal: &QAbstractSocket::connected, context: this, slot: &QMqttConnection::transportConnectionEstablished);
167 QObject::connect(sender: qobject_cast<QAbstractSocket *>(object: m_transport.get()), signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed);
168 QObject::connect(sender: qobject_cast<QAbstractSocket *>(object: m_transport.get()), signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError);
169 break;
170 case QMqttClient::TransportType::WebSocket:
171#ifdef QT_MQTT_WITH_WEBSOCKETS
172 QObject::connect(qobject_cast<QMqttWebSocketIO *>(m_transport.get()), &QMqttWebSocketIO::connected, this, &QMqttConnection::transportConnectionEstablished);
173 QObject::connect(qobject_cast<QMqttWebSocketIO *>(m_transport.get()), &QMqttWebSocketIO::disconnected, this, &QMqttConnection::transportConnectionClosed);
174 QObject::connect(qobject_cast<QMqttWebSocketIO *>(m_transport.get()), &QMqttWebSocketIO::errorOccurred, this, &QMqttConnection::transportError);
175#endif
176 break;
177
178 case QMqttClient::TransportType::SecureSocket:
179#ifndef QT_NO_SSL
180 QObject::connect(sender: qobject_cast<QSslSocket *>(object: m_transport.get()), signal: &QSslSocket::encrypted, context: this, slot: &QMqttConnection::transportConnectionEstablished);
181 QObject::connect(sender: qobject_cast<QSslSocket *>(object: m_transport.get()), signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed);
182 QObject::connect(sender: qobject_cast<QSslSocket *>(object: m_transport.get()), signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError);
183#endif
184 break;
185
186 case QMqttClient::TransportType::SecureWebSocket:
187#ifdef QT_MQTT_WITH_WEBSOCKETS
188 QObject::connect(qobject_cast<QMqttSecureWebSocketIO *>(m_transport.get()), &QMqttSecureWebSocketIO::encrypted, this, &QMqttConnection::transportConnectionEstablished);
189 QObject::connect(qobject_cast<QMqttSecureWebSocketIO *>(m_transport.get()), &QMqttSecureWebSocketIO::disconnected, this, &QMqttConnection::transportConnectionClosed);
190 QObject::connect(qobject_cast<QMqttSecureWebSocketIO *>(m_transport.get()), &QMqttSecureWebSocketIO::errorOccurred, this, &QMqttConnection::transportError);
191#endif
192 break;
193
194 default:
195 Q_ASSERT(false);
196 break;
197 } // end switch
198}
199
200QIODevice *QMqttConnection::transport() const
201{
202 return m_transport.get();
203}
204
205bool QMqttConnection::ensureTransport(bool createSecureIfNeeded)
206{
207 Q_UNUSED(createSecureIfNeeded); // QT_NO_SSL
208 qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transport.get();
209
210 if (m_transportIsSet)
211 return true; // transport set by setTransport
212
213 disconnectAndResetTransport();
214
215 // We are asked to create a transport layer
216 if (m_clientPrivate->m_hostname.isEmpty() || m_clientPrivate->m_port == 0) {
217 qCDebug(lcMqttConnection) << "No hostname specified, or port is 0. Not able to create a transport layer.";
218 return false;
219 }
220
221#ifdef Q_OS_WASM
222 qCWarning(lcMqttConnection) << "For WebAssembly, use connectToHostWebSocket[Encrypted]";
223 return false;
224#endif
225
226#ifndef QT_NO_SSL
227 if (createSecureIfNeeded) {
228 auto socket = std::make_shared<QSslSocket>();
229 m_transport = socket;
230 m_connectedTransport = true;
231 m_transportType = QMqttClient::SecureSocket;
232
233 QObject::connect(sender: socket.get(), signal: &QSslSocket::encrypted, context: this, slot: &QMqttConnection::transportConnectionEstablished);
234 QObject::connect(sender: socket.get(), signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed);
235 QObject::connect(sender: socket.get(), signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError);
236 }
237#endif
238
239 if (!m_transport) {
240 auto socket = std::make_shared<QTcpSocket>();
241 m_transport = socket;
242 m_connectedTransport = true;
243 m_transportType = QMqttClient::AbstractSocket;
244
245 QObject::connect(sender: socket.get(), signal: &QAbstractSocket::connected, context: this, slot: &QMqttConnection::transportConnectionEstablished);
246 QObject::connect(sender: socket.get(), signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed);
247 QObject::connect(sender: socket.get(), signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError);
248 }
249 QObject::connect(sender: m_transport.get(), signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed);
250 QObject::connect(sender: m_transport.get(), signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead);
251
252 return true;
253}
254
255bool QMqttConnection::ensureTransportOpen(const QString &sslPeerName)
256{
257 qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transportType;
258
259 if (m_transportType == QMqttClient::IODevice) {
260 if (m_transport->isOpen())
261 return sendControlConnect();
262
263 if (!m_transport->open(mode: QIODevice::ReadWrite)) {
264 qCDebug(lcMqttConnection) << "Could not open Transport IO device.";
265 m_internalState = BrokerDisconnected;
266 return false;
267 }
268 return sendControlConnect();
269 }
270
271 if (m_transportType == QMqttClient::AbstractSocket) {
272 auto socket = qobject_cast<QTcpSocket *>(object: m_transport.get());
273 Q_ASSERT(socket);
274 if (socket->state() == QAbstractSocket::ConnectedState)
275 return sendControlConnect();
276
277 m_internalState = BrokerConnecting;
278 socket->connectToHost(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port);
279 }
280 else if (m_transportType == QMqttClient::WebSocket) {
281#ifdef QT_MQTT_WITH_WEBSOCKETS
282 auto socket = qobject_cast<QMqttWebSocketIO *>(m_transport.get());
283 Q_ASSERT(socket);
284 if (socket->state() == QAbstractSocket::ConnectedState)
285 return sendControlConnect();
286
287 m_internalState = BrokerConnecting;
288 socket->connectToHost(m_clientPrivate->m_hostname, m_clientPrivate->m_port, m_clientPrivate->m_protocolVersion);
289#endif
290 }
291 else if (m_transportType == QMqttClient::SecureWebSocket) {
292#ifdef QT_MQTT_WITH_WEBSOCKETS
293 auto socket = qobject_cast<QMqttSecureWebSocketIO *>(m_transport.get());
294 Q_ASSERT(socket);
295 if (socket->state() == QAbstractSocket::ConnectedState)
296 return sendControlConnect();
297
298 m_internalState = BrokerConnecting;
299 socket->connectToHostEncrypted(m_clientPrivate->m_hostname, m_clientPrivate->m_port, m_clientPrivate->m_protocolVersion);
300#endif
301 }
302#ifndef QT_NO_SSL
303 else if (m_transportType == QMqttClient::SecureSocket) {
304 auto socket = qobject_cast<QSslSocket *>(object: m_transport.get());
305 Q_ASSERT(socket);
306 if (socket->state() == QAbstractSocket::ConnectedState)
307 return sendControlConnect();
308
309 m_internalState = BrokerConnecting;
310 if (!m_sslConfiguration.isNull())
311 socket->setSslConfiguration(m_sslConfiguration);
312 socket->connectToHostEncrypted(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port, sslPeerName);
313 }
314#endif
315 Q_UNUSED(sslPeerName);
316 return true;
317}
318
319bool QMqttConnection::sendControlConnect()
320{
321 qCDebug(lcMqttConnection) << Q_FUNC_INFO;
322
323 QMqttControlPacket packet(QMqttControlPacket::CONNECT);
324
325 // Variable header
326 // 3.1.2.1 Protocol Name
327 // 3.1.2.2 Protocol Level
328 switch (m_clientPrivate->m_protocolVersion) {
329 case QMqttClient::MQTT_3_1:
330 packet.append(data: "MQIsdp");
331 packet.append(value: char(3)); // Version 3.1
332 break;
333 case QMqttClient::MQTT_3_1_1:
334 packet.append(data: "MQTT");
335 packet.append(value: char(4)); // Version 3.1.1
336 break;
337 case QMqttClient::MQTT_5_0:
338 packet.append(data: "MQTT");
339 packet.append(value: char(5)); // Version 5.0
340 break;
341 }
342
343 // 3.1.2.3 Connect Flags
344 quint8 flags = 0;
345 // Clean session
346 if (m_clientPrivate->m_cleanSession)
347 flags |= 1 << 1;
348
349 if (!m_clientPrivate->m_willTopic.isEmpty()) {
350 flags |= 1 << 2;
351 if (m_clientPrivate->m_willQoS > 2) {
352 qCDebug(lcMqttConnection) << "Invalid Will QoS specified.";
353 return false;
354 }
355 if (m_clientPrivate->m_willQoS == 1)
356 flags |= 1 << 3;
357 else if (m_clientPrivate->m_willQoS == 2)
358 flags |= 1 << 4;
359 if (m_clientPrivate->m_willRetain)
360 flags |= 1 << 5;
361 }
362 if (m_clientPrivate->m_username.size())
363 flags |= 1 << 7;
364
365 if (m_clientPrivate->m_password.size())
366 flags |= 1 << 6;
367
368 packet.append(value: char(flags));
369
370 // 3.1.2.10 Keep Alive
371 packet.append(value: m_clientPrivate->m_keepAlive);
372
373 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
374 packet.appendRaw(data: writeConnectProperties());
375
376 // 3.1.3 Payload
377 // 3.1.3.1 Client Identifier
378 const QByteArray clientStringArray = m_clientPrivate->m_clientId.toUtf8();
379 if (clientStringArray.size()) {
380 packet.append(data: clientStringArray);
381 } else {
382 packet.append(value: char(0));
383 packet.append(value: char(0));
384 }
385
386 if (!m_clientPrivate->m_willTopic.isEmpty()) {
387 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
388 packet.appendRaw(data: writeLastWillProperties());
389
390 packet.append(data: m_clientPrivate->m_willTopic.toUtf8());
391 packet.append(data: m_clientPrivate->m_willMessage);
392 }
393
394 if (m_clientPrivate->m_username.size())
395 packet.append(data: m_clientPrivate->m_username.toUtf8());
396
397 if (m_clientPrivate->m_password.size())
398 packet.append(data: m_clientPrivate->m_password.toUtf8());
399
400 m_internalState = BrokerWaitForConnectAck;
401 m_missingData = 0;
402
403 if (!writePacketToTransport(p: packet)) {
404 qCDebug(lcMqttConnection) << "Could not write CONNECT frame to transport.";
405 return false;
406 }
407 return true;
408}
409
410bool QMqttConnection::sendControlAuthenticate(const QMqttAuthenticationProperties &properties)
411{
412 qCDebug(lcMqttConnection) << Q_FUNC_INFO;
413
414 QMqttControlPacket packet(QMqttControlPacket::AUTH);
415
416 switch (m_internalState) {
417 case BrokerDisconnected:
418 case BrokerConnecting:
419 case ClientDestruction:
420 qCDebug(lcMqttConnection) << "Using AUTH while disconnected.";
421 return false;
422 case BrokerWaitForConnectAck:
423 qCDebug(lcMqttConnection) << "AUTH while connecting, set continuation flag.";
424 packet.append(value: char(QMqtt::ReasonCode::ContinueAuthentication));
425 break;
426 case BrokerConnected:
427 qCDebug(lcMqttConnection) << "AUTH while connected, initiate re-authentication.";
428 packet.append(value: char(QMqtt::ReasonCode::ReAuthenticate));
429 break;
430 }
431
432 packet.appendRaw(data: writeAuthenticationProperties(properties));
433
434 if (!writePacketToTransport(p: packet)) {
435 qCDebug(lcMqttConnection) << "Could not write AUTH frame to transport.";
436 return false;
437 }
438
439 return true;
440}
441
442qint32 QMqttConnection::sendControlPublish(const QMqttTopicName &topic,
443 const QByteArray &message,
444 quint8 qos,
445 bool retain,
446 const QMqttPublishProperties &properties)
447{
448 qCDebug(lcMqttConnection) << Q_FUNC_INFO << topic << " Size:" << message.size() << " bytes."
449 << "QoS:" << qos << " Retain:" << retain;
450
451 if (!topic.isValid())
452 return -1;
453
454 quint8 header = QMqttControlPacket::PUBLISH;
455 if (qos == 1)
456 header |= 0x02;
457 else if (qos == 2)
458 header |= 0x04;
459
460 if (retain)
461 header |= 0x01;
462
463 QSharedPointer<QMqttControlPacket> packet(new QMqttControlPacket(header));
464 // topic alias
465 QMqttPublishProperties publishProperties(properties);
466 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
467 // 3.3.4 A PUBLISH packet sent from a Client to a Server MUST NOT contain a Subscription Identifier
468 if (publishProperties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) {
469 qCWarning(lcMqttConnection) << "SubscriptionIdentifier must not be specified for publish.";
470 return -1;
471 }
472
473 const quint16 topicAlias = publishProperties.topicAlias();
474 if (topicAlias > 0) { // User specified topic Alias
475 if (topicAlias > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) {
476 qCDebug(lcMqttConnection) << "TopicAlias publish: overflow.";
477 return -1;
478 }
479 if (m_publishAliases.at(i: topicAlias - 1) != topic) {
480 qCDebug(lcMqttConnection) << "TopicAlias publish: Assign:" << topicAlias << ":" << topic;
481 m_publishAliases[topicAlias - 1] = topic;
482 packet->append(data: topic.name().toUtf8());
483 } else {
484 qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Reuse:" << topicAlias;
485 packet->append(value: quint16(0));
486 }
487 } else if (m_publishAliases.size() > 0) { // Automatic module alias assignment
488 int autoAlias = m_publishAliases.indexOf(t: topic);
489 if (autoAlias != -1) {
490 qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Use auto alias:" << autoAlias;
491 packet->append(value: quint16(0));
492 publishProperties.setTopicAlias(quint16(autoAlias + 1));
493 } else {
494 autoAlias = m_publishAliases.indexOf(t: QMqttTopicName());
495 if (autoAlias != -1) {
496 qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: auto alias assignment:" << autoAlias;
497 m_publishAliases[autoAlias] = topic;
498 publishProperties.setTopicAlias(quint16(autoAlias) + 1);
499 } else
500 qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: alias storage full, using full topic";
501 packet->append(data: topic.name().toUtf8());
502 }
503 } else {
504 packet->append(data: topic.name().toUtf8());
505 }
506 } else { // ! MQTT_5_0
507 packet->append(data: topic.name().toUtf8());
508 }
509 quint16 identifier = 0;
510 if (qos > 0) {
511 identifier = unusedPacketIdentifier();
512 packet->append(value: identifier);
513 m_pendingMessages.insert(key: identifier, value: packet);
514 }
515
516 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
517 packet->appendRaw(data: writePublishProperties(properties: publishProperties));
518
519 packet->appendRaw(data: message);
520
521 const bool written = writePacketToTransport(p: *packet.data());
522
523 if (!written && qos > 0)
524 m_pendingMessages.remove(key: identifier);
525 return written ? identifier : -1;
526}
527
528bool QMqttConnection::sendControlPublishAcknowledge(quint16 id)
529{
530 qCDebug(lcMqttConnection) << Q_FUNC_INFO << id;
531 QMqttControlPacket packet(QMqttControlPacket::PUBACK);
532 packet.append(value: id);
533 return writePacketToTransport(p: packet);
534}
535
536bool QMqttConnection::sendControlPublishRelease(quint16 id)
537{
538 qCDebug(lcMqttConnection) << Q_FUNC_INFO << id;
539 quint8 header = QMqttControlPacket::PUBREL;
540 header |= 0x02; // MQTT-3.6.1-1
541
542 QMqttControlPacket packet(header);
543 packet.append(value: id);
544 return writePacketToTransport(p: packet);
545}
546
547bool QMqttConnection::sendControlPublishReceive(quint16 id)
548{
549 qCDebug(lcMqttConnection) << Q_FUNC_INFO << id;
550 QMqttControlPacket packet(QMqttControlPacket::PUBREC);
551 packet.append(value: id);
552 return writePacketToTransport(p: packet);
553}
554
555bool QMqttConnection::sendControlPublishComp(quint16 id)
556{
557 qCDebug(lcMqttConnection) << Q_FUNC_INFO << id;
558 QMqttControlPacket packet(QMqttControlPacket::PUBCOMP);
559 packet.append(value: id);
560 return writePacketToTransport(p: packet);
561}
562
563QMqttSubscription *QMqttConnection::sendControlSubscribe(const QMqttTopicFilter &topic,
564 quint8 qos,
565 const QMqttSubscriptionProperties &properties)
566{
567 qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic << " qos:" << qos;
568
569 // Overflow protection
570 if (Q_UNLIKELY(!topic.isValid())) {
571 qCWarning(lcMqttConnection) << "Invalid subscription topic filter.";
572 return nullptr;
573 }
574
575 if (Q_UNLIKELY(qos > 2)) {
576 qCWarning(lcMqttConnection) << "Invalid subscription QoS.";
577 return nullptr;
578 }
579
580 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
581 const QString sharedSubscriptionName = topic.sharedSubscriptionName();
582 if (!sharedSubscriptionName.isEmpty()) {
583 const QMqttTopicFilter filter(topic.filter().section(asep: QLatin1Char('/'), astart: 2));
584 auto sub = findActiveSubscription(topic: filter);
585 if (sub && (sub->sharedSubscriptionName() == sharedSubscriptionName))
586 return sub;
587 } else {
588 auto sub = findActiveSubscription(topic);
589 if (sub && !sub->isSharedSubscription())
590 return sub;
591 }
592 } else {
593 auto sub = findActiveSubscription(topic);
594 if (sub)
595 return sub;
596 }
597
598 // has to have 0010 as bits 3-0, maybe update SUBSCRIBE instead?
599 // MQTT-3.8.1-1
600 const quint8 header = QMqttControlPacket::SUBSCRIBE + 0x02;
601 QMqttControlPacket packet(header);
602
603 // Add Packet Identifier
604 const quint16 identifier = unusedPacketIdentifier();
605
606 packet.append(value: identifier);
607
608 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
609 packet.appendRaw(data: writeSubscriptionProperties(properties));
610
611 packet.append(data: topic.filter().toUtf8());
612 char options = char(qos);
613 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && properties.noLocal())
614 options |= 1 << 2;
615 packet.append(value: options);
616
617 auto result = new QMqttSubscription(this);
618 result->setTopic(topic);
619 result->setClient(m_clientPrivate->m_client);
620 result->setQos(qos);
621 result->setState(QMqttSubscription::SubscriptionPending);
622 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && !topic.sharedSubscriptionName().isEmpty()) {
623 result->setSharedSubscriptionName(topic.sharedSubscriptionName());
624 result->setSharedSubscription(true);
625 result->setTopic(topic.filter().section(asep: QLatin1Char('/'), astart: 2));
626 }
627
628 if (!writePacketToTransport(p: packet)) {
629 delete result;
630 return nullptr;
631 }
632
633 // SUBACK must contain identifier MQTT-3.8.4-2
634 m_pendingSubscriptionAck.insert(key: identifier, value: result);
635 m_activeSubscriptions.insert(key: result->topic(), value: result);
636 return result;
637}
638
639bool QMqttConnection::sendControlUnsubscribe(const QMqttTopicFilter &topic, const QMqttUnsubscriptionProperties &properties)
640{
641 qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic;
642
643 // MQTT-3.10.3-2
644 if (!topic.isValid())
645 return false;
646
647 auto sub = findActiveSubscription(topic);
648 if (!sub)
649 // Already unsubscribed
650 return false;
651
652 if (m_internalState != QMqttConnection::BrokerConnected) {
653 m_activeSubscriptions.remove(key: topic, value: sub);
654 return true;
655 }
656
657 // has to have 0010 as bits 3-0, maybe update UNSUBSCRIBE instead?
658 // MQTT-3.10.1-1
659 const quint8 header = QMqttControlPacket::UNSUBSCRIBE + 0x02;
660 QMqttControlPacket packet(header);
661
662 // Add Packet Identifier
663 const quint16 identifier = unusedPacketIdentifier();
664
665 packet.append(value: identifier);
666
667 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
668 packet.appendRaw(data: writeUnsubscriptionProperties(properties));
669 }
670
671 packet.append(data: topic.filter().toUtf8());
672 sub->setState(QMqttSubscription::UnsubscriptionPending);
673
674 if (!writePacketToTransport(p: packet))
675 return false;
676
677 // Do not remove from m_activeSubscriptions as there might be QoS1/2 messages to still
678 // be sent before UNSUBSCRIBE is acknowledged.
679 m_pendingUnsubscriptions.insert(key: identifier, value: sub);
680
681 return true;
682}
683
684bool QMqttConnection::sendControlPingRequest(bool isAuto)
685{
686 qCDebug(lcMqttConnection) << Q_FUNC_INFO;
687
688 if (m_internalState != QMqttConnection::BrokerConnected)
689 return false;
690
691
692 if (!isAuto && m_clientPrivate->m_autoKeepAlive) {
693 qCDebug(lcMqttConnection) << "Requesting a manual ping while autoKeepAlive is enabled "
694 << "is not allowed.";
695 return false;
696 }
697
698 // 3.1.2.10 If a Client does not receive a PINGRESP packet within a reasonable amount of time
699 // after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server
700 if (m_pingTimeout > 1) {
701 closeConnection(error: QMqttClient::ServerUnavailable);
702 return false;
703 }
704
705 const QMqttControlPacket packet(QMqttControlPacket::PINGREQ);
706 if (!writePacketToTransport(p: packet)) {
707 qCDebug(lcMqttConnection) << "Failed to write PINGREQ to transport.";
708 return false;
709 }
710 m_pingTimeout++;
711 return true;
712}
713
714bool QMqttConnection::sendControlDisconnect()
715{
716 qCDebug(lcMqttConnection) << Q_FUNC_INFO;
717
718 m_pingTimer.stop();
719 m_pingTimeout = 0;
720
721 m_activeSubscriptions.clear();
722
723 m_receiveAliases.clear();
724 m_publishAliases.clear();
725
726 const QMqttControlPacket packet(QMqttControlPacket::DISCONNECT);
727 if (!writePacketToTransport(p: packet)) {
728 qCDebug(lcMqttConnection) << "Failed to write DISCONNECT to transport.";
729 return false;
730 }
731 if (m_internalState != ClientDestruction)
732 m_internalState = BrokerDisconnected;
733
734 if (m_transport->waitForBytesWritten(msecs: 30000)) {
735 // MQTT-3.14.4-1 must disconnect
736 m_transport->close();
737 return true;
738 }
739 return false;
740}
741
742void QMqttConnection::setClientPrivate(QMqttClientPrivate *clientPrivate)
743{
744 m_clientPrivate = clientPrivate;
745}
746
747quint16 QMqttConnection::unusedPacketIdentifier() const
748{
749 // MQTT-2.3.1-1 Control Packets MUST contain a non-zero 16-bit Packet Identifier
750 static quint16 packetIdentifierCounter = 1;
751 const std::uint16_t u16max = std::numeric_limits<std::uint16_t>::max();
752
753 // MQTT-2.3.1-2 ...it MUST assign it a currently unused Packet Identifier
754 const quint16 lastIdentifier = packetIdentifierCounter;
755 do {
756 if (packetIdentifierCounter == u16max)
757 packetIdentifierCounter = 1;
758 else
759 packetIdentifierCounter++;
760
761 if (lastIdentifier == packetIdentifierCounter) {
762 qCDebug(lcMqttConnection) << "Could not generate unique packet identifier.";
763 break;
764 }
765 } while (m_pendingSubscriptionAck.contains(key: packetIdentifierCounter)
766 || m_pendingUnsubscriptions.contains(key: packetIdentifierCounter)
767 || m_pendingMessages.contains(key: packetIdentifierCounter)
768 || m_pendingReleaseMessages.contains(key: packetIdentifierCounter));
769 return packetIdentifierCounter;
770}
771
772void QMqttConnection::cleanSubscriptions()
773{
774 for (auto item : m_pendingSubscriptionAck)
775 item->setState(QMqttSubscription::Unsubscribed);
776 m_pendingSubscriptionAck.clear();
777
778 for (auto item : m_pendingUnsubscriptions)
779 item->setState(QMqttSubscription::Unsubscribed);
780 m_pendingUnsubscriptions.clear();
781
782 for (auto item : m_activeSubscriptions)
783 item->setState(QMqttSubscription::Unsubscribed);
784 m_activeSubscriptions.clear();
785}
786
787void QMqttConnection::transportConnectionEstablished()
788{
789 if (m_internalState != BrokerConnecting) {
790 qCWarning(lcMqttConnection) << "Connection established at an unexpected time";
791 return;
792 }
793
794 if (!sendControlConnect()) {
795 qCDebug(lcMqttConnection) << "Failed to write CONNECT to transport.";
796 // ### Who disconnects now? Connection or client?
797 m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid);
798 }
799}
800
801void QMqttConnection::transportConnectionClosed()
802{
803 m_readBuffer.clear();
804 m_readPosition = 0;
805 m_pingTimer.stop();
806 m_pingTimeout = 0;
807 if (m_internalState == ClientDestruction)
808 return;
809 if (m_internalState == BrokerDisconnected) // We manually disconnected
810 m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::NoError);
811 else
812 m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid);
813}
814
815void QMqttConnection::transportReadyRead()
816{
817 qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO;
818 m_readBuffer.append(a: m_transport->readAll());
819 processData();
820}
821
822void QMqttConnection::transportError(QAbstractSocket::SocketError e)
823{
824 qCDebug(lcMqttConnection) << Q_FUNC_INFO << e;
825 closeConnection(error: QMqttClient::TransportInvalid);
826}
827
828bool QMqttConnection::readBuffer(char *data, quint64 size)
829{
830 if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) {
831 qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation";
832 closeConnection(error: QMqttClient::ProtocolViolation);
833 return false;
834 }
835 memcpy(dest: data, src: m_readBuffer.constData() + m_readPosition, n: size);
836 m_readPosition += size;
837 return true;
838}
839
840qint32 QMqttConnection::readVariableByteInteger(qint64 *dataSize)
841{
842 quint32 multiplier = 1;
843 qint32 msgLength = 0;
844 quint8 b = 0;
845 quint8 iteration = 0;
846 do {
847 b = readBufferTyped<quint8>(dataSize);
848 msgLength += (b & 127) * multiplier;
849 multiplier *= 128;
850 iteration++;
851 if (iteration > 4) {
852 qCDebug(lcMqttConnection) << "Overflow trying to read variable integer.";
853 closeConnection(error: QMqttClient::ProtocolViolation);
854 return -1;
855 }
856 } while ((b & 128) != 0);
857 return msgLength;
858}
859
860void QMqttConnection::closeConnection(QMqttClient::ClientError error)
861{
862 m_readBuffer.clear();
863 m_readPosition = 0;
864 m_pingTimer.stop();
865 m_pingTimeout = 0;
866 m_activeSubscriptions.clear();
867 m_internalState = BrokerDisconnected;
868 m_transport->disconnect();
869 m_transport->close();
870 m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: error);
871}
872
873QByteArray QMqttConnection::readBuffer(quint64 size)
874{
875 if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) {
876 qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation";
877 closeConnection(error: QMqttClient::ProtocolViolation);
878 return QByteArray();
879 }
880 QByteArray res(m_readBuffer.constData() + m_readPosition, int(size));
881 m_readPosition += size;
882 return res;
883}
884
885void QMqttConnection::readAuthProperties(QMqttAuthenticationProperties &properties)
886{
887 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
888 m_missingData -= propertyLength;
889
890 QMqttUserProperties userProperties;
891 while (propertyLength > 0) {
892 quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
893
894 switch (propertyId) {
895 case 0x15: { //3.15.2.2.2 Authentication Method
896 const QString method = readBufferTyped<QString>(dataSize: &propertyLength);
897 properties.setAuthenticationMethod(method);
898 break;
899 }
900 case 0x16: { // 3.15.2.2.3 Authentication Data
901 const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength);
902 properties.setAuthenticationData(data);
903 break;
904 }
905 case 0x1F: { // 3.15.2.2.4 Reason String
906 const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength);
907 properties.setReason(reasonString);
908 break;
909 }
910 case 0x26: { // 3.15.2.2.5 User property
911 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
912 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
913
914 userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
915 break;
916 }
917 default:
918 qCDebug(lcMqttConnection) << "Unknown property id in AUTH:" << propertyId;
919 break;
920 }
921 }
922 if (!userProperties.isEmpty())
923 properties.setUserProperties(userProperties);
924}
925
926void QMqttConnection::readConnackProperties(QMqttServerConnectionProperties &properties)
927{
928 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
929 m_missingData -= propertyLength;
930
931 properties.serverData->valid = true;
932
933 while (propertyLength > 0) {
934 quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
935 switch (propertyId) {
936 case 0x11: { // 3.2.2.3.2 Session Expiry Interval
937 const quint32 expiryInterval = readBufferTyped<quint32>(dataSize: &propertyLength);
938 properties.serverData->details |= QMqttServerConnectionProperties::SessionExpiryInterval;
939 properties.setSessionExpiryInterval(expiryInterval);
940 break;
941 }
942 case 0x21: { // 3.2.2.3.3 Receive Maximum
943 const quint16 receiveMaximum = readBufferTyped<quint16>(dataSize: &propertyLength);
944 properties.serverData->details |= QMqttServerConnectionProperties::MaximumReceive;
945 properties.setMaximumReceive(receiveMaximum);
946 break;
947 }
948 case 0x24: { // 3.2.2.3.4 Maximum QoS Level
949 const quint8 maxQoS = readBufferTyped<quint8>(dataSize: &propertyLength);
950 properties.serverData->details |= QMqttServerConnectionProperties::MaximumQoS;
951 properties.serverData->maximumQoS = maxQoS;
952 break;
953 }
954 case 0x25: { // 3.2.2.3.5 Retain available
955 const quint8 retainAvailable = readBufferTyped<quint8>(dataSize: &propertyLength);
956 properties.serverData->details |= QMqttServerConnectionProperties::RetainAvailable;
957 properties.serverData->retainAvailable = retainAvailable == 1;
958 break;
959 }
960 case 0x27: { // 3.2.2.3.6 Maximum packet size
961 const quint32 maxPacketSize = readBufferTyped<quint32>(dataSize: &propertyLength);
962 properties.serverData->details |= QMqttServerConnectionProperties::MaximumPacketSize;
963 properties.setMaximumPacketSize(maxPacketSize);
964 break;
965 }
966 case 0x12: { // 3.2.2.3.7 Assigned clientId
967 const QString assignedClientId = readBufferTyped<QString>(dataSize: &propertyLength);
968 properties.serverData->details |= QMqttServerConnectionProperties::AssignedClientId;
969 m_clientPrivate->setClientId(assignedClientId);
970 break;
971 }
972 case 0x22: { // 3.2.2.3.8 Topic Alias Maximum
973 const quint16 topicAliasMaximum = readBufferTyped<quint16>(dataSize: &propertyLength);
974 properties.serverData->details |= QMqttServerConnectionProperties::MaximumTopicAlias;
975 properties.setMaximumTopicAlias(topicAliasMaximum);
976 break;
977 }
978 case 0x1F: { // 3.2.2.3.9 Reason String
979 const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength);
980 properties.serverData->details |= QMqttServerConnectionProperties::ReasonString;
981 properties.serverData->reasonString = reasonString;
982 break;
983 }
984 case 0x26: { // 3.2.2.3.10 User property
985 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
986 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
987
988 properties.serverData->details |= QMqttServerConnectionProperties::UserProperty;
989 properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
990 break;
991 }
992 case 0x28: { // 3.2.2.3.11 Wildcard subscriptions available
993 const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength);
994 properties.serverData->details |= QMqttServerConnectionProperties::WildCardSupported;
995 properties.serverData->wildcardSupported = available == 1;
996 break;
997 }
998 case 0x29: { // 3.2.2.3.12 Subscription identifiers available
999 const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength);
1000 properties.serverData->details |= QMqttServerConnectionProperties::SubscriptionIdentifierSupport;
1001 properties.serverData->subscriptionIdentifierSupported = available == 1;
1002 break;
1003 }
1004 case 0x2A: { // 3.2.2.3.13 Shared subscriptions available
1005 const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength);
1006 properties.serverData->details |= QMqttServerConnectionProperties::SharedSubscriptionSupport;
1007 properties.serverData->sharedSubscriptionSupported = available == 1;
1008 break;
1009 }
1010 case 0x13: { // 3.2.2.3.14 Server Keep Alive
1011 const quint16 serverKeepAlive = readBufferTyped<quint16>(dataSize: &propertyLength);
1012 properties.serverData->details |= QMqttServerConnectionProperties::ServerKeepAlive;
1013 m_clientPrivate->m_client->setKeepAlive(serverKeepAlive);
1014 break;
1015 }
1016 case 0x1A: { // 3.2.2.3.15 Response information
1017 const QString responseInfo = readBufferTyped<QString>(dataSize: &propertyLength);
1018 properties.serverData->details |= QMqttServerConnectionProperties::ResponseInformation;
1019 properties.serverData->responseInformation = responseInfo;
1020 break;
1021 }
1022 case 0x1C: { // 3.2.2.3.16 Server reference
1023 const QString serverReference = readBufferTyped<QString>(dataSize: &propertyLength);
1024 properties.serverData->details |= QMqttServerConnectionProperties::ServerReference;
1025 properties.serverData->serverReference = serverReference;
1026 break;
1027 }
1028 case 0x15: { // 3.2.2.3.17 Authentication method
1029 const QString method = readBufferTyped<QString>(dataSize: &propertyLength);
1030 properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationMethod;
1031 properties.data->authenticationMethod = method;
1032 break;
1033 }
1034 case 0x16: { // 3.2.2.3.18 Authentication data
1035 const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength);
1036 properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationData;
1037 properties.data->authenticationData = data;
1038 break;
1039 }
1040 default:
1041 qCDebug(lcMqttConnection) << "Unknown property id in CONNACK:" << int(propertyId);
1042 break;
1043 }
1044 }
1045}
1046
1047void QMqttConnection::readMessageStatusProperties(QMqttMessageStatusProperties &properties)
1048{
1049 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
1050 m_missingData -= propertyLength;
1051
1052 while (propertyLength > 0) {
1053 const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
1054 switch (propertyId) {
1055 case 0x1f: { // 3.4.2.2.2 Reason String
1056 const QString content = readBufferTyped<QString>(dataSize: &propertyLength);
1057 properties.data->reasonString = content;
1058 break;
1059 }
1060 case 0x26: { // 3.4.2.2.3 User Properites
1061 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
1062 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
1063 properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
1064 break;
1065 }
1066 default:
1067 qCDebug(lcMqttConnection) << "Unknown subscription property received.";
1068 break;
1069 }
1070 }
1071}
1072
1073void QMqttConnection::readPublishProperties(QMqttPublishProperties &properties)
1074{
1075 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
1076 m_missingData -= propertyLength;
1077
1078 QMqttUserProperties userProperties;
1079 QList<quint32> subscriptionIds;
1080
1081 while (propertyLength > 0) {
1082 const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
1083 switch (propertyId) {
1084 case 0x01: { // 3.3.2.3.2 Payload Format Indicator
1085 const quint8 format = readBufferTyped<quint8>(dataSize: &propertyLength);
1086 if (format == 1)
1087 properties.setPayloadFormatIndicator(QMqtt::PayloadFormatIndicator::UTF8Encoded);
1088 break;
1089 }
1090 case 0x02: { // 3.3.2.3.3 Message Expiry Interval
1091 const quint32 interval = readBufferTyped<quint32>(dataSize: &propertyLength);
1092 properties.setMessageExpiryInterval(interval);
1093 break;
1094 }
1095 case 0x23: { // 3.3.2.3.4 Topic alias
1096 const quint16 alias = readBufferTyped<quint16>(dataSize: &propertyLength);
1097 properties.setTopicAlias(alias);
1098 break;
1099 }
1100 case 0x08: { // 3.3.2.3.5 Response Topic
1101 const QString responseTopic = readBufferTyped<QString>(dataSize: &propertyLength);
1102 properties.setResponseTopic(responseTopic);
1103 break;
1104 }
1105 case 0x09: { // 3.3.2.3.6 Correlation Data
1106 const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength);
1107 properties.setCorrelationData(data);
1108 break;
1109 }
1110 case 0x26: { // 3.3.2.3.7 User property
1111 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
1112 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
1113 userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
1114 break;
1115 }
1116 case 0x0b: { // 3.3.2.3.8 Subscription Identifier
1117 qint32 id = readVariableByteInteger(dataSize: &propertyLength);
1118 if (id < 0)
1119 return; // readVariableByteInteger closes connection
1120 subscriptionIds.append(t: quint32(id));
1121 break;
1122 }
1123 case 0x03: { // 3.3.2.3.9 Content Type
1124 const QString content = readBufferTyped<QString>(dataSize: &propertyLength);
1125 properties.setContentType(content);
1126 break;
1127 }
1128 default:
1129 qCDebug(lcMqttConnection) << "Unknown publish property received.";
1130 break;
1131 }
1132 }
1133 if (!userProperties.isEmpty())
1134 properties.setUserProperties(userProperties);
1135
1136 if (!subscriptionIds.isEmpty())
1137 properties.setSubscriptionIdentifiers(subscriptionIds);
1138}
1139
1140void QMqttConnection::readSubscriptionProperties(QMqttSubscription *sub)
1141{
1142 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
1143 m_missingData -= propertyLength;
1144
1145 while (propertyLength > 0) {
1146 const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
1147 switch (propertyId) {
1148 case 0x1f: { // 3.9.2.1.2 Reason String
1149 const QString content = readBufferTyped<QString>(dataSize: &propertyLength);
1150 sub->d_func()->m_reasonString = content;
1151 break;
1152 }
1153 case 0x26: { // 3.9.2.1.3
1154 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
1155 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
1156
1157 sub->d_func()->m_userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
1158 break;
1159 }
1160 default:
1161 qCDebug(lcMqttConnection) << "Unknown subscription property received.";
1162 break;
1163 }
1164 }
1165}
1166
1167QByteArray QMqttConnection::writeConnectProperties()
1168{
1169 QMqttControlPacket properties;
1170
1171 // According to MQTT5 3.1.2.11 default values do not need to be included in the
1172 // connect statement.
1173
1174 // 3.1.2.11.2
1175 if (m_clientPrivate->m_connectionProperties.sessionExpiryInterval() != 0) {
1176 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify sessionExpiryInterval";
1177 properties.append(value: char(0x11));
1178 properties.append(value: m_clientPrivate->m_connectionProperties.sessionExpiryInterval());
1179 }
1180
1181 // 3.1.2.11.3
1182 if (m_clientPrivate->m_connectionProperties.maximumReceive() != 65535) {
1183 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumReceive";
1184 properties.append(value: char(0x21));
1185 properties.append(value: m_clientPrivate->m_connectionProperties.maximumReceive());
1186 }
1187
1188 // 3.1.2.11.4
1189 if (m_clientPrivate->m_connectionProperties.maximumPacketSize() != std::numeric_limits<quint32>::max()) {
1190 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumPacketSize";
1191 properties.append(value: char(0x27));
1192 properties.append(value: m_clientPrivate->m_connectionProperties.maximumPacketSize());
1193 }
1194
1195 // 3.1.2.11.5
1196 if (m_clientPrivate->m_connectionProperties.maximumTopicAlias() != 0) {
1197 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumTopicAlias";
1198 properties.append(value: char(0x22));
1199 properties.append(value: m_clientPrivate->m_connectionProperties.maximumTopicAlias());
1200 }
1201
1202 // 3.1.2.11.6
1203 if (m_clientPrivate->m_connectionProperties.requestResponseInformation()) {
1204 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestResponseInformation";
1205 properties.append(value: char(0x19));
1206 properties.append(value: char(1));
1207 }
1208
1209 // 3.1.2.11.7
1210 if (!m_clientPrivate->m_connectionProperties.requestProblemInformation()) {
1211 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestProblemInformation";
1212 properties.append(value: char(0x17));
1213 properties.append(value: char(0));
1214 }
1215
1216 // 3.1.2.11.8 Add User properties
1217 auto userProperties = m_clientPrivate->m_connectionProperties.userProperties();
1218 if (!userProperties.isEmpty()) {
1219 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify user properties";
1220 for (const auto &prop : userProperties) {
1221 properties.append(value: char(0x26));
1222 properties.append(data: prop.name().toUtf8());
1223 properties.append(data: prop.value().toUtf8());
1224 }
1225 }
1226
1227 // 3.1.2.11.9 Add Authentication
1228 const QString authenticationMethod = m_clientPrivate->m_connectionProperties.authenticationMethod();
1229 if (!authenticationMethod.isEmpty()) {
1230 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify AuthenticationMethod:";
1231 qCDebug(lcMqttConnectionVerbose) << " " << authenticationMethod;
1232 properties.append(value: char(0x15));
1233 properties.append(data: authenticationMethod.toUtf8());
1234 // 3.1.2.11.10
1235 const QByteArray authenticationData = m_clientPrivate->m_connectionProperties.authenticationData();
1236 if (!authenticationData.isEmpty()) {
1237 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: Authentication Data:";
1238 qCDebug(lcMqttConnectionVerbose) << " " << authenticationData;
1239 properties.append(value: char(0x16));
1240 properties.append(data: authenticationData);
1241 }
1242 }
1243
1244 return properties.serializePayload();
1245}
1246
1247QByteArray QMqttConnection::writeLastWillProperties() const
1248{
1249 QMqttControlPacket properties;
1250 const QMqttLastWillProperties &lastWillProperties = m_clientPrivate->m_lastWillProperties;
1251 // Will Delay interval 3.1.3.2.2
1252 if (lastWillProperties.willDelayInterval() > 0) {
1253 const quint32 delay = lastWillProperties.willDelayInterval();
1254 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify will delay interval:"
1255 << delay;
1256 properties.append(value: char(0x18));
1257 properties.append(value: delay);
1258 }
1259
1260 // Payload Format Indicator 3.1.3.2.3
1261 if (lastWillProperties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) {
1262 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: payload format indicator specified";
1263 properties.append(value: char(0x01));
1264 properties.append(value: char(0x01)); // UTF8
1265 }
1266
1267 // Message Expiry Interval 3.1.3.2.4
1268 if (lastWillProperties.messageExpiryInterval() > 0) {
1269 const quint32 interval = lastWillProperties.messageExpiryInterval();
1270 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Message Expiry interval:"
1271 << interval;
1272 properties.append(value: char(0x02));
1273 properties.append(value: interval);
1274 }
1275
1276 // Content Type 3.1.3.2.5
1277 if (!lastWillProperties.contentType().isEmpty()) {
1278 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Content Type:"
1279 << lastWillProperties.contentType();
1280 properties.append(value: char(0x03));
1281 properties.append(data: lastWillProperties.contentType().toUtf8());
1282 }
1283
1284 // Response Topic 3.1.3.2.6
1285 if (!lastWillProperties.responseTopic().isEmpty()) {
1286 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Response Topic:"
1287 << lastWillProperties.responseTopic();
1288 properties.append(value: char(0x08));
1289 properties.append(data: lastWillProperties.responseTopic().toUtf8());
1290 }
1291
1292 // Correlation Data 3.1.3.2.7
1293 if (!lastWillProperties.correlationData().isEmpty()) {
1294 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Correlation Data:"
1295 << lastWillProperties.correlationData();
1296 properties.append(value: char(0x09));
1297 properties.append(data: lastWillProperties.correlationData());
1298 }
1299
1300 // User Properties 3.1.3.2.8
1301 if (!lastWillProperties.userProperties().isEmpty()) {
1302 auto userProperties = lastWillProperties.userProperties();
1303 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify user properties";
1304 for (const auto &prop : userProperties) {
1305 properties.append(value: char(0x26));
1306 properties.append(data: prop.name().toUtf8());
1307 properties.append(data: prop.value().toUtf8());
1308 }
1309 }
1310
1311 return properties.serializePayload();
1312}
1313
1314QByteArray QMqttConnection::writePublishProperties(const QMqttPublishProperties &properties)
1315{
1316 QMqttControlPacket packet;
1317
1318 // 3.3.2.3.2 Payload Indicator
1319 if (properties.availableProperties() & QMqttPublishProperties::PayloadFormatIndicator &&
1320 properties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) {
1321 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Payload Indicator:"
1322 << (properties.payloadFormatIndicator() == QMqtt::PayloadFormatIndicator::UTF8Encoded ? 1 : 0);
1323 packet.append(value: char(0x01));
1324 switch (properties.payloadFormatIndicator()) {
1325 case QMqtt::PayloadFormatIndicator::UTF8Encoded:
1326 packet.append(value: char(0x01));
1327 break;
1328 default:
1329 qCDebug(lcMqttConnection) << "Unknown payload indicator.";
1330 break;
1331 }
1332 }
1333
1334 // 3.3.2.3.3 Message Expiry
1335 if (properties.availableProperties() & QMqttPublishProperties::MessageExpiryInterval &&
1336 properties.messageExpiryInterval() > 0) {
1337 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Message Expiry :"
1338 << properties.messageExpiryInterval();
1339 packet.append(value: char(0x02));
1340 packet.append(value: properties.messageExpiryInterval());
1341 }
1342
1343 // 3.3.2.3.4 Topic alias
1344 if (properties.availableProperties() & QMqttPublishProperties::TopicAlias &&
1345 properties.topicAlias() > 0) {
1346 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Topic Alias :"
1347 << properties.topicAlias();
1348 if (m_clientPrivate->m_serverConnectionProperties.availableProperties() & QMqttServerConnectionProperties::MaximumTopicAlias
1349 && properties.topicAlias() > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) {
1350 qCDebug(lcMqttConnection) << "Invalid topic alias specified: " << properties.topicAlias()
1351 << " Maximum by server is:"
1352 << m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias();
1353
1354 } else {
1355 packet.append(value: char(0x23));
1356 packet.append(value: properties.topicAlias());
1357 }
1358 }
1359
1360 // 3.3.2.3.5 Response Topic
1361 if (properties.availableProperties() & QMqttPublishProperties::ResponseTopic &&
1362 !properties.responseTopic().isEmpty()) {
1363 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Response Topic :"
1364 << properties.responseTopic();
1365 packet.append(value: char(0x08));
1366 packet.append(data: properties.responseTopic().toUtf8());
1367 }
1368
1369 // 3.3.2.3.6 Correlation Data
1370 if (properties.availableProperties() & QMqttPublishProperties::CorrelationData &&
1371 !properties.correlationData().isEmpty()) {
1372 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Correlation Data :"
1373 << properties.correlationData();
1374 packet.append(value: char(0x09));
1375 packet.append(data: properties.correlationData());
1376 }
1377
1378 // 3.3.2.3.7 User Property
1379 if (properties.availableProperties() & QMqttPublishProperties::UserProperty) {
1380 auto userProperties = properties.userProperties();
1381 if (!userProperties.isEmpty()) {
1382 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: specify user properties";
1383 for (const auto &prop : userProperties) {
1384 packet.append(value: char(0x26));
1385 packet.append(data: prop.name().toUtf8());
1386 packet.append(data: prop.value().toUtf8());
1387 }
1388 }
1389 }
1390
1391 // 3.3.2.3.8 Subscription Identifier
1392 if (properties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) {
1393 for (auto id : properties.subscriptionIdentifiers()) {
1394 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Subscription ID:" << id;
1395 packet.append(value: char(0x0b));
1396 packet.appendRawVariableInteger(value: id);
1397 }
1398 }
1399
1400 // 3.3.2.3.9 Content Type
1401 if (properties.availableProperties() & QMqttPublishProperties::ContentType &&
1402 !properties.contentType().isEmpty()) {
1403 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Content Type :"
1404 << properties.contentType();
1405 packet.append(value: char(0x03));
1406 packet.append(data: properties.contentType().toUtf8());
1407 }
1408
1409 return packet.serializePayload();
1410}
1411
1412QByteArray QMqttConnection::writeSubscriptionProperties(const QMqttSubscriptionProperties &properties)
1413{
1414 QMqttControlPacket packet;
1415
1416 // 3.8.2.1.2 Subscription Identifier
1417 if (properties.subscriptionIdentifier() > 0) {
1418 qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: Subscription Identifier";
1419 packet.append(value: char(0x0b));
1420 packet.appendRawVariableInteger(value: properties.subscriptionIdentifier());
1421 }
1422
1423 // 3.8.2.1.3 User Property
1424 auto userProperties = properties.userProperties();
1425 if (!userProperties.isEmpty()) {
1426 qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: specify user properties";
1427 for (const auto &prop : userProperties) {
1428 packet.append(value: char(0x26));
1429 packet.append(data: prop.name().toUtf8());
1430 packet.append(data: prop.value().toUtf8());
1431 }
1432 }
1433
1434 return packet.serializePayload();
1435}
1436
1437QByteArray QMqttConnection::writeUnsubscriptionProperties(const QMqttUnsubscriptionProperties &properties)
1438{
1439 QMqttControlPacket packet;
1440
1441 // 3.10.2.1.2
1442 auto userProperties = properties.userProperties();
1443 if (!userProperties.isEmpty()) {
1444 qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties";
1445 for (const auto &prop : userProperties) {
1446 packet.append(value: char(0x26));
1447 packet.append(data: prop.name().toUtf8());
1448 packet.append(data: prop.value().toUtf8());
1449 }
1450 }
1451
1452 return packet.serializePayload();
1453}
1454
1455QByteArray QMqttConnection::writeAuthenticationProperties(const QMqttAuthenticationProperties &properties)
1456{
1457 QMqttControlPacket packet;
1458
1459 // 3.15.2.2.2
1460 if (!properties.authenticationMethod().isEmpty()) {
1461 packet.append(value: char(0x15));
1462 packet.append(data: properties.authenticationMethod().toUtf8());
1463 }
1464 // 3.15.2.2.3
1465 if (!properties.authenticationData().isEmpty()) {
1466 packet.append(value: char(0x16));
1467 packet.append(data: properties.authenticationData());
1468 }
1469
1470 // 3.15.2.2.4
1471 if (!properties.reason().isEmpty()) {
1472 packet.append(value: char(0x1F));
1473 packet.append(data: properties.reason().toUtf8());
1474 }
1475
1476 // 3.15.2.2.5
1477 auto userProperties = properties.userProperties();
1478 if (!userProperties.isEmpty()) {
1479 qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties";
1480 for (const auto &prop : userProperties) {
1481 packet.append(value: char(0x26));
1482 packet.append(data: prop.name().toUtf8());
1483 packet.append(data: prop.value().toUtf8());
1484 }
1485 }
1486
1487 return packet.serializePayload();
1488}
1489
1490void QMqttConnection::finalize_auth()
1491{
1492 qCDebug(lcMqttConnectionVerbose) << "Finalize AUTH";
1493
1494 quint8 authReason = 0;
1495 QMqttAuthenticationProperties authProperties;
1496 // 3.15.2.1 - The Reason Code and Property Length can be omitted if the Reason Code
1497 // is 0x00 (Success) and there are no Properties. In this case the AUTH has a
1498 // Remaining Length of 0.
1499 if (m_missingData > 0) {
1500 authReason = readBufferTyped<quint8>(dataSize: &m_missingData);
1501 readAuthProperties(properties&: authProperties);
1502 }
1503
1504 // 3.15.2.1
1505 switch (QMqtt::ReasonCode(authReason)) {
1506 case QMqtt::ReasonCode::Success:
1507 emit m_clientPrivate->m_client->authenticationFinished(p: authProperties);
1508 break;
1509 case QMqtt::ReasonCode::ContinueAuthentication:
1510 case QMqtt::ReasonCode::ReAuthenticate:
1511 emit m_clientPrivate->m_client->authenticationRequested(p: authProperties);
1512 break;
1513 default:
1514 qCDebug(lcMqttConnection) << "Received illegal AUTH reason code:" << authReason;
1515 closeConnection(error: QMqttClient::ProtocolViolation);
1516 break;
1517 }
1518}
1519
1520void QMqttConnection::finalize_connack()
1521{
1522 qCDebug(lcMqttConnectionVerbose) << "Finalize CONNACK";
1523
1524 const quint8 ackFlags = readBufferTyped<quint8>(dataSize: &m_missingData);
1525
1526 if (ackFlags > 1) { // MQTT-3.2.2.1
1527 qCDebug(lcMqttConnection) << "Unexpected CONNACK Flags specified:" << QString::number(ackFlags);
1528 readBuffer(size: quint64(m_missingData));
1529 m_missingData = 0;
1530 closeConnection(error: QMqttClient::ProtocolViolation);
1531 return;
1532 }
1533 bool sessionPresent = ackFlags == 1;
1534
1535 // MQTT-3.2.2-1 & MQTT-3.2.2-2
1536 if (sessionPresent) {
1537 emit m_clientPrivate->m_client->brokerSessionRestored();
1538 if (m_clientPrivate->m_cleanSession)
1539 qCDebug(lcMqttConnection) << "Connected with a clean session, ack contains session present.";
1540 } else {
1541 // MQTT-4.1.0.-1 MQTT-4.1.0-2 Session not stored on broker side
1542 // regardless whether cleanSession is false
1543 cleanSubscriptions();
1544 }
1545
1546 quint8 connectResultValue = readBufferTyped<quint8>(dataSize: &m_missingData);
1547 QMqttServerConnectionProperties serverProp;
1548 serverProp.serverData->reasonCode = QMqtt::ReasonCode(connectResultValue);
1549 m_clientPrivate->m_serverConnectionProperties = serverProp;
1550 if (connectResultValue != 0 && m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) {
1551 qCDebug(lcMqttConnection) << "Connection has been rejected.";
1552 closeConnection(error: static_cast<QMqttClient::ClientError>(connectResultValue));
1553 return;
1554 }
1555
1556 // MQTT 5.0 has variable part != 2 in the header
1557 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
1558 readConnackProperties(properties&: m_clientPrivate->m_serverConnectionProperties);
1559 m_receiveAliases.resize(size: m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias());
1560 m_publishAliases.resize(size: m_clientPrivate->m_connectionProperties.maximumTopicAlias());
1561
1562 // 3.2.2.2
1563 switch (QMqtt::ReasonCode(connectResultValue)) {
1564 case QMqtt::ReasonCode::Success:
1565 break;
1566 case QMqtt::ReasonCode::MalformedPacket:
1567 case QMqtt::ReasonCode::ProtocolError:
1568 closeConnection(error: QMqttClient::ProtocolViolation);
1569 return;
1570 case QMqtt::ReasonCode::UnsupportedProtocolVersion:
1571 closeConnection(error: QMqttClient::InvalidProtocolVersion);
1572 return;
1573 case QMqtt::ReasonCode::InvalidClientId:
1574 closeConnection(error: QMqttClient::IdRejected);
1575 return;
1576 case QMqtt::ReasonCode::ServerNotAvailable:
1577 case QMqtt::ReasonCode::ServerBusy:
1578 case QMqtt::ReasonCode::UseAnotherServer:
1579 case QMqtt::ReasonCode::ServerMoved:
1580 closeConnection(error: QMqttClient::ServerUnavailable);
1581 return;
1582 case QMqtt::ReasonCode::InvalidUserNameOrPassword:
1583 closeConnection(error: QMqttClient::BadUsernameOrPassword);
1584 return;
1585 case QMqtt::ReasonCode::NotAuthorized:
1586 closeConnection(error: QMqttClient::NotAuthorized);
1587 return;
1588 case QMqtt::ReasonCode::UnspecifiedError:
1589 closeConnection(error: QMqttClient::UnknownError);
1590 return;
1591 case QMqtt::ReasonCode::ImplementationSpecificError:
1592 case QMqtt::ReasonCode::ClientBanned:
1593 case QMqtt::ReasonCode::InvalidAuthenticationMethod:
1594 case QMqtt::ReasonCode::InvalidTopicName:
1595 case QMqtt::ReasonCode::PacketTooLarge:
1596 case QMqtt::ReasonCode::QuotaExceeded:
1597 case QMqtt::ReasonCode::InvalidPayloadFormat:
1598 case QMqtt::ReasonCode::RetainNotSupported:
1599 case QMqtt::ReasonCode::QoSNotSupported:
1600 case QMqtt::ReasonCode::ExceededConnectionRate:
1601 closeConnection(error: QMqttClient::Mqtt5SpecificError);
1602 return;
1603 default:
1604 qCDebug(lcMqttConnection) << "Received illegal CONNACK reason code:" << connectResultValue;
1605 closeConnection(error: QMqttClient::ProtocolViolation);
1606 return;
1607 }
1608 }
1609
1610 m_internalState = BrokerConnected;
1611 m_clientPrivate->setStateAndError(s: QMqttClient::Connected);
1612
1613 if (m_clientPrivate->m_autoKeepAlive)
1614 m_pingTimer.start(msec: m_clientPrivate->m_keepAlive * 1000, obj: this);
1615}
1616
1617void QMqttConnection::finalize_suback()
1618{
1619 const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData);
1620
1621 auto sub = m_pendingSubscriptionAck.take(key: id);
1622 if (Q_UNLIKELY(sub == nullptr)) {
1623 qCDebug(lcMqttConnection) << "Received SUBACK for unknown subscription request.";
1624 return;
1625 }
1626
1627 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
1628 readSubscriptionProperties(sub);
1629
1630 // 3.9.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the SUBSCRIBE packet being acknowledged.
1631 // Whereas 3.8.3 states "The Payload MUST contain at least one Topic Filter and Subscription Options pair. A SUBSCRIBE packet with no Payload is a Protocol Error."
1632 do {
1633 quint8 reason = readBufferTyped<quint8>(dataSize: &m_missingData);
1634
1635 sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reason);
1636
1637 // 3.9.3
1638 switch (QMqtt::ReasonCode(reason)) {
1639 case QMqtt::ReasonCode::SubscriptionQoSLevel0:
1640 case QMqtt::ReasonCode::SubscriptionQoSLevel1:
1641 case QMqtt::ReasonCode::SubscriptionQoSLevel2:
1642 qCDebug(lcMqttConnectionVerbose) << "Finalize SUBACK: id:" << id << "qos:" << reason;
1643 // The broker might have a different support level for QoS than what
1644 // the client requested
1645 if (reason != sub->qos()) {
1646 sub->setQos(reason);
1647 emit sub->qosChanged(reason);
1648 }
1649 sub->setState(QMqttSubscription::Subscribed);
1650 break;
1651 case QMqtt::ReasonCode::UnspecifiedError:
1652 qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason;
1653 sub->setState(QMqttSubscription::Error);
1654 break;
1655 case QMqtt::ReasonCode::ImplementationSpecificError:
1656 case QMqtt::ReasonCode::NotAuthorized:
1657 case QMqtt::ReasonCode::InvalidTopicFilter:
1658 case QMqtt::ReasonCode::MessageIdInUse:
1659 case QMqtt::ReasonCode::QuotaExceeded:
1660 case QMqtt::ReasonCode::SharedSubscriptionsNotSupported:
1661 case QMqtt::ReasonCode::SubscriptionIdsNotSupported:
1662 case QMqtt::ReasonCode::WildCardSubscriptionsNotSupported:
1663 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
1664 qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason;
1665 sub->setState(QMqttSubscription::Error);
1666 break;
1667 }
1668 Q_FALLTHROUGH();
1669 default:
1670 qCWarning(lcMqttConnection) << "Received illegal SUBACK reason code:" << reason;
1671 closeConnection(error: QMqttClient::ProtocolViolation);
1672 break;
1673 }
1674 } while (m_missingData > 0);
1675}
1676
1677void QMqttConnection::finalize_unsuback()
1678{
1679 const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData);
1680 qCDebug(lcMqttConnectionVerbose) << "Finalize UNSUBACK: " << id;
1681
1682 auto sub = m_pendingUnsubscriptions.take(key: id);
1683 if (Q_UNLIKELY(sub == nullptr)) {
1684 qCDebug(lcMqttConnection) << "Received UNSUBACK for unknown request.";
1685 return;
1686 }
1687
1688 m_activeSubscriptions.remove(key: sub->topic(), value: sub);
1689
1690 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
1691 readSubscriptionProperties(sub);
1692 } else {
1693 // 3.11.3 - The UNSUBACK Packet has no payload.
1694 // emulate successful unsubscription
1695 sub->d_func()->m_reasonCode = QMqtt::ReasonCode::Success;
1696 sub->setState(QMqttSubscription::Unsubscribed);
1697 return;
1698 }
1699
1700 // 3.1.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the UNSUBSCRIBE packet being acknowledged.
1701 // Whereas 3.10.3 states "The Payload of an UNSUBSCRIBE packet MUST contain at least one Topic Filter. An UNSUBSCRIBE packet with no Payload is a Protocol Error."
1702 do {
1703 const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData);
1704 sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reasonCode);
1705
1706 // 3.11.3
1707 switch (QMqtt::ReasonCode(reasonCode)) {
1708 case QMqtt::ReasonCode::Success:
1709 sub->setState(QMqttSubscription::Unsubscribed);
1710 break;
1711 case QMqtt::ReasonCode::NoSubscriptionExisted:
1712 case QMqtt::ReasonCode::ImplementationSpecificError:
1713 case QMqtt::ReasonCode::NotAuthorized:
1714 case QMqtt::ReasonCode::InvalidTopicFilter:
1715 case QMqtt::ReasonCode::MessageIdInUse:
1716 case QMqtt::ReasonCode::UnspecifiedError:
1717 qCWarning(lcMqttConnection) << "Unsubscription for id " << id << " failed. Reason Code:" << reasonCode;
1718 sub->setState(QMqttSubscription::Error);
1719 break;
1720 default:
1721 qCWarning(lcMqttConnection) << "Received illegal UNSUBACK reason code:" << reasonCode;
1722 closeConnection(error: QMqttClient::ProtocolViolation);
1723 break;
1724 }
1725 } while (m_missingData > 0);
1726}
1727
1728void QMqttConnection::finalize_publish()
1729{
1730 // String topic
1731 QMqttTopicName topic = readBufferTyped<QString>(dataSize: &m_missingData);
1732 const int topicLength = topic.name().size();
1733
1734 quint16 id = 0;
1735 if (m_currentPublish.qos > 0)
1736 id = readBufferTyped<quint16>(dataSize: &m_missingData);
1737
1738 QMqttPublishProperties publishProperties;
1739 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
1740 readPublishProperties(properties&: publishProperties);
1741
1742 if (publishProperties.availableProperties() & QMqttPublishProperties::TopicAlias) {
1743 const quint16 topicAlias = publishProperties.topicAlias();
1744 if (topicAlias == 0 || topicAlias > m_clientPrivate->m_connectionProperties.maximumTopicAlias()) {
1745 qCDebug(lcMqttConnection) << "TopicAlias receive: overflow.";
1746 closeConnection(error: QMqttClient::ProtocolViolation);
1747 return;
1748 }
1749 if (topicLength == 0) { // New message on existing topic alias
1750 topic = m_receiveAliases.at(i: topicAlias - 1);
1751 if (topic.name().isEmpty()) {
1752 qCDebug(lcMqttConnection) << "TopicAlias receive: alias for unknown topic.";
1753 closeConnection(error: QMqttClient::ProtocolViolation);
1754 return;
1755 }
1756 qCDebug(lcMqttConnectionVerbose) << "TopicAlias receive: Using " << topicAlias;
1757 } else { // Resetting a topic alias
1758 qCDebug(lcMqttConnection) << "TopicAlias receive: Resetting:" << topic.name() << " : " << topicAlias;
1759 m_receiveAliases[topicAlias - 1] = topic;
1760 }
1761 }
1762
1763 // message
1764 const quint64 payloadLength = quint64(m_missingData);
1765 const QByteArray message = readBuffer(size: payloadLength);
1766 m_missingData -= payloadLength;
1767
1768 qCDebug(lcMqttConnectionVerbose) << "Finalize PUBLISH: topic:" << topic
1769 << " payloadLength:" << payloadLength;
1770
1771 emit m_clientPrivate->m_client->messageReceived(message, topic);
1772
1773 QMqttMessage qmsg(topic, message, id, m_currentPublish.qos,
1774 m_currentPublish.dup, m_currentPublish.retain);
1775 qmsg.d->m_publishProperties = publishProperties;
1776
1777 if (id != 0) {
1778 QMqttMessageStatusProperties statusProp;
1779 statusProp.data->userProperties = publishProperties.userProperties();
1780 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Published, properties: statusProp);
1781 }
1782
1783 // Store subscriptions in a temporary container as each messageReceived is allowed to subscribe
1784 // again and thus invalid the iterator of the loop.
1785 QList<QMqttSubscription *> subscribers;
1786 for (const auto [key, value] : m_activeSubscriptions.asKeyValueRange()) {
1787 if (key.match(name: topic))
1788 subscribers.append(t: value);
1789 }
1790 for (const auto &s : subscribers)
1791 emit s->messageReceived(msg: qmsg);
1792
1793 if (m_currentPublish.qos == 1)
1794 sendControlPublishAcknowledge(id);
1795 else if (m_currentPublish.qos == 2)
1796 sendControlPublishReceive(id);
1797}
1798
1799void QMqttConnection::finalize_pubAckRecRelComp()
1800{
1801 qCDebug(lcMqttConnectionVerbose) << "Finalize PUBACK/REC/REL/COMP";
1802 const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData);
1803
1804 QMqttMessageStatusProperties properties;
1805 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) {
1806 // Reason Code (1byte)
1807 const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData);
1808 properties.data->reasonCode = QMqtt::ReasonCode(reasonCode);
1809
1810 if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBACK || (m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) {
1811 // 3.4.2.1, 3.5.2.1
1812 switch (QMqtt::ReasonCode(reasonCode)) {
1813 case QMqtt::ReasonCode::Success:
1814 case QMqtt::ReasonCode::NoMatchingSubscriber:
1815 case QMqtt::ReasonCode::UnspecifiedError:
1816 case QMqtt::ReasonCode::ImplementationSpecificError:
1817 case QMqtt::ReasonCode::NotAuthorized:
1818 case QMqtt::ReasonCode::InvalidTopicName:
1819 case QMqtt::ReasonCode::MessageIdInUse:
1820 case QMqtt::ReasonCode::QuotaExceeded:
1821 case QMqtt::ReasonCode::InvalidPayloadFormat:
1822 break;
1823 default:
1824 qCWarning(lcMqttConnection) << "Received illegal PUBACK/REC reason code:" << reasonCode;
1825 closeConnection(error: QMqttClient::ProtocolViolation);
1826 return;
1827 }
1828 } else {
1829 // 3.6.2.1, 3.7.2.1
1830 switch (QMqtt::ReasonCode(reasonCode)) {
1831 case QMqtt::ReasonCode::Success:
1832 case QMqtt::ReasonCode::MessageIdNotFound:
1833 break;
1834 default:
1835 qCWarning(lcMqttConnection) << "Received illegal PUBREL/COMP reason code:" << reasonCode;
1836 closeConnection(error: QMqttClient::ProtocolViolation);
1837 return;
1838 }
1839 }
1840
1841 readMessageStatusProperties(properties);
1842 }
1843
1844 if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREL) {
1845 qCDebug(lcMqttConnectionVerbose) << " PUBREL:" << id;
1846 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Released, properties);
1847 sendControlPublishComp(id);
1848 return;
1849 }
1850
1851 if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBCOMP) {
1852 qCDebug(lcMqttConnectionVerbose) << " PUBCOMP:" << id;
1853 auto pendingRelease = m_pendingReleaseMessages.take(key: id);
1854 if (!pendingRelease)
1855 qCDebug(lcMqttConnection) << "Received PUBCOMP for unknown released message.";
1856 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Completed, properties);
1857 emit m_clientPrivate->m_client->messageSent(id);
1858 return;
1859 }
1860
1861 auto pendingMsg = m_pendingMessages.take(key: id);
1862 if (!pendingMsg) {
1863 qCDebug(lcMqttConnection) << "Received PUBACK for unknown message: " << id;
1864 return;
1865 }
1866 if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) {
1867 qCDebug(lcMqttConnectionVerbose) << " PUBREC:" << id;
1868 m_pendingReleaseMessages.insert(key: id, value: pendingMsg);
1869 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Received, properties);
1870 sendControlPublishRelease(id);
1871 } else {
1872 qCDebug(lcMqttConnectionVerbose) << " PUBACK:" << id;
1873 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Acknowledged, properties);
1874 emit m_clientPrivate->m_client->messageSent(id);
1875 }
1876}
1877
1878void QMqttConnection::finalize_pingresp()
1879{
1880 qCDebug(lcMqttConnectionVerbose) << "Finalize PINGRESP";
1881 const quint8 v = readBufferTyped<quint8>(dataSize: &m_missingData);
1882
1883 if (v != 0) {
1884 qCDebug(lcMqttConnection) << "Received a PINGRESP including payload.";
1885 closeConnection(error: QMqttClient::ProtocolViolation);
1886 return;
1887 }
1888 m_pingTimeout--;
1889 emit m_clientPrivate->m_client->pingResponseReceived();
1890}
1891
1892bool QMqttConnection::processDataHelper()
1893{
1894 if (m_missingData > 0) {
1895 if ((m_readBuffer.size() - m_readPosition) < m_missingData)
1896 return false;
1897
1898 switch (m_currentPacket & 0xF0) {
1899 case QMqttControlPacket::AUTH:
1900 finalize_auth();
1901 break;
1902 case QMqttControlPacket::CONNACK:
1903 finalize_connack();
1904 break;
1905 case QMqttControlPacket::SUBACK:
1906 finalize_suback();
1907 break;
1908 case QMqttControlPacket::UNSUBACK:
1909 finalize_unsuback();
1910 break;
1911 case QMqttControlPacket::PUBLISH:
1912 finalize_publish();
1913 break;
1914 case QMqttControlPacket::PUBACK:
1915 case QMqttControlPacket::PUBREC:
1916 case QMqttControlPacket::PUBREL:
1917 case QMqttControlPacket::PUBCOMP:
1918 finalize_pubAckRecRelComp();
1919 break;
1920 case QMqttControlPacket::PINGRESP:
1921 finalize_pingresp();
1922 break;
1923 default:
1924 qCDebug(lcMqttConnection) << "Unknown packet to finalize.";
1925 closeConnection(error: QMqttClient::ProtocolViolation);
1926 break;
1927 }
1928
1929 if (m_internalState == BrokerDisconnected)
1930 return false;
1931
1932 Q_ASSERT(m_missingData == 0);
1933
1934 m_readBuffer = m_readBuffer.mid(index: m_readPosition);
1935 m_readPosition = 0;
1936 }
1937
1938 // MQTT-2.2 A fixed header of a control packet must be at least 2 bytes. If the payload is
1939 // longer than 127 bytes the header can be up to 5 bytes long.
1940 switch (m_readBuffer.size()) {
1941 case 0:
1942 case 1:
1943 return false;
1944 case 2:
1945 if ((m_readBuffer.at(i: 1) & 128) != 0)
1946 return false;
1947 break;
1948 case 3:
1949 if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0)
1950 return false;
1951 break;
1952 case 4:
1953 if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0 && (m_readBuffer.at(i: 3) & 128) != 0)
1954 return false;
1955 break;
1956 default:
1957 break;
1958 }
1959
1960 readBuffer(data: reinterpret_cast<char *>(&m_currentPacket), size: 1);
1961
1962 switch (m_currentPacket & 0xF0) {
1963 case QMqttControlPacket::CONNACK: {
1964 qCDebug(lcMqttConnectionVerbose) << "Received CONNACK";
1965 if (m_internalState != BrokerWaitForConnectAck) {
1966 qCDebug(lcMqttConnection) << "Received CONNACK at unexpected time.";
1967 closeConnection(error: QMqttClient::ProtocolViolation);
1968 return false;
1969 }
1970
1971 qint32 payloadSize = readVariableByteInteger();
1972 if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) {
1973 if (payloadSize != 2) {
1974 qCDebug(lcMqttConnection) << "Unexpected FRAME size in CONNACK.";
1975 closeConnection(error: QMqttClient::ProtocolViolation);
1976 return false;
1977 }
1978 }
1979 m_missingData = payloadSize;
1980 break;
1981 }
1982 case QMqttControlPacket::SUBACK: {
1983 qCDebug(lcMqttConnectionVerbose) << "Received SUBACK";
1984 const quint8 remaining = readBufferTyped<quint8>();
1985 m_missingData = remaining;
1986 break;
1987 }
1988 case QMqttControlPacket::PUBLISH: {
1989 qCDebug(lcMqttConnectionVerbose) << "Received PUBLISH";
1990 m_currentPublish.dup = m_currentPacket & 0x08;
1991 m_currentPublish.qos = (m_currentPacket & 0x06) >> 1;
1992 m_currentPublish.retain = m_currentPacket & 0x01;
1993 if ((m_currentPublish.qos == 0 && m_currentPublish.dup != 0)
1994 || m_currentPublish.qos > 2) {
1995 closeConnection(error: QMqttClient::ProtocolViolation);
1996 return false;
1997 }
1998
1999 m_missingData = readVariableByteInteger();
2000 if (m_missingData == -1)
2001 return false; // Connection closed inside readVariableByteInteger
2002 break;
2003 }
2004 case QMqttControlPacket::PINGRESP:
2005 qCDebug(lcMqttConnectionVerbose) << "Received PINGRESP";
2006 m_missingData = 1;
2007 break;
2008
2009
2010 case QMqttControlPacket::PUBREL: {
2011 qCDebug(lcMqttConnectionVerbose) << "Received PUBREL";
2012 const quint8 remaining = readBufferTyped<quint8>();
2013 if (remaining != 0x02) {
2014 qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length.";
2015 closeConnection(error: QMqttClient::ProtocolViolation);
2016 return false;
2017 }
2018 if ((m_currentPacket & 0x0F) != 0x02) {
2019 qCDebug(lcMqttConnection) << "Malformed fixed header for PUBREL.";
2020 closeConnection(error: QMqttClient::ProtocolViolation);
2021 return false;
2022 }
2023 m_missingData = 2;
2024 break;
2025 }
2026
2027 case QMqttControlPacket::UNSUBACK:
2028 case QMqttControlPacket::PUBACK:
2029 case QMqttControlPacket::PUBREC:
2030 case QMqttControlPacket::PUBCOMP: {
2031 qCDebug(lcMqttConnectionVerbose) << "Received UNSUBACK/PUBACK/PUBREC/PUBCOMP";
2032 if ((m_currentPacket & 0x0F) != 0) {
2033 qCDebug(lcMqttConnection) << "Malformed fixed header for UNSUBACK/PUBACK/PUBREC/PUBCOMP.";
2034 closeConnection(error: QMqttClient::ProtocolViolation);
2035 return false;
2036 }
2037 const quint8 remaining = readBufferTyped<quint8>();
2038 if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0 && remaining != 0x02) {
2039 qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length.";
2040 closeConnection(error: QMqttClient::ProtocolViolation);
2041 return false;
2042 }
2043 m_missingData = remaining;
2044 break;
2045 }
2046 case QMqttControlPacket::AUTH:
2047 if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) {
2048 qCDebug(lcMqttConnection) << "Received unknown command.";
2049 closeConnection(error: QMqttClient::ProtocolViolation);
2050 return false;
2051 }
2052 qCDebug(lcMqttConnectionVerbose) << "Received AUTH";
2053 if ((m_currentPacket & 0x0F) != 0) {
2054 qCDebug(lcMqttConnection) << "Malformed fixed header for AUTH.";
2055 closeConnection(error: QMqttClient::ProtocolViolation);
2056 return false;
2057 }
2058 m_missingData = readVariableByteInteger();
2059 if (m_missingData == -1)
2060 return false; // Connection closed inside readVariableByteInteger
2061 break;
2062 case QMqttControlPacket::DISCONNECT:
2063 if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) {
2064 qCDebug(lcMqttConnection) << "Received unknown command.";
2065 closeConnection(error: QMqttClient::ProtocolViolation);
2066 return false;
2067 }
2068 qCDebug(lcMqttConnectionVerbose) << "Received DISCONNECT";
2069 if ((m_currentPacket & 0x0F) != 0) {
2070 qCDebug(lcMqttConnection) << "Malformed fixed header for DISCONNECT.";
2071 closeConnection(error: QMqttClient::ProtocolViolation);
2072 return false;
2073 }
2074 if (m_internalState != BrokerConnected) {
2075 qCDebug(lcMqttConnection) << "Received DISCONNECT at unexpected time.";
2076 closeConnection(error: QMqttClient::ProtocolViolation);
2077 return false;
2078 }
2079 closeConnection(error: QMqttClient::NoError);
2080 return false;
2081 default:
2082 qCDebug(lcMqttConnection) << "Received unknown command.";
2083 closeConnection(error: QMqttClient::ProtocolViolation);
2084 return false;
2085 }
2086
2087 /* set current command CONNACK - PINGRESP */
2088 /* read command size */
2089 /* calculate missing_data */
2090 return true; // reiterate. implicitly finishes and enqueues
2091}
2092
2093void QMqttConnection::processData()
2094{
2095 while (processDataHelper())
2096 ;
2097}
2098
2099bool QMqttConnection::writePacketToTransport(const QMqttControlPacket &p)
2100{
2101 const QByteArray writeData = p.serialize();
2102 qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO << " DataSize:" << writeData.size();
2103 const qint64 res = m_transport->write(data: writeData.constData(), len: writeData.size());
2104 if (Q_UNLIKELY(res == -1)) {
2105 qCDebug(lcMqttConnection) << "Could not write frame to transport.";
2106 return false;
2107 }
2108 return true;
2109}
2110
2111QT_END_NAMESPACE
2112

// Source: qtmqtt/src/mqtt/qmqttconnection.cpp