1// Copyright (C) 2022 The Qt Company Ltd.
2// Copyright (C) 2019 Alexey Edelev <semlanik@gmail.com>
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
4
5#include <QtCore/QThread>
6#include <QtCore/QTimer>
7#include <QtGrpc/private/qabstractgrpcchannel_p.h>
8#include <QtGrpc/qgrpccallreply.h>
9#include <QtGrpc/qgrpcstream.h>
10#include <QtProtobuf/qprotobufserializer.h>
11
12#include <qtgrpcglobal_p.h>
13
14#include <private/qobject_p.h>
15
16#include "qabstractgrpcclient.h"
17
18QT_BEGIN_NAMESPACE
19
20using namespace Qt::StringLiterals;
21
22namespace {
23static QString threadSafetyWarning(QLatin1StringView methodName)
24{
25 return "%1 is called from a different thread.\n"
26 "Qt GRPC doesn't guarantee thread safety on the channel level.\n"
27 "You have to be confident that channel routines are working in "
28 "the same thread as QAbstractGrpcClient."_L1.arg(args&: methodName);
29}
30} // namespace
31
32/*!
33 \class QAbstractGrpcClient
34 \inmodule QtGrpc
    \brief The QAbstractGrpcClient class is a bridge between gRPC clients
    and channels.
37
38 QAbstractGrpcClient provides a set of functions for client classes
39 generated out of protobuf services.
40 QAbstractGrpcClient enforces thread safety for startStream() and call() methods
41 of generated clients.
42 The methods QAbstractGrpcClient::call() and QAbstractGrpcClient::startStream()
43 should only be called by the generated client classes.
44*/
45
46/*!
47 \fn template <typename ParamType> QGrpcStatus QAbstractGrpcClient::call(QLatin1StringView method,
48 const QProtobufMessage &arg, const QGrpcCallOptions &options);
49
50 Synchronously calls the given \a method of this service client,
51 with argument \a arg.
    Uses \a options argument to set additional parameters for the call.
53*/
54
55/*!
56 \fn template <typename ParamType, typename ReturnType> QGrpcStatus QAbstractGrpcClient::call(QLatin1StringView method,
57 const QProtobufMessage &arg, ReturnType &ret, const QGrpcCallOptions &options);
58
59 Synchronously calls the given \a method of this service client,
60 with argument \a arg and fills \a ret with gRPC reply.
    Uses \a options argument to set additional parameters for the call.
62*/
63
64/*!
65 \fn template <typename ParamType> QSharedPointer<QGrpcStream> QAbstractGrpcClient::startStream(QLatin1StringView method,
66 const QProtobufMessage &arg, const QGrpcCallOptions &options);
67
68 Streams messages from the server stream \a method with the message
69 argument \a arg to the attached channel.
    Uses \a options argument to set additional parameters for the call.
71*/
72
73class QAbstractGrpcClientPrivate : public QObjectPrivate
74{
75 Q_DECLARE_PUBLIC(QAbstractGrpcClient)
76public:
77 QAbstractGrpcClientPrivate(QLatin1StringView service) : service(service.data(), service.size())
78 {
79 }
80
81 QGrpcStatus checkThread(QLatin1StringView warningPreamble);
82
83 std::shared_ptr<QAbstractGrpcChannel> channel;
84 const std::string service;
85 std::vector<std::shared_ptr<QGrpcStream>> activeStreams;
86};
87
88QGrpcStatus QAbstractGrpcClientPrivate::checkThread(QLatin1StringView warningPreamble)
89{
90 Q_Q(QAbstractGrpcClient);
91
92 QGrpcStatus status;
93 if (q->thread() != QThread::currentThread()) {
94 status = { QGrpcStatus::Unknown, threadSafetyWarning(methodName: warningPreamble) };
95 qGrpcCritical() << status.message();
96 emit q->errorOccurred(status);
97 }
98 return status;
99}
100
101QAbstractGrpcClient::QAbstractGrpcClient(QLatin1StringView service, QObject *parent)
102 : QObject(*new QAbstractGrpcClientPrivate(service), parent)
103{
104}
105
106QAbstractGrpcClient::~QAbstractGrpcClient() = default;
107
108/*!
109 Attaches \a channel to client as transport layer for gRPC.
110
111 Parameters and return values will be serialized to the channel
112 in a format it supports.
113
114 \note \b Warning: Qt GRPC doesn't guarantee thread safety on the channel level.
115 You have to invoke the channel-related functions on the same thread as
116 QAbstractGrpcClient.
117*/
118void QAbstractGrpcClient::attachChannel(const std::shared_ptr<QAbstractGrpcChannel> &channel)
119{
120 if (channel->dPtr->threadId != QThread::currentThreadId()) {
121 const QString status = threadSafetyWarning(methodName: "QAbstractGrpcClient::attachChannel"_L1);
122 qGrpcCritical() << status;
123 emit errorOccurred(status: { QGrpcStatus::Unknown, status });
124 return;
125 }
126 Q_D(QAbstractGrpcClient);
127 for (auto &stream : d->activeStreams)
128 stream->abort();
129
130 d->channel = channel;
131}
132
133QGrpcStatus QAbstractGrpcClient::call(QLatin1StringView method, QByteArrayView arg, QByteArray &ret,
134 const QGrpcCallOptions &options)
135{
136 Q_D(QAbstractGrpcClient);
137
138 QGrpcStatus callStatus = d->checkThread(warningPreamble: "QAbstractGrpcClient::call"_L1);
139 if (callStatus != QGrpcStatus::Ok)
140 return callStatus;
141
142 callStatus = d->channel
143 ? d->channel->call(method, service: QLatin1StringView(d->service), args: arg, ret, options)
144 : QGrpcStatus{ QGrpcStatus::Unknown, "No channel(s) attached."_L1 };
145
146 if (callStatus != QGrpcStatus::Ok)
147 emit errorOccurred(status: callStatus);
148
149 return callStatus;
150}
151
152std::shared_ptr<QGrpcCallReply> QAbstractGrpcClient::call(QLatin1StringView method,
153 QByteArrayView arg,
154 const QGrpcCallOptions &options)
155{
156 std::shared_ptr<QGrpcCallReply> reply;
157 Q_D(QAbstractGrpcClient);
158 if (d->checkThread(warningPreamble: "QAbstractGrpcClient::call"_L1) != QGrpcStatus::Ok)
159 return reply;
160
161 if (d->channel) {
162 reply = d->channel->call(method, service: QLatin1StringView(d->service), args: arg, options);
163
164 auto errorConnection = std::make_shared<QMetaObject::Connection>();
165 *errorConnection = connect(sender: reply.get(), signal: &QGrpcCallReply::errorOccurred, context: this,
166 slot: [this](const QGrpcStatus &status) {
167 emit errorOccurred(status);
168 });
169 } else {
170 emit errorOccurred(status: { QGrpcStatus::Unknown, "No channel(s) attached."_L1 });
171 }
172
173 return reply;
174}
175
176std::shared_ptr<QGrpcStream> QAbstractGrpcClient::startStream(QLatin1StringView method,
177 QByteArrayView arg,
178 const QGrpcCallOptions &options)
179{
180 Q_D(QAbstractGrpcClient);
181
182 std::shared_ptr<QGrpcStream> grpcStream;
183 if (d->checkThread(warningPreamble: "QAbstractGrpcClient::startStream"_L1) != QGrpcStatus::Ok)
184 return grpcStream;
185
186 if (d->channel) {
187 grpcStream = d->channel->startStream(method, service: QLatin1StringView(d->service), arg, options);
188
189 auto errorConnection = std::make_shared<QMetaObject::Connection>();
190 *errorConnection = connect(sender: grpcStream.get(), signal: &QGrpcStream::errorOccurred, context: this,
191 slot: [this, grpcStream](const QGrpcStatus &status) {
192 Q_D(QAbstractGrpcClient);
193 qGrpcWarning()
194 << grpcStream->method() << "call" << d->service
195 << "stream error: " << status.message();
196 errorOccurred(status);
197 });
198
199 auto finishedConnection = std::make_shared<QMetaObject::Connection>();
200 *finishedConnection = connect(
201 sender: grpcStream.get(), signal: &QGrpcStream::finished, context: this,
202 slot: [this, grpcStream, errorConnection, finishedConnection]() mutable {
203 Q_D(QAbstractGrpcClient);
204 qGrpcWarning()
205 << grpcStream->method() << "call" << d->service << "stream finished.";
206
207 auto it =
208 std::find(first: d->activeStreams.begin(), last: d->activeStreams.end(), val: grpcStream);
209 if (it != d->activeStreams.end())
210 d->activeStreams.erase(position: it);
211
212 QObject::disconnect(*errorConnection);
213 QObject::disconnect(*finishedConnection);
214 grpcStream.reset();
215 });
216
217 d->activeStreams.push_back(x: grpcStream);
218 } else {
219 emit errorOccurred(status: { QGrpcStatus::Unknown, "No channel(s) attached."_L1 });
220 }
221 return grpcStream;
222}
223
224/*!
225 Serializer provides assigned to client serializer.
226 Returns pointer to serializerowned by QProtobufSerializerRegistry.
227*/
228std::shared_ptr<QAbstractProtobufSerializer> QAbstractGrpcClient::serializer() const
229{
230 Q_D(const QAbstractGrpcClient);
231 if (const auto &c = d->channel)
232 return c->serializer();
233 return nullptr;
234}
235
236QGrpcStatus QAbstractGrpcClient::handleDeserializationError(
237 const QAbstractProtobufSerializer::DeserializationError &err)
238{
239 QGrpcStatus status{ QGrpcStatus::Ok };
240 switch (err) {
241 case QAbstractProtobufSerializer::InvalidHeaderError: {
242 const QLatin1StringView errStr("Response deserialization failed: invalid field found.");
243 status = { QGrpcStatus::InvalidArgument, errStr };
244 qGrpcCritical() << errStr;
245 emit errorOccurred(status);
246 } break;
247 case QAbstractProtobufSerializer::NoDeserializerError: {
248 const QLatin1StringView errStr("No deserializer was found for a given type.");
249 status = { QGrpcStatus::InvalidArgument, errStr };
250 qGrpcCritical() << errStr;
251 emit errorOccurred(status);
252 } break;
253 case QAbstractProtobufSerializer::UnexpectedEndOfStreamError: {
254 const QLatin1StringView errStr("Invalid size of received buffer.");
255 status = { QGrpcStatus::OutOfRange, errStr };
256 qGrpcCritical() << errStr;
257 emit errorOccurred(status);
258 } break;
259 case QAbstractProtobufSerializer::NoError:
260 Q_FALLTHROUGH();
261 default:
262 const QLatin1StringView errStr("Deserializing failed, but no error was set.");
263 status = { QGrpcStatus::InvalidArgument, errStr };
264 qGrpcCritical() << errStr;
265 emit errorOccurred(status);
266 }
267 return status;
268}
269
270QT_END_NAMESPACE
271
272#include "moc_qabstractgrpcclient.cpp"
273

source code of qtgrpc/src/grpc/qabstractgrpcclient.cpp