1 | // Copyright (C) 2024 The Qt Company Ltd. |
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
3 | |
4 | #include "qgst_bus_observer_p.h" |
5 | |
6 | QT_BEGIN_NAMESPACE |
7 | |
8 | QGstBusObserver::QGstBusObserver(QGstBusHandle bus) |
9 | : QGstBusHandle{ |
10 | std::move(bus), |
11 | } |
12 | { |
13 | if (!get()) |
14 | return; |
15 | |
16 | GPollFD pollFd{}; |
17 | gst_bus_get_pollfd(bus: get(), fd: &pollFd); |
18 | Q_ASSERT(pollFd.fd); |
19 | |
20 | #ifndef Q_OS_WIN |
21 | m_socketNotifier.setSocket(pollFd.fd); |
22 | |
23 | QObject::connect(sender: &m_socketNotifier, signal: &QSocketNotifier::activated, context: &m_socketNotifier, |
24 | slot: [this](QSocketDescriptor, QSocketNotifier::Type) { |
25 | this->processAllPendingMessages(); |
26 | }); |
27 | |
28 | m_socketNotifier.setEnabled(true); |
29 | #else |
30 | m_socketNotifier.setHandle(reinterpret_cast<Qt::HANDLE>(pollFd.fd)); |
31 | |
32 | QObject::connect(&m_socketNotifier, &QWinEventNotifier::activated, &m_socketNotifier, |
33 | [this](QWinEventNotifier::HANDLE) { |
34 | this->processAllPendingMessages(); |
35 | }); |
36 | m_socketNotifier.setEnabled(true); |
37 | #endif |
38 | |
39 | gst_bus_set_sync_handler(bus: get(), func: (GstBusSyncHandler)syncGstBusFilter, user_data: this, notify: nullptr); |
40 | } |
41 | |
42 | QGstBusObserver::~QGstBusObserver() |
43 | { |
44 | close(); |
45 | } |
46 | |
47 | void QGstBusObserver::close() |
48 | { |
49 | if (!get()) |
50 | return; |
51 | |
52 | gst_bus_set_sync_handler(bus: get(), func: nullptr, user_data: nullptr, notify: nullptr); |
53 | QGstBusHandle::close(); |
54 | } |
55 | |
56 | void QGstBusObserver::installMessageFilter(QGstreamerSyncMessageFilter *filter) |
57 | { |
58 | Q_ASSERT(filter); |
59 | QMutexLocker lock(&filterMutex); |
60 | if (!syncFilters.contains(t: filter)) |
61 | syncFilters.append(t: filter); |
62 | } |
63 | |
64 | void QGstBusObserver::removeMessageFilter(QGstreamerSyncMessageFilter *filter) |
65 | { |
66 | Q_ASSERT(filter); |
67 | QMutexLocker lock(&filterMutex); |
68 | syncFilters.removeAll(t: filter); |
69 | } |
70 | |
71 | void QGstBusObserver::installMessageFilter(QGstreamerBusMessageFilter *filter) |
72 | { |
73 | Q_ASSERT(filter); |
74 | if (!busFilters.contains(t: filter)) |
75 | busFilters.append(t: filter); |
76 | } |
77 | |
78 | void QGstBusObserver::removeMessageFilter(QGstreamerBusMessageFilter *filter) |
79 | { |
80 | Q_ASSERT(filter); |
81 | busFilters.removeAll(t: filter); |
82 | } |
83 | |
84 | bool QGstBusObserver::processNextPendingMessage(GstMessageType type, |
85 | std::optional<std::chrono::nanoseconds> timeout) |
86 | { |
87 | if (!get()) |
88 | return false; |
89 | |
90 | GstClockTime gstTimeout = [&]() -> GstClockTime { |
91 | if (!timeout) |
92 | return GST_CLOCK_TIME_NONE; // block forever |
93 | return timeout->count(); |
94 | }(); |
95 | |
96 | QGstreamerMessage message{ |
97 | gst_bus_timed_pop_filtered(bus: get(), timeout: gstTimeout, types: type), |
98 | QGstreamerMessage::HasRef, |
99 | }; |
100 | if (!message) |
101 | return false; |
102 | |
103 | for (QGstreamerBusMessageFilter *filter : std::as_const(t&: busFilters)) { |
104 | if (filter->processBusMessage(message)) |
105 | break; |
106 | } |
107 | |
108 | return true; |
109 | } |
110 | |
111 | bool QGstBusObserver::currentThreadIsNotifierThread() const |
112 | { |
113 | return m_socketNotifier.thread()->isCurrentThread(); |
114 | } |
115 | |
116 | void QGstBusObserver::processAllPendingMessages() |
117 | { |
118 | for (;;) { |
119 | bool messageHandled = processNextPendingMessage(type: GST_MESSAGE_ANY, timeout: std::chrono::nanoseconds{ 0 }); |
120 | |
121 | if (!messageHandled) |
122 | return; |
123 | } |
124 | } |
125 | |
126 | GstBusSyncReply QGstBusObserver::syncGstBusFilter(GstBus *bus, GstMessage *message, |
127 | QGstBusObserver *self) |
128 | { |
129 | if (!message) |
130 | return GST_BUS_PASS; |
131 | |
132 | QMutexLocker lock(&self->filterMutex); |
133 | Q_ASSERT(bus == self->get()); |
134 | |
135 | for (QGstreamerSyncMessageFilter *filter : std::as_const(t&: self->syncFilters)) { |
136 | if (filter->processSyncMessage(message: QGstreamerMessage{ message, QGstreamerMessage::NeedsRef })) { |
137 | gst_message_unref(msg: message); |
138 | return GST_BUS_DROP; |
139 | } |
140 | } |
141 | |
142 | return GST_BUS_PASS; |
143 | } |
144 | |
145 | QT_END_NAMESPACE |
146 | |