1 // Copyright 2019 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! Provides an interface for playing and recording audio.
6 //!
7 //! When implementing an audio playback system, the `StreamSource` trait is implemented.
8 //! Implementors of this trait allow creation of `PlaybackBufferStream` objects. The
9 //! `PlaybackBufferStream` provides the actual audio buffers to be filled with audio samples. These
10 //! buffers can be filled with `write_playback_buffer`.
11 //!
//! Users playing audio fill the provided buffers with audio. Once the fill callback passed to
//! `write_playback_buffer` returns successfully, the samples written to the `PlaybackBuffer` are
//! committed to the `PlaybackBufferStream` it came from.
14 //!
15 //! ```
16 //! use audio_streams::{BoxError, PlaybackBuffer, SampleFormat, StreamSource, NoopStreamSource};
17 //! use std::io::Write;
18 //!
19 //! const buffer_size: usize = 120;
20 //! const num_channels: usize = 2;
21 //!
22 //! # fn main() -> std::result::Result<(), BoxError> {
23 //! let mut stream_source = NoopStreamSource::new();
24 //! let sample_format = SampleFormat::S16LE;
25 //! let frame_size = num_channels * sample_format.sample_bytes();
26 //!
27 //! let (_, mut stream) = stream_source
28 //! .new_playback_stream(num_channels, sample_format, 48000, buffer_size)?;
29 //! // Play 10 buffers of DC.
30 //! let mut buf = Vec::new();
31 //! buf.resize(buffer_size * frame_size, 0xa5u8);
32 //! for _ in 0..10 {
33 //! let mut copy_cb = |stream_buffer: &mut PlaybackBuffer| {
34 //! assert_eq!(stream_buffer.write(&buf)?, buffer_size * frame_size);
35 //! Ok(())
36 //! };
37 //! stream.write_playback_buffer(&mut copy_cb)?;
38 //! }
39 //! # Ok (())
40 //! # }
41 //! ```
42 pub mod async_api;
43
44 use async_trait::async_trait;
45 use std::cmp::min;
46 use std::error;
47 use std::fmt::{self, Display};
48 use std::io::{self, Read, Write};
49 #[cfg(unix)]
50 use std::os::unix::io::RawFd;
51 use std::result::Result;
52 use std::str::FromStr;
53 use std::time::{Duration, Instant};
54
55 pub use async_api::{AsyncStream, AudioStreamsExecutor};
56 use remain::sorted;
57 use thiserror::Error;
58
/// Sample encodings understood by the audio stream interfaces.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SampleFormat {
    /// Unsigned 8 bit samples.
    U8,
    /// Signed 16 bit little endian samples.
    S16LE,
    /// Signed 24 bit little endian samples.
    S24LE,
    /// Signed 32 bit little endian samples.
    S32LE,
}

impl SampleFormat {
    /// Returns the number of bytes a single sample of this format occupies.
    pub fn sample_bytes(self) -> usize {
        match self {
            Self::U8 => 1,
            Self::S16LE => 2,
            // Deliberately 4 rather than 3: S24_LE samples are stored in 4 byte chunks.
            Self::S24LE | Self::S32LE => 4,
        }
    }
}

