1//! Channel that delivers a message at a certain moment in time.
2//!
3//! Messages cannot be sent into this kind of channel; they are materialized on demand.
4
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::thread;
7use std::time::Instant;
8
9use crate::context::Context;
10use crate::err::{RecvTimeoutError, TryRecvError};
11use crate::select::{Operation, SelectHandle, Token};
12use crate::utils;
13
/// Result of a receive operation on this channel flavor.
///
/// `Some(instant)` carries the delivered message (the delivery instant itself); `None` means no
/// message was obtained.
// NOTE(review): presumably this is the type of the `at` field of `Token` — confirm in `select`.
pub(crate) type AtToken = Option<Instant>;
16
/// Channel that delivers a message at a certain moment in time.
///
/// The channel holds at most one message. The message is the delivery instant itself and it can
/// be taken by exactly one receiver: whoever flips `received` from `false` to `true` first.
pub(crate) struct Channel {
    /// The instant at which the message will be delivered.
    ///
    /// This value is also what a successful receive returns.
    delivery_time: Instant,

    /// `true` if the message has been received.
    ///
    /// Starts out `false` and is only ever set to `true` (via an atomic `swap`), never reset.
    received: AtomicBool,
}
25
26impl Channel {
27 /// Creates a channel that delivers a message at a certain instant in time.
28 #[inline]
29 pub(crate) fn new_deadline(when: Instant) -> Self {
30 Channel {
31 delivery_time: when,
32 received: AtomicBool::new(false),
33 }
34 }
35
36 /// Attempts to receive a message without blocking.
37 #[inline]
38 pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39 // We use relaxed ordering because this is just an optional optimistic check.
40 if self.received.load(Ordering::Relaxed) {
41 // The message has already been received.
42 return Err(TryRecvError::Empty);
43 }
44
45 if Instant::now() < self.delivery_time {
46 // The message was not delivered yet.
47 return Err(TryRecvError::Empty);
48 }
49
50 // Try receiving the message if it is still available.
51 if !self.received.swap(true, Ordering::SeqCst) {
52 // Success! Return delivery time as the message.
53 Ok(self.delivery_time)
54 } else {
55 // The message was already received.
56 Err(TryRecvError::Empty)
57 }
58 }
59
60 /// Receives a message from the channel.
61 #[inline]
62 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
63 // We use relaxed ordering because this is just an optional optimistic check.
64 if self.received.load(Ordering::Relaxed) {
65 // The message has already been received.
66 utils::sleep_until(deadline);
67 return Err(RecvTimeoutError::Timeout);
68 }
69
70 // Wait until the message is received or the deadline is reached.
71 loop {
72 let now = Instant::now();
73
74 let deadline = match deadline {
75 // Check if we can receive the next message.
76 _ if now >= self.delivery_time => break,
77 // Check if the timeout deadline has been reached.
78 Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
79
80 // Sleep until one of the above happens
81 Some(d) if d < self.delivery_time => d,
82 _ => self.delivery_time,
83 };
84
85 thread::sleep(deadline - now);
86 }
87
88 // Try receiving the message if it is still available.
89 if !self.received.swap(true, Ordering::SeqCst) {
90 // Success! Return the message, which is the instant at which it was delivered.
91 Ok(self.delivery_time)
92 } else {
93 // The message was already received. Block forever.
94 utils::sleep_until(None);
95 unreachable!()
96 }
97 }
98
99 /// Reads a message from the channel.
100 #[inline]
101 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
102 token.at.ok_or(())
103 }
104
105 /// Returns `true` if the channel is empty.
106 #[inline]
107 pub(crate) fn is_empty(&self) -> bool {
108 // We use relaxed ordering because this is just an optional optimistic check.
109 if self.received.load(Ordering::Relaxed) {
110 return true;
111 }
112
113 // If the delivery time hasn't been reached yet, the channel is empty.
114 if Instant::now() < self.delivery_time {
115 return true;
116 }
117
118 // The delivery time has been reached. The channel is empty only if the message has already
119 // been received.
120 self.received.load(Ordering::SeqCst)
121 }
122
123 /// Returns `true` if the channel is full.
124 #[inline]
125 pub(crate) fn is_full(&self) -> bool {
126 !self.is_empty()
127 }
128
129 /// Returns the number of messages in the channel.
130 #[inline]
131 pub(crate) fn len(&self) -> usize {
132 if self.is_empty() {
133 0
134 } else {
135 1
136 }
137 }
138
139 /// Returns the capacity of the channel.
140 #[inline]
141 pub(crate) fn capacity(&self) -> Option<usize> {
142 Some(1)
143 }
144}
145
146impl SelectHandle for Channel {
147 #[inline]
148 fn try_select(&self, token: &mut Token) -> bool {
149 match self.try_recv() {
150 Ok(msg) => {
151 token.at = Some(msg);
152 true
153 }
154 Err(TryRecvError::Disconnected) => {
155 token.at = None;
156 true
157 }
158 Err(TryRecvError::Empty) => false,
159 }
160 }
161
162 #[inline]
163 fn deadline(&self) -> Option<Instant> {
164 // We use relaxed ordering because this is just an optional optimistic check.
165 if self.received.load(Ordering::Relaxed) {
166 None
167 } else {
168 Some(self.delivery_time)
169 }
170 }
171
172 #[inline]
173 fn register(&self, _oper: Operation, _cx: &Context) -> bool {
174 self.is_ready()
175 }
176
177 #[inline]
178 fn unregister(&self, _oper: Operation) {}
179
180 #[inline]
181 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
182 self.try_select(token)
183 }
184
185 #[inline]
186 fn is_ready(&self) -> bool {
187 !self.is_empty()
188 }
189
190 #[inline]
191 fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
192 self.is_ready()
193 }
194
195 #[inline]
196 fn unwatch(&self, _oper: Operation) {}
197}
198