| 1 | //===-- ThreadedCommunication.cpp -----------------------------------------===// |
| 2 | // |
| 3 | // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
| 4 | // See https://llvm.org/LICENSE.txt for license information. |
| 5 | // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
| 6 | // |
| 7 | //===----------------------------------------------------------------------===// |
| 8 | |
| 9 | #include "lldb/Core/ThreadedCommunication.h" |
| 10 | |
| 11 | #include "lldb/Host/ThreadLauncher.h" |
| 12 | #include "lldb/Utility/Connection.h" |
| 13 | #include "lldb/Utility/ConstString.h" |
| 14 | #include "lldb/Utility/Event.h" |
| 15 | #include "lldb/Utility/LLDBLog.h" |
| 16 | #include "lldb/Utility/Listener.h" |
| 17 | #include "lldb/Utility/Log.h" |
| 18 | #include "lldb/Utility/Status.h" |
| 19 | |
| 20 | #include "llvm/Support/Compiler.h" |
| 21 | |
| 22 | #include <algorithm> |
| 23 | #include <chrono> |
| 24 | #include <cstring> |
| 25 | #include <memory> |
| 26 | #include <shared_mutex> |
| 27 | |
| 28 | #include <cerrno> |
| 29 | #include <cinttypes> |
| 30 | #include <cstdio> |
| 31 | |
| 32 | using namespace lldb; |
| 33 | using namespace lldb_private; |
| 34 | |
| 35 | llvm::StringRef ThreadedCommunication::GetStaticBroadcasterClass() { |
| 36 | static constexpr llvm::StringLiteral class_name("lldb.communication" ); |
| 37 | return class_name; |
| 38 | } |
| 39 | |
| 40 | ThreadedCommunication::ThreadedCommunication(const char *name) |
| 41 | : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false), |
| 42 | m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(), |
| 43 | m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) { |
| 44 | LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), |
| 45 | "{0} ThreadedCommunication::ThreadedCommunication (name = {1})" , |
| 46 | this, name); |
| 47 | |
| 48 | SetEventName(event_mask: eBroadcastBitDisconnected, name: "disconnected" ); |
| 49 | SetEventName(event_mask: eBroadcastBitReadThreadGotBytes, name: "got bytes" ); |
| 50 | SetEventName(event_mask: eBroadcastBitReadThreadDidExit, name: "read thread did exit" ); |
| 51 | SetEventName(event_mask: eBroadcastBitReadThreadShouldExit, name: "read thread should exit" ); |
| 52 | SetEventName(event_mask: eBroadcastBitPacketAvailable, name: "packet available" ); |
| 53 | SetEventName(event_mask: eBroadcastBitNoMorePendingInput, name: "no more pending input" ); |
| 54 | |
| 55 | CheckInWithManager(); |
| 56 | } |
| 57 | |
| 58 | ThreadedCommunication::~ThreadedCommunication() { |
| 59 | LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), |
| 60 | "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})" , |
| 61 | this, GetBroadcasterName()); |
| 62 | } |
| 63 | |
| 64 | void ThreadedCommunication::Clear() { |
| 65 | SetReadThreadBytesReceivedCallback(callback: nullptr, callback_baton: nullptr); |
| 66 | StopReadThread(error_ptr: nullptr); |
| 67 | Communication::Clear(); |
| 68 | } |
| 69 | |
| 70 | ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) { |
| 71 | assert((!m_read_thread_enabled || m_read_thread_did_exit) && |
| 72 | "Disconnecting while the read thread is running is racy!" ); |
| 73 | return Communication::Disconnect(error_ptr); |
| 74 | } |
| 75 | |
| 76 | size_t ThreadedCommunication::Read(void *dst, size_t dst_len, |
| 77 | const Timeout<std::micro> &timeout, |
| 78 | ConnectionStatus &status, |
| 79 | Status *error_ptr) { |
| 80 | Log *log = GetLog(mask: LLDBLog::Communication); |
| 81 | LLDB_LOG( |
| 82 | log, |
| 83 | "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}" , |
| 84 | this, dst, dst_len, timeout, m_connection_sp.get()); |
| 85 | |
| 86 | if (m_read_thread_enabled) { |
| 87 | // We have a dedicated read thread that is getting data for us |
| 88 | size_t cached_bytes = GetCachedBytes(dst, dst_len); |
| 89 | if (cached_bytes > 0) { |
| 90 | status = eConnectionStatusSuccess; |
| 91 | return cached_bytes; |
| 92 | } |
| 93 | if (timeout && timeout->count() == 0) { |
| 94 | if (error_ptr) |
| 95 | *error_ptr = Status::FromErrorString(str: "Timed out." ); |
| 96 | status = eConnectionStatusTimedOut; |
| 97 | return 0; |
| 98 | } |
| 99 | |
| 100 | if (!m_connection_sp) { |
| 101 | if (error_ptr) |
| 102 | *error_ptr = Status::FromErrorString(str: "Invalid connection." ); |
| 103 | status = eConnectionStatusNoConnection; |
| 104 | return 0; |
| 105 | } |
| 106 | |
| 107 | // No data yet, we have to start listening. |
| 108 | ListenerSP listener_sp( |
| 109 | Listener::MakeListener(name: "ThreadedCommunication::Read" )); |
| 110 | listener_sp->StartListeningForEvents( |
| 111 | broadcaster: this, event_mask: eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit); |
| 112 | |
| 113 | // Re-check for data, as it might have arrived while we were setting up our |
| 114 | // listener. |
| 115 | cached_bytes = GetCachedBytes(dst, dst_len); |
| 116 | if (cached_bytes > 0) { |
| 117 | status = eConnectionStatusSuccess; |
| 118 | return cached_bytes; |
| 119 | } |
| 120 | |
| 121 | EventSP event_sp; |
| 122 | // Explicitly check for the thread exit, for the same reason. |
| 123 | if (m_read_thread_did_exit) { |
| 124 | // We've missed the event, lets just conjure one up. |
| 125 | event_sp = std::make_shared<Event>(args: eBroadcastBitReadThreadDidExit); |
| 126 | } else { |
| 127 | if (!listener_sp->GetEvent(event_sp, timeout)) { |
| 128 | if (error_ptr) |
| 129 | *error_ptr = Status::FromErrorString(str: "Timed out." ); |
| 130 | status = eConnectionStatusTimedOut; |
| 131 | return 0; |
| 132 | } |
| 133 | } |
| 134 | const uint32_t event_type = event_sp->GetType(); |
| 135 | if (event_type & eBroadcastBitReadThreadGotBytes) { |
| 136 | return GetCachedBytes(dst, dst_len); |
| 137 | } |
| 138 | |
| 139 | if (event_type & eBroadcastBitReadThreadDidExit) { |
| 140 | // If the thread exited of its own accord, it either means it |
| 141 | // hit an end-of-file condition or an error. |
| 142 | status = m_pass_status; |
| 143 | if (error_ptr) |
| 144 | *error_ptr = std::move(m_pass_error); |
| 145 | |
| 146 | if (GetCloseOnEOF()) |
| 147 | Disconnect(error_ptr: nullptr); |
| 148 | return 0; |
| 149 | } |
| 150 | llvm_unreachable("Got unexpected event type!" ); |
| 151 | } |
| 152 | |
| 153 | // We aren't using a read thread, just read the data synchronously in this |
| 154 | // thread. |
| 155 | return Communication::Read(dst, dst_len, timeout, status, error_ptr); |
| 156 | } |
| 157 | |
| 158 | bool ThreadedCommunication::StartReadThread(Status *error_ptr) { |
| 159 | std::lock_guard<std::mutex> lock(m_read_thread_mutex); |
| 160 | |
| 161 | if (error_ptr) |
| 162 | error_ptr->Clear(); |
| 163 | |
| 164 | if (m_read_thread.IsJoinable()) |
| 165 | return true; |
| 166 | |
| 167 | LLDB_LOG(GetLog(LLDBLog::Communication), |
| 168 | "{0} ThreadedCommunication::StartReadThread ()" , this); |
| 169 | |
| 170 | const std::string thread_name = |
| 171 | llvm::formatv(Fmt: "<lldb.comm.{0}>" , Vals: GetBroadcasterName()); |
| 172 | |
| 173 | m_read_thread_enabled = true; |
| 174 | m_read_thread_did_exit = false; |
| 175 | auto maybe_thread = ThreadLauncher::LaunchThread( |
| 176 | name: thread_name, thread_function: [this] { return ReadThread(); }); |
| 177 | if (maybe_thread) { |
| 178 | m_read_thread = *maybe_thread; |
| 179 | } else { |
| 180 | if (error_ptr) |
| 181 | *error_ptr = Status::FromError(error: maybe_thread.takeError()); |
| 182 | else { |
| 183 | LLDB_LOG_ERROR(GetLog(LLDBLog::Host), maybe_thread.takeError(), |
| 184 | "failed to launch host thread: {0}" ); |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | if (!m_read_thread.IsJoinable()) |
| 189 | m_read_thread_enabled = false; |
| 190 | |
| 191 | return m_read_thread_enabled; |
| 192 | } |
| 193 | |
| 194 | bool ThreadedCommunication::StopReadThread(Status *error_ptr) { |
| 195 | std::lock_guard<std::mutex> lock(m_read_thread_mutex); |
| 196 | |
| 197 | if (!m_read_thread.IsJoinable()) |
| 198 | return true; |
| 199 | |
| 200 | LLDB_LOG(GetLog(LLDBLog::Communication), |
| 201 | "{0} ThreadedCommunication::StopReadThread ()" , this); |
| 202 | |
| 203 | m_read_thread_enabled = false; |
| 204 | |
| 205 | BroadcastEvent(event_type: eBroadcastBitReadThreadShouldExit, event_data_sp: nullptr); |
| 206 | |
| 207 | Status error = m_read_thread.Join(result: nullptr); |
| 208 | return error.Success(); |
| 209 | } |
| 210 | |
| 211 | bool ThreadedCommunication::JoinReadThread(Status *error_ptr) { |
| 212 | std::lock_guard<std::mutex> lock(m_read_thread_mutex); |
| 213 | |
| 214 | if (!m_read_thread.IsJoinable()) |
| 215 | return true; |
| 216 | |
| 217 | Status error = m_read_thread.Join(result: nullptr); |
| 218 | return error.Success(); |
| 219 | } |
| 220 | |
| 221 | size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) { |
| 222 | std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); |
| 223 | if (!m_bytes.empty()) { |
| 224 | // If DST is nullptr and we have a thread, then return the number of bytes |
| 225 | // that are available so the caller can call again |
| 226 | if (dst == nullptr) |
| 227 | return m_bytes.size(); |
| 228 | |
| 229 | const size_t len = std::min<size_t>(a: dst_len, b: m_bytes.size()); |
| 230 | |
| 231 | ::memcpy(dest: dst, src: m_bytes.c_str(), n: len); |
| 232 | m_bytes.erase(first: m_bytes.begin(), last: m_bytes.begin() + len); |
| 233 | |
| 234 | return len; |
| 235 | } |
| 236 | return 0; |
| 237 | } |
| 238 | |
| 239 | void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len, |
| 240 | bool broadcast, |
| 241 | ConnectionStatus status) { |
| 242 | LLDB_LOG(GetLog(LLDBLog::Communication), |
| 243 | "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len " |
| 244 | "= {2}, " |
| 245 | "broadcast = {3})" , |
| 246 | this, bytes, (uint64_t)len, broadcast); |
| 247 | if ((bytes == nullptr || len == 0) && |
| 248 | (status != lldb::eConnectionStatusEndOfFile)) |
| 249 | return; |
| 250 | if (m_callback) { |
| 251 | // If the user registered a callback, then call it and do not broadcast |
| 252 | m_callback(m_callback_baton, bytes, len); |
| 253 | } else if (bytes != nullptr && len > 0) { |
| 254 | std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); |
| 255 | m_bytes.append(s: (const char *)bytes, n: len); |
| 256 | if (broadcast) |
| 257 | BroadcastEventIfUnique(event_type: eBroadcastBitReadThreadGotBytes); |
| 258 | } |
| 259 | } |
| 260 | |
| 261 | bool ThreadedCommunication::ReadThreadIsRunning() { |
| 262 | return m_read_thread_enabled; |
| 263 | } |
| 264 | |
| 265 | lldb::thread_result_t ThreadedCommunication::ReadThread() { |
| 266 | Log *log = GetLog(mask: LLDBLog::Communication); |
| 267 | |
| 268 | LLDB_LOG(log, "Communication({0}) thread starting..." , this); |
| 269 | |
| 270 | uint8_t buf[1024]; |
| 271 | |
| 272 | Status error; |
| 273 | ConnectionStatus status = eConnectionStatusSuccess; |
| 274 | bool done = false; |
| 275 | bool disconnect = false; |
| 276 | while (!done && m_read_thread_enabled) { |
| 277 | size_t bytes_read = ReadFromConnection( |
| 278 | dst: buf, dst_len: sizeof(buf), timeout: std::chrono::seconds(5), status, error_ptr: &error); |
| 279 | if (bytes_read > 0 || status == eConnectionStatusEndOfFile) |
| 280 | AppendBytesToCache(bytes: buf, len: bytes_read, broadcast: true, status); |
| 281 | |
| 282 | switch (status) { |
| 283 | case eConnectionStatusSuccess: |
| 284 | break; |
| 285 | |
| 286 | case eConnectionStatusEndOfFile: |
| 287 | done = true; |
| 288 | disconnect = GetCloseOnEOF(); |
| 289 | break; |
| 290 | case eConnectionStatusError: // Check GetError() for details |
| 291 | if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) { |
| 292 | // EIO on a pipe is usually caused by remote shutdown |
| 293 | disconnect = GetCloseOnEOF(); |
| 294 | done = true; |
| 295 | } |
| 296 | if (error.Fail()) |
| 297 | LLDB_LOG(log, "error: {0}, status = {1}" , error, |
| 298 | ThreadedCommunication::ConnectionStatusAsString(status)); |
| 299 | break; |
| 300 | case eConnectionStatusInterrupted: // Synchronization signal from |
| 301 | // SynchronizeWithReadThread() |
| 302 | // The connection returns eConnectionStatusInterrupted only when there is |
| 303 | // no input pending to be read, so we can signal that. |
| 304 | BroadcastEvent(event_type: eBroadcastBitNoMorePendingInput); |
| 305 | break; |
| 306 | case eConnectionStatusNoConnection: // No connection |
| 307 | case eConnectionStatusLostConnection: // Lost connection while connected to |
| 308 | // a valid connection |
| 309 | done = true; |
| 310 | [[fallthrough]]; |
| 311 | case eConnectionStatusTimedOut: // Request timed out |
| 312 | if (error.Fail()) |
| 313 | LLDB_LOG(log, "error: {0}, status = {1}" , error, |
| 314 | ThreadedCommunication::ConnectionStatusAsString(status)); |
| 315 | break; |
| 316 | } |
| 317 | } |
| 318 | m_pass_status = status; |
| 319 | m_pass_error = std::move(error); |
| 320 | LLDB_LOG(log, "Communication({0}) thread exiting..." , this); |
| 321 | |
| 322 | // Start shutting down. We need to do this in a very specific order to ensure |
| 323 | // we don't race with threads wanting to read/synchronize with us. |
| 324 | |
| 325 | // First, we signal our intent to exit. This ensures no new thread start |
| 326 | // waiting on events from us. |
| 327 | m_read_thread_did_exit = true; |
| 328 | |
| 329 | // Unblock any existing thread waiting for the synchronization signal. |
| 330 | BroadcastEvent(event_type: eBroadcastBitNoMorePendingInput); |
| 331 | |
| 332 | { |
| 333 | // Wait for the synchronization thread to finish... |
| 334 | std::lock_guard<std::mutex> guard(m_synchronize_mutex); |
| 335 | // ... and disconnect. |
| 336 | if (disconnect) |
| 337 | Disconnect(); |
| 338 | } |
| 339 | |
| 340 | // Finally, unblock any readers waiting for us to exit. |
| 341 | BroadcastEvent(event_type: eBroadcastBitReadThreadDidExit); |
| 342 | return {}; |
| 343 | } |
| 344 | |
| 345 | void ThreadedCommunication::SetReadThreadBytesReceivedCallback( |
| 346 | ReadThreadBytesReceived callback, void *callback_baton) { |
| 347 | m_callback = callback; |
| 348 | m_callback_baton = callback_baton; |
| 349 | } |
| 350 | |
| 351 | void ThreadedCommunication::SynchronizeWithReadThread() { |
| 352 | // Only one thread can do the synchronization dance at a time. |
| 353 | std::lock_guard<std::mutex> guard(m_synchronize_mutex); |
| 354 | |
| 355 | // First start listening for the synchronization event. |
| 356 | ListenerSP listener_sp(Listener::MakeListener( |
| 357 | name: "ThreadedCommunication::SyncronizeWithReadThread" )); |
| 358 | listener_sp->StartListeningForEvents(broadcaster: this, event_mask: eBroadcastBitNoMorePendingInput); |
| 359 | |
| 360 | // If the thread is not running, there is no point in synchronizing. |
| 361 | if (!m_read_thread_enabled || m_read_thread_did_exit) |
| 362 | return; |
| 363 | |
| 364 | // Notify the read thread. |
| 365 | m_connection_sp->InterruptRead(); |
| 366 | |
| 367 | // Wait for the synchronization event. |
| 368 | EventSP event_sp; |
| 369 | listener_sp->GetEvent(event_sp, timeout: std::nullopt); |
| 370 | } |
| 371 | |
| 372 | void ThreadedCommunication::SetConnection( |
| 373 | std::unique_ptr<Connection> connection) { |
| 374 | StopReadThread(error_ptr: nullptr); |
| 375 | Communication::SetConnection(std::move(connection)); |
| 376 | } |
| 377 | |