• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Provides an interface for playing and recording audio.
6 //!
7 //! When implementing an audio playback system, the `StreamSource` trait is implemented.
8 //! Implementors of this trait allow creation of `PlaybackBufferStream` objects. The
9 //! `PlaybackBufferStream` provides the actual audio buffers to be filled with audio samples. These
10 //! buffers can be filled with `write_playback_buffer`.
11 //!
//! Users playing audio fill the provided buffers with audio. Once a `PlaybackBuffer` has been
//! filled it is committed (`write_playback_buffer` does this after the callback succeeds), which
//! reports the number of frames written to the `PlaybackBufferStream` it came from.
14 //!
15 //! ```
16 //! use audio_streams::{BoxError, PlaybackBuffer, SampleFormat, StreamSource, NoopStreamSource};
17 //! use std::io::Write;
18 //!
19 //! const buffer_size: usize = 120;
20 //! const num_channels: usize = 2;
21 //!
22 //! # fn main() -> std::result::Result<(), BoxError> {
23 //! let mut stream_source = NoopStreamSource::new();
24 //! let sample_format = SampleFormat::S16LE;
25 //! let frame_size = num_channels * sample_format.sample_bytes();
26 //!
27 //! let (_, mut stream) = stream_source
28 //!     .new_playback_stream(num_channels, sample_format, 48000, buffer_size)?;
29 //! // Play 10 buffers of DC.
30 //! let mut buf = Vec::new();
31 //! buf.resize(buffer_size * frame_size, 0xa5u8);
32 //! for _ in 0..10 {
33 //!     let mut copy_cb = |stream_buffer: &mut PlaybackBuffer| {
34 //!         assert_eq!(stream_buffer.write(&buf)?, buffer_size * frame_size);
35 //!         Ok(())
36 //!     };
37 //!     stream.write_playback_buffer(&mut copy_cb)?;
38 //! }
39 //! # Ok (())
40 //! # }
41 //! ```
42 pub mod async_api;
43 
44 use std::cmp::min;
45 use std::error;
46 use std::fmt;
47 use std::fmt::Display;
48 use std::io;
49 use std::io::Read;
50 use std::io::Write;
51 #[cfg(unix)]
52 use std::os::unix::io::RawFd as RawDescriptor;
53 #[cfg(windows)]
54 use std::os::windows::io::RawHandle as RawDescriptor;
55 use std::result::Result;
56 use std::str::FromStr;
57 use std::time::Duration;
58 use std::time::Instant;
59 
60 pub use async_api::AsyncStream;
61 pub use async_api::AudioStreamsExecutor;
62 use async_trait::async_trait;
63 use remain::sorted;
64 use serde::Deserialize;
65 use serde::Serialize;
66 use thiserror::Error;
67 
68 #[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize)]
69 pub enum SampleFormat {
70     U8,
71     S16LE,
72     S24LE,
73     S32LE,
74 }
75 
76 impl SampleFormat {
sample_bytes(self) -> usize77     pub fn sample_bytes(self) -> usize {
78         use SampleFormat::*;
79         match self {
80             U8 => 1,
81             S16LE => 2,
82             S24LE => 4, // Not a typo, S24_LE samples are stored in 4 byte chunks.
83             S32LE => 4,
84         }
85     }
86 }
87 
88 impl Display for SampleFormat {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result89     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90         use SampleFormat::*;
91         match self {
92             U8 => write!(f, "Unsigned 8 bit"),
93             S16LE => write!(f, "Signed 16 bit Little Endian"),
94             S24LE => write!(f, "Signed 24 bit Little Endian"),
95             S32LE => write!(f, "Signed 32 bit Little Endian"),
96         }
97     }
98 }
99 
100 impl FromStr for SampleFormat {
101     type Err = SampleFormatError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>102     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
103         match s {
104             "U8" => Ok(SampleFormat::U8),
105             "S16_LE" => Ok(SampleFormat::S16LE),
106             "S24_LE" => Ok(SampleFormat::S24LE),
107             "S32_LE" => Ok(SampleFormat::S32LE),
108             _ => Err(SampleFormatError::InvalidSampleFormat),
109         }
110     }
111 }
112 
113 /// Errors that are possible from a `SampleFormat`.
114 #[sorted]
115 #[derive(Error, Debug)]
116 pub enum SampleFormatError {
117     #[error("Must be in [U8, S16_LE, S24_LE, S32_LE]")]
118     InvalidSampleFormat,
119 }
120 
/// The direction in which audio flows through a stream.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StreamDirection {
    /// Audio playback.
    Playback,
    /// Audio capture.
    Capture,
}
127 
128 /// Valid effects for an audio stream.
129 #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
130 pub enum StreamEffect {
131     #[default]
132     NoEffect,
133     #[serde(alias = "aec")]
134     EchoCancellation,
135 }
136 
137 pub mod capture;
138 pub mod shm_streams;
139 
/// Boxed, type-erased error that is `Send + Sync` so it can pass across threads.
pub type BoxError = Box<dyn error::Error + Send + Sync>;
142 
143 /// Errors that are possible from a `StreamEffect`.
144 #[sorted]
145 #[derive(Error, Debug)]
146 pub enum StreamEffectError {
147     #[error("Must be in [EchoCancellation, aec]")]
148     InvalidEffect,
149 }
150 
151 impl FromStr for StreamEffect {
152     type Err = StreamEffectError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>153     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
154         match s {
155             "EchoCancellation" | "aec" => Ok(StreamEffect::EchoCancellation),
156             _ => Err(StreamEffectError::InvalidEffect),
157         }
158     }
159 }
160 
161 #[sorted]
162 #[derive(Error, Debug)]
163 pub enum Error {
164     #[error("Unimplemented")]
165     Unimplemented,
166 }
167 
168 /// `StreamSourceGenerator` is a trait used to abstract types that create [`StreamSource`].
169 /// It can be used when multiple types of `StreamSource` are needed.
170 pub trait StreamSourceGenerator: Sync + Send {
generate(&self) -> Result<Box<dyn StreamSource>, BoxError>171     fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError>;
172 }
173 
174 /// `StreamSource` creates streams for playback or capture of audio.
175 #[async_trait(?Send)]
176 pub trait StreamSource: Send {
177     /// Returns a stream control and buffer generator object. These are separate as the buffer
178     /// generator might want to be passed to the audio stream.
179     #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>180     fn new_playback_stream(
181         &mut self,
182         num_channels: usize,
183         format: SampleFormat,
184         frame_rate: u32,
185         buffer_size: usize,
186     ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>;
187 
188     /// Returns a stream control and async buffer generator object. These are separate as the buffer
189     /// generator might want to be passed to the audio stream.
190     #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, _num_channels: usize, _format: SampleFormat, _frame_rate: u32, _buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>191     fn new_async_playback_stream(
192         &mut self,
193         _num_channels: usize,
194         _format: SampleFormat,
195         _frame_rate: u32,
196         _buffer_size: usize,
197         _ex: &dyn AudioStreamsExecutor,
198     ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
199         Err(Box::new(Error::Unimplemented))
200     }
201 
202     /// Returns a stream control and async buffer generator object asynchronously.
203     /// Default implementation calls and blocks on `new_async_playback_stream()`.
204     #[allow(clippy::type_complexity)]
async_new_async_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>205     async fn async_new_async_playback_stream(
206         &mut self,
207         num_channels: usize,
208         format: SampleFormat,
209         frame_rate: u32,
210         buffer_size: usize,
211         ex: &dyn AudioStreamsExecutor,
212     ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
213         self.new_async_playback_stream(num_channels, format, frame_rate, buffer_size, ex)
214     }
215 
216     /// Returns a stream control and buffer generator object. These are separate as the buffer
217     /// generator might want to be passed to the audio stream.
218     /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
219     #[allow(clippy::type_complexity)]
new_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::CaptureBufferStream>, ), BoxError, >220     fn new_capture_stream(
221         &mut self,
222         num_channels: usize,
223         format: SampleFormat,
224         frame_rate: u32,
225         buffer_size: usize,
226         _effects: &[StreamEffect],
227     ) -> Result<
228         (
229             Box<dyn StreamControl>,
230             Box<dyn capture::CaptureBufferStream>,
231         ),
232         BoxError,
233     > {
234         Ok((
235             Box::new(NoopStreamControl::new()),
236             Box::new(capture::NoopCaptureStream::new(
237                 num_channels,
238                 format,
239                 frame_rate,
240                 buffer_size,
241             )),
242         ))
243     }
244 
245     /// Returns a stream control and async buffer generator object. These are separate as the buffer
246     /// generator might want to be passed to the audio stream.
247     /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
248     #[allow(clippy::type_complexity)]
new_async_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _ex: &dyn AudioStreamsExecutor, ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::AsyncCaptureBufferStream>, ), BoxError, >249     fn new_async_capture_stream(
250         &mut self,
251         num_channels: usize,
252         format: SampleFormat,
253         frame_rate: u32,
254         buffer_size: usize,
255         _effects: &[StreamEffect],
256         _ex: &dyn AudioStreamsExecutor,
257     ) -> Result<
258         (
259             Box<dyn StreamControl>,
260             Box<dyn capture::AsyncCaptureBufferStream>,
261         ),
262         BoxError,
263     > {
264         Ok((
265             Box::new(NoopStreamControl::new()),
266             Box::new(capture::NoopCaptureStream::new(
267                 num_channels,
268                 format,
269                 frame_rate,
270                 buffer_size,
271             )),
272         ))
273     }
274 
275     /// Returns a stream control and async buffer generator object asynchronously.
276     /// Default implementation calls and blocks on `new_async_capture_stream()`.
277     #[allow(clippy::type_complexity)]
async_new_async_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, effects: &[StreamEffect], ex: &dyn AudioStreamsExecutor, ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::AsyncCaptureBufferStream>, ), BoxError, >278     async fn async_new_async_capture_stream(
279         &mut self,
280         num_channels: usize,
281         format: SampleFormat,
282         frame_rate: u32,
283         buffer_size: usize,
284         effects: &[StreamEffect],
285         ex: &dyn AudioStreamsExecutor,
286     ) -> Result<
287         (
288             Box<dyn StreamControl>,
289             Box<dyn capture::AsyncCaptureBufferStream>,
290         ),
291         BoxError,
292     > {
293         self.new_async_capture_stream(num_channels, format, frame_rate, buffer_size, effects, ex)
294     }
295 
296     /// Returns any open file descriptors needed by the implementor. The FD list helps users of the
297     /// StreamSource enter Linux jails making sure not to close needed FDs.
keep_rds(&self) -> Option<Vec<RawDescriptor>>298     fn keep_rds(&self) -> Option<Vec<RawDescriptor>> {
299         None
300     }
301 }
302 
303 /// `PlaybackBufferStream` provides `PlaybackBuffer`s to fill with audio samples for playback.
304 pub trait PlaybackBufferStream: Send {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>305     fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>;
306 
307     /// Call `f` with a `PlaybackBuffer`, and trigger the buffer done call back after. `f` should
308     /// write playback data to the given `PlaybackBuffer`.
write_playback_buffer<'b, 's: 'b>( &'s mut self, f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>, ) -> Result<(), BoxError>309     fn write_playback_buffer<'b, 's: 'b>(
310         &'s mut self,
311         f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>,
312     ) -> Result<(), BoxError> {
313         let mut buf = self.next_playback_buffer()?;
314         f(&mut buf)?;
315         buf.commit();
316         Ok(())
317     }
318 }
319 
320 impl<S: PlaybackBufferStream + ?Sized> PlaybackBufferStream for &mut S {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>321     fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
322         (**self).next_playback_buffer()
323     }
324 }
325 
326 /// `PlaybackBufferStream` provides `PlaybackBuffer`s asynchronously to fill with audio samples for
327 /// playback.
328 #[async_trait(?Send)]
329 pub trait AsyncPlaybackBufferStream: Send {
next_playback_buffer<'a>( &'a mut self, _ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>330     async fn next_playback_buffer<'a>(
331         &'a mut self,
332         _ex: &dyn AudioStreamsExecutor,
333     ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
334 }
335 
336 #[async_trait(?Send)]
337 impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S {
next_playback_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>338     async fn next_playback_buffer<'a>(
339         &'a mut self,
340         ex: &dyn AudioStreamsExecutor,
341     ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
342         (**self).next_playback_buffer(ex).await
343     }
344 }
345 
346 /// Call `f` with a `AsyncPlaybackBuffer`, and trigger the buffer done call back after. `f` should
347 /// write playback data to the given `AsyncPlaybackBuffer`.
348 ///
349 /// This cannot be a trait method because trait methods with generic parameters are not object safe.
async_write_playback_buffer<F>( stream: &mut dyn AsyncPlaybackBufferStream, f: F, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError> where F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,350 pub async fn async_write_playback_buffer<F>(
351     stream: &mut dyn AsyncPlaybackBufferStream,
352     f: F,
353     ex: &dyn AudioStreamsExecutor,
354 ) -> Result<(), BoxError>
355 where
356     F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,
357 {
358     let mut buf = stream.next_playback_buffer(ex).await?;
359     f(&mut buf)?;
360     buf.commit().await;
361     Ok(())
362 }
363 
/// `StreamControl` provides a way to set the volume and mute states of a stream. `StreamControl`
/// is separate from the stream so it can be owned by a different thread if needed.
///
/// Both methods default to doing nothing.
pub trait StreamControl: Send + Sync {
    /// Sets the volume of the stream. The default implementation ignores the value.
    fn set_volume(&mut self, _scaler: f64) {}
    /// Sets the mute state of the stream. The default implementation ignores the value.
    fn set_mute(&mut self, _mute: bool) {}
}
370 
/// `BufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
pub trait BufferCommit {
    /// `write_playback_buffer` or `read_capture_buffer` would trigger this automatically. `nframes`
    /// indicates the number of audio frames that were read or written to the device.
    fn commit(&mut self, nframes: usize);

    /// `latency_bytes` the current device latency.
    /// For playback it means how many bytes need to be consumed
    /// before the current playback buffer will be played.
    /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
    ///
    /// The default implementation reports `0`.
    fn latency_bytes(&self) -> u32 {
        0
    }
}
385 
386 /// `AsyncBufferCommit` is a cleanup funcion that must be called before dropping the buffer,
387 /// allowing arbitrary code to be run after the buffer is filled or read by the user.
388 #[async_trait(?Send)]
389 pub trait AsyncBufferCommit {
390     /// `async_write_playback_buffer` or `async_read_capture_buffer` would trigger this
391     /// automatically. `nframes` indicates the number of audio frames that were read or written to
392     /// the device.
commit(&mut self, nframes: usize)393     async fn commit(&mut self, nframes: usize);
394     /// `latency_bytes` the current device latency.
395     /// For playback it means how many bytes need to be consumed
396     /// before the current playback buffer will be played.
397     /// For capture it means the latency in terms of bytes that the capture buffer was recorded.
latency_bytes(&self) -> u32398     fn latency_bytes(&self) -> u32 {
399         0
400     }
401 }
402 
403 /// Errors that are possible from a `PlaybackBuffer`.
404 #[sorted]
405 #[derive(Error, Debug)]
406 pub enum PlaybackBufferError {
407     #[error("Invalid buffer length")]
408     InvalidLength,
409     #[error("Slicing of playback buffer out of bounds")]
410     SliceOutOfBounds,
411 }
412 
/// `AudioBuffer` is one buffer that holds buffer_size audio frames.
/// It is the inner data of `PlaybackBuffer` and `CaptureBuffer`.
struct AudioBuffer<'a> {
    /// Backing memory for the audio data.
    buffer: &'a mut [u8],
    /// Current read or write position, as a byte index into `buffer`.
    offset: usize,
    /// Size of a frame in bytes.
    frame_size: usize,
}

