• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Provides an interface for playing and recording audio.
6 //!
7 //! When implementing an audio playback system, the `StreamSource` trait is implemented.
8 //! Implementors of this trait allow creation of `PlaybackBufferStream` objects. The
9 //! `PlaybackBufferStream` provides the actual audio buffers to be filled with audio samples. These
10 //! buffers can be filled with `write_playback_buffer`.
11 //!
//! Users playing audio fill the provided buffers with audio. The samples written to a
//! `PlaybackBuffer` are committed to the `PlaybackBufferStream` it came from by calling `commit`;
//! `write_playback_buffer` does this automatically once its callback returns successfully.
14 //!
15 //! ```
16 //! use audio_streams::{BoxError, PlaybackBuffer, SampleFormat, StreamSource, NoopStreamSource};
17 //! use std::io::Write;
18 //!
19 //! const buffer_size: usize = 120;
20 //! const num_channels: usize = 2;
21 //!
22 //! # fn main() -> std::result::Result<(), BoxError> {
23 //! let mut stream_source = NoopStreamSource::new();
24 //! let sample_format = SampleFormat::S16LE;
25 //! let frame_size = num_channels * sample_format.sample_bytes();
26 //!
27 //! let (_, mut stream) = stream_source
28 //!     .new_playback_stream(num_channels, sample_format, 48000, buffer_size)?;
29 //! // Play 10 buffers of DC.
30 //! let mut buf = Vec::new();
31 //! buf.resize(buffer_size * frame_size, 0xa5u8);
32 //! for _ in 0..10 {
33 //!     let mut copy_cb = |stream_buffer: &mut PlaybackBuffer| {
34 //!         assert_eq!(stream_buffer.write(&buf)?, buffer_size * frame_size);
35 //!         Ok(())
36 //!     };
37 //!     stream.write_playback_buffer(&mut copy_cb)?;
38 //! }
39 //! # Ok (())
40 //! # }
41 //! ```
42 pub mod async_api;
43 
44 use async_trait::async_trait;
45 use std::cmp::min;
46 use std::error;
47 use std::fmt::{self, Display};
48 use std::io::{self, Read, Write};
49 #[cfg(unix)]
50 use std::os::unix::io::RawFd;
51 use std::result::Result;
52 use std::str::FromStr;
53 use std::time::{Duration, Instant};
54 
55 pub use async_api::{AsyncStream, AudioStreamsExecutor};
56 use remain::sorted;
57 use thiserror::Error;
58 
/// Sample encodings understood by the audio streams in this module.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SampleFormat {
    /// Unsigned 8 bit samples.
    U8,
    /// Signed 16 bit little endian samples.
    S16LE,
    /// Signed 24 bit little endian samples (each stored in a 4 byte chunk).
    S24LE,
    /// Signed 32 bit little endian samples.
    S32LE,
}

impl SampleFormat {
    /// Returns the number of bytes a single sample of this format occupies in a buffer.
    pub fn sample_bytes(self) -> usize {
        match self {
            SampleFormat::U8 => 1,
            SampleFormat::S16LE => 2,
            // Not a typo: S24_LE samples are stored in 4 byte chunks, just like S32_LE.
            SampleFormat::S24LE | SampleFormat::S32LE => 4,
        }
    }
}

