1 // Copyright 2019 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! Provides an interface for playing and recording audio.
6 //!
7 //! When implementing an audio playback system, the `StreamSource` trait is implemented.
8 //! Implementors of this trait allow creation of `PlaybackBufferStream` objects. The
9 //! `PlaybackBufferStream` provides the actual audio buffers to be filled with audio samples. These
10 //! buffers can be filled with `write_playback_buffer`.
11 //!
//! Users playing audio fill the provided buffers with audio. Calling `commit` on a
//! `PlaybackBuffer` commits the samples written to it to the `PlaybackBufferStream` it came from;
//! `write_playback_buffer` does this automatically after its callback succeeds.
14 //!
15 //! ```
16 //! use audio_streams::{BoxError, PlaybackBuffer, SampleFormat, StreamSource, NoopStreamSource};
17 //! use std::io::Write;
18 //!
19 //! const buffer_size: usize = 120;
20 //! const num_channels: usize = 2;
21 //!
22 //! # fn main() -> std::result::Result<(), BoxError> {
23 //! let mut stream_source = NoopStreamSource::new();
24 //! let sample_format = SampleFormat::S16LE;
25 //! let frame_size = num_channels * sample_format.sample_bytes();
26 //!
27 //! let (_, mut stream) = stream_source
28 //! .new_playback_stream(num_channels, sample_format, 48000, buffer_size)?;
29 //! // Play 10 buffers of DC.
30 //! let mut buf = Vec::new();
31 //! buf.resize(buffer_size * frame_size, 0xa5u8);
32 //! for _ in 0..10 {
33 //! let mut copy_cb = |stream_buffer: &mut PlaybackBuffer| {
34 //! assert_eq!(stream_buffer.write(&buf)?, buffer_size * frame_size);
35 //! Ok(())
36 //! };
37 //! stream.write_playback_buffer(&mut copy_cb)?;
38 //! }
39 //! # Ok (())
40 //! # }
41 //! ```
42 pub mod async_api;
43
44 use std::cmp::min;
45 use std::error;
46 use std::fmt;
47 use std::fmt::Display;
48 use std::io;
49 use std::io::Read;
50 use std::io::Write;
51 #[cfg(unix)]
52 use std::os::unix::io::RawFd as RawDescriptor;
53 #[cfg(windows)]
54 use std::os::windows::io::RawHandle as RawDescriptor;
55 use std::result::Result;
56 use std::str::FromStr;
57 use std::time::Duration;
58 use std::time::Instant;
59
60 pub use async_api::AsyncStream;
61 pub use async_api::AudioStreamsExecutor;
62 use async_trait::async_trait;
63 use remain::sorted;
64 use serde::Deserialize;
65 use serde::Serialize;
66 use thiserror::Error;
67
/// Supported encodings for a single audio sample.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize)]
pub enum SampleFormat {
    /// Unsigned 8 bit.
    U8,
    /// Signed 16 bit little endian.
    S16LE,
    /// Signed 24 bit little endian. Each sample is stored in a 4 byte chunk.
    S24LE,
    /// Signed 32 bit little endian.
    S32LE,
}
75
76 impl SampleFormat {
sample_bytes(self) -> usize77 pub fn sample_bytes(self) -> usize {
78 use SampleFormat::*;
79 match self {
80 U8 => 1,
81 S16LE => 2,
82 S24LE => 4, // Not a typo, S24_LE samples are stored in 4 byte chunks.
83 S32LE => 4,
84 }
85 }
86 }
87
88 impl Display for SampleFormat {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result89 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90 use SampleFormat::*;
91 match self {
92 U8 => write!(f, "Unsigned 8 bit"),
93 S16LE => write!(f, "Signed 16 bit Little Endian"),
94 S24LE => write!(f, "Signed 24 bit Little Endian"),
95 S32LE => write!(f, "Signed 32 bit Little Endian"),
96 }
97 }
98 }
99
100 impl FromStr for SampleFormat {
101 type Err = SampleFormatError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>102 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
103 match s {
104 "U8" => Ok(SampleFormat::U8),
105 "S16_LE" => Ok(SampleFormat::S16LE),
106 "S24_LE" => Ok(SampleFormat::S24LE),
107 "S32_LE" => Ok(SampleFormat::S32LE),
108 _ => Err(SampleFormatError::InvalidSampleFormat),
109 }
110 }
111 }
112
/// Errors that are possible from a `SampleFormat`.
#[sorted]
#[derive(Error, Debug)]
pub enum SampleFormatError {
    /// The string being parsed does not name a supported sample format.
    #[error("Must be in [U8, S16_LE, S24_LE, S32_LE]")]
    InvalidSampleFormat,
}
120
/// Valid directions of an audio stream.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StreamDirection {
    /// A playback stream.
    Playback,
    /// A capture stream.
    Capture,
}
127
/// Valid effects for an audio stream.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub enum StreamEffect {
    /// No effect is applied. This is the default value.
    #[default]
    NoEffect,
    /// Echo cancellation. Also accepted under the serde alias `aec` when deserializing.
    #[serde(alias = "aec")]
    EchoCancellation,
}
136
137 pub mod capture;
138 pub mod shm_streams;
139
/// Boxed, type-erased error that can pass across threads (it is `Send + Sync`).
pub type BoxError = Box<dyn error::Error + Send + Sync>;
142
/// Errors that are possible from a `StreamEffect`.
#[sorted]
#[derive(Error, Debug)]
pub enum StreamEffectError {
    /// The string being parsed does not name a supported stream effect.
    #[error("Must be in [EchoCancellation, aec]")]
    InvalidEffect,
}
150
151 impl FromStr for StreamEffect {
152 type Err = StreamEffectError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>153 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
154 match s {
155 "EchoCancellation" | "aec" => Ok(StreamEffect::EchoCancellation),
156 _ => Err(StreamEffectError::InvalidEffect),
157 }
158 }
159 }
160
/// General errors returned by the default method implementations of this crate's traits.
#[sorted]
#[derive(Error, Debug)]
pub enum Error {
    /// The requested operation is not implemented by the implementor.
    #[error("Unimplemented")]
    Unimplemented,
}
167
/// `StreamSourceGenerator` is a trait used to abstract types that create [`StreamSource`].
/// It can be used when multiple types of `StreamSource` are needed.
pub trait StreamSourceGenerator: Sync + Send {
    /// Creates a new boxed [`StreamSource`], or returns the error that prevented its creation.
    fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError>;
}
173
/// `StreamSource` creates streams for playback or capture of audio.
///
/// Only `new_playback_stream` has to be provided by implementors; every other method has a default
/// implementation.
#[async_trait(?Send)]
pub trait StreamSource: Send {
    /// Returns a stream control and buffer generator object. These are separate as the buffer
    /// generator might want to be passed to the audio stream.
    #[allow(clippy::type_complexity)]
    fn new_playback_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
    ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>;

