1// Copyright (C) 2022 The Qt Company Ltd.
2// Copyright (C) 2019 Alexey Edelev <semlanik@gmail.com>, Viktor Kopp <vifactor@gmail.com>
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
4
5#include <QtCore/QEventLoop>
6#include <QtCore/QMetaObject>
7#include <QtCore/QTimer>
8#include <QtCore/QUrl>
9#include <QtGrpc/qabstractgrpcclient.h>
10#include <QtGrpc/qgrpccallreply.h>
11#include <QtGrpc/qgrpcstream.h>
12#include <QtNetwork/QNetworkAccessManager>
13#include <QtNetwork/QNetworkReply>
14#include <QtNetwork/QNetworkRequest>
15#include <QtProtobuf/qprotobufserializer.h>
16#include <qtgrpcglobal_p.h>
17
18#include <unordered_map>
19
20#include "qgrpchttp2channel.h"
21
22QT_BEGIN_NAMESPACE
23
24using namespace Qt::StringLiterals;
25
26/*!
27 \class QGrpcHttp2Channel
28 \inmodule QtGrpc
29
30 \brief The QGrpcHttp2Channel class is an HTTP/2 implementation of
31 QAbstractGrpcChannel interface.
32
33 QGrpcHttp2Channel utilizes channel and call credentials.
34 Channel credential QGrpcHttp2Channel supports SslConfigCredential key.
35 When HTTPS is used, this key has to be explicitly specified and provide
36 QSslConfiguration and value. The QSslConfiguration provided will be used
37 to establish HTTP/2 secured connection. All keys passed as
38 QGrpcCallCredentials will be used as HTTP/2 headers with related
39 values assigned.
40*/
41
42// This QNetworkReply::NetworkError -> QGrpcStatus::StatusCode mapping should be kept in sync
43// with original https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
44const static std::unordered_map<QNetworkReply::NetworkError, QGrpcStatus::StatusCode>
45 StatusCodeMap = {
46 { QNetworkReply::ConnectionRefusedError, QGrpcStatus::Unavailable },
47 { QNetworkReply::RemoteHostClosedError, QGrpcStatus::Unavailable },
48 { QNetworkReply::HostNotFoundError, QGrpcStatus::Unavailable },
49 { QNetworkReply::TimeoutError, QGrpcStatus::DeadlineExceeded },
50 { QNetworkReply::OperationCanceledError, QGrpcStatus::Unavailable },
51 { QNetworkReply::SslHandshakeFailedError, QGrpcStatus::PermissionDenied },
52 { QNetworkReply::TemporaryNetworkFailureError, QGrpcStatus::Unknown },
53 { QNetworkReply::NetworkSessionFailedError, QGrpcStatus::Unavailable },
54 { QNetworkReply::BackgroundRequestNotAllowedError, QGrpcStatus::Unknown },
55 { QNetworkReply::TooManyRedirectsError, QGrpcStatus::Unavailable },
56 { QNetworkReply::InsecureRedirectError, QGrpcStatus::PermissionDenied },
57 { QNetworkReply::UnknownNetworkError, QGrpcStatus::Unknown },
58 { QNetworkReply::ProxyConnectionRefusedError, QGrpcStatus::Unavailable },
59 { QNetworkReply::ProxyConnectionClosedError, QGrpcStatus::Unavailable },
60 { QNetworkReply::ProxyNotFoundError, QGrpcStatus::Unavailable },
61 { QNetworkReply::ProxyTimeoutError, QGrpcStatus::DeadlineExceeded },
62 { QNetworkReply::ProxyAuthenticationRequiredError, QGrpcStatus::Unauthenticated },
63 { QNetworkReply::UnknownProxyError, QGrpcStatus::Unknown },
64 { QNetworkReply::ContentAccessDenied, QGrpcStatus::PermissionDenied },
65 { QNetworkReply::ContentOperationNotPermittedError, QGrpcStatus::PermissionDenied },
66 { QNetworkReply::ContentNotFoundError, QGrpcStatus::NotFound },
67 { QNetworkReply::AuthenticationRequiredError, QGrpcStatus::PermissionDenied },
68 { QNetworkReply::ContentReSendError, QGrpcStatus::DataLoss },
69 { QNetworkReply::ContentConflictError, QGrpcStatus::InvalidArgument },
70 { QNetworkReply::ContentGoneError, QGrpcStatus::DataLoss },
71 { QNetworkReply::UnknownContentError, QGrpcStatus::Unknown },
72 { QNetworkReply::ProtocolUnknownError, QGrpcStatus::Unknown },
73 { QNetworkReply::ProtocolInvalidOperationError, QGrpcStatus::Unimplemented },
74 { QNetworkReply::ProtocolFailure, QGrpcStatus::Unknown },
75 { QNetworkReply::InternalServerError, QGrpcStatus::Internal },
76 { QNetworkReply::OperationNotImplementedError, QGrpcStatus::Unimplemented },
77 { QNetworkReply::ServiceUnavailableError, QGrpcStatus::Unavailable },
78 { QNetworkReply::UnknownServerError, QGrpcStatus::Unknown }
79 };
80
81constexpr char GrpcAcceptEncodingHeader[] = "grpc-accept-encoding";
82constexpr char AcceptEncodingHeader[] = "accept-encoding";
83constexpr char TEHeader[] = "te";
84constexpr char GrpcStatusHeader[] = "grpc-status";
85constexpr char GrpcStatusMessageHeader[] = "grpc-message";
86constexpr qsizetype GrpcMessageSizeHeaderSize = 5;
87
88static void addMetadataToRequest(QNetworkRequest *request, const QGrpcMetadata &channelMetadata,
89 const QGrpcMetadata &callMetadata)
90{
91 auto iterateMetadata = [&request](const auto &metadata) {
92 for (const auto &[key, value] : std::as_const(metadata)) {
93 request->setRawHeader(headerName: key, value);
94 }
95 };
96
97 iterateMetadata(channelMetadata);
98 iterateMetadata(callMetadata);
99}
100
101static QGrpcMetadata collectMetadata(QNetworkReply *networkReply)
102{
103 return QGrpcMetadata(networkReply->rawHeaderPairs().begin(),
104 networkReply->rawHeaderPairs().end());
105}
106
107static std::optional<std::chrono::milliseconds> deadlineForCall(
108 const QGrpcChannelOptions &channelOptions, const QGrpcCallOptions &callOptions)
109{
110 if (callOptions.deadline())
111 return *callOptions.deadline();
112 if (channelOptions.deadline())
113 return *channelOptions.deadline();
114 return std::nullopt;
115}
116
117struct QGrpcHttp2ChannelPrivate
118{
119 struct ExpectedData
120 {
121 qsizetype expectedSize;
122 QByteArray container;
123 };
124
125 QNetworkAccessManager nm;
126 QGrpcChannelOptions channelOptions;
127#if QT_CONFIG(ssl)
128 QSslConfiguration sslConfig;
129#endif
130 std::unordered_map<QNetworkReply *, ExpectedData> activeStreamReplies;
131 QObject lambdaContext;
132
133 QNetworkReply *post(QLatin1StringView method, QLatin1StringView service, QByteArrayView args,
134 const QGrpcCallOptions &callOptions)
135 {
136 QUrl callUrl = channelOptions.host();
137 callUrl.setPath(path: "/%1/%2"_L1.arg(args&: service, args&: method));
138
139 qGrpcDebug() << "Service call url:" << callUrl;
140 QNetworkRequest request(callUrl);
141 request.setHeader(header: QNetworkRequest::ContentTypeHeader, value: QVariant("application/grpc"_L1));
142 request.setRawHeader(headerName: GrpcAcceptEncodingHeader, value: "identity,deflate,gzip");
143 request.setRawHeader(headerName: AcceptEncodingHeader, value: "identity,gzip");
144 request.setRawHeader(headerName: TEHeader, value: "trailers");
145#if QT_CONFIG(ssl)
146 request.setSslConfiguration(sslConfig);
147#endif
148
149 addMetadataToRequest(request: &request, channelMetadata: channelOptions.metadata(), callMetadata: callOptions.metadata());
150
151 request.setAttribute(code: QNetworkRequest::Http2DirectAttribute, value: true);
152
153 QByteArray msg(GrpcMessageSizeHeaderSize, '\0');
154 // Args must be 4-byte unsigned int to fit into 4-byte big endian
155 qToBigEndian(src: static_cast<quint32>(args.size()), dest: msg.data() + 1);
156 msg += args;
157 qGrpcDebug() << "SEND msg with size:" << msg.size();
158
159 QNetworkReply *networkReply = nm.post(request, data: msg);
160#if QT_CONFIG(ssl)
161 QObject::connect(sender: networkReply, signal: &QNetworkReply::sslErrors, context: networkReply,
162 slot: [networkReply](const QList<QSslError> &errors) {
163 qGrpcCritical() << errors;
164 // TODO: filter out noncritical SSL handshake errors
165 // FIXME: error due to ssl failure is not transferred to the client:
166 // last error will be Operation canceled
167 QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
168 });
169#endif
170 if (auto deadline = deadlineForCall(channelOptions, callOptions)) {
171 QTimer::singleShot(interval: *deadline, receiver: networkReply, slot: [networkReply] {
172 QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
173 });
174 }
175 return networkReply;
176 }
177
178 static void abortNetworkReply(QNetworkReply *networkReply)
179 {
180 if (networkReply->isRunning())
181 networkReply->abort();
182 else
183 networkReply->deleteLater();
184 }
185
186 static QByteArray processReply(QNetworkReply *networkReply, QGrpcStatus::StatusCode &statusCode)
187 {
188 // Check if no network error occurred
189 if (networkReply->error() != QNetworkReply::NoError) {
190 statusCode = StatusCodeMap.at(k: networkReply->error());
191 return {};
192 }
193
194 // Check if server answer with error
195 statusCode = static_cast<QGrpcStatus::StatusCode>(
196 networkReply->rawHeader(headerName: GrpcStatusHeader).toInt());
197 if (statusCode != QGrpcStatus::StatusCode::Ok)
198 return {};
199
200 // Message size doesn't matter for now
201 return networkReply->readAll().mid(index: GrpcMessageSizeHeaderSize);
202 }
203
204 QGrpcHttp2ChannelPrivate(const QGrpcChannelOptions &options) : channelOptions(options)
205 {
206#if QT_CONFIG(ssl)
207 if (channelOptions.host().scheme() == "https"_L1) {
208 // HTTPS connection requested but not ssl configuration provided.
209 Q_ASSERT(channelOptions.sslConfiguration());
210 sslConfig = *channelOptions.sslConfiguration();
211 } else if (channelOptions.host().scheme().isEmpty()) {
212 auto tmpHost = channelOptions.host();
213 tmpHost.setScheme("http"_L1);
214 channelOptions.withHost(host: tmpHost);
215 }
216#else
217 auto tmpHost = channelOptions.host();
218 tmpHost.setScheme("http"_L1);
219 channelOptions.withHost(tmpHost);
220#endif
221 }
222
223 static int getExpectedDataSize(QByteArrayView container)
224 {
225 return qFromBigEndian(source: *reinterpret_cast<const quint32 *>(container.data() + 1))
226 + GrpcMessageSizeHeaderSize;
227 }
228};
229
230/*!
231 QGrpcHttp2Channel constructs QGrpcHttp2Channel with \a options.
232*/
233QGrpcHttp2Channel::QGrpcHttp2Channel(const QGrpcChannelOptions &options)
234 : QAbstractGrpcChannel(), dPtr(std::make_unique<QGrpcHttp2ChannelPrivate>(args: options))
235{
236}
237
238/*!
239 Destroys the QGrpcHttp2Channel object.
240*/
241QGrpcHttp2Channel::~QGrpcHttp2Channel() = default;
242
243/*!
244 Synchronously calls the RPC method and writes the result to the output parameter \a ret.
245
246 The RPC method name is constructed by concatenating the \a method
247 and \a service parameters and called with the \a args argument.
248 Uses \a options argument to set additional parameter for the call.
249*/
250QGrpcStatus QGrpcHttp2Channel::call(QLatin1StringView method, QLatin1StringView service,
251 QByteArrayView args, QByteArray &ret,
252 const QGrpcCallOptions &options)
253{
254 QEventLoop loop;
255
256 QNetworkReply *networkReply = dPtr->post(method, service, args, callOptions: options);
257 QObject::connect(sender: networkReply, signal: &QNetworkReply::finished, context: &loop, slot: &QEventLoop::quit);
258
259 // If reply was finished in same stack it doesn't make sense to start event loop
260 if (!networkReply->isFinished())
261 loop.exec();
262
263 QGrpcStatus::StatusCode grpcStatus = QGrpcStatus::StatusCode::Unknown;
264 ret = dPtr->processReply(networkReply, statusCode&: grpcStatus);
265
266 networkReply->deleteLater();
267 qGrpcDebug() << __func__ << "RECV:" << ret.toHex() << "grpcStatus" << grpcStatus;
268 return { grpcStatus, QString::fromUtf8(ba: networkReply->rawHeader(headerName: GrpcStatusMessageHeader)) };
269}
270
271/*!
272 Asynchronously calls the RPC method.
273
274 The RPC method name is constructed by concatenating the \a method
275 and \a service parameters and called with the \a args argument.
276 Uses \a options argument to set additional parameter for the call.
277 The method can emit QGrpcCallReply::finished() and QGrpcCallReply::errorOccurred()
278 signals on a QGrpcCallReply returned object.
279*/
280std::shared_ptr<QGrpcCallReply> QGrpcHttp2Channel::call(QLatin1StringView method,
281 QLatin1StringView service,
282 QByteArrayView args,
283 const QGrpcCallOptions &options)
284{
285 std::shared_ptr<QGrpcCallReply> reply(new QGrpcCallReply(serializer()),
286 [](QGrpcCallReply *reply) { reply->deleteLater(); });
287
288 QNetworkReply *networkReply = dPtr->post(method, service, args, callOptions: options);
289
290 auto connection = std::make_shared<QMetaObject::Connection>();
291 auto abortConnection = std::make_shared<QMetaObject::Connection>();
292
293 *connection = QObject::connect(
294 sender: networkReply, signal: &QNetworkReply::finished, context: reply.get(),
295 slot: [reply, networkReply, connection, abortConnection] {
296 QGrpcStatus::StatusCode grpcStatus = QGrpcStatus::StatusCode::Unknown;
297 QByteArray data = QGrpcHttp2ChannelPrivate::processReply(networkReply, statusCode&: grpcStatus);
298 reply->setMetadata(collectMetadata(networkReply));
299 QObject::disconnect(*connection);
300 QObject::disconnect(*abortConnection);
301
302 qGrpcDebug() << "RECV:" << data;
303 if (QGrpcStatus::StatusCode::Ok == grpcStatus) {
304 reply->setData(data);
305 emit reply->finished();
306 } else {
307 reply->setData({});
308 emit reply->errorOccurred(status: { grpcStatus,
309 QLatin1StringView(networkReply->rawHeader(
310 headerName: GrpcStatusMessageHeader)) });
311 }
312 networkReply->deleteLater();
313 });
314
315 *abortConnection = QObject::connect(sender: reply.get(), signal: &QGrpcCallReply::errorOccurred, context: networkReply,
316 slot: [networkReply, connection,
317 abortConnection](const QGrpcStatus &status) {
318 if (status.code() == QGrpcStatus::Aborted) {
319 QObject::disconnect(*connection);
320 QObject::disconnect(*abortConnection);
321
322 networkReply->deleteLater();
323 }
324 });
325 return reply;
326}
327
328/*!
329 Creates and starts a stream to the RPC method.
330
331 The RPC method name is constructed by concatenating the \a method
332 and \a service parameters and called with the \a arg argument.
333 Returns a shared pointer to the QGrpcStream. Uses \a options argument
334 to set additional parameter for the stream.
335
336 Calls QGrpcStream::updateData() when the stream receives data from the server.
337 The method may emit QGrpcStream::errorOccurred() when the stream has terminated with an error.
338*/
339std::shared_ptr<QGrpcStream> QGrpcHttp2Channel::startStream(QLatin1StringView method,
340 QLatin1StringView service,
341 QByteArrayView arg,
342 const QGrpcCallOptions &options)
343{
344 QNetworkReply *networkReply = dPtr->post(method, service, args: arg, callOptions: options);
345
346 std::shared_ptr<QGrpcStream> grpcStream(new QGrpcStream(method, arg, serializer()));
347 auto finishConnection = std::make_shared<QMetaObject::Connection>();
348 auto abortConnection = std::make_shared<QMetaObject::Connection>();
349 auto readConnection = std::make_shared<QMetaObject::Connection>();
350
351 *readConnection = QObject::connect(
352 sender: networkReply, signal: &QNetworkReply::readyRead, context: grpcStream.get(),
353 slot: [networkReply, grpcStream, this] {
354 auto replyIt = dPtr->activeStreamReplies.find(x: networkReply);
355
356 const QByteArray data = networkReply->readAll();
357 qGrpcDebug() << "RECV data size:" << data.size();
358
359 if (replyIt == dPtr->activeStreamReplies.end()) {
360 qGrpcDebug() << data.toHex();
361 int expectedDataSize = QGrpcHttp2ChannelPrivate::getExpectedDataSize(container: data);
362 qGrpcDebug() << "First chunk received:" << data.size()
363 << "expectedDataSize:" << expectedDataSize;
364
365 if (expectedDataSize == 0) {
366 grpcStream->updateData(data: QByteArray());
367 return;
368 }
369
370 QGrpcHttp2ChannelPrivate::ExpectedData dataContainer{ .expectedSize: expectedDataSize,
371 .container: QByteArray{} };
372 replyIt = dPtr->activeStreamReplies.insert(x: { networkReply, dataContainer })
373 .first;
374 }
375
376 QGrpcHttp2ChannelPrivate::ExpectedData &dataContainer = replyIt->second;
377 dataContainer.container.append(a: data);
378
379 qGrpcDebug() << "Processeded chunk:" << data.size()
380 << "dataContainer:" << dataContainer.container.size()
381 << "capacity:" << dataContainer.expectedSize;
382 while (dataContainer.container.size() >= dataContainer.expectedSize
383 && !networkReply->isFinished()) {
384 qGrpcDebug() << "Full data received:" << data.size()
385 << "dataContainer:" << dataContainer.container.size()
386 << "capacity:" << dataContainer.expectedSize;
387 grpcStream->setMetadata(collectMetadata(networkReply));
388 grpcStream->updateData(data: dataContainer.container.mid(
389 index: GrpcMessageSizeHeaderSize,
390 len: dataContainer.expectedSize - GrpcMessageSizeHeaderSize));
391 dataContainer.container.remove(index: 0, len: dataContainer.expectedSize);
392 if (dataContainer.container.size() > GrpcMessageSizeHeaderSize) {
393 dataContainer.expectedSize = QGrpcHttp2ChannelPrivate::getExpectedDataSize(
394 container: dataContainer.container);
395 } else if (dataContainer.container.size() > 0) {
396 qGrpcWarning("Invalid container size received, size header is less than 5 "
397 "bytes");
398 }
399 }
400
401 if (dataContainer.container.size() < GrpcMessageSizeHeaderSize
402 || networkReply->isFinished()) {
403 dPtr->activeStreamReplies.erase(position: replyIt);
404 }
405 });
406
407 std::weak_ptr<QGrpcStream> weakGrpcStream(grpcStream);
408 *finishConnection = QObject::connect(
409 sender: networkReply, signal: &QNetworkReply::finished, context: grpcStream.get(),
410 slot: [weakGrpcStream, service, networkReply, abortConnection, readConnection,
411 finishConnection, this]() {
412 const QString errorString = networkReply->errorString();
413 const QNetworkReply::NetworkError networkError = networkReply->error();
414 QObject::disconnect(*readConnection);
415 QObject::disconnect(*abortConnection);
416
417 dPtr->activeStreamReplies.erase(x: networkReply);
418 QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
419 networkReply->deleteLater();
420
421 auto grpcStream = weakGrpcStream.lock();
422 if (!grpcStream) {
423 qGrpcWarning() << "Could not lock gRPC stream pointer.";
424 return;
425 }
426 qGrpcWarning() << grpcStream->method() << "call" << service
427 << "stream finished:" << errorString;
428 grpcStream->setMetadata(collectMetadata(networkReply));
429 switch (networkError) {
430 case QNetworkReply::NoError: {
431 // Reply is closed without network error, but may contain an unhandled data
432 // TODO: processReply returns the data, that might need the processing. It's
433 // should be taken into account in new HTTP/2 channel implementation.
434 QGrpcStatus::StatusCode grpcStatus;
435 QGrpcHttp2ChannelPrivate::processReply(networkReply, statusCode&: grpcStatus);
436 if (grpcStatus != QGrpcStatus::StatusCode::Ok) {
437 emit grpcStream->errorOccurred(
438 status: QGrpcStatus{ grpcStatus,
439 QLatin1StringView(networkReply->rawHeader(
440 headerName: GrpcStatusMessageHeader)) });
441 }
442 break;
443 }
444 default:
445 emit grpcStream->errorOccurred(
446 status: QGrpcStatus{ StatusCodeMap.at(k: networkError),
447 "%1 call %2 stream failed: %3"_L1.arg(
448 args: service, args: grpcStream->method(), args: errorString) });
449 break;
450 }
451 emit grpcStream->finished();
452 });
453
454 *abortConnection = QObject::connect(
455 sender: grpcStream.get(), signal: &QGrpcStream::finished, context: networkReply,
456 slot: [networkReply, finishConnection, abortConnection, readConnection] {
457 QObject::disconnect(*finishConnection);
458 QObject::disconnect(*readConnection);
459 QObject::disconnect(*abortConnection);
460
461 QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
462 networkReply->deleteLater();
463 });
464
465 return grpcStream;
466}
467
468/*!
469 Returns the newly created QProtobufSerializer shared pointer.
470*/
471std::shared_ptr<QAbstractProtobufSerializer> QGrpcHttp2Channel::serializer() const
472{
473 // TODO: make selection based on credentials or channel settings
474 return std::make_shared<QProtobufSerializer>();
475}
476
477QT_END_NAMESPACE
478

source code of qtgrpc/src/grpc/qgrpchttp2channel.cpp