• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 use std::error;
5 use std::fmt;
6 use std::os::unix::io::RawFd;
7 use std::sync::Arc;
8 use std::time::{Duration, Instant};
9 
10 use sync::{Condvar, Mutex};
11 use sys_util::SharedMemory;
12 
13 use crate::{BoxError, SampleFormat, StreamDirection, StreamEffect};
14 
15 type GenericResult<T> = std::result::Result<T, BoxError>;
16 
17 /// `BufferSet` is used as a callback mechanism for `ServerRequest` objects.
18 /// It is meant to be implemented by the audio stream, allowing arbitrary code
19 /// to be run after a buffer offset and length is set.
20 pub trait BufferSet {
21     /// Called when the client sets a buffer offset and length.
22     ///
23     /// `offset` is the offset within shared memory of the buffer and `frames`
24     /// indicates the number of audio frames that can be read from or written to
25     /// the buffer.
callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>26     fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>;
27 
28     /// Called when the client ignores a request from the server.
ignore(&mut self) -> GenericResult<()>29     fn ignore(&mut self) -> GenericResult<()>;
30 }
31 
/// Errors raised while a client responds to a `ServerRequest`.
#[derive(Debug)]
pub enum Error {
    /// The client provided more frames (first field) than were requested
    /// (second field).
    TooManyFrames(usize, usize),
}

impl error::Error for Error {}

