1 | // Copyright (C) 2024 The Qt Company Ltd. |
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
3 | |
4 | #include "qgstreamer_qiodevice_handler_p.h" |
5 | |
6 | #include <QtCore/qdebug.h> |
7 | #include <QtCore/qglobal.h> |
8 | #include <QtCore/qiodevice.h> |
9 | #include <QtCore/qmutex.h> |
10 | #include <QtCore/qobject.h> |
11 | #include <QtCore/qspan.h> |
12 | #include <QtCore/qurl.h> |
13 | #include <QtCore/quuid.h> |
14 | |
15 | #include <gst/base/gstbasesrc.h> |
16 | #include <map> |
17 | #include <memory> |
18 | #include <utility> |
19 | |
20 | QT_BEGIN_NAMESPACE |
21 | |
22 | namespace { |
23 | |
24 | using namespace Qt::Literals; |
25 | |
26 | // QIODeviceRegistry |
27 | |
28 | class QIODeviceRegistry : public QObject |
29 | { |
30 | public: |
31 | struct Record |
32 | { |
33 | explicit Record(QByteArray, QIODevice *); |
34 | |
35 | void unsetDevice(); |
36 | bool isValid() const; |
37 | |
38 | const QByteArray id; |
39 | |
40 | template <typename Functor> |
41 | auto runWhileLocked(Functor &&f) |
42 | { |
43 | QMutexLocker guard(&mutex); |
44 | return f(device); |
45 | } |
46 | |
47 | private: |
48 | QIODevice *device; |
49 | mutable QMutex mutex; |
50 | }; |
51 | using SharedRecord = std::shared_ptr<Record>; |
52 | |
53 | QByteArray registerQIODevice(QIODevice *); |
54 | SharedRecord findRecord(QByteArrayView); |
55 | |
56 | private: |
57 | void unregisterDevice(QIODevice *); |
58 | void deviceDestroyed(QIODevice *); |
59 | |
60 | QMutex m_registryMutex; |
61 | std::map<QByteArray, SharedRecord, std::less<>> m_registry; |
62 | std::map<QIODevice *, QByteArray> m_reverseLookupTable; |
63 | }; |
64 | |
65 | QByteArray QIODeviceRegistry::registerQIODevice(QIODevice *device) |
66 | { |
67 | Q_ASSERT(device); |
68 | |
69 | if (device->isSequential()) |
70 | qWarning() << "GStreamer: sequential QIODevices are not fully supported" ; |
71 | |
72 | QMutexLocker lock(&m_registryMutex); |
73 | |
74 | auto it = m_reverseLookupTable.find(x: device); |
75 | if (it != m_reverseLookupTable.end()) |
76 | return it->second; |
77 | |
78 | QByteArray identifier = |
79 | "qiodevice:/"_ba + QUuid::createUuid().toByteArray(mode: QUuid::StringFormat::Id128); |
80 | |
81 | m_registry.emplace(args&: identifier, args: std::make_shared<Record>(args&: identifier, args&: device)); |
82 | |
83 | QMetaObject::Connection destroyedConnection = QObject::connect( |
84 | sender: device, signal: &QObject::destroyed, context: this, |
85 | slot: [this, device] { |
86 | // Caveat: if the QIODevice has not been closed, we unregister the device, however gstreamer |
87 | // worker threads have a chance to briefly read from a partially destroyed QIODevice. |
88 | // There's nothing we can do about it |
89 | unregisterDevice(device); |
90 | }, |
91 | type: Qt::DirectConnection); |
92 | |
93 | QObject::connect( |
94 | sender: device, signal: &QIODevice::aboutToClose, context: this, |
95 | slot: [this, device, destroyedConnection = std::move(destroyedConnection)] { |
96 | unregisterDevice(device); |
97 | disconnect(destroyedConnection); |
98 | }, |
99 | type: Qt::DirectConnection); |
100 | |
101 | m_reverseLookupTable.emplace(args&: device, args&: identifier); |
102 | return identifier; |
103 | } |
104 | |
105 | QIODeviceRegistry::SharedRecord QIODeviceRegistry::findRecord(QByteArrayView id) |
106 | { |
107 | QMutexLocker registryLock(&m_registryMutex); |
108 | |
109 | auto it = m_registry.find(x: id); |
110 | if (it != m_registry.end()) |
111 | return it->second; |
112 | return {}; |
113 | } |
114 | |
115 | void QIODeviceRegistry::unregisterDevice(QIODevice *device) |
116 | { |
117 | QMutexLocker registryLock(&m_registryMutex); |
118 | auto reverseLookupIt = m_reverseLookupTable.find(x: device); |
119 | if (reverseLookupIt == m_reverseLookupTable.end()) |
120 | return; |
121 | |
122 | auto it = m_registry.find(x: reverseLookupIt->second); |
123 | Q_ASSERT(it != m_registry.end()); |
124 | |
125 | it->second->unsetDevice(); |
126 | m_reverseLookupTable.erase(position: reverseLookupIt); |
127 | m_registry.erase(position: it); |
128 | } |
129 | |
130 | QIODeviceRegistry::Record::Record(QByteArray id, QIODevice *device) |
131 | : id { |
132 | std::move(id), |
133 | }, |
134 | device { |
135 | device, |
136 | } |
137 | { |
138 | if (!device->isOpen()) |
139 | device->open(mode: QIODevice::ReadOnly); |
140 | } |
141 | |
142 | void QIODeviceRegistry::Record::unsetDevice() |
143 | { |
144 | QMutexLocker lock(&mutex); |
145 | device = nullptr; |
146 | } |
147 | |
148 | bool QIODeviceRegistry::Record::isValid() const |
149 | { |
150 | QMutexLocker lock(&mutex); |
151 | return device; |
152 | } |
153 | |
154 | Q_GLOBAL_STATIC(QIODeviceRegistry, gQIODeviceRegistry); |
155 | |
156 | // qt helpers |
157 | |
158 | // glib / gstreamer object |
159 | enum PropertyId : uint8_t { |
160 | PROP_NONE, |
161 | PROP_URI, |
162 | }; |
163 | |
164 | struct QGstQIODeviceSrc |
165 | { |
166 | void getProperty(guint propId, GValue *value, const GParamSpec *pspec) const; |
167 | void setProperty(guint propId, const GValue *value, const GParamSpec *pspec); |
168 | auto lockObject() const; |
169 | |
170 | bool start(); |
171 | bool stop(); |
172 | |
173 | bool isSeekable(); |
174 | std::optional<guint64> size(); |
175 | GstFlowReturn fill(guint64 offset, guint length, GstBuffer *buf); |
176 | void getURI(GValue *value) const; |
177 | bool setURI(const char *location, GError **err = nullptr); |
178 | |
179 | GstBaseSrc baseSrc; |
180 | QIODeviceRegistry::SharedRecord record; |
181 | }; |
182 | |
183 | void QGstQIODeviceSrc::getProperty(guint propId, GValue *value, const GParamSpec *pspec) const |
184 | { |
185 | switch (propId) { |
186 | case PROP_URI: |
187 | return getURI(value); |
188 | |
189 | default: |
190 | G_OBJECT_WARN_INVALID_PROPERTY_ID(this, propId, pspec); |
191 | break; |
192 | } |
193 | } |
194 | |
195 | void QGstQIODeviceSrc::setProperty(guint propId, const GValue *value, const GParamSpec *pspec) |
196 | { |
197 | switch (propId) { |
198 | case PROP_URI: |
199 | setURI(location: g_value_get_string(value)); |
200 | break; |
201 | default: |
202 | G_OBJECT_WARN_INVALID_PROPERTY_ID(this, propId, pspec); |
203 | break; |
204 | } |
205 | } |
206 | |
207 | auto QGstQIODeviceSrc::lockObject() const |
208 | { |
209 | GST_OBJECT_LOCK(this); |
210 | return qScopeGuard(f: [this] { |
211 | GST_OBJECT_UNLOCK(this); |
212 | }); |
213 | } |
214 | |
215 | bool QGstQIODeviceSrc::start() |
216 | { |
217 | auto lock = lockObject(); |
218 | |
219 | return record && record->isValid(); |
220 | } |
221 | |
222 | bool QGstQIODeviceSrc::stop() |
223 | { |
224 | return true; |
225 | } |
226 | |
227 | bool QGstQIODeviceSrc::isSeekable() |
228 | { |
229 | auto lock = lockObject(); |
230 | return record->runWhileLocked(f: [&](QIODevice *device) { |
231 | return !device->isSequential(); |
232 | }); |
233 | } |
234 | |
235 | std::optional<guint64> QGstQIODeviceSrc::size() |
236 | { |
237 | auto lock = lockObject(); |
238 | if (!record) |
239 | return std::nullopt; |
240 | |
241 | qint64 size = record->runWhileLocked(f: [&](QIODevice *device) { |
242 | return device->size(); |
243 | }); |
244 | |
245 | if (size == -1) |
246 | return std::nullopt; |
247 | return size; |
248 | } |
249 | |
250 | GstFlowReturn QGstQIODeviceSrc::fill(guint64 offset, guint length, GstBuffer *buf) |
251 | { |
252 | auto lock = lockObject(); |
253 | |
254 | if (!record) |
255 | return GST_FLOW_ERROR; |
256 | |
257 | GstMapInfo info; |
258 | if (!gst_buffer_map(buffer: buf, info: &info, flags: GST_MAP_WRITE)) { |
259 | GST_ELEMENT_ERROR(this, RESOURCE, WRITE, (nullptr), ("Can't map buffer for writing" )); |
260 | return GST_FLOW_ERROR; |
261 | } |
262 | |
263 | int64_t totalRead = 0; |
264 | GstFlowReturn ret = record->runWhileLocked(f: [&](QIODevice *device) -> GstFlowReturn { |
265 | if (device->pos() != qint64(offset)) { |
266 | bool success = device->seek(pos: offset); |
267 | if (!success) { |
268 | qWarning() << "seek on iodevice failed" ; |
269 | return GST_FLOW_ERROR; |
270 | } |
271 | } |
272 | |
273 | int64_t remain = length; |
274 | while (remain) { |
275 | int64_t bytesRead = |
276 | device->read(data: reinterpret_cast<char *>(info.data + totalRead), maxlen: remain); |
277 | if (bytesRead == -1) { |
278 | if (device->atEnd()) { |
279 | return GST_FLOW_EOS; |
280 | } |
281 | GST_ELEMENT_ERROR(this, RESOURCE, READ, (nullptr), GST_ERROR_SYSTEM); |
282 | return GST_FLOW_ERROR; |
283 | } |
284 | |
285 | remain -= bytesRead; |
286 | totalRead += bytesRead; |
287 | } |
288 | |
289 | return GST_FLOW_OK; |
290 | }); |
291 | |
292 | if (ret != GST_FLOW_OK) { |
293 | gst_buffer_unmap(buffer: buf, info: &info); |
294 | gst_buffer_resize(buffer: buf, offset: 0, size: 0); |
295 | return ret; |
296 | } |
297 | |
298 | gst_buffer_unmap(buffer: buf, info: &info); |
299 | if (totalRead != length) |
300 | gst_buffer_resize(buffer: buf, offset: 0, size: totalRead); |
301 | |
302 | GST_BUFFER_OFFSET(buf) = offset; |
303 | GST_BUFFER_OFFSET_END(buf) = offset + totalRead; |
304 | |
305 | return GST_FLOW_OK; |
306 | } |
307 | |
308 | void QGstQIODeviceSrc::getURI(GValue *value) const |
309 | { |
310 | auto lock = lockObject(); |
311 | if (record) |
312 | g_value_set_string(value, v_string: record->id); |
313 | else |
314 | g_value_set_string(value, v_string: nullptr); |
315 | } |
316 | |
317 | bool QGstQIODeviceSrc::setURI(const char *location, GError **err) |
318 | { |
319 | Q_ASSERT(QLatin1StringView(location).startsWith("qiodevice:/"_L1 )); |
320 | |
321 | { |
322 | auto lock = lockObject(); |
323 | GstState state = GST_STATE(this); |
324 | if (state != GST_STATE_READY && state != GST_STATE_NULL) { |
325 | g_warning( |
326 | "Changing the `uri' property on qiodevicesrc when the resource is open is not " |
327 | "supported." ); |
328 | if (err) |
329 | g_set_error(err, GST_URI_ERROR, code: GST_URI_ERROR_BAD_STATE, |
330 | format: "Changing the `uri' property on qiodevicesrc when the resource is open " |
331 | "is not supported." ); |
332 | return false; |
333 | } |
334 | } |
335 | |
336 | auto newRecord = gQIODeviceRegistry->findRecord(id: QByteArrayView{ location }); |
337 | |
338 | { |
339 | auto lock = lockObject(); |
340 | record = std::move(newRecord); |
341 | } |
342 | |
343 | g_object_notify(G_OBJECT(this), property_name: "uri" ); |
344 | |
345 | return true; |
346 | } |
347 | |
348 | struct QGstQIODeviceSrcClass |
349 | { |
350 | GstBaseSrcClass parent_class; |
351 | }; |
352 | |
353 | // GObject |
354 | static void gst_qiodevice_src_class_init(QGstQIODeviceSrcClass *klass); |
355 | static void gst_qiodevice_src_init(QGstQIODeviceSrc *self); |
356 | |
357 | GType gst_qiodevice_src_get_type(); |
358 | |
359 | template <typename T> |
360 | QGstQIODeviceSrc *asQGstQIODeviceSrc(T *obj) |
361 | { |
362 | return (G_TYPE_CHECK_INSTANCE_CAST((obj), gst_qiodevice_src_get_type(), QGstQIODeviceSrc)); |
363 | } |
364 | |
365 | // URI handler |
366 | void qGstInitQIODeviceURIHandler(gpointer, gpointer); |
367 | |
368 | #define gst_qiodevice_src_parent_class parent_class |
369 | G_DEFINE_TYPE_WITH_CODE(QGstQIODeviceSrc, gst_qiodevice_src, GST_TYPE_BASE_SRC, |
370 | G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, qGstInitQIODeviceURIHandler)); |
371 | |
372 | // implementations |
373 | |
374 | void gst_qiodevice_src_init(QGstQIODeviceSrc *self) |
375 | { |
376 | using SharedRecord = QIODeviceRegistry::SharedRecord; |
377 | |
378 | new (reinterpret_cast<void *>(&self->record)) SharedRecord; |
379 | |
380 | static constexpr guint defaultBlockSize = 16384; |
381 | gst_base_src_set_blocksize(src: &self->baseSrc, blocksize: defaultBlockSize); |
382 | } |
383 | |
384 | void gst_qiodevice_src_class_init(QGstQIODeviceSrcClass *klass) |
385 | { |
386 | // GObject |
387 | GObjectClass *gobjectClass = G_OBJECT_CLASS(klass); |
388 | gobjectClass->set_property = [](GObject *instance, guint propId, const GValue *value, |
389 | GParamSpec *pspec) { |
390 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: instance); |
391 | return src->setProperty(propId, value, pspec); |
392 | }; |
393 | |
394 | gobjectClass->get_property = [](GObject *instance, guint propId, GValue *value, |
395 | GParamSpec *pspec) { |
396 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: instance); |
397 | return src->getProperty(propId, value, pspec); |
398 | }; |
399 | |
400 | g_object_class_install_property( |
401 | oclass: gobjectClass, property_id: PROP_URI, |
402 | pspec: g_param_spec_string(name: "uri" , nick: "QRC Location" , blurb: "Path of the qrc to read" , default_value: nullptr, |
403 | flags: GParamFlags(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
404 | | GST_PARAM_MUTABLE_READY))); |
405 | |
406 | gobjectClass->finalize = [](GObject *instance) { |
407 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: instance); |
408 | using SharedRecord = QIODeviceRegistry::SharedRecord; |
409 | src->record.~SharedRecord(); |
410 | G_OBJECT_CLASS(parent_class)->finalize(instance); |
411 | }; |
412 | |
413 | // GstElement |
414 | GstElementClass *gstelementClass = GST_ELEMENT_CLASS(klass); |
415 | gst_element_class_set_static_metadata(klass: gstelementClass, longname: "QRC Source" , classification: "Source/QRC" , |
416 | description: "Read from arbitrary point in QRC resource" , |
417 | author: "Tim Blechmann <tim.blechmann@qt.io>" ); |
418 | |
419 | static GstStaticPadTemplate srctemplate = |
420 | GST_STATIC_PAD_TEMPLATE("src" , GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); |
421 | |
422 | gst_element_class_add_static_pad_template(klass: gstelementClass, static_templ: &srctemplate); |
423 | |
424 | // GstBaseSrc |
425 | GstBaseSrcClass *gstbasesrcClass = GST_BASE_SRC_CLASS(klass); |
426 | gstbasesrcClass->start = [](GstBaseSrc *basesrc) -> gboolean { |
427 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc); |
428 | return src->start(); |
429 | }; |
430 | gstbasesrcClass->stop = [](GstBaseSrc *basesrc) -> gboolean { |
431 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc); |
432 | return src->stop(); |
433 | }; |
434 | |
435 | gstbasesrcClass->is_seekable = [](GstBaseSrc *basesrc) -> gboolean { |
436 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc); |
437 | return src->isSeekable(); |
438 | }; |
439 | gstbasesrcClass->get_size = [](GstBaseSrc *basesrc, guint64 *size) -> gboolean { |
440 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc); |
441 | auto optionalSize = src->size(); |
442 | if (!optionalSize) |
443 | return false; |
444 | *size = optionalSize.value(); |
445 | return true; |
446 | }; |
447 | gstbasesrcClass->fill = [](GstBaseSrc *basesrc, guint64 offset, guint length, |
448 | GstBuffer *buf) -> GstFlowReturn { |
449 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: basesrc); |
450 | return src->fill(offset, length, buf); |
451 | }; |
452 | } |
453 | |
454 | void qGstInitQIODeviceURIHandler(gpointer g_handlerInterface, gpointer) |
455 | { |
456 | GstURIHandlerInterface *iface = (GstURIHandlerInterface *)g_handlerInterface; |
457 | |
458 | iface->get_type = [](GType) { |
459 | return GST_URI_SRC; |
460 | }; |
461 | iface->get_protocols = [](GType) { |
462 | static constexpr const gchar *protocols[] = { |
463 | "qiodevice" , |
464 | nullptr, |
465 | }; |
466 | return protocols; |
467 | }; |
468 | iface->get_uri = [](GstURIHandler *handler) -> gchar * { |
469 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: handler); |
470 | auto lock = src->lockObject(); |
471 | if (src->record) |
472 | return g_strdup(str: src->record->id.constData()); |
473 | return nullptr; |
474 | }; |
475 | iface->set_uri = [](GstURIHandler *handler, const gchar *uri, GError **err) -> gboolean { |
476 | QGstQIODeviceSrc *src = asQGstQIODeviceSrc(obj: handler); |
477 | return src->setURI(location: uri, err); |
478 | }; |
479 | } |
480 | |
481 | } // namespace |
482 | |
483 | // plugin registration |
484 | |
485 | void qGstRegisterQIODeviceHandler(GstPlugin *plugin) |
486 | { |
487 | gst_element_register(plugin, name: "qiodevicesrc" , rank: GST_RANK_PRIMARY, type: gst_qiodevice_src_get_type()); |
488 | } |
489 | |
490 | QUrl qGstRegisterQIODevice(QIODevice *device) |
491 | { |
492 | return QUrl{ |
493 | QString::fromLatin1(ba: gQIODeviceRegistry->registerQIODevice(device)), |
494 | }; |
495 | } |
496 | |
497 | QT_END_NAMESPACE |
498 | |