    /// Returns a stream control and async buffer generator object. These are separate as the buffer
    /// generator might want to be passed to the audio stream.
    /// Default implementation returns `Error::Unimplemented`.
    #[allow(clippy::type_complexity)]
    fn new_async_playback_stream(
        &mut self,
        _num_channels: usize,
        _format: SampleFormat,
        _frame_rate: u32,
        _buffer_size: usize,
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
        Err(Box::new(Error::Unimplemented))
    }

    /// Returns a stream control and async buffer generator object asynchronously.
    /// Default implementation calls and blocks on `new_async_playback_stream()`.
    #[allow(clippy::type_complexity)]
    async fn async_new_async_playback_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        ex: &dyn AudioStreamsExecutor,
    ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
        self.new_async_playback_stream(num_channels, format, frame_rate, buffer_size, ex)
    }

    /// Returns a stream control and buffer generator object. These are separate as the buffer
    /// generator might want to be passed to the audio stream.
    /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`; the requested
    /// effects are ignored by it.
    #[allow(clippy::type_complexity)]
    fn new_capture_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        _effects: &[StreamEffect],
    ) -> Result<
        (
            Box<dyn StreamControl>,
            Box<dyn capture::CaptureBufferStream>,
        ),
        BoxError,
    > {
        Ok((
            Box::new(NoopStreamControl::new()),
            Box::new(capture::NoopCaptureStream::new(
                num_channels,
                format,
                frame_rate,
                buffer_size,
            )),
        ))
    }

    /// Returns a stream control and async buffer generator object. These are separate as the buffer
    /// generator might want to be passed to the audio stream.
    /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`; the requested
    /// effects and the executor are ignored by it.
    #[allow(clippy::type_complexity)]
    fn new_async_capture_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        _effects: &[StreamEffect],
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<
        (
            Box<dyn StreamControl>,
            Box<dyn capture::AsyncCaptureBufferStream>,
        ),
        BoxError,
    > {
        Ok((
            Box::new(NoopStreamControl::new()),
            Box::new(capture::NoopCaptureStream::new(
                num_channels,
                format,
                frame_rate,
                buffer_size,
            )),
        ))
    }

    /// Returns a stream control and async buffer generator object asynchronously.
    /// Default implementation calls and blocks on `new_async_capture_stream()`.
    #[allow(clippy::type_complexity)]
    async fn async_new_async_capture_stream(
        &mut self,
        num_channels: usize,
        format: SampleFormat,
        frame_rate: u32,
        buffer_size: usize,
        effects: &[StreamEffect],
        ex: &dyn AudioStreamsExecutor,
    ) -> Result<
        (
            Box<dyn StreamControl>,
            Box<dyn capture::AsyncCaptureBufferStream>,
        ),
        BoxError,
    > {
        self.new_async_capture_stream(num_channels, format, frame_rate, buffer_size, effects, ex)
    }

    /// Returns any open file descriptors needed by the implementor. The FD list helps users of the
    /// StreamSource enter Linux jails making sure not to close needed FDs.
    /// Default implementation returns `None`.
    fn keep_rds(&self) -> Option<Vec<RawDescriptor>> {
        None
    }
}
302
/// `PlaybackBufferStream` provides `PlaybackBuffer`s to fill with audio samples for playback.
pub trait PlaybackBufferStream: Send {
    /// Returns the next `PlaybackBuffer` to fill. The buffer borrows from the stream, so the
    /// stream cannot be used again until the buffer is released.
    fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>;