impl Display for SampleFormat {
    /// Writes the human readable name of the sample format.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let description = match self {
            Self::U8 => "Unsigned 8 bit",
            Self::S16LE => "Signed 16 bit Little Endian",
            Self::S24LE => "Signed 24 bit Little Endian",
            Self::S32LE => "Signed 32 bit Little Endian",
        };
        f.write_str(description)
    }
}
90
/// Valid directions of an audio stream.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum StreamDirection {
    /// Stream used for playing audio.
    Playback,
    /// Stream used for capturing (recording) audio.
    Capture,
}
97
/// Valid effects for an audio stream.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum StreamEffect {
    /// No effect; this is the value returned by `StreamEffect::default()`.
    NoEffect,
    /// Echo cancellation; parsed from the strings `EchoCancellation` or `aec`.
    EchoCancellation,
}
104
105 pub mod capture;
106 pub mod shm_streams;
107
108 impl Default for StreamEffect {
default() -> Self109 fn default() -> Self {
110 StreamEffect::NoEffect
111 }
112 }
113
/// Boxed error type returned by the stream interfaces in this file. The `Send + Sync` bounds
/// allow the errors to pass across threads.
pub type BoxError = Box<dyn error::Error + Send + Sync>;
116
/// Errors that are possible when parsing a `StreamEffect` from a string.
#[sorted]
#[derive(Error, Debug)]
pub enum StreamEffectError {
    /// The string is not one of the names accepted by `StreamEffect::from_str`.
    #[error("Must be in [EchoCancellation, aec]")]
    InvalidEffect,
}
124
125 impl FromStr for StreamEffect {
126 type Err = StreamEffectError;
from_str(s: &str) -> std::result::Result<Self, Self::Err>127 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
128 match s {
129 "EchoCancellation" | "aec" => Ok(StreamEffect::EchoCancellation),
130 _ => Err(StreamEffectError::InvalidEffect),
131 }
132 }
133 }
134
/// Generic errors for the audio stream interfaces in this file.
#[sorted]
#[derive(Error, Debug)]
pub enum Error {
    /// The requested operation is not implemented; returned by the default
    /// `StreamSource::new_async_playback_stream`.
    #[error("Unimplemented")]
    Unimplemented,
}
141
142 /// `StreamSource` creates streams for playback or capture of audio.
143 pub trait StreamSource: Send {
144 /// Returns a stream control and buffer generator object. These are separate as the buffer
145 /// generator might want to be passed to the audio stream.
146 #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>147 fn new_playback_stream(
148 &mut self,
149 num_channels: usize,
150 format: SampleFormat,
151 frame_rate: u32,
152 buffer_size: usize,
153 ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>;
154
155 /// Returns a stream control and async buffer generator object. These are separate as the buffer
156 /// generator might want to be passed to the audio stream.
157 #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, _num_channels: usize, _format: SampleFormat, _frame_rate: u32, _buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>158 fn new_async_playback_stream(
159 &mut self,
160 _num_channels: usize,
161 _format: SampleFormat,
162 _frame_rate: u32,
163 _buffer_size: usize,
164 _ex: &dyn AudioStreamsExecutor,
165 ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
166 Err(Box::new(Error::Unimplemented))
167 }
168
169 /// Returns a stream control and buffer generator object. These are separate as the buffer
170 /// generator might want to be passed to the audio stream.
171 /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
172 #[allow(clippy::type_complexity)]
new_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::CaptureBufferStream>, ), BoxError, >173 fn new_capture_stream(
174 &mut self,
175 num_channels: usize,
176 format: SampleFormat,
177 frame_rate: u32,
178 buffer_size: usize,
179 _effects: &[StreamEffect],
180 ) -> Result<
181 (
182 Box<dyn StreamControl>,
183 Box<dyn capture::CaptureBufferStream>,
184 ),
185 BoxError,
186 > {
187 Ok((
188 Box::new(NoopStreamControl::new()),
189 Box::new(capture::NoopCaptureStream::new(
190 num_channels,
191 format,
192 frame_rate,
193 buffer_size,
194 )),
195 ))
196 }
197
198 /// Returns a stream control and async buffer generator object. These are separate as the buffer
199 /// generator might want to be passed to the audio stream.
200 /// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
201 #[allow(clippy::type_complexity)]
new_async_capture_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _ex: &dyn AudioStreamsExecutor, ) -> Result< ( Box<dyn StreamControl>, Box<dyn capture::AsyncCaptureBufferStream>, ), BoxError, >202 fn new_async_capture_stream(
203 &mut self,
204 num_channels: usize,
205 format: SampleFormat,
206 frame_rate: u32,
207 buffer_size: usize,
208 _effects: &[StreamEffect],
209 _ex: &dyn AudioStreamsExecutor,
210 ) -> Result<
211 (
212 Box<dyn StreamControl>,
213 Box<dyn capture::AsyncCaptureBufferStream>,
214 ),
215 BoxError,
216 > {
217 Ok((
218 Box::new(NoopStreamControl::new()),
219 Box::new(capture::NoopCaptureStream::new(
220 num_channels,
221 format,
222 frame_rate,
223 buffer_size,
224 )),
225 ))
226 }
227
228 /// Returns any open file descriptors needed by the implementor. The FD list helps users of the
229 /// StreamSource enter Linux jails making sure not to close needed FDs.
230 #[cfg(unix)]
keep_fds(&self) -> Option<Vec<RawFd>>231 fn keep_fds(&self) -> Option<Vec<RawFd>> {
232 None
233 }
234 }
235
/// `PlaybackBufferStream` provides `PlaybackBuffer`s to fill with audio samples for playback.
pub trait PlaybackBufferStream: Send {
    /// Returns the next `PlaybackBuffer` to fill. The returned buffer mutably borrows this
    /// stream for as long as it is alive.
    fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>;