impl<'a> AudioBuffer<'a> {
    /// Returns the number of audio frames that fit in the buffer.
    pub fn frame_capacity(&self) -> usize {
        self.buffer.len() / self.frame_size
    }

    /// Rounds `size` down to whole frames and caps it at the bytes left after `offset`.
    fn calc_len(&self, size: usize) -> usize {
        let whole_frames = size / self.frame_size * self.frame_size;
        let remaining = self.buffer.len() - self.offset;
        whole_frames.min(remaining)
    }

    /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
    ///
    /// The callback receives the writable region; afterwards the offset advances past it and
    /// the region's length in bytes is returned.
    pub fn write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        // Only complete frames are handed to the callback.
        let len = self.calc_len(size);
        let start = self.offset;
        let end = start + len;
        cb(&mut self.buffer[start..end]);
        self.offset = end;
        Ok(len)
    }

    /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
    ///
    /// The callback receives the readable region; afterwards the offset advances past it and
    /// the region's length in bytes is returned.
    pub fn read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        let len = self.calc_len(size);
        let start = self.offset;
        let end = start + len;
        cb(&self.buffer[start..end]);
        self.offset = end;
        Ok(len)
    }

    /// Copy data from an io::Reader into the unused tail of the buffer.
    ///
    /// Returns the number of bytes the reader produced; the offset advances by that amount.
    pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
        let destination = &mut self.buffer[self.offset..];
        let copied = reader.read(destination)?;
        self.offset += copied;
        Ok(copied)
    }

    /// Copy the data after the current offset to an io::Write.
    ///
    /// Returns the number of bytes the writer accepted; the offset advances by that amount.
    pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
        let source = &self.buffer[self.offset..];
        let copied = writer.write(source)?;
        self.offset += copied;
        Ok(copied)
    }
}
465 
466 impl<'a> Write for AudioBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>467     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
468         let written = (&mut self.buffer[self.offset..]).write(&buf[..buf.len()])?;
469         self.offset += written;
470         Ok(written)
471     }
472 
flush(&mut self) -> io::Result<()>473     fn flush(&mut self) -> io::Result<()> {
474         Ok(())
475     }
476 }
477 
478 impl<'a> Read for AudioBuffer<'a> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>479     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
480         let len = buf.len() / self.frame_size * self.frame_size;
481         let written = (&mut buf[..len]).write(&self.buffer[self.offset..])?;
482         self.offset += written;
483         Ok(written)
484     }
485 }
486 
487 /// `PlaybackBuffer` is one buffer that holds buffer_size audio frames. It is used to temporarily
488 /// allow access to an audio buffer and notifes the owning stream of write completion when dropped.
489 pub struct PlaybackBuffer<'a> {
490     buffer: AudioBuffer<'a>,
491     drop: &'a mut dyn BufferCommit,
492 }
493 
494 impl<'a> PlaybackBuffer<'a> {
495     /// Creates a new `PlaybackBuffer` that holds a reference to the backing memory specified in
496     /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], drop: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: BufferCommit,497     pub fn new<F>(
498         frame_size: usize,
499         buffer: &'a mut [u8],
500         drop: &'a mut F,
501     ) -> Result<Self, PlaybackBufferError>
502     where
503         F: BufferCommit,
504     {
505         if buffer.len() % frame_size != 0 {
506             return Err(PlaybackBufferError::InvalidLength);
507         }
508 
509         Ok(PlaybackBuffer {
510             buffer: AudioBuffer {
511                 buffer,
512                 offset: 0,
513                 frame_size,
514             },
515             drop,
516         })
517     }
518 
519     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize520     pub fn frame_capacity(&self) -> usize {
521         self.buffer.frame_capacity()
522     }
523 
524     /// This triggers the commit of `BufferCommit`. This should be called after the data is copied
525     /// to the buffer.
commit(&mut self)526     pub fn commit(&mut self) {
527         self.drop
528             .commit(self.buffer.offset / self.buffer.frame_size);
529     }
530 
531     /// It returns how many bytes need to be consumed
532     /// before the current playback buffer will be played.
latency_bytes(&self) -> u32533     pub fn latency_bytes(&self) -> u32 {
534         self.drop.latency_bytes()
535     }
536 
537     /// Writes up to `size` bytes directly to this buffer inside of the given callback function
538     /// with a buffer size error check.
539     ///
540     /// TODO(b/238933737): Investigate removing this method for Windows when
541     /// switching from Ac97 to Virtio-Snd
copy_cb_with_checks<F: FnOnce(&mut [u8])>( &mut self, size: usize, cb: F, ) -> Result<(), PlaybackBufferError>542     pub fn copy_cb_with_checks<F: FnOnce(&mut [u8])>(
543         &mut self,
544         size: usize,
545         cb: F,
546     ) -> Result<(), PlaybackBufferError> {
547         // only write complete frames.
548         let len = size / self.buffer.frame_size * self.buffer.frame_size;
549         if self.buffer.offset + len > self.buffer.buffer.len() {
550             return Err(PlaybackBufferError::SliceOutOfBounds);
551         }
552         cb(&mut self.buffer.buffer[self.buffer.offset..(self.buffer.offset + len)]);
553         self.buffer.offset += len;
554         Ok(())
555     }
556 
557     /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>558     pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
559         self.buffer.write_copy_cb(size, cb)
560     }
561 }
562 
563 impl<'a> Write for PlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>564     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
565         self.buffer.write(buf)
566     }
567 
flush(&mut self) -> io::Result<()>568     fn flush(&mut self) -> io::Result<()> {
569         self.buffer.flush()
570     }
571 }
572 
573 /// `AsyncPlaybackBuffer` is the async version of `PlaybackBuffer`.
574 pub struct AsyncPlaybackBuffer<'a> {
575     buffer: AudioBuffer<'a>,
576     trigger: &'a mut dyn AsyncBufferCommit,
577 }
578 
579 impl<'a> AsyncPlaybackBuffer<'a> {
580     /// Creates a new `AsyncPlaybackBuffer` that holds a reference to the backing memory specified
581     /// in `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], trigger: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: AsyncBufferCommit,582     pub fn new<F>(
583         frame_size: usize,
584         buffer: &'a mut [u8],
585         trigger: &'a mut F,
586     ) -> Result<Self, PlaybackBufferError>
587     where
588         F: AsyncBufferCommit,
589     {
590         if buffer.len() % frame_size != 0 {
591             return Err(PlaybackBufferError::InvalidLength);
592         }
593 
594         Ok(AsyncPlaybackBuffer {
595             buffer: AudioBuffer {
596                 buffer,
597                 offset: 0,
598                 frame_size,
599             },
600             trigger,
601         })
602     }
603 
604     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize605     pub fn frame_capacity(&self) -> usize {
606         self.buffer.frame_capacity()
607     }
608 
609     /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
610     /// copied to the buffer.
commit(&mut self)611     pub async fn commit(&mut self) {
612         self.trigger
613             .commit(self.buffer.offset / self.buffer.frame_size)
614             .await;
615     }
616 
617     /// It returns the latency in terms of bytes that the capture buffer was recorded.
latency_bytes(&self) -> u32618     pub fn latency_bytes(&self) -> u32 {
619         self.trigger.latency_bytes()
620     }
621 
622     /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>623     pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
624         self.buffer.write_copy_cb(size, cb)
625     }
626 
627     /// Copy data from an io::Reader
copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize>628     pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
629         self.buffer.copy_from(reader)
630     }
631 }
632 
633 impl<'a> Write for AsyncPlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>634     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
635         self.buffer.write(buf)
636     }
637 
flush(&mut self) -> io::Result<()>638     fn flush(&mut self) -> io::Result<()> {
639         self.buffer.flush()
640     }
641 }
642 /// Stream that accepts playback samples but drops them.
643 pub struct NoopStream {
644     buffer: Vec<u8>,
645     frame_size: usize,
646     interval: Duration,
647     next_frame: Duration,
648     start_time: Option<Instant>,
649     buffer_drop: NoopBufferCommit,
650 }
651 
/// State of a `NoopStream` that the buffer-complete callback needs.
struct NoopBufferCommit {
    /// Flipped each time a buffer is committed.
    which_buffer: bool,
}
656 
657 impl BufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)658     fn commit(&mut self, _nwritten: usize) {
659         // When a buffer completes, switch to the other one.
660         self.which_buffer ^= true;
661     }
662 }
663 
664 #[async_trait(?Send)]
665 impl AsyncBufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)666     async fn commit(&mut self, _nwritten: usize) {
667         // When a buffer completes, switch to the other one.
668         self.which_buffer ^= true;
669     }
670 }
671 
672 impl NoopStream {
new( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Self673     pub fn new(
674         num_channels: usize,
675         format: SampleFormat,
676         frame_rate: u32,
677         buffer_size: usize,
678     ) -> Self {
679         let frame_size = format.sample_bytes() * num_channels;
680         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
681         NoopStream {
682             buffer: vec![0; buffer_size * frame_size],
683             frame_size,
684             interval,
685             next_frame: interval,
686             start_time: None,
687             buffer_drop: NoopBufferCommit {
688                 which_buffer: false,
689             },
690         }
691     }
692 }
693 
694 impl PlaybackBufferStream for NoopStream {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>695     fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
696         if let Some(start_time) = self.start_time {
697             let elapsed = start_time.elapsed();
698             if elapsed < self.next_frame {
699                 std::thread::sleep(self.next_frame - elapsed);
700             }
701             self.next_frame += self.interval;
702         } else {
703             self.start_time = Some(Instant::now());
704             self.next_frame = self.interval;
705         }
706         Ok(PlaybackBuffer::new(
707             self.frame_size,
708             &mut self.buffer,
709             &mut self.buffer_drop,
710         )?)
711     }
712 }
713 
714 #[async_trait(?Send)]
715 impl AsyncPlaybackBufferStream for NoopStream {
next_playback_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>716     async fn next_playback_buffer<'a>(
717         &'a mut self,
718         ex: &dyn AudioStreamsExecutor,
719     ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
720         if let Some(start_time) = self.start_time {
721             let elapsed = start_time.elapsed();
722             if elapsed < self.next_frame {
723                 ex.delay(self.next_frame - elapsed).await?;
724             }
725             self.next_frame += self.interval;
726         } else {
727             self.start_time = Some(Instant::now());
728             self.next_frame = self.interval;
729         }
730         Ok(AsyncPlaybackBuffer::new(
731             self.frame_size,
732             &mut self.buffer,
733             &mut self.buffer_drop,
734         )?)
735     }
736 }
737 
/// Stream control for `NoopStream`s; it carries no state.
#[derive(Default)]
pub struct NoopStreamControl;

