1// Copyright (C) 2024 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include "qgstreamer_qiodevice_handler_p.h"
5
6#include <QtCore/qdebug.h>
7#include <QtCore/qglobal.h>
8#include <QtCore/qiodevice.h>
9#include <QtCore/qmutex.h>
10#include <QtCore/qobject.h>
11#include <QtCore/qspan.h>
12#include <QtCore/qurl.h>
13#include <QtCore/quuid.h>
14
15#include <gst/base/gstbasesrc.h>
16#include <map>
17#include <memory>
18#include <utility>
19
20QT_BEGIN_NAMESPACE
21
22namespace {
23
24using namespace Qt::Literals;
25
26// QIODeviceRegistry
27
28class QIODeviceRegistry : public QObject
29{
30public:
31 struct Record
32 {
33 explicit Record(QByteArray, QIODevice *);
34
35 void unsetDevice();
36 bool isValid() const;
37
38 const QByteArray id;
39
40 template <typename Functor>
41 auto runWhileLocked(Functor &&f)
42 {
43 QMutexLocker guard(&mutex);
44 return f(device);
45 }
46
47 private:
48 QIODevice *device;
49 mutable QMutex mutex;
50 };
51 using SharedRecord = std::shared_ptr<Record>;
52
53 QByteArray registerQIODevice(QIODevice *);
54 SharedRecord findRecord(QByteArrayView);
55
56private:
57 void unregisterDevice(QIODevice *);
58 void deviceDestroyed(QIODevice *);
59
60 QMutex m_registryMutex;
61 std::map<QByteArray, SharedRecord, std::less<>> m_registry;
62 std::map<QIODevice *, QByteArray> m_reverseLookupTable;
63};
64
65QByteArray QIODeviceRegistry::registerQIODevice(QIODevice *device)
66{
67 Q_ASSERT(device);
68
69 if (device->isSequential())
70 qWarning() << "GStreamer: sequential QIODevices are not fully supported";
71
72 QMutexLocker lock(&m_registryMutex);
73
74 auto it = m_reverseLookupTable.find(x: device);
75 if (it != m_reverseLookupTable.end())
76 return it->second;
77
78 QByteArray identifier =
79 "qiodevice:/"_ba + QUuid::createUuid().toByteArray(mode: QUuid::StringFormat::Id128);
80
81 m_registry.emplace(args&: identifier, args: std::make_shared<Record>(args&: identifier, args&: device));
82
83 QMetaObject::Connection destroyedConnection = QObject::connect(
84 sender: device, signal: &QObject::destroyed, context: this,
85 slot: [this, device] {
86 // Caveat: if the QIODevice has not been closed, we unregister the device, however gstreamer
87 // worker threads have a chance to briefly read from a partially destroyed QIODevice.
88 // There's nothing we can do about it
89 unregisterDevice(device);
90 },
91 type: Qt::DirectConnection);
92
93 QObject::connect(
94 sender: device, signal: &QIODevice::aboutToClose, context: this,
95 slot: [this, device, destroyedConnection = std::move(destroyedConnection)] {
96 unregisterDevice(device);
97 disconnect(destroyedConnection);
98 },
99 type: Qt::DirectConnection);
100
101 m_reverseLookupTable.emplace(args&: device, args&: identifier);
102 return identifier;
103}
104
105QIODeviceRegistry::SharedRecord QIODeviceRegistry::findRecord(QByteArrayView id)
106{
107 QMutexLocker registryLock(&m_registryMutex);
108
109 auto it = m_registry.find(x: id);
110 if (it != m_registry.end())
111 return it->second;
112 return {};
113}
114
115void QIODeviceRegistry::unregisterDevice(QIODevice *device)
116{
117 QMutexLocker registryLock(&m_registryMutex);
118 auto reverseLookupIt = m_reverseLookupTable.find(x: device);
119 if (reverseLookupIt == m_reverseLookupTable.end())
120 return;
121
122 auto it = m_registry.find(x: reverseLookupIt->second);
123 Q_ASSERT(it != m_registry.end());
124
125 it->second->unsetDevice();
126 m_reverseLookupTable.erase(position: reverseLookupIt);
127 m_registry.erase(position: it);
128}
129
130QIODeviceRegistry::Record::Record(QByteArray id, QIODevice *device)
131 : id {
132 std::move(id),
133 },
134 device {
135 device,
136 }
137{
138 if (!device->isOpen())
139 device->open(mode: QIODevice::ReadOnly);
140}
141
142void QIODeviceRegistry::Record::unsetDevice()
143{
144 QMutexLocker lock(&mutex);
145 device = nullptr;
146}
147
148bool QIODeviceRegistry::Record::isValid() const
149{
150 QMutexLocker lock(&mutex);
151 return device;
152}
153
154Q_GLOBAL_STATIC(QIODeviceRegistry, gQIODeviceRegistry);
155
156// qt helpers
157
158// glib / gstreamer object
// Identifiers of the GObject properties installed on the element.
enum PropertyId : uint8_t {
    PROP_NONE = 0,
    PROP_URI = 1,
};
163
164struct QGstQIODeviceSrc
165{
166 void getProperty(guint propId, GValue *value, const GParamSpec *pspec) const;
167 void setProperty(guint propId, const GValue *value, const GParamSpec *pspec);
168 auto lockObject() const;
169
170 bool start();
171 bool stop();
172
173 bool isSeekable();
174 std::optional<guint64> size();
175 GstFlowReturn fill(guint64 offset, guint length, GstBuffer *buf);
176 void getURI(GValue *value) const;
177 bool setURI(const char *location, GError **err = nullptr);
178
179 GstBaseSrc baseSrc;
180 QIODeviceRegistry::SharedRecord record;
181};
182
183void QGstQIODeviceSrc::getProperty(guint propId, GValue *value, const GParamSpec *pspec) const
184{
185 switch (propId) {
186 case PROP_URI:
187 return getURI(value);
188
189 default:
190 G_OBJECT_WARN_INVALID_PROPERTY_ID(this, propId, pspec);
191 break;
192 }
193}
194
195void QGstQIODeviceSrc::setProperty(guint propId, const GValue *value, const GParamSpec *pspec)
196{
197 switch (propId) {
198 case PROP_URI:
199 setURI(location: g_value_get_string(value));
200 break;
201 default:
202 G_OBJECT_WARN_INVALID_PROPERTY_ID(this, propId, pspec);
203 break;
204 }
205}
206
207auto QGstQIODeviceSrc::lockObject() const
208{
209 GST_OBJECT_LOCK(this);
210 return qScopeGuard(f: [this] {
211 GST_OBJECT_UNLOCK(this);
212 });
213}
214
215bool QGstQIODeviceSrc::start()
216{
217 auto lock = lockObject();
218
219 return record && record->isValid();
220}
221
222bool QGstQIODeviceSrc::stop()
223{
224 return true;
225}
226
227bool QGstQIODeviceSrc::isSeekable()
228{
229 auto lock = lockObject();
230 return record->runWhileLocked(f: [&](QIODevice *device) {
231 return !device->isSequential();
232 });
233}
234
235std::optional<guint64> QGstQIODeviceSrc::size()
236{
237 auto lock = lockObject();
238 if (!record)
239 return std::nullopt;
240
241 qint64 size = record->runWhileLocked(f: [&](QIODevice *device) {
242 return device->size();
243 });
244
245 if (size == -1)
246 return std::nullopt;
247 return size;
248}
249
250GstFlowReturn QGstQIODeviceSrc::fill(guint64 offset, guint length, GstBuffer *buf)
251{
252 auto lock = lockObject();
253
254 if (!record)
255 return GST_FLOW_ERROR;
256
257 GstMapInfo info;
258 if (!gst_buffer_map(buffer: buf, info: &info, flags: GST_MAP_WRITE)) {
259 GST_ELEMENT_ERROR(this, RESOURCE, WRITE, (nullptr), ("Can't map buffer for writing"));
260 return GST_FLOW_ERROR;
261 }
262
263 int64_t totalRead = 0;
264 GstFlowReturn ret = record->runWhileLocked(f: [&](QIODevice *device) -> GstFlowReturn {
265 if (device->pos() != qint64(offset)) {
266 bool success = device->seek(pos: offset);
267 if (!success) {
268 qWarning() << "seek on iodevice failed";
269 return GST_FLOW_ERROR;
270 }
271 }
272
273 int64_t remain = length;
274 while (remain) {
275 int64_t bytesRead =
276 device->read(data: reinterpret_cast<char *>(info.data + totalRead), maxlen: remain);
277 if (bytesRead == -1) {
278 if (device->atEnd()) {
279 return GST_FLOW_EOS;
280 }
281 GST_ELEMENT_ERROR(this, RESOURCE, READ, (nullptr), GST_ERROR_SYSTEM);
282 return GST_FLOW_ERROR;
283 }
284
285 remain -= bytesRead;
286 totalRead += bytesRead;
287 }
288
289 return GST_FLOW_OK;
290 });
291
292 if (ret != GST_FLOW_OK) {
293 gst_buffer_unmap(buffer: buf, info: &info);
294 gst_buffer_resize(buffer: buf, offset: 0, size: 0);
295 return ret;
296 }
297
298 gst_buffer_unmap(buffer: buf, info: &info);
299 if (totalRead != length)
300 gst_buffer_resize(buffer: buf, offset: 0, size: totalRead);
301
302 GST_BUFFER_OFFSET(buf) = offset;
303 GST_BUFFER_OFFSET_END(buf) = offset + totalRead;
304
305 return GST_FLOW_OK;
306}
307
308void QGstQIODeviceSrc::getURI(GValue *value) const
309{
310 auto lock = lockObject();
311 if (record)
312 g_value_set_string(value, v_string: record->id);
313 else
314 g_value_set_string(value, v_string: nullptr);
315}
316
317bool QGstQIODeviceSrc::setURI(const char *location, GError **err)
318{
319 Q_ASSERT(QLatin1StringView(location).startsWith("qiodevice:/"_L1));
320
321 {
322 auto lock = lockObject();
323 GstState state = GST_STATE(this);
324 if (state != GST_STATE_READY && state != GST_STATE_NULL) {
325 g_warning(
326 "Changing the `uri' property on qiodevicesrc when the resource is open is not "
327 "supported.");
328 if (err)
329 g_set_error(err, GST_URI_ERROR, code: GST_URI_ERROR_BAD_STATE,
330 format: "Changing the `uri' property on qiodevicesrc when the resource is open "
331 "is not supported.");
332 return false;
333 }
334 }
335
336 auto newRecord = gQIODeviceRegistry->findRecord(id: QByteArrayView{ location });
337
338 {
339 auto lock = lockObject();
340 record = std::move(newRecord);
341 }
342
343 g_object_notify(G_OBJECT(this), property_name: "uri");
344
345 return true;
346}
347
348struct QGstQIODeviceSrcClass
349{
350 GstBaseSrcClass parent_class;
351};
352
353// GObject
354static void gst_qiodevice_src_class_init(QGstQIODeviceSrcClass *klass);
355static void gst_qiodevice_src_init(QGstQIODeviceSrc *self);
356
357GType gst_qiodevice_src_get_type();
358
359template <typename T>
360QGstQIODeviceSrc *asQGstQIODeviceSrc(T *obj)
361{
362 return (G_TYPE_CHECK_INSTANCE_CAST((obj), gst_qiodevice_src_get_type(), QGstQIODeviceSrc));
363}
364
365// URI handler
366void qGstInitQIODeviceURIHandler(gpointer, gpointer);
367
368#define gst_qiodevice_src_parent_class parent_class
369G_DEFINE_TYPE_WITH_CODE(QGstQIODeviceSrc, gst_qiodevice_src, GST_TYPE_BASE_SRC,
370 G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, qGstInitQIODeviceURIHandler));
371
372// implementations
373
374void gst_qiodevice_src_init(QGstQIODeviceSrc *self)
375{
376 using SharedRecord = QIODeviceRegistry::SharedRecord;
377
378 new (reinterpret_cast<void *>(&self->record)) SharedRecord;
379
380 static constexpr guint defaultBlockSize = 16384;
381 gst_base_src_set_blocksize(src: &self->baseSrc, blocksize: defaultBlockSize);
382}
383
384void gst_qiodevice_src_class_init(QGstQIODeviceSrcClass *klass)
385{
386 // GObject
387 GObjectClass *gobjectClass = G_OBJECT_CLASS(klass);
388 gobjectClass->set_property = [](GObject *instance, guint propId, const GValue *value,
389 GParamSpec *pspec) {
390 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: instance);
391 return src->setProperty(propId, value, pspec);
392 };
393
394 gobjectClass->get_property = [](GObject *instance, guint propId, GValue *value,
395 GParamSpec *pspec) {
396 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: instance);
397 return src->getProperty(propId, value, pspec);
398 };
399
400 g_object_class_install_property(
401 oclass: gobjectClass, property_id: PROP_URI,
402 pspec: g_param_spec_string(name: "uri", nick: "QRC Location", blurb: "Path of the qrc to read", default_value: nullptr,
403 flags: GParamFlags(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS
404 | GST_PARAM_MUTABLE_READY)));
405
406 gobjectClass->finalize = [](GObject *instance) {
407 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: instance);
408 using SharedRecord = QIODeviceRegistry::SharedRecord;
409 src->record.~SharedRecord();
410 G_OBJECT_CLASS(parent_class)->finalize(instance);
411 };
412
413 // GstElement
414 GstElementClass *gstelementClass = GST_ELEMENT_CLASS(klass);
415 gst_element_class_set_static_metadata(klass: gstelementClass, longname: "QRC Source", classification: "Source/QRC",
416 description: "Read from arbitrary point in QRC resource",
417 author: "Tim Blechmann <tim.blechmann@qt.io>");
418
419 static GstStaticPadTemplate srctemplate =
420 GST_STATIC_PAD_TEMPLATE("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
421
422 gst_element_class_add_static_pad_template(klass: gstelementClass, static_templ: &srctemplate);
423
424 // GstBaseSrc
425 GstBaseSrcClass *gstbasesrcClass = GST_BASE_SRC_CLASS(klass);
426 gstbasesrcClass->start = [](GstBaseSrc *basesrc) -> gboolean {
427 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc);
428 return src->start();
429 };
430 gstbasesrcClass->stop = [](GstBaseSrc *basesrc) -> gboolean {
431 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc);
432 return src->stop();
433 };
434
435 gstbasesrcClass->is_seekable = [](GstBaseSrc *basesrc) -> gboolean {
436 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc);
437 return src->isSeekable();
438 };
439 gstbasesrcClass->get_size = [](GstBaseSrc *basesrc, guint64 *size) -> gboolean {
440 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc);
441 auto optionalSize = src->size();
442 if (!optionalSize)
443 return false;
444 *size = optionalSize.value();
445 return true;
446 };
447 gstbasesrcClass->fill = [](GstBaseSrc *basesrc, guint64 offset, guint length,
448 GstBuffer *buf) -> GstFlowReturn {
449 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc);
450 return src->fill(offset, length, buf);
451 };
452}
453
454void qGstInitQIODeviceURIHandler(gpointer g_handlerInterface, gpointer)
455{
456 GstURIHandlerInterface *iface = (GstURIHandlerInterface *)g_handlerInterface;
457
458 iface->get_type = [](GType) {
459 return GST_URI_SRC;
460 };
461 iface->get_protocols = [](GType) {
462 static constexpr const gchar *protocols[] = {
463 "qiodevice",
464 nullptr,
465 };
466 return protocols;
467 };
468 iface->get_uri = [](GstURIHandler *handler) -> gchar * {
469 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: handler);
470 auto lock = src->lockObject();
471 if (src->record)
472 return g_strdup(str: src->record->id.constData());
473 return nullptr;
474 };
475 iface->set_uri = [](GstURIHandler *handler, const gchar *uri, GError **err) -> gboolean {
476 QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: handler);
477 return src->setURI(location: uri, err);
478 };
479}
480
481} // namespace
482
483// plugin registration
484
485void qGstRegisterQIODeviceHandler(GstPlugin *plugin)
486{
487 gst_element_register(plugin, name: "qiodevicesrc", rank: GST_RANK_PRIMARY, type: gst_qiodevice_src_get_type());
488}
489
490QUrl qGstRegisterQIODevice(QIODevice *device)
491{
492 return QUrl{
493 QString::fromLatin1(ba: gQIODeviceRegistry->registerQIODevice(device)),
494 };
495}
496
497QT_END_NAMESPACE
498

source code of qtmultimedia/src/plugins/multimedia/gstreamer/uri_handler/qgstreamer_qiodevice_handler.cpp