impl Display for SampleFormat {
    /// Writes the human readable name of the format.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let description = match self {
            SampleFormat::U8 => "Unsigned 8 bit",
            SampleFormat::S16LE => "Signed 16 bit Little Endian",
            SampleFormat::S24LE => "Signed 24 bit Little Endian",
            SampleFormat::S32LE => "Signed 32 bit Little Endian",
        };
        f.write_str(description)
    }
}
90 
/// Valid directions of an audio stream.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StreamDirection {
    /// The stream is used for playback.
    Playback,
    /// The stream is used for capture.
    Capture,
}
97 
/// Valid effects for an audio stream.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StreamEffect {
    /// No effect is requested for the stream.
    NoEffect,
    /// Echo cancellation is requested for the stream.
    EchoCancellation,
}
104 
105 pub mod capture;
106 pub mod shm_streams;
107 
108 impl Default for StreamEffect {
default() -> Self109     fn default() -> Self {
110         StreamEffect::NoEffect
111     }
112 }
113 
/// Boxed error type returned by the stream APIs in this module. The `Send + Sync` bounds let
/// these errors pass across threads.
pub type BoxError = Box<dyn error::Error + Send + Sync>;
116 
/// Errors that are possible from a `StreamEffect`.
#[sorted]
#[derive(Error, Debug)]
pub enum StreamEffectError {
    /// The string being parsed does not name an effect accepted by `StreamEffect::from_str`.
    #[error("Must be in [EchoCancellation, aec]")]
    InvalidEffect,
}
124 
125 impl FromStr for StreamEffect {
126     type Err = StreamEffectError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>127     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
128         match s {
129             "EchoCancellation" | "aec" => Ok(StreamEffect::EchoCancellation),
130             _ => Err(StreamEffectError::InvalidEffect),
131         }
132     }
133 }
134 
/// General errors returned by the stream traits in this module.
#[sorted]
#[derive(Error, Debug)]
pub enum Error {
    /// The requested operation has no implementation; this is what the default
    /// `StreamSource::new_async_playback_stream` returns.
    #[error("Unimplemented")]
    Unimplemented,
}
141 
142 /// `StreamSource` creates streams for playback or capture of audio.
143 pub trait StreamSource: Send {
144     /// Returns a stream control and buffer generator object. These are separate as the buffer
145     /// generator might want to be passed to the audio stream.
146     #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>147     fn new_playback_stream(
148         &mut self,
149         num_channels: usize,
150         format: SampleFormat,
151         frame_rate: u32,
152         buffer_size: usize,
153     ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>;
154 
155     /// Returns a stream control and async buffer generator object. These are separate as the buffer
156     /// generator might want to be passed to the audio stream.
157     #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, _num_channels: usize, _format: SampleFormat, _frame_rate: u32, _buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>158     fn new_async_playback_stream(
159         &mut self,
160         _num_channels: usize,
161         _format: SampleFormat,
162         _frame_rate: u32,
163         _buffer_size: usize,
164         _ex: &dyn AudioStreamsExecutor,
165     ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
166         Err(Box::new(Error::Unimplemented))
167     }
168 
169     /// Returns a stream control and buffer generator object. These are separate as the buffer
170     /// generator might want to be passed to the audio stream.
171     /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
172     #[allow(clippy::type_complexity)]
new_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::CaptureBufferStream>, ), BoxError, >173     fn new_capture_stream(
174         &mut self,
175         num_channels: usize,
176         format: SampleFormat,
177         frame_rate: u32,
178         buffer_size: usize,
179         _effects: &[StreamEffect],
180     ) -> Result<
181         (
182             Box<dyn StreamControl>,
183             Box<dyn capture::CaptureBufferStream>,
184         ),
185         BoxError,
186     > {
187         Ok((
188             Box::new(NoopStreamControl::new()),
189             Box::new(capture::NoopCaptureStream::new(
190                 num_channels,
191                 format,
192                 frame_rate,
193                 buffer_size,
194             )),
195         ))
196     }
197 
198     /// Returns a stream control and async buffer generator object. These are separate as the buffer
199     /// generator might want to be passed to the audio stream.
200     /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
201     #[allow(clippy::type_complexity)]
new_async_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _ex: &dyn AudioStreamsExecutor, ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::AsyncCaptureBufferStream>, ), BoxError, >202     fn new_async_capture_stream(
203         &mut self,
204         num_channels: usize,
205         format: SampleFormat,
206         frame_rate: u32,
207         buffer_size: usize,
208         _effects: &[StreamEffect],
209         _ex: &dyn AudioStreamsExecutor,
210     ) -> Result<
211         (
212             Box<dyn StreamControl>,
213             Box<dyn capture::AsyncCaptureBufferStream>,
214         ),
215         BoxError,
216     > {
217         Ok((
218             Box::new(NoopStreamControl::new()),
219             Box::new(capture::NoopCaptureStream::new(
220                 num_channels,
221                 format,
222                 frame_rate,
223                 buffer_size,
224             )),
225         ))
226     }
227 
228     /// Returns any open file descriptors needed by the implementor. The FD list helps users of the
229     /// StreamSource enter Linux jails making sure not to close needed FDs.
230     #[cfg(unix)]
keep_fds(&self) -> Option<Vec<RawFd>>231     fn keep_fds(&self) -> Option<Vec<RawFd>> {
232         None
233     }
234 }
235 
236 /// `PlaybackBufferStream` provides `PlaybackBuffer`s to fill with audio samples for playback.
237 pub trait PlaybackBufferStream: Send {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>238     fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>;
239 
240     /// Call `f` with a `PlaybackBuffer`, and trigger the buffer done call back after. `f` should
241     /// write playback data to the given `PlaybackBuffer`.
write_playback_buffer<'b, 's: 'b>( &'s mut self, f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>, ) -> Result<(), BoxError>242     fn write_playback_buffer<'b, 's: 'b>(
243         &'s mut self,
244         f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>,
245     ) -> Result<(), BoxError> {
246         let mut buf = self.next_playback_buffer()?;
247         f(&mut buf)?;
248         buf.commit();
249         Ok(())
250     }
251 }
252 
253 impl<S: PlaybackBufferStream + ?Sized> PlaybackBufferStream for &mut S {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>254     fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
255         (**self).next_playback_buffer()
256     }
257 }
258 
/// `AsyncPlaybackBufferStream` provides `AsyncPlaybackBuffer`s asynchronously to fill with audio
/// samples for playback.
#[async_trait(?Send)]
pub trait AsyncPlaybackBufferStream: Send {
    /// Asynchronously returns the next `AsyncPlaybackBuffer` to be filled with playback data.
    /// `_ex` is the executor made available to the implementation (the `NoopStream`
    /// implementation uses it to delay until the next buffer is due).
    async fn next_playback_buffer<'a>(
        &'a mut self,
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
}
268 
/// Lets a mutable reference to an `AsyncPlaybackBufferStream` be used as a stream itself by
/// forwarding to the referenced stream.
#[async_trait(?Send)]
impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S {
    async fn next_playback_buffer<'a>(
        &'a mut self,
        ex: &dyn AudioStreamsExecutor,
    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
        // Delegate to the referenced stream's implementation.
        (**self).next_playback_buffer(ex).await
    }
}
278 
279 /// Call `f` with a `AsyncPlaybackBuffer`, and trigger the buffer done call back after. `f` should
280 /// write playback data to the given `AsyncPlaybackBuffer`.
281 ///
282 /// This cannot be a trait method because trait methods with generic parameters are not object safe.
async_write_playback_buffer<F>( stream: &mut dyn AsyncPlaybackBufferStream, f: F, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError> where F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,283 pub async fn async_write_playback_buffer<F>(
284     stream: &mut dyn AsyncPlaybackBufferStream,
285     f: F,
286     ex: &dyn AudioStreamsExecutor,
287 ) -> Result<(), BoxError>
288 where
289     F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,
290 {
291     let mut buf = stream.next_playback_buffer(ex).await?;
292     f(&mut buf)?;
293     buf.commit().await;
294     Ok(())
295 }
296 
/// `StreamControl` provides a way to set the volume and mute states of a stream. `StreamControl`
/// is separate from the stream so it can be owned by a different thread if needed.
pub trait StreamControl: Send + Sync {
    /// Sets the volume of the stream. The default implementation ignores `_scaler`.
    fn set_volume(&mut self, _scaler: f64) {}
    /// Sets the mute state of the stream. The default implementation ignores `_mute`.
    fn set_mute(&mut self, _mute: bool) {}
}
303 
/// `BufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
pub trait BufferCommit {
    /// `write_playback_buffer` or `read_capture_buffer` would trigger this automatically. `nframes`
    /// indicates the number of audio frames that were read or written to the device.
    fn commit(&mut self, nframes: usize);
}
311 
/// `AsyncBufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
#[async_trait(?Send)]
pub trait AsyncBufferCommit {
    /// `async_write_playback_buffer` or `async_read_capture_buffer` would trigger this
    /// automatically. `nframes` indicates the number of audio frames that were read or written to
    /// the device.
    async fn commit(&mut self, nframes: usize);
}
321 
/// Errors that are possible from a `PlaybackBuffer`.
#[sorted]
#[derive(Error, Debug)]
pub enum PlaybackBufferError {
    /// The backing buffer's length is not a multiple of the frame size.
    #[error("Invalid buffer length")]
    InvalidLength,
}
329 
/// `AudioBuffer` is one buffer that holds buffer_size audio frames.
/// It is the inner data of `PlaybackBuffer` and `CaptureBuffer`.
struct AudioBuffer<'a> {
    buffer: &'a mut [u8],
    // Read or write offset in BYTES from the start of `buffer`. Users divide it by `frame_size`
    // when they need a frame count.
    offset: usize,
    frame_size: usize, // Size of a frame in bytes.
}

