1//! Channel that delivers messages periodically.
2//!
3//! Messages cannot be sent into this kind of channel; they are materialized on demand.
4
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crossbeam_utils::atomic::AtomicCell;
9
10use crate::context::Context;
11use crate::err::{RecvTimeoutError, TryRecvError};
12use crate::select::{Operation, SelectHandle, Token};
13
14/// Result of a receive operation.
15pub(crate) type TickToken = Option<Instant>;
16
17/// Channel that delivers messages periodically.
18pub(crate) struct Channel {
19 /// The instant at which the next message will be delivered.
20 delivery_time: AtomicCell<Instant>,
21
22 /// The time interval in which messages get delivered.
23 duration: Duration,
24}
25
26impl Channel {
27 /// Creates a channel that delivers messages periodically.
28 #[inline]
29 pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self {
30 Channel {
31 delivery_time: AtomicCell::new(delivery_time),
32 duration: dur,
33 }
34 }
35
36 /// Attempts to receive a message without blocking.
37 #[inline]
38 pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39 loop {
40 let now = Instant::now();
41 let delivery_time = self.delivery_time.load();
42
43 if now < delivery_time {
44 return Err(TryRecvError::Empty);
45 }
46
47 if self
48 .delivery_time
49 .compare_exchange(delivery_time, now + self.duration)
50 .is_ok()
51 {
52 return Ok(delivery_time);
53 }
54 }
55 }
56
57 /// Receives a message from the channel.
58 #[inline]
59 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60 loop {
61 let delivery_time = self.delivery_time.load();
62 let now = Instant::now();
63
64 if let Some(d) = deadline {
65 if d < delivery_time {
66 if now < d {
67 thread::sleep(d - now);
68 }
69 return Err(RecvTimeoutError::Timeout);
70 }
71 }
72
73 if self
74 .delivery_time
75 .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76 .is_ok()
77 {
78 if now < delivery_time {
79 thread::sleep(delivery_time - now);
80 }
81 return Ok(delivery_time);
82 }
83 }
84 }
85
86 /// Reads a message from the channel.
87 #[inline]
88 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89 token.tick.ok_or(())
90 }
91
92 /// Returns `true` if the channel is empty.
93 #[inline]
94 pub(crate) fn is_empty(&self) -> bool {
95 Instant::now() < self.delivery_time.load()
96 }
97
98 /// Returns `true` if the channel is full.
99 #[inline]
100 pub(crate) fn is_full(&self) -> bool {
101 !self.is_empty()
102 }
103
104 /// Returns the number of messages in the channel.
105 #[inline]
106 pub(crate) fn len(&self) -> usize {
107 if self.is_empty() {
108 0
109 } else {
110 1
111 }
112 }
113
114 /// Returns the capacity of the channel.
115 #[inline]
116 pub(crate) fn capacity(&self) -> Option<usize> {
117 Some(1)
118 }
119}
120
121impl SelectHandle for Channel {
122 #[inline]
123 fn try_select(&self, token: &mut Token) -> bool {
124 match self.try_recv() {
125 Ok(msg) => {
126 token.tick = Some(msg);
127 true
128 }
129 Err(TryRecvError::Disconnected) => {
130 token.tick = None;
131 true
132 }
133 Err(TryRecvError::Empty) => false,
134 }
135 }
136
137 #[inline]
138 fn deadline(&self) -> Option<Instant> {
139 Some(self.delivery_time.load())
140 }
141
142 #[inline]
143 fn register(&self, _oper: Operation, _cx: &Context) -> bool {
144 self.is_ready()
145 }
146
147 #[inline]
148 fn unregister(&self, _oper: Operation) {}
149
150 #[inline]
151 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
152 self.try_select(token)
153 }
154
155 #[inline]
156 fn is_ready(&self) -> bool {
157 !self.is_empty()
158 }
159
160 #[inline]
161 fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
162 self.is_ready()
163 }
164
165 #[inline]
166 fn unwatch(&self, _oper: Operation) {}
167}
168