impl fmt::Display for Error {
    /// Writes a human-readable description of the error to `f`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Both fields are `usize`, so the variant can be destructured by copy.
        match *self {
            Error::TooManyFrames(got, wanted) => write!(
                f,
                "Provided number of frames {} exceeds requested number of frames {}",
                got, wanted
            ),
        }
    }
}
50 
51 /// `ServerRequest` represents an active request from the server for the client
52 /// to provide a buffer in shared memory to playback from or capture to.
53 pub struct ServerRequest<'a> {
54     requested_frames: usize,
55     buffer_set: &'a mut dyn BufferSet,
56 }
57 
58 impl<'a> ServerRequest<'a> {
59     /// Create a new ServerRequest object
60     ///
61     /// Create a ServerRequest object representing a request from the server
62     /// for a buffer `requested_frames` in size.
63     ///
64     /// When the client responds to this request by calling
65     /// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames),
66     /// BufferSet::callback will be called on `buffer_set`.
67     ///
68     /// # Arguments
69     /// * `requested_frames` - The requested buffer size in frames.
70     /// * `buffer_set` - The object implementing the callback for when a buffer is provided.
new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self71     pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self {
72         Self {
73             requested_frames,
74             buffer_set,
75         }
76     }
77 
78     /// Get the number of frames of audio data requested by the server.
79     ///
80     /// The returned value should never be greater than the `buffer_size`
81     /// given in [`new_stream`](ShmStreamSource::new_stream).
requested_frames(&self) -> usize82     pub fn requested_frames(&self) -> usize {
83         self.requested_frames
84     }
85 
86     /// Sets the buffer offset and length for the requested buffer.
87     ///
88     /// Sets the buffer offset and length of the buffer that fulfills this
89     /// server request to `offset` and `length`, respectively. This means that
90     /// `length` bytes of audio samples may be read from/written to that
91     /// location in `client_shm` for a playback/capture stream, respectively.
92     /// This function may only be called once for a `ServerRequest`, at which
93     /// point the ServerRequest is dropped and no further calls are possible.
94     ///
95     /// # Arguments
96     ///
97     /// * `offset` - The value to use as the new buffer offset for the next buffer.
98     /// * `frames` - The length of the next buffer in frames.
99     ///
100     /// # Errors
101     ///
102     /// * If `frames` is greater than `requested_frames`.
set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()>103     pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> {
104         if frames > self.requested_frames {
105             return Err(Box::new(Error::TooManyFrames(
106                 frames,
107                 self.requested_frames,
108             )));
109         }
110 
111         self.buffer_set.callback(offset, frames)
112     }
113 
114     /// Ignore this request
115     ///
116     /// If the client does not intend to respond to this ServerRequest with a
117     /// buffer, they should call this function. The stream will be notified that
118     /// the request has been ignored and will handle it properly.
ignore_request(self) -> GenericResult<()>119     pub fn ignore_request(self) -> GenericResult<()> {
120         self.buffer_set.ignore()
121     }
122 }
123 
124 /// `ShmStream` allows a client to interact with an active CRAS stream.
125 pub trait ShmStream: Send {
126     /// Get the size of a frame of audio data for this stream.
frame_size(&self) -> usize127     fn frame_size(&self) -> usize;
128 
129     /// Get the number of channels of audio data for this stream.
num_channels(&self) -> usize130     fn num_channels(&self) -> usize;
131 
132     /// Get the frame rate of audio data for this stream.
frame_rate(&self) -> u32133     fn frame_rate(&self) -> u32;
134 
135     /// Waits until the next server message indicating action is required.
136     ///
137     /// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning
138     /// that we must set the buffer offset to the next location where playback
139     /// data can be found.
140     /// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning
141     /// that we must set the buffer offset to the next location where captured
142     /// data can be written to.
143     /// Will return early if `timeout` elapses before a message is received.
144     ///
145     /// # Arguments
146     ///
147     /// * `timeout` - The amount of time to wait until a message is received.
148     ///
149     /// # Return value
150     ///
151     /// Returns `Some(request)` where `request` is an object that implements the
152     /// [`ServerRequest`](ServerRequest) trait and which can be used to get the
153     /// number of bytes requested for playback streams or that have already been
154     /// written to shm for capture streams.
155     ///
156     /// If the timeout occurs before a message is received, returns `None`.
157     ///
158     /// # Errors
159     ///
160     /// * If an invalid message type is received for the stream.
wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>161     fn wait_for_next_action_with_timeout(
162         &mut self,
163         timeout: Duration,
164     ) -> GenericResult<Option<ServerRequest>>;
165 }
166 
167 /// `ShmStreamSource` creates streams for playback or capture of audio.
168 pub trait ShmStreamSource: Send {
169     /// Creates a new [`ShmStream`](ShmStream)
170     ///
171     /// Creates a new `ShmStream` object, which allows:
172     /// * Waiting until the server has communicated that data is ready or
173     ///   requested that we make more data available.
174     /// * Setting the location and length of buffers for reading/writing audio data.
175     ///
176     /// # Arguments
177     ///
178     /// * `direction` - The direction of the stream, either `Playback` or `Capture`.
179     /// * `num_channels` - The number of audio channels for the stream.
180     /// * `format` - The audio format to use for audio samples.
181     /// * `frame_rate` - The stream's frame rate in Hz.
182     /// * `buffer_size` - The maximum size of an audio buffer. This will be the
183     ///                   size used for transfers of audio data between client
184     ///                   and server.
185     /// * `effects` - Audio effects to use for the stream, such as echo-cancellation.
186     /// * `client_shm` - The shared memory area that will contain samples.
187     /// * `buffer_offsets` - The two initial values to use as buffer offsets
188     ///                      for streams. This way, the server will not write
189     ///                      audio data to an arbitrary offset in `client_shm`
190     ///                      if the client fails to update offsets in time.
191     ///
192     /// # Errors
193     ///
194     /// * If sending the connect stream message to the server fails.
195     #[allow(clippy::too_many_arguments)]
new_stream( &mut self, direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, effects: &[StreamEffect], client_shm: &SharedMemory, buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>196     fn new_stream(
197         &mut self,
198         direction: StreamDirection,
199         num_channels: usize,
200         format: SampleFormat,
201         frame_rate: u32,
202         buffer_size: usize,
203         effects: &[StreamEffect],
204         client_shm: &SharedMemory,
205         buffer_offsets: [u64; 2],
206     ) -> GenericResult<Box<dyn ShmStream>>;
207 
208     /// Get a list of file descriptors used by the implementation.
209     ///
210     /// Returns any open file descriptors needed by the implementation.
211     /// This list helps users of the ShmStreamSource enter Linux jails without
212     /// closing needed file descriptors.
keep_fds(&self) -> Vec<RawFd>213     fn keep_fds(&self) -> Vec<RawFd> {
214         Vec::new()
215     }
216 }
217 
218 /// Class that implements ShmStream trait but does nothing with the samples
219 pub struct NullShmStream {
220     num_channels: usize,
221     frame_rate: u32,
222     buffer_size: usize,
223     frame_size: usize,
224     interval: Duration,
225     next_frame: Duration,
226     start_time: Instant,
227 }
228 
229 impl NullShmStream {
230     /// Attempt to create a new NullShmStream with the given number of channels,
231     /// format, frame_rate, and buffer_size.
new( buffer_size: usize, num_channels: usize, format: SampleFormat, frame_rate: u32, ) -> Self232     pub fn new(
233         buffer_size: usize,
234         num_channels: usize,
235         format: SampleFormat,
236         frame_rate: u32,
237     ) -> Self {
238         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
239         Self {
240             num_channels,
241             frame_rate,
242             buffer_size,
243             frame_size: format.sample_bytes() * num_channels,
244             interval,
245             next_frame: interval,
246             start_time: Instant::now(),
247         }
248     }
249 }
250 
251 impl BufferSet for NullShmStream {
callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>252     fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
253         Ok(())
254     }
255 
ignore(&mut self) -> GenericResult<()>256     fn ignore(&mut self) -> GenericResult<()> {
257         Ok(())
258     }
259 }
260 
261 impl ShmStream for NullShmStream {
frame_size(&self) -> usize262     fn frame_size(&self) -> usize {
263         self.frame_size
264     }
265 
num_channels(&self) -> usize266     fn num_channels(&self) -> usize {
267         self.num_channels
268     }
269 
frame_rate(&self) -> u32270     fn frame_rate(&self) -> u32 {
271         self.frame_rate
272     }
273 
wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>274     fn wait_for_next_action_with_timeout(
275         &mut self,
276         timeout: Duration,
277     ) -> GenericResult<Option<ServerRequest>> {
278         let elapsed = self.start_time.elapsed();
279         if elapsed < self.next_frame {
280             if timeout < self.next_frame - elapsed {
281                 std::thread::sleep(timeout);
282                 return Ok(None);
283             } else {
284                 std::thread::sleep(self.next_frame - elapsed);
285             }
286         }
287         self.next_frame += self.interval;
288         Ok(Some(ServerRequest::new(self.buffer_size, self)))
289     }
290 }
291 
/// Factory that hands out `NullShmStream` objects.
#[derive(Default)]
pub struct NullShmStreamSource;

