1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include <QtCore/qcoreapplication.h>
5#include <QtCore/qdebug.h>
6#include <QtCore/qmath.h>
7#include <private/qaudiohelpers_p.h>
8
9#include "qpulseaudiosource_p.h"
10#include "qaudioengine_pulse_p.h"
11#include "qpulseaudiodevice_p.h"
12#include "qpulsehelpers_p.h"
13#include <sys/types.h>
14#include <unistd.h>
15#include <mutex> // for lock_guard
16
17QT_BEGIN_NAMESPACE
18
// Default capture period in milliseconds. It seeds m_periodTime and the
// fragment size requested from PulseAudio when no buffer size was set.
static constexpr int SourcePeriodTimeMs = 50;
20
21static void inputStreamReadCallback(pa_stream *stream, size_t length, void *userdata)
22{
23 Q_UNUSED(userdata);
24 Q_UNUSED(length);
25 Q_UNUSED(stream);
26 QPulseAudioEngine *pulseEngine = QPulseAudioEngine::instance();
27 pa_threaded_mainloop_signal(m: pulseEngine->mainloop(), wait_for_accept: 0);
28}
29
30static void inputStreamStateCallback(pa_stream *stream, void *userdata)
31{
32 Q_UNUSED(userdata);
33 pa_stream_state_t state = pa_stream_get_state(p: stream);
34#ifdef DEBUG_PULSE
35 qDebug() << "Stream state: " << QPulseAudioInternal::stateToQString(state);
36#endif
37 switch (state) {
38 case PA_STREAM_CREATING:
39 break;
40 case PA_STREAM_READY: {
41#ifdef DEBUG_PULSE
42 QPulseAudioSource *audioInput = static_cast<QPulseAudioSource*>(userdata);
43 const pa_buffer_attr *buffer_attr = pa_stream_get_buffer_attr(stream);
44 qDebug() << "*** maxlength: " << buffer_attr->maxlength;
45 qDebug() << "*** prebuf: " << buffer_attr->prebuf;
46 qDebug() << "*** fragsize: " << buffer_attr->fragsize;
47 qDebug() << "*** minreq: " << buffer_attr->minreq;
48 qDebug() << "*** tlength: " << buffer_attr->tlength;
49
50 pa_sample_spec spec = QPulseAudioInternal::audioFormatToSampleSpec(audioInput->format());
51 qDebug() << "*** bytes_to_usec: " << pa_bytes_to_usec(buffer_attr->fragsize, &spec);
52#endif
53 }
54 break;
55 case PA_STREAM_TERMINATED:
56 break;
57 case PA_STREAM_FAILED:
58 default:
59 qWarning() << QString::fromLatin1(ba: "Stream error: %1").arg(a: QString::fromUtf8(utf8: pa_strerror(error: pa_context_errno(c: pa_stream_get_context(p: stream)))));
60 QPulseAudioEngine *pulseEngine = QPulseAudioEngine::instance();
61 pa_threaded_mainloop_signal(m: pulseEngine->mainloop(), wait_for_accept: 0);
62 break;
63 }
64}
65
66static void inputStreamUnderflowCallback(pa_stream *stream, void *userdata)
67{
68 Q_UNUSED(userdata);
69 Q_UNUSED(stream);
70 qWarning() << "Got a buffer underflow!";
71}
72
73static void inputStreamOverflowCallback(pa_stream *stream, void *userdata)
74{
75 Q_UNUSED(stream);
76 Q_UNUSED(userdata);
77 qWarning() << "Got a buffer overflow!";
78}
79
80static void inputStreamSuccessCallback(pa_stream *stream, int success, void *userdata)
81{
82 Q_UNUSED(stream);
83 Q_UNUSED(userdata);
84 Q_UNUSED(success);
85
86 //if (!success)
87 //TODO: Is cork success? i->operation_success = success;
88
89 QPulseAudioEngine *pulseEngine = QPulseAudioEngine::instance();
90 pa_threaded_mainloop_signal(m: pulseEngine->mainloop(), wait_for_accept: 0);
91}
92
93QPulseAudioSource::QPulseAudioSource(const QByteArray &device, QObject *parent)
94 : QPlatformAudioSource(parent)
95 , m_totalTimeValue(0)
96 , m_audioSource(nullptr)
97 , m_errorState(QAudio::NoError)
98 , m_deviceState(QAudio::StoppedState)
99 , m_volume(qreal(1.0f))
100 , m_pullMode(true)
101 , m_opened(false)
102 , m_bytesAvailable(0)
103 , m_bufferSize(0)
104 , m_periodSize(0)
105 , m_periodTime(SourcePeriodTimeMs)
106 , m_stream(nullptr)
107 , m_device(device)
108{
109 m_timer = new QTimer(this);
110 connect(asender: m_timer, SIGNAL(timeout()), SLOT(userFeed()));
111}
112
113QPulseAudioSource::~QPulseAudioSource()
114{
115 close();
116 disconnect(receiver: m_timer, SIGNAL(timeout()));
117 QCoreApplication::processEvents();
118 delete m_timer;
119}
120
121void QPulseAudioSource::setError(QAudio::Error error)
122{
123 if (m_errorState == error)
124 return;
125
126 m_errorState = error;
127 emit errorChanged(error);
128}
129
130QAudio::Error QPulseAudioSource::error() const
131{
132 return m_errorState;
133}
134
135void QPulseAudioSource::setState(QAudio::State state)
136{
137 if (m_deviceState == state)
138 return;
139
140 m_deviceState = state;
141 emit stateChanged(state);
142}
143
144QAudio::State QPulseAudioSource::state() const
145{
146 return m_deviceState;
147}
148
149void QPulseAudioSource::setFormat(const QAudioFormat &format)
150{
151 if (m_deviceState == QAudio::StoppedState)
152 m_format = format;
153}
154
155QAudioFormat QPulseAudioSource::format() const
156{
157 return m_format;
158}
159
160void QPulseAudioSource::start(QIODevice *device)
161{
162 setState(QAudio::StoppedState);
163 setError(QAudio::NoError);
164
165 if (!m_pullMode && m_audioSource) {
166 delete m_audioSource;
167 m_audioSource = nullptr;
168 }
169
170 close();
171
172 if (!open())
173 return;
174
175 m_pullMode = true;
176 m_audioSource = device;
177
178 setState(QAudio::ActiveState);
179}
180
181QIODevice *QPulseAudioSource::start()
182{
183 setState(QAudio::StoppedState);
184 setError(QAudio::NoError);
185
186 if (!m_pullMode && m_audioSource) {
187 delete m_audioSource;
188 m_audioSource = nullptr;
189 }
190
191 close();
192
193 if (!open())
194 return nullptr;
195
196 m_pullMode = false;
197 m_audioSource = new PulseInputPrivate(this);
198 m_audioSource->open(mode: QIODevice::ReadOnly | QIODevice::Unbuffered);
199
200 setState(QAudio::IdleState);
201
202 return m_audioSource;
203}
204
205void QPulseAudioSource::stop()
206{
207 if (m_deviceState == QAudio::StoppedState)
208 return;
209
210 close();
211
212 setError(QAudio::NoError);
213 setState(QAudio::StoppedState);
214}
215
216bool QPulseAudioSource::open()
217{
218 if (m_opened)
219 return true;
220
221 QPulseAudioEngine *pulseEngine = QPulseAudioEngine::instance();
222
223 if (!pulseEngine->context() || pa_context_get_state(c: pulseEngine->context()) != PA_CONTEXT_READY) {
224 setError(QAudio::FatalError);
225 setState(QAudio::StoppedState);
226 return false;
227 }
228
229 pa_sample_spec spec = QPulseAudioInternal::audioFormatToSampleSpec(format: m_format);
230 pa_channel_map channel_map = QPulseAudioInternal::channelMapForAudioFormat(format: m_format);
231 Q_ASSERT(spec.channels == channel_map.channels);
232
233 if (!pa_sample_spec_valid(spec: &spec)) {
234 setError(QAudio::OpenError);
235 setState(QAudio::StoppedState);
236 return false;
237 }
238
239 m_spec = spec;
240
241#ifdef DEBUG_PULSE
242// QTime now(QTime::currentTime());
243// qDebug()<<now.second()<<"s "<<now.msec()<<"ms :open()";
244#endif
245
246 if (m_streamName.isNull())
247 m_streamName = QString(QLatin1String("QtmPulseStream-%1-%2")).arg(a: ::getpid()).arg(a: quintptr(this)).toUtf8();
248
249#ifdef DEBUG_PULSE
250 qDebug() << "Format: " << QPulseAudioInternal::sampleFormatToQString(spec.format);
251 qDebug() << "Rate: " << spec.rate;
252 qDebug() << "Channels: " << spec.channels;
253 qDebug() << "Frame size: " << pa_frame_size(&spec);
254#endif
255
256 pulseEngine->lock();
257
258 m_stream = pa_stream_new(c: pulseEngine->context(), name: m_streamName.constData(), ss: &spec, map: &channel_map);
259
260 pa_stream_set_state_callback(s: m_stream, cb: inputStreamStateCallback, userdata: this);
261 pa_stream_set_read_callback(p: m_stream, cb: inputStreamReadCallback, userdata: this);
262
263 pa_stream_set_underflow_callback(p: m_stream, cb: inputStreamUnderflowCallback, userdata: this);
264 pa_stream_set_overflow_callback(p: m_stream, cb: inputStreamOverflowCallback, userdata: this);
265
266 m_periodSize = pa_usec_to_bytes(t: SourcePeriodTimeMs*1000, spec: &spec);
267
268 int flags = 0;
269 pa_buffer_attr buffer_attr;
270 buffer_attr.maxlength = (uint32_t) -1;
271 buffer_attr.prebuf = (uint32_t) -1;
272 buffer_attr.tlength = (uint32_t) -1;
273 buffer_attr.minreq = (uint32_t) -1;
274 flags |= PA_STREAM_ADJUST_LATENCY;
275
276 if (m_bufferSize > 0)
277 buffer_attr.fragsize = (uint32_t) m_bufferSize;
278 else
279 buffer_attr.fragsize = (uint32_t) m_periodSize;
280
281 flags |= PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_INTERPOLATE_TIMING;
282 if (pa_stream_connect_record(s: m_stream, dev: m_device.data(), attr: &buffer_attr, flags: (pa_stream_flags_t)flags) < 0) {
283 qWarning() << "pa_stream_connect_record() failed!";
284 pa_stream_unref(s: m_stream);
285 m_stream = nullptr;
286 pulseEngine->unlock();
287 setError(QAudio::OpenError);
288 setState(QAudio::StoppedState);
289 return false;
290 }
291
292// auto *ss = pa_stream_get_sample_spec(m_stream);
293// qDebug() << "connected stream:";
294// qDebug() << " channels" << ss->channels << spec.channels;
295// qDebug() << " format" << ss->format << spec.format;
296// qDebug() << " rate" << ss->rate << spec.rate;
297
298 while (pa_stream_get_state(p: m_stream) != PA_STREAM_READY)
299 pa_threaded_mainloop_wait(m: pulseEngine->mainloop());
300
301 const pa_buffer_attr *actualBufferAttr = pa_stream_get_buffer_attr(s: m_stream);
302 m_periodSize = actualBufferAttr->fragsize;
303 m_periodTime = pa_bytes_to_usec(length: m_periodSize, spec: &spec) / 1000;
304 if (actualBufferAttr->tlength != (uint32_t)-1)
305 m_bufferSize = actualBufferAttr->tlength;
306
307 pulseEngine->unlock();
308
309 connect(sender: pulseEngine, signal: &QPulseAudioEngine::contextFailed, context: this, slot: &QPulseAudioSource::onPulseContextFailed);
310
311 m_opened = true;
312 m_timer->start(msec: m_periodTime);
313
314 m_elapsedTimeOffset = 0;
315 m_totalTimeValue = 0;
316
317 return true;
318}
319
320void QPulseAudioSource::close()
321{
322 if (!m_opened)
323 return;
324
325 m_timer->stop();
326
327 QPulseAudioEngine *pulseEngine = QPulseAudioEngine::instance();
328
329 if (m_stream) {
330 std::lock_guard lock(*pulseEngine);
331
332 pa_stream_set_state_callback(s: m_stream, cb: nullptr, userdata: nullptr);
333 pa_stream_set_read_callback(p: m_stream, cb: nullptr, userdata: nullptr);
334 pa_stream_set_underflow_callback(p: m_stream, cb: nullptr, userdata: nullptr);
335 pa_stream_set_overflow_callback(p: m_stream, cb: nullptr, userdata: nullptr);
336
337 pa_stream_disconnect(s: m_stream);
338 pa_stream_unref(s: m_stream);
339 m_stream = nullptr;
340 }
341
342 disconnect(sender: pulseEngine, signal: &QPulseAudioEngine::contextFailed, receiver: this, slot: &QPulseAudioSource::onPulseContextFailed);
343
344 if (!m_pullMode && m_audioSource) {
345 delete m_audioSource;
346 m_audioSource = nullptr;
347 }
348 m_opened = false;
349}
350
351int QPulseAudioSource::checkBytesReady()
352{
353 if (m_deviceState != QAudio::ActiveState && m_deviceState != QAudio::IdleState) {
354 m_bytesAvailable = 0;
355 } else {
356 m_bytesAvailable = pa_stream_readable_size(p: m_stream);
357 }
358
359 return m_bytesAvailable;
360}
361
362qsizetype QPulseAudioSource::bytesReady() const
363{
364 return qMax(a: m_bytesAvailable, b: 0);
365}
366
367qint64 QPulseAudioSource::read(char *data, qint64 len)
368{
369 Q_ASSERT(data != nullptr || len == 0);
370
371 m_bytesAvailable = checkBytesReady();
372
373 setError(QAudio::NoError);
374 if (state() == QAudio::IdleState)
375 setState(QAudio::ActiveState);
376
377 int readBytes = 0;
378
379 if (!m_pullMode && !m_tempBuffer.isEmpty()) {
380 readBytes = qMin(a: static_cast<int>(len), b: m_tempBuffer.size());
381 if (readBytes)
382 memcpy(dest: data, src: m_tempBuffer.constData(), n: readBytes);
383 m_totalTimeValue += readBytes;
384
385 if (readBytes < m_tempBuffer.size()) {
386 m_tempBuffer.remove(index: 0, len: readBytes);
387 return readBytes;
388 }
389
390 m_tempBuffer.clear();
391 }
392
393 while (pa_stream_readable_size(p: m_stream) > 0) {
394 size_t readLength = 0;
395
396#ifdef DEBUG_PULSE
397 qDebug() << "QPulseAudioSource::read -- " << pa_stream_readable_size(m_stream) << " bytes available from pulse audio";
398#endif
399
400 QPulseAudioEngine *pulseEngine = QPulseAudioEngine::instance();
401 pulseEngine->lock();
402
403 const void *audioBuffer;
404
405 // Second and third parameters (audioBuffer and length) to pa_stream_peek are output parameters,
406 // the audioBuffer pointer is set to point to the actual pulse audio data,
407 // and the length is set to the length of this data.
408 if (pa_stream_peek(p: m_stream, data: &audioBuffer, nbytes: &readLength) < 0) {
409 qWarning() << QString::fromLatin1(ba: "pa_stream_peek() failed: %1")
410 .arg(a: QString::fromUtf8(utf8: pa_strerror(error: pa_context_errno(c: pa_stream_get_context(p: m_stream)))));
411 pulseEngine->unlock();
412 return 0;
413 }
414
415 qint64 actualLength = 0;
416 if (m_pullMode) {
417 QByteArray adjusted(readLength, Qt::Uninitialized);
418 applyVolume(src: audioBuffer, dest: adjusted.data(), len: readLength);
419 actualLength = m_audioSource->write(data: adjusted);
420
421 if (actualLength < qint64(readLength)) {
422 pulseEngine->unlock();
423
424 setError(QAudio::UnderrunError);
425 setState(QAudio::IdleState);
426
427 return actualLength;
428 }
429 } else {
430 actualLength = qMin(a: static_cast<int>(len - readBytes), b: static_cast<int>(readLength));
431 applyVolume(src: audioBuffer, dest: data + readBytes, len: actualLength);
432 }
433
434#ifdef DEBUG_PULSE
435 qDebug() << "QPulseAudioSource::read -- wrote " << actualLength << " to client";
436#endif
437
438 if (actualLength < qint64(readLength)) {
439#ifdef DEBUG_PULSE
440 qDebug() << "QPulseAudioSource::read -- appending " << readLength - actualLength << " bytes of data to temp buffer";
441#endif
442 int diff = readLength - actualLength;
443 int oldSize = m_tempBuffer.size();
444 m_tempBuffer.resize(size: m_tempBuffer.size() + diff);
445 applyVolume(src: static_cast<const char *>(audioBuffer) + actualLength, dest: m_tempBuffer.data() + oldSize, len: diff);
446 QMetaObject::invokeMethod(obj: this, member: "userFeed", c: Qt::QueuedConnection);
447 }
448
449 m_totalTimeValue += actualLength;
450 readBytes += actualLength;
451
452 pa_stream_drop(p: m_stream);
453 pulseEngine->unlock();
454
455 if (!m_pullMode && readBytes >= len)
456 break;
457 }
458
459#ifdef DEBUG_PULSE
460 qDebug() << "QPulseAudioSource::read -- returning after reading " << readBytes << " bytes";
461#endif
462
463 return readBytes;
464}
465
466void QPulseAudioSource::applyVolume(const void *src, void *dest, int len)
467{
468 Q_ASSERT((src && dest) || len == 0);
469 if (m_volume < 1.f)
470 QAudioHelperInternal::qMultiplySamples(factor: m_volume, format: m_format, src, dest, len);
471 else if (len)
472 memcpy(dest: dest, src: src, n: len);
473}
474
475void QPulseAudioSource::resume()
476{
477 if (m_deviceState == QAudio::SuspendedState || m_deviceState == QAudio::IdleState) {
478 QPulseAudioEngine *pulseEngine = QPulseAudioEngine::instance();
479
480 {
481 std::lock_guard lock(*pulseEngine);
482 PAOperationUPtr operation(
483 pa_stream_cork(s: m_stream, b: 0, cb: inputStreamSuccessCallback, userdata: nullptr));
484 pulseEngine->wait(op: operation.get());
485 }
486
487 m_timer->start(msec: m_periodTime);
488
489 setState(QAudio::ActiveState);
490 setError(QAudio::NoError);
491 }
492}
493
494void QPulseAudioSource::setVolume(qreal vol)
495{
496 if (qFuzzyCompare(p1: m_volume, p2: vol))
497 return;
498
499 m_volume = qBound(min: qreal(0), val: vol, max: qreal(1));
500}
501
502qreal QPulseAudioSource::volume() const
503{
504 return m_volume;
505}
506
507void QPulseAudioSource::setBufferSize(qsizetype value)
508{
509 m_bufferSize = value;
510}
511
512qsizetype QPulseAudioSource::bufferSize() const
513{
514 return m_bufferSize;
515}
516
517qint64 QPulseAudioSource::processedUSecs() const
518{
519 if (!m_stream)
520 return 0;
521 pa_usec_t usecs = 0;
522 int result = pa_stream_get_time(s: m_stream, r_usec: &usecs);
523 Q_UNUSED(result);
524// if (result != 0)
525// qWarning() << "no timing info from pulse";
526
527 return usecs;
528}
529
530void QPulseAudioSource::suspend()
531{
532 if (m_deviceState == QAudio::ActiveState) {
533 setError(QAudio::NoError);
534 setState(QAudio::SuspendedState);
535
536 m_timer->stop();
537
538 QPulseAudioEngine *pulseEngine = QPulseAudioEngine::instance();
539
540 std::lock_guard lock(*pulseEngine);
541
542 PAOperationUPtr operation(pa_stream_cork(s: m_stream, b: 1, cb: inputStreamSuccessCallback, userdata: nullptr));
543 pulseEngine->wait(op: operation.get());
544 }
545}
546
547void QPulseAudioSource::userFeed()
548{
549 if (m_deviceState == QAudio::StoppedState || m_deviceState == QAudio::SuspendedState)
550 return;
551#ifdef DEBUG_PULSE
552// QTime now(QTime::currentTime());
553// qDebug()<< now.second() << "s " << now.msec() << "ms :userFeed() IN";
554#endif
555 deviceReady();
556}
557
558bool QPulseAudioSource::deviceReady()
559{
560 if (m_pullMode) {
561 // reads some audio data and writes it to QIODevice
562 read(data: nullptr,len: 0);
563 } else {
564 // emits readyRead() so user will call read() on QIODevice to get some audio data
565 if (m_audioSource != nullptr) {
566 PulseInputPrivate *a = qobject_cast<PulseInputPrivate*>(object: m_audioSource);
567 a->trigger();
568 }
569 }
570 m_bytesAvailable = checkBytesReady();
571
572 if (m_deviceState != QAudio::ActiveState)
573 return true;
574
575 return true;
576}
577
578void QPulseAudioSource::reset()
579{
580 stop();
581 m_bytesAvailable = 0;
582}
583
584void QPulseAudioSource::onPulseContextFailed()
585{
586 close();
587
588 setError(QAudio::FatalError);
589 setState(QAudio::StoppedState);
590}
591
592PulseInputPrivate::PulseInputPrivate(QPulseAudioSource *audio)
593{
594 m_audioDevice = qobject_cast<QPulseAudioSource*>(object: audio);
595}
596
597qint64 PulseInputPrivate::readData(char *data, qint64 len)
598{
599 return m_audioDevice->read(data, len);
600}
601
602qint64 PulseInputPrivate::writeData(const char *data, qint64 len)
603{
604 Q_UNUSED(data);
605 Q_UNUSED(len);
606 return 0;
607}
608
609void PulseInputPrivate::trigger()
610{
611 emit readyRead();
612}
613
614QT_END_NAMESPACE
615
616#include "moc_qpulseaudiosource_p.cpp"
617

source code of qtmultimedia/src/multimedia/pulseaudio/qpulseaudiosource.cpp