impl NoopStreamControl {
    /// Creates a new `NoopStreamControl`.
    pub fn new() -> Self {
        Self
    }
}
747 
748 impl StreamControl for NoopStreamControl {}
749 
/// Factory for `NoopStream` and `NoopStreamControl` objects; it carries no state.
#[derive(Default)]
pub struct NoopStreamSource;

impl NoopStreamSource {
    /// Creates a new `NoopStreamSource`.
    pub fn new() -> Self {
        Self
    }
}
759 
760 impl StreamSource for NoopStreamSource {
761     #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>762     fn new_playback_stream(
763         &mut self,
764         num_channels: usize,
765         format: SampleFormat,
766         frame_rate: u32,
767         buffer_size: usize,
768     ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError> {
769         Ok((
770             Box::new(NoopStreamControl::new()),
771             Box::new(NoopStream::new(
772                 num_channels,
773                 format,
774                 frame_rate,
775                 buffer_size,
776             )),
777         ))
778     }
779 
780     #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>781     fn new_async_playback_stream(
782         &mut self,
783         num_channels: usize,
784         format: SampleFormat,
785         frame_rate: u32,
786         buffer_size: usize,
787         _ex: &dyn AudioStreamsExecutor,
788     ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
789         Ok((
790             Box::new(NoopStreamControl::new()),
791             Box::new(NoopStream::new(
792                 num_channels,
793                 format,
794                 frame_rate,
795                 buffer_size,
796             )),
797         ))
798     }
799 }
800 
/// `NoopStreamSourceGenerator` is a struct that implements [`StreamSourceGenerator`]
/// to generate [`NoopStreamSource`].
///
/// It is a stateless unit struct, so `new()` and `Default::default()` produce the same value.
/// `Default` is derived to match the other `Noop*` types in this file, which pair a
/// zero-argument `new()` with a `Default` implementation.
#[derive(Default)]
pub struct NoopStreamSourceGenerator;

