1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{marker::Unpin, pin, pin::Pin, time::Duration};
4
5use futures_channel::{mpsc, oneshot};
6use futures_core::{
7 future::{FusedFuture, Future},
8 stream::{FusedStream, Stream},
9 task,
10 task::Poll,
11};
12
13use crate::{ControlFlow, MainContext, Priority, Source};
14
// rustdoc-stripper-ignore-next
/// Represents a `Future` around a `glib::Source`. The future will
/// be resolved once the source has provided a value
pub struct SourceFuture<F, T> {
    // Closure that builds the source; it is taken out (leaving `None`) by the
    // first call to `poll`.
    create_source: Option<F>,
    // The attached source paired with the receiving half of the oneshot
    // channel. `None` before the first poll and again once the value arrived.
    source: Option<(Source, oneshot::Receiver<T>)>,
}
22
23impl<F, T: 'static> SourceFuture<F, T>
24where
25 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
26{
27 // rustdoc-stripper-ignore-next
28 /// Create a new `SourceFuture`
29 ///
30 /// The provided closure should return a newly created `glib::Source` when called
31 /// and pass the value provided by the source to the oneshot sender that is passed
32 /// to the closure.
33 pub fn new(create_source: F) -> SourceFuture<F, T> {
34 SourceFuture {
35 create_source: Some(create_source),
36 source: None,
37 }
38 }
39}
40
// `poll` reaches the fields through `*self` and re-pins the receiver with
// `Pin::new`, so the type is declared `Unpin` for every `F` and `T`.
impl<F, T> Unpin for SourceFuture<F, T> {}
42
43impl<F, T> Future for SourceFuture<F, T>
44where
45 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
46{
47 type Output = T;
48
49 fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T> {
50 let SourceFuture {
51 ref mut create_source,
52 ref mut source,
53 ..
54 } = *self;
55
56 if let Some(create_source) = create_source.take() {
57 let main_context = MainContext::ref_thread_default();
58 assert!(
59 main_context.is_owner(),
60 "Spawning futures only allowed if the thread is owning the MainContext"
61 );
62
63 // Channel for sending back the Source result to our future here.
64 //
65 // In theory, we could directly continue polling the
66 // corresponding task from the Source callback,
67 // however this would break at the very least
68 // the g_main_current_source() API.
69 let (send, recv) = oneshot::channel();
70
71 let s = create_source(send);
72
73 s.attach(Some(&main_context));
74 *source = Some((s, recv));
75 }
76
77 // At this point we must have a receiver
78 let res = {
79 let &mut (_, ref mut receiver) = source.as_mut().unwrap();
80 Pin::new(receiver).poll(ctx)
81 };
82 #[allow(clippy::match_wild_err_arm)]
83 match res {
84 Poll::Ready(Err(_)) => panic!("Source sender was unexpectedly closed"),
85 Poll::Ready(Ok(v)) => {
86 // Get rid of the reference to the source, it triggered
87 let _ = source.take();
88 Poll::Ready(v)
89 }
90 Poll::Pending => Poll::Pending,
91 }
92 }
93}
94
95impl<F, T> FusedFuture for SourceFuture<F, T>
96where
97 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
98{
99 fn is_terminated(&self) -> bool {
100 self.create_source.is_none()
101 && self
102 .source
103 .as_ref()
104 .map_or(default:true, |(_, receiver: &Receiver)| receiver.is_terminated())
105 }
106}
107
108impl<T, F> Drop for SourceFuture<T, F> {
109 fn drop(&mut self) {
110 // Get rid of the source, we don't care anymore if it still triggers
111 if let Some((source: Source, _)) = self.source.take() {
112 source.destroy();
113 }
114 }
115}
116
117// rustdoc-stripper-ignore-next
118/// Create a `Future` that will resolve after the given number of milliseconds.
119///
120/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
121pub fn timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
122 timeout_future_with_priority(crate::Priority::default(), value)
123}
124
125// rustdoc-stripper-ignore-next
126/// Create a `Future` that will resolve after the given number of milliseconds.
127///
128/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
129pub fn timeout_future_with_priority(
130 priority: Priority,
131 value: Duration,
132) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
133 Box::pin(SourceFuture::new(create_source:move |send: Sender<()>| {
134 let mut send: Option> = Some(send);
135 crate::timeout_source_new(interval:value, name:None, priority, func:move || {
136 let _ = send.take().unwrap().send(());
137 ControlFlow::Break
138 })
139 }))
140}
141
142// rustdoc-stripper-ignore-next
143/// Create a `Future` that will resolve after the given number of seconds.
144///
145/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
146pub fn timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
147 timeout_future_seconds_with_priority(crate::Priority::default(), value)
148}
149
150// rustdoc-stripper-ignore-next
151/// Create a `Future` that will resolve after the given number of seconds.
152///
153/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
154pub fn timeout_future_seconds_with_priority(
155 priority: Priority,
156 value: u32,
157) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
158 Box::pin(SourceFuture::new(create_source:move |send: Sender<()>| {
159 let mut send: Option> = Some(send);
160 crate::timeout_source_new_seconds(interval:value, name:None, priority, func:move || {
161 let _ = send.take().unwrap().send(());
162 ControlFlow::Break
163 })
164 }))
165}
166
167// rustdoc-stripper-ignore-next
168/// Create a `Future` that will resolve once the child process with the given pid exits
169///
170/// The `Future` will resolve to the pid of the child process and the exit code.
171///
172/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
173pub fn child_watch_future(
174 pid: crate::Pid,
175) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
176 child_watch_future_with_priority(crate::Priority::default(), pid)
177}
178
179// rustdoc-stripper-ignore-next
180/// Create a `Future` that will resolve once the child process with the given pid exits
181///
182/// The `Future` will resolve to the pid of the child process and the exit code.
183///
184/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
185pub fn child_watch_future_with_priority(
186 priority: Priority,
187 pid: crate::Pid,
188) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
189 Box::pin(SourceFuture::new(create_source:move |send: Sender<(Pid, i32)>| {
190 let mut send: Option> = Some(send);
191 crate::child_watch_source_new(pid, name:None, priority, func:move |pid: Pid, code: i32| {
192 let _ = send.take().unwrap().send((pid, code));
193 })
194 }))
195}
196
197#[cfg(any(unix, docsrs))]
198#[cfg_attr(docsrs, doc(cfg(unix)))]
199// rustdoc-stripper-ignore-next
200/// Create a `Future` that will resolve once the given UNIX signal is raised
201///
202/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
203pub fn unix_signal_future(signum: i32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
204 unix_signal_future_with_priority(crate::Priority::default(), signum)
205}
206
207#[cfg(any(unix, docsrs))]
208#[cfg_attr(docsrs, doc(cfg(unix)))]
209// rustdoc-stripper-ignore-next
210/// Create a `Future` that will resolve once the given UNIX signal is raised
211///
212/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
213pub fn unix_signal_future_with_priority(
214 priority: Priority,
215 signum: i32,
216) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
217 Box::pin(SourceFuture::new(create_source:move |send: Sender<()>| {
218 let mut send: Option> = Some(send);
219 crate::unix_signal_source_new(signum, name:None, priority, func:move || {
220 let _ = send.take().unwrap().send(());
221 ControlFlow::Break
222 })
223 }))
224}
225
// rustdoc-stripper-ignore-next
/// Represents a `Stream` around a `glib::Source`. The stream will
/// provide all values that are provided by the source
pub struct SourceStream<F, T> {
    // Closure that builds the source; it is taken out (leaving `None`) by the
    // first call to `poll_next`.
    create_source: Option<F>,
    // The attached source paired with the receiving half of the unbounded
    // channel. `None` before the first poll and again once the channel ended.
    source: Option<(Source, mpsc::UnboundedReceiver<T>)>,
}
233
// `poll_next` reaches the fields through `*self` and re-pins the receiver with
// `Pin::new`, so the type is declared `Unpin` for every `F` and `T`.
impl<F, T> Unpin for SourceStream<F, T> {}
235
236impl<F, T: 'static> SourceStream<F, T>
237where
238 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
239{
240 // rustdoc-stripper-ignore-next
241 /// Create a new `SourceStream`
242 ///
243 /// The provided closure should return a newly created `glib::Source` when called
244 /// and pass the values provided by the source to the sender that is passed
245 /// to the closure.
246 pub fn new(create_source: F) -> SourceStream<F, T> {
247 SourceStream {
248 create_source: Some(create_source),
249 source: None,
250 }
251 }
252}
253
254impl<F, T> Stream for SourceStream<F, T>
255where
256 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
257{
258 type Item = T;
259
260 fn poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>> {
261 let SourceStream {
262 ref mut create_source,
263 ref mut source,
264 ..
265 } = *self;
266
267 if let Some(create_source) = create_source.take() {
268 let main_context = MainContext::ref_thread_default();
269 assert!(
270 main_context.is_owner(),
271 "Spawning futures only allowed if the thread is owning the MainContext"
272 );
273
274 // Channel for sending back the Source result to our future here.
275 //
276 // In theory we could directly continue polling the
277 // corresponding task from the Source callback,
278 // however this would break at the very least
279 // the g_main_current_source() API.
280 let (send, recv) = mpsc::unbounded();
281
282 let s = create_source(send);
283
284 s.attach(Some(&main_context));
285 *source = Some((s, recv));
286 }
287
288 // At this point we must have a receiver
289 let res = {
290 let &mut (_, ref mut receiver) = source.as_mut().unwrap();
291 Pin::new(receiver).poll_next(ctx)
292 };
293 #[allow(clippy::match_wild_err_arm)]
294 match res {
295 Poll::Ready(v) => {
296 if v.is_none() {
297 // Get rid of the reference to the source, it triggered
298 let _ = source.take();
299 }
300 Poll::Ready(v)
301 }
302 Poll::Pending => Poll::Pending,
303 }
304 }
305}
306
307impl<F, T> FusedStream for SourceStream<F, T>
308where
309 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
310{
311 fn is_terminated(&self) -> bool {
312 self.create_source.is_none()
313 && self
314 .source
315 .as_ref()
316 .map_or(default:true, |(_, receiver: &UnboundedReceiver)| receiver.is_terminated())
317 }
318}
319
320impl<T, F> Drop for SourceStream<T, F> {
321 fn drop(&mut self) {
322 // Get rid of the source, we don't care anymore if it still triggers
323 if let Some((source: Source, _)) = self.source.take() {
324 source.destroy();
325 }
326 }
327}
328
329// rustdoc-stripper-ignore-next
330/// Create a `Stream` that will provide a value every given number of milliseconds.
331///
332/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
333pub fn interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
334 interval_stream_with_priority(crate::Priority::default(), value)
335}
336
337// rustdoc-stripper-ignore-next
338/// Create a `Stream` that will provide a value every given number of milliseconds.
339///
340/// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
341pub fn interval_stream_with_priority(
342 priority: Priority,
343 value: Duration,
344) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
345 Box::pin(SourceStream::new(create_source:move |send: UnboundedSender<()>| {
346 crate::timeout_source_new(interval:value, name:None, priority, func:move || {
347 if send.unbounded_send(()).is_err() {
348 ControlFlow::Break
349 } else {
350 ControlFlow::Continue
351 }
352 })
353 }))
354}
355
356// rustdoc-stripper-ignore-next
357/// Create a `Stream` that will provide a value every given number of seconds.
358///
359/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
360pub fn interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
361 interval_stream_seconds_with_priority(crate::Priority::default(), value)
362}
363
364// rustdoc-stripper-ignore-next
365/// Create a `Stream` that will provide a value every given number of seconds.
366///
367/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
368pub fn interval_stream_seconds_with_priority(
369 priority: Priority,
370 value: u32,
371) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
372 Box::pin(SourceStream::new(create_source:move |send: UnboundedSender<()>| {
373 crate::timeout_source_new_seconds(interval:value, name:None, priority, func:move || {
374 if send.unbounded_send(()).is_err() {
375 ControlFlow::Break
376 } else {
377 ControlFlow::Continue
378 }
379 })
380 }))
381}
382
383#[cfg(any(unix, docsrs))]
384#[cfg_attr(docsrs, doc(cfg(unix)))]
385// rustdoc-stripper-ignore-next
386/// Create a `Stream` that will provide a value whenever the given UNIX signal is raised
387///
388/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
389pub fn unix_signal_stream(signum: i32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
390 unix_signal_stream_with_priority(crate::Priority::default(), signum)
391}
392
393#[cfg(any(unix, docsrs))]
394#[cfg_attr(docsrs, doc(cfg(unix)))]
395// rustdoc-stripper-ignore-next
396/// Create a `Stream` that will provide a value whenever the given UNIX signal is raised
397///
398/// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
399pub fn unix_signal_stream_with_priority(
400 priority: Priority,
401 signum: i32,
402) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
403 Box::pin(SourceStream::new(create_source:move |send: UnboundedSender<()>| {
404 crate::unix_signal_source_new(signum, name:None, priority, func:move || {
405 if send.unbounded_send(()).is_err() {
406 ControlFlow::Break
407 } else {
408 ControlFlow::Continue
409 }
410 })
411 }))
412}
413
#[cfg(test)]
mod tests {
    use std::{thread, time::Duration};

