| 1 | //! Debug Logging |
| 2 | //! |
| 3 | //! To use in a debug build, set the env var `RAYON_LOG` as |
| 4 | //! described below. In a release build, logs are compiled out by |
| 5 | //! default unless Rayon is built with `--cfg rayon_rs_log` (try |
| 6 | //! `RUSTFLAGS="--cfg rayon_rs_log"`). |
| 7 | //! |
//! Note that logs are an internal debugging tool and their format
| 9 | //! is considered unstable, as are the details of how to enable them. |
| 10 | //! |
| 11 | //! # Valid values for RAYON_LOG |
| 12 | //! |
| 13 | //! The `RAYON_LOG` variable can take on the following values: |
| 14 | //! |
| 15 | //! * `tail:<file>` -- dumps the last 10,000 events into the given file; |
| 16 | //! useful for tracking down deadlocks |
| 17 | //! * `profile:<file>` -- dumps only those events needed to reconstruct how |
| 18 | //! many workers are active at a given time |
//! * `all` -- dumps every event to stderr; useful for debugging
| 20 | |
| 21 | use crossbeam_channel::{self, Receiver, Sender}; |
| 22 | use std::collections::VecDeque; |
| 23 | use std::env; |
| 24 | use std::fs::File; |
| 25 | use std::io::{self, BufWriter, Write}; |
| 26 | |
/// True if logs are compiled in.
///
/// Logging is compiled in for debug builds (`debug_assertions`) or when
/// Rayon is built with `--cfg rayon_rs_log`; otherwise every `log` call
/// is statically disabled and compiles away.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));
| 29 | |
/// Events delivered to the logger thread, recording scheduler activity.
///
/// The logger replays these through `SimulatorState` to reconstruct the
/// per-thread and per-queue state at each point in time.
#[derive (Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
    /// Flushes events to disk, used to terminate benchmarking.
    Flush,

    /// Indicates that a worker thread started execution.
    ThreadStart {
        worker: usize,
        terminate_addr: usize,
    },

    /// Indicates that a worker thread terminated execution.
    ThreadTerminate { worker: usize },

    /// Indicates that a worker thread became idle, blocked on `latch_addr`.
    ThreadIdle { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker thread found work to do, after
    /// `yields` rounds of yielding. It should no longer be considered idle.
    ThreadFoundWork { worker: usize, yields: u32 },

    /// Indicates that a worker blocked on a latch observed that it was set.
    ///
    /// Internal debugging event that does not affect the state
    /// machine.
    ThreadSawLatchSet { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker is getting sleepy. `jobs_counter` is
    /// the internal sleep-state counter that we saw at the time.
    ThreadSleepy { worker: usize, jobs_counter: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because the latch was set. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because a job was posted. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByJob { worker: usize },

    /// Indicates that an idle worker has gone to sleep.
    ThreadSleeping { worker: usize, latch_addr: usize },

    /// Indicates that a sleeping worker has awoken.
    ThreadAwoken { worker: usize, latch_addr: usize },

    /// Indicates that the given worker thread was notified it should
    /// awaken.
    ThreadNotify { worker: usize },

    /// The given worker has pushed a job to its local deque.
    JobPushed { worker: usize },

    /// The given worker has popped a job from its local deque.
    JobPopped { worker: usize },

    /// The given worker has stolen a job from the deque of another.
    JobStolen { worker: usize, victim: usize },

    /// N jobs were injected into the global queue.
    JobsInjected { count: usize },

    /// A job was removed from the global queue.
    JobUninjected { worker: usize },

    /// A job was broadcasted to N threads.
    JobBroadcast { count: usize },

    /// When announcing a job, this was the value of the counters we observed.
    ///
    /// No effect on thread state, just a debugging event.
    JobThreadCounts {
        worker: usize,
        num_idle: u16,
        num_sleepers: u16,
    },
}
| 108 | |
/// Handle to the logging thread, if any. You can use this to deliver
/// logs. You can also clone it freely.
#[derive (Clone)]
pub(super) struct Logger {
    // `None` when logging is disabled; otherwise the sending side of
    // the channel drained by the background logger thread.
    sender: Option<Sender<Event>>,
}
| 115 | |
| 116 | impl Logger { |
| 117 | pub(super) fn new(num_workers: usize) -> Logger { |
| 118 | if !LOG_ENABLED { |
| 119 | return Self::disabled(); |
| 120 | } |
| 121 | |
| 122 | // see the doc comment for the format |
| 123 | let env_log = match env::var("RAYON_LOG" ) { |
| 124 | Ok(s) => s, |
| 125 | Err(_) => return Self::disabled(), |
| 126 | }; |
| 127 | |
| 128 | let (sender, receiver) = crossbeam_channel::unbounded(); |
| 129 | |
| 130 | if let Some(filename) = env_log.strip_prefix("tail:" ) { |
| 131 | let filename = filename.to_string(); |
| 132 | ::std::thread::spawn(move || { |
| 133 | Self::tail_logger_thread(num_workers, filename, 10_000, receiver) |
| 134 | }); |
| 135 | } else if env_log == "all" { |
| 136 | ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver)); |
| 137 | } else if let Some(filename) = env_log.strip_prefix("profile:" ) { |
| 138 | let filename = filename.to_string(); |
| 139 | ::std::thread::spawn(move || { |
| 140 | Self::profile_logger_thread(num_workers, filename, 10_000, receiver) |
| 141 | }); |
| 142 | } else { |
| 143 | panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'" ); |
| 144 | } |
| 145 | |
| 146 | Logger { |
| 147 | sender: Some(sender), |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | fn disabled() -> Logger { |
| 152 | Logger { sender: None } |
| 153 | } |
| 154 | |
| 155 | #[inline ] |
| 156 | pub(super) fn log(&self, event: impl FnOnce() -> Event) { |
| 157 | if !LOG_ENABLED { |
| 158 | return; |
| 159 | } |
| 160 | |
| 161 | if let Some(sender) = &self.sender { |
| 162 | sender.send(event()).unwrap(); |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | fn profile_logger_thread( |
| 167 | num_workers: usize, |
| 168 | log_filename: String, |
| 169 | capacity: usize, |
| 170 | receiver: Receiver<Event>, |
| 171 | ) { |
| 172 | let file = File::create(&log_filename) |
| 173 | .unwrap_or_else(|err| panic!("failed to open ` {}`: {}" , log_filename, err)); |
| 174 | |
| 175 | let mut writer = BufWriter::new(file); |
| 176 | let mut events = Vec::with_capacity(capacity); |
| 177 | let mut state = SimulatorState::new(num_workers); |
| 178 | let timeout = std::time::Duration::from_secs(30); |
| 179 | |
| 180 | loop { |
| 181 | while let Ok(event) = receiver.recv_timeout(timeout) { |
| 182 | if let Event::Flush = event { |
| 183 | break; |
| 184 | } |
| 185 | |
| 186 | events.push(event); |
| 187 | if events.len() == capacity { |
| 188 | break; |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | for event in events.drain(..) { |
| 193 | if state.simulate(&event) { |
| 194 | state.dump(&mut writer, &event).unwrap(); |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | writer.flush().unwrap(); |
| 199 | } |
| 200 | } |
| 201 | |
| 202 | fn tail_logger_thread( |
| 203 | num_workers: usize, |
| 204 | log_filename: String, |
| 205 | capacity: usize, |
| 206 | receiver: Receiver<Event>, |
| 207 | ) { |
| 208 | let file = File::create(&log_filename) |
| 209 | .unwrap_or_else(|err| panic!("failed to open ` {}`: {}" , log_filename, err)); |
| 210 | |
| 211 | let mut writer = BufWriter::new(file); |
| 212 | let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity); |
| 213 | let mut state = SimulatorState::new(num_workers); |
| 214 | let timeout = std::time::Duration::from_secs(30); |
| 215 | let mut skipped = false; |
| 216 | |
| 217 | loop { |
| 218 | while let Ok(event) = receiver.recv_timeout(timeout) { |
| 219 | if let Event::Flush = event { |
| 220 | // We ignore Flush events in tail mode -- |
| 221 | // we're really just looking for |
| 222 | // deadlocks. |
| 223 | continue; |
| 224 | } else { |
| 225 | if events.len() == capacity { |
| 226 | let event = events.pop_front().unwrap(); |
| 227 | state.simulate(&event); |
| 228 | skipped = true; |
| 229 | } |
| 230 | |
| 231 | events.push_back(event); |
| 232 | } |
| 233 | } |
| 234 | |
| 235 | if skipped { |
| 236 | writeln!(writer, "..." ).unwrap(); |
| 237 | skipped = false; |
| 238 | } |
| 239 | |
| 240 | for event in events.drain(..) { |
| 241 | // In tail mode, we dump *all* events out, whether or |
| 242 | // not they were 'interesting' to the state machine. |
| 243 | state.simulate(&event); |
| 244 | state.dump(&mut writer, &event).unwrap(); |
| 245 | } |
| 246 | |
| 247 | writer.flush().unwrap(); |
| 248 | } |
| 249 | } |
| 250 | |
| 251 | fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) { |
| 252 | let stderr = std::io::stderr(); |
| 253 | let mut state = SimulatorState::new(num_workers); |
| 254 | |
| 255 | for event in receiver { |
| 256 | let mut writer = BufWriter::new(stderr.lock()); |
| 257 | state.simulate(&event); |
| 258 | state.dump(&mut writer, &event).unwrap(); |
| 259 | writer.flush().unwrap(); |
| 260 | } |
| 261 | } |
| 262 | } |
| 263 | |
#[derive (Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    Working,
    Idle,
    Notified,
    Sleeping,
    Terminated,
}

impl State {
    /// Returns the one-letter abbreviation for this state, used in the
    /// per-thread columns of a dumped log line.
    fn letter(&self) -> char {
        let code = match self {
            Self::Idle => 'I',
            Self::Notified => 'N',
            Self::Sleeping => 'S',
            Self::Terminated => 'T',
            Self::Working => 'W',
        };
        code
    }
}
| 284 | |
/// Mirror of the scheduler's state, reconstructed by replaying `Event`s.
/// The logger threads use it to annotate each dumped event with
/// aggregate counts (idle/sleeping threads, queue sizes).
struct SimulatorState {
    // Number of jobs currently in each worker's local deque.
    local_queue_size: Vec<usize>,
    // Last known state of each worker thread.
    thread_states: Vec<State>,
    // Number of jobs currently in the global injector queue.
    injector_size: usize,
}
| 290 | |
impl SimulatorState {
    /// Creates a simulator for `num_workers` threads, all initially
    /// `Working` with empty local queues and an empty injector.
    fn new(num_workers: usize) -> Self {
        Self {
            local_queue_size: (0..num_workers).map(|_| 0).collect(),
            thread_states: (0..num_workers).map(|_| State::Working).collect(),
            injector_size: 0,
        }
    }

    /// Applies `event` to the simulated state. Returns true if the
    /// event changed the state (and hence is worth dumping in
    /// `profile` mode); pure debugging events return false.
    ///
    /// The `assert_eq!` calls check that events arrive in an order
    /// consistent with the scheduler's state machine; since this is an
    /// internal debugging tool, a violation panics the logger thread.
    /// NOTE(review): the queue-size decrements can underflow and panic
    /// if pop/steal/uninject events arrive without a matching push --
    /// presumed impossible given event ordering; confirm with callers.
    fn simulate(&mut self, event: &Event) -> bool {
        match *event {
            Event::ThreadIdle { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Working);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
                self.thread_states[worker] = State::Working;
                true
            }

            Event::ThreadTerminate { worker, .. } => {
                self.thread_states[worker] = State::Terminated;
                true
            }

            Event::ThreadSleeping { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Idle);
                self.thread_states[worker] = State::Sleeping;
                true
            }

            Event::ThreadAwoken { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Notified);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::JobPushed { worker } => {
                self.local_queue_size[worker] += 1;
                true
            }

            Event::JobPopped { worker } => {
                self.local_queue_size[worker] -= 1;
                true
            }

            Event::JobStolen { victim, .. } => {
                self.local_queue_size[victim] -= 1;
                true
            }

            Event::JobsInjected { count } => {
                self.injector_size += count;
                true
            }

            Event::JobUninjected { .. } => {
                self.injector_size -= 1;
                true
            }

            Event::ThreadNotify { worker } => {
                // Currently, this log event occurs while holding the
                // thread lock, so we should *always* see it before
                // the worker awakens.
                assert_eq!(self.thread_states[worker], State::Sleeping);
                self.thread_states[worker] = State::Notified;
                true
            }

            // remaining events are no-ops from pov of simulating the
            // thread state
            _ => false,
        }
    }

    /// Writes one line describing `event` plus the current simulated
    /// state: idle/sleeping/notified thread counts, total pending local
    /// jobs, injector size, the event's `Debug` form, and per-thread
    /// state/queue columns. The format is unstable (see module docs).
    fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
        let num_idle_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Idle)
            .count();

        let num_sleeping_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Sleeping)
            .count();

        let num_notified_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Notified)
            .count();

        let num_pending_jobs: usize = self.local_queue_size.iter().sum();

        write!(w, " {:2}," , num_idle_threads)?;
        write!(w, " {:2}," , num_sleeping_threads)?;
        write!(w, " {:2}," , num_notified_threads)?;
        write!(w, " {:4}," , num_pending_jobs)?;
        write!(w, " {:4}," , self.injector_size)?;

        // Quote the event text so its internal commas don't break the
        // comma-separated layout.
        let event_str = format!(" {:?}" , event);
        write!(w, r#"" {:60}","# , event_str)?;

        // One "T<nn>,<letter>,<queue>" column group per worker thread;
        // the queue cell is blank when the local deque is empty.
        for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
            write!(w, " T {:02}, {}" , i, state.letter(),)?;

            if *queue_size > 0 {
                write!(w, ", {:03}," , queue_size)?;
            } else {
                write!(w, ", ," )?;
            }
        }

        writeln!(w)?;
        Ok(())
    }
}
| 414 | |