impl<'a> AudioBuffer<'a> {
    /// Returns the number of audio frames that fit in the buffer.
    pub fn frame_capacity(&self) -> usize {
        self.buffer.len() / self.frame_size
    }

    /// Clamps a requested byte count: `size` is rounded down to a whole number of frames and
    /// limited to the bytes remaining after `offset`.
    fn calc_len(&self, size: usize) -> usize {
        min(
            size / self.frame_size * self.frame_size,
            self.buffer.len() - self.offset,
        )
    }

    /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
    ///
    /// The callback receives the next writable region (whole frames only) and the offset is
    /// advanced by the length of that region, which is also the returned value.
    pub fn write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        // only write complete frames.
        let len = self.calc_len(size);
        cb(&mut self.buffer[self.offset..(self.offset + len)]);
        self.offset += len;
        Ok(len)
    }

    /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
    ///
    /// The callback receives the next readable region (whole frames only) and the offset is
    /// advanced by the length of that region, which is also the returned value.
    pub fn read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        let len = self.calc_len(size);
        cb(&self.buffer[self.offset..(self.offset + len)]);
        self.offset += len;
        Ok(len)
    }

    /// Copy data from an io::Reader
    ///
    /// Performs a single `read` into the unused tail of the buffer and returns the number of
    /// bytes the reader reported.
    pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
        let bytes = reader.read(&mut self.buffer[self.offset..])?;
        self.offset += bytes;
        Ok(bytes)
    }

    /// Copy data to an io::Write
    ///
    /// Performs a single `write` of the bytes after the current offset and returns the number of
    /// bytes the writer accepted.
    pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
        let bytes = writer.write(&self.buffer[self.offset..])?;
        self.offset += bytes;
        Ok(bytes)
    }
}

