1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include "qpulseaudiosource_p.h"
5
6#include <QtMultimedia/private/qaudiosystem_platform_stream_support_p.h>
7#include <QtMultimedia/private/qpulseaudio_contextmanager_p.h>
8#include <QtMultimedia/private/qpulsehelpers_p.h>
9
10#include <mutex> // for std::lock_guard
11#include <unistd.h>
12
13QT_BEGIN_NAMESPACE
14
15namespace QPulseAudioInternal {
16
17using namespace QtMultimediaPrivate;
18
19QPulseAudioSourceStream::QPulseAudioSourceStream(QAudioDevice device, const QAudioFormat &format,
20 std::optional<qsizetype> ringbufferSize,
21 QPulseAudioSource *parent,
22 float volume,
23 std::optional<int32_t> hardwareBufferSize)
24 : QPlatformAudioSourceStream{
25 std::move(device), format, ringbufferSize, hardwareBufferSize, volume,
26 },
27 m_parent(parent)
28{
29 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
30 pa_sample_spec spec = QPulseAudioInternal::audioFormatToSampleSpec(format);
31 pa_channel_map channel_map = QPulseAudioInternal::channelMapForAudioFormat(format);
32
33 if (!pa_sample_spec_valid(spec: &spec))
34 return;
35
36 const QByteArray streamName =
37 QStringLiteral("QtmPulseStream-%1-%2").arg(a: ::getpid()).arg(a: quintptr(this)).toUtf8();
38
39 if (Q_UNLIKELY(qLcPulseAudioIn().isEnabled(QtDebugMsg))) {
40 qCDebug(qLcPulseAudioIn) << "Format: " << spec.format;
41 qCDebug(qLcPulseAudioIn) << "Rate: " << spec.rate;
42 qCDebug(qLcPulseAudioIn) << "Channels: " << spec.channels;
43 qCDebug(qLcPulseAudioIn) << "Frame size: " << pa_frame_size(spec: &spec);
44 }
45
46 std::lock_guard engineLock{ *pulseEngine };
47
48 m_stream = PAStreamHandle{
49 pa_stream_new(c: pulseEngine->context(), name: streamName.constData(), ss: &spec, map: &channel_map),
50 PAStreamHandle::HasRef,
51 };
52}
53
// Defaulted destructor: no explicit teardown happens here (stop() is not called); the members
// are destroyed by their own destructors.
QPulseAudioSourceStream::~QPulseAudioSourceStream() = default;
55
56bool QPulseAudioSourceStream::start(QIODevice *device)
57{
58 setQIODevice(device);
59
60 createQIODeviceConnections(device);
61
62 return startStream(StreamType::Ringbuffer);
63}
64
65bool QPulseAudioSourceStream::start(AudioCallback &&audioCallback)
66{
67 m_audioCallback = std::move(audioCallback);
68 return startStream(StreamType::Callback);
69}
70
71QIODevice *QPulseAudioSourceStream::start()
72{
73 QIODevice *device = createRingbufferReaderDevice();
74 bool started = start(device);
75 if (!started)
76 return nullptr;
77
78 return device;
79}
80
81void QPulseAudioSourceStream::stop(ShutdownPolicy shutdownPolicy)
82{
83 requestStop();
84
85 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
86 std::lock_guard engineLock{ *pulseEngine };
87
88 uninstallCallbacks();
89 disconnectQIODeviceConnections();
90
91 if (shutdownPolicy == ShutdownPolicy::DrainRingbuffer) {
92 size_t bytesToRead = pa_stream_readable_size(p: m_stream.get());
93 if (bytesToRead != size_t(-1))
94 readCallbackRingbuffer(bytesToRead);
95 }
96
97 // Note: we need to cork the stream before disconnecting to prevent pulseaudio from deadlocking
98 pa_stream_cork(s: m_stream.get(), b: 1, cb: nullptr, userdata: nullptr);
99
100 pa_stream_disconnect(s: m_stream.get());
101
102 finalizeQIODevice(shutdownPolicy);
103 if (shutdownPolicy == ShutdownPolicy::DiscardRingbuffer)
104 emptyRingbuffer();
105}
106
107void QPulseAudioSourceStream::suspend()
108{
109 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
110 std::lock_guard engineLock{ *pulseEngine };
111
112 pa_stream_cork(s: m_stream.get(), b: 1, cb: nullptr, userdata: nullptr);
113}
114
115void QPulseAudioSourceStream::resume()
116{
117 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
118 std::lock_guard engineLock{ *pulseEngine };
119
120 pa_stream_cork(s: m_stream.get(), b: 0, cb: nullptr, userdata: nullptr);
121}
122
123bool QPulseAudioSourceStream::open() const
124{
125 return bool(m_stream);
126}
127
128void QPulseAudioSourceStream::updateStreamIdle(bool idle)
129{
130 m_parent->updateStreamIdle(idle);
131}
132
133bool QPulseAudioSourceStream::startStream(StreamType streamType)
134{
135 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
136 static const bool serverIsPipewire = [&] {
137 return pulseEngine->serverName().contains(s: u"PulseAudio (on PipeWire");
138 }();
139
140 pa_buffer_attr attr{
141 .maxlength = uint32_t(m_format.bytesForFrames(frameCount: m_hardwareBufferFrames.value_or(u: 1024))),
142 .tlength = uint32_t(-1),
143 .prebuf = uint32_t(-1),
144 .minreq = uint32_t(-1),
145
146 // pulseaudio's vanilla implementation requires us to set a fragment size, otherwise we only
147 // get a single callback every 2-ish seconds.
148 .fragsize = serverIsPipewire
149 ? uint32_t(-1)
150 : uint32_t(m_format.bytesForFrames(frameCount: m_hardwareBufferFrames.value_or(u: 1024))),
151 };
152
153 constexpr pa_stream_flags flags =
154 pa_stream_flags(PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_ADJUST_LATENCY);
155
156 std::lock_guard engineLock{ *pulseEngine };
157 installCallbacks(streamType);
158
159 int status = pa_stream_connect_record(s: m_stream.get(), dev: m_audioDevice.id().data(), attr: &attr, flags);
160 if (status != 0) {
161 qCWarning(qLcPulseAudioOut) << "pa_stream_connect_record() failed!";
162 m_stream = {};
163 return false;
164 }
165 return true;
166}
167
168void QPulseAudioSourceStream::installCallbacks(StreamType streamType)
169{
170 pa_stream_set_overflow_callback(p: m_stream.get(), cb: [](pa_stream *stream, void *data) {
171 auto *self = reinterpret_cast<QPulseAudioSourceStream *>(data);
172 Q_ASSERT(stream == self->m_stream.get());
173 self->underflowCallback();
174 }, userdata: this);
175
176 pa_stream_set_underflow_callback(p: m_stream.get(), cb: [](pa_stream *stream, void *data) {
177 auto *self = reinterpret_cast<QPulseAudioSourceStream *>(data);
178 Q_ASSERT(stream == self->m_stream.get());
179 self->overflowCallback();
180 }, userdata: this);
181
182 pa_stream_set_state_callback(s: m_stream.get(), cb: [](pa_stream *stream, void *data) {
183 auto *self = reinterpret_cast<QPulseAudioSourceStream *>(data);
184 Q_ASSERT(stream == self->m_stream.get());
185 self->stateCallback();
186 }, userdata: this);
187
188 switch (streamType) {
189 case StreamType::Ringbuffer: {
190 pa_stream_set_read_callback(p: m_stream.get(),
191 cb: [](pa_stream *stream, size_t nbytes, void *data) {
192 auto *self = reinterpret_cast<QPulseAudioSourceStream *>(data);
193 Q_ASSERT(stream == self->m_stream.get());
194 self->readCallbackRingbuffer(bytesToRead: nbytes);
195 }, userdata: this);
196 break;
197 }
198 case StreamType::Callback: {
199 pa_stream_set_read_callback(p: m_stream.get(),
200 cb: [](pa_stream *stream, size_t nbytes, void *data) {
201 auto *self = reinterpret_cast<QPulseAudioSourceStream *>(data);
202 Q_ASSERT(stream == self->m_stream.get());
203 self->readCallbackAudioCallback(bytesToRead: nbytes);
204 }, userdata: this);
205 break;
206 }
207 }
208
209 pa_stream_set_latency_update_callback(p: m_stream.get(), cb: [](pa_stream *stream, void *data) {
210 auto *self = reinterpret_cast<QPulseAudioSourceStream *>(data);
211 Q_ASSERT(stream == self->m_stream.get());
212 self->latencyUpdateCallback();
213 }, userdata: this);
214}
215
216void QPulseAudioSourceStream::uninstallCallbacks()
217{
218 pa_stream_set_overflow_callback(p: m_stream.get(), cb: nullptr, userdata: nullptr);
219 pa_stream_set_underflow_callback(p: m_stream.get(), cb: nullptr, userdata: nullptr);
220 pa_stream_set_state_callback(s: m_stream.get(), cb: nullptr, userdata: nullptr);
221 pa_stream_set_read_callback(p: m_stream.get(), cb: nullptr, userdata: nullptr);
222 pa_stream_set_latency_update_callback(p: m_stream.get(), cb: nullptr, userdata: nullptr);
223}
224
225void QPulseAudioSourceStream::readCallbackRingbuffer([[maybe_unused]] size_t bytesToRead)
226{
227 const void *data{};
228 size_t nBytes{};
229 int status = pa_stream_peek(p: m_stream.get(), data: &data, nbytes: &nBytes);
230 if (status < 0) {
231 invokeOnAppThread(f: [this] {
232 handleIOError(parent: m_parent);
233 });
234 return;
235 }
236
237 QSpan<const std::byte> hostBuffer{
238 reinterpret_cast<const std::byte *>(data),
239 qsizetype(nBytes),
240 };
241
242 uint32_t numberOfFrames = m_format.framesForBytes(byteCount: nBytes);
243
244 [[maybe_unused]] uint64_t framesWritten =
245 QPlatformAudioSourceStream::process(hostBuffer, numberOfFrames);
246 status = pa_stream_drop(p: m_stream.get());
247 if (status < 0) {
248 if (!isStopRequested()) {
249 invokeOnAppThread(f: [this] {
250 handleIOError(parent: m_parent);
251 });
252 }
253 }
254}
255
256void QPulseAudioSourceStream::readCallbackAudioCallback([[maybe_unused]] size_t bytesToRead)
257{
258 const void *data{};
259 size_t nBytes{};
260 int status = pa_stream_peek(p: m_stream.get(), data: &data, nbytes: &nBytes);
261 if (status < 0) {
262 QMetaObject::invokeMethod(object: m_parent, function: [this] {
263 handleIOError(parent: m_parent);
264 });
265 return;
266 }
267
268 QSpan<const std::byte> hostBuffer{
269 reinterpret_cast<const std::byte *>(data),
270 qsizetype(nBytes),
271 };
272
273 runAudioCallback(audioCallback&: *m_audioCallback, hostBuffer, format: m_format, volume: volume());
274
275 status = pa_stream_drop(p: m_stream.get());
276 if (status < 0) {
277 if (!isStopRequested()) {
278 QMetaObject::invokeMethod(object: m_parent, function: [this] {
279 handleIOError(parent: m_parent);
280 });
281 }
282 }
283}
284
285////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
286
// Constructs the PulseAudio audio source. All arguments are forwarded to BaseClass; the
// constructor performs no work of its own.
QPulseAudioSource::QPulseAudioSource(QAudioDevice device, const QAudioFormat &format,
                                     QObject *parent)
    : BaseClass(std::move(device), format, parent)
{
}
292
293bool QPulseAudioSource::validatePulseaudio()
294{
295 QPulseAudioContextManager *pulseEngine = QPulseAudioContextManager::instance();
296 if (!pulseEngine->contextIsGood()) {
297 qWarning() << "Invalid PulseAudio context:" << pulseEngine->getContextState();
298 setError(QtAudio::Error::FatalError);
299 return false;
300 }
301 return true;
302}
303
304void QPulseAudioSource::start(QIODevice *device)
305{
306 if (!validatePulseaudio())
307 return;
308 return BaseClass::start(device);
309}
310
311void QPulseAudioSource::start(AudioCallback &&cb)
312{
313 if (!validatePulseaudio())
314 return;
315 return BaseClass::start(audioCallback: std::move(cb));
316}
317
318QIODevice *QPulseAudioSource::start()
319{
320 if (!validatePulseaudio())
321 return nullptr;
322 return BaseClass::start();
323}
324
325} // namespace QPulseAudioInternal
326
327QT_END_NAMESPACE
328

// Source: qtmultimedia/src/multimedia/pulseaudio/qpulseaudiosource.cpp