impl NullShmStreamSource {
    /// Creates a new source; equivalent to `NullShmStreamSource::default()`.
    pub fn new() -> Self {
        NullShmStreamSource
    }
}
301 
302 impl ShmStreamSource for NullShmStreamSource {
new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &SharedMemory, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>303     fn new_stream(
304         &mut self,
305         _direction: StreamDirection,
306         num_channels: usize,
307         format: SampleFormat,
308         frame_rate: u32,
309         buffer_size: usize,
310         _effects: &[StreamEffect],
311         _client_shm: &SharedMemory,
312         _buffer_offsets: [u64; 2],
313     ) -> GenericResult<Box<dyn ShmStream>> {
314         let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate);
315         Ok(Box::new(new_stream))
316     }
317 }
318 
319 #[derive(Clone)]
320 pub struct MockShmStream {
321     num_channels: usize,
322     frame_rate: u32,
323     request_size: usize,
324     frame_size: usize,
325     request_notifier: Arc<(Mutex<bool>, Condvar)>,
326 }
327 
328 impl MockShmStream {
329     /// Attempt to create a new MockShmStream with the given number of
330     /// channels, frame_rate, format, and buffer_size.
new( num_channels: usize, frame_rate: u32, format: SampleFormat, buffer_size: usize, ) -> Self331     pub fn new(
332         num_channels: usize,
333         frame_rate: u32,
334         format: SampleFormat,
335         buffer_size: usize,
336     ) -> Self {
337         Self {
338             num_channels,
339             frame_rate,
340             request_size: buffer_size,
341             frame_size: format.sample_bytes() * num_channels,
342             request_notifier: Arc::new((Mutex::new(false), Condvar::new())),
343         }
344     }
345 
346     /// Call to request data from the stream, causing it to return from
347     /// `wait_for_next_action_with_timeout`. Will block until
348     /// `set_buffer_offset_and_frames` is called on the ServerRequest returned
349     /// from `wait_for_next_action_with_timeout`, or until `timeout` elapses.
350     /// Returns true if a response was successfully received.
trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool351     pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool {
352         let &(ref lock, ref cvar) = &*self.request_notifier;
353         let mut requested = lock.lock();
354         *requested = true;
355         cvar.notify_one();
356         let start_time = Instant::now();
357         while *requested {
358             requested = cvar.wait_timeout(requested, timeout).0;
359             if start_time.elapsed() > timeout {
360                 // We failed to get a callback in time, mark this as false.
361                 *requested = false;
362                 return false;
363             }
364         }
365 
366         true
367     }
368 
notify_request(&mut self)369     fn notify_request(&mut self) {
370         let &(ref lock, ref cvar) = &*self.request_notifier;
371         let mut requested = lock.lock();
372         *requested = false;
373         cvar.notify_one();
374     }
375 }
376 
377 impl BufferSet for MockShmStream {
callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>378     fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
379         self.notify_request();
380         Ok(())
381     }
382 
ignore(&mut self) -> GenericResult<()>383     fn ignore(&mut self) -> GenericResult<()> {
384         self.notify_request();
385         Ok(())
386     }
387 }
388 
389 impl ShmStream for MockShmStream {
frame_size(&self) -> usize390     fn frame_size(&self) -> usize {
391         self.frame_size
392     }
393 
num_channels(&self) -> usize394     fn num_channels(&self) -> usize {
395         self.num_channels
396     }
397 
frame_rate(&self) -> u32398     fn frame_rate(&self) -> u32 {
399         self.frame_rate
400     }
401 
wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>402     fn wait_for_next_action_with_timeout(
403         &mut self,
404         timeout: Duration,
405     ) -> GenericResult<Option<ServerRequest>> {
406         {
407             let start_time = Instant::now();
408             let &(ref lock, ref cvar) = &*self.request_notifier;
409             let mut requested = lock.lock();
410             while !*requested {
411                 requested = cvar.wait_timeout(requested, timeout).0;
412                 if start_time.elapsed() > timeout {
413                     return Ok(None);
414                 }
415             }
416         }
417 
418         Ok(Some(ServerRequest::new(self.request_size, self)))
419     }
420 }
421 
422 /// Source of `MockShmStream` objects.
423 #[derive(Clone, Default)]
424 pub struct MockShmStreamSource {
425     last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>,
426 }
427 
428 impl MockShmStreamSource {
new() -> Self429     pub fn new() -> Self {
430         Default::default()
431     }
432 
433     /// Get the last stream that has been created from this source. If no stream
434     /// has been created, block until one has.
get_last_stream(&self) -> MockShmStream435     pub fn get_last_stream(&self) -> MockShmStream {
436         let &(ref last_stream, ref cvar) = &*self.last_stream;
437         let mut stream = last_stream.lock();
438         loop {
439             match &*stream {
440                 None => stream = cvar.wait(stream),
441                 Some(ref s) => return s.clone(),
442             };
443         }
444     }
445 }
446 
447 impl ShmStreamSource for MockShmStreamSource {
new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &SharedMemory, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>448     fn new_stream(
449         &mut self,
450         _direction: StreamDirection,
451         num_channels: usize,
452         format: SampleFormat,
453         frame_rate: u32,
454         buffer_size: usize,
455         _effects: &[StreamEffect],
456         _client_shm: &SharedMemory,
457         _buffer_offsets: [u64; 2],
458     ) -> GenericResult<Box<dyn ShmStream>> {
459         let &(ref last_stream, ref cvar) = &*self.last_stream;
460         let mut stream = last_stream.lock();
461 
462         let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size);
463         *stream = Some(new_stream.clone());
464         cvar.notify_one();
465         Ok(Box::new(new_stream))
466     }
467 }
468 
#[cfg(test)]
pub mod tests {
    use super::*;