impl<'a> Write for AudioBuffer<'a> {
    /// Copies as much of `buf` as fits after the current offset and advances the offset by the
    /// number of bytes copied.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let written = (&mut self.buffer[self.offset..]).write(buf)?;
        self.offset += written;
        Ok(written)
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl<'a> Read for AudioBuffer<'a> {
    /// Copies bytes from the current offset into `buf`. Only a whole number of frames worth of
    /// `buf` is filled; the offset advances by the number of bytes copied.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let len = buf.len() / self.frame_size * self.frame_size;
        let written = (&mut buf[..len]).write(&self.buffer[self.offset..])?;
        self.offset += written;
        Ok(written)
    }
}
403 
404 /// `PlaybackBuffer` is one buffer that holds buffer_size audio frames. It is used to temporarily
405 /// allow access to an audio buffer and notifes the owning stream of write completion when dropped.
406 pub struct PlaybackBuffer<'a> {
407     buffer: AudioBuffer<'a>,
408     drop: &'a mut dyn BufferCommit,
409 }
410 
411 impl<'a> PlaybackBuffer<'a> {
412     /// Creates a new `PlaybackBuffer` that holds a reference to the backing memory specified in
413     /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], drop: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: BufferCommit,414     pub fn new<F>(
415         frame_size: usize,
416         buffer: &'a mut [u8],
417         drop: &'a mut F,
418     ) -> Result<Self, PlaybackBufferError>
419     where
420         F: BufferCommit,
421     {
422         if buffer.len() % frame_size != 0 {
423             return Err(PlaybackBufferError::InvalidLength);
424         }
425 
426         Ok(PlaybackBuffer {
427             buffer: AudioBuffer {
428                 buffer,
429                 offset: 0,
430                 frame_size,
431             },
432             drop,
433         })
434     }
435 
436     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize437     pub fn frame_capacity(&self) -> usize {
438         self.buffer.frame_capacity()
439     }
440 
441     /// This triggers the commit of `BufferCommit`. This should be called after the data is copied
442     /// to the buffer.
commit(&mut self)443     pub fn commit(&mut self) {
444         self.drop
445             .commit(self.buffer.offset / self.buffer.frame_size);
446     }
447 
448     /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>449     pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
450         self.buffer.write_copy_cb(size, cb)
451     }
452 }
453 
454 impl<'a> Write for PlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>455     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
456         self.buffer.write(buf)
457     }
458 
flush(&mut self) -> io::Result<()>459     fn flush(&mut self) -> io::Result<()> {
460         self.buffer.flush()
461     }
462 }
463 
464 /// `AsyncPlaybackBuffer` is the async version of `PlaybackBuffer`.
465 pub struct AsyncPlaybackBuffer<'a> {
466     buffer: AudioBuffer<'a>,
467     trigger: &'a mut dyn AsyncBufferCommit,
468 }
469 
470 impl<'a> AsyncPlaybackBuffer<'a> {
471     /// Creates a new `AsyncPlaybackBuffer` that holds a reference to the backing memory specified
472     /// in `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], trigger: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: AsyncBufferCommit,473     pub fn new<F>(
474         frame_size: usize,
475         buffer: &'a mut [u8],
476         trigger: &'a mut F,
477     ) -> Result<Self, PlaybackBufferError>
478     where
479         F: AsyncBufferCommit,
480     {
481         if buffer.len() % frame_size != 0 {
482             return Err(PlaybackBufferError::InvalidLength);
483         }
484 
485         Ok(AsyncPlaybackBuffer {
486             buffer: AudioBuffer {
487                 buffer,
488                 offset: 0,
489                 frame_size,
490             },
491             trigger,
492         })
493     }
494 
495     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize496     pub fn frame_capacity(&self) -> usize {
497         self.buffer.frame_capacity()
498     }
499 
500     /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
501     /// copied to the buffer.
commit(&mut self)502     pub async fn commit(&mut self) {
503         self.trigger
504             .commit(self.buffer.offset / self.buffer.frame_size)
505             .await;
506     }
507 
508     /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>509     pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
510         self.buffer.write_copy_cb(size, cb)
511     }
512 
513     /// Copy data from an io::Reader
copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize>514     pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
515         self.buffer.copy_from(reader)
516     }
517 }
518 
519 impl<'a> Write for AsyncPlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>520     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
521         self.buffer.write(buf)
522     }
523 
flush(&mut self) -> io::Result<()>524     fn flush(&mut self) -> io::Result<()> {
525         self.buffer.flush()
526     }
527 }
/// Stream that accepts playback samples but drops them.
pub struct NoopStream {
    // Backing storage lent out to every playback buffer.
    buffer: Vec<u8>,
    // Size of one frame in bytes.
    frame_size: usize,
    // Time covered by one buffer at the stream's frame rate (millisecond resolution).
    interval: Duration,
    // Time since `start_time` at which the next buffer becomes due.
    next_frame: Duration,
    // Set when the first buffer is requested; `None` until then.
    start_time: Option<Instant>,
    // Commit handler lent out to every playback buffer.
    buffer_drop: NoopBufferCommit,
}
537 
538 /// NoopStream data that is needed from the buffer complete callback.
539 struct NoopBufferCommit {
540     which_buffer: bool,
541 }
542 
543 impl BufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)544     fn commit(&mut self, _nwritten: usize) {
545         // When a buffer completes, switch to the other one.
546         self.which_buffer ^= true;
547     }
548 }
549 
550 #[async_trait(?Send)]
551 impl AsyncBufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)552     async fn commit(&mut self, _nwritten: usize) {
553         // When a buffer completes, switch to the other one.
554         self.which_buffer ^= true;
555     }
556 }
557 
558 impl NoopStream {
new( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Self559     pub fn new(
560         num_channels: usize,
561         format: SampleFormat,
562         frame_rate: u32,
563         buffer_size: usize,
564     ) -> Self {
565         let frame_size = format.sample_bytes() * num_channels;
566         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
567         NoopStream {
568             buffer: vec![0; buffer_size * frame_size],
569             frame_size,
570             interval,
571             next_frame: interval,
572             start_time: None,
573             buffer_drop: NoopBufferCommit {
574                 which_buffer: false,
575             },
576         }
577     }
578 }
579 
580 impl PlaybackBufferStream for NoopStream {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>581     fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
582         if let Some(start_time) = self.start_time {
583             let elapsed = start_time.elapsed();
584             if elapsed < self.next_frame {
585                 std::thread::sleep(self.next_frame - elapsed);
586             }
587             self.next_frame += self.interval;
588         } else {
589             self.start_time = Some(Instant::now());
590             self.next_frame = self.interval;
591         }
592         Ok(PlaybackBuffer::new(
593             self.frame_size,
594             &mut self.buffer,
595             &mut self.buffer_drop,
596         )?)
597     }
598 }
599 
600 #[async_trait(?Send)]
601 impl AsyncPlaybackBufferStream for NoopStream {
next_playback_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>602     async fn next_playback_buffer<'a>(
603         &'a mut self,
604         ex: &dyn AudioStreamsExecutor,
605     ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
606         if let Some(start_time) = self.start_time {
607             let elapsed = start_time.elapsed();
608             if elapsed < self.next_frame {
609                 ex.delay(self.next_frame - elapsed).await?;
610             }
611             self.next_frame += self.interval;
612         } else {
613             self.start_time = Some(Instant::now());
614             self.next_frame = self.interval;
615         }
616         Ok(AsyncPlaybackBuffer::new(
617             self.frame_size,
618             &mut self.buffer,
619             &mut self.buffer_drop,
620         )?)
621     }
622 }
623 
624 /// No-op control for `NoopStream`s.
625 #[derive(Default)]
626 pub struct NoopStreamControl;
627 
628 impl NoopStreamControl {
new() -> Self629     pub fn new() -> Self {
630         NoopStreamControl {}
631     }
632 }
633 
634 impl StreamControl for NoopStreamControl {}
635 
636 /// Source of `NoopStream` and `NoopStreamControl` objects.
637 #[derive(Default)]
638 pub struct NoopStreamSource;
639 
640 impl NoopStreamSource {
new() -> Self641     pub fn new() -> Self {
642         NoopStreamSource {}
643     }
644 }
645 
646 impl StreamSource for NoopStreamSource {
647     #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>648     fn new_playback_stream(
649         &mut self,
650         num_channels: usize,
651         format: SampleFormat,
652         frame_rate: u32,
653         buffer_size: usize,
654     ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError> {
655         Ok((
656             Box::new(NoopStreamControl::new()),
657             Box::new(NoopStream::new(
658                 num_channels,
659                 format,
660                 frame_rate,
661                 buffer_size,
662             )),
663         ))
664     }
665 
666     #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>667     fn new_async_playback_stream(
668         &mut self,
669         num_channels: usize,
670         format: SampleFormat,
671         frame_rate: u32,
672         buffer_size: usize,
673         _ex: &dyn AudioStreamsExecutor,
674     ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
675         Ok((
676             Box::new(NoopStreamControl::new()),
677             Box::new(NoopStream::new(
678                 num_channels,
679                 format,
680                 frame_rate,
681                 buffer_size,
682             )),
683         ))
684     }
685 }
686 
687 #[cfg(test)]
688 mod tests {
689     use super::async_api::test::TestExecutor;
690     use super::*;
691     use futures::FutureExt;
692     use io::{self, Write};
693 
694     #[test]
invalid_buffer_length()695     fn invalid_buffer_length() {
696         // Playback buffers can't be created with a size that isn't divisible by the frame size.
697         let mut pb_buf = [0xa5u8; 480 * 2 * 2 + 1];
698         let mut buffer_drop = NoopBufferCommit {
699             which_buffer: false,
700         };
701         assert!(PlaybackBuffer::new(2, &mut pb_buf, &mut buffer_drop).is_err());
702     }
703 
704     #[test]
audio_buffer_copy_from()705     fn audio_buffer_copy_from() {
706         const PERIOD_SIZE: usize = 8192;
707         const NUM_CHANNELS: usize = 6;
708         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
709         let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
710         let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
711         let mut aud_buf = AudioBuffer {
712             buffer: &mut dst_buf,
713             offset: 0,
714             frame_size: FRAME_SIZE,
715         };
716         aud_buf
717             .copy_from(&mut &src_buf[..])
718             .expect("all data should be copied.");
719         assert_eq!(dst_buf, src_buf);
720     }
721 
722     #[test]
audio_buffer_copy_from_repeat()723     fn audio_buffer_copy_from_repeat() {
724         const PERIOD_SIZE: usize = 8192;
725         const NUM_CHANNELS: usize = 6;
726         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
727         let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
728         let mut aud_buf = AudioBuffer {
729             buffer: &mut dst_buf,
730             offset: 0,
731             frame_size: FRAME_SIZE,
732         };
733         let bytes = aud_buf
734             .copy_from(&mut io::repeat(1))
735             .expect("all data should be copied.");
736         assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
737         assert_eq!(dst_buf, [1u8; PERIOD_SIZE * FRAME_SIZE]);
738     }
739 
740     #[test]
audio_buffer_copy_to()741     fn audio_buffer_copy_to() {
742         const PERIOD_SIZE: usize = 8192;
743         const NUM_CHANNELS: usize = 6;
744         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
745         let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
746         let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
747         let mut aud_buf = AudioBuffer {
748             buffer: &mut src_buf,
749             offset: 0,
750             frame_size: FRAME_SIZE,
751         };
752         aud_buf
753             .copy_to(&mut &mut dst_buf[..])
754             .expect("all data should be copied.");
755         assert_eq!(dst_buf, src_buf);
756     }
757 
758     #[test]
audio_buffer_copy_to_sink()759     fn audio_buffer_copy_to_sink() {
760         const PERIOD_SIZE: usize = 8192;
761         const NUM_CHANNELS: usize = 6;
762         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
763         let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
764         let mut aud_buf = AudioBuffer {
765             buffer: &mut src_buf,
766             offset: 0,
767             frame_size: FRAME_SIZE,
768         };
769         let bytes = aud_buf
770             .copy_to(&mut io::sink())
771             .expect("all data should be copied.");
772         assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
773     }
774 
775     #[test]
io_copy_audio_buffer()776     fn io_copy_audio_buffer() {
777         const PERIOD_SIZE: usize = 8192;
778         const NUM_CHANNELS: usize = 6;
779         const FRAME_SIZE: usize = NUM_CHANNELS * 2;
780         let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
781         let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
782         let mut aud_buf = AudioBuffer {
783             buffer: &mut dst_buf,
784             offset: 0,
785             frame_size: FRAME_SIZE,
786         };
787         io::copy(&mut &src_buf[..], &mut aud_buf).expect("all data should be copied.");
788         assert_eq!(dst_buf, src_buf);
789     }
790 
791     #[test]
commit()792     fn commit() {
793         struct TestCommit {
794             frame_count: usize,
795         }
796         impl BufferCommit for TestCommit {
797             fn commit(&mut self, nwritten: usize) {
798                 self.frame_count += nwritten;
799             }
800         }
801         let mut test_commit = TestCommit { frame_count: 0 };
802         {
803             const FRAME_SIZE: usize = 4;
804             let mut buf = [0u8; 480 * FRAME_SIZE];
805             let mut pb_buf = PlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
806             pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
807             pb_buf.commit();
808         }
809         assert_eq!(test_commit.frame_count, 480);
810     }
811 
812     #[test]
sixteen_bit_stereo()813     fn sixteen_bit_stereo() {
814         let mut server = NoopStreamSource::new();
815         let (_, mut stream) = server
816             .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
817             .unwrap();
818         let mut copy_cb = |buf: &mut PlaybackBuffer| {
819             assert_eq!(buf.buffer.frame_capacity(), 480);
820             let pb_buf = [0xa5u8; 480 * 2 * 2];
821             assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
822             Ok(())
823         };
824         stream.write_playback_buffer(&mut copy_cb).unwrap();
825     }
826 
827     #[test]
consumption_rate()828     fn consumption_rate() {
829         let mut server = NoopStreamSource::new();
830         let (_, mut stream) = server
831             .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
832             .unwrap();
833         let start = Instant::now();
834         {
835             let mut copy_cb = |buf: &mut PlaybackBuffer| {
836                 let pb_buf = [0xa5u8; 480 * 2 * 2];
837                 assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
838                 Ok(())
839             };
840             stream.write_playback_buffer(&mut copy_cb).unwrap();
841         }
842         // The second call should block until the first buffer is consumed.
843         let mut assert_cb = |_: &mut PlaybackBuffer| {
844             let elapsed = start.elapsed();
845             assert!(
846                 elapsed > Duration::from_millis(10),
847                 "next_playback_buffer didn't block long enough {}",
848                 elapsed.subsec_millis()
849             );
850             Ok(())
851         };
852         stream.write_playback_buffer(&mut assert_cb).unwrap();
853     }
854 
855     #[test]
async_commit()856     fn async_commit() {
857         struct TestCommit {
858             frame_count: usize,
859         }
860         #[async_trait(?Send)]
861         impl AsyncBufferCommit for TestCommit {
862             async fn commit(&mut self, nwritten: usize) {
863                 self.frame_count += nwritten;
864             }
865         }
866         async fn this_test() {
867             let mut test_commit = TestCommit { frame_count: 0 };
868             {
869                 const FRAME_SIZE: usize = 4;
870                 let mut buf = [0u8; 480 * FRAME_SIZE];
871                 let mut pb_buf =
872                     AsyncPlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
873                 pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
874                 pb_buf.commit().await;
875             }
876             assert_eq!(test_commit.frame_count, 480);
877         }
878 
879         this_test().now_or_never();
880     }
881 
882     #[test]
consumption_rate_async()883     fn consumption_rate_async() {
884         async fn this_test(ex: &TestExecutor) {
885             let mut server = NoopStreamSource::new();
886             let (_, mut stream) = server
887                 .new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
888                 .unwrap();
889             let start = Instant::now();
890             {
891                 let copy_func = |buf: &mut AsyncPlaybackBuffer| {
892                     let pb_buf = [0xa5u8; 480 * 2 * 2];
893                     assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
894                     Ok(())
895                 };
896                 async_write_playback_buffer(&mut *stream, copy_func, ex)
897                     .await
898                     .unwrap();
899             }
900             // The second call should block until the first buffer is consumed.
901             let assert_func = |_: &mut AsyncPlaybackBuffer| {
902                 let elapsed = start.elapsed();
903                 assert!(
904                     elapsed > Duration::from_millis(10),
905                     "write_playback_buffer didn't block long enough {}",
906                     elapsed.subsec_millis()
907                 );
908                 Ok(())
909             };
910 
911             async_write_playback_buffer(&mut *stream, assert_func, ex)
912                 .await
913                 .unwrap();
914         }
915 
916         let ex = TestExecutor {};
917         this_test(&ex).now_or_never();
918     }
919 }
920