1 | // Copyright (C) 2016 The Qt Company Ltd. |
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
3 | |
4 | #include <QDebug> |
5 | |
6 | #include "qgstappsrc_p.h" |
7 | #include "qgstutils_p.h" |
8 | #include "qnetworkreply.h" |
9 | #include "qloggingcategory.h" |
10 | |
11 | static Q_LOGGING_CATEGORY(qLcAppSrc, "qt.multimedia.appsrc" ) |
12 | |
13 | QT_BEGIN_NAMESPACE |
14 | |
15 | QMaybe<QGstAppSrc *> QGstAppSrc::create(QObject *parent) |
16 | { |
17 | QGstElement appsrc("appsrc" , "appsrc" ); |
18 | if (!appsrc) |
19 | return errorMessageCannotFindElement(element: "appsrc" ); |
20 | |
21 | return new QGstAppSrc(appsrc, parent); |
22 | } |
23 | |
24 | QGstAppSrc::QGstAppSrc(QGstElement appsrc, QObject *parent) |
25 | : QObject(parent), m_appSrc(std::move(appsrc)) |
26 | { |
27 | } |
28 | |
29 | QGstAppSrc::~QGstAppSrc() |
30 | { |
31 | m_appSrc.setStateSync(GST_STATE_NULL); |
32 | streamDestroyed(); |
33 | qCDebug(qLcAppSrc) << "~QGstAppSrc" ; |
34 | } |
35 | |
36 | bool QGstAppSrc::setup(QIODevice *stream, qint64 offset) |
37 | { |
38 | if (m_appSrc.isNull()) |
39 | return false; |
40 | |
41 | if (!setStream(stream, offset)) |
42 | return false; |
43 | |
44 | auto *appSrc = GST_APP_SRC(m_appSrc.element()); |
45 | GstAppSrcCallbacks m_callbacks; |
46 | memset(s: &m_callbacks, c: 0, n: sizeof(GstAppSrcCallbacks)); |
47 | m_callbacks.need_data = &QGstAppSrc::on_need_data; |
48 | m_callbacks.enough_data = &QGstAppSrc::on_enough_data; |
49 | m_callbacks.seek_data = &QGstAppSrc::on_seek_data; |
50 | gst_app_src_set_callbacks(appsrc: appSrc, callbacks: (GstAppSrcCallbacks*)&m_callbacks, user_data: this, notify: nullptr); |
51 | |
52 | m_maxBytes = gst_app_src_get_max_bytes(appsrc: appSrc); |
53 | m_suspended = false; |
54 | |
55 | if (m_sequential) |
56 | m_streamType = GST_APP_STREAM_TYPE_STREAM; |
57 | else |
58 | m_streamType = GST_APP_STREAM_TYPE_RANDOM_ACCESS; |
59 | gst_app_src_set_stream_type(appsrc: appSrc, type: m_streamType); |
60 | gst_app_src_set_size(appsrc: appSrc, size: m_sequential ? -1 : m_stream->size() - m_offset); |
61 | |
62 | m_networkReply = qobject_cast<QNetworkReply *>(object: m_stream); |
63 | m_noMoreData = true; |
64 | |
65 | return true; |
66 | } |
67 | |
68 | void QGstAppSrc::setAudioFormat(const QAudioFormat &f) |
69 | { |
70 | m_format = f; |
71 | if (!m_format.isValid()) |
72 | return; |
73 | |
74 | auto caps = QGstUtils::capsForAudioFormat(format: m_format); |
75 | Q_ASSERT(!caps.isNull()); |
76 | m_appSrc.set(property: "caps" , c: caps); |
77 | m_appSrc.set(property: "format" , i: GST_FORMAT_TIME); |
78 | } |
79 | |
80 | void QGstAppSrc::setExternalAppSrc(const QGstElement &appsrc) |
81 | { |
82 | m_appSrc = appsrc; |
83 | } |
84 | |
85 | bool QGstAppSrc::setStream(QIODevice *stream, qint64 offset) |
86 | { |
87 | if (m_stream) { |
88 | disconnect(sender: m_stream, SIGNAL(readyRead()), receiver: this, SLOT(onDataReady())); |
89 | disconnect(sender: m_stream, SIGNAL(destroyed()), receiver: this, SLOT(streamDestroyed())); |
90 | m_stream = nullptr; |
91 | } |
92 | |
93 | m_dataRequestSize = 0; |
94 | m_sequential = true; |
95 | m_maxBytes = 0; |
96 | streamedSamples = 0; |
97 | |
98 | if (stream) { |
99 | if (!stream->isOpen() && !stream->open(mode: QIODevice::ReadOnly)) |
100 | return false; |
101 | m_stream = stream; |
102 | connect(asender: m_stream, SIGNAL(destroyed()), SLOT(streamDestroyed())); |
103 | connect(sender: m_stream, SIGNAL(readyRead()), receiver: this, SLOT(onDataReady())); |
104 | m_sequential = m_stream->isSequential(); |
105 | m_offset = offset; |
106 | } |
107 | return true; |
108 | } |
109 | |
110 | QGstElement QGstAppSrc::element() |
111 | { |
112 | return m_appSrc; |
113 | } |
114 | |
115 | void QGstAppSrc::write(const char *data, qsizetype size) |
116 | { |
117 | qCDebug(qLcAppSrc) << "write" << size << m_noMoreData << m_dataRequestSize; |
118 | if (!size) |
119 | return; |
120 | Q_ASSERT(!m_stream); |
121 | m_buffer.append(data, size); |
122 | m_noMoreData = false; |
123 | pushData(); |
124 | } |
125 | |
126 | void QGstAppSrc::onDataReady() |
127 | { |
128 | qCDebug(qLcAppSrc) << "onDataReady" << m_stream->bytesAvailable() << m_stream->size(); |
129 | pushData(); |
130 | } |
131 | |
132 | void QGstAppSrc::streamDestroyed() |
133 | { |
134 | qCDebug(qLcAppSrc) << "stream destroyed" ; |
135 | m_stream = nullptr; |
136 | m_dataRequestSize = 0; |
137 | streamedSamples = 0; |
138 | sendEOS(); |
139 | } |
140 | |
141 | void QGstAppSrc::pushData() |
142 | { |
143 | if (m_appSrc.isNull() || !m_dataRequestSize || m_suspended) { |
144 | qCDebug(qLcAppSrc) << "push data: return immediately" << m_appSrc.isNull() << m_dataRequestSize << m_suspended; |
145 | return; |
146 | } |
147 | |
148 | qCDebug(qLcAppSrc) << "pushData" << (m_stream ? m_stream : nullptr) << m_buffer.size(); |
149 | if ((m_stream && m_stream->atEnd())) { |
150 | eosOrIdle(); |
151 | qCDebug(qLcAppSrc) << "end pushData" << (m_stream ? m_stream : nullptr) << m_buffer.size(); |
152 | return; |
153 | } |
154 | |
155 | qint64 size; |
156 | if (m_stream) |
157 | size = m_stream->bytesAvailable(); |
158 | else |
159 | size = m_buffer.size(); |
160 | |
161 | if (!m_dataRequestSize) |
162 | m_dataRequestSize = m_maxBytes; |
163 | size = qMin(a: size, b: (qint64)m_dataRequestSize); |
164 | qCDebug(qLcAppSrc) << " reading" << size << "bytes" << size << m_dataRequestSize; |
165 | |
166 | GstBuffer* buffer = gst_buffer_new_and_alloc(size); |
167 | |
168 | if (m_sequential || !m_stream) |
169 | buffer->offset = bytesReadSoFar; |
170 | else |
171 | buffer->offset = m_stream->pos(); |
172 | |
173 | if (m_format.isValid()) { |
174 | // timestamp raw audio data |
175 | uint nSamples = size/m_format.bytesPerFrame(); |
176 | |
177 | GST_BUFFER_TIMESTAMP(buffer) = gst_util_uint64_scale(val: streamedSamples, GST_SECOND, denom: m_format.sampleRate()); |
178 | GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale(val: nSamples, GST_SECOND, denom: m_format.sampleRate()); |
179 | streamedSamples += nSamples; |
180 | } |
181 | |
182 | GstMapInfo mapInfo; |
183 | gst_buffer_map(buffer, info: &mapInfo, flags: GST_MAP_WRITE); |
184 | void* bufferData = mapInfo.data; |
185 | |
186 | qint64 bytesRead; |
187 | if (m_stream) |
188 | bytesRead = m_stream->read(data: (char*)bufferData, maxlen: size); |
189 | else |
190 | bytesRead = m_buffer.read(data: (char*)bufferData, maxLength: size); |
191 | buffer->offset_end = buffer->offset + bytesRead - 1; |
192 | bytesReadSoFar += bytesRead; |
193 | |
194 | gst_buffer_unmap(buffer, info: &mapInfo); |
195 | qCDebug(qLcAppSrc) << "pushing bytes into gstreamer" << buffer->offset << bytesRead; |
196 | if (bytesRead == 0) { |
197 | gst_buffer_unref(buf: buffer); |
198 | eosOrIdle(); |
199 | qCDebug(qLcAppSrc) << "end pushData" << (m_stream ? m_stream : nullptr) << m_buffer.size(); |
200 | return; |
201 | } |
202 | m_noMoreData = false; |
203 | emit bytesProcessed(bytes: bytesRead); |
204 | |
205 | GstFlowReturn ret = gst_app_src_push_buffer(GST_APP_SRC(m_appSrc.element()), buffer); |
206 | if (ret == GST_FLOW_ERROR) { |
207 | qWarning() << "QGstAppSrc: push buffer error" ; |
208 | } else if (ret == GST_FLOW_FLUSHING) { |
209 | qWarning() << "QGstAppSrc: push buffer wrong state" ; |
210 | } |
211 | qCDebug(qLcAppSrc) << "end pushData" << (m_stream ? m_stream : nullptr) << m_buffer.size(); |
212 | |
213 | } |
214 | |
215 | bool QGstAppSrc::doSeek(qint64 value) |
216 | { |
217 | if (isStreamValid()) |
218 | return m_stream->seek(pos: value + m_offset); |
219 | return false; |
220 | } |
221 | |
222 | |
223 | gboolean QGstAppSrc::on_seek_data(GstAppSrc *, guint64 arg0, gpointer userdata) |
224 | { |
225 | // we do get some spurious seeks to INT_MAX, ignore those |
226 | if (arg0 == std::numeric_limits<quint64>::max()) |
227 | return true; |
228 | |
229 | QGstAppSrc *self = reinterpret_cast<QGstAppSrc*>(userdata); |
230 | Q_ASSERT(self); |
231 | if (self->m_sequential) |
232 | return false; |
233 | |
234 | QMetaObject::invokeMethod(obj: self, member: "doSeek" , c: Qt::AutoConnection, Q_ARG(qint64, arg0)); |
235 | return true; |
236 | } |
237 | |
238 | void QGstAppSrc::on_enough_data(GstAppSrc *, gpointer userdata) |
239 | { |
240 | qCDebug(qLcAppSrc) << "on_enough_data" ; |
241 | QGstAppSrc *self = static_cast<QGstAppSrc*>(userdata); |
242 | Q_ASSERT(self); |
243 | self->m_dataRequestSize = 0; |
244 | } |
245 | |
246 | void QGstAppSrc::on_need_data(GstAppSrc *, guint arg0, gpointer userdata) |
247 | { |
248 | qCDebug(qLcAppSrc) << "on_need_data requesting bytes" << arg0; |
249 | QGstAppSrc *self = static_cast<QGstAppSrc*>(userdata); |
250 | Q_ASSERT(self); |
251 | self->m_dataRequestSize = arg0; |
252 | QMetaObject::invokeMethod(obj: self, member: "pushData" , c: Qt::AutoConnection); |
253 | qCDebug(qLcAppSrc) << "done on_need_data" ; |
254 | } |
255 | |
256 | void QGstAppSrc::sendEOS() |
257 | { |
258 | qCDebug(qLcAppSrc) << "sending EOS" ; |
259 | if (m_appSrc.isNull()) |
260 | return; |
261 | |
262 | gst_app_src_end_of_stream(GST_APP_SRC(m_appSrc.element())); |
263 | } |
264 | |
265 | void QGstAppSrc::eosOrIdle() |
266 | { |
267 | qCDebug(qLcAppSrc) << "eosOrIdle" ; |
268 | if (m_appSrc.isNull()) |
269 | return; |
270 | |
271 | if (!m_sequential) { |
272 | sendEOS(); |
273 | return; |
274 | } |
275 | if (m_noMoreData) |
276 | return; |
277 | qCDebug(qLcAppSrc) << " idle!" ; |
278 | m_noMoreData = true; |
279 | emit noMoreData(); |
280 | } |
281 | |
282 | QT_END_NAMESPACE |
283 | |
284 | #include "moc_qgstappsrc_p.cpp" |
285 | |