    /// Call `f` with a `PlaybackBuffer`, and trigger the buffer done call back after. `f` should
    /// write playback data to the given `PlaybackBuffer`.
    ///
    /// Errors from `next_playback_buffer` or from `f` are returned unchanged. When `f` fails the
    /// buffer is not committed.
    fn write_playback_buffer<'b, 's: 'b>(
        &'s mut self,
        f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>,
    ) -> Result<(), BoxError> {
        let mut buf = self.next_playback_buffer()?;
        f(&mut buf)?;
        // Only reached when `f` succeeded; reports the written frames to the stream.
        buf.commit();
        Ok(())
    }
}
252
253 impl<S: PlaybackBufferStream + ?Sized> PlaybackBufferStream for &mut S {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>254 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
255 (**self).next_playback_buffer()
256 }
257 }
258
/// `AsyncPlaybackBufferStream` provides `AsyncPlaybackBuffer`s asynchronously to fill with audio
/// samples for playback.
#[async_trait(?Send)]
pub trait AsyncPlaybackBufferStream: Send {
    /// Waits for and returns the next `AsyncPlaybackBuffer` to fill. The returned buffer mutably
    /// borrows this stream for `'a`. The executor `_ex` is made available to the implementation.
    async fn next_playback_buffer<'a>(
        &'a mut self,
        _ex: &dyn AudioStreamsExecutor,
    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
}
268
/// Lets a mutable reference to a stream be used wherever an `AsyncPlaybackBufferStream` is
/// expected.
#[async_trait(?Send)]
impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S {
    async fn next_playback_buffer<'a>(
        &'a mut self,
        ex: &dyn AudioStreamsExecutor,
    ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
        // Forward to the referenced stream's own implementation.
        (**self).next_playback_buffer(ex).await
    }
}
278
279 /// Call `f` with a `AsyncPlaybackBuffer`, and trigger the buffer done call back after. `f` should
280 /// write playback data to the given `AsyncPlaybackBuffer`.
281 ///
282 /// This cannot be a trait method because trait methods with generic parameters are not object safe.
async_write_playback_buffer<F>( stream: &mut dyn AsyncPlaybackBufferStream, f: F, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError> where F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,283 pub async fn async_write_playback_buffer<F>(
284 stream: &mut dyn AsyncPlaybackBufferStream,
285 f: F,
286 ex: &dyn AudioStreamsExecutor,
287 ) -> Result<(), BoxError>
288 where
289 F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,
290 {
291 let mut buf = stream.next_playback_buffer(ex).await?;
292 f(&mut buf)?;
293 buf.commit().await;
294 Ok(())
295 }
296
/// `StreamControl` provides a way to set the volume and mute states of a stream. `StreamControl`
/// is separate from the stream so it can be owned by a different thread if needed.
pub trait StreamControl: Send + Sync {
    /// Sets the volume of the stream. The default implementation does nothing.
    // NOTE(review): the valid range of `_scaler` is not defined in this file — confirm.
    fn set_volume(&mut self, _scaler: f64) {}
    /// Sets the mute state of the stream. The default implementation does nothing.
    fn set_mute(&mut self, _mute: bool) {}
}
303
/// `BufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
pub trait BufferCommit {
    /// `write_playback_buffer` or `read_capture_buffer` would trigger this automatically. `nframes`
    /// indicates the number of audio frames that were read or written to the device.
    fn commit(&mut self, nframes: usize);
}
311
/// `AsyncBufferCommit` is a cleanup function that must be called before dropping the buffer,
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
#[async_trait(?Send)]
pub trait AsyncBufferCommit {
    /// `async_write_playback_buffer` or `async_read_capture_buffer` would trigger this
    /// automatically. `nframes` indicates the number of audio frames that were read or written to
    /// the device.
    async fn commit(&mut self, nframes: usize);
}
321
/// Errors that are possible from a `PlaybackBuffer`.
#[sorted]
#[derive(Error, Debug)]
pub enum PlaybackBufferError {
    /// Returned by `PlaybackBuffer::new` and `AsyncPlaybackBuffer::new` when the backing buffer
    /// length is not a multiple of the frame size.
    #[error("Invalid buffer length")]
    InvalidLength,
}
329
/// `AudioBuffer` is one buffer that holds buffer_size audio frames, together with a cursor that
/// tracks how much of it has been read or written so far.
/// It is the inner data of `PlaybackBuffer` and `CaptureBuffer`.
struct AudioBuffer<'a> {
    buffer: &'a mut [u8],
    offset: usize,     // Read or write position within `buffer`, in bytes.
    frame_size: usize, // Size of a frame in bytes.
}

