• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 //! ```
5 //! use audio_streams::{BoxError, capture::CaptureBuffer, SampleFormat, StreamSource,
6 //!     NoopStreamSource};
7 //! use std::io::Read;
8 //!
9 //! const buffer_size: usize = 120;
10 //! const num_channels: usize = 2;
11 //!
12 //! # fn main() -> std::result::Result<(),BoxError> {
13 //! let mut stream_source = NoopStreamSource::new();
14 //! let sample_format = SampleFormat::S16LE;
15 //! let frame_size = num_channels * sample_format.sample_bytes();
16 //!
17 //! let (_, mut stream) = stream_source
18 //!     .new_capture_stream(num_channels, sample_format, 48000, buffer_size, &[])?;
19 //! // Capture 10 buffers of zeros.
20 //! let mut buf = Vec::new();
21 //! buf.resize(buffer_size * frame_size, 0xa5u8);
22 //! for _ in 0..10 {
23 //!     let mut copy_func = |stream_buffer: &mut CaptureBuffer| {
24 //!         assert_eq!(stream_buffer.read(&mut buf)?, buffer_size * frame_size);
25 //!         Ok(())
26 //!     };
27 //!     stream.read_capture_buffer(&mut copy_func)?;
28 //! }
29 //! # Ok (())
30 //! # }
31 //! ```
32 
33 use async_trait::async_trait;
34 use std::{
35     io::{self, Read, Write},
36     time::{Duration, Instant},
37 };
38 
39 use super::async_api::AudioStreamsExecutor;
40 use super::{
41     AsyncBufferCommit, AudioBuffer, BoxError, BufferCommit, NoopBufferCommit, SampleFormat,
42 };
43 use remain::sorted;
44 use thiserror::Error;
45 
46 /// `CaptureBufferStream` provides `CaptureBuffer`s to read with audio samples from capture.
47 pub trait CaptureBufferStream: Send {
next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>48     fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>;
49 
50     /// Call `f` with a `CaptureBuffer`, and trigger the buffer done call back after. `f` can read
51     /// the capture data from the given `CaptureBuffer`.
read_capture_buffer<'b, 's: 'b>( &'s mut self, f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>, ) -> Result<(), BoxError>52     fn read_capture_buffer<'b, 's: 'b>(
53         &'s mut self,
54         f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>,
55     ) -> Result<(), BoxError> {
56         let mut buf = self.next_capture_buffer()?;
57         f(&mut buf)?;
58         buf.commit();
59         Ok(())
60     }
61 }
62 
63 impl<S: CaptureBufferStream + ?Sized> CaptureBufferStream for &mut S {
next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>64     fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
65         (**self).next_capture_buffer()
66     }
67 }
68 
69 #[async_trait(?Send)]
70 pub trait AsyncCaptureBufferStream: Send {
next_capture_buffer<'a>( &'a mut self, _ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>71     async fn next_capture_buffer<'a>(
72         &'a mut self,
73         _ex: &dyn AudioStreamsExecutor,
74     ) -> Result<AsyncCaptureBuffer<'a>, BoxError>;
75 }
76 
77 #[async_trait(?Send)]
78 impl<S: AsyncCaptureBufferStream + ?Sized> AsyncCaptureBufferStream for &mut S {
next_capture_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>79     async fn next_capture_buffer<'a>(
80         &'a mut self,
81         ex: &dyn AudioStreamsExecutor,
82     ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
83         (**self).next_capture_buffer(ex).await
84     }
85 }
86 
87 /// `CaptureBuffer` contains a block of audio samples got from capture stream. It provides
88 /// temporary view to those samples and will notifies capture stream when dropped.
89 /// Note that it'll always send `buffer.len() / frame_size` to drop function when it got destroyed
90 /// since `CaptureBufferStream` assumes that users get all the samples from the buffer.
91 pub struct CaptureBuffer<'a> {
92     buffer: AudioBuffer<'a>,
93     drop: &'a mut dyn BufferCommit,
94 }
95 
96 /// Async version of 'CaptureBuffer`
97 pub struct AsyncCaptureBuffer<'a> {
98     buffer: AudioBuffer<'a>,
99     trigger: &'a mut dyn AsyncBufferCommit,
100 }
101 
102 /// Errors that are possible from a `CaptureBuffer`.
103 #[sorted]
104 #[derive(Error, Debug)]
105 pub enum CaptureBufferError {
106     #[error("Invalid buffer length")]
107     InvalidLength,
108 }
109 
110 impl<'a> CaptureBuffer<'a> {
111     /// Creates a new `CaptureBuffer` that holds a reference to the backing memory specified in
112     /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], drop: &'a mut F, ) -> Result<Self, CaptureBufferError> where F: BufferCommit,113     pub fn new<F>(
114         frame_size: usize,
115         buffer: &'a mut [u8],
116         drop: &'a mut F,
117     ) -> Result<Self, CaptureBufferError>
118     where
119         F: BufferCommit,
120     {
121         if buffer.len() % frame_size != 0 {
122             return Err(CaptureBufferError::InvalidLength);
123         }
124 
125         Ok(CaptureBuffer {
126             buffer: AudioBuffer {
127                 buffer,
128                 frame_size,
129                 offset: 0,
130             },
131             drop,
132         })
133     }
134 
135     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize136     pub fn frame_capacity(&self) -> usize {
137         self.buffer.frame_capacity()
138     }
139 
140     /// This triggers the callback of `BufferCommit`. This should be called after the data is read
141     /// from the buffer.
142     ///
143     /// Always sends `frame_capacity`.
commit(&mut self)144     pub fn commit(&mut self) {
145         self.drop.commit(self.frame_capacity());
146     }
147 
148     /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>149     pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
150         self.buffer.read_copy_cb(size, cb)
151     }
152 }
153 
154 impl<'a> Read for CaptureBuffer<'a> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>155     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
156         self.buffer.read(buf)
157     }
158 }
159 
160 impl<'a> AsyncCaptureBuffer<'a> {
161     /// Creates a new `AsyncCaptureBuffer` that holds a reference to the backing memory specified in
162     /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], trigger: &'a mut F, ) -> Result<Self, CaptureBufferError> where F: AsyncBufferCommit,163     pub fn new<F>(
164         frame_size: usize,
165         buffer: &'a mut [u8],
166         trigger: &'a mut F,
167     ) -> Result<Self, CaptureBufferError>
168     where
169         F: AsyncBufferCommit,
170     {
171         if buffer.len() % frame_size != 0 {
172             return Err(CaptureBufferError::InvalidLength);
173         }
174 
175         Ok(AsyncCaptureBuffer {
176             buffer: AudioBuffer {
177                 buffer,
178                 frame_size,
179                 offset: 0,
180             },
181             trigger,
182         })
183     }
184 
185     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize186     pub fn frame_capacity(&self) -> usize {
187         self.buffer.frame_capacity()
188     }
189 
190     /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
191     /// read from the buffer.
192     ///
193     /// Always sends `frame_capacity`.
commit(&mut self)194     pub async fn commit(&mut self) {
195         self.trigger.commit(self.frame_capacity()).await;
196     }
197 
198     /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>199     pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
200         self.buffer.read_copy_cb(size, cb)
201     }
202 
203     /// Copy data to an io::Write
copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize>204     pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
205         self.buffer.copy_to(writer)
206     }
207 }
208 
209 impl<'a> Read for AsyncCaptureBuffer<'a> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>210     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
211         self.buffer.read(buf)
212     }
213 }
214 
215 /// Stream that provides null capture samples.
216 pub struct NoopCaptureStream {
217     buffer: Vec<u8>,
218     frame_size: usize,
219     interval: Duration,
220     next_frame: Duration,
221     start_time: Option<Instant>,
222     buffer_drop: NoopBufferCommit,
223 }
224 
225 impl NoopCaptureStream {
new( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Self226     pub fn new(
227         num_channels: usize,
228         format: SampleFormat,
229         frame_rate: u32,
230         buffer_size: usize,
231     ) -> Self {
232         let frame_size = format.sample_bytes() * num_channels;
233         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
234         NoopCaptureStream {
235             buffer: vec![0; buffer_size * frame_size],
236             frame_size,
237             interval,
238             next_frame: interval,
239             start_time: None,
240             buffer_drop: NoopBufferCommit {
241                 which_buffer: false,
242             },
243         }
244     }
245 }
246 
247 impl CaptureBufferStream for NoopCaptureStream {
next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>248     fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
249         if let Some(start_time) = self.start_time {
250             let elapsed = start_time.elapsed();
251             if elapsed < self.next_frame {
252                 std::thread::sleep(self.next_frame - elapsed);
253             }
254             self.next_frame += self.interval;
255         } else {
256             self.start_time = Some(Instant::now());
257             self.next_frame = self.interval;
258         }
259         Ok(CaptureBuffer::new(
260             self.frame_size,
261             &mut self.buffer,
262             &mut self.buffer_drop,
263         )?)
264     }
265 }
266 
267 #[async_trait(?Send)]
268 impl AsyncCaptureBufferStream for NoopCaptureStream {
next_capture_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>269     async fn next_capture_buffer<'a>(
270         &'a mut self,
271         ex: &dyn AudioStreamsExecutor,
272     ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
273         if let Some(start_time) = self.start_time {
274             let elapsed = start_time.elapsed();
275             if elapsed < self.next_frame {
276                 ex.delay(self.next_frame - elapsed).await?;
277             }
278             self.next_frame += self.interval;
279         } else {
280             self.start_time = Some(Instant::now());
281             self.next_frame = self.interval;
282         }
283         Ok(AsyncCaptureBuffer::new(
284             self.frame_size,
285             &mut self.buffer,
286             &mut self.buffer_drop,
287         )?)
288     }
289 }
290 
291 /// Call `f` with a `AsyncCaptureBuffer`, and trigger the buffer done call back after. `f` can read
292 /// the capture data from the given `AsyncCaptureBuffer`.
293 ///
294 /// This cannot be a trait method because trait methods with generic parameters are not object safe.
async_read_capture_buffer<F>( stream: &mut dyn AsyncCaptureBufferStream, f: F, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError> where F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,295 pub async fn async_read_capture_buffer<F>(
296     stream: &mut dyn AsyncCaptureBufferStream,
297     f: F,
298     ex: &dyn AudioStreamsExecutor,
299 ) -> Result<(), BoxError>
300 where
301     F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,
302 {
303     let mut buf = stream.next_capture_buffer(ex).await?;
304     f(&mut buf)?;
305     buf.commit().await;
306     Ok(())
307 }
308 
/// Unit tests for the sync and async capture buffers and the no-op capture stream.
#[cfg(test)]
mod tests {
    use super::super::async_api::test::TestExecutor;
    use super::super::*;
    use super::*;
    use futures::FutureExt;

    #[test]
    fn invalid_buffer_length() {
        // Capture buffers can't be created with a size that isn't divisible by the frame size.
        let mut cp_buf = [0xa5u8; 480 * 2 * 2 + 1];
        let mut buffer_drop = NoopBufferCommit {
            which_buffer: false,
        };
        assert!(CaptureBuffer::new(2, &mut cp_buf, &mut buffer_drop).is_err());
    }

    #[test]
    fn commit() {
        struct TestCommit {
            frame_count: usize,
        }
        impl BufferCommit for TestCommit {
            fn commit(&mut self, nwritten: usize) {
                self.frame_count += nwritten;
            }
        }
        let mut test_commit = TestCommit { frame_count: 0 };
        {
            const FRAME_SIZE: usize = 4;
            let mut buf = [0u8; 480 * FRAME_SIZE];
            let mut cp_buf = CaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
            let mut local_buf = [0u8; 240 * FRAME_SIZE];
            assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
            cp_buf.commit();
        }
        // This should be 480 no matter how many samples are read.
        assert_eq!(test_commit.frame_count, 480);
    }

    #[test]
    fn sixteen_bit_stereo() {
        let mut server = NoopStreamSource::new();
        let (_, mut stream) = server
            .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
            .unwrap();
        let mut copy_func = |b: &mut CaptureBuffer| {
            assert_eq!(b.buffer.frame_capacity(), 480);
            let mut pb_buf = [0xa5u8; 480 * 2 * 2];
            assert_eq!(b.read(&mut pb_buf).unwrap(), 480 * 2 * 2);
            Ok(())
        };
        stream.read_capture_buffer(&mut copy_func).unwrap();
    }

    #[test]
    fn consumption_rate() {
        let mut server = NoopStreamSource::new();
        let (_, mut stream) = server
            .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
            .unwrap();
        let start = Instant::now();
        {
            let mut copy_func = |b: &mut CaptureBuffer| {
                let mut cp_buf = [0xa5u8; 480 * 2 * 2];
                assert_eq!(b.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
                for buf in cp_buf.iter() {
                    assert_eq!(*buf, 0, "Read samples should all be zeros.");
                }
                Ok(())
            };
            stream.read_capture_buffer(&mut copy_func).unwrap();
        }
        // The second call should block until the first buffer is consumed.
        let mut assert_func = |_: &mut CaptureBuffer| {
            let elapsed = start.elapsed();
            assert!(
                elapsed > Duration::from_millis(10),
                "next_capture_buffer didn't block long enough {}",
                elapsed.subsec_millis()
            );
            Ok(())
        };
        stream.read_capture_buffer(&mut assert_func).unwrap();
    }

    #[test]
    fn async_commit() {
        struct TestCommit {
            frame_count: usize,
        }
        #[async_trait(?Send)]
        impl AsyncBufferCommit for TestCommit {
            async fn commit(&mut self, nwritten: usize) {
                self.frame_count += nwritten;
            }
        }
        async fn this_test() {
            let mut test_commit = TestCommit { frame_count: 0 };
            {
                const FRAME_SIZE: usize = 4;
                let mut buf = [0u8; 480 * FRAME_SIZE];
                let mut cp_buf =
                    AsyncCaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
                let mut local_buf = [0u8; 240 * FRAME_SIZE];
                assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
                cp_buf.commit().await;
            }
            // This should be 480 no matter how many samples are read.
            assert_eq!(test_commit.frame_count, 480);
        }

        // Nothing awaited above can be pending (`TestCommit::commit` has no await points), so the
        // future must finish on its first poll. Checking that guards against the test passing
        // without ever reaching the final assertion.
        this_test()
            .now_or_never()
            .expect("async_commit future should complete on its first poll");
    }

    #[test]
    fn consumption_rate_async() {
        async fn this_test(ex: &TestExecutor) {
            let mut server = NoopStreamSource::new();
            let (_, mut stream) = server
                .new_async_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[], ex)
                .unwrap();
            let start = Instant::now();
            {
                let copy_func = |buf: &mut AsyncCaptureBuffer| {
                    let mut cp_buf = [0xa5u8; 480 * 2 * 2];
                    assert_eq!(buf.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
                    for buf in cp_buf.iter() {
                        assert_eq!(*buf, 0, "Read samples should all be zeros.");
                    }
                    Ok(())
                };
                async_read_capture_buffer(&mut *stream, copy_func, ex)
                    .await
                    .unwrap();
            }
            // The second call should block until the first buffer is consumed.
            let assert_func = |_: &mut AsyncCaptureBuffer| {
                let elapsed = start.elapsed();
                assert!(
                    elapsed > Duration::from_millis(10),
                    "next_capture_buffer didn't block long enough {}",
                    elapsed.subsec_millis()
                );
                Ok(())
            };
            async_read_capture_buffer(&mut *stream, assert_func, ex)
                .await
                .unwrap();
        }

        let ex = TestExecutor {};
        // NOTE(review): the result of `now_or_never` is discarded, so this test only exercises
        // the timing assertion if `TestExecutor::delay` completes within a single poll — confirm
        // against `TestExecutor` and consider asserting completion here as well.
        this_test(&ex).now_or_never();
    }
}
464