impl NoopStreamSourceGenerator {
    /// Creates a new `NoopStreamSourceGenerator`.
    pub fn new() -> Self {
        Self
    }
}
810 
811 impl StreamSourceGenerator for NoopStreamSourceGenerator {
generate(&self) -> Result<Box<dyn StreamSource>, BoxError>812     fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError> {
813         Ok(Box::new(NoopStreamSource))
814     }
815 }
816 
817 #[cfg(test)]
818 mod tests {
819     use futures::FutureExt;
820     use io::Write;
821 
822     use super::async_api::test::TestExecutor;
823     use super::*;
824 
825     #[test]
invalid_buffer_length()826     fn invalid_buffer_length() {
827         // Playback buffers can't be created with a size that isn't divisible by the frame size.
828         let mut pb_buf = [0xa5u8; 480 * 2 * 2 + 1];
829         let mut buffer_drop = NoopBufferCommit {
830             which_buffer: false,
831         };
832         assert!(PlaybackBuffer::new(2, &mut pb_buf, &mut buffer_drop).is_err());
833     }
834 
835     #[test]
audio_buffer_copy_from()836     fn audio_buffer_copy_from() {
837         const PERIOD_SIZE: usize = 8192;
838         const NUM_CHANNELS: usize = 6;
839         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
840         let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
841         let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
842         let mut aud_buf = AudioBuffer {
843             buffer: &mut dst_buf,
844             offset: 0,
845             frame_size: FRAME_SIZE,
846         };
847         aud_buf
848             .copy_from(&mut &src_buf[..])
849             .expect("all data should be copied.");
850         assert_eq!(dst_buf, src_buf);
851     }
852 
853     #[test]
audio_buffer_copy_from_repeat()854     fn audio_buffer_copy_from_repeat() {
855         const PERIOD_SIZE: usize = 8192;
856         const NUM_CHANNELS: usize = 6;
857         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
858         let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
859         let mut aud_buf = AudioBuffer {
860             buffer: &mut dst_buf,
861             offset: 0,
862             frame_size: FRAME_SIZE,
863         };
864         let bytes = aud_buf
865             .copy_from(&mut io::repeat(1))
866             .expect("all data should be copied.");
867         assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
868         assert_eq!(dst_buf, [1u8; PERIOD_SIZE * FRAME_SIZE]);
869     }
870 
871     #[test]
audio_buffer_copy_to()872     fn audio_buffer_copy_to() {
873         const PERIOD_SIZE: usize = 8192;
874         const NUM_CHANNELS: usize = 6;
875         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
876         let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
877         let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
878         let mut aud_buf = AudioBuffer {
879             buffer: &mut src_buf,
880             offset: 0,
881             frame_size: FRAME_SIZE,
882         };
883         aud_buf
884             .copy_to(&mut &mut dst_buf[..])
885             .expect("all data should be copied.");
886         assert_eq!(dst_buf, src_buf);
887     }
888 
889     #[test]
audio_buffer_copy_to_sink()890     fn audio_buffer_copy_to_sink() {
891         const PERIOD_SIZE: usize = 8192;
892         const NUM_CHANNELS: usize = 6;
893         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
894         let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
895         let mut aud_buf = AudioBuffer {
896             buffer: &mut src_buf,
897             offset: 0,
898             frame_size: FRAME_SIZE,
899         };
900         let bytes = aud_buf
901             .copy_to(&mut io::sink())
902             .expect("all data should be copied.");
903         assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
904     }
905 
906     #[test]
io_copy_audio_buffer()907     fn io_copy_audio_buffer() {
908         const PERIOD_SIZE: usize = 8192;
909         const NUM_CHANNELS: usize = 6;
910         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
911         let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
912         let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
913         let mut aud_buf = AudioBuffer {
914             buffer: &mut dst_buf,
915             offset: 0,
916             frame_size: FRAME_SIZE,
917         };
918         io::copy(&mut &src_buf[..], &mut aud_buf).expect("all data should be copied.");
919         assert_eq!(dst_buf, src_buf);
920     }
921 
922     #[test]
commit()923     fn commit() {
924         struct TestCommit {
925             frame_count: usize,
926         }
927         impl BufferCommit for TestCommit {
928             fn commit(&mut self, nwritten: usize) {
929                 self.frame_count += nwritten;
930             }
931         }
932         let mut test_commit = TestCommit { frame_count: 0 };
933         {
934             const FRAME_SIZE: usize = 4;
935             let mut buf = [0u8; 480 * FRAME_SIZE];
936             let mut pb_buf = PlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
937             pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
938             pb_buf.commit();
939         }
940         assert_eq!(test_commit.frame_count, 480);
941     }
942 
943     #[test]
sixteen_bit_stereo()944     fn sixteen_bit_stereo() {
945         let mut server = NoopStreamSource::new();
946         let (_, mut stream) = server
947             .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
948             .unwrap();
949         let mut copy_cb = |buf: &mut PlaybackBuffer| {
950             assert_eq!(buf.buffer.frame_capacity(), 480);
951             let pb_buf = [0xa5u8; 480 * 2 * 2];
952             assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
953             Ok(())
954         };
955         stream.write_playback_buffer(&mut copy_cb).unwrap();
956     }
957 
958     #[test]
consumption_rate()959     fn consumption_rate() {
960         let mut server = NoopStreamSource::new();
961         let (_, mut stream) = server
962             .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
963             .unwrap();
964         let start = Instant::now();
965         {
966             let mut copy_cb = |buf: &mut PlaybackBuffer| {
967                 let pb_buf = [0xa5u8; 480 * 2 * 2];
968                 assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
969                 Ok(())
970             };
971             stream.write_playback_buffer(&mut copy_cb).unwrap();
972         }
973         // The second call should block until the first buffer is consumed.
974         let mut assert_cb = |_: &mut PlaybackBuffer| {
975             let elapsed = start.elapsed();
976             assert!(
977                 elapsed > Duration::from_millis(10),
978                 "next_playback_buffer didn't block long enough {}",
979                 elapsed.subsec_millis()
980             );
981             Ok(())
982         };
983         stream.write_playback_buffer(&mut assert_cb).unwrap();
984     }
985 
986     #[test]
async_commit()987     fn async_commit() {
988         struct TestCommit {
989             frame_count: usize,
990         }
991         #[async_trait(?Send)]
992         impl AsyncBufferCommit for TestCommit {
993             async fn commit(&mut self, nwritten: usize) {
994                 self.frame_count += nwritten;
995             }
996         }
997         async fn this_test() {
998             let mut test_commit = TestCommit { frame_count: 0 };
999             {
1000                 const FRAME_SIZE: usize = 4;
1001                 let mut buf = [0u8; 480 * FRAME_SIZE];
1002                 let mut pb_buf =
1003                     AsyncPlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
1004                 pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
1005                 pb_buf.commit().await;
1006             }
1007             assert_eq!(test_commit.frame_count, 480);
1008         }
1009 
1010         this_test().now_or_never();
1011     }
1012 
1013     #[test]
consumption_rate_async()1014     fn consumption_rate_async() {
1015         async fn this_test(ex: &TestExecutor) {
1016             let mut server = NoopStreamSource::new();
1017             let (_, mut stream) = server
1018                 .new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
1019                 .unwrap();
1020             let start = Instant::now();
1021             {
1022                 let copy_func = |buf: &mut AsyncPlaybackBuffer| {
1023                     let pb_buf = [0xa5u8; 480 * 2 * 2];
1024                     assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
1025                     Ok(())
1026                 };
1027                 async_write_playback_buffer(&mut *stream, copy_func, ex)
1028                     .await
1029                     .unwrap();
1030             }
1031             // The second call should block until the first buffer is consumed.
1032             let assert_func = |_: &mut AsyncPlaybackBuffer| {
1033                 let elapsed = start.elapsed();
1034                 assert!(
1035                     elapsed > Duration::from_millis(10),
1036                     "write_playback_buffer didn't block long enough {}",
1037                     elapsed.subsec_millis()
1038                 );
1039                 Ok(())
1040             };
1041 
1042             async_write_playback_buffer(&mut *stream, assert_func, ex)
1043                 .await
1044                 .unwrap();
1045         }
1046 
1047         let ex = TestExecutor {};
1048         this_test(&ex).now_or_never();
1049     }
1050 
1051     #[test]
generate_noop_stream_source()1052     fn generate_noop_stream_source() {
1053         let generator: Box<dyn StreamSourceGenerator> = Box::new(NoopStreamSourceGenerator::new());
1054         generator
1055             .generate()
1056             .expect("failed to generate stream source");
1057     }
1058 }
1059