//! Thread-local context used in select.
2
use std::cell::Cell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, Thread, ThreadId};
use std::time::Instant;

use crossbeam_utils::Backoff;

use crate::select::Selected;
13
14/// Thread-local context used in select.
15// This is a private API that is used by the select macro.
16#[derive(Debug, Clone)]
17pub struct Context {
18 inner: Arc<Inner>,
19}
20
/// Inner representation of `Context`.
///
/// Shared between all clones of a `Context` through an `Arc`, so every field is either
/// atomic or immutable after construction.
#[derive(Debug)]
struct Inner {
    /// Selected operation, stored as the `usize` encoding of `Selected`.
    select: AtomicUsize,

    /// A slot into which another thread may store a pointer to its `Packet`.
    ///
    /// A null pointer means that no packet has been provided yet.
    packet: AtomicPtr<()>,

    /// Handle of the thread that created the context; used to unpark it.
    thread: Thread,

    /// Id of the thread that created the context.
    thread_id: ThreadId,
}
36
37impl Context {
38 /// Creates a new context for the duration of the closure.
39 #[inline]
40 pub fn with<F, R>(f: F) -> R
41 where
42 F: FnOnce(&Context) -> R,
43 {
44 std::thread_local! {
45 /// Cached thread-local context.
46 static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
47 }
48
49 let mut f = Some(f);
50 let mut f = |cx: &Context| -> R {
51 let f = f.take().unwrap();
52 f(cx)
53 };
54
55 CONTEXT
56 .try_with(|cell| match cell.take() {
57 None => f(&Context::new()),
58 Some(cx) => {
59 cx.reset();
60 let res = f(&cx);
61 cell.set(Some(cx));
62 res
63 }
64 })
65 .unwrap_or_else(|_| f(&Context::new()))
66 }
67
68 /// Creates a new `Context`.
69 #[cold]
70 fn new() -> Context {
71 Context {
72 inner: Arc::new(Inner {
73 select: AtomicUsize::new(Selected::Waiting.into()),
74 packet: AtomicPtr::new(ptr::null_mut()),
75 thread: thread::current(),
76 thread_id: thread::current().id(),
77 }),
78 }
79 }
80
81 /// Resets `select` and `packet`.
82 #[inline]
83 fn reset(&self) {
84 self.inner
85 .select
86 .store(Selected::Waiting.into(), Ordering::Release);
87 self.inner.packet.store(ptr::null_mut(), Ordering::Release);
88 }
89
90 /// Attempts to select an operation.
91 ///
92 /// On failure, the previously selected operation is returned.
93 #[inline]
94 pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
95 self.inner
96 .select
97 .compare_exchange(
98 Selected::Waiting.into(),
99 select.into(),
100 Ordering::AcqRel,
101 Ordering::Acquire,
102 )
103 .map(|_| ())
104 .map_err(|e| e.into())
105 }
106
107 /// Returns the selected operation.
108 #[inline]
109 pub fn selected(&self) -> Selected {
110 Selected::from(self.inner.select.load(Ordering::Acquire))
111 }
112
113 /// Stores a packet.
114 ///
115 /// This method must be called after `try_select` succeeds and there is a packet to provide.
116 #[inline]
117 pub fn store_packet(&self, packet: *mut ()) {
118 if !packet.is_null() {
119 self.inner.packet.store(packet, Ordering::Release);
120 }
121 }
122
123 /// Waits until a packet is provided and returns it.
124 #[inline]
125 pub fn wait_packet(&self) -> *mut () {
126 let backoff = Backoff::new();
127 loop {
128 let packet = self.inner.packet.load(Ordering::Acquire);
129 if !packet.is_null() {
130 return packet;
131 }
132 backoff.snooze();
133 }
134 }
135
136 /// Waits until an operation is selected and returns it.
137 ///
138 /// If the deadline is reached, `Selected::Aborted` will be selected.
139 #[inline]
140 pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
141 // Spin for a short time, waiting until an operation is selected.
142 let backoff = Backoff::new();
143 loop {
144 let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
145 if sel != Selected::Waiting {
146 return sel;
147 }
148
149 if backoff.is_completed() {
150 break;
151 } else {
152 backoff.snooze();
153 }
154 }
155
156 loop {
157 // Check whether an operation has been selected.
158 let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
159 if sel != Selected::Waiting {
160 return sel;
161 }
162
163 // If there's a deadline, park the current thread until the deadline is reached.
164 if let Some(end) = deadline {
165 let now = Instant::now();
166
167 if now < end {
168 thread::park_timeout(end - now);
169 } else {
170 // The deadline has been reached. Try aborting select.
171 return match self.try_select(Selected::Aborted) {
172 Ok(()) => Selected::Aborted,
173 Err(s) => s,
174 };
175 }
176 } else {
177 thread::park();
178 }
179 }
180 }
181
182 /// Unparks the thread this context belongs to.
183 #[inline]
184 pub fn unpark(&self) {
185 self.inner.thread.unpark();
186 }
187
188 /// Returns the id of the thread this context belongs to.
189 #[inline]
190 pub fn thread_id(&self) -> ThreadId {
191 self.inner.thread_id
192 }
193}
194