1 | //! Channel that delivers messages periodically. |
2 | //! |
3 | //! Messages cannot be sent into this kind of channel; they are materialized on demand. |
4 | |
5 | use std::thread; |
6 | use std::time::{Duration, Instant}; |
7 | |
8 | use crossbeam_utils::atomic::AtomicCell; |
9 | |
10 | use crate::context::Context; |
11 | use crate::err::{RecvTimeoutError, TryRecvError}; |
12 | use crate::select::{Operation, SelectHandle, Token}; |
13 | |
14 | /// Result of a receive operation. |
15 | pub(crate) type TickToken = Option<Instant>; |
16 | |
17 | /// Channel that delivers messages periodically. |
18 | pub(crate) struct Channel { |
19 | /// The instant at which the next message will be delivered. |
20 | delivery_time: AtomicCell<Instant>, |
21 | |
22 | /// The time interval in which messages get delivered. |
23 | duration: Duration, |
24 | } |
25 | |
26 | impl Channel { |
27 | /// Creates a channel that delivers messages periodically. |
28 | #[inline ] |
29 | pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self { |
30 | Channel { |
31 | delivery_time: AtomicCell::new(delivery_time), |
32 | duration: dur, |
33 | } |
34 | } |
35 | |
36 | /// Attempts to receive a message without blocking. |
37 | #[inline ] |
38 | pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { |
39 | loop { |
40 | let now = Instant::now(); |
41 | let delivery_time = self.delivery_time.load(); |
42 | |
43 | if now < delivery_time { |
44 | return Err(TryRecvError::Empty); |
45 | } |
46 | |
47 | if self |
48 | .delivery_time |
49 | .compare_exchange(delivery_time, now + self.duration) |
50 | .is_ok() |
51 | { |
52 | return Ok(delivery_time); |
53 | } |
54 | } |
55 | } |
56 | |
57 | /// Receives a message from the channel. |
58 | #[inline ] |
59 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { |
60 | loop { |
61 | let delivery_time = self.delivery_time.load(); |
62 | let now = Instant::now(); |
63 | |
64 | if let Some(d) = deadline { |
65 | if d < delivery_time { |
66 | if now < d { |
67 | thread::sleep(d - now); |
68 | } |
69 | return Err(RecvTimeoutError::Timeout); |
70 | } |
71 | } |
72 | |
73 | if self |
74 | .delivery_time |
75 | .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) |
76 | .is_ok() |
77 | { |
78 | if now < delivery_time { |
79 | thread::sleep(delivery_time - now); |
80 | } |
81 | return Ok(delivery_time); |
82 | } |
83 | } |
84 | } |
85 | |
86 | /// Reads a message from the channel. |
87 | #[inline ] |
88 | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { |
89 | token.tick.ok_or(()) |
90 | } |
91 | |
92 | /// Returns `true` if the channel is empty. |
93 | #[inline ] |
94 | pub(crate) fn is_empty(&self) -> bool { |
95 | Instant::now() < self.delivery_time.load() |
96 | } |
97 | |
98 | /// Returns `true` if the channel is full. |
99 | #[inline ] |
100 | pub(crate) fn is_full(&self) -> bool { |
101 | !self.is_empty() |
102 | } |
103 | |
104 | /// Returns the number of messages in the channel. |
105 | #[inline ] |
106 | pub(crate) fn len(&self) -> usize { |
107 | if self.is_empty() { |
108 | 0 |
109 | } else { |
110 | 1 |
111 | } |
112 | } |
113 | |
114 | /// Returns the capacity of the channel. |
115 | #[inline ] |
116 | pub(crate) fn capacity(&self) -> Option<usize> { |
117 | Some(1) |
118 | } |
119 | } |
120 | |
121 | impl SelectHandle for Channel { |
122 | #[inline ] |
123 | fn try_select(&self, token: &mut Token) -> bool { |
124 | match self.try_recv() { |
125 | Ok(msg) => { |
126 | token.tick = Some(msg); |
127 | true |
128 | } |
129 | Err(TryRecvError::Disconnected) => { |
130 | token.tick = None; |
131 | true |
132 | } |
133 | Err(TryRecvError::Empty) => false, |
134 | } |
135 | } |
136 | |
137 | #[inline ] |
138 | fn deadline(&self) -> Option<Instant> { |
139 | Some(self.delivery_time.load()) |
140 | } |
141 | |
142 | #[inline ] |
143 | fn register(&self, _oper: Operation, _cx: &Context) -> bool { |
144 | self.is_ready() |
145 | } |
146 | |
147 | #[inline ] |
148 | fn unregister(&self, _oper: Operation) {} |
149 | |
150 | #[inline ] |
151 | fn accept(&self, token: &mut Token, _cx: &Context) -> bool { |
152 | self.try_select(token) |
153 | } |
154 | |
155 | #[inline ] |
156 | fn is_ready(&self) -> bool { |
157 | !self.is_empty() |
158 | } |
159 | |
160 | #[inline ] |
161 | fn watch(&self, _oper: Operation, _cx: &Context) -> bool { |
162 | self.is_ready() |
163 | } |
164 | |
165 | #[inline ] |
166 | fn unwatch(&self, _oper: Operation) {} |
167 | } |
168 | |