1 | #![cfg (all(feature = "full" , not(target_os = "wasi" )))] // Wasi doesn't support threads |
2 | #![allow (clippy::declare_interior_mutable_const)] |
3 | use std::future::Future; |
4 | use std::pin::Pin; |
5 | use std::task::{Context, Poll}; |
6 | use tokio::sync::oneshot; |
7 | |
8 | #[tokio::test (flavor = "multi_thread" )] |
9 | async fn local() { |
10 | tokio::task_local! { |
11 | static REQ_ID: u32; |
12 | pub static FOO: bool; |
13 | } |
14 | |
15 | let j1 = tokio::spawn(REQ_ID.scope(1, async move { |
16 | assert_eq!(REQ_ID.get(), 1); |
17 | assert_eq!(REQ_ID.get(), 1); |
18 | })); |
19 | |
20 | let j2 = tokio::spawn(REQ_ID.scope(2, async move { |
21 | REQ_ID.with(|v| { |
22 | assert_eq!(REQ_ID.get(), 2); |
23 | assert_eq!(*v, 2); |
24 | }); |
25 | |
26 | tokio::time::sleep(std::time::Duration::from_millis(10)).await; |
27 | |
28 | assert_eq!(REQ_ID.get(), 2); |
29 | })); |
30 | |
31 | let j3 = tokio::spawn(FOO.scope(true, async move { |
32 | assert!(FOO.get()); |
33 | })); |
34 | |
35 | j1.await.unwrap(); |
36 | j2.await.unwrap(); |
37 | j3.await.unwrap(); |
38 | } |
39 | |
40 | #[tokio::test ] |
41 | async fn task_local_available_on_abort() { |
42 | tokio::task_local! { |
43 | static KEY: u32; |
44 | } |
45 | |
46 | struct MyFuture { |
47 | tx_poll: Option<oneshot::Sender<()>>, |
48 | tx_drop: Option<oneshot::Sender<u32>>, |
49 | } |
50 | impl Future for MyFuture { |
51 | type Output = (); |
52 | |
53 | fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { |
54 | if let Some(tx_poll) = self.tx_poll.take() { |
55 | let _ = tx_poll.send(()); |
56 | } |
57 | Poll::Pending |
58 | } |
59 | } |
60 | impl Drop for MyFuture { |
61 | fn drop(&mut self) { |
62 | let _ = self.tx_drop.take().unwrap().send(KEY.get()); |
63 | } |
64 | } |
65 | |
66 | let (tx_drop, rx_drop) = oneshot::channel(); |
67 | let (tx_poll, rx_poll) = oneshot::channel(); |
68 | |
69 | let h = tokio::spawn(KEY.scope( |
70 | 42, |
71 | MyFuture { |
72 | tx_poll: Some(tx_poll), |
73 | tx_drop: Some(tx_drop), |
74 | }, |
75 | )); |
76 | |
77 | rx_poll.await.unwrap(); |
78 | h.abort(); |
79 | assert_eq!(rx_drop.await.unwrap(), 42); |
80 | |
81 | let err = h.await.unwrap_err(); |
82 | if !err.is_cancelled() { |
83 | if let Ok(panic) = err.try_into_panic() { |
84 | std::panic::resume_unwind(panic); |
85 | } else { |
86 | panic!(); |
87 | } |
88 | } |
89 | } |
90 | |
91 | #[tokio::test ] |
92 | async fn task_local_available_on_completion_drop() { |
93 | tokio::task_local! { |
94 | static KEY: u32; |
95 | } |
96 | |
97 | struct MyFuture { |
98 | tx: Option<oneshot::Sender<u32>>, |
99 | } |
100 | impl Future for MyFuture { |
101 | type Output = (); |
102 | |
103 | fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { |
104 | Poll::Ready(()) |
105 | } |
106 | } |
107 | impl Drop for MyFuture { |
108 | fn drop(&mut self) { |
109 | let _ = self.tx.take().unwrap().send(KEY.get()); |
110 | } |
111 | } |
112 | |
113 | let (tx, rx) = oneshot::channel(); |
114 | |
115 | let h = tokio::spawn(KEY.scope(42, MyFuture { tx: Some(tx) })); |
116 | |
117 | assert_eq!(rx.await.unwrap(), 42); |
118 | h.await.unwrap(); |
119 | } |
120 | |