    /// Call `f` with a `PlaybackBuffer`, and trigger the buffer done call back after. `f` should
    /// write playback data to the given `PlaybackBuffer`.
    ///
    /// If obtaining the buffer or `f` fails, the error is returned and the buffer is not
    /// committed.
    fn write_playback_buffer<'b, 's: 'b>(
        &'s mut self,
        f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>,
    ) -> Result<(), BoxError> {
        let mut buf = self.next_playback_buffer()?;
        f(&mut buf)?;
        // Only reached when `f` succeeded.
        buf.commit();
        Ok(())
    }
}
319
320 impl<S: PlaybackBufferStream + ?Sized> PlaybackBufferStream for &mut S {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>321 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
322 (**self).next_playback_buffer()
323 }
324 }
325
/// `AsyncPlaybackBufferStream` provides `AsyncPlaybackBuffer`s asynchronously to fill with audio
/// samples for playback.
#[async_trait(?Send)]
pub trait AsyncPlaybackBufferStream: Send {
    /// Returns the next `AsyncPlaybackBuffer` to fill. The buffer borrows from the stream for
    /// `'a`. `_ex` is the executor made available to the implementation.
    async fn next_playback_buffer<'a>(
        &'a mut self,
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
}
335
/// Forwards `AsyncPlaybackBufferStream` through mutable references so that a `&mut S` can be used
/// wherever an async stream is expected.
#[async_trait(?Send)]
impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S {
    async fn next_playback_buffer<'a>(
        &'a mut self,
        ex: &dyn AudioStreamsExecutor,
    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
        // Delegate to the referenced stream's own implementation.
        (**self).next_playback_buffer(ex).await
    }
}
345
346 /// Call `f` with a `AsyncPlaybackBuffer`, and trigger the buffer done call back after. `f` should
347 /// write playback data to the given `AsyncPlaybackBuffer`.
348 ///
349 /// This cannot be a trait method because trait methods with generic parameters are not object safe.
async_write_playback_buffer<F>( stream: &mut dyn AsyncPlaybackBufferStream, f: F, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError> where F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,350 pub async fn async_write_playback_buffer<F>(
351 stream: &mut dyn AsyncPlaybackBufferStream,
352 f: F,
353 ex: &dyn AudioStreamsExecutor,
354 ) -> Result<(), BoxError>
355 where
356 F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,
357 {
358 let mut buf = stream.next_playback_buffer(ex).await?;
359 f(&mut buf)?;
360 buf.commit().await;
361 Ok(())
362 }
363
/// `StreamControl` provides a way to set the volume and mute states of a stream. `StreamControl`
/// is separate from the stream so it can be owned by a different thread if needed.
pub trait StreamControl: Send + Sync {
    /// Sets the volume of the stream. The default implementation does nothing.
    fn set_volume(&mut self, _scaler: f64) {}
    /// Sets the mute state of the stream. The default implementation does nothing.
    fn set_mute(&mut self, _mute: bool) {}
}
370
/// `BufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
pub trait BufferCommit {
    /// `write_playback_buffer` or `read_capture_buffer` would trigger this automatically. `nframes`
    /// indicates the number of audio frames that were read or written to the device.
    fn commit(&mut self, nframes: usize);
    /// `latency_bytes` the current device latency.
    /// For playback it means how many bytes need to be consumed
    /// before the current playback buffer will be played.
    /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
    ///
    /// The default implementation reports a latency of 0.
    fn latency_bytes(&self) -> u32 {
        0
    }
}
385
/// `AsyncBufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
#[async_trait(?Send)]
pub trait AsyncBufferCommit {
    /// `async_write_playback_buffer` or `async_read_capture_buffer` would trigger this
    /// automatically. `nframes` indicates the number of audio frames that were read or written to
    /// the device.
    async fn commit(&mut self, nframes: usize);
    /// `latency_bytes` the current device latency.
    /// For playback it means how many bytes need to be consumed
    /// before the current playback buffer will be played.
    /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
    ///
    /// The default implementation reports a latency of 0.
    fn latency_bytes(&self) -> u32 {
        0
    }
}
402
/// Errors that are possible from a `PlaybackBuffer`.
#[sorted]
#[derive(Error, Debug)]
pub enum PlaybackBufferError {
    /// The backing buffer length is not a multiple of the frame size.
    #[error("Invalid buffer length")]
    InvalidLength,
    /// A checked copy would have written past the end of the backing buffer.
    #[error("Slicing of playback buffer out of bounds")]
    SliceOutOfBounds,
}
412
/// `AudioBuffer` is one buffer that holds buffer_size audio frames.
/// It is the inner data of `PlaybackBuffer` and `CaptureBuffer`.
struct AudioBuffer<'a> {
    /// Backing memory for the audio data.
    buffer: &'a mut [u8],
    /// Read or write position inside `buffer`, in bytes.
    offset: usize,
    /// Size of a frame in bytes.
    frame_size: usize,
}