impl<'a> AudioBuffer<'a> {
    /// Returns the number of audio frames that fit in the buffer.
    pub fn frame_capacity(&self) -> usize {
        let total_bytes = self.buffer.len();
        total_bytes / self.frame_size
    }

    /// Rounds `size` down to a whole number of frames and limits the result to the bytes that
    /// remain after the current offset.
    fn calc_len(&self, size: usize) -> usize {
        let whole_frames = size / self.frame_size * self.frame_size;
        let remaining = self.buffer.len() - self.offset;
        min(whole_frames, remaining)
    }

    /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
    ///
    /// The callback receives the writable region and the cursor advances past it. Returns the
    /// length of that region in bytes.
    pub fn write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        // Only complete frames are exposed to the callback.
        let len = self.calc_len(size);
        let start = self.offset;
        let end = start + len;
        cb(&mut self.buffer[start..end]);
        self.offset = end;
        Ok(len)
    }

    /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
    ///
    /// The callback receives the readable region and the cursor advances past it. Returns the
    /// length of that region in bytes.
    pub fn read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
        let len = self.calc_len(size);
        let start = self.offset;
        let end = start + len;
        cb(&self.buffer[start..end]);
        self.offset = end;
        Ok(len)
    }

    /// Copy data from an io::Reader into the space after the cursor, using a single `read` call.
    /// Returns the number of bytes the reader produced.
    pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
        let free_space = &mut self.buffer[self.offset..];
        let bytes_read = reader.read(free_space)?;
        self.offset += bytes_read;
        Ok(bytes_read)
    }

    /// Copy the data after the cursor to an io::Write, using a single `write` call.
    /// Returns the number of bytes the writer accepted.
    pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
        let pending = &self.buffer[self.offset..];
        let bytes_written = writer.write(pending)?;
        self.offset += bytes_written;
        Ok(bytes_written)
    }
}

