// Copyright (C) 2024 The Qt Company Ltd.
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3 | |
4 | #include "qgst_bus_observer_p.h" |
5 | |
6 | QT_BEGIN_NAMESPACE |
7 | |
8 | QGstBusObserver::QGstBusObserver(QGstBusHandle bus) |
9 | : QGstBusHandle{ |
10 | std::move(bus), |
11 | } |
12 | { |
13 | if (!get()) |
14 | return; |
15 | |
16 | GPollFD pollFd{}; |
17 | gst_bus_get_pollfd(bus: get(), fd: &pollFd); |
18 | Q_ASSERT(pollFd.fd); |
19 | |
20 | #ifndef Q_OS_WIN |
21 | m_socketNotifier.setSocket(pollFd.fd); |
22 | |
23 | QObject::connect(sender: &m_socketNotifier, signal: &QSocketNotifier::activated, context: &m_socketNotifier, |
24 | slot: [this](QSocketDescriptor, QSocketNotifier::Type) { |
25 | this->processAllPendingMessages(); |
26 | }); |
27 | |
28 | m_socketNotifier.setEnabled(true); |
29 | #else |
30 | m_socketNotifier.setHandle(reinterpret_cast<Qt::HANDLE>(pollFd.fd)); |
31 | |
32 | QObject::connect(&m_socketNotifier, &QWinEventNotifier::activated, &m_socketNotifier, |
33 | [this](QWinEventNotifier::HANDLE) { |
34 | this->processAllPendingMessages(); |
35 | }); |
36 | m_socketNotifier.setEnabled(true); |
37 | #endif |
38 | |
39 | } |
40 | |
41 | QGstBusObserver::~QGstBusObserver() |
42 | { |
43 | close(); |
44 | } |
45 | |
46 | void QGstBusObserver::close() |
47 | { |
48 | if (!get()) |
49 | return; |
50 | |
51 | QGstBusHandle::reset(); |
52 | } |
53 | |
54 | void QGstBusObserver::installMessageFilter(QGstreamerBusMessageFilter *filter) |
55 | { |
56 | Q_ASSERT(filter); |
57 | if (!busFilters.contains(t: filter)) |
58 | busFilters.append(t: filter); |
59 | } |
60 | |
61 | void QGstBusObserver::removeMessageFilter(QGstreamerBusMessageFilter *filter) |
62 | { |
63 | Q_ASSERT(filter); |
64 | busFilters.removeAll(t: filter); |
65 | } |
66 | |
67 | bool QGstBusObserver::processNextPendingMessage(GstMessageType type, |
68 | std::optional<std::chrono::nanoseconds> timeout) |
69 | { |
70 | if (!get()) |
71 | return false; |
72 | |
73 | GstClockTime gstTimeout = [&]() -> GstClockTime { |
74 | if (!timeout) |
75 | return GST_CLOCK_TIME_NONE; // block forever |
76 | return timeout->count(); |
77 | }(); |
78 | |
79 | QGstreamerMessage message{ |
80 | gst_bus_timed_pop_filtered(bus: get(), timeout: gstTimeout, types: type), |
81 | QGstreamerMessage::HasRef, |
82 | }; |
83 | if (!message) |
84 | return false; |
85 | |
86 | for (QGstreamerBusMessageFilter *filter : std::as_const(t&: busFilters)) { |
87 | if (filter->processBusMessage(message)) |
88 | break; |
89 | } |
90 | |
91 | return true; |
92 | } |
93 | |
94 | bool QGstBusObserver::currentThreadIsNotifierThread() const |
95 | { |
96 | return m_socketNotifier.thread()->isCurrentThread(); |
97 | } |
98 | |
99 | void QGstBusObserver::processAllPendingMessages() |
100 | { |
101 | for (;;) { |
102 | bool messageHandled = processNextPendingMessage(type: GST_MESSAGE_ANY, timeout: std::chrono::nanoseconds{ 0 }); |
103 | |
104 | if (!messageHandled) |
105 | return; |
106 | } |
107 | } |
108 | |
109 | QT_END_NAMESPACE |
110 |