1 | //! Thread-local context used in select. |
2 | |
3 | use std::cell::Cell; |
4 | use std::ptr; |
5 | use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; |
6 | use std::sync::Arc; |
7 | use std::thread::{self, Thread, ThreadId}; |
8 | use std::time::Instant; |
9 | |
10 | use crossbeam_utils::Backoff; |
11 | |
12 | use crate::select::Selected; |
13 | |
14 | /// Thread-local context used in select. |
15 | // This is a private API that is used by the select macro. |
16 | #[derive (Debug, Clone)] |
17 | pub struct Context { |
18 | inner: Arc<Inner>, |
19 | } |
20 | |
21 | /// Inner representation of `Context`. |
22 | #[derive (Debug)] |
23 | struct Inner { |
24 | /// Selected operation. |
25 | select: AtomicUsize, |
26 | |
27 | /// A slot into which another thread may store a pointer to its `Packet`. |
28 | packet: AtomicPtr<()>, |
29 | |
30 | /// Thread handle. |
31 | thread: Thread, |
32 | |
33 | /// Thread id. |
34 | thread_id: ThreadId, |
35 | } |
36 | |
37 | impl Context { |
38 | /// Creates a new context for the duration of the closure. |
39 | #[inline ] |
40 | pub fn with<F, R>(f: F) -> R |
41 | where |
42 | F: FnOnce(&Context) -> R, |
43 | { |
44 | thread_local! { |
45 | /// Cached thread-local context. |
46 | static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new())); |
47 | } |
48 | |
49 | let mut f = Some(f); |
50 | let mut f = |cx: &Context| -> R { |
51 | let f = f.take().unwrap(); |
52 | f(cx) |
53 | }; |
54 | |
55 | CONTEXT |
56 | .try_with(|cell| match cell.take() { |
57 | None => f(&Context::new()), |
58 | Some(cx) => { |
59 | cx.reset(); |
60 | let res = f(&cx); |
61 | cell.set(Some(cx)); |
62 | res |
63 | } |
64 | }) |
65 | .unwrap_or_else(|_| f(&Context::new())) |
66 | } |
67 | |
68 | /// Creates a new `Context`. |
69 | #[cold ] |
70 | fn new() -> Context { |
71 | Context { |
72 | inner: Arc::new(Inner { |
73 | select: AtomicUsize::new(Selected::Waiting.into()), |
74 | packet: AtomicPtr::new(ptr::null_mut()), |
75 | thread: thread::current(), |
76 | thread_id: thread::current().id(), |
77 | }), |
78 | } |
79 | } |
80 | |
81 | /// Resets `select` and `packet`. |
82 | #[inline ] |
83 | fn reset(&self) { |
84 | self.inner |
85 | .select |
86 | .store(Selected::Waiting.into(), Ordering::Release); |
87 | self.inner.packet.store(ptr::null_mut(), Ordering::Release); |
88 | } |
89 | |
90 | /// Attempts to select an operation. |
91 | /// |
92 | /// On failure, the previously selected operation is returned. |
93 | #[inline ] |
94 | pub fn try_select(&self, select: Selected) -> Result<(), Selected> { |
95 | self.inner |
96 | .select |
97 | .compare_exchange( |
98 | Selected::Waiting.into(), |
99 | select.into(), |
100 | Ordering::AcqRel, |
101 | Ordering::Acquire, |
102 | ) |
103 | .map(|_| ()) |
104 | .map_err(|e| e.into()) |
105 | } |
106 | |
107 | /// Returns the selected operation. |
108 | #[inline ] |
109 | pub fn selected(&self) -> Selected { |
110 | Selected::from(self.inner.select.load(Ordering::Acquire)) |
111 | } |
112 | |
113 | /// Stores a packet. |
114 | /// |
115 | /// This method must be called after `try_select` succeeds and there is a packet to provide. |
116 | #[inline ] |
117 | pub fn store_packet(&self, packet: *mut ()) { |
118 | if !packet.is_null() { |
119 | self.inner.packet.store(packet, Ordering::Release); |
120 | } |
121 | } |
122 | |
123 | /// Waits until a packet is provided and returns it. |
124 | #[inline ] |
125 | pub fn wait_packet(&self) -> *mut () { |
126 | let backoff = Backoff::new(); |
127 | loop { |
128 | let packet = self.inner.packet.load(Ordering::Acquire); |
129 | if !packet.is_null() { |
130 | return packet; |
131 | } |
132 | backoff.snooze(); |
133 | } |
134 | } |
135 | |
136 | /// Waits until an operation is selected and returns it. |
137 | /// |
138 | /// If the deadline is reached, `Selected::Aborted` will be selected. |
139 | #[inline ] |
140 | pub fn wait_until(&self, deadline: Option<Instant>) -> Selected { |
141 | // Spin for a short time, waiting until an operation is selected. |
142 | let backoff = Backoff::new(); |
143 | loop { |
144 | let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); |
145 | if sel != Selected::Waiting { |
146 | return sel; |
147 | } |
148 | |
149 | if backoff.is_completed() { |
150 | break; |
151 | } else { |
152 | backoff.snooze(); |
153 | } |
154 | } |
155 | |
156 | loop { |
157 | // Check whether an operation has been selected. |
158 | let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); |
159 | if sel != Selected::Waiting { |
160 | return sel; |
161 | } |
162 | |
163 | // If there's a deadline, park the current thread until the deadline is reached. |
164 | if let Some(end) = deadline { |
165 | let now = Instant::now(); |
166 | |
167 | if now < end { |
168 | thread::park_timeout(end - now); |
169 | } else { |
170 | // The deadline has been reached. Try aborting select. |
171 | return match self.try_select(Selected::Aborted) { |
172 | Ok(()) => Selected::Aborted, |
173 | Err(s) => s, |
174 | }; |
175 | } |
176 | } else { |
177 | thread::park(); |
178 | } |
179 | } |
180 | } |
181 | |
182 | /// Unparks the thread this context belongs to. |
183 | #[inline ] |
184 | pub fn unpark(&self) { |
185 | self.inner.thread.unpark(); |
186 | } |
187 | |
188 | /// Returns the id of the thread this context belongs to. |
189 | #[inline ] |
190 | pub fn thread_id(&self) -> ThreadId { |
191 | self.inner.thread_id |
192 | } |
193 | } |
194 | |