1extern crate alsa;
2extern crate libc;
3
4use self::alsa::poll::Descriptors;
5use crate::traits::{DeviceTrait, HostTrait, StreamTrait};
6use crate::{
7 BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data,
8 DefaultStreamConfigError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo,
9 PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError,
10 SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange,
11 SupportedStreamConfigsError,
12};
13use std::cmp;
14use std::convert::TryInto;
15use std::sync::{Arc, Mutex};
16use std::thread::{self, JoinHandle};
17use std::time::Duration;
18use std::vec::IntoIter as VecIntoIter;
19
20pub use self::enumerate::{default_input_device, default_output_device, Devices};
21
/// Iterator type returned by `Device::supported_input_configs`.
pub type SupportedInputConfigs = VecIntoIter<SupportedStreamConfigRange>;
/// Iterator type returned by `Device::supported_output_configs`.
pub type SupportedOutputConfigs = VecIntoIter<SupportedStreamConfigRange>;
24
25mod enumerate;
26
27/// The default linux, dragonfly, freebsd and netbsd host type.
28#[derive(Debug)]
29pub struct Host;
30
31impl Host {
32 pub fn new() -> Result<Self, crate::HostUnavailable> {
33 Ok(Host)
34 }
35}
36
37impl HostTrait for Host {
38 type Devices = Devices;
39 type Device = Device;
40
41 fn is_available() -> bool {
42 // Assume ALSA is always available on linux/dragonfly/freebsd/netbsd.
43 true
44 }
45
46 fn devices(&self) -> Result<Self::Devices, DevicesError> {
47 Devices::new()
48 }
49
50 fn default_input_device(&self) -> Option<Self::Device> {
51 default_input_device()
52 }
53
54 fn default_output_device(&self) -> Option<Self::Device> {
55 default_output_device()
56 }
57}
58
59impl DeviceTrait for Device {
60 type SupportedInputConfigs = SupportedInputConfigs;
61 type SupportedOutputConfigs = SupportedOutputConfigs;
62 type Stream = Stream;
63
64 fn name(&self) -> Result<String, DeviceNameError> {
65 Device::name(self)
66 }
67
68 fn supported_input_configs(
69 &self,
70 ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError> {
71 Device::supported_input_configs(self)
72 }
73
74 fn supported_output_configs(
75 &self,
76 ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError> {
77 Device::supported_output_configs(self)
78 }
79
80 fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
81 Device::default_input_config(self)
82 }
83
84 fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
85 Device::default_output_config(self)
86 }
87
88 fn build_input_stream_raw<D, E>(
89 &self,
90 conf: &StreamConfig,
91 sample_format: SampleFormat,
92 data_callback: D,
93 error_callback: E,
94 timeout: Option<Duration>,
95 ) -> Result<Self::Stream, BuildStreamError>
96 where
97 D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
98 E: FnMut(StreamError) + Send + 'static,
99 {
100 let stream_inner =
101 self.build_stream_inner(conf, sample_format, alsa::Direction::Capture)?;
102 let stream = Stream::new_input(
103 Arc::new(stream_inner),
104 data_callback,
105 error_callback,
106 timeout,
107 );
108 Ok(stream)
109 }
110
111 fn build_output_stream_raw<D, E>(
112 &self,
113 conf: &StreamConfig,
114 sample_format: SampleFormat,
115 data_callback: D,
116 error_callback: E,
117 timeout: Option<Duration>,
118 ) -> Result<Self::Stream, BuildStreamError>
119 where
120 D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
121 E: FnMut(StreamError) + Send + 'static,
122 {
123 let stream_inner =
124 self.build_stream_inner(conf, sample_format, alsa::Direction::Playback)?;
125 let stream = Stream::new_output(
126 Arc::new(stream_inner),
127 data_callback,
128 error_callback,
129 timeout,
130 );
131 Ok(stream)
132 }
133}
134
/// Owns the write end of the pipe created by `trigger()`; the descriptor is closed on drop.
struct TriggerSender(libc::c_int);
136
/// Owns the read end of the pipe created by `trigger()`; the descriptor is closed on drop.
struct TriggerReceiver(libc::c_int);
138
139impl TriggerSender {
140 fn wakeup(&self) {
141 let buf: u64 = 1u64;
142 let ret: isize = unsafe { libc::write(self.0, &buf as *const u64 as *const _, count:8) };
143 assert_eq!(ret, 8);
144 }
145}
146
147impl TriggerReceiver {
148 fn clear_pipe(&self) {
149 let mut out: u64 = 0u64;
150 let ret: isize = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, count:8) };
151 assert_eq!(ret, 8);
152 }
153}
154
155fn trigger() -> (TriggerSender, TriggerReceiver) {
156 let mut fds: [i32; 2] = [0, 0];
157 match unsafe { libc::pipe(fds:fds.as_mut_ptr()) } {
158 0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])),
159 _ => panic!("Could not create pipe"),
160 }
161}
162
163impl Drop for TriggerSender {
164 fn drop(&mut self) {
165 unsafe {
166 libc::close(self.0);
167 }
168 }
169}
170
171impl Drop for TriggerReceiver {
172 fn drop(&mut self) {
173 unsafe {
174 libc::close(self.0);
175 }
176 }
177}
178
/// Lazily opened PCM handles of one device, one slot per direction.
///
/// A slot is `None` until the handle is opened, and becomes `None` again once the handle
/// has been moved out with `take`.
#[derive(Default)]
struct DeviceHandles {
    // Handle opened with `alsa::Direction::Playback`, if any.
    playback: Option<alsa::PCM>,
    // Handle opened with `alsa::Direction::Capture`, if any.
    capture: Option<alsa::PCM>,
}
184
185impl DeviceHandles {
186 /// Create `DeviceHandles` for `name` and try to open a handle for both
187 /// directions. Returns `Ok` if either direction is opened successfully.
188 fn open(name: &str) -> Result<Self, alsa::Error> {
189 let mut handles = Self::default();
190 let playback_err = handles.try_open(name, alsa::Direction::Playback).err();
191 let capture_err = handles.try_open(name, alsa::Direction::Capture).err();
192 if let Some(err) = capture_err.and(playback_err) {
193 Err(err)
194 } else {
195 Ok(handles)
196 }
197 }
198
199 /// Get a mutable reference to the `Option` for a specific `stream_type`.
200 /// If the `Option` is `None`, the `alsa::PCM` will be opened and placed in
201 /// the `Option` before returning. If `handle_mut()` returns `Ok` the contained
202 /// `Option` is guaranteed to be `Some(..)`.
203 fn try_open(
204 &mut self,
205 name: &str,
206 stream_type: alsa::Direction,
207 ) -> Result<&mut Option<alsa::PCM>, alsa::Error> {
208 let handle = match stream_type {
209 alsa::Direction::Playback => &mut self.playback,
210 alsa::Direction::Capture => &mut self.capture,
211 };
212
213 if handle.is_none() {
214 *handle = Some(alsa::pcm::PCM::new(name, stream_type, true)?);
215 }
216
217 Ok(handle)
218 }
219
220 /// Get a mutable reference to the `alsa::PCM` handle for a specific `stream_type`.
221 /// If the handle is not yet opened, it will be opened and stored in `self`.
222 fn get_mut(
223 &mut self,
224 name: &str,
225 stream_type: alsa::Direction,
226 ) -> Result<&mut alsa::PCM, alsa::Error> {
227 Ok(self.try_open(name, stream_type)?.as_mut().unwrap())
228 }
229
230 /// Take ownership of the `alsa::PCM` handle for a specific `stream_type`.
231 /// If the handle is not yet opened, it will be opened and returned.
232 fn take(&mut self, name: &str, stream_type: alsa::Direction) -> Result<alsa::PCM, alsa::Error> {
233 Ok(self.try_open(name, stream_type)?.take().unwrap())
234 }
235}
236
/// An ALSA device, identified by its name.
///
/// Clones share the same `DeviceHandles` through the `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct Device {
    // Name passed to ALSA whenever a PCM handle is opened for this device.
    name: String,
    // PCM handles for this device, shared between clones and guarded by a mutex.
    handles: Arc<Mutex<DeviceHandles>>,
}
242
243impl Device {
244 fn build_stream_inner(
245 &self,
246 conf: &StreamConfig,
247 sample_format: SampleFormat,
248 stream_type: alsa::Direction,
249 ) -> Result<StreamInner, BuildStreamError> {
250 let handle_result = self
251 .handles
252 .lock()
253 .unwrap()
254 .take(&self.name, stream_type)
255 .map_err(|e| (e, e.errno()));
256
257 let handle = match handle_result {
258 Err((_, libc::EBUSY)) => return Err(BuildStreamError::DeviceNotAvailable),
259 Err((_, libc::EINVAL)) => return Err(BuildStreamError::InvalidArgument),
260 Err((e, _)) => return Err(e.into()),
261 Ok(handle) => handle,
262 };
263 let can_pause = set_hw_params_from_format(&handle, conf, sample_format)?;
264 let period_len = set_sw_params_from_format(&handle, conf, stream_type)?;
265
266 handle.prepare()?;
267
268 let num_descriptors = handle.count();
269 if num_descriptors == 0 {
270 let description = "poll descriptor count for stream was 0".to_string();
271 let err = BackendSpecificError { description };
272 return Err(err.into());
273 }
274
275 // Check to see if we can retrieve valid timestamps from the device.
276 // Related: https://bugs.freedesktop.org/show_bug.cgi?id=88503
277 let ts = handle.status()?.get_htstamp();
278 let creation_instant = match (ts.tv_sec, ts.tv_nsec) {
279 (0, 0) => Some(std::time::Instant::now()),
280 _ => None,
281 };
282
283 if let alsa::Direction::Capture = stream_type {
284 handle.start()?;
285 }
286
287 let stream_inner = StreamInner {
288 channel: handle,
289 sample_format,
290 num_descriptors,
291 conf: conf.clone(),
292 period_len,
293 can_pause,
294 creation_instant,
295 };
296
297 Ok(stream_inner)
298 }
299
300 #[inline]
301 fn name(&self) -> Result<String, DeviceNameError> {
302 Ok(self.name.clone())
303 }
304
305 fn supported_configs(
306 &self,
307 stream_t: alsa::Direction,
308 ) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError> {
309 let mut guard = self.handles.lock().unwrap();
310 let handle_result = guard
311 .get_mut(&self.name, stream_t)
312 .map_err(|e| (e, e.errno()));
313
314 let handle = match handle_result {
315 Err((_, libc::ENOENT)) | Err((_, libc::EBUSY)) => {
316 return Err(SupportedStreamConfigsError::DeviceNotAvailable)
317 }
318 Err((_, libc::EINVAL)) => return Err(SupportedStreamConfigsError::InvalidArgument),
319 Err((e, _)) => return Err(e.into()),
320 Ok(handle) => handle,
321 };
322
323 let hw_params = alsa::pcm::HwParams::any(handle)?;
324
325 // TODO: check endianness
326 const FORMATS: [(SampleFormat, alsa::pcm::Format); 8] = [
327 (SampleFormat::I8, alsa::pcm::Format::S8),
328 (SampleFormat::U8, alsa::pcm::Format::U8),
329 (SampleFormat::I16, alsa::pcm::Format::S16LE),
330 //SND_PCM_FORMAT_S16_BE,
331 (SampleFormat::U16, alsa::pcm::Format::U16LE),
332 //SND_PCM_FORMAT_U16_BE,
333 //SND_PCM_FORMAT_S24_LE,
334 //SND_PCM_FORMAT_S24_BE,
335 //SND_PCM_FORMAT_U24_LE,
336 //SND_PCM_FORMAT_U24_BE,
337 (SampleFormat::I32, alsa::pcm::Format::S32LE),
338 //SND_PCM_FORMAT_S32_BE,
339 (SampleFormat::U32, alsa::pcm::Format::U32LE),
340 //SND_PCM_FORMAT_U32_BE,
341 (SampleFormat::F32, alsa::pcm::Format::FloatLE),
342 //SND_PCM_FORMAT_FLOAT_BE,
343 (SampleFormat::F64, alsa::pcm::Format::Float64LE),
344 //SND_PCM_FORMAT_FLOAT64_BE,
345 //SND_PCM_FORMAT_IEC958_SUBFRAME_LE,
346 //SND_PCM_FORMAT_IEC958_SUBFRAME_BE,
347 //SND_PCM_FORMAT_MU_LAW,
348 //SND_PCM_FORMAT_A_LAW,
349 //SND_PCM_FORMAT_IMA_ADPCM,
350 //SND_PCM_FORMAT_MPEG,
351 //SND_PCM_FORMAT_GSM,
352 //SND_PCM_FORMAT_SPECIAL,
353 //SND_PCM_FORMAT_S24_3LE,
354 //SND_PCM_FORMAT_S24_3BE,
355 //SND_PCM_FORMAT_U24_3LE,
356 //SND_PCM_FORMAT_U24_3BE,
357 //SND_PCM_FORMAT_S20_3LE,
358 //SND_PCM_FORMAT_S20_3BE,
359 //SND_PCM_FORMAT_U20_3LE,
360 //SND_PCM_FORMAT_U20_3BE,
361 //SND_PCM_FORMAT_S18_3LE,
362 //SND_PCM_FORMAT_S18_3BE,
363 //SND_PCM_FORMAT_U18_3LE,
364 //SND_PCM_FORMAT_U18_3BE,
365 ];
366
367 let mut supported_formats = Vec::new();
368 for &(sample_format, alsa_format) in FORMATS.iter() {
369 if hw_params.test_format(alsa_format).is_ok() {
370 supported_formats.push(sample_format);
371 }
372 }
373
374 let min_rate = hw_params.get_rate_min()?;
375 let max_rate = hw_params.get_rate_max()?;
376
377 let sample_rates = if min_rate == max_rate || hw_params.test_rate(min_rate + 1).is_ok() {
378 vec![(min_rate, max_rate)]
379 } else {
380 const RATES: [libc::c_uint; 13] = [
381 5512, 8000, 11025, 16000, 22050, 32000, 44100, 48000, 64000, 88200, 96000, 176400,
382 192000,
383 ];
384
385 let mut rates = Vec::new();
386 for &rate in RATES.iter() {
387 if hw_params.test_rate(rate).is_ok() {
388 rates.push((rate, rate));
389 }
390 }
391
392 if rates.is_empty() {
393 vec![(min_rate, max_rate)]
394 } else {
395 rates
396 }
397 };
398
399 let min_channels = hw_params.get_channels_min()?;
400 let max_channels = hw_params.get_channels_max()?;
401
402 let max_channels = cmp::min(max_channels, 32); // TODO: limiting to 32 channels or too much stuff is returned
403 let supported_channels = (min_channels..max_channels + 1)
404 .filter_map(|num| {
405 if hw_params.test_channels(num).is_ok() {
406 Some(num as ChannelCount)
407 } else {
408 None
409 }
410 })
411 .collect::<Vec<_>>();
412
413 let min_buffer_size = hw_params.get_buffer_size_min()?;
414 let max_buffer_size = hw_params.get_buffer_size_max()?;
415
416 let buffer_size_range = SupportedBufferSize::Range {
417 min: min_buffer_size as u32,
418 max: max_buffer_size as u32,
419 };
420
421 let mut output = Vec::with_capacity(
422 supported_formats.len() * supported_channels.len() * sample_rates.len(),
423 );
424 for &sample_format in supported_formats.iter() {
425 for &channels in supported_channels.iter() {
426 for &(min_rate, max_rate) in sample_rates.iter() {
427 output.push(SupportedStreamConfigRange {
428 channels,
429 min_sample_rate: SampleRate(min_rate as u32),
430 max_sample_rate: SampleRate(max_rate as u32),
431 buffer_size: buffer_size_range.clone(),
432 sample_format,
433 });
434 }
435 }
436 }
437
438 Ok(output.into_iter())
439 }
440
441 fn supported_input_configs(
442 &self,
443 ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> {
444 self.supported_configs(alsa::Direction::Capture)
445 }
446
447 fn supported_output_configs(
448 &self,
449 ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> {
450 self.supported_configs(alsa::Direction::Playback)
451 }
452
453 // ALSA does not offer default stream formats, so instead we compare all supported formats by
454 // the `SupportedStreamConfigRange::cmp_default_heuristics` order and select the greatest.
455 fn default_config(
456 &self,
457 stream_t: alsa::Direction,
458 ) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
459 let mut formats: Vec<_> = {
460 match self.supported_configs(stream_t) {
461 Err(SupportedStreamConfigsError::DeviceNotAvailable) => {
462 return Err(DefaultStreamConfigError::DeviceNotAvailable);
463 }
464 Err(SupportedStreamConfigsError::InvalidArgument) => {
465 // this happens sometimes when querying for input and output capabilities, but
466 // the device supports only one
467 return Err(DefaultStreamConfigError::StreamTypeNotSupported);
468 }
469 Err(SupportedStreamConfigsError::BackendSpecific { err }) => {
470 return Err(err.into());
471 }
472 Ok(fmts) => fmts.collect(),
473 }
474 };
475
476 formats.sort_by(|a, b| a.cmp_default_heuristics(b));
477
478 match formats.into_iter().last() {
479 Some(f) => {
480 let min_r = f.min_sample_rate;
481 let max_r = f.max_sample_rate;
482 let mut format = f.with_max_sample_rate();
483 const HZ_44100: SampleRate = SampleRate(44_100);
484 if min_r <= HZ_44100 && HZ_44100 <= max_r {
485 format.sample_rate = HZ_44100;
486 }
487 Ok(format)
488 }
489 None => Err(DefaultStreamConfigError::StreamTypeNotSupported),
490 }
491 }
492
493 fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
494 self.default_config(alsa::Direction::Capture)
495 }
496
497 fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
498 self.default_config(alsa::Direction::Playback)
499 }
500}
501
// State shared between a `Stream` handle and its worker thread.
struct StreamInner {
    // The ALSA channel.
    channel: alsa::pcm::PCM,

