1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4 cmp,
5 marker::Unpin,
6 pin::Pin,
7 ptr,
8 sync::{atomic, atomic::AtomicI32},
9};
10
11use futures_core::{Future, Stream};
12use glib::{
13 ffi::{gboolean, gpointer},
14 prelude::*,
15 translate::*,
16};
17use libc::c_void;
18
19use crate::{
20 prelude::*, Clock, ClockEntryType, ClockError, ClockFlags, ClockReturn, ClockSuccess,
21 ClockTime, ClockTimeDiff,
22};
23
24glib::wrapper! {
25 #[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
26 pub struct ClockId(Shared<c_void>);
27
28 match fn {
29 ref => |ptr| ffi::gst_clock_id_ref(ptr),
30 unref => |ptr| ffi::gst_clock_id_unref(ptr),
31 }
32}
33
34impl ClockId {
35 #[doc(alias = "get_time")]
36 #[doc(alias = "gst_clock_id_get_time")]
37 #[doc(alias = "GST_CLOCK_ENTRY_TIME")]
38 pub fn time(&self) -> ClockTime {
39 unsafe {
40 try_from_glib(ffi::gst_clock_id_get_time(self.to_glib_none().0))
41 .expect("undefined time")
42 }
43 }
44
45 #[doc(alias = "gst_clock_id_unschedule")]
46 pub fn unschedule(&self) {
47 unsafe { ffi::gst_clock_id_unschedule(self.to_glib_none().0) }
48 }
49
50 #[doc(alias = "gst_clock_id_wait")]
51 pub fn wait(&self) -> (Result<ClockSuccess, ClockError>, ClockTimeDiff) {
52 unsafe {
53 let mut jitter = 0;
54 let res = try_from_glib(ffi::gst_clock_id_wait(self.to_glib_none().0, &mut jitter));
55 (res, jitter)
56 }
57 }
58
59 #[doc(alias = "gst_clock_id_compare_func")]
60 pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
61 unsafe {
62 let res = ffi::gst_clock_id_compare_func(self.to_glib_none().0, other.to_glib_none().0);
63 res.cmp(&0)
64 }
65 }
66
67 #[cfg(feature = "v1_16")]
68 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
69 #[doc(alias = "get_clock")]
70 #[doc(alias = "gst_clock_id_get_clock")]
71 pub fn clock(&self) -> Option<Clock> {
72 unsafe { from_glib_full(ffi::gst_clock_id_get_clock(self.to_glib_none().0)) }
73 }
74
75 #[cfg(feature = "v1_16")]
76 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
77 #[doc(alias = "gst_clock_id_uses_clock")]
78 pub fn uses_clock<P: IsA<Clock>>(&self, clock: &P) -> bool {
79 unsafe {
80 from_glib(ffi::gst_clock_id_uses_clock(
81 self.to_glib_none().0,
82 clock.as_ref().as_ptr(),
83 ))
84 }
85 }
86
87 #[doc(alias = "get_type")]
88 #[doc(alias = "GST_CLOCK_ENTRY_TYPE")]
89 pub fn type_(&self) -> ClockEntryType {
90 unsafe {
91 let ptr = self.as_ptr() as *mut ffi::GstClockEntry;
92 from_glib((*ptr).type_)
93 }
94 }
95
96 #[doc(alias = "get_status")]
97 #[doc(alias = "GST_CLOCK_ENTRY_STATUS")]
98 pub fn status(&self) -> &AtomicClockReturn {
99 unsafe {
100 let ptr = self.as_ptr() as *mut ffi::GstClockEntry;
101 &*((&(*ptr).status) as *const i32 as *const AtomicClockReturn)
102 }
103 }
104}
105
106#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
107pub struct SingleShotClockId(ClockId);
108
109impl std::ops::Deref for SingleShotClockId {
110 type Target = ClockId;
111
112 #[inline]
113 fn deref(&self) -> &Self::Target {
114 &self.0
115 }
116}
117
118impl From<SingleShotClockId> for ClockId {
119 #[inline]
120 fn from(id: SingleShotClockId) -> ClockId {
121 skip_assert_initialized!();
122 id.0
123 }
124}
125
126impl TryFrom<ClockId> for SingleShotClockId {
127 type Error = glib::BoolError;
128
129 #[inline]
130 fn try_from(id: ClockId) -> Result<SingleShotClockId, glib::BoolError> {
131 skip_assert_initialized!();
132 match id.type_() {
133 ClockEntryType::Single => Ok(SingleShotClockId(id)),
134 _ => Err(glib::bool_error!("Not a single-shot clock id")),
135 }
136 }
137}
138
139impl SingleShotClockId {
140 #[doc(alias = "gst_clock_id_compare_func")]
141 #[inline]
142 pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
143 self.0.compare_by_time(&other.0)
144 }
145
146 #[doc(alias = "gst_clock_id_wait_async")]
147 pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError>
148 where
149 F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
150 {
151 unsafe extern "C" fn trampoline<
152 F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
153 >(
154 clock: *mut ffi::GstClock,
155 time: ffi::GstClockTime,
156 id: gpointer,
157 func: gpointer,
158 ) -> gboolean {
159 let f: &mut Option<F> = &mut *(func as *mut Option<F>);
160 let f = f.take().unwrap();
161
162 f(
163 &from_glib_borrow(clock),
164 from_glib(time),
165 &from_glib_borrow(id),
166 );
167
168 glib::ffi::GTRUE
169 }
170
171 unsafe extern "C" fn destroy_notify<
172 F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
173 >(
174 ptr: gpointer,
175 ) {
176 let _ = Box::<Option<F>>::from_raw(ptr as *mut _);
177 }
178
179 let func: Box<Option<F>> = Box::new(Some(func));
180
181 unsafe {
182 try_from_glib(ffi::gst_clock_id_wait_async(
183 self.to_glib_none().0,
184 Some(trampoline::<F>),
185 Box::into_raw(func) as gpointer,
186 Some(destroy_notify::<F>),
187 ))
188 }
189 }
190
191 #[allow(clippy::type_complexity)]
192 pub fn wait_async_future(
193 &self,
194 ) -> Result<
195 Pin<
196 Box<
197 dyn Future<Output = Result<(Option<ClockTime>, ClockId), ClockError>>
198 + Send
199 + 'static,
200 >,
201 >,
202 ClockError,
203 > {
204 use futures_channel::oneshot;
205
206 let (sender, receiver) = oneshot::channel();
207
208 self.wait_async(move |_clock, jitter, id| {
209 if sender.send((jitter, id.clone())).is_err() {
210 // Unschedule any future calls if the receiver end is disconnected
211 id.unschedule();
212 }
213 })?;
214
215 Ok(Box::pin(async move {
216 receiver.await.map_err(|_| ClockError::Unscheduled)
217 }))
218 }
219}
220
221#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
222pub struct PeriodicClockId(ClockId);
223
224impl std::ops::Deref for PeriodicClockId {
225 type Target = ClockId;
226
227 #[inline]
228 fn deref(&self) -> &Self::Target {
229 &self.0
230 }
231}
232
233impl From<PeriodicClockId> for ClockId {
234 #[inline]
235 fn from(id: PeriodicClockId) -> ClockId {
236 skip_assert_initialized!();
237 id.0
238 }
239}
240
241impl TryFrom<ClockId> for PeriodicClockId {
242 type Error = glib::BoolError;
243
244 #[inline]
245 fn try_from(id: ClockId) -> Result<PeriodicClockId, glib::BoolError> {
246 skip_assert_initialized!();
247 match id.type_() {
248 ClockEntryType::Periodic => Ok(PeriodicClockId(id)),
249 _ => Err(glib::bool_error!("Not a periodic clock id")),
250 }
251 }
252}
253
254impl PeriodicClockId {
255 #[doc(alias = "get_interval")]
256 #[doc(alias = "GST_CLOCK_ENTRY_INTERVAL")]
257 #[inline]
258 pub fn interval(&self) -> ClockTime {
259 unsafe {
260 let ptr = self.as_ptr() as *mut ffi::GstClockEntry;
261 try_from_glib((*ptr).interval).expect("undefined interval")
262 }
263 }
264
265 #[doc(alias = "gst_clock_id_compare_func")]
266 #[inline]
267 pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
268 self.0.compare_by_time(&other.0)
269 }
270
271 #[doc(alias = "gst_clock_id_wait_async")]
272 pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError>
273 where
274 F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
275 {
276 unsafe extern "C" fn trampoline<
277 F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
278 >(
279 clock: *mut ffi::GstClock,
280 time: ffi::GstClockTime,
281 id: gpointer,
282 func: gpointer,
283 ) -> gboolean {
284 let f: &F = &*(func as *const F);
285 f(
286 &from_glib_borrow(clock),
287 from_glib(time),
288 &from_glib_borrow(id),
289 );
290 glib::ffi::GTRUE
291 }
292
293 unsafe extern "C" fn destroy_notify<
294 F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,
295 >(
296 ptr: gpointer,
297 ) {
298 let _ = Box::<F>::from_raw(ptr as *mut _);
299 }
300
301 let func: Box<F> = Box::new(func);
302 unsafe {
303 try_from_glib(ffi::gst_clock_id_wait_async(
304 self.to_glib_none().0,
305 Some(trampoline::<F>),
306 Box::into_raw(func) as gpointer,
307 Some(destroy_notify::<F>),
308 ))
309 }
310 }
311
312 #[allow(clippy::type_complexity)]
313 pub fn wait_async_stream(
314 &self,
315 ) -> Result<
316 Pin<Box<dyn Stream<Item = (Option<ClockTime>, ClockId)> + Unpin + Send + 'static>>,
317 ClockError,
318 > {
319 use futures_channel::mpsc;
320
321 let (sender, receiver) = mpsc::unbounded();
322
323 self.wait_async(move |_clock, jitter, id| {
324 if sender.unbounded_send((jitter, id.clone())).is_err() {
325 // Unschedule any future calls if the receiver end is disconnected
326 id.unschedule();
327 }
328 })?;
329
330 Ok(Box::pin(receiver))
331 }
332}
333
/// Atomic cell holding a `ClockReturn` encoded as an `i32`.
///
/// The type is `repr(transparent)` over [`AtomicI32`], which is what allows
/// `ClockId::status` to view an `i32` field in place as this type.
#[derive(Debug)]
#[repr(transparent)]
pub struct AtomicClockReturn(AtomicI32);
337
338impl AtomicClockReturn {
339 #[inline]
340 pub fn load(&self) -> ClockReturn {
341 unsafe { from_glib(self.0.load(atomic::Ordering::SeqCst)) }
342 }
343
344 #[inline]
345 pub fn store(&self, val: ClockReturn) {
346 self.0.store(val.into_glib(), atomic::Ordering::SeqCst)
347 }
348
349 #[inline]
350 pub fn swap(&self, val: ClockReturn) -> ClockReturn {
351 unsafe { from_glib(self.0.swap(val.into_glib(), atomic::Ordering::SeqCst)) }
352 }
353
354 #[inline]
355 pub fn compare_exchange(
356 &self,
357 current: ClockReturn,
358 new: ClockReturn,
359 ) -> Result<ClockReturn, ClockReturn> {
360 unsafe {
361 self.0
362 .compare_exchange(
363 current.into_glib(),
364 new.into_glib(),
365 atomic::Ordering::SeqCst,
366 atomic::Ordering::SeqCst,
367 )
368 .map(|v| from_glib(v))
369 .map_err(|v| from_glib(v))
370 }
371 }
372}
373
374unsafe impl Send for ClockId {}
375unsafe impl Sync for ClockId {}
376
377impl Clock {
378 #[doc(alias = "gst_clock_adjust_with_calibration")]
379 pub fn adjust_with_calibration(
380 internal_target: ClockTime,
381 cinternal: ClockTime,
382 cexternal: ClockTime,
383 cnum: ClockTime,
384 cdenom: ClockTime,
385 ) -> ClockTime {
386 skip_assert_initialized!();
387 unsafe {
388 try_from_glib(ffi::gst_clock_adjust_with_calibration(
389 ptr::null_mut(),
390 internal_target.into_glib(),
391 cinternal.into_glib(),
392 cexternal.into_glib(),
393 cnum.into_glib(),
394 cdenom.into_glib(),
395 ))
396 .expect("undefined ClockTime")
397 }
398 }
399
400 #[doc(alias = "gst_clock_unadjust_with_calibration")]
401 pub fn unadjust_with_calibration(
402 external_target: ClockTime,
403 cinternal: ClockTime,
404 cexternal: ClockTime,
405 cnum: ClockTime,
406 cdenom: ClockTime,
407 ) -> ClockTime {
408 skip_assert_initialized!();
409 unsafe {
410 try_from_glib(ffi::gst_clock_unadjust_with_calibration(
411 ptr::null_mut(),
412 external_target.into_glib(),
413 cinternal.into_glib(),
414 cexternal.into_glib(),
415 cnum.into_glib(),
416 cdenom.into_glib(),
417 ))
418 .expect("undefined ClockTime")
419 }
420 }
421}
422
423mod sealed {
424 pub trait Sealed {}
425 impl<T: super::IsA<super::Clock>> Sealed for T {}
426}
427
428pub trait ClockExtManual: sealed::Sealed + IsA<Clock> + 'static {
429 #[doc(alias = "gst_clock_new_periodic_id")]
430 fn new_periodic_id(&self, start_time: ClockTime, interval: ClockTime) -> PeriodicClockId {
431 assert_ne!(interval, ClockTime::ZERO);
432
433 unsafe {
434 PeriodicClockId(from_glib_full(ffi::gst_clock_new_periodic_id(
435 self.as_ref().to_glib_none().0,
436 start_time.into_glib(),
437 interval.into_glib(),
438 )))
439 }
440 }
441
442 #[doc(alias = "gst_clock_periodic_id_reinit")]
443 fn periodic_id_reinit(
444 &self,
445 id: &PeriodicClockId,
446 start_time: ClockTime,
447 interval: ClockTime,
448 ) -> Result<(), glib::BoolError> {
449 unsafe {
450 let res: bool = from_glib(ffi::gst_clock_periodic_id_reinit(
451 self.as_ref().to_glib_none().0,
452 id.to_glib_none().0,
453 start_time.into_glib(),
454 interval.into_glib(),
455 ));
456 if res {
457 Ok(())
458 } else {
459 Err(glib::bool_error!("Failed to reinit periodic clock id"))
460 }
461 }
462 }
463
464 #[doc(alias = "gst_clock_new_single_shot_id")]
465 fn new_single_shot_id(&self, time: ClockTime) -> SingleShotClockId {
466 unsafe {
467 SingleShotClockId(from_glib_full(ffi::gst_clock_new_single_shot_id(
468 self.as_ref().to_glib_none().0,
469 time.into_glib(),
470 )))
471 }
472 }
473
474 #[doc(alias = "gst_clock_single_shot_id_reinit")]
475 fn single_shot_id_reinit(
476 &self,
477 id: &SingleShotClockId,
478 time: ClockTime,
479 ) -> Result<(), glib::BoolError> {
480 unsafe {
481 let res: bool = from_glib(ffi::gst_clock_single_shot_id_reinit(
482 self.as_ref().to_glib_none().0,
483 id.to_glib_none().0,
484 time.into_glib(),
485 ));
486 if res {
487 Ok(())
488 } else {
489 Err(glib::bool_error!("Failed to reinit single shot clock id"))
490 }
491 }
492 }
493
494 fn set_clock_flags(&self, flags: ClockFlags) {
495 unsafe {
496 let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _;
497 let _guard = self.as_ref().object_lock();
498 (*ptr).flags |= flags.into_glib();
499 }
500 }
501
502 fn unset_clock_flags(&self, flags: ClockFlags) {
503 unsafe {
504 let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _;
505 let _guard = self.as_ref().object_lock();
506 (*ptr).flags &= !flags.into_glib();
507 }
508 }
509
510 #[doc(alias = "get_clock_flags")]
511 fn clock_flags(&self) -> ClockFlags {
512 unsafe {
513 let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _;
514 let _guard = self.as_ref().object_lock();
515 from_glib((*ptr).flags)
516 }
517 }
518}
519
520impl<O: IsA<Clock>> ClockExtManual for O {}
521
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;

    use super::*;
    use crate::SystemClock;

    /// A blocking wait on a single-shot id finishes with `Ok` or `Early`.
    #[test]
    fn test_wait() {
        crate::init().unwrap();

        let clock = SystemClock::obtain();
        let deadline = clock.time().unwrap() + 20 * ClockTime::MSECOND;
        let id = clock.new_single_shot_id(deadline);
        let (outcome, _jitter) = id.wait();

        assert!(matches!(
            outcome,
            Ok(ClockSuccess::Ok) | Err(ClockError::Early)
        ));
    }

    /// An async wait on a single-shot id invokes the callback once.
    #[test]
    fn test_wait_async() {
        crate::init().unwrap();

        let (tx, rx) = channel();

        let clock = SystemClock::obtain();
        let deadline = clock.time().unwrap() + 20 * ClockTime::MSECOND;
        let id = clock.new_single_shot_id(deadline);
        let registered = id.wait_async(move |_, _, _| {
            tx.send(()).unwrap();
        });

        assert_eq!(registered, Ok(ClockSuccess::Ok));

        assert_eq!(rx.recv(), Ok(()));
    }

    /// A periodic id can be waited on more than once.
    #[test]
    fn test_wait_periodic() {
        crate::init().unwrap();

        let clock = SystemClock::obtain();
        let period = 20 * ClockTime::MSECOND;
        let first_tick = clock.time().unwrap() + period;
        let id = clock.new_periodic_id(first_tick, period);

        // Two consecutive waits on the same periodic id.
        for _ in 0..2 {
            let (outcome, _jitter) = id.wait();
            assert!(matches!(
                outcome,
                Ok(ClockSuccess::Ok) | Err(ClockError::Early)
            ));
        }
    }

    /// An async wait on a periodic id invokes the callback repeatedly.
    #[test]
    fn test_wait_async_periodic() {
        crate::init().unwrap();

        let (tx, rx) = channel();

        let clock = SystemClock::obtain();
        let period = 20 * ClockTime::MSECOND;
        let first_tick = clock.time().unwrap() + period;
        let id = clock.new_periodic_id(first_tick, period);
        let registered = id.wait_async(move |_, _, _| {
            // The send result is deliberately discarded.
            let _ = tx.send(());
        });

        assert_eq!(registered, Ok(ClockSuccess::Ok));

        assert_eq!(rx.recv(), Ok(()));
        assert_eq!(rx.recv(), Ok(()));
    }
}
593