1// Copyright (C) 2024 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include "qgst_bus_observer_p.h"
5
6QT_BEGIN_NAMESPACE
7
8QGstBusObserver::QGstBusObserver(QGstBusHandle bus)
9 : QGstBusHandle{
10 std::move(bus),
11 }
12{
13 if (!get())
14 return;
15
16 GPollFD pollFd{};
17 gst_bus_get_pollfd(bus: get(), fd: &pollFd);
18 Q_ASSERT(pollFd.fd);
19
20#ifndef Q_OS_WIN
21 m_socketNotifier.setSocket(pollFd.fd);
22
23 QObject::connect(sender: &m_socketNotifier, signal: &QSocketNotifier::activated, context: &m_socketNotifier,
24 slot: [this](QSocketDescriptor, QSocketNotifier::Type) {
25 this->processAllPendingMessages();
26 });
27
28 m_socketNotifier.setEnabled(true);
29#else
30 m_socketNotifier.setHandle(reinterpret_cast<Qt::HANDLE>(pollFd.fd));
31
32 QObject::connect(&m_socketNotifier, &QWinEventNotifier::activated, &m_socketNotifier,
33 [this](QWinEventNotifier::HANDLE) {
34 this->processAllPendingMessages();
35 });
36 m_socketNotifier.setEnabled(true);
37#endif
38
39 gst_bus_set_sync_handler(bus: get(), func: (GstBusSyncHandler)syncGstBusFilter, user_data: this, notify: nullptr);
40}
41
42QGstBusObserver::~QGstBusObserver()
43{
44 close();
45}
46
47void QGstBusObserver::close()
48{
49 if (!get())
50 return;
51
52 gst_bus_set_sync_handler(bus: get(), func: nullptr, user_data: nullptr, notify: nullptr);
53 QGstBusHandle::close();
54}
55
56void QGstBusObserver::installMessageFilter(QGstreamerSyncMessageFilter *filter)
57{
58 Q_ASSERT(filter);
59 QMutexLocker lock(&filterMutex);
60 if (!syncFilters.contains(t: filter))
61 syncFilters.append(t: filter);
62}
63
64void QGstBusObserver::removeMessageFilter(QGstreamerSyncMessageFilter *filter)
65{
66 Q_ASSERT(filter);
67 QMutexLocker lock(&filterMutex);
68 syncFilters.removeAll(t: filter);
69}
70
71void QGstBusObserver::installMessageFilter(QGstreamerBusMessageFilter *filter)
72{
73 Q_ASSERT(filter);
74 if (!busFilters.contains(t: filter))
75 busFilters.append(t: filter);
76}
77
78void QGstBusObserver::removeMessageFilter(QGstreamerBusMessageFilter *filter)
79{
80 Q_ASSERT(filter);
81 busFilters.removeAll(t: filter);
82}
83
84bool QGstBusObserver::processNextPendingMessage(GstMessageType type,
85 std::optional<std::chrono::nanoseconds> timeout)
86{
87 if (!get())
88 return false;
89
90 GstClockTime gstTimeout = [&]() -> GstClockTime {
91 if (!timeout)
92 return GST_CLOCK_TIME_NONE; // block forever
93 return timeout->count();
94 }();
95
96 QGstreamerMessage message{
97 gst_bus_timed_pop_filtered(bus: get(), timeout: gstTimeout, types: type),
98 QGstreamerMessage::HasRef,
99 };
100 if (!message)
101 return false;
102
103 for (QGstreamerBusMessageFilter *filter : std::as_const(t&: busFilters)) {
104 if (filter->processBusMessage(message))
105 break;
106 }
107
108 return true;
109}
110
111bool QGstBusObserver::currentThreadIsNotifierThread() const
112{
113 return m_socketNotifier.thread()->isCurrentThread();
114}
115
116void QGstBusObserver::processAllPendingMessages()
117{
118 for (;;) {
119 bool messageHandled = processNextPendingMessage(type: GST_MESSAGE_ANY, timeout: std::chrono::nanoseconds{ 0 });
120
121 if (!messageHandled)
122 return;
123 }
124}
125
126GstBusSyncReply QGstBusObserver::syncGstBusFilter(GstBus *bus, GstMessage *message,
127 QGstBusObserver *self)
128{
129 if (!message)
130 return GST_BUS_PASS;
131
132 QMutexLocker lock(&self->filterMutex);
133 Q_ASSERT(bus == self->get());
134
135 for (QGstreamerSyncMessageFilter *filter : std::as_const(t&: self->syncFilters)) {
136 if (filter->processSyncMessage(message: QGstreamerMessage{ message, QGstreamerMessage::NeedsRef })) {
137 gst_message_unref(msg: message);
138 return GST_BUS_DROP;
139 }
140 }
141
142 return GST_BUS_PASS;
143}
144
145QT_END_NAMESPACE
146

Provided by KDAB

Privacy Policy
Start learning QML with our Intro Training
Find out more

source code of qtmultimedia/src/plugins/multimedia/gstreamer/common/qgst_bus_observer.cpp