1//! Thread-local channel context.
2
3use super::select::Selected;
4use super::waker::current_thread_id;
5
6use crate::cell::Cell;
7use crate::ptr;
8use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
9use crate::sync::Arc;
10use crate::thread::{self, Thread};
11use crate::time::Instant;
12
13/// Thread-local context.
14#[derive(Debug, Clone)]
15pub struct Context {
16 inner: Arc<Inner>,
17}
18
19/// Inner representation of `Context`.
20#[derive(Debug)]
21struct Inner {
22 /// Selected operation.
23 select: AtomicUsize,
24
25 /// A slot into which another thread may store a pointer to its `Packet`.
26 packet: AtomicPtr<()>,
27
28 /// Thread handle.
29 thread: Thread,
30
31 /// Thread id.
32 thread_id: usize,
33}
34
35impl Context {
36 /// Creates a new context for the duration of the closure.
37 #[inline]
38 pub fn with<F, R>(f: F) -> R
39 where
40 F: FnOnce(&Context) -> R,
41 {
42 thread_local! {
43 /// Cached thread-local context.
44 static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
45 }
46
47 let mut f = Some(f);
48 let mut f = |cx: &Context| -> R {
49 let f = f.take().unwrap();
50 f(cx)
51 };
52
53 CONTEXT
54 .try_with(|cell| match cell.take() {
55 None => f(&Context::new()),
56 Some(cx) => {
57 cx.reset();
58 let res = f(&cx);
59 cell.set(Some(cx));
60 res
61 }
62 })
63 .unwrap_or_else(|_| f(&Context::new()))
64 }
65
66 /// Creates a new `Context`.
67 #[cold]
68 fn new() -> Context {
69 Context {
70 inner: Arc::new(Inner {
71 select: AtomicUsize::new(Selected::Waiting.into()),
72 packet: AtomicPtr::new(ptr::null_mut()),
73 thread: thread::current(),
74 thread_id: current_thread_id(),
75 }),
76 }
77 }
78
79 /// Resets `select` and `packet`.
80 #[inline]
81 fn reset(&self) {
82 self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
83 self.inner.packet.store(ptr::null_mut(), Ordering::Release);
84 }
85
86 /// Attempts to select an operation.
87 ///
88 /// On failure, the previously selected operation is returned.
89 #[inline]
90 pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
91 self.inner
92 .select
93 .compare_exchange(
94 Selected::Waiting.into(),
95 select.into(),
96 Ordering::AcqRel,
97 Ordering::Acquire,
98 )
99 .map(|_| ())
100 .map_err(|e| e.into())
101 }
102
103 /// Stores a packet.
104 ///
105 /// This method must be called after `try_select` succeeds and there is a packet to provide.
106 #[inline]
107 pub fn store_packet(&self, packet: *mut ()) {
108 if !packet.is_null() {
109 self.inner.packet.store(packet, Ordering::Release);
110 }
111 }
112
113 /// Waits until an operation is selected and returns it.
114 ///
115 /// If the deadline is reached, `Selected::Aborted` will be selected.
116 #[inline]
117 pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
118 loop {
119 // Check whether an operation has been selected.
120 let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
121 if sel != Selected::Waiting {
122 return sel;
123 }
124
125 // If there's a deadline, park the current thread until the deadline is reached.
126 if let Some(end) = deadline {
127 let now = Instant::now();
128
129 if now < end {
130 thread::park_timeout(end - now);
131 } else {
132 // The deadline has been reached. Try aborting select.
133 return match self.try_select(Selected::Aborted) {
134 Ok(()) => Selected::Aborted,
135 Err(s) => s,
136 };
137 }
138 } else {
139 thread::park();
140 }
141 }
142 }
143
144 /// Unparks the thread this context belongs to.
145 #[inline]
146 pub fn unpark(&self) {
147 self.inner.thread.unpark();
148 }
149
150 /// Returns the id of the thread this context belongs to.
151 #[inline]
152 pub fn thread_id(&self) -> usize {
153 self.inner.thread_id
154 }
155}
156