impl<'a> AudioBuffer<'a> {
    /// Returns the number of audio frames that fit in the buffer.
    pub fn frame_capacity(&self) -> usize {
        let total_bytes = self.buffer.len();
        total_bytes / self.frame_size
    }

    /// Rounds `size` down to a whole number of frames and clamps it to the bytes left after
    /// `offset`.
    fn calc_len(&self, size: usize) -> usize {
        let whole_frames = size / self.frame_size * self.frame_size;
        let remaining = self.buffer.len() - self.offset;
        whole_frames.min(remaining)
    }

    /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
    ///
    /// Only complete frames are exposed to `cb`. Returns the number of bytes the offset advanced.
    pub fn write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        let len = self.calc_len(size);
        let start = self.offset;
        let end = start + len;
        cb(&mut self.buffer[start..end]);
        self.offset = end;
        Ok(len)
    }

    /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
    ///
    /// Only complete frames are exposed to `cb`. Returns the number of bytes the offset advanced.
    pub fn read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        let len = self.calc_len(size);
        let start = self.offset;
        let end = start + len;
        cb(&self.buffer[start..end]);
        self.offset = end;
        Ok(len)
    }

    /// Copy data from an io::Reader
    ///
    /// Performs a single `read` into the unused tail of the buffer and advances the offset by the
    /// number of bytes the reader reported.
    pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
        let unused = &mut self.buffer[self.offset..];
        let received = reader.read(unused)?;
        self.offset += received;
        Ok(received)
    }

    /// Copy data to an io::Write
    ///
    /// Performs a single `write` of the bytes after the offset and advances the offset by the
    /// number of bytes the writer accepted.
    pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
        let pending = &self.buffer[self.offset..];
        let accepted = writer.write(pending)?;
        self.offset += accepted;
        Ok(accepted)
    }
}
465
466 impl<'a> Write for AudioBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>467 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
468 let written = (&mut self.buffer[self.offset..]).write(&buf[..buf.len()])?;
469 self.offset += written;
470 Ok(written)
471 }
472
flush(&mut self) -> io::Result<()>473 fn flush(&mut self) -> io::Result<()> {
474 Ok(())
475 }
476 }
477
478 impl<'a> Read for AudioBuffer<'a> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>479 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
480 let len = buf.len() / self.frame_size * self.frame_size;
481 let written = (&mut buf[..len]).write(&self.buffer[self.offset..])?;
482 self.offset += written;
483 Ok(written)
484 }
485 }
486
/// `PlaybackBuffer` is one buffer that holds buffer_size audio frames. It is used to temporarily
/// allow access to an audio buffer and notifies the owning stream of write completion when
/// `commit` is called.
pub struct PlaybackBuffer<'a> {
    // Backing memory together with the current write offset.
    buffer: AudioBuffer<'a>,
    // Commit hook invoked by `commit()`; despite the field name it is not run on `Drop`.
    drop: &'a mut dyn BufferCommit,
}
493
494 impl<'a> PlaybackBuffer<'a> {
495 /// Creates a new `PlaybackBuffer` that holds a reference to the backing memory specified in
496 /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], drop: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: BufferCommit,497 pub fn new<F>(
498 frame_size: usize,
499 buffer: &'a mut [u8],
500 drop: &'a mut F,
501 ) -> Result<Self, PlaybackBufferError>
502 where
503 F: BufferCommit,
504 {
505 if buffer.len() % frame_size != 0 {
506 return Err(PlaybackBufferError::InvalidLength);
507 }
508
509 Ok(PlaybackBuffer {
510 buffer: AudioBuffer {
511 buffer,
512 offset: 0,
513 frame_size,
514 },
515 drop,
516 })
517 }
518
519 /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize520 pub fn frame_capacity(&self) -> usize {
521 self.buffer.frame_capacity()
522 }
523
524 /// This triggers the commit of `BufferCommit`. This should be called after the data is copied
525 /// to the buffer.
commit(&mut self)526 pub fn commit(&mut self) {
527 self.drop
528 .commit(self.buffer.offset / self.buffer.frame_size);
529 }
530
531 /// It returns how many bytes need to be consumed
532 /// before the current playback buffer will be played.
latency_bytes(&self) -> u32533 pub fn latency_bytes(&self) -> u32 {
534 self.drop.latency_bytes()
535 }
536
537 /// Writes up to `size` bytes directly to this buffer inside of the given callback function
538 /// with a buffer size error check.
539 ///
540 /// TODO(b/238933737): Investigate removing this method for Windows when
541 /// switching from Ac97 to Virtio-Snd
copy_cb_with_checks<F: FnOnce(&mut [u8])>( &mut self, size: usize, cb: F, ) -> Result<(), PlaybackBufferError>542 pub fn copy_cb_with_checks<F: FnOnce(&mut [u8])>(
543 &mut self,
544 size: usize,
545 cb: F,
546 ) -> Result<(), PlaybackBufferError> {
547 // only write complete frames.
548 let len = size / self.buffer.frame_size * self.buffer.frame_size;
549 if self.buffer.offset + len > self.buffer.buffer.len() {
550 return Err(PlaybackBufferError::SliceOutOfBounds);
551 }
552 cb(&mut self.buffer.buffer[self.buffer.offset..(self.buffer.offset + len)]);
553 self.buffer.offset += len;
554 Ok(())
555 }
556
557 /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>558 pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
559 self.buffer.write_copy_cb(size, cb)
560 }
561 }
562
563 impl<'a> Write for PlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>564 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
565 self.buffer.write(buf)
566 }
567
flush(&mut self) -> io::Result<()>568 fn flush(&mut self) -> io::Result<()> {
569 self.buffer.flush()
570 }
571 }
572
/// `AsyncPlaybackBuffer` is the async version of `PlaybackBuffer`.
pub struct AsyncPlaybackBuffer<'a> {
    // Backing memory together with the current write offset.
    buffer: AudioBuffer<'a>,
    // Commit hook awaited by `commit()`.
    trigger: &'a mut dyn AsyncBufferCommit,
}
578
579 impl<'a> AsyncPlaybackBuffer<'a> {
580 /// Creates a new `AsyncPlaybackBuffer` that holds a reference to the backing memory specified
581 /// in `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], trigger: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: AsyncBufferCommit,582 pub fn new<F>(
583 frame_size: usize,
584 buffer: &'a mut [u8],
585 trigger: &'a mut F,
586 ) -> Result<Self, PlaybackBufferError>
587 where
588 F: AsyncBufferCommit,
589 {
590 if buffer.len() % frame_size != 0 {
591 return Err(PlaybackBufferError::InvalidLength);
592 }
593
594 Ok(AsyncPlaybackBuffer {
595 buffer: AudioBuffer {
596 buffer,
597 offset: 0,
598 frame_size,
599 },
600 trigger,
601 })
602 }
603
604 /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize605 pub fn frame_capacity(&self) -> usize {
606 self.buffer.frame_capacity()
607 }
608
609 /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
610 /// copied to the buffer.
commit(&mut self)611 pub async fn commit(&mut self) {
612 self.trigger
613 .commit(self.buffer.offset / self.buffer.frame_size)
614 .await;
615 }
616
617 /// It returns the latency in terms of bytes that the capture buffer was recorded.
latency_bytes(&self) -> u32618 pub fn latency_bytes(&self) -> u32 {
619 self.trigger.latency_bytes()
620 }
621
622 /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>623 pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
624 self.buffer.write_copy_cb(size, cb)
625 }
626
627 /// Copy data from an io::Reader
copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize>628 pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
629 self.buffer.copy_from(reader)
630 }
631 }
632
633 impl<'a> Write for AsyncPlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>634 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
635 self.buffer.write(buf)
636 }
637
flush(&mut self) -> io::Result<()>638 fn flush(&mut self) -> io::Result<()> {
639 self.buffer.flush()
640 }
641 }
/// Stream that accepts playback samples but drops them.
pub struct NoopStream {
    // Scratch memory handed out as the playback buffer.
    buffer: Vec<u8>,
    // Size of one frame in bytes.
    frame_size: usize,
    // Time between two consecutive buffers.
    interval: Duration,
    // Time, relative to `start_time`, at which the next buffer is due.
    next_frame: Duration,
    // Set when the first buffer is requested; `None` until then.
    start_time: Option<Instant>,
    // Commit hook passed to every buffer that is handed out.
    buffer_drop: NoopBufferCommit,
}
651
/// NoopStream data that is needed from the buffer complete callback.
struct NoopBufferCommit {
    // Toggled on every commit.
    which_buffer: bool,
}
656
657 impl BufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)658 fn commit(&mut self, _nwritten: usize) {
659 // When a buffer completes, switch to the other one.
660 self.which_buffer ^= true;
661 }
662 }
663
664 #[async_trait(?Send)]
665 impl AsyncBufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)666 async fn commit(&mut self, _nwritten: usize) {
667 // When a buffer completes, switch to the other one.
668 self.which_buffer ^= true;
669 }
670 }
671
672 impl NoopStream {
new( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Self673 pub fn new(
674 num_channels: usize,
675 format: SampleFormat,
676 frame_rate: u32,
677 buffer_size: usize,
678 ) -> Self {
679 let frame_size = format.sample_bytes() * num_channels;
680 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
681 NoopStream {
682 buffer: vec![0; buffer_size * frame_size],
683 frame_size,
684 interval,
685 next_frame: interval,
686 start_time: None,
687 buffer_drop: NoopBufferCommit {
688 which_buffer: false,
689 },
690 }
691 }
692 }
693
694 impl PlaybackBufferStream for NoopStream {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>695 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
696 if let Some(start_time) = self.start_time {
697 let elapsed = start_time.elapsed();
698 if elapsed < self.next_frame {
699 std::thread::sleep(self.next_frame - elapsed);
700 }
701 self.next_frame += self.interval;
702 } else {
703 self.start_time = Some(Instant::now());
704 self.next_frame = self.interval;
705 }
706 Ok(PlaybackBuffer::new(
707 self.frame_size,
708 &mut self.buffer,
709 &mut self.buffer_drop,
710 )?)
711 }
712 }
713
714 #[async_trait(?Send)]
715 impl AsyncPlaybackBufferStream for NoopStream {
next_playback_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>716 async fn next_playback_buffer<'a>(
717 &'a mut self,
718 ex: &dyn AudioStreamsExecutor,
719 ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
720 if let Some(start_time) = self.start_time {
721 let elapsed = start_time.elapsed();
722 if elapsed < self.next_frame {
723 ex.delay(self.next_frame - elapsed).await?;
724 }
725 self.next_frame += self.interval;
726 } else {
727 self.start_time = Some(Instant::now());
728 self.next_frame = self.interval;
729 }
730 Ok(AsyncPlaybackBuffer::new(
731 self.frame_size,
732 &mut self.buffer,
733 &mut self.buffer_drop,
734 )?)
735 }
736 }
737
/// No-op control for `NoopStream`s.
#[derive(Default)]
pub struct NoopStreamControl;

