1//===-- Communication.cpp -------------------------------------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
9#include "lldb/Core/Communication.h"
10
11#include "lldb/Host/HostThread.h"
12#include "lldb/Host/ThreadLauncher.h"
13#include "lldb/Utility/Connection.h"
14#include "lldb/Utility/ConstString.h"
15#include "lldb/Utility/Event.h"
16#include "lldb/Utility/LLDBLog.h"
17#include "lldb/Utility/Listener.h"
18#include "lldb/Utility/Log.h"
19#include "lldb/Utility/Status.h"
20
21#include "llvm/ADT/None.h"
22#include "llvm/ADT/Optional.h"
23#include "llvm/Support/Compiler.h"
24
25#include <algorithm>
26#include <chrono>
27#include <cstring>
28#include <memory>
29
30#include <cerrno>
31#include <cinttypes>
32#include <cstdio>
33
34using namespace lldb;
35using namespace lldb_private;
36
37ConstString &Communication::GetStaticBroadcasterClass() {
38 static ConstString class_name("lldb.communication");
39 return class_name;
40}
41
42Communication::Communication(const char *name)
43 : Broadcaster(nullptr, name), m_connection_sp(),
44 m_read_thread_enabled(false), m_read_thread_did_exit(false), m_bytes(),
45 m_bytes_mutex(), m_write_mutex(), m_synchronize_mutex(),
46 m_callback(nullptr), m_callback_baton(nullptr), m_close_on_eof(true)
47
48{
49
50 LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
51 "{0} Communication::Communication (name = {1})", this, name);
52
53 SetEventName(eBroadcastBitDisconnected, "disconnected");
54 SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
55 SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
56 SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
57 SetEventName(eBroadcastBitPacketAvailable, "packet available");
58 SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
59
60 CheckInWithManager();
61}
62
63Communication::~Communication() {
64 LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
65 "{0} Communication::~Communication (name = {1})", this,
66 GetBroadcasterName().AsCString());
67 Clear();
68}
69
70void Communication::Clear() {
71 SetReadThreadBytesReceivedCallback(nullptr, nullptr);
72 StopReadThread(nullptr);
73 Disconnect(nullptr);
74}
75
76ConnectionStatus Communication::Connect(const char *url, Status *error_ptr) {
77 Clear();
78
79 LLDB_LOG(GetLog(LLDBLog::Communication),
80 "{0} Communication::Connect (url = {1})", this, url);
81
82 lldb::ConnectionSP connection_sp(m_connection_sp);
83 if (connection_sp)
84 return connection_sp->Connect(url, error_ptr);
85 if (error_ptr)
86 error_ptr->SetErrorString("Invalid connection.");
87 return eConnectionStatusNoConnection;
88}
89
90ConnectionStatus Communication::Disconnect(Status *error_ptr) {
91 LLDB_LOG(GetLog(LLDBLog::Communication), "{0} Communication::Disconnect ()",
92 this);
93
94 assert((!m_read_thread_enabled || m_read_thread_did_exit) &&
95 "Disconnecting while the read thread is running is racy!");
96 lldb::ConnectionSP connection_sp(m_connection_sp);
97 if (connection_sp) {
98 ConnectionStatus status = connection_sp->Disconnect(error_ptr);
99 // We currently don't protect connection_sp with any mutex for multi-
100 // threaded environments. So lets not nuke our connection class without
101 // putting some multi-threaded protections in. We also probably don't want
102 // to pay for the overhead it might cause if every time we access the
103 // connection we have to take a lock.
104 //
105 // This unique pointer will cleanup after itself when this object goes
106 // away, so there is no need to currently have it destroy itself
107 // immediately upon disconnect.
108 // connection_sp.reset();
109 return status;
110 }
111 return eConnectionStatusNoConnection;
112}
113
114bool Communication::IsConnected() const {
115 lldb::ConnectionSP connection_sp(m_connection_sp);
116 return (connection_sp ? connection_sp->IsConnected() : false);
117}
118
119bool Communication::HasConnection() const {
120 return m_connection_sp.get() != nullptr;
121}
122
123size_t Communication::Read(void *dst, size_t dst_len,
124 const Timeout<std::micro> &timeout,
125 ConnectionStatus &status, Status *error_ptr) {
126 Log *log = GetLog(LLDBLog::Communication);
127 LLDB_LOG(
128 log,
129 "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
130 this, dst, dst_len, timeout, m_connection_sp.get());
131
132 if (m_read_thread_enabled) {
133 // We have a dedicated read thread that is getting data for us
134 size_t cached_bytes = GetCachedBytes(dst, dst_len);
135 if (cached_bytes > 0 || (timeout && timeout->count() == 0)) {
136 status = eConnectionStatusSuccess;
137 return cached_bytes;
138 }
139
140 if (!m_connection_sp) {
141 if (error_ptr)
142 error_ptr->SetErrorString("Invalid connection.");
143 status = eConnectionStatusNoConnection;
144 return 0;
145 }
146
147 ListenerSP listener_sp(Listener::MakeListener("Communication::Read"));
148 listener_sp->StartListeningForEvents(
149 this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
150 EventSP event_sp;
151 while (listener_sp->GetEvent(event_sp, timeout)) {
152 const uint32_t event_type = event_sp->GetType();
153 if (event_type & eBroadcastBitReadThreadGotBytes) {
154 return GetCachedBytes(dst, dst_len);
155 }
156
157 if (event_type & eBroadcastBitReadThreadDidExit) {
158 if (GetCloseOnEOF())
159 Disconnect(nullptr);
160 break;
161 }
162 }
163 return 0;
164 }
165
166 // We aren't using a read thread, just read the data synchronously in this
167 // thread.
168 return ReadFromConnection(dst, dst_len, timeout, status, error_ptr);
169}
170
171size_t Communication::Write(const void *src, size_t src_len,
172 ConnectionStatus &status, Status *error_ptr) {
173 lldb::ConnectionSP connection_sp(m_connection_sp);
174
175 std::lock_guard<std::mutex> guard(m_write_mutex);
176 LLDB_LOG(GetLog(LLDBLog::Communication),
177 "{0} Communication::Write (src = {1}, src_len = {2}"
178 ") connection = {3}",
179 this, src, (uint64_t)src_len, connection_sp.get());
180
181 if (connection_sp)
182 return connection_sp->Write(src, src_len, status, error_ptr);
183
184 if (error_ptr)
185 error_ptr->SetErrorString("Invalid connection.");
186 status = eConnectionStatusNoConnection;
187 return 0;
188}
189
190size_t Communication::WriteAll(const void *src, size_t src_len,
191 ConnectionStatus &status, Status *error_ptr) {
192 size_t total_written = 0;
193 do
194 total_written += Write(static_cast<const char *>(src) + total_written,
195 src_len - total_written, status, error_ptr);
196 while (status == eConnectionStatusSuccess && total_written < src_len);
197 return total_written;
198}
199
200bool Communication::StartReadThread(Status *error_ptr) {
201 if (error_ptr)
202 error_ptr->Clear();
203
204 if (m_read_thread.IsJoinable())
205 return true;
206
207 LLDB_LOG(GetLog(LLDBLog::Communication),
208 "{0} Communication::StartReadThread ()", this);
209
210 const std::string thread_name =
211 llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());
212
213 m_read_thread_enabled = true;
214 m_read_thread_did_exit = false;
215 auto maybe_thread = ThreadLauncher::LaunchThread(
216 thread_name, [this] { return ReadThread(); });
217 if (maybe_thread) {
218 m_read_thread = *maybe_thread;
219 } else {
220 if (error_ptr)
221 *error_ptr = Status(maybe_thread.takeError());
222 else {
223 LLDB_LOG(GetLog(LLDBLog::Host), "failed to launch host thread: {}",
224 llvm::toString(maybe_thread.takeError()));
225 }
226 }
227
228 if (!m_read_thread.IsJoinable())
229 m_read_thread_enabled = false;
230
231 return m_read_thread_enabled;
232}
233
234bool Communication::StopReadThread(Status *error_ptr) {
235 if (!m_read_thread.IsJoinable())
236 return true;
237
238 LLDB_LOG(GetLog(LLDBLog::Communication),
239 "{0} Communication::StopReadThread ()", this);
240
241 m_read_thread_enabled = false;
242
243 BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
244
245 // error = m_read_thread.Cancel();
246
247 Status error = m_read_thread.Join(nullptr);
248 return error.Success();
249}
250
251bool Communication::JoinReadThread(Status *error_ptr) {
252 if (!m_read_thread.IsJoinable())
253 return true;
254
255 Status error = m_read_thread.Join(nullptr);
256 return error.Success();
257}
258
259size_t Communication::GetCachedBytes(void *dst, size_t dst_len) {
260 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
261 if (!m_bytes.empty()) {
262 // If DST is nullptr and we have a thread, then return the number of bytes
263 // that are available so the caller can call again
264 if (dst == nullptr)
265 return m_bytes.size();
266
267 const size_t len = std::min<size_t>(dst_len, m_bytes.size());
268
269 ::memcpy(dst, m_bytes.c_str(), len);
270 m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
271
272 return len;
273 }
274 return 0;
275}
276
277void Communication::AppendBytesToCache(const uint8_t *bytes, size_t len,
278 bool broadcast,
279 ConnectionStatus status) {
280 LLDB_LOG(GetLog(LLDBLog::Communication),
281 "{0} Communication::AppendBytesToCache (src = {1}, src_len = {2}, "
282 "broadcast = {3})",
283 this, bytes, (uint64_t)len, broadcast);
284 if ((bytes == nullptr || len == 0) &&
285 (status != lldb::eConnectionStatusEndOfFile))
286 return;
287 if (m_callback) {
288 // If the user registered a callback, then call it and do not broadcast
289 m_callback(m_callback_baton, bytes, len);
290 } else if (bytes != nullptr && len > 0) {
291 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
292 m_bytes.append((const char *)bytes, len);
293 if (broadcast)
294 BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);
295 }
296}
297
298size_t Communication::ReadFromConnection(void *dst, size_t dst_len,
299 const Timeout<std::micro> &timeout,
300 ConnectionStatus &status,
301 Status *error_ptr) {
302 lldb::ConnectionSP connection_sp(m_connection_sp);
303 if (connection_sp)
304 return connection_sp->Read(dst, dst_len, timeout, status, error_ptr);
305
306 if (error_ptr)
307 error_ptr->SetErrorString("Invalid connection.");
308 status = eConnectionStatusNoConnection;
309 return 0;
310}
311
312bool Communication::ReadThreadIsRunning() { return m_read_thread_enabled; }
313
314lldb::thread_result_t Communication::ReadThread() {
315 Log *log = GetLog(LLDBLog::Communication);
316
317 LLDB_LOG(log, "Communication({0}) thread starting...", this);
318
319 uint8_t buf[1024];
320
321 Status error;
322 ConnectionStatus status = eConnectionStatusSuccess;
323 bool done = false;
324 bool disconnect = false;
325 while (!done && m_read_thread_enabled) {
326 size_t bytes_read = ReadFromConnection(
327 buf, sizeof(buf), std::chrono::seconds(5), status, &error);
328 if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
329 AppendBytesToCache(buf, bytes_read, true, status);
330
331 switch (status) {
332 case eConnectionStatusSuccess:
333 break;
334
335 case eConnectionStatusEndOfFile:
336 done = true;
337 disconnect = GetCloseOnEOF();
338 break;
339 case eConnectionStatusError: // Check GetError() for details
340 if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
341 // EIO on a pipe is usually caused by remote shutdown
342 disconnect = GetCloseOnEOF();
343 done = true;
344 }
345 if (error.Fail())
346 LLDB_LOG(log, "error: {0}, status = {1}", error,
347 Communication::ConnectionStatusAsString(status));
348 break;
349 case eConnectionStatusInterrupted: // Synchronization signal from
350 // SynchronizeWithReadThread()
351 // The connection returns eConnectionStatusInterrupted only when there is
352 // no input pending to be read, so we can signal that.
353 BroadcastEvent(eBroadcastBitNoMorePendingInput);
354 break;
355 case eConnectionStatusNoConnection: // No connection
356 case eConnectionStatusLostConnection: // Lost connection while connected to
357 // a valid connection
358 done = true;
359 [[fallthrough]];
360 case eConnectionStatusTimedOut: // Request timed out
361 if (error.Fail())
362 LLDB_LOG(log, "error: {0}, status = {1}", error,
363 Communication::ConnectionStatusAsString(status));
364 break;
365 }
366 }
367 log = GetLog(LLDBLog::Communication);
368 LLDB_LOG(log, "Communication({0}) thread exiting...", this);
369
370 // Handle threads wishing to synchronize with us.
371 {
372 // Prevent new ones from showing up.
373 m_read_thread_did_exit = true;
374
375 // Unblock any existing thread waiting for the synchronization signal.
376 BroadcastEvent(eBroadcastBitNoMorePendingInput);
377
378 // Wait for the thread to finish...
379 std::lock_guard<std::mutex> guard(m_synchronize_mutex);
380 // ... and disconnect.
381 if (disconnect)
382 Disconnect();
383 }
384
385 // Let clients know that this thread is exiting
386 BroadcastEvent(eBroadcastBitReadThreadDidExit);
387 return {};
388}
389
390void Communication::SetReadThreadBytesReceivedCallback(
391 ReadThreadBytesReceived callback, void *callback_baton) {
392 m_callback = callback;
393 m_callback_baton = callback_baton;
394}
395
396void Communication::SynchronizeWithReadThread() {
397 // Only one thread can do the synchronization dance at a time.
398 std::lock_guard<std::mutex> guard(m_synchronize_mutex);
399
400 // First start listening for the synchronization event.
401 ListenerSP listener_sp(
402 Listener::MakeListener("Communication::SyncronizeWithReadThread"));
403 listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
404
405 // If the thread is not running, there is no point in synchronizing.
406 if (!m_read_thread_enabled || m_read_thread_did_exit)
407 return;
408
409 // Notify the read thread.
410 m_connection_sp->InterruptRead();
411
412 // Wait for the synchronization event.
413 EventSP event_sp;
414 listener_sp->GetEvent(event_sp, llvm::None);
415}
416
417void Communication::SetConnection(std::unique_ptr<Connection> connection) {
418 Disconnect(nullptr);
419 StopReadThread(nullptr);
420 m_connection_sp = std::move(connection);
421}
422
423std::string
424Communication::ConnectionStatusAsString(lldb::ConnectionStatus status) {
425 switch (status) {
426 case eConnectionStatusSuccess:
427 return "success";
428 case eConnectionStatusError:
429 return "error";
430 case eConnectionStatusTimedOut:
431 return "timed out";
432 case eConnectionStatusNoConnection:
433 return "no connection";
434 case eConnectionStatusLostConnection:
435 return "lost connection";
436 case eConnectionStatusEndOfFile:
437 return "end of file";
438 case eConnectionStatusInterrupted:
439 return "interrupted";
440 }
441
442 return "@" + std::to_string(status);
443}
444

// Source: lldb/source/Core/Communication.cpp