1 | extern crate alsa; |
2 | extern crate libc; |
3 | |
4 | use self::alsa::poll::Descriptors; |
5 | use crate::traits::{DeviceTrait, HostTrait, StreamTrait}; |
6 | use crate::{ |
7 | BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data, |
8 | DefaultStreamConfigError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo, |
9 | PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError, |
10 | SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, |
11 | SupportedStreamConfigsError, |
12 | }; |
13 | use std::cmp; |
14 | use std::convert::TryInto; |
15 | use std::sync::{Arc, Mutex}; |
16 | use std::thread::{self, JoinHandle}; |
17 | use std::time::Duration; |
18 | use std::vec::IntoIter as VecIntoIter; |
19 | |
20 | pub use self::enumerate::{default_input_device, default_output_device, Devices}; |
21 | |
/// Iterator over the configuration ranges returned by `Device::supported_input_configs`
/// (queried with `alsa::Direction::Capture`).
pub type SupportedInputConfigs = VecIntoIter<SupportedStreamConfigRange>;
/// Iterator over the configuration ranges returned by `Device::supported_output_configs`
/// (queried with `alsa::Direction::Playback`).
pub type SupportedOutputConfigs = VecIntoIter<SupportedStreamConfigRange>;
24 | |
25 | mod enumerate; |
26 | |
27 | /// The default linux, dragonfly, freebsd and netbsd host type. |
28 | #[derive (Debug)] |
29 | pub struct Host; |
30 | |
31 | impl Host { |
32 | pub fn new() -> Result<Self, crate::HostUnavailable> { |
33 | Ok(Host) |
34 | } |
35 | } |
36 | |
37 | impl HostTrait for Host { |
38 | type Devices = Devices; |
39 | type Device = Device; |
40 | |
41 | fn is_available() -> bool { |
42 | // Assume ALSA is always available on linux/dragonfly/freebsd/netbsd. |
43 | true |
44 | } |
45 | |
46 | fn devices(&self) -> Result<Self::Devices, DevicesError> { |
47 | Devices::new() |
48 | } |
49 | |
50 | fn default_input_device(&self) -> Option<Self::Device> { |
51 | default_input_device() |
52 | } |
53 | |
54 | fn default_output_device(&self) -> Option<Self::Device> { |
55 | default_output_device() |
56 | } |
57 | } |
58 | |
59 | impl DeviceTrait for Device { |
60 | type SupportedInputConfigs = SupportedInputConfigs; |
61 | type SupportedOutputConfigs = SupportedOutputConfigs; |
62 | type Stream = Stream; |
63 | |
64 | fn name(&self) -> Result<String, DeviceNameError> { |
65 | Device::name(self) |
66 | } |
67 | |
68 | fn supported_input_configs( |
69 | &self, |
70 | ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError> { |
71 | Device::supported_input_configs(self) |
72 | } |
73 | |
74 | fn supported_output_configs( |
75 | &self, |
76 | ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError> { |
77 | Device::supported_output_configs(self) |
78 | } |
79 | |
80 | fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> { |
81 | Device::default_input_config(self) |
82 | } |
83 | |
84 | fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> { |
85 | Device::default_output_config(self) |
86 | } |
87 | |
88 | fn build_input_stream_raw<D, E>( |
89 | &self, |
90 | conf: &StreamConfig, |
91 | sample_format: SampleFormat, |
92 | data_callback: D, |
93 | error_callback: E, |
94 | timeout: Option<Duration>, |
95 | ) -> Result<Self::Stream, BuildStreamError> |
96 | where |
97 | D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, |
98 | E: FnMut(StreamError) + Send + 'static, |
99 | { |
100 | let stream_inner = |
101 | self.build_stream_inner(conf, sample_format, alsa::Direction::Capture)?; |
102 | let stream = Stream::new_input( |
103 | Arc::new(stream_inner), |
104 | data_callback, |
105 | error_callback, |
106 | timeout, |
107 | ); |
108 | Ok(stream) |
109 | } |
110 | |
111 | fn build_output_stream_raw<D, E>( |
112 | &self, |
113 | conf: &StreamConfig, |
114 | sample_format: SampleFormat, |
115 | data_callback: D, |
116 | error_callback: E, |
117 | timeout: Option<Duration>, |
118 | ) -> Result<Self::Stream, BuildStreamError> |
119 | where |
120 | D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, |
121 | E: FnMut(StreamError) + Send + 'static, |
122 | { |
123 | let stream_inner = |
124 | self.build_stream_inner(conf, sample_format, alsa::Direction::Playback)?; |
125 | let stream = Stream::new_output( |
126 | Arc::new(stream_inner), |
127 | data_callback, |
128 | error_callback, |
129 | timeout, |
130 | ); |
131 | Ok(stream) |
132 | } |
133 | } |
134 | |
/// Write end of the wake-up pipe created by `trigger()`; wraps the raw file descriptor,
/// which is closed on drop.
struct TriggerSender(libc::c_int);

/// Read end of the wake-up pipe created by `trigger()`; wraps the raw file descriptor,
/// which is closed on drop.
struct TriggerReceiver(libc::c_int);
138 | |
139 | impl TriggerSender { |
140 | fn wakeup(&self) { |
141 | let buf: u64 = 1u64; |
142 | let ret: isize = unsafe { libc::write(self.0, &buf as *const u64 as *const _, count:8) }; |
143 | assert_eq!(ret, 8); |
144 | } |
145 | } |
146 | |
147 | impl TriggerReceiver { |
148 | fn clear_pipe(&self) { |
149 | let mut out: u64 = 0u64; |
150 | let ret: isize = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, count:8) }; |
151 | assert_eq!(ret, 8); |
152 | } |
153 | } |
154 | |
155 | fn trigger() -> (TriggerSender, TriggerReceiver) { |
156 | let mut fds: [i32; 2] = [0, 0]; |
157 | match unsafe { libc::pipe(fds:fds.as_mut_ptr()) } { |
158 | 0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])), |
159 | _ => panic!("Could not create pipe" ), |
160 | } |
161 | } |
162 | |
163 | impl Drop for TriggerSender { |
164 | fn drop(&mut self) { |
165 | unsafe { |
166 | libc::close(self.0); |
167 | } |
168 | } |
169 | } |
170 | |
171 | impl Drop for TriggerReceiver { |
172 | fn drop(&mut self) { |
173 | unsafe { |
174 | libc::close(self.0); |
175 | } |
176 | } |
177 | } |
178 | |
179 | #[derive (Default)] |
180 | struct DeviceHandles { |
181 | playback: Option<alsa::PCM>, |
182 | capture: Option<alsa::PCM>, |
183 | } |
184 | |
185 | impl DeviceHandles { |
186 | /// Create `DeviceHandles` for `name` and try to open a handle for both |
187 | /// directions. Returns `Ok` if either direction is opened successfully. |
188 | fn open(name: &str) -> Result<Self, alsa::Error> { |
189 | let mut handles = Self::default(); |
190 | let playback_err = handles.try_open(name, alsa::Direction::Playback).err(); |
191 | let capture_err = handles.try_open(name, alsa::Direction::Capture).err(); |
192 | if let Some(err) = capture_err.and(playback_err) { |
193 | Err(err) |
194 | } else { |
195 | Ok(handles) |
196 | } |
197 | } |
198 | |
199 | /// Get a mutable reference to the `Option` for a specific `stream_type`. |
200 | /// If the `Option` is `None`, the `alsa::PCM` will be opened and placed in |
201 | /// the `Option` before returning. If `handle_mut()` returns `Ok` the contained |
202 | /// `Option` is guaranteed to be `Some(..)`. |
203 | fn try_open( |
204 | &mut self, |
205 | name: &str, |
206 | stream_type: alsa::Direction, |
207 | ) -> Result<&mut Option<alsa::PCM>, alsa::Error> { |
208 | let handle = match stream_type { |
209 | alsa::Direction::Playback => &mut self.playback, |
210 | alsa::Direction::Capture => &mut self.capture, |
211 | }; |
212 | |
213 | if handle.is_none() { |
214 | *handle = Some(alsa::pcm::PCM::new(name, stream_type, true)?); |
215 | } |
216 | |
217 | Ok(handle) |
218 | } |
219 | |
220 | /// Get a mutable reference to the `alsa::PCM` handle for a specific `stream_type`. |
221 | /// If the handle is not yet opened, it will be opened and stored in `self`. |
222 | fn get_mut( |
223 | &mut self, |
224 | name: &str, |
225 | stream_type: alsa::Direction, |
226 | ) -> Result<&mut alsa::PCM, alsa::Error> { |
227 | Ok(self.try_open(name, stream_type)?.as_mut().unwrap()) |
228 | } |
229 | |
230 | /// Take ownership of the `alsa::PCM` handle for a specific `stream_type`. |
231 | /// If the handle is not yet opened, it will be opened and returned. |
232 | fn take(&mut self, name: &str, stream_type: alsa::Direction) -> Result<alsa::PCM, alsa::Error> { |
233 | Ok(self.try_open(name, stream_type)?.take().unwrap()) |
234 | } |
235 | } |
236 | |
/// An ALSA device identified by name. Because `handles` sits behind an `Arc`, clones of a
/// `Device` share the same handle cache.
#[derive(Clone)]
pub struct Device {
    // Name handed to ALSA whenever a PCM handle is opened for this device.
    name: String,
    // PCM handles opened on demand, guarded by a mutex.
    handles: Arc<Mutex<DeviceHandles>>,
}
242 | |
243 | impl Device { |
244 | fn build_stream_inner( |
245 | &self, |
246 | conf: &StreamConfig, |
247 | sample_format: SampleFormat, |
248 | stream_type: alsa::Direction, |
249 | ) -> Result<StreamInner, BuildStreamError> { |
250 | let handle_result = self |
251 | .handles |
252 | .lock() |
253 | .unwrap() |
254 | .take(&self.name, stream_type) |
255 | .map_err(|e| (e, e.errno())); |
256 | |
257 | let handle = match handle_result { |
258 | Err((_, libc::EBUSY)) => return Err(BuildStreamError::DeviceNotAvailable), |
259 | Err((_, libc::EINVAL)) => return Err(BuildStreamError::InvalidArgument), |
260 | Err((e, _)) => return Err(e.into()), |
261 | Ok(handle) => handle, |
262 | }; |
263 | let can_pause = set_hw_params_from_format(&handle, conf, sample_format)?; |
264 | let period_len = set_sw_params_from_format(&handle, conf, stream_type)?; |
265 | |
266 | handle.prepare()?; |
267 | |
268 | let num_descriptors = handle.count(); |
269 | if num_descriptors == 0 { |
270 | let description = "poll descriptor count for stream was 0" .to_string(); |
271 | let err = BackendSpecificError { description }; |
272 | return Err(err.into()); |
273 | } |
274 | |
275 | // Check to see if we can retrieve valid timestamps from the device. |
276 | // Related: https://bugs.freedesktop.org/show_bug.cgi?id=88503 |
277 | let ts = handle.status()?.get_htstamp(); |
278 | let creation_instant = match (ts.tv_sec, ts.tv_nsec) { |
279 | (0, 0) => Some(std::time::Instant::now()), |
280 | _ => None, |
281 | }; |
282 | |
283 | if let alsa::Direction::Capture = stream_type { |
284 | handle.start()?; |
285 | } |
286 | |
287 | let stream_inner = StreamInner { |
288 | channel: handle, |
289 | sample_format, |
290 | num_descriptors, |
291 | conf: conf.clone(), |
292 | period_len, |
293 | can_pause, |
294 | creation_instant, |
295 | }; |
296 | |
297 | Ok(stream_inner) |
298 | } |
299 | |
300 | #[inline ] |
301 | fn name(&self) -> Result<String, DeviceNameError> { |
302 | Ok(self.name.clone()) |
303 | } |
304 | |
305 | fn supported_configs( |
306 | &self, |
307 | stream_t: alsa::Direction, |
308 | ) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError> { |
309 | let mut guard = self.handles.lock().unwrap(); |
310 | let handle_result = guard |
311 | .get_mut(&self.name, stream_t) |
312 | .map_err(|e| (e, e.errno())); |
313 | |
314 | let handle = match handle_result { |
315 | Err((_, libc::ENOENT)) | Err((_, libc::EBUSY)) => { |
316 | return Err(SupportedStreamConfigsError::DeviceNotAvailable) |
317 | } |
318 | Err((_, libc::EINVAL)) => return Err(SupportedStreamConfigsError::InvalidArgument), |
319 | Err((e, _)) => return Err(e.into()), |
320 | Ok(handle) => handle, |
321 | }; |
322 | |
323 | let hw_params = alsa::pcm::HwParams::any(handle)?; |
324 | |
325 | // TODO: check endianness |
326 | const FORMATS: [(SampleFormat, alsa::pcm::Format); 8] = [ |
327 | (SampleFormat::I8, alsa::pcm::Format::S8), |
328 | (SampleFormat::U8, alsa::pcm::Format::U8), |
329 | (SampleFormat::I16, alsa::pcm::Format::S16LE), |
330 | //SND_PCM_FORMAT_S16_BE, |
331 | (SampleFormat::U16, alsa::pcm::Format::U16LE), |
332 | //SND_PCM_FORMAT_U16_BE, |
333 | //SND_PCM_FORMAT_S24_LE, |
334 | //SND_PCM_FORMAT_S24_BE, |
335 | //SND_PCM_FORMAT_U24_LE, |
336 | //SND_PCM_FORMAT_U24_BE, |
337 | (SampleFormat::I32, alsa::pcm::Format::S32LE), |
338 | //SND_PCM_FORMAT_S32_BE, |
339 | (SampleFormat::U32, alsa::pcm::Format::U32LE), |
340 | //SND_PCM_FORMAT_U32_BE, |
341 | (SampleFormat::F32, alsa::pcm::Format::FloatLE), |
342 | //SND_PCM_FORMAT_FLOAT_BE, |
343 | (SampleFormat::F64, alsa::pcm::Format::Float64LE), |
344 | //SND_PCM_FORMAT_FLOAT64_BE, |
345 | //SND_PCM_FORMAT_IEC958_SUBFRAME_LE, |
346 | //SND_PCM_FORMAT_IEC958_SUBFRAME_BE, |
347 | //SND_PCM_FORMAT_MU_LAW, |
348 | //SND_PCM_FORMAT_A_LAW, |
349 | //SND_PCM_FORMAT_IMA_ADPCM, |
350 | //SND_PCM_FORMAT_MPEG, |
351 | //SND_PCM_FORMAT_GSM, |
352 | //SND_PCM_FORMAT_SPECIAL, |
353 | //SND_PCM_FORMAT_S24_3LE, |
354 | //SND_PCM_FORMAT_S24_3BE, |
355 | //SND_PCM_FORMAT_U24_3LE, |
356 | //SND_PCM_FORMAT_U24_3BE, |
357 | //SND_PCM_FORMAT_S20_3LE, |
358 | //SND_PCM_FORMAT_S20_3BE, |
359 | //SND_PCM_FORMAT_U20_3LE, |
360 | //SND_PCM_FORMAT_U20_3BE, |
361 | //SND_PCM_FORMAT_S18_3LE, |
362 | //SND_PCM_FORMAT_S18_3BE, |
363 | //SND_PCM_FORMAT_U18_3LE, |
364 | //SND_PCM_FORMAT_U18_3BE, |
365 | ]; |
366 | |
367 | let mut supported_formats = Vec::new(); |
368 | for &(sample_format, alsa_format) in FORMATS.iter() { |
369 | if hw_params.test_format(alsa_format).is_ok() { |
370 | supported_formats.push(sample_format); |
371 | } |
372 | } |
373 | |
374 | let min_rate = hw_params.get_rate_min()?; |
375 | let max_rate = hw_params.get_rate_max()?; |
376 | |
377 | let sample_rates = if min_rate == max_rate || hw_params.test_rate(min_rate + 1).is_ok() { |
378 | vec![(min_rate, max_rate)] |
379 | } else { |
380 | const RATES: [libc::c_uint; 13] = [ |
381 | 5512, 8000, 11025, 16000, 22050, 32000, 44100, 48000, 64000, 88200, 96000, 176400, |
382 | 192000, |
383 | ]; |
384 | |
385 | let mut rates = Vec::new(); |
386 | for &rate in RATES.iter() { |
387 | if hw_params.test_rate(rate).is_ok() { |
388 | rates.push((rate, rate)); |
389 | } |
390 | } |
391 | |
392 | if rates.is_empty() { |
393 | vec![(min_rate, max_rate)] |
394 | } else { |
395 | rates |
396 | } |
397 | }; |
398 | |
399 | let min_channels = hw_params.get_channels_min()?; |
400 | let max_channels = hw_params.get_channels_max()?; |
401 | |
402 | let max_channels = cmp::min(max_channels, 32); // TODO: limiting to 32 channels or too much stuff is returned |
403 | let supported_channels = (min_channels..max_channels + 1) |
404 | .filter_map(|num| { |
405 | if hw_params.test_channels(num).is_ok() { |
406 | Some(num as ChannelCount) |
407 | } else { |
408 | None |
409 | } |
410 | }) |
411 | .collect::<Vec<_>>(); |
412 | |
413 | let min_buffer_size = hw_params.get_buffer_size_min()?; |
414 | let max_buffer_size = hw_params.get_buffer_size_max()?; |
415 | |
416 | let buffer_size_range = SupportedBufferSize::Range { |
417 | min: min_buffer_size as u32, |
418 | max: max_buffer_size as u32, |
419 | }; |
420 | |
421 | let mut output = Vec::with_capacity( |
422 | supported_formats.len() * supported_channels.len() * sample_rates.len(), |
423 | ); |
424 | for &sample_format in supported_formats.iter() { |
425 | for &channels in supported_channels.iter() { |
426 | for &(min_rate, max_rate) in sample_rates.iter() { |
427 | output.push(SupportedStreamConfigRange { |
428 | channels, |
429 | min_sample_rate: SampleRate(min_rate as u32), |
430 | max_sample_rate: SampleRate(max_rate as u32), |
431 | buffer_size: buffer_size_range.clone(), |
432 | sample_format, |
433 | }); |
434 | } |
435 | } |
436 | } |
437 | |
438 | Ok(output.into_iter()) |
439 | } |
440 | |
441 | fn supported_input_configs( |
442 | &self, |
443 | ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> { |
444 | self.supported_configs(alsa::Direction::Capture) |
445 | } |
446 | |
447 | fn supported_output_configs( |
448 | &self, |
449 | ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> { |
450 | self.supported_configs(alsa::Direction::Playback) |
451 | } |
452 | |
453 | // ALSA does not offer default stream formats, so instead we compare all supported formats by |
454 | // the `SupportedStreamConfigRange::cmp_default_heuristics` order and select the greatest. |
455 | fn default_config( |
456 | &self, |
457 | stream_t: alsa::Direction, |
458 | ) -> Result<SupportedStreamConfig, DefaultStreamConfigError> { |
459 | let mut formats: Vec<_> = { |
460 | match self.supported_configs(stream_t) { |
461 | Err(SupportedStreamConfigsError::DeviceNotAvailable) => { |
462 | return Err(DefaultStreamConfigError::DeviceNotAvailable); |
463 | } |
464 | Err(SupportedStreamConfigsError::InvalidArgument) => { |
465 | // this happens sometimes when querying for input and output capabilities, but |
466 | // the device supports only one |
467 | return Err(DefaultStreamConfigError::StreamTypeNotSupported); |
468 | } |
469 | Err(SupportedStreamConfigsError::BackendSpecific { err }) => { |
470 | return Err(err.into()); |
471 | } |
472 | Ok(fmts) => fmts.collect(), |
473 | } |
474 | }; |
475 | |
476 | formats.sort_by(|a, b| a.cmp_default_heuristics(b)); |
477 | |
478 | match formats.into_iter().last() { |
479 | Some(f) => { |
480 | let min_r = f.min_sample_rate; |
481 | let max_r = f.max_sample_rate; |
482 | let mut format = f.with_max_sample_rate(); |
483 | const HZ_44100: SampleRate = SampleRate(44_100); |
484 | if min_r <= HZ_44100 && HZ_44100 <= max_r { |
485 | format.sample_rate = HZ_44100; |
486 | } |
487 | Ok(format) |
488 | } |
489 | None => Err(DefaultStreamConfigError::StreamTypeNotSupported), |
490 | } |
491 | } |
492 | |
493 | fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> { |
494 | self.default_config(alsa::Direction::Capture) |
495 | } |
496 | |
497 | fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> { |
498 | self.default_config(alsa::Direction::Playback) |
499 | } |
500 | } |
501 | |
// State of one configured ALSA stream. It is built by `Device::build_stream_inner` and shared
// (through an `Arc`) between the `Stream` handle and its worker thread.
struct StreamInner {
    // The ALSA channel.
    channel: alsa::pcm::PCM,

    // When converting between file descriptors and `snd_pcm_t`, this is the number of
    // file descriptors that this `snd_pcm_t` uses. It is the poll descriptor count read
    // from the PCM when the stream is built, and is never zero.
    num_descriptors: usize,

    // Format of the samples.
    sample_format: SampleFormat,

    // The configuration used to open this stream.
    conf: StreamConfig,

    // Minimum number of samples to put in the buffer. The worker polls again while fewer
    // samples than this are available.
    period_len: usize,

    #[allow(dead_code)]
    // Whether or not the hardware supports pausing the stream.
    // TODO: We need an API to expose this. See #197, #284.
    can_pause: bool,

    // In the case that the device does not return valid timestamps via `get_htstamp`, this field
    // will be `Some` and will contain an `Instant` representing the moment the stream was created.
    //
    // If this field is `Some`, then the stream will use the duration since this instant as a
    // source for timestamps.
    //
    // If this field is `None` then the elapsed duration between `get_trigger_htstamp` and
    // `get_htstamp` is used.
    creation_instant: Option<std::time::Instant>,
}

// Assume that the ALSA library is built with thread safe option.
// NOTE(review): this `Sync` impl is only sound under that assumption; nothing in this file
// verifies it.
unsafe impl Sync for StreamInner {}
537 | |
/// Direction of a ready stream, as derived from the poll result flags: `IN` maps to `Input`
/// and `OUT` maps to `Output`.
#[derive(Debug, Eq, PartialEq)]
enum StreamType {
    Input,
    Output,
}
543 | |
/// A running ALSA stream. Dropping it signals the worker thread through `trigger` and then
/// joins that thread.
pub struct Stream {
    /// The high-priority audio processing thread calling callbacks.
    /// Option used for moving out in destructor.
    thread: Option<JoinHandle<()>>,

    /// Handle to the underlying stream for playback controls.
    inner: Arc<StreamInner>,

    /// Used to signal to stop processing.
    trigger: TriggerSender,
}
555 | |
556 | struct StreamWorkerContext { |
557 | descriptors: Vec<libc::pollfd>, |
558 | buffer: Vec<u8>, |
559 | poll_timeout: i32, |
560 | } |
561 | |
562 | impl StreamWorkerContext { |
563 | fn new(poll_timeout: &Option<Duration>) -> Self { |
564 | let poll_timeout: i32 = if let Some(d: &Duration) = poll_timeout { |
565 | d.as_millis().try_into().unwrap() |
566 | } else { |
567 | -1 |
568 | }; |
569 | |
570 | Self { |
571 | descriptors: Vec::new(), |
572 | buffer: Vec::new(), |
573 | poll_timeout, |
574 | } |
575 | } |
576 | } |
577 | |
578 | fn input_stream_worker( |
579 | rx: TriggerReceiver, |
580 | stream: &StreamInner, |
581 | data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), |
582 | error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), |
583 | timeout: Option<Duration>, |
584 | ) { |
585 | let mut ctxt = StreamWorkerContext::new(&timeout); |
586 | loop { |
587 | let flow = |
588 | poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| { |
589 | error_callback(err.into()); |
590 | PollDescriptorsFlow::Continue |
591 | }); |
592 | |
593 | match flow { |
594 | PollDescriptorsFlow::Continue => { |
595 | continue; |
596 | } |
597 | PollDescriptorsFlow::XRun => { |
598 | if let Err(err) = stream.channel.prepare() { |
599 | error_callback(err.into()); |
600 | } |
601 | continue; |
602 | } |
603 | PollDescriptorsFlow::Return => return, |
604 | PollDescriptorsFlow::Ready { |
605 | status, |
606 | avail_frames: _, |
607 | delay_frames, |
608 | stream_type, |
609 | } => { |
610 | assert_eq!( |
611 | stream_type, |
612 | StreamType::Input, |
613 | "expected input stream, but polling descriptors indicated output" , |
614 | ); |
615 | if let Err(err) = process_input( |
616 | stream, |
617 | &mut ctxt.buffer, |
618 | status, |
619 | delay_frames, |
620 | data_callback, |
621 | ) { |
622 | error_callback(err.into()); |
623 | } |
624 | } |
625 | } |
626 | } |
627 | } |
628 | |
629 | fn output_stream_worker( |
630 | rx: TriggerReceiver, |
631 | stream: &StreamInner, |
632 | data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), |
633 | error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), |
634 | timeout: Option<Duration>, |
635 | ) { |
636 | let mut ctxt = StreamWorkerContext::new(&timeout); |
637 | loop { |
638 | let flow = |
639 | poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| { |
640 | error_callback(err.into()); |
641 | PollDescriptorsFlow::Continue |
642 | }); |
643 | |
644 | match flow { |
645 | PollDescriptorsFlow::Continue => continue, |
646 | PollDescriptorsFlow::XRun => { |
647 | if let Err(err) = stream.channel.prepare() { |
648 | error_callback(err.into()); |
649 | } |
650 | continue; |
651 | } |
652 | PollDescriptorsFlow::Return => return, |
653 | PollDescriptorsFlow::Ready { |
654 | status, |
655 | avail_frames, |
656 | delay_frames, |
657 | stream_type, |
658 | } => { |
659 | assert_eq!( |
660 | stream_type, |
661 | StreamType::Output, |
662 | "expected output stream, but polling descriptors indicated input" , |
663 | ); |
664 | if let Err(err) = process_output( |
665 | stream, |
666 | &mut ctxt.buffer, |
667 | status, |
668 | avail_frames, |
669 | delay_frames, |
670 | data_callback, |
671 | error_callback, |
672 | ) { |
673 | error_callback(err.into()); |
674 | } |
675 | } |
676 | } |
677 | } |
678 | } |
679 | |
/// Outcome of one pass of `poll_descriptors_and_prepare_buffer`, telling the worker loop
/// what to do next.
enum PollDescriptorsFlow {
    /// Nothing to process yet; the worker polls again.
    Continue,
    /// The trigger pipe fired; the worker returns.
    Return,
    /// At least `period_len` samples are available and the buffer has been sized for them.
    Ready {
        stream_type: StreamType,
        status: alsa::pcm::Status,
        avail_frames: usize,
        delay_frames: usize,
    },
    /// Querying the available frames failed with `EPIPE`; the workers respond by calling
    /// `prepare()` on the PCM.
    XRun,
}
691 | |
692 | // This block is shared between both input and output stream worker functions. |
693 | fn poll_descriptors_and_prepare_buffer( |
694 | rx: &TriggerReceiver, |
695 | stream: &StreamInner, |
696 | ctxt: &mut StreamWorkerContext, |
697 | ) -> Result<PollDescriptorsFlow, BackendSpecificError> { |
698 | let StreamWorkerContext { |
699 | ref mut descriptors, |
700 | ref mut buffer, |
701 | ref poll_timeout, |
702 | } = *ctxt; |
703 | |
704 | descriptors.clear(); |
705 | |
706 | // Add the self-pipe for signaling termination. |
707 | descriptors.push(libc::pollfd { |
708 | fd: rx.0, |
709 | events: libc::POLLIN, |
710 | revents: 0, |
711 | }); |
712 | |
713 | // Add ALSA polling fds. |
714 | let len = descriptors.len(); |
715 | descriptors.resize( |
716 | stream.num_descriptors + len, |
717 | libc::pollfd { |
718 | fd: 0, |
719 | events: 0, |
720 | revents: 0, |
721 | }, |
722 | ); |
723 | let filled = stream.channel.fill(&mut descriptors[len..])?; |
724 | debug_assert_eq!(filled, stream.num_descriptors); |
725 | |
726 | // Don't timeout, wait forever. |
727 | let res = alsa::poll::poll(descriptors, *poll_timeout)?; |
728 | if res == 0 { |
729 | let description = String::from("`alsa::poll()` spuriously returned" ); |
730 | return Err(BackendSpecificError { description }); |
731 | } |
732 | |
733 | if descriptors[0].revents != 0 { |
734 | // The stream has been requested to be destroyed. |
735 | rx.clear_pipe(); |
736 | return Ok(PollDescriptorsFlow::Return); |
737 | } |
738 | |
739 | let revents = stream.channel.revents(&descriptors[1..])?; |
740 | if revents.contains(alsa::poll::Flags::ERR) { |
741 | let description = String::from("`alsa::poll()` returned POLLERR" ); |
742 | return Err(BackendSpecificError { description }); |
743 | } |
744 | let stream_type = match revents { |
745 | alsa::poll::Flags::OUT => StreamType::Output, |
746 | alsa::poll::Flags::IN => StreamType::Input, |
747 | _ => { |
748 | // Nothing to process, poll again |
749 | return Ok(PollDescriptorsFlow::Continue); |
750 | } |
751 | }; |
752 | |
753 | let status = stream.channel.status()?; |
754 | let avail_frames = match stream.channel.avail() { |
755 | Err(err) if err.errno() == libc::EPIPE => return Ok(PollDescriptorsFlow::XRun), |
756 | res => res, |
757 | }? as usize; |
758 | let delay_frames = match status.get_delay() { |
759 | // Buffer underrun. TODO: Notify the user. |
760 | d if d < 0 => 0, |
761 | d => d as usize, |
762 | }; |
763 | let available_samples = avail_frames * stream.conf.channels as usize; |
764 | |
765 | // Only go on if there is at least `stream.period_len` samples. |
766 | if available_samples < stream.period_len { |
767 | return Ok(PollDescriptorsFlow::Continue); |
768 | } |
769 | |
770 | // Prepare the data buffer. |
771 | let buffer_size = stream.sample_format.sample_size() * available_samples; |
772 | buffer.resize(buffer_size, 0u8); |
773 | |
774 | Ok(PollDescriptorsFlow::Ready { |
775 | stream_type, |
776 | status, |
777 | avail_frames, |
778 | delay_frames, |
779 | }) |
780 | } |
781 | |
782 | // Read input data from ALSA and deliver it to the user. |
783 | fn process_input( |
784 | stream: &StreamInner, |
785 | buffer: &mut [u8], |
786 | status: alsa::pcm::Status, |
787 | delay_frames: usize, |
788 | data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), |
789 | ) -> Result<(), BackendSpecificError> { |
790 | stream.channel.io_bytes().readi(buf:buffer)?; |
791 | let sample_format: SampleFormat = stream.sample_format; |
792 | let data: *mut () = buffer.as_mut_ptr() as *mut (); |
793 | let len: usize = buffer.len() / sample_format.sample_size(); |
794 | let data: Data = unsafe { Data::from_parts(data, len, sample_format) }; |
795 | let callback: StreamInstant = stream_timestamp(&status, stream.creation_instant)?; |
796 | let delay_duration: Duration = frames_to_duration(delay_frames, stream.conf.sample_rate); |
797 | let capture: StreamInstant = callback |
798 | .sub(delay_duration) |
799 | .expect(msg:"`capture` is earlier than representation supported by `StreamInstant`" ); |
800 | let timestamp: InputStreamTimestamp = crate::InputStreamTimestamp { callback, capture }; |
801 | let info: InputCallbackInfo = crate::InputCallbackInfo { timestamp }; |
802 | data_callback(&data, &info); |
803 | |
804 | Ok(()) |
805 | } |
806 | |
807 | // Request data from the user's function and write it via ALSA. |
808 | // |
809 | // Returns `true` |
810 | fn process_output( |
811 | stream: &StreamInner, |
812 | buffer: &mut [u8], |
813 | status: alsa::pcm::Status, |
814 | available_frames: usize, |
815 | delay_frames: usize, |
816 | data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), |
817 | error_callback: &mut dyn FnMut(StreamError), |
818 | ) -> Result<(), BackendSpecificError> { |
819 | { |
820 | // We're now sure that we're ready to write data. |
821 | let sample_format = stream.sample_format; |
822 | let data = buffer.as_mut_ptr() as *mut (); |
823 | let len = buffer.len() / sample_format.sample_size(); |
824 | let mut data = unsafe { Data::from_parts(data, len, sample_format) }; |
825 | let callback = stream_timestamp(&status, stream.creation_instant)?; |
826 | let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate); |
827 | let playback = callback |
828 | .add(delay_duration) |
829 | .expect("`playback` occurs beyond representation supported by `StreamInstant`" ); |
830 | let timestamp = crate::OutputStreamTimestamp { callback, playback }; |
831 | let info = crate::OutputCallbackInfo { timestamp }; |
832 | data_callback(&mut data, &info); |
833 | } |
834 | loop { |
835 | match stream.channel.io_bytes().writei(buffer) { |
836 | Err(err) if err.errno() == libc::EPIPE => { |
837 | // buffer underrun |
838 | // TODO: Notify the user of this. |
839 | let _ = stream.channel.try_recover(err, false); |
840 | } |
841 | Err(err) => { |
842 | error_callback(err.into()); |
843 | continue; |
844 | } |
845 | Ok(result) if result != available_frames => { |
846 | let description = format!( |
847 | "unexpected number of frames written: expected {}, \ |
848 | result {} (this should never happen)" , |
849 | available_frames, result, |
850 | ); |
851 | error_callback(BackendSpecificError { description }.into()); |
852 | continue; |
853 | } |
854 | _ => { |
855 | break; |
856 | } |
857 | } |
858 | } |
859 | Ok(()) |
860 | } |
861 | |
862 | // Use the elapsed duration since the start of the stream. |
863 | // |
864 | // This ensures positive values that are compatible with our `StreamInstant` representation. |
865 | fn stream_timestamp( |
866 | status: &alsa::pcm::Status, |
867 | creation_instant: Option<std::time::Instant>, |
868 | ) -> Result<crate::StreamInstant, BackendSpecificError> { |
869 | match creation_instant { |
870 | None => { |
871 | let trigger_ts: timespec = status.get_trigger_htstamp(); |
872 | let ts: timespec = status.get_htstamp(); |
873 | let nanos: i64 = timespec_diff_nanos(a:ts, b:trigger_ts); |
874 | if nanos < 0 { |
875 | panic!( |
876 | "get_htstamp ` {}. {}` was earlier than get_trigger_htstamp ` {}. {}`" , |
877 | ts.tv_sec, ts.tv_nsec, trigger_ts.tv_sec, trigger_ts.tv_nsec |
878 | ); |
879 | } |
880 | Ok(crate::StreamInstant::from_nanos(nanos)) |
881 | } |
882 | Some(creation: Instant) => { |
883 | let now: Instant = std::time::Instant::now(); |
884 | let duration: Duration = now.duration_since(earlier:creation); |
885 | let instant: StreamInstant = crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128) |
886 | .expect(msg:"stream duration has exceeded `StreamInstant` representation" ); |
887 | Ok(instant) |
888 | } |
889 | } |
890 | } |
891 | |
892 | // Adapted from `timestamp2ns` here: |
893 | // https://fossies.org/linux/alsa-lib/test/audio_time.c |
894 | fn timespec_to_nanos(ts: libc::timespec) -> i64 { |
895 | ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec as i64 |
896 | } |
897 | |
898 | // Adapted from `timediff` here: |
899 | // https://fossies.org/linux/alsa-lib/test/audio_time.c |
900 | fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 { |
901 | timespec_to_nanos(ts:a) - timespec_to_nanos(ts:b) |
902 | } |
903 | |
904 | // Convert the given duration in frames at the given sample rate to a `std::time::Duration`. |
905 | fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { |
906 | let secsf: f64 = frames as f64 / rate.0 as f64; |
907 | let secs: u64 = secsf as u64; |
908 | let nanos: u32 = ((secsf - secs as f64) * 1_000_000_000.0) as u32; |
909 | std::time::Duration::new(secs, nanos) |
910 | } |
911 | |
912 | impl Stream { |
913 | fn new_input<D, E>( |
914 | inner: Arc<StreamInner>, |
915 | mut data_callback: D, |
916 | mut error_callback: E, |
917 | timeout: Option<Duration>, |
918 | ) -> Stream |
919 | where |
920 | D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, |
921 | E: FnMut(StreamError) + Send + 'static, |
922 | { |
923 | let (tx, rx) = trigger(); |
924 | // Clone the handle for passing into worker thread. |
925 | let stream = inner.clone(); |
926 | let thread = thread::Builder::new() |
927 | .name("cpal_alsa_in" .to_owned()) |
928 | .spawn(move || { |
929 | input_stream_worker( |
930 | rx, |
931 | &stream, |
932 | &mut data_callback, |
933 | &mut error_callback, |
934 | timeout, |
935 | ); |
936 | }) |
937 | .unwrap(); |
938 | Stream { |
939 | thread: Some(thread), |
940 | inner, |
941 | trigger: tx, |
942 | } |
943 | } |
944 | |
945 | fn new_output<D, E>( |
946 | inner: Arc<StreamInner>, |
947 | mut data_callback: D, |
948 | mut error_callback: E, |
949 | timeout: Option<Duration>, |
950 | ) -> Stream |
951 | where |
952 | D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, |
953 | E: FnMut(StreamError) + Send + 'static, |
954 | { |
955 | let (tx, rx) = trigger(); |
956 | // Clone the handle for passing into worker thread. |
957 | let stream = inner.clone(); |
958 | let thread = thread::Builder::new() |
959 | .name("cpal_alsa_out" .to_owned()) |
960 | .spawn(move || { |
961 | output_stream_worker( |
962 | rx, |
963 | &stream, |
964 | &mut data_callback, |
965 | &mut error_callback, |
966 | timeout, |
967 | ); |
968 | }) |
969 | .unwrap(); |
970 | Stream { |
971 | thread: Some(thread), |
972 | inner, |
973 | trigger: tx, |
974 | } |
975 | } |
976 | } |
977 | |
978 | impl Drop for Stream { |
979 | fn drop(&mut self) { |
980 | self.trigger.wakeup(); |
981 | self.thread.take().unwrap().join().unwrap(); |
982 | } |
983 | } |
984 | |
985 | impl StreamTrait for Stream { |
986 | fn play(&self) -> Result<(), PlayStreamError> { |
987 | self.inner.channel.pause(false).ok(); |
988 | Ok(()) |
989 | } |
990 | fn pause(&self) -> Result<(), PauseStreamError> { |
991 | self.inner.channel.pause(true).ok(); |
992 | Ok(()) |
993 | } |
994 | } |
995 | |
996 | fn set_hw_params_from_format( |
997 | pcm_handle: &alsa::pcm::PCM, |
998 | config: &StreamConfig, |
999 | sample_format: SampleFormat, |
1000 | ) -> Result<bool, BackendSpecificError> { |
1001 | let hw_params = alsa::pcm::HwParams::any(pcm_handle)?; |
1002 | hw_params.set_access(alsa::pcm::Access::RWInterleaved)?; |
1003 | |
1004 | let sample_format = if cfg!(target_endian = "big" ) { |
1005 | match sample_format { |
1006 | SampleFormat::I8 => alsa::pcm::Format::S8, |
1007 | SampleFormat::I16 => alsa::pcm::Format::S16BE, |
1008 | // SampleFormat::I24 => alsa::pcm::Format::S24BE, |
1009 | SampleFormat::I32 => alsa::pcm::Format::S32BE, |
1010 | // SampleFormat::I48 => alsa::pcm::Format::S48BE, |
1011 | // SampleFormat::I64 => alsa::pcm::Format::S64BE, |
1012 | SampleFormat::U8 => alsa::pcm::Format::U8, |
1013 | SampleFormat::U16 => alsa::pcm::Format::U16BE, |
1014 | // SampleFormat::U24 => alsa::pcm::Format::U24BE, |
1015 | SampleFormat::U32 => alsa::pcm::Format::U32BE, |
1016 | // SampleFormat::U48 => alsa::pcm::Format::U48BE, |
1017 | // SampleFormat::U64 => alsa::pcm::Format::U64BE, |
1018 | SampleFormat::F32 => alsa::pcm::Format::FloatBE, |
1019 | SampleFormat::F64 => alsa::pcm::Format::Float64BE, |
1020 | sample_format => { |
1021 | return Err(BackendSpecificError { |
1022 | description: format!( |
1023 | "Sample format ' {}' is not supported by this backend" , |
1024 | sample_format |
1025 | ), |
1026 | }) |
1027 | } |
1028 | } |
1029 | } else { |
1030 | match sample_format { |
1031 | SampleFormat::I8 => alsa::pcm::Format::S8, |
1032 | SampleFormat::I16 => alsa::pcm::Format::S16LE, |
1033 | // SampleFormat::I24 => alsa::pcm::Format::S24LE, |
1034 | SampleFormat::I32 => alsa::pcm::Format::S32LE, |
1035 | // SampleFormat::I48 => alsa::pcm::Format::S48LE, |
1036 | // SampleFormat::I64 => alsa::pcm::Format::S64LE, |
1037 | SampleFormat::U8 => alsa::pcm::Format::U8, |
1038 | SampleFormat::U16 => alsa::pcm::Format::U16LE, |
1039 | // SampleFormat::U24 => alsa::pcm::Format::U24LE, |
1040 | SampleFormat::U32 => alsa::pcm::Format::U32LE, |
1041 | // SampleFormat::U48 => alsa::pcm::Format::U48LE, |
1042 | // SampleFormat::U64 => alsa::pcm::Format::U64LE, |
1043 | SampleFormat::F32 => alsa::pcm::Format::FloatLE, |
1044 | SampleFormat::F64 => alsa::pcm::Format::Float64LE, |
1045 | sample_format => { |
1046 | return Err(BackendSpecificError { |
1047 | description: format!( |
1048 | "Sample format ' {}' is not supported by this backend" , |
1049 | sample_format |
1050 | ), |
1051 | }) |
1052 | } |
1053 | } |
1054 | }; |
1055 | |
1056 | hw_params.set_format(sample_format)?; |
1057 | hw_params.set_rate(config.sample_rate.0, alsa::ValueOr::Nearest)?; |
1058 | hw_params.set_channels(config.channels as u32)?; |
1059 | |
1060 | match config.buffer_size { |
1061 | BufferSize::Fixed(v) => { |
1062 | hw_params.set_period_size_near((v / 4) as alsa::pcm::Frames, alsa::ValueOr::Nearest)?; |
1063 | hw_params.set_buffer_size(v as alsa::pcm::Frames)?; |
1064 | } |
1065 | BufferSize::Default => { |
1066 | // These values together represent a moderate latency and wakeup interval. |
1067 | // Without them, we are at the mercy of the device |
1068 | hw_params.set_period_time_near(25_000, alsa::ValueOr::Nearest)?; |
1069 | hw_params.set_buffer_time_near(100_000, alsa::ValueOr::Nearest)?; |
1070 | } |
1071 | } |
1072 | |
1073 | pcm_handle.hw_params(&hw_params)?; |
1074 | |
1075 | Ok(hw_params.can_pause()) |
1076 | } |
1077 | |
1078 | fn set_sw_params_from_format( |
1079 | pcm_handle: &alsa::pcm::PCM, |
1080 | config: &StreamConfig, |
1081 | stream_type: alsa::Direction, |
1082 | ) -> Result<usize, BackendSpecificError> { |
1083 | let sw_params = pcm_handle.sw_params_current()?; |
1084 | |
1085 | let period_len = { |
1086 | let (buffer, period) = pcm_handle.get_params()?; |
1087 | if buffer == 0 { |
1088 | return Err(BackendSpecificError { |
1089 | description: "initialization resulted in a null buffer" .to_string(), |
1090 | }); |
1091 | } |
1092 | sw_params.set_avail_min(period as alsa::pcm::Frames)?; |
1093 | |
1094 | let start_threshold = match stream_type { |
1095 | alsa::Direction::Playback => buffer - period, |
1096 | |
1097 | // For capture streams, the start threshold is irrelevant and ignored, |
1098 | // because build_stream_inner() starts the stream before process_input() |
1099 | // reads from it. Set it anyway I guess, since it's better than leaving |
1100 | // it at an unspecified default value. |
1101 | alsa::Direction::Capture => 1, |
1102 | }; |
1103 | sw_params.set_start_threshold(start_threshold.try_into().unwrap())?; |
1104 | |
1105 | period as usize * config.channels as usize |
1106 | }; |
1107 | |
1108 | sw_params.set_tstamp_mode(true)?; |
1109 | sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?; |
1110 | |
1111 | // tstamp_type param cannot be changed after the device is opened. |
1112 | // The default tstamp_type value on most Linux systems is "monotonic", |
1113 | // let's try to use it if setting the tstamp_type fails. |
1114 | if pcm_handle.sw_params(&sw_params).is_err() { |
1115 | sw_params.set_tstamp_type(alsa::pcm::TstampType::Monotonic)?; |
1116 | pcm_handle.sw_params(&sw_params)?; |
1117 | } |
1118 | |
1119 | Ok(period_len) |
1120 | } |
1121 | |
1122 | impl From<alsa::Error> for BackendSpecificError { |
1123 | fn from(err: alsa::Error) -> Self { |
1124 | BackendSpecificError { |
1125 | description: err.to_string(), |
1126 | } |
1127 | } |
1128 | } |
1129 | |
1130 | impl From<alsa::Error> for BuildStreamError { |
1131 | fn from(err: alsa::Error) -> Self { |
1132 | let err: BackendSpecificError = err.into(); |
1133 | err.into() |
1134 | } |
1135 | } |
1136 | |
1137 | impl From<alsa::Error> for SupportedStreamConfigsError { |
1138 | fn from(err: alsa::Error) -> Self { |
1139 | let err: BackendSpecificError = err.into(); |
1140 | err.into() |
1141 | } |
1142 | } |
1143 | |
1144 | impl From<alsa::Error> for PlayStreamError { |
1145 | fn from(err: alsa::Error) -> Self { |
1146 | let err: BackendSpecificError = err.into(); |
1147 | err.into() |
1148 | } |
1149 | } |
1150 | |
1151 | impl From<alsa::Error> for PauseStreamError { |
1152 | fn from(err: alsa::Error) -> Self { |
1153 | let err: BackendSpecificError = err.into(); |
1154 | err.into() |
1155 | } |
1156 | } |
1157 | |
1158 | impl From<alsa::Error> for StreamError { |
1159 | fn from(err: alsa::Error) -> Self { |
1160 | let err: BackendSpecificError = err.into(); |
1161 | err.into() |
1162 | } |
1163 | } |
1164 | |