1//! Debug Logging
2//!
3//! To use in a debug build, set the env var `RAYON_LOG` as
4//! described below. In a release build, logs are compiled out by
5//! default unless Rayon is built with `--cfg rayon_rs_log` (try
6//! `RUSTFLAGS="--cfg rayon_rs_log"`).
7//!
//! Note that logs are an internal debugging tool and their format
//! is considered unstable, as are the details of how to enable them.
10//!
11//! # Valid values for RAYON_LOG
12//!
13//! The `RAYON_LOG` variable can take on the following values:
14//!
15//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
16//! useful for tracking down deadlocks
17//! * `profile:<file>` -- dumps only those events needed to reconstruct how
18//! many workers are active at a given time
//! * `all` -- dumps every event to stderr; useful for debugging
20
21use crossbeam_channel::{self, Receiver, Sender};
22use std::collections::VecDeque;
23use std::env;
24use std::fs::File;
25use std::io::{self, BufWriter, Write};
26
/// True if logs are compiled in.
///
/// Logging is compiled in for debug builds (`debug_assertions`) or when
/// the crate is built with `--cfg rayon_rs_log`; otherwise every call to
/// `Logger::log` compiles down to nothing.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));
29
/// A single log event. Events are sent over a channel to the logger
/// thread, which feeds them through `SimulatorState` to reconstruct
/// worker-thread states.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
    /// Flushes events to disk, used to terminate benchmarking.
    Flush,

    /// Indicates that a worker thread started execution.
    ThreadStart {
        worker: usize,
        terminate_addr: usize,
    },

    /// Indicates that a worker thread terminated execution.
    ThreadTerminate { worker: usize },

    /// Indicates that a worker thread became idle, blocked on `latch_addr`.
    ThreadIdle { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker thread found work to do, after
    /// yield rounds. It should no longer be considered idle.
    ThreadFoundWork { worker: usize, yields: u32 },

    /// Indicates that a worker blocked on a latch observed that it was set.
    ///
    /// Internal debugging event that does not affect the state
    /// machine.
    ThreadSawLatchSet { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker is getting sleepy. `jobs_counter` is the
    /// internal sleep state that we saw at the time.
    ThreadSleepy { worker: usize, jobs_counter: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because the latch was set. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because a job was posted. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByJob { worker: usize },

    /// Indicates that an idle worker has gone to sleep.
    ThreadSleeping { worker: usize, latch_addr: usize },

    /// Indicates that a sleeping worker has awoken.
    ThreadAwoken { worker: usize, latch_addr: usize },

    /// Indicates that the given worker thread was notified it should
    /// awaken.
    ThreadNotify { worker: usize },

    /// The given worker has pushed a job to its local deque.
    JobPushed { worker: usize },

    /// The given worker has popped a job from its local deque.
    JobPopped { worker: usize },

    /// The given worker has stolen a job from the deque of another.
    JobStolen { worker: usize, victim: usize },

    /// N jobs were injected into the global queue.
    JobsInjected { count: usize },

    /// A job was removed from the global queue.
    JobUninjected { worker: usize },

    /// A job was broadcasted to N threads.
    JobBroadcast { count: usize },

    /// When announcing a job, this was the value of the counters we observed.
    ///
    /// No effect on thread state, just a debugging event.
    JobThreadCounts {
        worker: usize,
        num_idle: u16,
        num_sleepers: u16,
    },
}
108
/// Handle to the logging thread, if any. You can use this to deliver
/// logs. You can also clone it freely.
#[derive(Clone)]
pub(super) struct Logger {
    /// `None` when logging is disabled; otherwise the sending half of
    /// the channel consumed by the background logger thread.
    sender: Option<Sender<Event>>,
}
115
116impl Logger {
117 pub(super) fn new(num_workers: usize) -> Logger {
118 if !LOG_ENABLED {
119 return Self::disabled();
120 }
121
122 // see the doc comment for the format
123 let env_log = match env::var("RAYON_LOG") {
124 Ok(s) => s,
125 Err(_) => return Self::disabled(),
126 };
127
128 let (sender, receiver) = crossbeam_channel::unbounded();
129
130 if let Some(filename) = env_log.strip_prefix("tail:") {
131 let filename = filename.to_string();
132 ::std::thread::spawn(move || {
133 Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
134 });
135 } else if env_log == "all" {
136 ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
137 } else if let Some(filename) = env_log.strip_prefix("profile:") {
138 let filename = filename.to_string();
139 ::std::thread::spawn(move || {
140 Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
141 });
142 } else {
143 panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'");
144 }
145
146 Logger {
147 sender: Some(sender),
148 }
149 }
150
151 fn disabled() -> Logger {
152 Logger { sender: None }
153 }
154
155 #[inline]
156 pub(super) fn log(&self, event: impl FnOnce() -> Event) {
157 if !LOG_ENABLED {
158 return;
159 }
160
161 if let Some(sender) = &self.sender {
162 sender.send(event()).unwrap();
163 }
164 }
165
166 fn profile_logger_thread(
167 num_workers: usize,
168 log_filename: String,
169 capacity: usize,
170 receiver: Receiver<Event>,
171 ) {
172 let file = File::create(&log_filename)
173 .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
174
175 let mut writer = BufWriter::new(file);
176 let mut events = Vec::with_capacity(capacity);
177 let mut state = SimulatorState::new(num_workers);
178 let timeout = std::time::Duration::from_secs(30);
179
180 loop {
181 while let Ok(event) = receiver.recv_timeout(timeout) {
182 if let Event::Flush = event {
183 break;
184 }
185
186 events.push(event);
187 if events.len() == capacity {
188 break;
189 }
190 }
191
192 for event in events.drain(..) {
193 if state.simulate(&event) {
194 state.dump(&mut writer, &event).unwrap();
195 }
196 }
197
198 writer.flush().unwrap();
199 }
200 }
201
202 fn tail_logger_thread(
203 num_workers: usize,
204 log_filename: String,
205 capacity: usize,
206 receiver: Receiver<Event>,
207 ) {
208 let file = File::create(&log_filename)
209 .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
210
211 let mut writer = BufWriter::new(file);
212 let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
213 let mut state = SimulatorState::new(num_workers);
214 let timeout = std::time::Duration::from_secs(30);
215 let mut skipped = false;
216
217 loop {
218 while let Ok(event) = receiver.recv_timeout(timeout) {
219 if let Event::Flush = event {
220 // We ignore Flush events in tail mode --
221 // we're really just looking for
222 // deadlocks.
223 continue;
224 } else {
225 if events.len() == capacity {
226 let event = events.pop_front().unwrap();
227 state.simulate(&event);
228 skipped = true;
229 }
230
231 events.push_back(event);
232 }
233 }
234
235 if skipped {
236 writeln!(writer, "...").unwrap();
237 skipped = false;
238 }
239
240 for event in events.drain(..) {
241 // In tail mode, we dump *all* events out, whether or
242 // not they were 'interesting' to the state machine.
243 state.simulate(&event);
244 state.dump(&mut writer, &event).unwrap();
245 }
246
247 writer.flush().unwrap();
248 }
249 }
250
251 fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
252 let stderr = std::io::stderr();
253 let mut state = SimulatorState::new(num_workers);
254
255 for event in receiver {
256 let mut writer = BufWriter::new(stderr.lock());
257 state.simulate(&event);
258 state.dump(&mut writer, &event).unwrap();
259 writer.flush().unwrap();
260 }
261 }
262}
263
/// Simulated state of one worker thread, as reconstructed from the
/// event stream (see `SimulatorState::simulate` for the transitions).
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    /// Executing work (initial state; entered on `ThreadStart` /
    /// `ThreadFoundWork`).
    Working,
    /// Searching for work, blocked on a latch (`ThreadIdle`).
    Idle,
    /// Asleep but already notified to wake (`ThreadNotify`).
    Notified,
    /// Asleep (`ThreadSleeping`).
    Sleeping,
    /// Worker has terminated (`ThreadTerminate`).
    Terminated,
}
272
273impl State {
274 fn letter(&self) -> char {
275 match self {
276 State::Working => 'W',
277 State::Idle => 'I',
278 State::Notified => 'N',
279 State::Sleeping => 'S',
280 State::Terminated => 'T',
281 }
282 }
283}
284
/// Replays the event stream to reconstruct, at each point in time, the
/// state of every worker plus the sizes of the local and global queues.
struct SimulatorState {
    /// Number of jobs currently in each worker's local deque.
    local_queue_size: Vec<usize>,
    /// Current state of each worker thread, indexed by worker id.
    thread_states: Vec<State>,
    /// Number of jobs currently in the global (injector) queue.
    injector_size: usize,
}
290
291impl SimulatorState {
292 fn new(num_workers: usize) -> Self {
293 Self {
294 local_queue_size: (0..num_workers).map(|_| 0).collect(),
295 thread_states: (0..num_workers).map(|_| State::Working).collect(),
296 injector_size: 0,
297 }
298 }
299
300 fn simulate(&mut self, event: &Event) -> bool {
301 match *event {
302 Event::ThreadIdle { worker, .. } => {
303 assert_eq!(self.thread_states[worker], State::Working);
304 self.thread_states[worker] = State::Idle;
305 true
306 }
307
308 Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
309 self.thread_states[worker] = State::Working;
310 true
311 }
312
313 Event::ThreadTerminate { worker, .. } => {
314 self.thread_states[worker] = State::Terminated;
315 true
316 }
317
318 Event::ThreadSleeping { worker, .. } => {
319 assert_eq!(self.thread_states[worker], State::Idle);
320 self.thread_states[worker] = State::Sleeping;
321 true
322 }
323
324 Event::ThreadAwoken { worker, .. } => {
325 assert_eq!(self.thread_states[worker], State::Notified);
326 self.thread_states[worker] = State::Idle;
327 true
328 }
329
330 Event::JobPushed { worker } => {
331 self.local_queue_size[worker] += 1;
332 true
333 }
334
335 Event::JobPopped { worker } => {
336 self.local_queue_size[worker] -= 1;
337 true
338 }
339
340 Event::JobStolen { victim, .. } => {
341 self.local_queue_size[victim] -= 1;
342 true
343 }
344
345 Event::JobsInjected { count } => {
346 self.injector_size += count;
347 true
348 }
349
350 Event::JobUninjected { .. } => {
351 self.injector_size -= 1;
352 true
353 }
354
355 Event::ThreadNotify { worker } => {
356 // Currently, this log event occurs while holding the
357 // thread lock, so we should *always* see it before
358 // the worker awakens.
359 assert_eq!(self.thread_states[worker], State::Sleeping);
360 self.thread_states[worker] = State::Notified;
361 true
362 }
363
364 // remaining events are no-ops from pov of simulating the
365 // thread state
366 _ => false,
367 }
368 }
369
370 fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
371 let num_idle_threads = self
372 .thread_states
373 .iter()
374 .filter(|s| **s == State::Idle)
375 .count();
376
377 let num_sleeping_threads = self
378 .thread_states
379 .iter()
380 .filter(|s| **s == State::Sleeping)
381 .count();
382
383 let num_notified_threads = self
384 .thread_states
385 .iter()
386 .filter(|s| **s == State::Notified)
387 .count();
388
389 let num_pending_jobs: usize = self.local_queue_size.iter().sum();
390
391 write!(w, "{:2},", num_idle_threads)?;
392 write!(w, "{:2},", num_sleeping_threads)?;
393 write!(w, "{:2},", num_notified_threads)?;
394 write!(w, "{:4},", num_pending_jobs)?;
395 write!(w, "{:4},", self.injector_size)?;
396
397 let event_str = format!("{:?}", event);
398 write!(w, r#""{:60}","#, event_str)?;
399
400 for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
401 write!(w, " T{:02},{}", i, state.letter(),)?;
402
403 if *queue_size > 0 {
404 write!(w, ",{:03},", queue_size)?;
405 } else {
406 write!(w, ", ,")?;
407 }
408 }
409
410 writeln!(w)?;
411 Ok(())
412 }
413}
414