1// Copyright (C) 2022 The Qt Company Ltd.
2// Copyright (C) 2019 Alexey Edelev <semlanik@gmail.com>, Viktor Kopp <vifactor@gmail.com>
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
4
5#include <QtGrpc/private/qtgrpclogging_p.h>
6#include <QtGrpc/qgrpccalloptions.h>
7#include <QtGrpc/qgrpcchanneloptions.h>
8#include <QtGrpc/qgrpchttp2channel.h>
9#include <QtGrpc/qgrpcoperationcontext.h>
10#include <QtGrpc/qgrpcserializationformat.h>
11#include <QtGrpc/qgrpcstatus.h>
12
13#include <QtProtobuf/qprotobufjsonserializer.h>
14#include <QtProtobuf/qprotobufserializer.h>
15
16#include <QtNetwork/private/hpack_p.h>
17#include <QtNetwork/private/http2protocol_p.h>
18#include <QtNetwork/private/qhttp2connection_p.h>
19#if QT_CONFIG(localserver)
20# include <QtNetwork/qlocalsocket.h>
21#endif
22#include <QtNetwork/qtcpsocket.h>
23#if QT_CONFIG(ssl)
24# include <QtNetwork/qsslsocket.h>
25#endif
26
27#include <QtCore/private/qnoncontiguousbytedevice_p.h>
28#include <QtCore/qalgorithms.h>
29#include <QtCore/qbytearray.h>
30#include <QtCore/qendian.h>
31#include <QtCore/qiodevice.h>
32#include <QtCore/qlist.h>
33#include <QtCore/qmetaobject.h>
34#include <QtCore/qpointer.h>
35#include <QtCore/qqueue.h>
36#include <QtCore/qtimer.h>
37
38#include <functional>
39#include <optional>
40#include <utility>
41
42QT_BEGIN_NAMESPACE
43
44using namespace Qt::StringLiterals;
45using namespace QtGrpc;
46
47/*!
48 \class QGrpcHttp2Channel
49 \inmodule QtGrpc
50
51 \brief The QGrpcHttp2Channel class is an HTTP/2-based of
52 QAbstractGrpcChannel, based on \l {Qt Network} HTTP/2 implementation.
53
54 Uses \l QGrpcChannelOptions and \l QGrpcCallOptions
55 to control the HTTP/2 communication with the server.
56
57 Use \l QGrpcChannelOptions to set the SSL configuration,
58 application-specific HTTP/2 headers, and connection timeouts.
59
60 \l QGrpcCallOptions control channel parameters for the
61 specific unary call or gRPC stream.
62
63 QGrpcHttp2Channel uses QGrpcChannelOptions to select the serialization
64 format for the protobuf messages. The serialization format can be set
65 either using the \c {content-type} metadata or by setting the
66 \l QGrpcChannelOptions::serializationFormat directly.
67
68 Using the following example you can create a QGrpcHttp2Channel with the
69 JSON serialization format using the \c {content-type} metadata:
70 \code
71 auto channelJson = std::make_shared<
72 QGrpcHttp2Channel>(QGrpcChannelOptions{ QUrl("http://localhost:50051", QUrl::StrictMode) }
73 .withMetadata({ { "content-type"_ba,
74 "application/grpc+json"_ba } }));
75 \endcode
76
77 Also you can use own serializer and custom \c {content-type} as following:
78 \code
79 class DummySerializer : public QAbstractProtobufSerializer
80 {
81 ...
82 };
83 auto channel = std::make_shared<
84 QGrpcHttp2Channel>(QUrl("http://localhost:50051", QUrl::StrictMode), QGrpcChannelOptions{}
85 .setSerializationFormat(QGrpcSerializationFormat{ "dummy",
86 std::make_shared<DummySerializer>() }));
87 \endcode
88
89 QGrpcHttp2Channel will use the \c DummySerializer to serialize
90 and deserialize protobuf message and use the
91 \c { content-type: application/grpc+dummy } header when sending
92 HTTP/2 requests to server.
93
94 \l QGrpcChannelOptions::serializationFormat has higher priority and
95 if the \c {content-type} metadata suffix doesn't match the
96 \l QGrpcSerializationFormat::suffix of the specified
97 \l QGrpcChannelOptions::serializationFormat QGrpcHttp2Channel produces
98 warning.
99
100 \sa QGrpcChannelOptions, QGrpcCallOptions, QSslConfiguration
101*/
102
103namespace {
104
105constexpr QByteArrayView AuthorityHeader(":authority");
106constexpr QByteArrayView MethodHeader(":method");
107constexpr QByteArrayView PathHeader(":path");
108constexpr QByteArrayView SchemeHeader(":scheme");
109
110constexpr QByteArrayView ContentTypeHeader("content-type");
111constexpr QByteArrayView AcceptEncodingHeader("accept-encoding");
112constexpr QByteArrayView TEHeader("te");
113
114constexpr QByteArrayView GrpcServiceNameHeader("service-name");
115constexpr QByteArrayView GrpcAcceptEncodingHeader("grpc-accept-encoding");
116constexpr QByteArrayView GrpcStatusHeader("grpc-status");
117constexpr QByteArrayView GrpcStatusMessageHeader("grpc-message");
118constexpr qsizetype GrpcMessageSizeHeaderSize = 5;
119constexpr QByteArrayView DefaultContentType = "application/grpc";
120
121// This HTTP/2 Error Codes to QGrpcStatus::StatusCode mapping should be kept in sync
122// with the following docs:
123// https://www.rfc-editor.org/rfc/rfc7540#section-7
124// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
125// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
126constexpr StatusCode http2ErrorToStatusCode(const quint32 http2Error)
127{
128 using Http2Error = Http2::Http2Error;
129
130 switch (http2Error) {
131 case Http2Error::HTTP2_NO_ERROR:
132 case Http2Error::PROTOCOL_ERROR:
133 case Http2Error::INTERNAL_ERROR:
134 case Http2Error::FLOW_CONTROL_ERROR:
135 case Http2Error::SETTINGS_TIMEOUT:
136 case Http2Error::STREAM_CLOSED:
137 case Http2Error::FRAME_SIZE_ERROR:
138 return StatusCode::Internal;
139 case Http2Error::REFUSE_STREAM:
140 return StatusCode::Unavailable;
141 case Http2Error::CANCEL:
142 return StatusCode::Cancelled;
143 case Http2Error::COMPRESSION_ERROR:
144 case Http2Error::CONNECT_ERROR:
145 return StatusCode::Internal;
146 case Http2Error::ENHANCE_YOUR_CALM:
147 return StatusCode::ResourceExhausted;
148 case Http2Error::INADEQUATE_SECURITY:
149 return StatusCode::PermissionDenied;
150 case Http2Error::HTTP_1_1_REQUIRED:
151 return StatusCode::Unknown;
152 }
153 return StatusCode::Internal;
154}
155
156// Sends the errorOccured and finished signals asynchronously to make sure user
157// connections work correctly.
158void operationContextAsyncError(QGrpcOperationContext *operationContext, const QGrpcStatus &status)
159{
160 Q_ASSERT_X(operationContext != nullptr, "QGrpcHttp2ChannelPrivate::operationContextAsyncError",
161 "operationContext is null");
162 QTimer::singleShot(interval: 0, receiver: operationContext,
163 slot: [operationContext, status]() { emit operationContext->finished(status); });
164}
165
166} // namespace
167
168struct ExpectedData
169{
170 qsizetype expectedSize = 0;
171 QByteArray container;
172
173 bool updateExpectedSize()
174 {
175 if (expectedSize == 0) {
176 if (container.size() < GrpcMessageSizeHeaderSize)
177 return false;
178 expectedSize = qFromBigEndian<quint32>(src: container.data() + 1)
179 + GrpcMessageSizeHeaderSize;
180 }
181 return true;
182 }
183};
184
185// The Http2Handler manages an individual RPC over the HTTP/2 channel.
186// Each instance corresponds to an RPC initiated by the user.
187class Http2Handler : public QObject
188{
189 // Q_OBJECT macro is not needed and adds unwanted overhead.
190
191public:
192 enum State : uint8_t { Active, Cancelled, Finished };
193
194 explicit Http2Handler(const std::shared_ptr<QGrpcOperationContext> &operation,
195 QGrpcHttp2ChannelPrivate *parent, bool endStream);
196 ~Http2Handler() override;
197
198 void sendInitialRequest();
199 void attachStream(QHttp2Stream *stream_);
200 void processQueue();
201
202 [[nodiscard]] QGrpcOperationContext *operation() const;
203 [[nodiscard]] bool expired() const { return m_operation.expired(); }
204
205 [[nodiscard]] bool isStreamClosedForSending() const
206 {
207 // If stream pointer is nullptr this means we never opened it and should collect
208 // the incoming messages in queue until the stream is opened or the error occurred.
209 return m_stream != nullptr
210 && (m_stream->state() == QHttp2Stream::State::HalfClosedLocal
211 || m_stream->state() == QHttp2Stream::State::Closed);
212 }
213
214// context slot handlers:
215 bool cancel();
216 void writesDone();
217 void writeMessage(QByteArrayView data);
218 void deadlineTimeout();
219
220private:
221 void prepareInitialRequest(QGrpcOperationContext *operationContext,
222 QGrpcHttp2ChannelPrivate *channel);
223
224 HPack::HttpHeader m_initialHeaders;
225 std::weak_ptr<QGrpcOperationContext> m_operation;
226 QQueue<QByteArray> m_queue;
227 QPointer<QHttp2Stream> m_stream;
228 ExpectedData m_expectedData;
229 State m_handlerState = Active;
230 const bool m_endStreamAtFirstData;
231 QTimer m_deadlineTimer;
232
233 Q_DISABLE_COPY_MOVE(Http2Handler)
234};
235
236class QGrpcHttp2ChannelPrivate : public QObject
237{
238 Q_OBJECT
239public:
240 explicit QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Channel *q);
241 ~QGrpcHttp2ChannelPrivate() override = default;
242
243 void processOperation(const std::shared_ptr<QGrpcOperationContext> &operationContext,
244 bool endStream = false);
245
246 void deleteHandler(Http2Handler *handler);
247 [[nodiscard]] bool isLocalSocket() const
248 {
249#if QT_CONFIG(localserver)
250 return m_isLocalSocket;
251#else
252 return false;
253#endif
254 }
255 [[nodiscard]] QByteArrayView contentType() const { return m_contentType; }
256
257 std::shared_ptr<QAbstractProtobufSerializer> serializer;
258 QUrl hostUri;
259 QGrpcHttp2Channel *q_ptr = nullptr;
260
261private:
262 enum ConnectionState { Connecting = 0, Connected, Error };
263
264 template <typename T>
265 static void connectErrorHandler(T *socket, QGrpcOperationContext *operationContext)
266 {
267 QObject::connect(socket, &T::errorOccurred, operationContext,
268 [operationContextPtr = QPointer(operationContext)](auto error) {
269 emit operationContextPtr->finished(status: QGrpcStatus{
270 StatusCode::Unavailable,
271 QGrpcHttp2ChannelPrivate::tr(s: "Network error occurred %1")
272 .arg(error) });
273 });
274 }
275
276 void sendInitialRequest(Http2Handler *handler);
277 void createHttp2Connection();
278 void handleSocketError();
279
280 template <typename T>
281 T *initSocket()
282 {
283 auto p = std::make_unique<T>();
284 T *typedSocket = p.get();
285 m_socket = std::move(p);
286 return typedSocket;
287 }
288
289 std::unique_ptr<QIODevice> m_socket = nullptr;
290 QHttp2Connection *m_connection = nullptr;
291 QList<Http2Handler *> m_activeHandlers;
292 QList<Http2Handler *> m_pendingHandlers;
293#if QT_CONFIG(localserver)
294 bool m_isLocalSocket = false;
295#endif
296 QByteArray m_contentType;
297 ConnectionState m_state = Connecting;
298 std::function<void()> m_reconnectFunction;
299
300 Q_DISABLE_COPY_MOVE(QGrpcHttp2ChannelPrivate)
301};
302
303///
304/// ## Http2Handler Implementations
305///
306
307Http2Handler::Http2Handler(const std::shared_ptr<QGrpcOperationContext> &operation,
308 QGrpcHttp2ChannelPrivate *parent, bool endStream)
309 : QObject(parent), m_operation(operation), m_endStreamAtFirstData(endStream)
310{
311 auto *channelOpPtr = operation.get();
312 QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::cancelRequested, context: this,
313 slot: &Http2Handler::cancel);
314 QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::writesDoneRequested, context: this,
315 slot: &Http2Handler::writesDone);
316 if (!m_endStreamAtFirstData) {
317 QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::writeMessageRequested, context: this,
318 slot: &Http2Handler::writeMessage);
319 }
320 QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::finished, context: &m_deadlineTimer,
321 slot: &QTimer::stop);
322 prepareInitialRequest(operationContext: channelOpPtr, channel: parent);
323}
324
325Http2Handler::~Http2Handler()
326{
327 if (m_stream) {
328 QHttp2Stream *streamPtr = m_stream.get();
329 m_stream.clear();
330 delete streamPtr;
331 }
332}
333
334// Attaches the HTTP/2 stream and sets up the necessary connections and
335// preconditions. The HTTP/2 connection is established, and the transport
336// is now ready for communication.
337void Http2Handler::attachStream(QHttp2Stream *stream_)
338{
339 Q_ASSERT(m_stream == nullptr);
340 Q_ASSERT(stream_ != nullptr);
341
342 auto *channelOpPtr = operation();
343 m_stream = stream_;
344
345 auto *parentChannel = qobject_cast<QGrpcHttp2ChannelPrivate *>(object: parent());
346 QObject::connect(sender: m_stream.get(), signal: &QHttp2Stream::headersReceived, context: channelOpPtr,
347 slot: [channelOpPtr, parentChannel, this](const HPack::HttpHeader &headers,
348 bool endStream) {
349 auto md = channelOpPtr->serverMetadata();
350 QtGrpc::StatusCode statusCode = StatusCode::Ok;
351 QString statusMessage;
352 for (const auto &header : headers) {
353 md.insert(key: header.name, value: header.value);
354 if (header.name == GrpcStatusHeader) {
355 statusCode = static_cast<
356 StatusCode>(QString::fromLatin1(ba: header.value).toShort());
357 } else if (header.name == GrpcStatusMessageHeader) {
358 statusMessage = QString::fromUtf8(ba: header.value);
359 }
360 }
361
362 channelOpPtr->setServerMetadata(std::move(md));
363
364 if (endStream) {
365 if (m_handlerState != Cancelled) {
366 emit channelOpPtr->finished(
367 status: QGrpcStatus{ statusCode,statusMessage });
368 }
369 parentChannel->deleteHandler(handler: this);
370 }
371 });
372
373 Q_ASSERT(parentChannel != nullptr);
374 auto errorConnection = std::make_shared<QMetaObject::Connection>();
375 *errorConnection = QObject::connect(
376 sender: m_stream.get(), signal: &QHttp2Stream::errorOccurred, context: parentChannel,
377 slot: [parentChannel, errorConnection, this](quint32 http2ErrorCode, const QString &errorString) {
378 if (!m_operation.expired()) {
379 auto channelOp = m_operation.lock();
380 emit channelOp->finished(status: QGrpcStatus{ http2ErrorToStatusCode(http2Error: http2ErrorCode),
381 errorString });
382 }
383 parentChannel->deleteHandler(handler: this);
384 QObject::disconnect(*errorConnection);
385 });
386
387 QObject::connect(sender: m_stream.get(), signal: &QHttp2Stream::dataReceived, context: channelOpPtr,
388 slot: [channelOpPtr, parentChannel, this](const QByteArray &data, bool endStream) {
389 if (m_handlerState != Cancelled) {
390 m_expectedData.container.append(a: data);
391
392 if (!m_expectedData.updateExpectedSize())
393 return;
394
395 while (m_expectedData.container.size()
396 >= m_expectedData.expectedSize) {
397 qGrpcDebug() << "Full data received:" << data.size()
398 << "dataContainer:" << m_expectedData.container.size()
399 << "capacity:" << m_expectedData.expectedSize;
400 emit channelOpPtr
401 ->messageReceived(data: m_expectedData.container
402 .mid(index: GrpcMessageSizeHeaderSize,
403 len: m_expectedData.expectedSize
404 - GrpcMessageSizeHeaderSize));
405 m_expectedData.container.remove(index: 0, len: m_expectedData.expectedSize);
406 m_expectedData.expectedSize = 0;
407 if (!m_expectedData.updateExpectedSize())
408 return;
409 }
410 if (endStream) {
411 m_handlerState = Finished;
412 emit channelOpPtr->finished(status: {});
413 parentChannel->deleteHandler(handler: this);
414 }
415 }
416 });
417
418 QObject::connect(sender: m_stream.get(), signal: &QHttp2Stream::uploadFinished, context: this,
419 slot: &Http2Handler::processQueue);
420
421 std::optional<std::chrono::milliseconds> deadline;
422 if (channelOpPtr->callOptions().deadlineTimeout())
423 deadline = channelOpPtr->callOptions().deadlineTimeout();
424 else if (parentChannel->q_ptr->channelOptions().deadlineTimeout())
425 deadline = parentChannel->q_ptr->channelOptions().deadlineTimeout();
426 if (deadline) {
427 // We have an active stream and a deadline. It's time to start the timer.
428 QObject::connect(sender: &m_deadlineTimer, signal: &QTimer::timeout, context: this, slot: &Http2Handler::deadlineTimeout);
429 m_deadlineTimer.start(value: *deadline);
430 }
431}
432
433QGrpcOperationContext *Http2Handler::operation() const
434{
435 Q_ASSERT(!m_operation.expired());
436
437 return m_operation.lock().get();
438}
439
440// Prepares the initial headers and enqueues the initial message.
441// The headers are sent once the HTTP/2 connection is established.
442void Http2Handler::prepareInitialRequest(QGrpcOperationContext *operationContext,
443 QGrpcHttp2ChannelPrivate *channel)
444{
445 const auto &channelOptions = channel->q_ptr->channelOptions();
446 QByteArray service{ operationContext->service().data(), operationContext->service().size() };
447 QByteArray method{ operationContext->method().data(), operationContext->method().size() };
448 m_initialHeaders = HPack::HttpHeader{
449 { AuthorityHeader.toByteArray(), channel->hostUri.host().toLatin1() },
450 { MethodHeader.toByteArray(), "POST"_ba },
451 { PathHeader.toByteArray(), QByteArray('/' + service + '/' + method) },
452 { SchemeHeader.toByteArray(),
453 channel->isLocalSocket() ? "http"_ba : channel->hostUri.scheme().toLatin1() },
454 { ContentTypeHeader.toByteArray(), channel->contentType().toByteArray() },
455 { GrpcServiceNameHeader.toByteArray(), { service } },
456 { GrpcAcceptEncodingHeader.toByteArray(), "identity,deflate,gzip"_ba },
457 { AcceptEncodingHeader.toByteArray(), "identity,gzip"_ba },
458 { TEHeader.toByteArray(), "trailers"_ba },
459 };
460
461 auto iterateMetadata = [this](const auto &metadata) {
462 for (const auto &[key, value] : metadata.asKeyValueRange()) {
463 const auto lowerKey = key.toLower();
464 if (lowerKey == AuthorityHeader || lowerKey == MethodHeader || lowerKey == PathHeader
465 || lowerKey == SchemeHeader || lowerKey == ContentTypeHeader) {
466 continue;
467 }
468 m_initialHeaders.emplace_back(lowerKey, value);
469 }
470 };
471
472 iterateMetadata(channelOptions.metadata());
473 iterateMetadata(operationContext->callOptions().metadata());
474
475 writeMessage(data: operationContext->argument());
476}
477
478// Slot to enqueue a writeMessage request, either from the initial message
479// or from the user in client/bidirectional streaming RPCs.
480void Http2Handler::writeMessage(QByteArrayView data)
481{
482 if (m_handlerState != Active || isStreamClosedForSending()) {
483 qGrpcDebug("Attempt sending data to the ended stream");
484 return;
485 }
486
487 QByteArray msg(GrpcMessageSizeHeaderSize + data.size(), '\0');
488 // Args must be 4-byte unsigned int to fit into 4-byte big endian
489 qToBigEndian(src: static_cast<quint32>(data.size()), dest: msg.data() + 1);
490
491 // protect against nullptr data.
492 if (!data.isEmpty()) {
493 std::memcpy(dest: msg.begin() + GrpcMessageSizeHeaderSize, src: data.begin(),
494 n: static_cast<size_t>(data.size()));
495 }
496
497 m_queue.enqueue(t: msg);
498 processQueue();
499}
500
501// Sends the initial headers and processes the message queue containing the
502// initial message. At this point, the HTTP/2 connection is established, and
503// the stream is attached.
504void Http2Handler::sendInitialRequest()
505{
506 Q_ASSERT(!m_initialHeaders.empty());
507 Q_ASSERT(m_stream);
508
509 if (!m_stream->sendHEADERS(headers: m_initialHeaders, endStream: false)) {
510 operationContextAsyncError(operationContext: operation(),
511 status: QGrpcStatus{
512 StatusCode::Unavailable,
513 tr(s: "Unable to send initial headers to an HTTP/2 stream") });
514 return;
515 }
516 m_initialHeaders.clear();
517 processQueue();
518}
519
520// The core logic for sending the already serialized data through the HTTP/2 stream.
521// This function is invoked either by the user via writeMessageRequested() or
522// writesDoneRequested(), or it is continuously polled after the previous uploadFinished()
523void Http2Handler::processQueue()
524{
525 if (!m_stream)
526 return;
527
528 if (m_stream->isUploadingDATA())
529 return;
530
531 if (m_queue.isEmpty())
532 return;
533
534 // Take ownership of the byte device.
535 auto *device = QNonContiguousByteDeviceFactory::create(byteArray: m_queue.dequeue());
536 device->setParent(m_stream);
537
538 m_stream->sendDATA(device, endStream: device->atEnd() || m_endStreamAtFirstData);
539 // Manage the lifetime through the uploadFinished signal (or this
540 // Http2Handler). Don't use QObject::deleteLater here as this function is
541 // expensive and blocking. Delete the byte device directly.
542 // This is fine in this context.
543 connect(sender: m_stream.get(), signal: &QHttp2Stream::uploadFinished, context: device, slot: [device] { delete device; });
544}
545
546bool Http2Handler::cancel()
547{
548 if (m_handlerState != Active || !m_stream)
549 return false;
550 m_handlerState = Cancelled;
551
552 // Client cancelled the stream before the deadline exceeded.
553 m_deadlineTimer.stop();
554
555 // Immediate cancellation by sending the RST_STREAM frame.
556 return m_stream->sendRST_STREAM(errorCode: Http2::Http2Error::CANCEL);
557}
558
559void Http2Handler::writesDone()
560{
561 if (m_handlerState != Active)
562 return;
563
564 m_handlerState = Finished;
565
566 // Stream is already (half)closed, skip sending the DATA frame with the end-of-stream flag.
567 if (isStreamClosedForSending())
568 return;
569
570 m_queue.enqueue(t: {});
571 processQueue();
572}
573
574void Http2Handler::deadlineTimeout()
575{
576 Q_ASSERT_X(m_stream, "onDeadlineTimeout", "stream is not available");
577
578 if (m_operation.expired()) {
579 qGrpcWarning("Operation expired on deadline timeout");
580 return;
581 }
582 // cancel the stream by sending the RST_FRAME and report
583 // the status back to our user.
584 if (cancel()) {
585 emit m_operation.lock()->finished(status: { StatusCode::DeadlineExceeded,
586 "Deadline Exceeded" });
587 } else {
588 qGrpcWarning("Cancellation failed on deadline timeout.");
589 }
590}
591
592///
593/// ## QGrpcHttp2ChannelPrivate Implementations
594///
595
596QGrpcHttp2ChannelPrivate::QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Channel *q)
597 : hostUri(uri), q_ptr(q)
598{
599 auto channelOptions = q_ptr->channelOptions();
600 auto formatSuffix = channelOptions.serializationFormat().suffix();
601 const QByteArray defaultContentType = DefaultContentType.toByteArray();
602 const QByteArray contentTypeFromOptions = !formatSuffix.isEmpty()
603 ? defaultContentType + '+' + formatSuffix
604 : defaultContentType;
605 bool warnAboutFormatConflict = !formatSuffix.isEmpty();
606
607 const auto it = channelOptions.metadata().constFind(key: ContentTypeHeader.data());
608 if (it != channelOptions.metadata().cend()) {
609 if (formatSuffix.isEmpty() && it.value() != DefaultContentType) {
610 if (it.value() == "application/grpc+json") {
611 channelOptions.setSerializationFormat(SerializationFormat::Json);
612 } else if (it.value() == "application/grpc+proto" || it.value() == DefaultContentType) {
613 channelOptions.setSerializationFormat(SerializationFormat::Protobuf);
614 } else {
615 qGrpcWarning() << "Cannot choose the serializer for " << ContentTypeHeader
616 << it.value() << ". Using protobuf format as the default one.";
617 channelOptions.setSerializationFormat(SerializationFormat::Default);
618 }
619 q_ptr->setChannelOptions(channelOptions);
620 } else if (it.value() != contentTypeFromOptions) {
621 warnAboutFormatConflict = true;
622 } else {
623 warnAboutFormatConflict = false;
624 }
625 } else {
626 warnAboutFormatConflict = false;
627 }
628
629 if (formatSuffix == channelOptions.serializationFormat().suffix()) { // no change
630 m_contentType = contentTypeFromOptions;
631 } else { // format has changed, update content type
632 m_contentType = !channelOptions.serializationFormat().suffix().isEmpty()
633 ? defaultContentType + '+' + channelOptions.serializationFormat().suffix()
634 : defaultContentType;
635 }
636
637 if (warnAboutFormatConflict) {
638 qGrpcWarning()
639 << "Manually specified serialization format '%1' doesn't match the %2 header value "
640 "'%3'"_L1.arg(args: QString::fromLatin1(ba: contentTypeFromOptions),
641 args: QString::fromLatin1(ba: ContentTypeHeader),
642 args: QString::fromLatin1(ba: it.value()));
643 }
644
645#if QT_CONFIG(localserver)
646 if (hostUri.scheme() == "unix"_L1) {
647 auto *localSocket = initSocket<QLocalSocket>();
648 m_isLocalSocket = true;
649
650 QObject::connect(sender: localSocket, signal: &QLocalSocket::connected, context: this,
651 slot: &QGrpcHttp2ChannelPrivate::createHttp2Connection);
652 QObject::connect(sender: localSocket, signal: &QLocalSocket::errorOccurred, context: this,
653 slot: [this](QLocalSocket::LocalSocketError error) {
654 qGrpcDebug()
655 << "Error occurred(" << error << "):"
656 << static_cast<QLocalSocket *>(m_socket.get())->errorString()
657 << hostUri;
658 handleSocketError();
659 });
660 m_reconnectFunction = [localSocket, this] {
661 localSocket->connectToServer(name: hostUri.host() + hostUri.path());
662 };
663 } else
664#endif
665#if QT_CONFIG(ssl)
666 if (hostUri.scheme() == "https"_L1 || channelOptions.sslConfiguration()) {
667 auto *sslSocket = initSocket<QSslSocket>();
668 if (hostUri.port() < 0) {
669 hostUri.setPort(443);
670 }
671
672 if (const auto userSslConfig = channelOptions.sslConfiguration(); userSslConfig) {
673 sslSocket->setSslConfiguration(*userSslConfig);
674 } else {
675 static const QByteArray h2NexProtocol = "h2"_ba;
676 auto defautlSslConfig = QSslConfiguration::defaultConfiguration();
677 auto allowedNextProtocols = defautlSslConfig.allowedNextProtocols();
678 if (!allowedNextProtocols.contains(t: h2NexProtocol))
679 allowedNextProtocols.append(t: h2NexProtocol);
680 defautlSslConfig.setAllowedNextProtocols(allowedNextProtocols);
681 sslSocket->setSslConfiguration(defautlSslConfig);
682 }
683
684 QObject::connect(sender: sslSocket, signal: &QSslSocket::encrypted, context: this,
685 slot: &QGrpcHttp2ChannelPrivate::createHttp2Connection);
686 QObject::connect(sender: sslSocket, signal: &QAbstractSocket::errorOccurred, context: this,
687 slot: [this](QAbstractSocket::SocketError error) {
688 qDebug()
689 << "Error occurred(" << error << "):"
690 << static_cast<QAbstractSocket *>(m_socket.get())->errorString()
691 << hostUri;
692 handleSocketError();
693 });
694 m_reconnectFunction = [sslSocket, this] {
695 sslSocket->connectToHostEncrypted(hostName: hostUri.host(), port: static_cast<quint16>(hostUri.port()));
696 };
697 } else
698#endif
699 {
700 if (hostUri.scheme() != "http"_L1) {
701 qGrpcWarning() << "Unsupported transport protocol scheme '" << hostUri.scheme()
702 << "'. Fall back to 'http'.";
703 }
704
705 auto *httpSocket = initSocket<QTcpSocket>();
706 if (hostUri.port() < 0) {
707 hostUri.setPort(80);
708 }
709
710 QObject::connect(sender: httpSocket, signal: &QAbstractSocket::connected, context: this,
711 slot: &QGrpcHttp2ChannelPrivate::createHttp2Connection);
712 QObject::connect(sender: httpSocket, signal: &QAbstractSocket::errorOccurred, context: this,
713 slot: [this](QAbstractSocket::SocketError error) {
714 qGrpcDebug()
715 << "Error occurred(" << error << "):"
716 << static_cast<QAbstractSocket *>(m_socket.get())->errorString()
717 << hostUri;
718 handleSocketError();
719 });
720 m_reconnectFunction = [httpSocket, this] {
721 httpSocket->connectToHost(hostName: hostUri.host(), port: static_cast<quint16>(hostUri.port()));
722 };
723 }
724 m_reconnectFunction();
725}
726
727void QGrpcHttp2ChannelPrivate::processOperation(const std::shared_ptr<QGrpcOperationContext>
728 &operationContext,
729 bool endStream)
730{
731 auto *operationContextPtr = operationContext.get();
732 Q_ASSERT_X(operationContextPtr != nullptr, "QGrpcHttp2ChannelPrivate::processOperation",
733 "operation context is nullptr.");
734
735 if (!m_socket->isWritable()) {
736 operationContextAsyncError(operationContext: operationContextPtr,
737 status: QGrpcStatus{ StatusCode::Unavailable,
738 m_socket->errorString() });
739 return;
740 }
741
742#if QT_CONFIG(localserver)
743 if (m_isLocalSocket) {
744 connectErrorHandler<QLocalSocket>(socket: static_cast<QLocalSocket *>(m_socket.get()),
745 operationContext: operationContextPtr);
746 } else
747#endif
748 {
749 connectErrorHandler<QAbstractSocket>(socket: static_cast<QAbstractSocket *>(m_socket.get()),
750 operationContext: operationContextPtr);
751 }
752
753 auto *handler = new Http2Handler(operationContext, this, endStream);
754 if (m_connection == nullptr) {
755 m_pendingHandlers.push_back(t: handler);
756 } else {
757 sendInitialRequest(handler);
758 m_activeHandlers.push_back(t: handler);
759 }
760
761 if (m_state == ConnectionState::Error) {
762 Q_ASSERT_X(m_reconnectFunction, "QGrpcHttp2ChannelPrivate::processOperation",
763 "Socket reconnection function is not defined.");
764 m_reconnectFunction();
765 m_state = ConnectionState::Connecting;
766 }
767}
768
769void QGrpcHttp2ChannelPrivate::createHttp2Connection()
770{
771 Q_ASSERT_X(m_connection == nullptr, "QGrpcHttp2ChannelPrivate::createHttp2Connection",
772 "Attempt to create the HTTP/2 connection, but it already exists. This situation is "
773 "exceptional.");
774 m_connection = QHttp2Connection::createDirectConnection(socket: m_socket.get(), config: {});
775
776 if (m_connection) {
777 QObject::connect(sender: m_socket.get(), signal: &QAbstractSocket::readyRead, context: m_connection,
778 slot: &QHttp2Connection::handleReadyRead);
779 m_state = ConnectionState::Connected;
780 }
781
782 for (const auto &handler : m_pendingHandlers) {
783 if (handler->expired()) {
784 delete handler;
785 continue;
786 }
787 sendInitialRequest(handler);
788 }
789 m_activeHandlers.append(l: m_pendingHandlers);
790 m_pendingHandlers.clear();
791}
792
793void QGrpcHttp2ChannelPrivate::handleSocketError()
794{
795 qDeleteAll(c: m_activeHandlers);
796 m_activeHandlers.clear();
797 qDeleteAll(c: m_pendingHandlers);
798 m_pendingHandlers.clear();
799 delete m_connection;
800 m_connection = nullptr;
801 m_state = ConnectionState::Error;
802}
803
804void QGrpcHttp2ChannelPrivate::sendInitialRequest(Http2Handler *handler)
805{
806 Q_ASSERT(handler != nullptr);
807 auto *channelOpPtr = handler->operation();
808 if (!m_connection) {
809 operationContextAsyncError(operationContext: channelOpPtr,
810 status: QGrpcStatus{
811 StatusCode::Unavailable,
812 tr(s: "Unable to establish an HTTP/2 connection") });
813 return;
814 }
815
816 const auto streamAttempt = m_connection->createStream();
817 if (!streamAttempt.ok()) {
818 operationContextAsyncError(operationContext: channelOpPtr,
819 status: QGrpcStatus{
820 StatusCode::Unavailable,
821 tr(s: "Unable to create an HTTP/2 stream (%1)")
822 .arg(a: QDebug::toString(object: streamAttempt.error())) });
823 return;
824 }
825 handler->attachStream(stream_: streamAttempt.unwrap());
826 handler->sendInitialRequest();
827}
828
829void QGrpcHttp2ChannelPrivate::deleteHandler(Http2Handler *handler)
830{
831 const auto it = std::find(first: m_activeHandlers.constBegin(), last: m_activeHandlers.constEnd(), val: handler);
832 if (it == m_activeHandlers.constEnd())
833 return;
834 handler->deleteLater();
835 m_activeHandlers.erase(pos: it);
836}
837
838///
839/// ## QGrpcHttp2Channel Implementations
840///
841
842/*!
843 Constructs QGrpcHttp2Channel with \a hostUri.
844*/
845QGrpcHttp2Channel::QGrpcHttp2Channel(const QUrl &hostUri)
846 : d_ptr(std::make_unique<QGrpcHttp2ChannelPrivate>(args: hostUri, args: this))
847{
848}
849
850/*!
851 Constructs QGrpcHttp2Channel with \a hostUri and \a options.
852*/
853QGrpcHttp2Channel::QGrpcHttp2Channel(const QUrl &hostUri, const QGrpcChannelOptions &options)
854 : QAbstractGrpcChannel(options),
855 d_ptr(std::make_unique<QGrpcHttp2ChannelPrivate>(args: hostUri, args: this))
856{
857}
858
859/*!
860 Destroys the QGrpcHttp2Channel object.
861*/
862QGrpcHttp2Channel::~QGrpcHttp2Channel() = default;
863
864/*!
865 Returns the host URI for this channel.
866*/
867QUrl QGrpcHttp2Channel::hostUri() const
868{
869 return d_ptr->hostUri;
870}
871
872/*!
873 \internal
874 Implementation of unary gRPC call based on \l QNetworkAccessManager.
875*/
876void QGrpcHttp2Channel::call(std::shared_ptr<QGrpcOperationContext> operationContext)
877{
878 d_ptr->processOperation(operationContext, endStream: true);
879}
880
881/*!
882 \internal
883 Implementation of server-side gRPC stream based on \l QNetworkAccessManager.
884*/
885void QGrpcHttp2Channel::serverStream(std::shared_ptr<QGrpcOperationContext> operationContext)
886{
887 d_ptr->processOperation(operationContext, endStream: true);
888}
889
890/*!
891 \internal
892 Implementation of client-side gRPC stream based on \l QNetworkAccessManager.
893*/
894void QGrpcHttp2Channel::clientStream(std::shared_ptr<QGrpcOperationContext> operationContext)
895{
896 d_ptr->processOperation(operationContext);
897}
898
899/*!
900 \internal
901 Implementation of bidirectional gRPC stream based on \l QNetworkAccessManager.
902*/
903void QGrpcHttp2Channel::bidiStream(std::shared_ptr<QGrpcOperationContext> operationContext)
904{
905 d_ptr->processOperation(operationContext);
906}
907
908/*!
909 Returns the newly created QProtobufSerializer shared pointer.
910*/
911std::shared_ptr<QAbstractProtobufSerializer> QGrpcHttp2Channel::serializer() const
912{
913 return channelOptions().serializationFormat().serializer();
914}
915
916QT_END_NAMESPACE
917
918#include "qgrpchttp2channel.moc"
919

Provided by KDAB

Privacy Policy
Start learning QML with our Intro Training
Find out more

source code of qtgrpc/src/grpc/qgrpchttp2channel.cpp