    // When converting between file descriptors and `snd_pcm_t`, this is the number of
    // file descriptors that this `snd_pcm_t` uses.
    num_descriptors: usize,

    // Format of the samples.
    sample_format: SampleFormat,

    // The configuration used to open this stream.
    conf: StreamConfig,

    // Minimum number of samples to put in the buffer.
    // Computed at stream creation as the period size in frames times the channel count.
    period_len: usize,

    #[allow(dead_code)]
    // Whether or not the hardware supports pausing the stream.
    // TODO: We need an API to expose this. See #197, #284.
    can_pause: bool,

    // In the case that the device does not return valid timestamps via `get_htstamp`, this field
    // will be `Some` and will contain an `Instant` representing the moment the stream was created.
    //
    // If this field is `Some`, then the stream will use the duration since this instant as a
    // source for timestamps.
    //
    // If this field is `None` then the elapsed duration between `get_trigger_htstamp` and
    // `get_htstamp` is used.
    creation_instant: Option<std::time::Instant>,
}
534
// SAFETY (assumption, not checked here): the ALSA library is assumed to be built with its
// thread-safe option, which is what this impl relies on to share a `StreamInner` between
// the `Stream` handle and its worker thread.
unsafe impl Sync for StreamInner {}
537
/// Direction of a stream as derived from the poll events reported for its descriptors.
#[derive(Debug, Eq, PartialEq)]
enum StreamType {
    /// The descriptors reported readable data (`POLLIN`).
    Input,
    /// The descriptors reported room for writing (`POLLOUT`).
    Output,
}
543
/// A running ALSA stream together with the worker thread that services it.
///
/// Dropping the stream wakes the worker up through `trigger` and joins the thread.
pub struct Stream {
    /// The high-priority audio processing thread calling callbacks.
    /// Option used for moving out in destructor.
    thread: Option<JoinHandle<()>>,

