1 | // Take a look at the license at the top of the repository in the LICENSE file. |
2 | |
3 | use std::{marker::Unpin, pin, pin::Pin, time::Duration}; |
4 | |
5 | use futures_channel::{mpsc, oneshot}; |
6 | use futures_core::{ |
7 | future::{FusedFuture, Future}, |
8 | stream::{FusedStream, Stream}, |
9 | task, |
10 | task::Poll, |
11 | }; |
12 | |
13 | use crate::{ControlFlow, MainContext, Priority, Source}; |
14 | |
15 | // rustdoc-stripper-ignore-next |
16 | /// Represents a `Future` around a `glib::Source`. The future will |
17 | /// be resolved once the source has provided a value |
18 | pub struct SourceFuture<F, T> { |
19 | create_source: Option<F>, |
20 | source: Option<(Source, oneshot::Receiver<T>)>, |
21 | } |
22 | |
23 | impl<F, T: 'static> SourceFuture<F, T> |
24 | where |
25 | F: FnOnce(oneshot::Sender<T>) -> Source + 'static, |
26 | { |
27 | // rustdoc-stripper-ignore-next |
28 | /// Create a new `SourceFuture` |
29 | /// |
30 | /// The provided closure should return a newly created `glib::Source` when called |
31 | /// and pass the value provided by the source to the oneshot sender that is passed |
32 | /// to the closure. |
33 | pub fn new(create_source: F) -> SourceFuture<F, T> { |
34 | SourceFuture { |
35 | create_source: Some(create_source), |
36 | source: None, |
37 | } |
38 | } |
39 | } |
40 | |
41 | impl<F, T> Unpin for SourceFuture<F, T> {} |
42 | |
43 | impl<F, T> Future for SourceFuture<F, T> |
44 | where |
45 | F: FnOnce(oneshot::Sender<T>) -> Source + 'static, |
46 | { |
47 | type Output = T; |
48 | |
49 | fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T> { |
50 | let SourceFuture { |
51 | ref mut create_source, |
52 | ref mut source, |
53 | .. |
54 | } = *self; |
55 | |
56 | if let Some(create_source) = create_source.take() { |
57 | let main_context = MainContext::ref_thread_default(); |
58 | assert!( |
59 | main_context.is_owner(), |
60 | "Spawning futures only allowed if the thread is owning the MainContext" |
61 | ); |
62 | |
63 | // Channel for sending back the Source result to our future here. |
64 | // |
65 | // In theory, we could directly continue polling the |
66 | // corresponding task from the Source callback, |
67 | // however this would break at the very least |
68 | // the g_main_current_source() API. |
69 | let (send, recv) = oneshot::channel(); |
70 | |
71 | let s = create_source(send); |
72 | |
73 | s.attach(Some(&main_context)); |
74 | *source = Some((s, recv)); |
75 | } |
76 | |
77 | // At this point we must have a receiver |
78 | let res = { |
79 | let &mut (_, ref mut receiver) = source.as_mut().unwrap(); |
80 | Pin::new(receiver).poll(ctx) |
81 | }; |
82 | #[allow (clippy::match_wild_err_arm)] |
83 | match res { |
84 | Poll::Ready(Err(_)) => panic!("Source sender was unexpectedly closed" ), |
85 | Poll::Ready(Ok(v)) => { |
86 | // Get rid of the reference to the source, it triggered |
87 | let _ = source.take(); |
88 | Poll::Ready(v) |
89 | } |
90 | Poll::Pending => Poll::Pending, |
91 | } |
92 | } |
93 | } |
94 | |
95 | impl<F, T> FusedFuture for SourceFuture<F, T> |
96 | where |
97 | F: FnOnce(oneshot::Sender<T>) -> Source + 'static, |
98 | { |
99 | fn is_terminated(&self) -> bool { |
100 | self.create_source.is_none() |
101 | && self |
102 | .source |
103 | .as_ref() |
104 | .map_or(default:true, |(_, receiver: &Receiver)| receiver.is_terminated()) |
105 | } |
106 | } |
107 | |
108 | impl<T, F> Drop for SourceFuture<T, F> { |
109 | fn drop(&mut self) { |
110 | // Get rid of the source, we don't care anymore if it still triggers |
111 | if let Some((source: Source, _)) = self.source.take() { |
112 | source.destroy(); |
113 | } |
114 | } |
115 | } |
116 | |
117 | // rustdoc-stripper-ignore-next |
118 | /// Create a `Future` that will resolve after the given number of milliseconds. |
119 | /// |
120 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
121 | pub fn timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { |
122 | timeout_future_with_priority(crate::Priority::default(), value) |
123 | } |
124 | |
125 | // rustdoc-stripper-ignore-next |
126 | /// Create a `Future` that will resolve after the given number of milliseconds. |
127 | /// |
128 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
129 | pub fn timeout_future_with_priority( |
130 | priority: Priority, |
131 | value: Duration, |
132 | ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { |
133 | Box::pin(SourceFuture::new(create_source:move |send: Sender<()>| { |
134 | let mut send: Option> = Some(send); |
135 | crate::timeout_source_new(interval:value, name:None, priority, func:move || { |
136 | let _ = send.take().unwrap().send(()); |
137 | ControlFlow::Break |
138 | }) |
139 | })) |
140 | } |
141 | |
142 | // rustdoc-stripper-ignore-next |
143 | /// Create a `Future` that will resolve after the given number of seconds. |
144 | /// |
145 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
146 | pub fn timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { |
147 | timeout_future_seconds_with_priority(crate::Priority::default(), value) |
148 | } |
149 | |
150 | // rustdoc-stripper-ignore-next |
151 | /// Create a `Future` that will resolve after the given number of seconds. |
152 | /// |
153 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
154 | pub fn timeout_future_seconds_with_priority( |
155 | priority: Priority, |
156 | value: u32, |
157 | ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { |
158 | Box::pin(SourceFuture::new(create_source:move |send: Sender<()>| { |
159 | let mut send: Option> = Some(send); |
160 | crate::timeout_source_new_seconds(interval:value, name:None, priority, func:move || { |
161 | let _ = send.take().unwrap().send(()); |
162 | ControlFlow::Break |
163 | }) |
164 | })) |
165 | } |
166 | |
167 | // rustdoc-stripper-ignore-next |
168 | /// Create a `Future` that will resolve once the child process with the given pid exits |
169 | /// |
170 | /// The `Future` will resolve to the pid of the child process and the exit code. |
171 | /// |
172 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
173 | pub fn child_watch_future( |
174 | pid: crate::Pid, |
175 | ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> { |
176 | child_watch_future_with_priority(crate::Priority::default(), pid) |
177 | } |
178 | |
179 | // rustdoc-stripper-ignore-next |
180 | /// Create a `Future` that will resolve once the child process with the given pid exits |
181 | /// |
182 | /// The `Future` will resolve to the pid of the child process and the exit code. |
183 | /// |
184 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
185 | pub fn child_watch_future_with_priority( |
186 | priority: Priority, |
187 | pid: crate::Pid, |
188 | ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> { |
189 | Box::pin(SourceFuture::new(create_source:move |send: Sender<(Pid, i32)>| { |
190 | let mut send: Option> = Some(send); |
191 | crate::child_watch_source_new(pid, name:None, priority, func:move |pid: Pid, code: i32| { |
192 | let _ = send.take().unwrap().send((pid, code)); |
193 | }) |
194 | })) |
195 | } |
196 | |
197 | #[cfg (any(unix, docsrs))] |
198 | #[cfg_attr (docsrs, doc(cfg(unix)))] |
199 | // rustdoc-stripper-ignore-next |
200 | /// Create a `Future` that will resolve once the given UNIX signal is raised |
201 | /// |
202 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
203 | pub fn unix_signal_future(signum: i32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { |
204 | unix_signal_future_with_priority(crate::Priority::default(), signum) |
205 | } |
206 | |
207 | #[cfg (any(unix, docsrs))] |
208 | #[cfg_attr (docsrs, doc(cfg(unix)))] |
209 | // rustdoc-stripper-ignore-next |
210 | /// Create a `Future` that will resolve once the given UNIX signal is raised |
211 | /// |
212 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
213 | pub fn unix_signal_future_with_priority( |
214 | priority: Priority, |
215 | signum: i32, |
216 | ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> { |
217 | Box::pin(SourceFuture::new(create_source:move |send: Sender<()>| { |
218 | let mut send: Option> = Some(send); |
219 | crate::unix_signal_source_new(signum, name:None, priority, func:move || { |
220 | let _ = send.take().unwrap().send(()); |
221 | ControlFlow::Break |
222 | }) |
223 | })) |
224 | } |
225 | |
226 | // rustdoc-stripper-ignore-next |
227 | /// Represents a `Stream` around a `glib::Source`. The stream will |
228 | /// be provide all values that are provided by the source |
229 | pub struct SourceStream<F, T> { |
230 | create_source: Option<F>, |
231 | source: Option<(Source, mpsc::UnboundedReceiver<T>)>, |
232 | } |
233 | |
234 | impl<F, T> Unpin for SourceStream<F, T> {} |
235 | |
236 | impl<F, T: 'static> SourceStream<F, T> |
237 | where |
238 | F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static, |
239 | { |
240 | // rustdoc-stripper-ignore-next |
241 | /// Create a new `SourceStream` |
242 | /// |
243 | /// The provided closure should return a newly created `glib::Source` when called |
244 | /// and pass the values provided by the source to the sender that is passed |
245 | /// to the closure. |
246 | pub fn new(create_source: F) -> SourceStream<F, T> { |
247 | SourceStream { |
248 | create_source: Some(create_source), |
249 | source: None, |
250 | } |
251 | } |
252 | } |
253 | |
254 | impl<F, T> Stream for SourceStream<F, T> |
255 | where |
256 | F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static, |
257 | { |
258 | type Item = T; |
259 | |
260 | fn poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>> { |
261 | let SourceStream { |
262 | ref mut create_source, |
263 | ref mut source, |
264 | .. |
265 | } = *self; |
266 | |
267 | if let Some(create_source) = create_source.take() { |
268 | let main_context = MainContext::ref_thread_default(); |
269 | assert!( |
270 | main_context.is_owner(), |
271 | "Spawning futures only allowed if the thread is owning the MainContext" |
272 | ); |
273 | |
274 | // Channel for sending back the Source result to our future here. |
275 | // |
276 | // In theory we could directly continue polling the |
277 | // corresponding task from the Source callback, |
278 | // however this would break at the very least |
279 | // the g_main_current_source() API. |
280 | let (send, recv) = mpsc::unbounded(); |
281 | |
282 | let s = create_source(send); |
283 | |
284 | s.attach(Some(&main_context)); |
285 | *source = Some((s, recv)); |
286 | } |
287 | |
288 | // At this point we must have a receiver |
289 | let res = { |
290 | let &mut (_, ref mut receiver) = source.as_mut().unwrap(); |
291 | Pin::new(receiver).poll_next(ctx) |
292 | }; |
293 | #[allow (clippy::match_wild_err_arm)] |
294 | match res { |
295 | Poll::Ready(v) => { |
296 | if v.is_none() { |
297 | // Get rid of the reference to the source, it triggered |
298 | let _ = source.take(); |
299 | } |
300 | Poll::Ready(v) |
301 | } |
302 | Poll::Pending => Poll::Pending, |
303 | } |
304 | } |
305 | } |
306 | |
307 | impl<F, T> FusedStream for SourceStream<F, T> |
308 | where |
309 | F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static, |
310 | { |
311 | fn is_terminated(&self) -> bool { |
312 | self.create_source.is_none() |
313 | && self |
314 | .source |
315 | .as_ref() |
316 | .map_or(default:true, |(_, receiver: &UnboundedReceiver)| receiver.is_terminated()) |
317 | } |
318 | } |
319 | |
320 | impl<T, F> Drop for SourceStream<T, F> { |
321 | fn drop(&mut self) { |
322 | // Get rid of the source, we don't care anymore if it still triggers |
323 | if let Some((source: Source, _)) = self.source.take() { |
324 | source.destroy(); |
325 | } |
326 | } |
327 | } |
328 | |
329 | // rustdoc-stripper-ignore-next |
330 | /// Create a `Stream` that will provide a value every given number of milliseconds. |
331 | /// |
332 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
333 | pub fn interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> { |
334 | interval_stream_with_priority(crate::Priority::default(), value) |
335 | } |
336 | |
337 | // rustdoc-stripper-ignore-next |
338 | /// Create a `Stream` that will provide a value every given number of milliseconds. |
339 | /// |
340 | /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`. |
341 | pub fn interval_stream_with_priority( |
342 | priority: Priority, |
343 | value: Duration, |
344 | ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> { |
345 | Box::pin(SourceStream::new(create_source:move |send: UnboundedSender<()>| { |
346 | crate::timeout_source_new(interval:value, name:None, priority, func:move || { |
347 | if send.unbounded_send(()).is_err() { |
348 | ControlFlow::Break |
349 | } else { |
350 | ControlFlow::Continue |
351 | } |
352 | }) |
353 | })) |
354 | } |
355 | |
356 | // rustdoc-stripper-ignore-next |
357 | /// Create a `Stream` that will provide a value every given number of seconds. |
358 | /// |
359 | /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`. |
360 | pub fn interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> { |
361 | interval_stream_seconds_with_priority(crate::Priority::default(), value) |
362 | } |
363 | |
364 | // rustdoc-stripper-ignore-next |
365 | /// Create a `Stream` that will provide a value every given number of seconds. |
366 | /// |
367 | /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`. |
368 | pub fn interval_stream_seconds_with_priority( |
369 | priority: Priority, |
370 | value: u32, |
371 | ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> { |
372 | Box::pin(SourceStream::new(create_source:move |send: UnboundedSender<()>| { |
373 | crate::timeout_source_new_seconds(interval:value, name:None, priority, func:move || { |
374 | if send.unbounded_send(()).is_err() { |
375 | ControlFlow::Break |
376 | } else { |
377 | ControlFlow::Continue |
378 | } |
379 | }) |
380 | })) |
381 | } |
382 | |
383 | #[cfg (any(unix, docsrs))] |
384 | #[cfg_attr (docsrs, doc(cfg(unix)))] |
385 | // rustdoc-stripper-ignore-next |
386 | /// Create a `Stream` that will provide a value whenever the given UNIX signal is raised |
387 | /// |
388 | /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`. |
389 | pub fn unix_signal_stream(signum: i32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> { |
390 | unix_signal_stream_with_priority(crate::Priority::default(), signum) |
391 | } |
392 | |
393 | #[cfg (any(unix, docsrs))] |
394 | #[cfg_attr (docsrs, doc(cfg(unix)))] |
395 | // rustdoc-stripper-ignore-next |
396 | /// Create a `Stream` that will provide a value whenever the given UNIX signal is raised |
397 | /// |
398 | /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`. |
399 | pub fn unix_signal_stream_with_priority( |
400 | priority: Priority, |
401 | signum: i32, |
402 | ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> { |
403 | Box::pin(SourceStream::new(create_source:move |send: UnboundedSender<()>| { |
404 | crate::unix_signal_source_new(signum, name:None, priority, func:move || { |
405 | if send.unbounded_send(()).is_err() { |
406 | ControlFlow::Break |
407 | } else { |
408 | ControlFlow::Continue |
409 | } |
410 | }) |
411 | })) |
412 | } |
413 | |
#[cfg(test)]
mod tests {
    use std::{thread, time::Duration};

    use futures_util::{future::FutureExt, stream::StreamExt};

    use super::*;

    // A timeout future can be driven to completion with `block_on`.
    #[test]
    fn test_timeout() {
        let context = MainContext::new();

        context.block_on(timeout_future(Duration::from_millis(20)));
    }

    // A timeout future can be spawned on a context and quits the main loop
    // once it has resolved.
    #[test]
    fn test_timeout_send() {
        let context = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&context), false);

        let quit_handle = main_loop.clone();
        let quit_after_timeout = timeout_future(Duration::from_millis(20)).then(move |()| {
            quit_handle.quit();
            futures_util::future::ready(())
        });
        context.spawn(quit_after_timeout);

        main_loop.run();
    }

    // Taking two items from an interval stream runs the callback twice.
    #[test]
    fn test_interval() {
        let context = MainContext::new();

        let mut count = 0;

        {
            let ticks = &mut count;
            let two_ticks = interval_stream(Duration::from_millis(20))
                .take(2)
                .for_each(|()| {
                    *ticks += 1;

                    futures_util::future::ready(())
                })
                .map(|_| ());
            context.block_on(two_ticks);
        }

        assert_eq!(count, 2);
    }

    // A timeout future can be chained with a oneshot channel that is fed
    // from another thread.
    #[test]
    fn test_timeout_and_channel() {
        let context = MainContext::default();

        let chained = timeout_future(Duration::from_millis(20)).then(|()| {
            let (sender, receiver) = oneshot::channel();

            thread::spawn(move || {
                sender.send(1).unwrap();
            });

            receiver.then(|i| futures_util::future::ready(i.unwrap()))
        });
        let res = context.block_on(chained);

        assert_eq!(res, 1);
    }
}
483 | |