1use core::future::{poll_fn, Future};
2use core::pin::{pin, Pin};
3use core::task::{Context, Poll};
4
5use futures_util::future::{select, Either};
6use futures_util::stream::FusedStream;
7use futures_util::Stream;
8
9use crate::{Duration, Instant};
10
11/// Error returned by [`with_timeout`] and [`with_deadline`] on timeout.
12#[derive(Debug, Clone, PartialEq, Eq)]
13#[cfg_attr(feature = "defmt", derive(defmt::Format))]
14pub struct TimeoutError;
15
16/// Runs a given future with a timeout.
17///
18/// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
19/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
20pub async fn with_timeout<F: Future>(timeout: Duration, fut: F) -> Result<F::Output, TimeoutError> {
21 let timeout_fut: Timer = Timer::after(duration:timeout);
22 match select(future1:pin!(fut), future2:timeout_fut).await {
23 Either::Left((r: ::Output, _)) => Ok(r),
24 Either::Right(_) => Err(TimeoutError),
25 }
26}
27
28/// Runs a given future with a deadline time.
29///
30/// If the future completes before the deadline, its output is returned. Otherwise, on timeout,
31/// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
32pub async fn with_deadline<F: Future>(at: Instant, fut: F) -> Result<F::Output, TimeoutError> {
33 let timeout_fut: Timer = Timer::at(at);
34 match select(future1:pin!(fut), future2:timeout_fut).await {
35 Either::Left((r: ::Output, _)) => Ok(r),
36 Either::Right(_) => Err(TimeoutError),
37 }
38}
39
40/// Provides functions to run a given future with a timeout or a deadline.
41pub trait WithTimeout {
42 /// Output type of the future.
43 type Output;
44
45 /// Runs a given future with a timeout.
46 ///
47 /// If the future completes before the timeout, its output is returned. Otherwise, on timeout,
48 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
49 async fn with_timeout(self, timeout: Duration) -> Result<Self::Output, TimeoutError>;
50
51 /// Runs a given future with a deadline time.
52 ///
53 /// If the future completes before the deadline, its output is returned. Otherwise, on timeout,
54 /// work on the future is stopped (`poll` is no longer called), the future is dropped and `Err(TimeoutError)` is returned.
55 async fn with_deadline(self, at: Instant) -> Result<Self::Output, TimeoutError>;
56}
57
58impl<F: Future> WithTimeout for F {
59 type Output = F::Output;
60
61 async fn with_timeout(self, timeout: Duration) -> Result<Self::Output, TimeoutError> {
62 with_timeout(timeout, self).await
63 }
64
65 async fn with_deadline(self, at: Instant) -> Result<Self::Output, TimeoutError> {
66 with_deadline(at, self).await
67 }
68}
69
70/// A future that completes at a specified [Instant](struct.Instant.html).
71#[must_use = "futures do nothing unless you `.await` or poll them"]
72pub struct Timer {
73 expires_at: Instant,
74 yielded_once: bool,
75}
76
77impl Timer {
78 /// Expire at specified [Instant](struct.Instant.html)
79 pub fn at(expires_at: Instant) -> Self {
80 Self {
81 expires_at,
82 yielded_once: false,
83 }
84 }
85
86 /// Expire after specified [Duration](struct.Duration.html).
87 /// This can be used as a `sleep` abstraction.
88 ///
89 /// Example:
90 /// ``` no_run
91 /// use embassy_time::{Duration, Timer};
92 ///
93 /// #[embassy_executor::task]
94 /// async fn demo_sleep_seconds() {
95 /// // suspend this task for one second.
96 /// Timer::after(Duration::from_secs(1)).await;
97 /// }
98 /// ```
99 pub fn after(duration: Duration) -> Self {
100 Self {
101 expires_at: Instant::now() + duration,
102 yielded_once: false,
103 }
104 }
105
106 /// Expire after the specified number of ticks.
107 ///
108 /// This method is a convenience wrapper for calling `Timer::after(Duration::from_ticks())`.
109 /// For more details, refer to [`Timer::after()`] and [`Duration::from_ticks()`].
110 #[inline]
111 pub fn after_ticks(ticks: u64) -> Self {
112 Self::after(Duration::from_ticks(ticks))
113 }
114
115 /// Expire after the specified number of nanoseconds.
116 ///
117 /// This method is a convenience wrapper for calling `Timer::after(Duration::from_nanos())`.
118 /// For more details, refer to [`Timer::after()`] and [`Duration::from_nanos()`].
119 #[inline]
120 pub fn after_nanos(nanos: u64) -> Self {
121 Self::after(Duration::from_nanos(nanos))
122 }
123
124 /// Expire after the specified number of microseconds.
125 ///
126 /// This method is a convenience wrapper for calling `Timer::after(Duration::from_micros())`.
127 /// For more details, refer to [`Timer::after()`] and [`Duration::from_micros()`].
128 #[inline]
129 pub fn after_micros(micros: u64) -> Self {
130 Self::after(Duration::from_micros(micros))
131 }
132
133 /// Expire after the specified number of milliseconds.
134 ///
135 /// This method is a convenience wrapper for calling `Timer::after(Duration::from_millis())`.
136 /// For more details, refer to [`Timer::after`] and [`Duration::from_millis()`].
137 #[inline]
138 pub fn after_millis(millis: u64) -> Self {
139 Self::after(Duration::from_millis(millis))
140 }
141
142 /// Expire after the specified number of seconds.
143 ///
144 /// This method is a convenience wrapper for calling `Timer::after(Duration::from_secs())`.
145 /// For more details, refer to [`Timer::after`] and [`Duration::from_secs()`].
146 #[inline]
147 pub fn after_secs(secs: u64) -> Self {
148 Self::after(Duration::from_secs(secs))
149 }
150}
151
152impl Unpin for Timer {}
153
154impl Future for Timer {
155 type Output = ();
156 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
157 if self.yielded_once && self.expires_at <= Instant::now() {
158 Poll::Ready(())
159 } else {
160 embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
161 self.yielded_once = true;
162 Poll::Pending
163 }
164 }
165}
166
167/// Asynchronous stream that yields every Duration, indefinitely.
168///
169/// This stream will tick at uniform intervals, even if blocking work is performed between ticks.
170///
171/// For instance, consider the following code fragment.
172/// ``` no_run
173/// use embassy_time::{Duration, Timer};
174/// # fn foo() {}
175///
176/// #[embassy_executor::task]
177/// async fn ticker_example_0() {
178/// loop {
179/// foo();
180/// Timer::after(Duration::from_secs(1)).await;
181/// }
182/// }
183/// ```
184///
185/// This fragment will not call `foo` every second.
186/// Instead, it will call it every second + the time it took to previously call `foo`.
187///
188/// Example using ticker, which will consistently call `foo` once a second.
189///
190/// ``` no_run
191/// use embassy_time::{Duration, Ticker};
192/// # fn foo(){}
193///
194/// #[embassy_executor::task]
195/// async fn ticker_example_1() {
196/// let mut ticker = Ticker::every(Duration::from_secs(1));
197/// loop {
198/// foo();
199/// ticker.next().await;
200/// }
201/// }
202/// ```
203pub struct Ticker {
204 expires_at: Instant,
205 duration: Duration,
206}
207
208impl Ticker {
209 /// Creates a new ticker that ticks at the specified duration interval.
210 pub fn every(duration: Duration) -> Self {
211 let expires_at = Instant::now() + duration;
212 Self { expires_at, duration }
213 }
214
215 /// Resets the ticker back to its original state.
216 /// This causes the ticker to go back to zero, even if the current tick isn't over yet.
217 pub fn reset(&mut self) {
218 self.expires_at = Instant::now() + self.duration;
219 }
220
221 /// Reset the ticker at the deadline.
222 /// If the deadline is in the past, the ticker will fire instantly.
223 pub fn reset_at(&mut self, deadline: Instant) {
224 self.expires_at = deadline + self.duration;
225 }
226
227 /// Resets the ticker, after the specified duration has passed.
228 /// If the specified duration is zero, the next tick will be after the duration of the ticker.
229 pub fn reset_after(&mut self, after: Duration) {
230 self.expires_at = Instant::now() + after + self.duration;
231 }
232
233 /// Waits for the next tick.
234 pub fn next(&mut self) -> impl Future<Output = ()> + Send + Sync + '_ {
235 poll_fn(|cx| {
236 if self.expires_at <= Instant::now() {
237 let dur = self.duration;
238 self.expires_at += dur;
239 Poll::Ready(())
240 } else {
241 embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
242 Poll::Pending
243 }
244 })
245 }
246}
247
248impl Unpin for Ticker {}
249
250impl Stream for Ticker {
251 type Item = ();
252 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
253 if self.expires_at <= Instant::now() {
254 let dur: Duration = self.duration;
255 self.expires_at += dur;
256 Poll::Ready(Some(()))
257 } else {
258 embassy_time_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
259 Poll::Pending
260 }
261 }
262}
263
264impl FusedStream for Ticker {
265 fn is_terminated(&self) -> bool {
266 // `Ticker` keeps yielding values until dropped, it never terminates.
267 false
268 }
269}
270