    /// Handle to the underlying stream for playback controls.
    inner: Arc<StreamInner>,

    /// Used to signal to stop processing.
    trigger: TriggerSender,
}
555
556struct StreamWorkerContext {
557 descriptors: Vec<libc::pollfd>,
558 buffer: Vec<u8>,
559 poll_timeout: i32,
560}
561
562impl StreamWorkerContext {
563 fn new(poll_timeout: &Option<Duration>) -> Self {
564 let poll_timeout: i32 = if let Some(d: &Duration) = poll_timeout {
565 d.as_millis().try_into().unwrap()
566 } else {
567 -1
568 };
569
570 Self {
571 descriptors: Vec::new(),
572 buffer: Vec::new(),
573 poll_timeout,
574 }
575 }
576}
577
578fn input_stream_worker(
579 rx: TriggerReceiver,
580 stream: &StreamInner,
581 data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
582 error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
583 timeout: Option<Duration>,
584) {
585 let mut ctxt = StreamWorkerContext::new(&timeout);
586 loop {
587 let flow =
588 poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
589 error_callback(err.into());
590 PollDescriptorsFlow::Continue
591 });
592
593 match flow {
594 PollDescriptorsFlow::Continue => {
595 continue;
596 }
597 PollDescriptorsFlow::XRun => {
598 if let Err(err) = stream.channel.prepare() {
599 error_callback(err.into());
600 }
601 continue;
602 }
603 PollDescriptorsFlow::Return => return,
604 PollDescriptorsFlow::Ready {
605 status,
606 avail_frames: _,
607 delay_frames,
608 stream_type,
609 } => {
610 assert_eq!(
611 stream_type,
612 StreamType::Input,
613 "expected input stream, but polling descriptors indicated output",
614 );
615 if let Err(err) = process_input(
616 stream,
617 &mut ctxt.buffer,
618 status,
619 delay_frames,
620 data_callback,
621 ) {
622 error_callback(err.into());
623 }
624 }
625 }
626 }
627}
628
629fn output_stream_worker(
630 rx: TriggerReceiver,
631 stream: &StreamInner,
632 data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
633 error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
634 timeout: Option<Duration>,
635) {
636 let mut ctxt = StreamWorkerContext::new(&timeout);
637 loop {
638 let flow =
639 poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
640 error_callback(err.into());
641 PollDescriptorsFlow::Continue
642 });
643
644 match flow {
645 PollDescriptorsFlow::Continue => continue,
646 PollDescriptorsFlow::XRun => {
647 if let Err(err) = stream.channel.prepare() {
648 error_callback(err.into());
649 }
650 continue;
651 }
652 PollDescriptorsFlow::Return => return,
653 PollDescriptorsFlow::Ready {
654 status,
655 avail_frames,
656 delay_frames,
657 stream_type,
658 } => {
659 assert_eq!(
660 stream_type,
661 StreamType::Output,
662 "expected output stream, but polling descriptors indicated input",
663 );
664 if let Err(err) = process_output(
665 stream,
666 &mut ctxt.buffer,
667 status,
668 avail_frames,
669 delay_frames,
670 data_callback,
671 error_callback,
672 ) {
673 error_callback(err.into());
674 }
675 }
676 }
677 }
678}
679
/// Outcome of one poll iteration, telling the worker loop what to do next.
enum PollDescriptorsFlow {
    /// Nothing to process yet; poll again.
    Continue,
    /// The stop trigger fired; the worker should exit.
    Return,
    /// Enough data/room is available and the worker buffer has been resized for it.
    Ready {
        // Direction indicated by the poll events.
        stream_type: StreamType,
        // PCM status captured right after polling.
        status: alsa::pcm::Status,
        // Number of frames reported as available by the channel.
        avail_frames: usize,
        // Delay reported by `status`, clamped to zero when negative.
        delay_frames: usize,
    },
    /// Querying the available frames failed with `EPIPE`; the channel needs re-preparing.
    XRun,
}
691
692// This block is shared between both input and output stream worker functions.
693fn poll_descriptors_and_prepare_buffer(
694 rx: &TriggerReceiver,
695 stream: &StreamInner,
696 ctxt: &mut StreamWorkerContext,
697) -> Result<PollDescriptorsFlow, BackendSpecificError> {
698 let StreamWorkerContext {
699 ref mut descriptors,
700 ref mut buffer,
701 ref poll_timeout,
702 } = *ctxt;
703
704 descriptors.clear();
705
706 // Add the self-pipe for signaling termination.
707 descriptors.push(libc::pollfd {
708 fd: rx.0,
709 events: libc::POLLIN,
710 revents: 0,
711 });
712
713 // Add ALSA polling fds.
714 let len = descriptors.len();
715 descriptors.resize(
716 stream.num_descriptors + len,
717 libc::pollfd {
718 fd: 0,
719 events: 0,
720 revents: 0,
721 },
722 );
723 let filled = stream.channel.fill(&mut descriptors[len..])?;
724 debug_assert_eq!(filled, stream.num_descriptors);
725
726 // Don't timeout, wait forever.
727 let res = alsa::poll::poll(descriptors, *poll_timeout)?;
728 if res == 0 {
729 let description = String::from("`alsa::poll()` spuriously returned");
730 return Err(BackendSpecificError { description });
731 }
732
733 if descriptors[0].revents != 0 {
734 // The stream has been requested to be destroyed.
735 rx.clear_pipe();
736 return Ok(PollDescriptorsFlow::Return);
737 }
738
739 let revents = stream.channel.revents(&descriptors[1..])?;
740 if revents.contains(alsa::poll::Flags::ERR) {
741 let description = String::from("`alsa::poll()` returned POLLERR");
742 return Err(BackendSpecificError { description });
743 }
744 let stream_type = match revents {
745 alsa::poll::Flags::OUT => StreamType::Output,
746 alsa::poll::Flags::IN => StreamType::Input,
747 _ => {
748 // Nothing to process, poll again
749 return Ok(PollDescriptorsFlow::Continue);
750 }
751 };
752
753 let status = stream.channel.status()?;
754 let avail_frames = match stream.channel.avail() {
755 Err(err) if err.errno() == libc::EPIPE => return Ok(PollDescriptorsFlow::XRun),
756 res => res,
757 }? as usize;
758 let delay_frames = match status.get_delay() {
759 // Buffer underrun. TODO: Notify the user.
760 d if d < 0 => 0,
761 d => d as usize,
762 };
763 let available_samples = avail_frames * stream.conf.channels as usize;
764
765 // Only go on if there is at least `stream.period_len` samples.
766 if available_samples < stream.period_len {
767 return Ok(PollDescriptorsFlow::Continue);
768 }
769
770 // Prepare the data buffer.
771 let buffer_size = stream.sample_format.sample_size() * available_samples;
772 buffer.resize(buffer_size, 0u8);
773
774 Ok(PollDescriptorsFlow::Ready {
775 stream_type,
776 status,
777 avail_frames,
778 delay_frames,
779 })
780}
781
782// Read input data from ALSA and deliver it to the user.
783fn process_input(
784 stream: &StreamInner,
785 buffer: &mut [u8],
786 status: alsa::pcm::Status,
787 delay_frames: usize,
788 data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
789) -> Result<(), BackendSpecificError> {
790 stream.channel.io_bytes().readi(buf:buffer)?;
791 let sample_format: SampleFormat = stream.sample_format;
792 let data: *mut () = buffer.as_mut_ptr() as *mut ();
793 let len: usize = buffer.len() / sample_format.sample_size();
794 let data: Data = unsafe { Data::from_parts(data, len, sample_format) };
795 let callback: StreamInstant = stream_timestamp(&status, stream.creation_instant)?;
796 let delay_duration: Duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
797 let capture: StreamInstant = callback
798 .sub(delay_duration)
799 .expect(msg:"`capture` is earlier than representation supported by `StreamInstant`");
800 let timestamp: InputStreamTimestamp = crate::InputStreamTimestamp { callback, capture };
801 let info: InputCallbackInfo = crate::InputCallbackInfo { timestamp };
802 data_callback(&data, &info);
803
804 Ok(())
805}
806
807// Request data from the user's function and write it via ALSA.
808//
809// Returns `true`
810fn process_output(
811 stream: &StreamInner,
812 buffer: &mut [u8],
813 status: alsa::pcm::Status,
814 available_frames: usize,
815 delay_frames: usize,
816 data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
817 error_callback: &mut dyn FnMut(StreamError),
818) -> Result<(), BackendSpecificError> {
819 {
820 // We're now sure that we're ready to write data.
821 let sample_format = stream.sample_format;
822 let data = buffer.as_mut_ptr() as *mut ();
823 let len = buffer.len() / sample_format.sample_size();
824 let mut data = unsafe { Data::from_parts(data, len, sample_format) };
825 let callback = stream_timestamp(&status, stream.creation_instant)?;
826 let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
827 let playback = callback
828 .add(delay_duration)
829 .expect("`playback` occurs beyond representation supported by `StreamInstant`");
830 let timestamp = crate::OutputStreamTimestamp { callback, playback };
831 let info = crate::OutputCallbackInfo { timestamp };
832 data_callback(&mut data, &info);
833 }
834 loop {
835 match stream.channel.io_bytes().writei(buffer) {
836 Err(err) if err.errno() == libc::EPIPE => {
837 // buffer underrun
838 // TODO: Notify the user of this.
839 let _ = stream.channel.try_recover(err, false);
840 }
841 Err(err) => {
842 error_callback(err.into());
843 continue;
844 }
845 Ok(result) if result != available_frames => {
846 let description = format!(
847 "unexpected number of frames written: expected {}, \
848 result {} (this should never happen)",
849 available_frames, result,
850 );
851 error_callback(BackendSpecificError { description }.into());
852 continue;
853 }
854 _ => {
855 break;
856 }
857 }
858 }
859 Ok(())
860}
861
862// Use the elapsed duration since the start of the stream.
863//
864// This ensures positive values that are compatible with our `StreamInstant` representation.
865fn stream_timestamp(
866 status: &alsa::pcm::Status,
867 creation_instant: Option<std::time::Instant>,
868) -> Result<crate::StreamInstant, BackendSpecificError> {
869 match creation_instant {
870 None => {
871 let trigger_ts: timespec = status.get_trigger_htstamp();
872 let ts: timespec = status.get_htstamp();
873 let nanos: i64 = timespec_diff_nanos(a:ts, b:trigger_ts);
874 if nanos < 0 {
875 panic!(
876 "get_htstamp `{}.{}` was earlier than get_trigger_htstamp `{}.{}`",
877 ts.tv_sec, ts.tv_nsec, trigger_ts.tv_sec, trigger_ts.tv_nsec
878 );
879 }
880 Ok(crate::StreamInstant::from_nanos(nanos))
881 }
882 Some(creation: Instant) => {
883 let now: Instant = std::time::Instant::now();
884 let duration: Duration = now.duration_since(earlier:creation);
885 let instant: StreamInstant = crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128)
886 .expect(msg:"stream duration has exceeded `StreamInstant` representation");
887 Ok(instant)
888 }
889 }
890}
891
892// Adapted from `timestamp2ns` here:
893// https://fossies.org/linux/alsa-lib/test/audio_time.c
894fn timespec_to_nanos(ts: libc::timespec) -> i64 {
895 ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec as i64
896}
897
898// Adapted from `timediff` here:
899// https://fossies.org/linux/alsa-lib/test/audio_time.c
900fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
901 timespec_to_nanos(ts:a) - timespec_to_nanos(ts:b)
902}
903
904// Convert the given duration in frames at the given sample rate to a `std::time::Duration`.
905fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration {
906 let secsf: f64 = frames as f64 / rate.0 as f64;
907 let secs: u64 = secsf as u64;
908 let nanos: u32 = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
909 std::time::Duration::new(secs, nanos)
910}
911
912impl Stream {
913 fn new_input<D, E>(
914 inner: Arc<StreamInner>,
915 mut data_callback: D,
916 mut error_callback: E,
917 timeout: Option<Duration>,
918 ) -> Stream
919 where
920 D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
921 E: FnMut(StreamError) + Send + 'static,
922 {
923 let (tx, rx) = trigger();
924 // Clone the handle for passing into worker thread.
925 let stream = inner.clone();
926 let thread = thread::Builder::new()
927 .name("cpal_alsa_in".to_owned())
928 .spawn(move || {
929 input_stream_worker(
930 rx,
931 &stream,
932 &mut data_callback,
933 &mut error_callback,
934 timeout,
935 );
936 })
937 .unwrap();
938 Stream {
939 thread: Some(thread),
940 inner,
941 trigger: tx,
942 }
943 }
944
945 fn new_output<D, E>(
946 inner: Arc<StreamInner>,
947 mut data_callback: D,
948 mut error_callback: E,
949 timeout: Option<Duration>,
950 ) -> Stream
951 where
952 D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
953 E: FnMut(StreamError) + Send + 'static,
954 {
955 let (tx, rx) = trigger();
956 // Clone the handle for passing into worker thread.
957 let stream = inner.clone();
958 let thread = thread::Builder::new()
959 .name("cpal_alsa_out".to_owned())
960 .spawn(move || {
961 output_stream_worker(
962 rx,
963 &stream,
964 &mut data_callback,
965 &mut error_callback,
966 timeout,
967 );
968 })
969 .unwrap();
970 Stream {
971 thread: Some(thread),
972 inner,
973 trigger: tx,
974 }
975 }
976}
977
978impl Drop for Stream {
979 fn drop(&mut self) {
980 self.trigger.wakeup();
981 self.thread.take().unwrap().join().unwrap();
982 }
983}
984
985impl StreamTrait for Stream {
986 fn play(&self) -> Result<(), PlayStreamError> {
987 self.inner.channel.pause(false).ok();
988 Ok(())
989 }
990 fn pause(&self) -> Result<(), PauseStreamError> {
991 self.inner.channel.pause(true).ok();
992 Ok(())
993 }
994}
995
996fn set_hw_params_from_format(
997 pcm_handle: &alsa::pcm::PCM,
998 config: &StreamConfig,
999 sample_format: SampleFormat,
1000) -> Result<bool, BackendSpecificError> {
1001 let hw_params = alsa::pcm::HwParams::any(pcm_handle)?;
1002 hw_params.set_access(alsa::pcm::Access::RWInterleaved)?;
1003
1004 let sample_format = if cfg!(target_endian = "big") {
1005 match sample_format {
1006 SampleFormat::I8 => alsa::pcm::Format::S8,
1007 SampleFormat::I16 => alsa::pcm::Format::S16BE,
1008 // SampleFormat::I24 => alsa::pcm::Format::S24BE,
1009 SampleFormat::I32 => alsa::pcm::Format::S32BE,
1010 // SampleFormat::I48 => alsa::pcm::Format::S48BE,
1011 // SampleFormat::I64 => alsa::pcm::Format::S64BE,
1012 SampleFormat::U8 => alsa::pcm::Format::U8,
1013 SampleFormat::U16 => alsa::pcm::Format::U16BE,
1014 // SampleFormat::U24 => alsa::pcm::Format::U24BE,
1015 SampleFormat::U32 => alsa::pcm::Format::U32BE,
1016 // SampleFormat::U48 => alsa::pcm::Format::U48BE,
1017 // SampleFormat::U64 => alsa::pcm::Format::U64BE,
1018 SampleFormat::F32 => alsa::pcm::Format::FloatBE,
1019 SampleFormat::F64 => alsa::pcm::Format::Float64BE,
1020 sample_format => {
1021 return Err(BackendSpecificError {
1022 description: format!(
1023 "Sample format '{}' is not supported by this backend",
1024 sample_format
1025 ),
1026 })
1027 }
1028 }
1029 } else {
1030 match sample_format {
1031 SampleFormat::I8 => alsa::pcm::Format::S8,
1032 SampleFormat::I16 => alsa::pcm::Format::S16LE,
1033 // SampleFormat::I24 => alsa::pcm::Format::S24LE,
1034 SampleFormat::I32 => alsa::pcm::Format::S32LE,
1035 // SampleFormat::I48 => alsa::pcm::Format::S48LE,
1036 // SampleFormat::I64 => alsa::pcm::Format::S64LE,
1037 SampleFormat::U8 => alsa::pcm::Format::U8,
1038 SampleFormat::U16 => alsa::pcm::Format::U16LE,
1039 // SampleFormat::U24 => alsa::pcm::Format::U24LE,
1040 SampleFormat::U32 => alsa::pcm::Format::U32LE,
1041 // SampleFormat::U48 => alsa::pcm::Format::U48LE,
1042 // SampleFormat::U64 => alsa::pcm::Format::U64LE,
1043 SampleFormat::F32 => alsa::pcm::Format::FloatLE,
1044 SampleFormat::F64 => alsa::pcm::Format::Float64LE,
1045 sample_format => {
1046 return Err(BackendSpecificError {
1047 description: format!(
1048 "Sample format '{}' is not supported by this backend",
1049 sample_format
1050 ),
1051 })
1052 }
1053 }
1054 };
1055
1056 hw_params.set_format(sample_format)?;
1057 hw_params.set_rate(config.sample_rate.0, alsa::ValueOr::Nearest)?;
1058 hw_params.set_channels(config.channels as u32)?;
1059
1060 match config.buffer_size {
1061 BufferSize::Fixed(v) => {
1062 hw_params.set_period_size_near((v / 4) as alsa::pcm::Frames, alsa::ValueOr::Nearest)?;
1063 hw_params.set_buffer_size(v as alsa::pcm::Frames)?;
1064 }
1065 BufferSize::Default => {
1066 // These values together represent a moderate latency and wakeup interval.
1067 // Without them, we are at the mercy of the device
1068 hw_params.set_period_time_near(25_000, alsa::ValueOr::Nearest)?;
1069 hw_params.set_buffer_time_near(100_000, alsa::ValueOr::Nearest)?;
1070 }
1071 }
1072
1073 pcm_handle.hw_params(&hw_params)?;
1074
1075 Ok(hw_params.can_pause())
1076}
1077
1078fn set_sw_params_from_format(
1079 pcm_handle: &alsa::pcm::PCM,
1080 config: &StreamConfig,
1081 stream_type: alsa::Direction,
1082) -> Result<usize, BackendSpecificError> {
1083 let sw_params = pcm_handle.sw_params_current()?;
1084
1085 let period_len = {
1086 let (buffer, period) = pcm_handle.get_params()?;
1087 if buffer == 0 {
1088 return Err(BackendSpecificError {
1089 description: "initialization resulted in a null buffer".to_string(),
1090 });
1091 }
1092 sw_params.set_avail_min(period as alsa::pcm::Frames)?;
1093
1094 let start_threshold = match stream_type {
1095 alsa::Direction::Playback => buffer - period,
1096
1097 // For capture streams, the start threshold is irrelevant and ignored,
1098 // because build_stream_inner() starts the stream before process_input()
1099 // reads from it. Set it anyway I guess, since it's better than leaving
1100 // it at an unspecified default value.
1101 alsa::Direction::Capture => 1,
1102 };
1103 sw_params.set_start_threshold(start_threshold.try_into().unwrap())?;
1104
1105 period as usize * config.channels as usize
1106 };
1107
1108 sw_params.set_tstamp_mode(true)?;
1109 sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?;
1110
1111 // tstamp_type param cannot be changed after the device is opened.
1112 // The default tstamp_type value on most Linux systems is "monotonic",
1113 // let's try to use it if setting the tstamp_type fails.
1114 if pcm_handle.sw_params(&sw_params).is_err() {
1115 sw_params.set_tstamp_type(alsa::pcm::TstampType::Monotonic)?;
1116 pcm_handle.sw_params(&sw_params)?;
1117 }
1118
1119 Ok(period_len)
1120}
1121
1122impl From<alsa::Error> for BackendSpecificError {
1123 fn from(err: alsa::Error) -> Self {
1124 BackendSpecificError {
1125 description: err.to_string(),
1126 }
1127 }
1128}
1129
1130impl From<alsa::Error> for BuildStreamError {
1131 fn from(err: alsa::Error) -> Self {
1132 let err: BackendSpecificError = err.into();
1133 err.into()
1134 }
1135}
1136
1137impl From<alsa::Error> for SupportedStreamConfigsError {
1138 fn from(err: alsa::Error) -> Self {
1139 let err: BackendSpecificError = err.into();
1140 err.into()
1141 }
1142}
1143
1144impl From<alsa::Error> for PlayStreamError {
1145 fn from(err: alsa::Error) -> Self {
1146 let err: BackendSpecificError = err.into();
1147 err.into()
1148 }
1149}
1150
1151impl From<alsa::Error> for PauseStreamError {
1152 fn from(err: alsa::Error) -> Self {
1153 let err: BackendSpecificError = err.into();
1154 err.into()
1155 }
1156}
1157
1158impl From<alsa::Error> for StreamError {
1159 fn from(err: alsa::Error) -> Self {
1160 let err: BackendSpecificError = err.into();
1161 err.into()
1162 }
1163}
1164