1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include <QtCore/qmap.h>
5#include <QtCore/qtimer.h>
6#include <QtCore/qmutex.h>
7#include <QtCore/qlist.h>
8#include <QtCore/qabstracteventdispatcher.h>
9#include <QtCore/qcoreapplication.h>
10#include <QtCore/qproperty.h>
11
12#include "qgstpipeline_p.h"
13#include "qgstreamermessage_p.h"
14
15QT_BEGIN_NAMESPACE
16
17class QGstPipelinePrivate : public QObject
18{
19 Q_OBJECT
20public:
21
22 int m_ref = 0;
23 guint m_tag = 0;
24 GstBus *m_bus = nullptr;
25 QTimer *m_intervalTimer = nullptr;
26 QMutex filterMutex;
27 QList<QGstreamerSyncMessageFilter*> syncFilters;
28 QList<QGstreamerBusMessageFilter*> busFilters;
29 bool inStoppedState = true;
30 mutable qint64 m_position = 0;
31 double m_rate = 1.;
32 bool m_flushOnConfigChanges = false;
33 bool m_pendingFlush = false;
34
35 int m_configCounter = 0;
36 GstState m_savedState = GST_STATE_NULL;
37
38 QGstPipelinePrivate(GstBus* bus, QObject* parent = 0);
39 ~QGstPipelinePrivate();
40
41 void ref() { ++ m_ref; }
42 void deref() { if (!--m_ref) delete this; }
43
44 void installMessageFilter(QGstreamerSyncMessageFilter *filter);
45 void removeMessageFilter(QGstreamerSyncMessageFilter *filter);
46 void installMessageFilter(QGstreamerBusMessageFilter *filter);
47 void removeMessageFilter(QGstreamerBusMessageFilter *filter);
48
49 static GstBusSyncReply syncGstBusFilter(GstBus* bus, GstMessage* message, QGstPipelinePrivate *d)
50 {
51 Q_UNUSED(bus);
52 QMutexLocker lock(&d->filterMutex);
53
54 for (QGstreamerSyncMessageFilter *filter : std::as_const(t&: d->syncFilters)) {
55 if (filter->processSyncMessage(message: QGstreamerMessage(message))) {
56 gst_message_unref(msg: message);
57 return GST_BUS_DROP;
58 }
59 }
60
61 return GST_BUS_PASS;
62 }
63
64private Q_SLOTS:
65 void interval()
66 {
67 GstMessage* message;
68 while ((message = gst_bus_poll(bus: m_bus, events: GST_MESSAGE_ANY, timeout: 0)) != nullptr) {
69 processMessage(message);
70 gst_message_unref(msg: message);
71 }
72 }
73 void doProcessMessage(const QGstreamerMessage& msg)
74 {
75 for (QGstreamerBusMessageFilter *filter : std::as_const(t&: busFilters)) {
76 if (filter->processBusMessage(message: msg))
77 break;
78 }
79 }
80
81private:
82 void processMessage(GstMessage* message)
83 {
84 QGstreamerMessage msg(message);
85 doProcessMessage(msg);
86 }
87
88 void queueMessage(GstMessage* message)
89 {
90 QGstreamerMessage msg(message);
91 QMetaObject::invokeMethod(obj: this, member: "doProcessMessage", c: Qt::QueuedConnection,
92 Q_ARG(QGstreamerMessage, msg));
93 }
94
95 static gboolean busCallback(GstBus *bus, GstMessage *message, gpointer data)
96 {
97 Q_UNUSED(bus);
98 static_cast<QGstPipelinePrivate *>(data)->queueMessage(message);
99 return TRUE;
100 }
101};
102
103QGstPipelinePrivate::QGstPipelinePrivate(GstBus* bus, QObject* parent)
104 : QObject(parent),
105 m_bus(bus)
106{
107 // glib event loop can be disabled either by env variable or QT_NO_GLIB define, so check the dispacher
108 QAbstractEventDispatcher *dispatcher = QCoreApplication::eventDispatcher();
109 const bool hasGlib = dispatcher && dispatcher->inherits(classname: "QEventDispatcherGlib");
110 if (!hasGlib) {
111 m_intervalTimer = new QTimer(this);
112 m_intervalTimer->setInterval(250);
113 connect(asender: m_intervalTimer, SIGNAL(timeout()), SLOT(interval()));
114 m_intervalTimer->start();
115 } else {
116 m_tag = gst_bus_add_watch_full(bus, G_PRIORITY_DEFAULT, func: busCallback, user_data: this, notify: nullptr);
117 }
118
119 gst_bus_set_sync_handler(bus, func: (GstBusSyncHandler)syncGstBusFilter, user_data: this, notify: nullptr);
120}
121
122QGstPipelinePrivate::~QGstPipelinePrivate()
123{
124 delete m_intervalTimer;
125
126 if (m_tag)
127 gst_bus_remove_watch(bus: m_bus);
128
129 gst_bus_set_sync_handler(bus: m_bus, func: nullptr, user_data: nullptr, notify: nullptr);
130 gst_object_unref(GST_OBJECT(m_bus));
131}
132
133void QGstPipelinePrivate::installMessageFilter(QGstreamerSyncMessageFilter *filter)
134{
135 if (filter) {
136 QMutexLocker lock(&filterMutex);
137 if (!syncFilters.contains(t: filter))
138 syncFilters.append(t: filter);
139 }
140}
141
142void QGstPipelinePrivate::removeMessageFilter(QGstreamerSyncMessageFilter *filter)
143{
144 if (filter) {
145 QMutexLocker lock(&filterMutex);
146 syncFilters.removeAll(t: filter);
147 }
148}
149
150void QGstPipelinePrivate::installMessageFilter(QGstreamerBusMessageFilter *filter)
151{
152 if (filter && !busFilters.contains(t: filter))
153 busFilters.append(t: filter);
154}
155
156void QGstPipelinePrivate::removeMessageFilter(QGstreamerBusMessageFilter *filter)
157{
158 if (filter)
159 busFilters.removeAll(t: filter);
160}
161
162QGstPipeline::QGstPipeline(const QGstPipeline &o)
163 : QGstBin(o.bin(), NeedsRef),
164 d(o.d)
165{
166 if (d)
167 d->ref();
168}
169
170QGstPipeline &QGstPipeline::operator=(const QGstPipeline &o)
171{
172 if (this == &o)
173 return *this;
174 if (o.d)
175 o.d->ref();
176 if (d)
177 d->deref();
178 QGstBin::operator=(o);
179 d = o.d;
180 return *this;
181}
182
183QGstPipeline::QGstPipeline(const char *name)
184 : QGstBin(GST_BIN(gst_pipeline_new(name)), NeedsRef)
185{
186 d = new QGstPipelinePrivate(gst_pipeline_get_bus(pipeline: pipeline()));
187 d->ref();
188}
189
190QGstPipeline::QGstPipeline(GstPipeline *p)
191 : QGstBin(&p->bin, NeedsRef)
192{
193 d = new QGstPipelinePrivate(gst_pipeline_get_bus(pipeline: pipeline()));
194 d->ref();
195}
196
197QGstPipeline::~QGstPipeline()
198{
199 if (d)
200 d->deref();
201}
202
203bool QGstPipeline::inStoppedState() const
204{
205 Q_ASSERT(d);
206 return d->inStoppedState;
207}
208
209void QGstPipeline::setInStoppedState(bool stopped)
210{
211 Q_ASSERT(d);
212 d->inStoppedState = stopped;
213}
214
215void QGstPipeline::setFlushOnConfigChanges(bool flush)
216{
217 d->m_flushOnConfigChanges = flush;
218}
219
220void QGstPipeline::installMessageFilter(QGstreamerSyncMessageFilter *filter)
221{
222 Q_ASSERT(d);
223 d->installMessageFilter(filter);
224}
225
226void QGstPipeline::removeMessageFilter(QGstreamerSyncMessageFilter *filter)
227{
228 Q_ASSERT(d);
229 d->removeMessageFilter(filter);
230}
231
232void QGstPipeline::installMessageFilter(QGstreamerBusMessageFilter *filter)
233{
234 Q_ASSERT(d);
235 d->installMessageFilter(filter);
236}
237
238void QGstPipeline::removeMessageFilter(QGstreamerBusMessageFilter *filter)
239{
240 Q_ASSERT(d);
241 d->removeMessageFilter(filter);
242}
243
244GstStateChangeReturn QGstPipeline::setState(GstState state)
245{
246 auto retval = gst_element_set_state(element: element(), state);
247 if (d->m_pendingFlush) {
248 d->m_pendingFlush = false;
249 flush();
250 }
251 return retval;
252}
253
254void QGstPipeline::beginConfig()
255{
256 if (!d)
257 return;
258 Q_ASSERT(!isNull());
259
260 ++d->m_configCounter;
261 if (d->m_configCounter > 1)
262 return;
263
264 d->m_savedState = state();
265 if (d->m_savedState == GST_STATE_PLAYING)
266 setStateSync(GST_STATE_PAUSED);
267}
268
269void QGstPipeline::endConfig()
270{
271 if (!d)
272 return;
273 Q_ASSERT(!isNull());
274
275 --d->m_configCounter;
276 if (d->m_configCounter)
277 return;
278
279 if (d->m_flushOnConfigChanges)
280 d->m_pendingFlush = true;
281 if (d->m_savedState == GST_STATE_PLAYING)
282 setState(GST_STATE_PLAYING);
283 d->m_savedState = GST_STATE_NULL;
284}
285
286void QGstPipeline::flush()
287{
288 seek(pos: position(), rate: d->m_rate);
289}
290
291bool QGstPipeline::seek(qint64 pos, double rate)
292{
293 // always adjust the rate, so it can be set before playback starts
294 // setting position needs a loaded media file that's seekable
295 d->m_rate = rate;
296 qint64 from = rate > 0 ? pos : 0;
297 qint64 to = rate > 0 ? duration() : pos;
298 bool success = gst_element_seek(element: element(), rate, format: GST_FORMAT_TIME,
299 flags: GstSeekFlags(GST_SEEK_FLAG_FLUSH),
300 start_type: GST_SEEK_TYPE_SET, start: from,
301 stop_type: GST_SEEK_TYPE_SET, stop: to);
302 if (!success)
303 return false;
304
305 d->m_position = pos;
306 return true;
307}
308
309bool QGstPipeline::setPlaybackRate(double rate)
310{
311 if (rate == d->m_rate)
312 return false;
313 seek(pos: position(), rate);
314 return true;
315}
316
317double QGstPipeline::playbackRate() const
318{
319 return d->m_rate;
320}
321
322bool QGstPipeline::setPosition(qint64 pos)
323{
324 return seek(pos, rate: d->m_rate);
325}
326
327qint64 QGstPipeline::position() const
328{
329 gint64 pos;
330 if (gst_element_query_position(element: element(), format: GST_FORMAT_TIME, cur: &pos))
331 d->m_position = pos;
332 return d->m_position;
333}
334
335qint64 QGstPipeline::duration() const
336{
337 gint64 d;
338 if (!gst_element_query_duration(element: element(), format: GST_FORMAT_TIME, duration: &d))
339 return 0.;
340 return d;
341}
342
343QT_END_NAMESPACE
344
345#include "qgstpipeline.moc"
346
347

source code of qtmultimedia/src/plugins/multimedia/gstreamer/common/qgstpipeline.cpp