impl NoopStreamControl {
    /// Creates a new `NoopStreamControl`.
    pub fn new() -> Self {
        Self
    }
}
747
// Relies entirely on the do-nothing default methods of `StreamControl`.
impl StreamControl for NoopStreamControl {}
749
/// Source of `NoopStream` and `NoopStreamControl` objects.
#[derive(Default)]
pub struct NoopStreamSource;

impl NoopStreamSource {
    /// Creates a new `NoopStreamSource`.
    pub fn new() -> Self {
        Self
    }
}
759
760 impl StreamSource for NoopStreamSource {
761 #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>762 fn new_playback_stream(
763 &mut self,
764 num_channels: usize,
765 format: SampleFormat,
766 frame_rate: u32,
767 buffer_size: usize,
768 ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError> {
769 Ok((
770 Box::new(NoopStreamControl::new()),
771 Box::new(NoopStream::new(
772 num_channels,
773 format,
774 frame_rate,
775 buffer_size,
776 )),
777 ))
778 }
779
780 #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>781 fn new_async_playback_stream(
782 &mut self,
783 num_channels: usize,
784 format: SampleFormat,
785 frame_rate: u32,
786 buffer_size: usize,
787 _ex: &dyn AudioStreamsExecutor,
788 ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
789 Ok((
790 Box::new(NoopStreamControl::new()),
791 Box::new(NoopStream::new(
792 num_channels,
793 format,
794 frame_rate,
795 buffer_size,
796 )),
797 ))
798 }
799 }
800
/// `NoopStreamSourceGenerator` is a struct that implements [`StreamSourceGenerator`]
/// to generate [`NoopStreamSource`].
// `Default` is derived so the type matches the other `Noop*` types, which offer both `new()`
// and `Default`.
#[derive(Default)]
pub struct NoopStreamSourceGenerator;