    /// A request triggered on the mock stream reaches the waiting thread, and
    /// answering it unblocks the trigger.
    #[test]
    fn mock_trigger_callback() {
        let buffer_size = 480;
        let num_channels = 2;
        let format = SampleFormat::S24LE;
        let shm = SharedMemory::anon().expect("Failed to create shm");

        let stream_source = MockShmStreamSource::new();
        let mut thread_stream_source = stream_source.clone();

        // Consumer side: create the stream, wait for a request and answer it.
        let handle = std::thread::spawn(move || {
            let mut stream = thread_stream_source
                .new_stream(
                    StreamDirection::Playback,
                    num_channels,
                    format,
                    44100,
                    buffer_size,
                    &[],
                    &shm,
                    [400, 8000],
                )
                .expect("Failed to create stream");

            let request = stream
                .wait_for_next_action_with_timeout(Duration::from_secs(5))
                .expect("Failed to wait for next action");
            if let Some(r) = request {
                let requested = r.requested_frames();
                r.set_buffer_offset_and_frames(872, requested)
                    .expect("Failed to set buffer offset and frames");
                requested
            } else {
                0
            }
        });

        // Producer side: blocks until the consumer thread has created a stream.
        let mut stream = stream_source.get_last_stream();
        assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1)));

        let requested_frames = handle.join().expect("Failed to join thread");
        assert_eq!(requested_frames, buffer_size);
    }

    /// The null stream paces requests: the second request is not handed out
    /// before one buffer interval has passed.
    #[test]
    fn null_consumption_rate() {
        let frame_rate = 44100;
        let buffer_size = 480;
        let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);

        let shm = SharedMemory::anon().expect("Failed to create shm");

        let start = Instant::now();

        let mut stream_source = NullShmStreamSource::new();
        let mut stream = stream_source
            .new_stream(
                StreamDirection::Playback,
                2,
                SampleFormat::S24LE,
                frame_rate,
                buffer_size,
                &[],
                &shm,
                [400, 8000],
            )
            .expect("Failed to create stream");

        let timeout = Duration::from_secs(5);
        let first = stream
            .wait_for_next_action_with_timeout(timeout)
            .expect("Failed to wait for first request")
            .expect("First request should not have timed out");
        first
            .set_buffer_offset_and_frames(276, 480)
            .expect("Failed to set buffer offset and length");

        // The second call should block until the first buffer is consumed.
        let _second = stream
            .wait_for_next_action_with_timeout(timeout)
            .expect("Failed to wait for second request");
        let elapsed = start.elapsed();
        assert!(
            elapsed > interval,
            "wait_for_next_action_with_timeout didn't block long enough: {:?}",
            elapsed
        );

        assert!(
            elapsed < timeout,
            "wait_for_next_action_with_timeout blocked for too long: {:?}",
            elapsed
        );
    }
}
569