#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))]

use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use tokio::macros::support::poll_fn;

use tokio::runtime::Runtime;
use tokio::task::consume_budget;
use tokio::time::{self, Duration};

#[test]
fn num_workers() {
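    // The current-thread runtime always reports a single worker, while the
    // `threaded()` helper below builds a multi-thread runtime with two workers.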
    let rt = current_thread();
    assert_eq!(1, rt.metrics().num_workers());

    let rt = threaded();
    assert_eq!(2, rt.metrics().num_workers());
}

#[test]
fn num_blocking_threads() {
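    // No blocking threads exist until the first `spawn_blocking` call forces
    // the pool to spin one up.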
    let rt = current_thread();
    assert_eq!(0, rt.metrics().num_blocking_threads());
    let _ = rt.block_on(rt.spawn_blocking(move || {}));
    assert_eq!(1, rt.metrics().num_blocking_threads());
}

#[test]
fn num_idle_blocking_threads() {
    let rt = current_thread();
    assert_eq!(0, rt.metrics().num_idle_blocking_threads());
    let _ = rt.block_on(rt.spawn_blocking(move || {}));
    rt.block_on(async {
        time::sleep(Duration::from_millis(5)).await;
    });

    // We need to wait until the blocking thread has become idle. Usually 5ms is
    // enough for this to happen, but not always. When it isn't enough, sleep
    // for another second. We don't always wait for a whole second since we want
    // the test suite to finish quickly.
    //
    // Note that the timeout for idle threads to be killed is 10 seconds.
    if 0 == rt.metrics().num_idle_blocking_threads() {
        rt.block_on(async {
            time::sleep(Duration::from_secs(1)).await;
        });
    }

    assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}

#[test]
fn blocking_queue_depth() {
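    // Cap the blocking pool at one thread, then hold a mutex that the blocking
    // closures need, so the second `spawn_blocking` call has to sit in the
    // queue and show up in `blocking_queue_depth()`.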
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .max_blocking_threads(1)
        .build()
        .unwrap();

    assert_eq!(0, rt.metrics().blocking_queue_depth());

    let ready = Arc::new(Mutex::new(()));
    let guard = ready.lock().unwrap();

    let ready_cloned = ready.clone();
    let wait_until_ready = move || {
        let _unused = ready_cloned.lock().unwrap();
    };

    let h1 = rt.spawn_blocking(wait_until_ready.clone());
    let h2 = rt.spawn_blocking(wait_until_ready);
    assert!(rt.metrics().blocking_queue_depth() > 0);

    drop(guard);

    let _ = rt.block_on(h1);
    let _ = rt.block_on(h2);

    assert_eq!(0, rt.metrics().blocking_queue_depth());
}

#[test]
fn active_tasks_count() {
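    // Each `spawn` should bump the active task count; the spawned task itself
    // observes the incremented value.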
    let rt = current_thread();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.active_tasks_count());
    rt.spawn(async move {
        assert_eq!(1, metrics.active_tasks_count());
    });

    let rt = threaded();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.active_tasks_count());
    rt.spawn(async move {
        assert_eq!(1, metrics.active_tasks_count());
    });
}

#[test]
fn remote_schedule_count() {
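    // Spawning onto the runtime from a different thread goes through the
    // remote (injection) queue, which is what `remote_schedule_count()` tracks.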
    use std::thread;

    let rt = current_thread();
    let handle = rt.handle().clone();
    let task = thread::spawn(move || {
        handle.spawn(async {
            // Do nothing
        })
    })
    .join()
    .unwrap();

    rt.block_on(task).unwrap();

    assert_eq!(1, rt.metrics().remote_schedule_count());

    let rt = threaded();
    let handle = rt.handle().clone();
    let task = thread::spawn(move || {
        handle.spawn(async {
            // Do nothing
        })
    })
    .join()
    .unwrap();

    rt.block_on(task).unwrap();

    assert_eq!(1, rt.metrics().remote_schedule_count());
}

#[test]
fn worker_park_count() {
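    // Sleeping inside `block_on` gives the worker(s) a chance to park at least
    // once before the runtime is dropped.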
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(1 <= metrics.worker_park_count(0));

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(1 <= metrics.worker_park_count(0));
    assert!(1 <= metrics.worker_park_count(1));
}

#[test]
fn worker_noop_count() {
    // There isn't really a great way to generate no-op parks, as they happen
    // as false-positive events under concurrency.

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(0 < metrics.worker_noop_count(0));

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(0 < metrics.worker_noop_count(0));
    assert!(0 < metrics.worker_noop_count(1));
}

#[test]
fn worker_steal_count() {
    // This metric only applies to the multi-threaded runtime.
    //
    // We use a blocking channel to back up one worker thread.
    use std::sync::mpsc::channel;

    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        let (tx, rx) = channel();

        // Move to the runtime.
        tokio::spawn(async move {
            // Spawn the task that sends to the channel
            tokio::spawn(async move {
                tx.send(()).unwrap();
            });

            // Spawn a task that bumps the previous task out of the "next
            // scheduled" slot.
            tokio::spawn(async {});

            // Blocking receive on the channel.
            rx.recv().unwrap();
        })
        .await
        .unwrap();
    });

    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_steal_count(i))
        .sum();

    assert_eq!(1, n);
}

#[test]
fn worker_poll_count_and_time() {
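    // Each spawned task completes on its first poll (it only does a synchronous
    // sleep), so the spawned tasks contribute exactly `N` polls in total.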
    const N: u64 = 5;

    async fn task() {
        // Sync sleep
        std::thread::sleep(std::time::Duration::from_micros(10));
    }

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    });
    drop(rt);
    assert_eq!(N, metrics.worker_poll_count(0));
    // Not currently supported for current-thread runtime
    assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0));

    // Does not populate the histogram
    assert!(!metrics.poll_count_histogram_enabled());
    for i in 0..10 {
        assert_eq!(0, metrics.poll_count_histogram_bucket_count(0, i));
    }

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    });
    drop(rt);
    // Account for the `block_on` task
    let n = (0..metrics.num_workers())
        .map(|i| metrics.worker_poll_count(i))
        .sum();

    assert_eq!(N, n);

    let n: Duration = (0..metrics.num_workers())
        .map(|i| metrics.worker_mean_poll_time(i))
        .sum();

    assert!(n > Duration::default());

    // Does not populate the histogram
    assert!(!metrics.poll_count_histogram_enabled());
    for n in 0..metrics.num_workers() {
        for i in 0..10 {
            assert_eq!(0, metrics.poll_count_histogram_bucket_count(n, i));
        }
    }
}

#[test]
fn worker_poll_count_histogram() {
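    // With the histogram explicitly enabled, every poll should land in one of
    // the configured buckets, across both runtime flavors.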
    const N: u64 = 5;

    let rts = [
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
    ];

    for rt in rts {
        let metrics = rt.metrics();
        rt.block_on(async {
            for _ in 0..N {
                tokio::spawn(async {}).await.unwrap();
            }
        });
        drop(rt);

        let num_workers = metrics.num_workers();
        let num_buckets = metrics.poll_count_histogram_num_buckets();

        assert!(metrics.poll_count_histogram_enabled());
        assert_eq!(num_buckets, 3);

        let n = (0..num_workers)
            .flat_map(|i| (0..num_buckets).map(move |j| (i, j)))
            .map(|(worker, bucket)| metrics.poll_count_histogram_bucket_count(worker, bucket))
            .sum();
        assert_eq!(N, n);
    }
}

#[test]
fn worker_poll_count_histogram_range() {
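    // With a 50us resolution and 3 buckets, the linear scale should yield
    // contiguous 50us-wide buckets, while the log scale rounds the resolution
    // up to a power of two (in nanoseconds) and doubles each successive upper
    // bound. The last bucket always extends to the maximum duration.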
    let max = Duration::from_nanos(u64::MAX);

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_count_histogram()
        .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
        .metrics_poll_count_histogram_buckets(3)
        .metrics_poll_count_histogram_resolution(us(50))
        .build()
        .unwrap();
    let metrics = rt.metrics();

    assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..us(50));
    assert_eq!(
        metrics.poll_count_histogram_bucket_range(1),
        us(50)..us(100)
    );
    assert_eq!(metrics.poll_count_histogram_bucket_range(2), us(100)..max);

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_count_histogram()
        .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Log)
        .metrics_poll_count_histogram_buckets(3)
        .metrics_poll_count_histogram_resolution(us(50))
        .build()
        .unwrap();
    let metrics = rt.metrics();

    let a = Duration::from_nanos(50_000_u64.next_power_of_two());
    let b = a * 2;

    assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..a);
    assert_eq!(metrics.poll_count_histogram_bucket_range(1), a..b);
    assert_eq!(metrics.poll_count_histogram_bucket_range(2), b..max);
}

#[test]
fn worker_poll_count_histogram_disabled_without_explicit_enable() {
    let rts = [
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
    ];

    for rt in rts {
        let metrics = rt.metrics();
        assert!(!metrics.poll_count_histogram_enabled());
    }
}

#[test]
fn worker_total_busy_duration() {
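    // Any amount of task polling should register a non-zero busy duration for
    // the worker(s) that executed it.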
    const N: usize = 5;

    let zero = Duration::from_millis(0);

    let rt = current_thread();
    let metrics = rt.metrics();

    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {
                tokio::task::yield_now().await;
            })
            .await
            .unwrap();
        }
    });

    drop(rt);

    assert!(zero < metrics.worker_total_busy_duration(0));

    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {
                tokio::task::yield_now().await;
            })
            .await
            .unwrap();
        }
    });

    drop(rt);

    for i in 0..metrics.num_workers() {
        assert!(zero < metrics.worker_total_busy_duration(i));
    }
}

#[test]
fn worker_local_schedule_count() {
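    // Tasks spawned from within the runtime are pushed onto a worker's local
    // queue (a local schedule), while tasks spawned from outside go through the
    // remote queue instead.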
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        tokio::spawn(async {}).await.unwrap();
    });
    drop(rt);

    assert_eq!(1, metrics.worker_local_schedule_count(0));
    assert_eq!(0, metrics.remote_schedule_count());

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Move to the runtime
        tokio::spawn(async {
            tokio::spawn(async {}).await.unwrap();
        })
        .await
        .unwrap();
    });
    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_local_schedule_count(i))
        .sum();

    assert_eq!(2, n);
    assert_eq!(1, metrics.remote_schedule_count());
}

#[test]
fn worker_overflow_count() {
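    // A worker's local run queue has a fixed capacity; spawning a burst of
    // tasks onto one worker while the other is blocked should push a batch to
    // the injection queue, which counts as a single overflow.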
    // Only applies to the threaded worker
    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Move to the runtime
        tokio::spawn(async {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            //
            // We spawn from outside the runtime to ensure that the other worker
            // will pick it up:
            // <https://github.com/tokio-rs/tokio/issues/4730>
            tokio::task::spawn_blocking(|| {
                tokio::spawn(async move {
                    tx1.send(()).unwrap();
                    rx2.recv().unwrap();
                });
            });

            rx1.recv().unwrap();

            // Spawn many tasks
            for _ in 0..300 {
                tokio::spawn(async {});
            }

            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_overflow_count(i))
        .sum();

    assert_eq!(1, n);
}

#[test]
fn injection_queue_depth() {
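    // Tasks spawned from outside the runtime land on the injection (remote)
    // queue and remain there, visible via `injection_queue_depth()`, until a
    // worker picks them up.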
    use std::thread;

    let rt = current_thread();
    let handle = rt.handle().clone();
    let metrics = rt.metrics();

    thread::spawn(move || {
        handle.spawn(async {});
    })
    .join()
    .unwrap();

    assert_eq!(1, metrics.injection_queue_depth());

    let rt = threaded();
    let handle = rt.handle().clone();
    let metrics = rt.metrics();

    // First we need to block the runtime workers
    let (tx1, rx1) = std::sync::mpsc::channel();
    let (tx2, rx2) = std::sync::mpsc::channel();
    let (tx3, rx3) = std::sync::mpsc::channel();
    let rx3 = Arc::new(Mutex::new(rx3));

    rt.spawn(async move { rx1.recv().unwrap() });
    rt.spawn(async move { rx2.recv().unwrap() });

    // Spawn some more to make sure there are items
    for _ in 0..10 {
        let rx = rx3.clone();
        rt.spawn(async move {
            rx.lock().unwrap().recv().unwrap();
        });
    }

    thread::spawn(move || {
        handle.spawn(async {});
    })
    .join()
    .unwrap();

    let n = metrics.injection_queue_depth();
    assert!(1 <= n, "{}", n);
    assert!(15 >= n, "{}", n);

    for _ in 0..10 {
        tx3.send(()).unwrap();
    }

    tx1.send(()).unwrap();
    tx2.send(()).unwrap();
}

#[test]
fn worker_local_queue_depth() {
    const N: usize = 100;

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {});
        }

        assert_eq!(N, metrics.worker_local_queue_depth(0));
    });

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async move {
        // Move to the runtime
        tokio::spawn(async move {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // Spawn a task that bumps the previous task out of the "next
            // scheduled" slot.
            tokio::spawn(async {});

            rx1.recv().unwrap();

            // Spawn some tasks
            for _ in 0..100 {
                tokio::spawn(async {});
            }

            let n: usize = (0..metrics.num_workers())
                .map(|i| metrics.worker_local_queue_depth(i))
                .sum();

            assert_eq!(n, N);

            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
}

#[test]
fn budget_exhaustion_yield() {
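    // `consume_budget` only returns `Pending` once the task's cooperative
    // budget is exhausted, at which point the forced-yield counter should go
    // from 0 to 1.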
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut did_yield = false;

    // block on a task which consumes budget until it yields
    rt.block_on(poll_fn(|cx| loop {
        if did_yield {
            return Poll::Ready(());
        }

        let fut = consume_budget();
        tokio::pin!(fut);

        if fut.poll(cx).is_pending() {
            did_yield = true;
            return Poll::Pending;
        }
    }));

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

#[test]
fn budget_exhaustion_yield_with_joins() {
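    // The cooperative budget is tracked per task, so two futures joined inside
    // one `block_on` call share a single budget and only one forced yield is
    // recorded.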
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut did_yield_1 = false;
    let mut did_yield_2 = false;

    // block on a task which consumes budget until it yields
    rt.block_on(async {
        tokio::join!(
            poll_fn(|cx| loop {
                if did_yield_1 {
                    return Poll::Ready(());
                }

                let fut = consume_budget();
                tokio::pin!(fut);

                if fut.poll(cx).is_pending() {
                    did_yield_1 = true;
                    return Poll::Pending;
                }
            }),
            poll_fn(|cx| loop {
                if did_yield_2 {
                    return Poll::Ready(());
                }

                let fut = consume_budget();
                tokio::pin!(fut);

                if fut.poll(cx).is_pending() {
                    did_yield_2 = true;
                    return Poll::Pending;
                }
            })
        )
    });

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
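    // Registering a TCP socket with the reactor bumps the registered-fd count;
    // dropping the socket bumps the deregistered count while the registered
    // count stays put, since both are cumulative counters.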
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(metrics.io_driver_fd_registered_count(), 0);

    let stream = tokio::net::TcpStream::connect("google.com:80");
    let stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

    drop(stream);

    assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    let stream = tokio::net::TcpStream::connect("google.com:80");
    let _stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_ready_count(), 1);
}

fn current_thread() -> Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

fn threaded() -> Runtime {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap()
}

fn us(n: u64) -> Duration {
    Duration::from_micros(n)
}