    use futures_util::{future::FutureExt, stream::StreamExt};

    use super::*;

    // A 20 ms timeout future driven by `block_on` on a fresh context completes.
    #[test]
    fn test_timeout() {
        let c = MainContext::new();

        c.block_on(timeout_future(Duration::from_millis(20)));
    }

    // The timeout future can be handed to `spawn`; its continuation quits the
    // main loop, which lets `run` return.
    #[test]
    fn test_timeout_send() {
        let c = MainContext::new();
        let l = crate::MainLoop::new(Some(&c), false);

        let l_clone = l.clone();
        c.spawn(timeout_future(Duration::from_millis(20)).then(move |()| {
            l_clone.quit();
            futures_util::future::ready(())
        }));

        l.run();
    }

    // Taking two items from a 20 ms interval stream runs the callback twice.
    #[test]
    fn test_interval() {
        let c = MainContext::new();

        let mut count = 0;

        {
            // Borrow the counter mutably only for the duration of `block_on`.
            let count = &mut count;
            c.block_on(
                interval_stream(Duration::from_millis(20))
                    .take(2)
                    .for_each(|()| {
                        *count += 1;

                        futures_util::future::ready(())
                    })
                    .map(|_| ()),
            );
        }

        assert_eq!(count, 2);
    }

    // After the timeout fires, a value sent through a oneshot channel from
    // another thread is received and becomes the result of `block_on`.
    #[test]
    fn test_timeout_and_channel() {
        let c = MainContext::default();

        let res = c.block_on(timeout_future(Duration::from_millis(20)).then(|()| {
            let (sender, receiver) = oneshot::channel();

            thread::spawn(move || {
                sender.send(1).unwrap();
            });

            receiver.then(|i| futures_util::future::ready(i.unwrap()))
        }));

        assert_eq!(res, 1);
    }
}
483