1 | //===--- Client.cpp ----------------------------------------------*- C++-*-===// |
2 | // |
3 | // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
4 | // See https://llvm.org/LICENSE.txt for license information. |
5 | // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #include <grpc++/grpc++.h> |
10 | |
11 | #include "Client.h" |
12 | #include "Feature.h" |
13 | #include "Service.grpc.pb.h" |
14 | #include "index/Index.h" |
15 | #include "marshalling/Marshalling.h" |
16 | #include "support/Logger.h" |
17 | #include "support/Trace.h" |
18 | #include "llvm/ADT/SmallString.h" |
19 | #include "llvm/ADT/StringRef.h" |
20 | #include "llvm/Support/Error.h" |
21 | |
22 | #include <atomic> |
23 | #include <chrono> |
24 | #include <memory> |
25 | |
26 | namespace clang { |
27 | namespace clangd { |
28 | namespace remote { |
29 | namespace { |
30 | |
31 | llvm::StringRef toString(const grpc_connectivity_state &State) { |
32 | switch (State) { |
33 | case GRPC_CHANNEL_IDLE: |
34 | return "idle" ; |
35 | case GRPC_CHANNEL_CONNECTING: |
36 | return "connecting" ; |
37 | case GRPC_CHANNEL_READY: |
38 | return "ready" ; |
39 | case GRPC_CHANNEL_TRANSIENT_FAILURE: |
40 | return "transient failure" ; |
41 | case GRPC_CHANNEL_SHUTDOWN: |
42 | return "shutdown" ; |
43 | } |
44 | llvm_unreachable("Not a valid grpc_connectivity_state." ); |
45 | } |
46 | |
47 | class IndexClient : public clangd::SymbolIndex { |
48 | void updateConnectionStatus() const { |
49 | auto NewStatus = Channel->GetState(/*try_to_connect=*/false); |
50 | auto OldStatus = ConnectionStatus.exchange(i: NewStatus); |
51 | if (OldStatus != NewStatus) |
52 | vlog(Fmt: "Remote index connection [{0}]: {1} => {2}" , Vals: ServerAddress, |
53 | Vals: toString(State: OldStatus), Vals: toString(State: NewStatus)); |
54 | } |
55 | |
56 | template <typename RequestT, typename ReplyT> |
57 | using StreamingCall = std::unique_ptr<grpc::ClientReader<ReplyT>> ( |
58 | remote::v1::SymbolIndex::Stub::*)(grpc::ClientContext *, |
59 | const RequestT &); |
60 | |
61 | template <typename RequestT, typename ReplyT, typename ClangdRequestT, |
62 | typename CallbackT> |
63 | bool streamRPC(ClangdRequestT Request, |
64 | StreamingCall<RequestT, ReplyT> RPCCall, |
65 | CallbackT Callback) const { |
66 | updateConnectionStatus(); |
67 | // We initialize to true because stream might be broken before we see the |
68 | // final message. In such a case there are actually more results on the |
69 | // stream, but we couldn't get to them. |
70 | bool HasMore = true; |
71 | trace::Span Tracer(RequestT::descriptor()->name()); |
72 | const auto RPCRequest = ProtobufMarshaller->toProtobuf(Request); |
73 | SPAN_ATTACH(Tracer, "Request" , RPCRequest.DebugString()); |
74 | grpc::ClientContext Context; |
75 | Context.AddMetadata(meta_key: "version" , meta_value: versionString()); |
76 | Context.AddMetadata(meta_key: "features" , meta_value: featureString()); |
77 | Context.AddMetadata(meta_key: "platform" , meta_value: platformString()); |
78 | std::chrono::system_clock::time_point StartTime = |
79 | std::chrono::system_clock::now(); |
80 | auto Deadline = StartTime + DeadlineWaitingTime; |
81 | Context.set_deadline(Deadline); |
82 | auto Reader = (Stub.get()->*RPCCall)(&Context, RPCRequest); |
83 | dlog("Sending {0}: {1}" , RequestT::descriptor()->name(), |
84 | RPCRequest.DebugString()); |
85 | ReplyT Reply; |
86 | unsigned Successful = 0; |
87 | unsigned FailedToParse = 0; |
88 | while (Reader->Read(&Reply)) { |
89 | if (!Reply.has_stream_result()) { |
90 | HasMore = Reply.final_result().has_more(); |
91 | continue; |
92 | } |
93 | auto Response = ProtobufMarshaller->fromProtobuf(Reply.stream_result()); |
94 | if (!Response) { |
95 | elog("Received invalid {0}: {1}. Reason: {2}" , |
96 | ReplyT::descriptor()->name(), Reply.stream_result().DebugString(), |
97 | Response.takeError()); |
98 | ++FailedToParse; |
99 | continue; |
100 | } |
101 | Callback(*Response); |
102 | ++Successful; |
103 | } |
104 | auto Millis = std::chrono::duration_cast<std::chrono::milliseconds>( |
105 | d: std::chrono::system_clock::now() - StartTime) |
106 | .count(); |
107 | vlog("Remote index [{0}]: {1} => {2} results in {3}ms." , ServerAddress, |
108 | RequestT::descriptor()->name(), Successful, Millis); |
109 | SPAN_ATTACH(Tracer, "Status" , Reader->Finish().ok()); |
110 | SPAN_ATTACH(Tracer, "Successful" , Successful); |
111 | SPAN_ATTACH(Tracer, "Failed to parse" , FailedToParse); |
112 | updateConnectionStatus(); |
113 | return HasMore; |
114 | } |
115 | |
116 | public: |
117 | IndexClient( |
118 | std::shared_ptr<grpc::Channel> Channel, llvm::StringRef Address, |
119 | llvm::StringRef ProjectRoot, |
120 | std::chrono::milliseconds DeadlineTime = std::chrono::milliseconds(1000)) |
121 | : Stub(remote::v1::SymbolIndex::NewStub(Channel)), Channel(Channel), |
122 | ServerAddress(Address), |
123 | ConnectionStatus(Channel->GetState(/*try_to_connect=*/true)), |
124 | ProtobufMarshaller(new Marshaller(/*RemoteIndexRoot=*/"" , |
125 | /*LocalIndexRoot=*/ProjectRoot)), |
126 | DeadlineWaitingTime(DeadlineTime) { |
127 | assert(!ProjectRoot.empty()); |
128 | } |
129 | |
130 | void lookup(const clangd::LookupRequest &Request, |
131 | llvm::function_ref<void(const clangd::Symbol &)> Callback) |
132 | const override { |
133 | streamRPC(Request, &remote::v1::SymbolIndex::Stub::Lookup, Callback); |
134 | } |
135 | |
136 | bool fuzzyFind(const clangd::FuzzyFindRequest &Request, |
137 | llvm::function_ref<void(const clangd::Symbol &)> Callback) |
138 | const override { |
139 | return streamRPC(Request, &remote::v1::SymbolIndex::Stub::FuzzyFind, |
140 | Callback); |
141 | } |
142 | |
143 | bool |
144 | refs(const clangd::RefsRequest &Request, |
145 | llvm::function_ref<void(const clangd::Ref &)> Callback) const override { |
146 | return streamRPC(Request, &remote::v1::SymbolIndex::Stub::Refs, Callback); |
147 | } |
148 | |
149 | void |
150 | relations(const clangd::RelationsRequest &Request, |
151 | llvm::function_ref<void(const SymbolID &, const clangd::Symbol &)> |
152 | Callback) const override { |
153 | streamRPC(Request, &remote::v1::SymbolIndex::Stub::Relations, |
154 | // Unpack protobuf Relation. |
155 | [&](std::pair<SymbolID, clangd::Symbol> SubjectAndObject) { |
156 | Callback(SubjectAndObject.first, SubjectAndObject.second); |
157 | }); |
158 | } |
159 | |
160 | llvm::unique_function<IndexContents(llvm::StringRef) const> |
161 | indexedFiles() const override { |
162 | // FIXME: For now we always return IndexContents::None regardless of whether |
163 | // the file was indexed or not. A possible implementation could be |
164 | // based on the idea that we do not want to send a request at every |
165 | // call of a function returned by IndexClient::indexedFiles(). |
166 | return [](llvm::StringRef) { return IndexContents::None; }; |
167 | } |
168 | |
169 | // IndexClient does not take any space since the data is stored on the |
170 | // server. |
171 | size_t estimateMemoryUsage() const override { return 0; } |
172 | |
173 | private: |
174 | std::unique_ptr<remote::v1::SymbolIndex::Stub> Stub; |
175 | std::shared_ptr<grpc::Channel> Channel; |
176 | llvm::SmallString<256> ServerAddress; |
177 | mutable std::atomic<grpc_connectivity_state> ConnectionStatus; |
178 | std::unique_ptr<Marshaller> ProtobufMarshaller; |
179 | // Each request will be terminated if it takes too long. |
180 | std::chrono::milliseconds DeadlineWaitingTime; |
181 | }; |
182 | |
183 | } // namespace |
184 | |
185 | std::unique_ptr<clangd::SymbolIndex> getClient(llvm::StringRef Address, |
186 | llvm::StringRef ProjectRoot) { |
187 | const auto Channel = |
188 | grpc::CreateChannel(target: Address.str(), creds: grpc::InsecureChannelCredentials()); |
189 | return std::unique_ptr<clangd::SymbolIndex>( |
190 | new IndexClient(Channel, Address, ProjectRoot)); |
191 | } |
192 | |
193 | } // namespace remote |
194 | } // namespace clangd |
195 | } // namespace clang |
196 | |