1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include <QtCore/qcoreapplication.h>
5#include <QtCore/qdebug.h>
6#include <QtCore/qmath.h>
7#include <private/qaudiohelpers_p.h>
8
9#include "qpulseaudiosource_p.h"
10#include "qpulseaudio_contextmanager_p.h"
11#include "qpulsehelpers_p.h"
12#include <sys/types.h>
13#include <unistd.h>
14#include <mutex> // for lock_guard
15
16QT_BEGIN_NAMESPACE
17
// Default capture period in milliseconds; used to size the requested fragment
// and as the initial polling interval until the server reports the real one.
static constexpr int SourcePeriodTimeMs = 50;
19
20static void inputStreamReadCallback(pa_stream *stream, size_t length, void *userdata)
21{
22 Q_UNUSED(userdata);
23 Q_UNUSED(length);
24 Q_UNUSED(stream);
25 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
26 pa_threaded_mainloop_signal(m: pulseEngine->mainloop(), wait_for_accept: 0);
27}
28
29static void inputStreamStateCallback(pa_stream *stream, void *userdata)
30{
31 using namespace QPulseAudioInternal;
32
33 Q_UNUSED(userdata);
34 pa_stream_state_t state = pa_stream_get_state(p: stream);
35 qCDebug(qLcPulseAudioIn) << "Stream state: " << state;
36 switch (state) {
37 case PA_STREAM_CREATING:
38 break;
39 case PA_STREAM_READY:
40 if (Q_UNLIKELY(qLcPulseAudioIn().isEnabled(QtDebugMsg))) {
41 QPulseAudioSource *audioInput = static_cast<QPulseAudioSource *>(userdata);
42 const pa_buffer_attr *buffer_attr = pa_stream_get_buffer_attr(s: stream);
43 qCDebug(qLcPulseAudioIn) << "*** maxlength: " << buffer_attr->maxlength;
44 qCDebug(qLcPulseAudioIn) << "*** prebuf: " << buffer_attr->prebuf;
45 qCDebug(qLcPulseAudioIn) << "*** fragsize: " << buffer_attr->fragsize;
46 qCDebug(qLcPulseAudioIn) << "*** minreq: " << buffer_attr->minreq;
47 qCDebug(qLcPulseAudioIn) << "*** tlength: " << buffer_attr->tlength;
48
49 pa_sample_spec spec =
50 QPulseAudioInternal::audioFormatToSampleSpec(format: audioInput->format());
51 qCDebug(qLcPulseAudioIn)
52 << "*** bytes_to_usec: " << pa_bytes_to_usec(length: buffer_attr->fragsize, spec: &spec);
53 }
54 break;
55 case PA_STREAM_TERMINATED:
56 break;
57 case PA_STREAM_FAILED:
58 default:
59 qWarning() << "Stream error: " << currentError(stream);
60 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
61 pa_threaded_mainloop_signal(m: pulseEngine->mainloop(), wait_for_accept: 0);
62 break;
63 }
64}
65
66static void inputStreamUnderflowCallback(pa_stream *stream, void *userdata)
67{
68 Q_UNUSED(userdata);
69 Q_UNUSED(stream);
70 qWarning() << "Got a buffer underflow!";
71}
72
73static void inputStreamOverflowCallback(pa_stream *stream, void *userdata)
74{
75 Q_UNUSED(stream);
76 Q_UNUSED(userdata);
77 qWarning() << "Got a buffer overflow!";
78}
79
80static void inputStreamSuccessCallback(pa_stream *stream, int success, void *userdata)
81{
82 Q_UNUSED(stream);
83 Q_UNUSED(userdata);
84 Q_UNUSED(success);
85
86 // if (!success)
87 // TODO: Is cork success? i->operation_success = success;
88
89 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
90 pa_threaded_mainloop_signal(m: pulseEngine->mainloop(), wait_for_accept: 0);
91}
92
93QPulseAudioSource::QPulseAudioSource(const QByteArray &device, QObject *parent)
94 : QPlatformAudioSource(parent),
95 m_totalTimeValue(0),
96 m_audioSource(nullptr),
97 m_volume(qreal(1.0f)),
98 m_pullMode(true),
99 m_opened(false),
100 m_bufferSize(0),
101 m_periodSize(0),
102 m_periodTime(SourcePeriodTimeMs),
103 m_device(device),
104 m_stateMachine(*this)
105{
106}
107
108QPulseAudioSource::~QPulseAudioSource()
109{
110 // TODO: Investigate draining the stream
111 if (auto notifier = m_stateMachine.stop())
112 close();
113}
114
115QAudio::Error QPulseAudioSource::error() const
116{
117 return m_stateMachine.error();
118}
119
120QAudio::State QPulseAudioSource::state() const
121{
122 return m_stateMachine.state();
123}
124
125void QPulseAudioSource::setFormat(const QAudioFormat &format)
126{
127 if (!m_stateMachine.isActiveOrIdle())
128 m_format = format;
129}
130
131QAudioFormat QPulseAudioSource::format() const
132{
133 return m_format;
134}
135
136void QPulseAudioSource::start(QIODevice *device)
137{
138 reset();
139
140 if (!open())
141 return;
142
143 m_pullMode = true;
144 m_audioSource = device;
145
146 m_stateMachine.start();
147}
148
149QIODevice *QPulseAudioSource::start()
150{
151 reset();
152
153 if (!open())
154 return nullptr;
155
156 m_pullMode = false;
157 m_audioSource = new PulseInputPrivate(this);
158 m_audioSource->open(mode: QIODevice::ReadOnly | QIODevice::Unbuffered);
159
160 m_stateMachine.start(activeOrIdle: QAudioStateMachine::RunningState::Idle);
161
162 return m_audioSource;
163}
164
165void QPulseAudioSource::stop()
166{
167 if (auto notifier = m_stateMachine.stop())
168 close();
169}
170
171bool QPulseAudioSource::open()
172{
173 if (m_opened)
174 return true;
175
176 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
177
178 if (!pulseEngine->context()
179 || pa_context_get_state(c: pulseEngine->context()) != PA_CONTEXT_READY) {
180 m_stateMachine.stopOrUpdateError(error: QAudio::FatalError);
181 return false;
182 }
183
184 pa_sample_spec spec = QPulseAudioInternal::audioFormatToSampleSpec(format: m_format);
185 pa_channel_map channel_map = QPulseAudioInternal::channelMapForAudioFormat(format: m_format);
186 Q_ASSERT(spec.channels == channel_map.channels);
187
188 if (!pa_sample_spec_valid(spec: &spec)) {
189 m_stateMachine.stopOrUpdateError(error: QAudio::OpenError);
190 return false;
191 }
192
193 m_spec = spec;
194
195 //if (Q_UNLIKELY(qLcPulseAudioIn().isEnabled(QtDebugMsg)) {
196 // QTime now(QTime::currentTime());
197 // qCDebug(qLcPulseAudioIn) << now.second() << "s " << now.msec() << "ms :open()";
198 //}
199
200 if (m_streamName.isNull())
201 m_streamName =
202 QStringLiteral("QtmPulseStream-%1-%2").arg(a: ::getpid()).arg(a: quintptr(this)).toUtf8();
203
204 if (Q_UNLIKELY(qLcPulseAudioIn().isEnabled(QtDebugMsg))) {
205 qCDebug(qLcPulseAudioIn) << "Format: " << spec.format;
206 qCDebug(qLcPulseAudioIn) << "Rate: " << spec.rate;
207 qCDebug(qLcPulseAudioIn) << "Channels: " << spec.channels;
208 qCDebug(qLcPulseAudioIn) << "Frame size: " << pa_frame_size(spec: &spec);
209 }
210
211 std::unique_lock engineLock{ *pulseEngine };
212
213 m_stream = PAStreamHandle{
214 pa_stream_new(c: pulseEngine->context(), name: m_streamName.constData(), ss: &spec, map: &channel_map),
215 PAStreamHandle::HasRef,
216 };
217
218 pa_stream_set_state_callback(s: m_stream.get(), cb: inputStreamStateCallback, userdata: this);
219 pa_stream_set_read_callback(p: m_stream.get(), cb: inputStreamReadCallback, userdata: this);
220
221 pa_stream_set_underflow_callback(p: m_stream.get(), cb: inputStreamUnderflowCallback, userdata: this);
222 pa_stream_set_overflow_callback(p: m_stream.get(), cb: inputStreamOverflowCallback, userdata: this);
223
224 m_periodSize = pa_usec_to_bytes(t: SourcePeriodTimeMs * 1000, spec: &spec);
225
226 int flags = 0;
227 pa_buffer_attr buffer_attr;
228 buffer_attr.maxlength = static_cast<uint32_t>(-1);
229 buffer_attr.prebuf = static_cast<uint32_t>(-1);
230 buffer_attr.tlength = static_cast<uint32_t>(-1);
231 buffer_attr.minreq = static_cast<uint32_t>(-1);
232 flags |= PA_STREAM_ADJUST_LATENCY;
233
234 if (m_bufferSize > 0)
235 buffer_attr.fragsize = static_cast<uint32_t>(m_bufferSize);
236 else
237 buffer_attr.fragsize = static_cast<uint32_t>(m_periodSize);
238
239 flags |= PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING;
240
241 int connectionResult = pa_stream_connect_record(s: m_stream.get(), dev: m_device.data(), attr: &buffer_attr,
242 flags: static_cast<pa_stream_flags_t>(flags));
243 if (connectionResult < 0) {
244 qWarning() << "pa_stream_connect_record() failed!";
245 m_stream = {};
246 engineLock.unlock();
247 m_stateMachine.stopOrUpdateError(error: QAudio::OpenError);
248 return false;
249 }
250
251 //if (Q_UNLIKELY(qLcPulseAudioIn().isEnabled(QtDebugMsg))) {
252 // auto *ss = pa_stream_get_sample_spec(m_stream);
253 // qCDebug(qLcPulseAudioIn) << "connected stream:";
254 // qCDebug(qLcPulseAudioIn) << " channels" << ss->channels << spec.channels;
255 // qCDebug(qLcPulseAudioIn) << " format" << ss->format << spec.format;
256 // qCDebug(qLcPulseAudioIn) << " rate" << ss->rate << spec.rate;
257 //}
258
259 while (pa_stream_get_state(p: m_stream.get()) != PA_STREAM_READY)
260 pa_threaded_mainloop_wait(m: pulseEngine->mainloop());
261
262 const pa_buffer_attr *actualBufferAttr = pa_stream_get_buffer_attr(s: m_stream.get());
263 m_periodSize = actualBufferAttr->fragsize;
264 m_periodTime = pa_bytes_to_usec(length: m_periodSize, spec: &spec) / 1000;
265 if (actualBufferAttr->tlength != static_cast<uint32_t>(-1))
266 m_bufferSize = actualBufferAttr->tlength;
267
268 engineLock.unlock();
269
270 connect(sender: pulseEngine, signal: &QPulseAudioContextManager::contextFailed, context: this,
271 slot: &QPulseAudioSource::onPulseContextFailed);
272
273 m_opened = true;
274 m_timer.start(msec: m_periodTime, obj: this);
275
276 m_elapsedTimeOffset = 0;
277 m_totalTimeValue = 0;
278
279 return true;
280}
281
282void QPulseAudioSource::close()
283{
284 if (!m_opened)
285 return;
286
287 m_timer.stop();
288
289 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
290
291 if (m_stream) {
292 std::lock_guard lock(*pulseEngine);
293
294 pa_stream_set_state_callback(s: m_stream.get(), cb: nullptr, userdata: nullptr);
295 pa_stream_set_read_callback(p: m_stream.get(), cb: nullptr, userdata: nullptr);
296 pa_stream_set_underflow_callback(p: m_stream.get(), cb: nullptr, userdata: nullptr);
297 pa_stream_set_overflow_callback(p: m_stream.get(), cb: nullptr, userdata: nullptr);
298
299 pa_stream_disconnect(s: m_stream.get());
300 m_stream = {};
301 }
302
303 disconnect(sender: pulseEngine, signal: &QPulseAudioContextManager::contextFailed, receiver: this,
304 slot: &QPulseAudioSource::onPulseContextFailed);
305
306 if (!m_pullMode && m_audioSource) {
307 delete m_audioSource;
308 m_audioSource = nullptr;
309 }
310 m_opened = false;
311}
312
313qsizetype QPulseAudioSource::bytesReady() const
314{
315 using namespace QPulseAudioInternal;
316
317 if (!m_stateMachine.isActiveOrIdle())
318 return 0;
319
320 std::lock_guard lock(*QPulseAudioContextManager::instance());
321
322 qsizetype tempBufferSize = 0;
323 if (!m_pullMode && !m_tempBuffer.isEmpty()) {
324 tempBufferSize = m_tempBuffer.size();
325 }
326 int bytes = pa_stream_readable_size(p: m_stream.get());
327 if (bytes < 0) {
328 qWarning() << "pa_stream_readable_size() failed:" << currentError(m_stream.get());
329 return tempBufferSize;
330 }
331
332 return static_cast<qsizetype>(bytes) + tempBufferSize;
333}
334
335qint64 QPulseAudioSource::read(char *data, qint64 len)
336{
337 using namespace QPulseAudioInternal;
338
339 Q_ASSERT(data != nullptr || len == 0);
340
341 m_stateMachine.updateActiveOrIdle(activeOrIdle: QAudioStateMachine::RunningState::Active, error: QAudio::NoError);
342 int readBytes = 0;
343
344 if (!m_pullMode && !m_tempBuffer.isEmpty()) {
345 readBytes = qMin(a: static_cast<int>(len), b: m_tempBuffer.size());
346 if (readBytes)
347 memcpy(dest: data, src: m_tempBuffer.constData(), n: readBytes);
348 m_totalTimeValue += readBytes;
349
350 if (readBytes < m_tempBuffer.size()) {
351 m_tempBuffer.remove(index: 0, len: readBytes);
352 return readBytes;
353 }
354
355 m_tempBuffer.clear();
356 }
357
358 while (pa_stream_readable_size(p: m_stream.get()) > 0) {
359 size_t readLength = 0;
360
361 if (Q_UNLIKELY(qLcPulseAudioIn().isEnabled(QtDebugMsg))) {
362 auto readableSize = pa_stream_readable_size(p: m_stream.get());
363 qCDebug(qLcPulseAudioIn) << "QPulseAudioSource::read -- " << readableSize
364 << " bytes available from pulse audio";
365 }
366
367 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
368 std::unique_lock engineLock{ *pulseEngine };
369
370 const void *audioBuffer;
371
372 // Second and third parameters (audioBuffer and length) to pa_stream_peek are output
373 // parameters, the audioBuffer pointer is set to point to the actual pulse audio data, and
374 // the length is set to the length of this data.
375 if (pa_stream_peek(p: m_stream.get(), data: &audioBuffer, nbytes: &readLength) < 0) {
376 qWarning() << "pa_stream_peek() failed:" << currentError(m_stream.get());
377 return 0;
378 }
379
380 qint64 actualLength = 0;
381 if (m_pullMode) {
382 QByteArray adjusted(readLength, Qt::Uninitialized);
383 applyVolume(src: audioBuffer, dest: adjusted.data(), len: readLength);
384 actualLength = m_audioSource->write(data: adjusted);
385
386 if (actualLength < qint64(readLength)) {
387 engineLock.unlock();
388 m_stateMachine.updateActiveOrIdle(activeOrIdle: QAudioStateMachine::RunningState::Idle,
389 error: QAudio::UnderrunError);
390 return actualLength;
391 }
392 } else {
393 actualLength = qMin(a: static_cast<int>(len - readBytes), b: static_cast<int>(readLength));
394 applyVolume(src: audioBuffer, dest: data + readBytes, len: actualLength);
395 }
396
397 qCDebug(qLcPulseAudioIn) << "QPulseAudioSource::read -- wrote " << actualLength
398 << " to client";
399
400 if (actualLength < qint64(readLength)) {
401 int diff = readLength - actualLength;
402 int oldSize = m_tempBuffer.size();
403
404 qCDebug(qLcPulseAudioIn) << "QPulseAudioSource::read -- appending " << diff
405 << " bytes of data to temp buffer";
406
407 m_tempBuffer.resize(size: m_tempBuffer.size() + diff);
408 applyVolume(src: static_cast<const char *>(audioBuffer) + actualLength,
409 dest: m_tempBuffer.data() + oldSize, len: diff);
410 QMetaObject::invokeMethod(obj: this, member: "userFeed", c: Qt::QueuedConnection);
411 }
412
413 m_totalTimeValue += actualLength;
414 readBytes += actualLength;
415
416 pa_stream_drop(p: m_stream.get());
417 engineLock.unlock();
418
419 if (!m_pullMode && readBytes >= len)
420 break;
421 }
422
423 qCDebug(qLcPulseAudioIn) << "QPulseAudioSource::read -- returning after reading " << readBytes
424 << " bytes";
425
426 return readBytes;
427}
428
429void QPulseAudioSource::applyVolume(const void *src, void *dest, int len) const
430{
431 QAudioHelperInternal::applyVolume(volume: m_volume, m_format,
432 source: QSpan{ reinterpret_cast<const std::byte *>(src), len },
433 destination: QSpan{ reinterpret_cast<std::byte *>(dest), len });
434}
435
436void QPulseAudioSource::resume()
437{
438 if (auto notifier = m_stateMachine.resume()) {
439 {
440 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
441
442 std::lock_guard lock(*pulseEngine);
443
444 PAOperationHandle operation{
445 pa_stream_cork(s: m_stream.get(), b: 0, cb: inputStreamSuccessCallback, userdata: nullptr),
446 PAOperationHandle::HasRef,
447 };
448 pulseEngine->wait(op: operation);
449 }
450
451 m_timer.start(msec: m_periodTime, obj: this);
452 }
453}
454
455void QPulseAudioSource::setVolume(qreal vol)
456{
457 if (qFuzzyCompare(p1: m_volume, p2: vol))
458 return;
459
460 m_volume = qBound(min: qreal(0), val: vol, max: qreal(1));
461}
462
463qreal QPulseAudioSource::volume() const
464{
465 return m_volume;
466}
467
468void QPulseAudioSource::setBufferSize(qsizetype value)
469{
470 m_bufferSize = value;
471}
472
473qsizetype QPulseAudioSource::bufferSize() const
474{
475 return m_bufferSize;
476}
477
478qint64 QPulseAudioSource::processedUSecs() const
479{
480 if (!m_stream)
481 return 0;
482 pa_usec_t usecs = 0;
483 int result = pa_stream_get_time(s: m_stream.get(), r_usec: &usecs);
484 Q_UNUSED(result);
485 //if (result != 0)
486 // qWarning() << "no timing info from pulse";
487
488 return usecs;
489}
490
491void QPulseAudioSource::suspend()
492{
493 if (auto notifier = m_stateMachine.suspend()) {
494 m_timer.stop();
495
496 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
497
498 std::lock_guard lock(*pulseEngine);
499
500 PAOperationHandle operation{
501 pa_stream_cork(s: m_stream.get(), b: 1, cb: inputStreamSuccessCallback, userdata: nullptr),
502 PAOperationHandle::HasRef,
503 };
504 pulseEngine->wait(op: operation);
505 }
506}
507
508void QPulseAudioSource::timerEvent(QTimerEvent *event)
509{
510 if (event->timerId() == m_timer.timerId())
511 userFeed();
512
513 QPlatformAudioSource::timerEvent(event);
514}
515
516void QPulseAudioSource::userFeed()
517{
518 if (!m_stateMachine.isActiveOrIdle())
519 return;
520
521 //if (Q_UNLIKELY(qLcPulseAudioIn().isEnabled(QtDebugMsg)) {
522 // QTime now(QTime::currentTime());
523 // qCDebug(qLcPulseAudioIn) << now.second() << "s " << now.msec() << "ms :userFeed() IN";
524 //}
525
526 if (m_pullMode) {
527 // reads some audio data and writes it to QIODevice
528 read(data: nullptr,len: 0);
529 } else if (m_audioSource != nullptr) {
530 // emits readyRead() so user will call read() on QIODevice to get some audio data
531 PulseInputPrivate *a = qobject_cast<PulseInputPrivate*>(object: m_audioSource);
532 a->trigger();
533 }
534}
535
536void QPulseAudioSource::reset()
537{
538 if (auto notifier = m_stateMachine.stopOrUpdateError())
539 close();
540}
541
542void QPulseAudioSource::onPulseContextFailed()
543{
544 if (auto notifier = m_stateMachine.stopOrUpdateError(error: QAudio::FatalError))
545 close();
546}
547
548PulseInputPrivate::PulseInputPrivate(QPulseAudioSource *audio)
549{
550 m_audioDevice = qobject_cast<QPulseAudioSource *>(object: audio);
551}
552
553qint64 PulseInputPrivate::bytesAvailable() const
554{
555 return m_audioDevice->bytesReady();
556}
557
558qint64 PulseInputPrivate::readData(char *data, qint64 len)
559{
560 return m_audioDevice->read(data, len);
561}
562
563qint64 PulseInputPrivate::writeData(const char *data, qint64 len)
564{
565 Q_UNUSED(data);
566 Q_UNUSED(len);
567 return 0;
568}
569
570void PulseInputPrivate::trigger()
571{
572 emit readyRead();
573}
574
575QT_END_NAMESPACE
576
577#include "moc_qpulseaudiosource_p.cpp"
578

Provided by KDAB

Privacy Policy
Learn to use CMake with our Intro Training
Find out more

source code of qtmultimedia/src/multimedia/pulseaudio/qpulseaudiosource.cpp