1 | use core::future::{poll_fn, Future}; |
2 | use core::pin::{pin, Pin}; |
3 | use core::task::{Context, Poll}; |
4 | |
5 | use futures_util::future::{select, Either}; |
6 | use futures_util::stream::FusedStream; |
7 | use futures_util::Stream; |
8 | |
9 | use crate::{Duration, Instant}; |
10 | |
/// Marker error produced by [`with_timeout`] and [`with_deadline`] when the time limit
/// is reached before the wrapped future has completed.
///
/// The type carries no data; it only signals that a timeout occurred.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct TimeoutError;
15 | |
16 | /// Runs a given future with a timeout. |
17 | /// |
18 | /// If the future completes before the timeout, its output is returned. Otherwise, on timeout, |
19 | /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. |
20 | pub async fn with_timeout<F: Future>(timeout: Duration, fut: F) -> Result<F::Output, TimeoutError> { |
21 | let timeout_fut: Timer = Timer::after(duration:timeout); |
22 | match select(future1:pin!(fut), future2:timeout_fut).await { |
23 | Either::Left((r: ::Output, _)) => Ok(r), |
24 | Either::Right(_) => Err(TimeoutError), |
25 | } |
26 | } |
27 | |
28 | /// Runs a given future with a deadline time. |
29 | /// |
30 | /// If the future completes before the deadline, its output is returned. Otherwise, on timeout, |
31 | /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. |
32 | pub async fn with_deadline<F: Future>(at: Instant, fut: F) -> Result<F::Output, TimeoutError> { |
33 | let timeout_fut: Timer = Timer::at(at); |
34 | match select(future1:pin!(fut), future2:timeout_fut).await { |
35 | Either::Left((r: ::Output, _)) => Ok(r), |
36 | Either::Right(_) => Err(TimeoutError), |
37 | } |
38 | } |
39 | |
40 | /// Provides functions to run a given future with a timeout or a deadline. |
41 | pub trait WithTimeout { |
42 | /// Output type of the future. |
43 | type Output; |
44 | |
45 | /// Runs a given future with a timeout. |
46 | /// |
47 | /// If the future completes before the timeout, its output is returned. Otherwise, on timeout, |
48 | /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. |
49 | async fn with_timeout(self, timeout: Duration) -> Result<Self::Output, TimeoutError>; |
50 | |
51 | /// Runs a given future with a deadline time. |
52 | /// |
53 | /// If the future completes before the deadline, its output is returned. Otherwise, on timeout, |
54 | /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned. |
55 | async fn with_deadline(self, at: Instant) -> Result<Self::Output, TimeoutError>; |
56 | } |
57 | |
58 | impl<F: Future> WithTimeout for F { |
59 | type Output = F::Output; |
60 | |
61 | async fn with_timeout(self, timeout: Duration) -> Result<Self::Output, TimeoutError> { |
62 | with_timeout(timeout, self).await |
63 | } |
64 | |
65 | async fn with_deadline(self, at: Instant) -> Result<Self::Output, TimeoutError> { |
66 | with_deadline(at, self).await |
67 | } |
68 | } |
69 | |
70 | /// A future that completes at a specified [Instant](struct.Instant.html). |
71 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
72 | pub struct Timer { |
73 | expires_at: Instant, |
74 | yielded_once: bool, |
75 | } |
76 | |
77 | impl Timer { |
78 | /// Expire at specified [Instant](struct.Instant.html) |
79 | pub fn at(expires_at: Instant) -> Self { |
80 | Self { |
81 | expires_at, |
82 | yielded_once: false, |
83 | } |
84 | } |
85 | |
86 | /// Expire after specified [Duration](struct.Duration.html). |
87 | /// This can be used as a `sleep` abstraction. |
88 | /// |
89 | /// Example: |
90 | /// ``` no_run |
91 | /// use embassy_time::{Duration, Timer}; |
92 | /// |
93 | /// #[embassy_executor::task] |
94 | /// async fn demo_sleep_seconds() { |
95 | /// // suspend this task for one second. |
96 | /// Timer::after(Duration::from_secs(1)).await; |
97 | /// } |
98 | /// ``` |
99 | pub fn after(duration: Duration) -> Self { |
100 | Self { |
101 | expires_at: Instant::now() + duration, |
102 | yielded_once: false, |
103 | } |
104 | } |
105 | |
106 | /// Expire after the specified number of ticks. |
107 | /// |
108 | /// This method is a convenience wrapper for calling `Timer::after(Duration::from_ticks())`. |
109 | /// For more details, refer to [`Timer::after()`] and [`Duration::from_ticks()`]. |
110 | #[inline ] |
111 | pub fn after_ticks(ticks: u64) -> Self { |
112 | Self::after(Duration::from_ticks(ticks)) |
113 | } |
114 | |
115 | /// Expire after the specified number of nanoseconds. |
116 | /// |
117 | /// This method is a convenience wrapper for calling `Timer::after(Duration::from_nanos())`. |
118 | /// For more details, refer to [`Timer::after()`] and [`Duration::from_nanos()`]. |
119 | #[inline ] |
120 | pub fn after_nanos(nanos: u64) -> Self { |
121 | Self::after(Duration::from_nanos(nanos)) |
122 | } |
123 | |
124 | /// Expire after the specified number of microseconds. |
125 | /// |
126 | /// This method is a convenience wrapper for calling `Timer::after(Duration::from_micros())`. |
127 | /// For more details, refer to [`Timer::after()`] and [`Duration::from_micros()`]. |
128 | #[inline ] |
129 | pub fn after_micros(micros: u64) -> Self { |
130 | Self::after(Duration::from_micros(micros)) |
131 | } |
132 | |
133 | /// Expire after the specified number of milliseconds. |
134 | /// |
135 | /// This method is a convenience wrapper for calling `Timer::after(Duration::from_millis())`. |
136 | /// For more details, refer to [`Timer::after`] and [`Duration::from_millis()`]. |
137 | #[inline ] |
138 | pub fn after_millis(millis: u64) -> Self { |
139 | Self::after(Duration::from_millis(millis)) |
140 | } |
141 | |
142 | /// Expire after the specified number of seconds. |
143 | /// |
144 | /// This method is a convenience wrapper for calling `Timer::after(Duration::from_secs())`. |
145 | /// For more details, refer to [`Timer::after`] and [`Duration::from_secs()`]. |
146 | #[inline ] |
147 | pub fn after_secs(secs: u64) -> Self { |
148 | Self::after(Duration::from_secs(secs)) |
149 | } |
150 | } |
151 | |
152 | impl Unpin for Timer {} |
153 | |
154 | impl Future for Timer { |
155 | type Output = (); |
156 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
157 | if self.yielded_once && self.expires_at <= Instant::now() { |
158 | Poll::Ready(()) |
159 | } else { |
160 | embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker()); |
161 | self.yielded_once = true; |
162 | Poll::Pending |
163 | } |
164 | } |
165 | } |
166 | |
167 | /// Asynchronous stream that yields every Duration, indefinitely. |
168 | /// |
169 | /// This stream will tick at uniform intervals, even if blocking work is performed between ticks. |
170 | /// |
171 | /// For instance, consider the following code fragment. |
172 | /// ``` no_run |
173 | /// use embassy_time::{Duration, Timer}; |
174 | /// # fn foo() {} |
175 | /// |
176 | /// #[embassy_executor::task] |
177 | /// async fn ticker_example_0() { |
178 | /// loop { |
179 | /// foo(); |
180 | /// Timer::after(Duration::from_secs(1)).await; |
181 | /// } |
182 | /// } |
183 | /// ``` |
184 | /// |
185 | /// This fragment will not call `foo` every second. |
186 | /// Instead, it will call it every second + the time it took to previously call `foo`. |
187 | /// |
188 | /// Example using ticker, which will consistently call `foo` once a second. |
189 | /// |
190 | /// ``` no_run |
191 | /// use embassy_time::{Duration, Ticker}; |
192 | /// # fn foo(){} |
193 | /// |
194 | /// #[embassy_executor::task] |
195 | /// async fn ticker_example_1() { |
196 | /// let mut ticker = Ticker::every(Duration::from_secs(1)); |
197 | /// loop { |
198 | /// foo(); |
199 | /// ticker.next().await; |
200 | /// } |
201 | /// } |
202 | /// ``` |
203 | pub struct Ticker { |
204 | expires_at: Instant, |
205 | duration: Duration, |
206 | } |
207 | |
208 | impl Ticker { |
209 | /// Creates a new ticker that ticks at the specified duration interval. |
210 | pub fn every(duration: Duration) -> Self { |
211 | let expires_at = Instant::now() + duration; |
212 | Self { expires_at, duration } |
213 | } |
214 | |
215 | /// Resets the ticker back to its original state. |
216 | /// This causes the ticker to go back to zero, even if the current tick isn't over yet. |
217 | pub fn reset(&mut self) { |
218 | self.expires_at = Instant::now() + self.duration; |
219 | } |
220 | |
221 | /// Reset the ticker at the deadline. |
222 | /// If the deadline is in the past, the ticker will fire instantly. |
223 | pub fn reset_at(&mut self, deadline: Instant) { |
224 | self.expires_at = deadline + self.duration; |
225 | } |
226 | |
227 | /// Resets the ticker, after the specified duration has passed. |
228 | /// If the specified duration is zero, the next tick will be after the duration of the ticker. |
229 | pub fn reset_after(&mut self, after: Duration) { |
230 | self.expires_at = Instant::now() + after + self.duration; |
231 | } |
232 | |
233 | /// Waits for the next tick. |
234 | pub fn next(&mut self) -> impl Future<Output = ()> + Send + Sync + '_ { |
235 | poll_fn(|cx| { |
236 | if self.expires_at <= Instant::now() { |
237 | let dur = self.duration; |
238 | self.expires_at += dur; |
239 | Poll::Ready(()) |
240 | } else { |
241 | embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker()); |
242 | Poll::Pending |
243 | } |
244 | }) |
245 | } |
246 | } |
247 | |
248 | impl Unpin for Ticker {} |
249 | |
250 | impl Stream for Ticker { |
251 | type Item = (); |
252 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
253 | if self.expires_at <= Instant::now() { |
254 | let dur: Duration = self.duration; |
255 | self.expires_at += dur; |
256 | Poll::Ready(Some(())) |
257 | } else { |
258 | embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker()); |
259 | Poll::Pending |
260 | } |
261 | } |
262 | } |
263 | |
264 | impl FusedStream for Ticker { |
265 | fn is_terminated(&self) -> bool { |
266 | // `Ticker` keeps yielding values until dropped, it never terminates. |
267 | false |
268 | } |
269 | } |
270 | |