impl NoopStreamSourceGenerator {
    /// Creates a new `NoopStreamSourceGenerator`.
    pub fn new() -> Self {
        NoopStreamSourceGenerator {}
    }
}
810
811 impl StreamSourceGenerator for NoopStreamSourceGenerator {
generate(&self) -> Result<Box<dyn StreamSource>, BoxError>812 fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError> {
813 Ok(Box::new(NoopStreamSource))
814 }
815 }
816
817 #[cfg(test)]
818 mod tests {
819 use futures::FutureExt;
820 use io::Write;
821
822 use super::async_api::test::TestExecutor;
823 use super::*;
824
825 #[test]
invalid_buffer_length()826 fn invalid_buffer_length() {
827 // Playback buffers can't be created with a size that isn't divisible by the frame size.
828 let mut pb_buf = [0xa5u8; 480 * 2 * 2 + 1];
829 let mut buffer_drop = NoopBufferCommit {
830 which_buffer: false,
831 };
832 assert!(PlaybackBuffer::new(2, &mut pb_buf, &mut buffer_drop).is_err());
833 }
834
835 #[test]
audio_buffer_copy_from()836 fn audio_buffer_copy_from() {
837 const PERIOD_SIZE: usize = 8192;
838 const NUM_CHANNELS: usize = 6;
839 const FRAME_SIZE: usize = NUM_CHANNELS * 2;
840 let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
841 let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
842 let mut aud_buf = AudioBuffer {
843 buffer: &mut dst_buf,
844 offset: 0,
845 frame_size: FRAME_SIZE,
846 };
847 aud_buf
848 .copy_from(&mut &src_buf[..])
849 .expect("all data should be copied.");
850 assert_eq!(dst_buf, src_buf);
851 }
852
#[test]
fn audio_buffer_copy_from_repeat() {
    // Fills an `AudioBuffer` from an endless reader; the copy is expected to report exactly
    // the buffer's size and leave every byte set to the repeated value.
    const PERIOD_SIZE: usize = 8192;
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const BYTE_LEN: usize = PERIOD_SIZE * FRAME_SIZE;

    let mut destination = [0u8; BYTE_LEN];
    let mut audio_buffer = AudioBuffer {
        buffer: &mut destination,
        offset: 0,
        frame_size: FRAME_SIZE,
    };

    let mut ones = io::repeat(1);
    let copied = audio_buffer
        .copy_from(&mut ones)
        .expect("all data should be copied.");

    assert_eq!(copied, BYTE_LEN);
    assert_eq!(destination, [1u8; BYTE_LEN]);
}
870
#[test]
fn audio_buffer_copy_to() {
    // Drains an `AudioBuffer` into a byte-slice writer and checks that the writer received
    // an exact copy of the buffer's contents.
    const PERIOD_SIZE: usize = 8192;
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const BYTE_LEN: usize = PERIOD_SIZE * FRAME_SIZE;

    let mut source = [0xa5u8; BYTE_LEN];
    let mut destination = [0u8; BYTE_LEN];
    let mut audio_buffer = AudioBuffer {
        buffer: &mut source,
        offset: 0,
        frame_size: FRAME_SIZE,
    };

    // `&mut [u8]` implements `Write`, advancing through the slice as it is written.
    let mut writer: &mut [u8] = &mut destination;
    audio_buffer
        .copy_to(&mut writer)
        .expect("all data should be copied.");

    assert_eq!(destination, source);
}
888
#[test]
fn audio_buffer_copy_to_sink() {
    // Drains an `AudioBuffer` into a writer that discards everything; the copy is expected
    // to report the full size of the buffer.
    const PERIOD_SIZE: usize = 8192;
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const BYTE_LEN: usize = PERIOD_SIZE * FRAME_SIZE;

    let mut source = [0xa5u8; BYTE_LEN];
    let mut audio_buffer = AudioBuffer {
        buffer: &mut source,
        offset: 0,
        frame_size: FRAME_SIZE,
    };

    let mut discard = io::sink();
    let copied = audio_buffer
        .copy_to(&mut discard)
        .expect("all data should be copied.");

    assert_eq!(copied, BYTE_LEN);
}
905
#[test]
fn io_copy_audio_buffer() {
    // Uses `io::copy` with an `AudioBuffer` as the writer and checks that the whole source
    // slice is transferred into the buffer's backing storage.
    const PERIOD_SIZE: usize = 8192;
    const NUM_CHANNELS: usize = 6;
    const FRAME_SIZE: usize = NUM_CHANNELS * 2;
    const BYTE_LEN: usize = PERIOD_SIZE * FRAME_SIZE;

    let source = [0xa5u8; BYTE_LEN];
    let mut destination = [0u8; BYTE_LEN];
    let mut audio_buffer = AudioBuffer {
        buffer: &mut destination,
        offset: 0,
        frame_size: FRAME_SIZE,
    };

    let mut reader: &[u8] = &source;
    io::copy(&mut reader, &mut audio_buffer).expect("all data should be copied.");

    assert_eq!(destination, source);
}
921
#[test]
fn commit() {
    /// Accumulates every count reported through `BufferCommit::commit`.
    struct TestCommit {
        frame_count: usize,
    }

    impl BufferCommit for TestCommit {
        fn commit(&mut self, nwritten: usize) {
            self.frame_count += nwritten;
        }
    }

    const FRAME_SIZE: usize = 4;
    const FRAME_COUNT: usize = 480;

    let mut recorder = TestCommit { frame_count: 0 };
    {
        // Inner scope so that the playback buffer's borrow of `recorder` has ended before
        // the recorded count is inspected.
        let mut backing = [0u8; FRAME_COUNT * FRAME_SIZE];
        let samples = [0xa5u8; FRAME_COUNT * FRAME_SIZE];
        let mut playback = PlaybackBuffer::new(FRAME_SIZE, &mut backing, &mut recorder).unwrap();
        playback.write_all(&samples).unwrap();
        playback.commit();
    }

    // `FRAME_COUNT * FRAME_SIZE` bytes were written; the committed count is expected to be
    // in frames, not bytes.
    assert_eq!(recorder.frame_count, FRAME_COUNT);
}
942
#[test]
fn sixteen_bit_stereo() {
    // Two channels of 16-bit samples: each frame is 2 * 2 bytes.
    const FRAMES: usize = 480;
    const BYTES: usize = FRAMES * 2 * 2;

    let mut source = NoopStreamSource::new();
    let (_, mut stream) = source
        .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
        .unwrap();

    // The buffer handed to the callback must hold exactly the requested number of frames
    // and accept a full buffer's worth of bytes in a single write.
    let mut fill = |playback: &mut PlaybackBuffer| {
        assert_eq!(playback.buffer.frame_capacity(), FRAMES);
        let samples = [0xa5u8; BYTES];
        assert_eq!(playback.write(&samples).unwrap(), BYTES);
        Ok(())
    };
    stream.write_playback_buffer(&mut fill).unwrap();
}
957
#[test]
fn consumption_rate() {
    // Checks that the no-op stream paces its consumer: after one full buffer has been
    // written, the next buffer must not be handed out until more than 10 ms have passed
    // (480 frames at a rate of 48000 is 10 ms of audio).
    let mut server = NoopStreamSource::new();
    let (_, mut stream) = server
        .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
        .unwrap();
    let start = Instant::now();
    {
        let mut copy_cb = |buf: &mut PlaybackBuffer| {
            let pb_buf = [0xa5u8; 480 * 2 * 2];
            assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
            Ok(())
        };
        stream.write_playback_buffer(&mut copy_cb).unwrap();
    }
    // The second call should block until the first buffer is consumed.
    let mut assert_cb = |_: &mut PlaybackBuffer| {
        let elapsed = start.elapsed();
        // Report the full elapsed `Duration` (not only its sub-second millisecond part) and
        // name the function that is actually being exercised.
        assert!(
            elapsed > Duration::from_millis(10),
            "write_playback_buffer didn't block long enough: {:?}",
            elapsed
        );
        Ok(())
    };
    stream.write_playback_buffer(&mut assert_cb).unwrap();
}
985
#[test]
fn async_commit() {
    /// Accumulates every count reported through `AsyncBufferCommit::commit`.
    struct TestCommit {
        frame_count: usize,
    }
    #[async_trait(?Send)]
    impl AsyncBufferCommit for TestCommit {
        async fn commit(&mut self, nwritten: usize) {
            self.frame_count += nwritten;
        }
    }
    async fn this_test() {
        let mut test_commit = TestCommit { frame_count: 0 };
        {
            const FRAME_SIZE: usize = 4;
            let mut buf = [0u8; 480 * FRAME_SIZE];
            let mut pb_buf =
                AsyncPlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
            pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
            pb_buf.commit().await;
        }
        // 480 * FRAME_SIZE bytes were written; the committed count is expected in frames.
        assert_eq!(test_commit.frame_count, 480);
    }

    // `now_or_never` polls the future exactly once and yields `None` if it is still
    // pending. Nothing awaited here suspends, so require completion: otherwise a pending
    // future would skip the assertion above and the test would pass without checking
    // anything.
    this_test()
        .now_or_never()
        .expect("async commit test future should complete without suspending");
}
1012
#[test]
fn consumption_rate_async() {
    // Async counterpart of `consumption_rate`: after one full buffer has been written, the
    // next buffer must not be handed out until more than 10 ms have passed (480 frames at
    // a rate of 48000 is 10 ms of audio).
    async fn this_test(ex: &TestExecutor) {
        let mut server = NoopStreamSource::new();
        let (_, mut stream) = server
            .new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
            .unwrap();
        let start = Instant::now();
        {
            let copy_func = |buf: &mut AsyncPlaybackBuffer| {
                let pb_buf = [0xa5u8; 480 * 2 * 2];
                assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
                Ok(())
            };
            async_write_playback_buffer(&mut *stream, copy_func, ex)
                .await
                .unwrap();
        }
        // The second call should block until the first buffer is consumed.
        let assert_func = |_: &mut AsyncPlaybackBuffer| {
            let elapsed = start.elapsed();
            // Report the full elapsed `Duration`, not only its sub-second millisecond part.
            assert!(
                elapsed > Duration::from_millis(10),
                "write_playback_buffer didn't block long enough: {:?}",
                elapsed
            );
            Ok(())
        };

        async_write_playback_buffer(&mut *stream, assert_func, ex)
            .await
            .unwrap();
    }

    let ex = TestExecutor {};
    // `now_or_never` polls the future exactly once and yields `None` if it is still
    // pending, in which case the timing assertion above would never have run. Require
    // completion so the test cannot pass vacuously.
    // NOTE(review): this relies on `TestExecutor` finishing its delays within a single
    // poll — confirm against `async_api::test::TestExecutor`.
    this_test(&ex)
        .now_or_never()
        .expect("async consumption-rate test future should complete in a single poll");
}
1050
#[test]
fn generate_noop_stream_source() {
    // Exercise the generator through a trait object, the way a caller holding only a
    // `Box<dyn StreamSourceGenerator>` would.
    let concrete = NoopStreamSourceGenerator::new();
    let generator: Box<dyn StreamSourceGenerator> = Box::new(concrete);
    let outcome = generator.generate();
    outcome.expect("failed to generate stream source");
}
1058 }
1059