1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (all(feature = "full" , tokio_unstable, not(target_os = "wasi" )))] |
3 | |
4 | use std::future::Future; |
5 | use std::sync::{Arc, Mutex}; |
6 | use std::task::Poll; |
7 | use tokio::macros::support::poll_fn; |
8 | |
9 | use tokio::runtime::Runtime; |
10 | use tokio::task::consume_budget; |
11 | use tokio::time::{self, Duration}; |
12 | |
/// Checks the worker count reported by each runtime flavor built by the
/// `current_thread` and `threaded` helpers.
#[test]
fn num_workers() {
    let single = current_thread();
    let single_workers = single.metrics().num_workers();
    assert_eq!(1, single_workers);

    let multi = threaded();
    let multi_workers = multi.metrics().num_workers();
    assert_eq!(2, multi_workers);
}
21 | |
/// Checks that the blocking-thread count goes from zero to one after a single
/// blocking job has been run.
#[test]
fn num_blocking_threads() {
    let runtime = current_thread();

    // Nothing has been handed to the blocking pool yet.
    assert_eq!(0, runtime.metrics().num_blocking_threads());

    // Run one no-op closure on the blocking pool to completion; the join
    // result itself is of no interest.
    let job = runtime.spawn_blocking(move || {});
    let _ = runtime.block_on(job);

    assert_eq!(1, runtime.metrics().num_blocking_threads());
}
29 | |
/// Checks that the thread used for a finished blocking job is eventually
/// reported as idle.
#[test]
fn num_idle_blocking_threads() {
    let rt = current_thread();

    // Reads the current number of idle blocking threads.
    let idle_threads = || rt.metrics().num_idle_blocking_threads();
    // Sleeps for `span` on the runtime. The sleep is created inside the async
    // block so that it is constructed while the runtime is being driven.
    let nap = |span: Duration| {
        rt.block_on(async move {
            time::sleep(span).await;
        })
    };

    assert_eq!(0, idle_threads());

    let job = rt.spawn_blocking(move || {});
    let _ = rt.block_on(job);
    nap(Duration::from_millis(5));

    // The blocking thread has to become idle before the final check. 5ms is
    // usually enough, but not always; when it was not, wait one more second.
    // The long wait is conditional so the test suite stays fast.
    //
    // Note that the timeout for idle threads to be killed is 10 seconds.
    if idle_threads() == 0 {
        nap(Duration::from_secs(1));
    }

    assert_eq!(1, idle_threads());
}
53 | |
/// Checks that the blocking queue depth becomes non-zero while more blocking
/// jobs are pending than the single allowed blocking thread can run, and that
/// it returns to zero once they have finished.
#[test]
fn blocking_queue_depth() {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all().max_blocking_threads(1);
    let rt = builder.build().unwrap();

    assert_eq!(0, rt.metrics().blocking_queue_depth());

    // Hold the mutex so that every job trying to lock it stays stuck.
    let gate = Arc::new(Mutex::new(()));
    let held = gate.lock().unwrap();

    let job = {
        let gate = Arc::clone(&gate);
        move || {
            // Acquire and immediately release the lock.
            let _acquired = gate.lock().unwrap();
        }
    };

    let first = rt.spawn_blocking(job.clone());
    let second = rt.spawn_blocking(job);
    assert!(rt.metrics().blocking_queue_depth() > 0);

    // Let the jobs through.
    drop(held);

    let _ = rt.block_on(first);
    let _ = rt.block_on(second);

    assert_eq!(0, rt.metrics().blocking_queue_depth());
}
83 | |
/// Verifies `active_tasks_count` on both runtime flavors: zero before anything
/// has been spawned, and one when observed from inside the single spawned task.
#[test]
fn active_tasks_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.active_tasks_count());
    // Drive the spawned task to completion and unwrap its join result. A task
    // that is only spawned is detached: its inner assertion may never execute,
    // and a panic inside it would not fail this test.
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.active_tasks_count());
    }))
    .unwrap();

    let rt = threaded();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.active_tasks_count());
    // Same as above: wait for the task so that a failed inner assertion is
    // surfaced through the join handle.
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.active_tasks_count());
    }))
    .unwrap();
}
100 | |
/// Checks that a task spawned from a thread outside the runtime is counted as
/// exactly one remote schedule, on both runtime flavors.
#[test]
fn remote_schedule_count() {
    use std::thread;

    // Spawns an empty task onto `rt` from a freshly created OS thread and
    // hands back the task's join handle.
    let spawn_from_other_thread = |rt: &Runtime| {
        let handle = rt.handle().clone();
        thread::spawn(move || handle.spawn(async {}))
            .join()
            .unwrap()
    };

    let single = current_thread();
    let task = spawn_from_other_thread(&single);
    single.block_on(task).unwrap();
    assert_eq!(1, single.metrics().remote_schedule_count());

    let multi = threaded();
    let task = spawn_from_other_thread(&multi);
    multi.block_on(task).unwrap();
    assert_eq!(1, multi.metrics().remote_schedule_count());
}
133 | |
/// Checks that every worker has parked at least once after the runtime slept
/// briefly and was then shut down.
#[test]
fn worker_park_count() {
    // Sleeps 1ms on `rt`, drops the runtime, and returns its metrics handle.
    let sleep_then_shutdown = |rt: Runtime| {
        let metrics = rt.metrics();
        rt.block_on(async {
            time::sleep(Duration::from_millis(1)).await;
        });
        drop(rt);
        metrics
    };

    let metrics = sleep_then_shutdown(current_thread());
    assert!(1 <= metrics.worker_park_count(0));

    let metrics = sleep_then_shutdown(threaded());
    assert!(1 <= metrics.worker_park_count(0));
    assert!(1 <= metrics.worker_park_count(1));
}
153 | |
/// Checks that every worker reports a non-zero no-op count after the runtime
/// slept briefly and was then shut down.
#[test]
fn worker_noop_count() {
    // No-op parks are false-positive events that occur under concurrency, so
    // there is no great way to provoke them deliberately.

    // Sleeps 1ms on `rt`, drops the runtime, and returns its metrics handle.
    let sleep_then_shutdown = |rt: Runtime| {
        let metrics = rt.metrics();
        rt.block_on(async {
            time::sleep(Duration::from_millis(1)).await;
        });
        drop(rt);
        metrics
    };

    let metrics = sleep_then_shutdown(current_thread());
    assert!(0 < metrics.worker_noop_count(0));

    let metrics = sleep_then_shutdown(threaded());
    assert!(0 < metrics.worker_noop_count(0));
    assert!(0 < metrics.worker_noop_count(1));
}
176 | |
/// Checks that exactly one steal is recorded across all workers when one
/// worker is held up while a task sits in its queue.
#[test]
fn worker_steal_count() {
    // This metric only applies to the multi-threaded runtime.
    //
    // A blocking (std) channel is used to back up one worker thread.
    use std::sync::mpsc;

    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        let (signal_tx, signal_rx) = mpsc::channel();

        // Move onto the runtime.
        tokio::spawn(async move {
            // The task that will send on the channel.
            tokio::spawn(async move {
                signal_tx.send(()).unwrap();
            });

            // A second task that bumps the previous one out of the "next
            // scheduled" slot.
            tokio::spawn(async {});

            // Block this worker until the sender task has run.
            signal_rx.recv().unwrap();
        })
        .await
        .unwrap();
    });

    drop(rt);

    let mut total_steals: u64 = 0;
    for worker in 0..metrics.num_workers() {
        total_steals += metrics.worker_steal_count(worker);
    }

    assert_eq!(1, total_steals);
}
216 | |
/// Checks the poll count and mean poll time after running `N` short tasks on
/// each runtime flavor, and that the poll-count histogram stays empty when it
/// has not been enabled.
#[test]
fn worker_poll_count_and_time() {
    const N: u64 = 5;
    // Number of histogram buckets inspected per worker.
    const BUCKETS_CHECKED: usize = 10;

    async fn task() {
        // Synchronous sleep, so that the poll takes a measurable amount of time.
        std::thread::sleep(std::time::Duration::from_micros(10));
    }

    // Spawns `N` tasks one after another, waiting for each to finish.
    async fn run_tasks() {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    }

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(run_tasks());
    drop(rt);

    assert_eq!(N, metrics.worker_poll_count(0));
    // Not currently supported for current-thread runtime
    assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0));

    // The histogram was never enabled, so every bucket must be empty.
    assert!(!metrics.poll_count_histogram_enabled());
    for bucket in 0..BUCKETS_CHECKED {
        assert_eq!(0, metrics.poll_count_histogram_bucket_count(0, bucket));
    }

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(run_tasks());
    drop(rt);

    let workers = metrics.num_workers();

    // Summed over all workers, the poll count is expected to be exactly `N`.
    let total_polls: u64 = (0..workers)
        .map(|worker| metrics.worker_poll_count(worker))
        .sum();
    assert_eq!(N, total_polls);

    let summed_mean_poll_time: Duration = (0..workers)
        .map(|worker| metrics.worker_mean_poll_time(worker))
        .sum();
    assert!(summed_mean_poll_time > Duration::default());

    // The histogram was never enabled, so every bucket must be empty.
    assert!(!metrics.poll_count_histogram_enabled());
    for worker in 0..workers {
        for bucket in 0..BUCKETS_CHECKED {
            assert_eq!(
                0,
                metrics.poll_count_histogram_bucket_count(worker, bucket)
            );
        }
    }
}
273 | |
/// Checks that, with the poll-count histogram enabled, the bucket counts of
/// all workers add up to the number of tasks that were run.
#[test]
fn worker_poll_count_histogram() {
    const N: u64 = 5;

    // Applies the histogram settings shared by both flavors (linear scale,
    // 3 buckets, 50ms resolution) and builds the runtime.
    fn with_histogram(builder: &mut tokio::runtime::Builder) -> Runtime {
        builder
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap()
    }

    let rts = [
        with_histogram(&mut tokio::runtime::Builder::new_current_thread()),
        with_histogram(tokio::runtime::Builder::new_multi_thread().worker_threads(2)),
    ];

    for rt in rts {
        let metrics = rt.metrics();
        rt.block_on(async {
            for _ in 0..N {
                tokio::spawn(async {}).await.unwrap();
            }
        });
        drop(rt);

        let num_workers = metrics.num_workers();
        let num_buckets = metrics.poll_count_histogram_num_buckets();

        assert!(metrics.poll_count_histogram_enabled());
        assert_eq!(num_buckets, 3);

        // Add up every bucket of every worker.
        let mut recorded: u64 = 0;
        for worker in 0..num_workers {
            for bucket in 0..num_buckets {
                recorded += metrics.poll_count_histogram_bucket_count(worker, bucket);
            }
        }
        assert_eq!(N, recorded);
    }
}
320 | |
/// Checks the bucket ranges reported for a 3-bucket histogram with a 50µs
/// resolution, for both the linear and the log scale.
#[test]
fn worker_poll_count_histogram_range() {
    // Builds a current-thread runtime whose poll-count histogram has 3 buckets,
    // a 50µs resolution, and the given scale.
    fn histogram_runtime(scale: tokio::runtime::HistogramScale) -> Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(scale)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(us(50))
            .build()
            .unwrap()
    }

    // Upper bound expected for the last bucket.
    let max = Duration::from_nanos(u64::MAX);

    let linear_rt = histogram_runtime(tokio::runtime::HistogramScale::Linear);
    let linear = linear_rt.metrics();

    assert_eq!(linear.poll_count_histogram_bucket_range(0), us(0)..us(50));
    assert_eq!(
        linear.poll_count_histogram_bucket_range(1),
        us(50)..us(100)
    );
    assert_eq!(linear.poll_count_histogram_bucket_range(2), us(100)..max);

    let log_rt = histogram_runtime(tokio::runtime::HistogramScale::Log);
    let log = log_rt.metrics();

    // Expected log-scale boundaries: 50µs in nanoseconds rounded up to the
    // next power of two, and twice that value.
    let first_bound = Duration::from_nanos(50000_u64.next_power_of_two());
    let second_bound = first_bound * 2;

    assert_eq!(log.poll_count_histogram_bucket_range(0), us(0)..first_bound);
    assert_eq!(
        log.poll_count_histogram_bucket_range(1),
        first_bound..second_bound
    );
    assert_eq!(log.poll_count_histogram_bucket_range(2), second_bound..max);
}
359 | |
/// Checks that configuring histogram parameters alone does not enable the
/// poll-count histogram on either runtime flavor.
#[test]
fn worker_poll_count_histogram_disabled_without_explicit_enable() {
    // Sets scale, bucket count and resolution, but deliberately never calls
    // `enable_metrics_poll_count_histogram`, then builds the runtime.
    fn configured_but_not_enabled(builder: &mut tokio::runtime::Builder) -> Runtime {
        builder
            .enable_all()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap()
    }

    let rts = [
        configured_but_not_enabled(&mut tokio::runtime::Builder::new_current_thread()),
        configured_but_not_enabled(
            tokio::runtime::Builder::new_multi_thread().worker_threads(2),
        ),
    ];

    for rt in rts {
        let histogram_enabled = rt.metrics().poll_count_histogram_enabled();
        assert!(!histogram_enabled);
    }
}
385 | |
/// Checks that every worker reports a non-zero total busy duration after a
/// handful of yielding tasks have been run.
#[test]
fn worker_total_busy_duration() {
    const N: usize = 5;

    // Spawns `N` tasks one after another; each yields once and is awaited
    // before the next one is spawned.
    async fn run_yielding_tasks() {
        for _ in 0..N {
            tokio::spawn(async {
                tokio::task::yield_now().await;
            })
            .await
            .unwrap();
        }
    }

    let zero = Duration::from_millis(0);

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(run_yielding_tasks());
    drop(rt);

    assert!(zero < metrics.worker_total_busy_duration(0));

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(run_yielding_tasks());
    drop(rt);

    let workers = metrics.num_workers();
    for worker in 0..workers {
        assert!(zero < metrics.worker_total_busy_duration(worker));
    }
}
428 | |
/// Checks the local and remote schedule counts after spawning from inside the
/// runtime, on both runtime flavors.
#[test]
fn worker_local_schedule_count() {
    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(async {
        tokio::spawn(async {}).await.unwrap();
    });
    drop(single);

    assert_eq!(1, single_metrics.worker_local_schedule_count(0));
    assert_eq!(0, single_metrics.remote_schedule_count());

    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(async {
        // Hop onto the runtime first, then spawn from inside a task.
        tokio::spawn(async {
            tokio::spawn(async {}).await.unwrap();
        })
        .await
        .unwrap();
    });
    drop(multi);

    let mut local_total: u64 = 0;
    for worker in 0..multi_metrics.num_workers() {
        local_total += multi_metrics.worker_local_schedule_count(worker);
    }

    assert_eq!(2, local_total);
    assert_eq!(1, multi_metrics.remote_schedule_count());
}
460 | |
/// Checks that exactly one overflow is recorded across all workers after one
/// worker spawns 300 tasks while the other worker is blocked.
#[test]
fn worker_overflow_count() {
    use std::sync::mpsc;

    // Only applies to the threaded worker
    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Hop onto the runtime.
        tokio::spawn(async {
            let (started_tx, started_rx) = mpsc::channel();
            let (release_tx, release_rx) = mpsc::channel();

            // The other worker has to stay blocked until every task has been
            // spawned.
            //
            // The blocker is spawned from outside the runtime's workers (via
            // the blocking pool) to make sure the other worker picks it up:
            // <https://github.com/tokio-rs/tokio/issues/4730>
            tokio::task::spawn_blocking(|| {
                tokio::spawn(async move {
                    started_tx.send(()).unwrap();
                    release_rx.recv().unwrap();
                });
            });

            // Wait until the blocker task is actually running.
            started_rx.recv().unwrap();

            // Spawn many tasks
            for _ in 0..300 {
                tokio::spawn(async {});
            }

            // Unblock the other worker.
            release_tx.send(()).unwrap();
        })
        .await
        .unwrap();
    });
    drop(rt);

    let mut overflows: u64 = 0;
    for worker in 0..metrics.num_workers() {
        overflows += metrics.worker_overflow_count(worker);
    }

    assert_eq!(1, overflows);
}
505 | |
/// Checks the injection queue depth after spawning from a thread outside the
/// runtime, on both runtime flavors.
#[test]
fn injection_queue_depth() {
    use std::sync::mpsc;
    use std::thread;

    let rt = current_thread();
    let remote = rt.handle().clone();
    let metrics = rt.metrics();

    // Spawn one task from another thread; the runtime is never driven, so the
    // task is expected to still be queued.
    let spawner = thread::spawn(move || {
        remote.spawn(async {});
    });
    spawner.join().unwrap();

    assert_eq!(1, metrics.injection_queue_depth());

    let rt = threaded();
    let remote = rt.handle().clone();
    let metrics = rt.metrics();

    // First the runtime workers have to be blocked.
    let (unblock_first, first_gate) = mpsc::channel();
    let (unblock_second, second_gate) = mpsc::channel();
    let (unblock_rest, rest_gate) = mpsc::channel();
    let rest_gate = Arc::new(Mutex::new(rest_gate));

    rt.spawn(async move { first_gate.recv().unwrap() });
    rt.spawn(async move { second_gate.recv().unwrap() });

    // Spawn some more tasks so the queue is certain to hold items.
    for _ in 0..10 {
        let gate = Arc::clone(&rest_gate);
        rt.spawn(async move {
            gate.lock().unwrap().recv().unwrap();
        });
    }

    let spawner = thread::spawn(move || {
        remote.spawn(async {});
    });
    spawner.join().unwrap();

    let depth = metrics.injection_queue_depth();
    assert!(1 <= depth, "{}", depth);
    assert!(15 >= depth, "{}", depth);

    // Release everything that was blocked.
    for _ in 0..10 {
        unblock_rest.send(()).unwrap();
    }

    unblock_first.send(()).unwrap();
    unblock_second.send(()).unwrap();
}
560 | |
/// Checks that tasks spawned from inside the runtime, and not yet run, are
/// reflected in the local queue depth, on both runtime flavors.
#[test]
fn worker_local_queue_depth() {
    // Number of tasks spawned in each half of the test; both assertions below
    // compare the observed queue depth against this value.
    const N: usize = 100;

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {});
        }

        // Checked from inside `block_on`, before control returns to the
        // runtime.
        assert_eq!(N, metrics.worker_local_queue_depth(0));
    });

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async move {
        // Move to the runtime
        tokio::spawn(async move {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // Bump the next-run spawn
            tokio::spawn(async {});

            // Wait until the blocker task above is running.
            rx1.recv().unwrap();

            // Spawn `N` tasks. Using the constant (rather than a repeated
            // literal) keeps the spawn count and the assertion below in sync.
            for _ in 0..N {
                tokio::spawn(async {});
            }

            let n: usize = (0..metrics.num_workers())
                .map(|i| metrics.worker_local_queue_depth(i))
                .sum();

            assert_eq!(n, N);

            // Release the blocked worker.
            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
}
612 | |
/// Checks that consuming budget until the task is forced to yield is counted
/// as exactly one forced yield.
#[test]
fn budget_exhaustion_yield() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    // Set once `consume_budget` has returned `Pending`.
    let mut yielded = false;

    // Block on a future that keeps consuming budget until it is made to yield,
    // and then completes on its next poll.
    rt.block_on(poll_fn(|cx| {
        if yielded {
            return Poll::Ready(());
        }

        loop {
            let budget = consume_budget();
            tokio::pin!(budget);

            if budget.poll(cx).is_pending() {
                yielded = true;
                return Poll::Pending;
            }
        }
    }));

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
639 | |
/// Checks that two joined futures which both consume budget until they are
/// made to yield are counted as exactly one forced yield.
#[test]
fn budget_exhaustion_yield_with_joins() {
    // Returns a future that keeps consuming budget until `consume_budget`
    // reports `Pending`, returns `Pending` once, and completes on the poll
    // after that.
    fn consume_until_yield() -> impl Future<Output = ()> {
        let mut yielded = false;
        poll_fn(move |cx| {
            if yielded {
                return Poll::Ready(());
            }

            loop {
                let budget = consume_budget();
                tokio::pin!(budget);

                if budget.poll(cx).is_pending() {
                    yielded = true;
                    return Poll::Pending;
                }
            }
        })
    }

    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    // Run both budget-consuming futures joined together.
    rt.block_on(async { tokio::join!(consume_until_yield(), consume_until_yield()) });

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
684 | |
/// Checks the registered/deregistered fd counters of the I/O driver around the
/// lifetime of one TCP connection.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(metrics.io_driver_fd_registered_count(), 0);

    // Open one outbound TCP connection on the runtime.
    let connection = rt.block_on(async {
        tokio::net::TcpStream::connect("google.com:80")
            .await
            .unwrap()
    });

    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

    // Closing the connection must bump only the deregistered counter.
    drop(connection);

    assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
}
704 | |
/// Checks that establishing one TCP connection is counted as exactly one
/// ready event by the I/O driver.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    // Keep the connection alive until the counter has been checked.
    let _connection = rt.block_on(async {
        tokio::net::TcpStream::connect("google.com:80")
            .await
            .unwrap()
    });

    assert_eq!(metrics.io_driver_ready_count(), 1);
}
716 | |
717 | fn current_thread() -> Runtime { |
718 | tokio::runtime::Builder::new_current_thread() |
719 | .enable_all() |
720 | .build() |
721 | .unwrap() |
722 | } |
723 | |
724 | fn threaded() -> Runtime { |
725 | tokio::runtime::Builder::new_multi_thread() |
726 | .worker_threads(2) |
727 | .enable_all() |
728 | .build() |
729 | .unwrap() |
730 | } |
731 | |
/// Shorthand for a [`Duration`] of `n` microseconds.
fn us(n: u64) -> Duration {
    let micros = n;
    Duration::from_micros(micros)
}
735 | |