impl<'a> Write for AudioBuffer<'a> {
    /// Copies as much of `buf` as fits after the cursor and advances the cursor by that amount.
    /// The amount is not rounded to whole frames.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut unwritten = &mut self.buffer[self.offset..];
        let written = unwritten.write(buf)?;
        self.offset += written;
        Ok(written)
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl<'a> Read for AudioBuffer<'a> {
    /// Copies data after the cursor into `buf`, using only the part of `buf` that holds a whole
    /// number of frames, and advances the cursor by the number of bytes copied.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let whole_frame_len = buf.len() / self.frame_size * self.frame_size;
        let mut destination = &mut buf[..whole_frame_len];
        let copied = destination.write(&self.buffer[self.offset..])?;
        self.offset += copied;
        Ok(copied)
    }
}
403
404 /// `PlaybackBuffer` is one buffer that holds buffer_size audio frames. It is used to temporarily
405 /// allow access to an audio buffer and notifes the owning stream of write completion when dropped.
406 pub struct PlaybackBuffer<'a> {
407 buffer: AudioBuffer<'a>,
408 drop: &'a mut dyn BufferCommit,
409 }
410
411 impl<'a> PlaybackBuffer<'a> {
412 /// Creates a new `PlaybackBuffer` that holds a reference to the backing memory specified in
413 /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], drop: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: BufferCommit,414 pub fn new<F>(
415 frame_size: usize,
416 buffer: &'a mut [u8],
417 drop: &'a mut F,
418 ) -> Result<Self, PlaybackBufferError>
419 where
420 F: BufferCommit,
421 {
422 if buffer.len() % frame_size != 0 {
423 return Err(PlaybackBufferError::InvalidLength);
424 }
425
426 Ok(PlaybackBuffer {
427 buffer: AudioBuffer {
428 buffer,
429 offset: 0,
430 frame_size,
431 },
432 drop,
433 })
434 }
435
436 /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize437 pub fn frame_capacity(&self) -> usize {
438 self.buffer.frame_capacity()
439 }
440
441 /// This triggers the commit of `BufferCommit`. This should be called after the data is copied
442 /// to the buffer.
commit(&mut self)443 pub fn commit(&mut self) {
444 self.drop
445 .commit(self.buffer.offset / self.buffer.frame_size);
446 }
447
448 /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>449 pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
450 self.buffer.write_copy_cb(size, cb)
451 }
452 }
453
454 impl<'a> Write for PlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>455 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
456 self.buffer.write(buf)
457 }
458
flush(&mut self) -> io::Result<()>459 fn flush(&mut self) -> io::Result<()> {
460 self.buffer.flush()
461 }
462 }
463
464 /// `AsyncPlaybackBuffer` is the async version of `PlaybackBuffer`.
465 pub struct AsyncPlaybackBuffer<'a> {
466 buffer: AudioBuffer<'a>,
467 trigger: &'a mut dyn AsyncBufferCommit,
468 }
469
470 impl<'a> AsyncPlaybackBuffer<'a> {
471 /// Creates a new `AsyncPlaybackBuffer` that holds a reference to the backing memory specified
472 /// in `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], trigger: &'a mut F, ) -> Result<Self, PlaybackBufferError> where F: AsyncBufferCommit,473 pub fn new<F>(
474 frame_size: usize,
475 buffer: &'a mut [u8],
476 trigger: &'a mut F,
477 ) -> Result<Self, PlaybackBufferError>
478 where
479 F: AsyncBufferCommit,
480 {
481 if buffer.len() % frame_size != 0 {
482 return Err(PlaybackBufferError::InvalidLength);
483 }
484
485 Ok(AsyncPlaybackBuffer {
486 buffer: AudioBuffer {
487 buffer,
488 offset: 0,
489 frame_size,
490 },
491 trigger,
492 })
493 }
494
495 /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize496 pub fn frame_capacity(&self) -> usize {
497 self.buffer.frame_capacity()
498 }
499
500 /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
501 /// copied to the buffer.
commit(&mut self)502 pub async fn commit(&mut self) {
503 self.trigger
504 .commit(self.buffer.offset / self.buffer.frame_size)
505 .await;
506 }
507
508 /// Writes up to `size` bytes directly to this buffer inside of the given callback function.
copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>509 pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
510 self.buffer.write_copy_cb(size, cb)
511 }
512
513 /// Copy data from an io::Reader
copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize>514 pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
515 self.buffer.copy_from(reader)
516 }
517 }
518
519 impl<'a> Write for AsyncPlaybackBuffer<'a> {
write(&mut self, buf: &[u8]) -> io::Result<usize>520 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
521 self.buffer.write(buf)
522 }
523
flush(&mut self) -> io::Result<()>524 fn flush(&mut self) -> io::Result<()> {
525 self.buffer.flush()
526 }
527 }
/// Stream that accepts playback samples but drops them.
pub struct NoopStream {
    // Backing storage handed out as the playback buffer (`buffer_size * frame_size` bytes).
    buffer: Vec<u8>,
    // Size of one frame in bytes (`sample_bytes * num_channels`).
    frame_size: usize,
    // Duration of one buffer, computed from `buffer_size` and `frame_rate` in whole milliseconds.
    interval: Duration,
    // Offset from `start_time` before which the next buffer is not handed out.
    next_frame: Duration,
    // Time of the first buffer request; `None` until a buffer has been requested.
    start_time: Option<Instant>,
    // Commit handler state lent to every buffer created from this stream.
    buffer_drop: NoopBufferCommit,
}
537
538 /// NoopStream data that is needed from the buffer complete callback.
539 struct NoopBufferCommit {
540 which_buffer: bool,
541 }
542
543 impl BufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)544 fn commit(&mut self, _nwritten: usize) {
545 // When a buffer completes, switch to the other one.
546 self.which_buffer ^= true;
547 }
548 }
549
550 #[async_trait(?Send)]
551 impl AsyncBufferCommit for NoopBufferCommit {
commit(&mut self, _nwritten: usize)552 async fn commit(&mut self, _nwritten: usize) {
553 // When a buffer completes, switch to the other one.
554 self.which_buffer ^= true;
555 }
556 }
557
558 impl NoopStream {
new( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Self559 pub fn new(
560 num_channels: usize,
561 format: SampleFormat,
562 frame_rate: u32,
563 buffer_size: usize,
564 ) -> Self {
565 let frame_size = format.sample_bytes() * num_channels;
566 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
567 NoopStream {
568 buffer: vec![0; buffer_size * frame_size],
569 frame_size,
570 interval,
571 next_frame: interval,
572 start_time: None,
573 buffer_drop: NoopBufferCommit {
574 which_buffer: false,
575 },
576 }
577 }
578 }
579
580 impl PlaybackBufferStream for NoopStream {
next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>581 fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
582 if let Some(start_time) = self.start_time {
583 let elapsed = start_time.elapsed();
584 if elapsed < self.next_frame {
585 std::thread::sleep(self.next_frame - elapsed);
586 }
587 self.next_frame += self.interval;
588 } else {
589 self.start_time = Some(Instant::now());
590 self.next_frame = self.interval;
591 }
592 Ok(PlaybackBuffer::new(
593 self.frame_size,
594 &mut self.buffer,
595 &mut self.buffer_drop,
596 )?)
597 }
598 }
599
600 #[async_trait(?Send)]
601 impl AsyncPlaybackBufferStream for NoopStream {
next_playback_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncPlaybackBuffer<'a>, BoxError>602 async fn next_playback_buffer<'a>(
603 &'a mut self,
604 ex: &dyn AudioStreamsExecutor,
605 ) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
606 if let Some(start_time) = self.start_time {
607 let elapsed = start_time.elapsed();
608 if elapsed < self.next_frame {
609 ex.delay(self.next_frame - elapsed).await?;
610 }
611 self.next_frame += self.interval;
612 } else {
613 self.start_time = Some(Instant::now());
614 self.next_frame = self.interval;
615 }
616 Ok(AsyncPlaybackBuffer::new(
617 self.frame_size,
618 &mut self.buffer,
619 &mut self.buffer_drop,
620 )?)
621 }
622 }
623
624 /// No-op control for `NoopStream`s.
625 #[derive(Default)]
626 pub struct NoopStreamControl;
627
628 impl NoopStreamControl {
new() -> Self629 pub fn new() -> Self {
630 NoopStreamControl {}
631 }
632 }
633
634 impl StreamControl for NoopStreamControl {}
635
636 /// Source of `NoopStream` and `NoopStreamControl` objects.
637 #[derive(Default)]
638 pub struct NoopStreamSource;
639
640 impl NoopStreamSource {
new() -> Self641 pub fn new() -> Self {
642 NoopStreamSource {}
643 }
644 }
645
646 impl StreamSource for NoopStreamSource {
647 #[allow(clippy::type_complexity)]
new_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>648 fn new_playback_stream(
649 &mut self,
650 num_channels: usize,
651 format: SampleFormat,
652 frame_rate: u32,
653 buffer_size: usize,
654 ) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError> {
655 Ok((
656 Box::new(NoopStreamControl::new()),
657 Box::new(NoopStream::new(
658 num_channels,
659 format,
660 frame_rate,
661 buffer_size,
662 )),
663 ))
664 }
665
666 #[allow(clippy::type_complexity)]
new_async_playback_stream( &mut self, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _ex: &dyn AudioStreamsExecutor, ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError>667 fn new_async_playback_stream(
668 &mut self,
669 num_channels: usize,
670 format: SampleFormat,
671 frame_rate: u32,
672 buffer_size: usize,
673 _ex: &dyn AudioStreamsExecutor,
674 ) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
675 Ok((
676 Box::new(NoopStreamControl::new()),
677 Box::new(NoopStream::new(
678 num_channels,
679 format,
680 frame_rate,
681 buffer_size,
682 )),
683 ))
684 }
685 }
686
687 #[cfg(test)]
688 mod tests {
689 use super::async_api::test::TestExecutor;
690 use super::*;
691 use futures::FutureExt;
692 use io::{self, Write};
693
694 #[test]
invalid_buffer_length()695 fn invalid_buffer_length() {
696 // Playback buffers can't be created with a size that isn't divisible by the frame size.
697 let mut pb_buf = [0xa5u8; 480 * 2 * 2 + 1];
698 let mut buffer_drop = NoopBufferCommit {
699 which_buffer: false,
700 };
701 assert!(PlaybackBuffer::new(2, &mut pb_buf, &mut buffer_drop).is_err());
702 }
703
704 #[test]
audio_buffer_copy_from()705 fn audio_buffer_copy_from() {
706 const PERIOD_SIZE: usize = 8192;
707 const NUM_CHANNELS: usize = 6;
708 const FRAME_SIZE: usize = NUM_CHANNELS * 2;
709 let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
710 let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
711 let mut aud_buf = AudioBuffer {
712 buffer: &mut dst_buf,
713 offset: 0,
714 frame_size: FRAME_SIZE,
715 };
716 aud_buf
717 .copy_from(&mut &src_buf[..])
718 .expect("all data should be copied.");
719 assert_eq!(dst_buf, src_buf);
720 }
721
722 #[test]
audio_buffer_copy_from_repeat()723 fn audio_buffer_copy_from_repeat() {
724 const PERIOD_SIZE: usize = 8192;
725 const NUM_CHANNELS: usize = 6;
726 const FRAME_SIZE: usize = NUM_CHANNELS * 2;
727 let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
728 let mut aud_buf = AudioBuffer {
729 buffer: &mut dst_buf,
730 offset: 0,
731 frame_size: FRAME_SIZE,
732 };
733 let bytes = aud_buf
734 .copy_from(&mut io::repeat(1))
735 .expect("all data should be copied.");
736 assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
737 assert_eq!(dst_buf, [1u8; PERIOD_SIZE * FRAME_SIZE]);
738 }
739
740 #[test]
audio_buffer_copy_to()741 fn audio_buffer_copy_to() {
742 const PERIOD_SIZE: usize = 8192;
743 const NUM_CHANNELS: usize = 6;
744 const FRAME_SIZE: usize = NUM_CHANNELS * 2;
745 let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
746 let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
747 let mut aud_buf = AudioBuffer {
748 buffer: &mut src_buf,
749 offset: 0,
750 frame_size: FRAME_SIZE,
751 };
752 aud_buf
753 .copy_to(&mut &mut dst_buf[..])
754 .expect("all data should be copied.");
755 assert_eq!(dst_buf, src_buf);
756 }
757
758 #[test]
audio_buffer_copy_to_sink()759 fn audio_buffer_copy_to_sink() {
760 const PERIOD_SIZE: usize = 8192;
761 const NUM_CHANNELS: usize = 6;
762 const FRAME_SIZE: usize = NUM_CHANNELS * 2;
763 let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
764 let mut aud_buf = AudioBuffer {
765 buffer: &mut src_buf,
766 offset: 0,
767 frame_size: FRAME_SIZE,
768 };
769 let bytes = aud_buf
770 .copy_to(&mut io::sink())
771 .expect("all data should be copied.");
772 assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
773 }
774
775 #[test]
io_copy_audio_buffer()776 fn io_copy_audio_buffer() {
777 const PERIOD_SIZE: usize = 8192;
778 const NUM_CHANNELS: usize = 6;
779 const FRAME_SIZE: usize = NUM_CHANNELS * 2;
780 let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
781 let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
782 let mut aud_buf = AudioBuffer {
783 buffer: &mut dst_buf,
784 offset: 0,
785 frame_size: FRAME_SIZE,
786 };
787 io::copy(&mut &src_buf[..], &mut aud_buf).expect("all data should be copied.");
788 assert_eq!(dst_buf, src_buf);
789 }
790
791 #[test]
commit()792 fn commit() {
793 struct TestCommit {
794 frame_count: usize,
795 }
796 impl BufferCommit for TestCommit {
797 fn commit(&mut self, nwritten: usize) {
798 self.frame_count += nwritten;
799 }
800 }
801 let mut test_commit = TestCommit { frame_count: 0 };
802 {
803 const FRAME_SIZE: usize = 4;
804 let mut buf = [0u8; 480 * FRAME_SIZE];
805 let mut pb_buf = PlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
806 pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
807 pb_buf.commit();
808 }
809 assert_eq!(test_commit.frame_count, 480);
810 }
811
812 #[test]
sixteen_bit_stereo()813 fn sixteen_bit_stereo() {
814 let mut server = NoopStreamSource::new();
815 let (_, mut stream) = server
816 .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
817 .unwrap();
818 let mut copy_cb = |buf: &mut PlaybackBuffer| {
819 assert_eq!(buf.buffer.frame_capacity(), 480);
820 let pb_buf = [0xa5u8; 480 * 2 * 2];
821 assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
822 Ok(())
823 };
824 stream.write_playback_buffer(&mut copy_cb).unwrap();
825 }
826
827 #[test]
consumption_rate()828 fn consumption_rate() {
829 let mut server = NoopStreamSource::new();
830 let (_, mut stream) = server
831 .new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
832 .unwrap();
833 let start = Instant::now();
834 {
835 let mut copy_cb = |buf: &mut PlaybackBuffer| {
836 let pb_buf = [0xa5u8; 480 * 2 * 2];
837 assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
838 Ok(())
839 };
840 stream.write_playback_buffer(&mut copy_cb).unwrap();
841 }
842 // The second call should block until the first buffer is consumed.
843 let mut assert_cb = |_: &mut PlaybackBuffer| {
844 let elapsed = start.elapsed();
845 assert!(
846 elapsed > Duration::from_millis(10),
847 "next_playback_buffer didn't block long enough {}",
848 elapsed.subsec_millis()
849 );
850 Ok(())
851 };
852 stream.write_playback_buffer(&mut assert_cb).unwrap();
853 }
854
855 #[test]
async_commit()856 fn async_commit() {
857 struct TestCommit {
858 frame_count: usize,
859 }
860 #[async_trait(?Send)]
861 impl AsyncBufferCommit for TestCommit {
862 async fn commit(&mut self, nwritten: usize) {
863 self.frame_count += nwritten;
864 }
865 }
866 async fn this_test() {
867 let mut test_commit = TestCommit { frame_count: 0 };
868 {
869 const FRAME_SIZE: usize = 4;
870 let mut buf = [0u8; 480 * FRAME_SIZE];
871 let mut pb_buf =
872 AsyncPlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
873 pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
874 pb_buf.commit().await;
875 }
876 assert_eq!(test_commit.frame_count, 480);
877 }
878
879 this_test().now_or_never();
880 }
881
882 #[test]
consumption_rate_async()883 fn consumption_rate_async() {
884 async fn this_test(ex: &TestExecutor) {
885 let mut server = NoopStreamSource::new();
886 let (_, mut stream) = server
887 .new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
888 .unwrap();
889 let start = Instant::now();
890 {
891 let copy_func = |buf: &mut AsyncPlaybackBuffer| {
892 let pb_buf = [0xa5u8; 480 * 2 * 2];
893 assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
894 Ok(())
895 };
896 async_write_playback_buffer(&mut *stream, copy_func, ex)
897 .await
898 .unwrap();
899 }
900 // The second call should block until the first buffer is consumed.
901 let assert_func = |_: &mut AsyncPlaybackBuffer| {
902 let elapsed = start.elapsed();
903 assert!(
904 elapsed > Duration::from_millis(10),
905 "write_playback_buffer didn't block long enough {}",
906 elapsed.subsec_millis()
907 );
908 Ok(())
909 };
910
911 async_write_playback_buffer(&mut *stream, assert_func, ex)
912 .await
913 .unwrap();
914 }
915
916 let ex = TestExecutor {};
917 this_test(&ex).now_or_never();
918 }
919 }
920