• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #[cfg(unix)]
6 use std::os::unix::io::RawFd;
7 use std::sync::Arc;
8 use std::sync::Condvar;
9 use std::sync::Mutex;
10 use std::time::Duration;
11 use std::time::Instant;
12 
13 use remain::sorted;
14 use thiserror::Error;
15 
16 use crate::BoxError;
17 use crate::SampleFormat;
18 use crate::StreamDirection;
19 use crate::StreamEffect;
20 
21 type GenericResult<T> = std::result::Result<T, BoxError>;
22 
23 /// `BufferSet` is used as a callback mechanism for `ServerRequest` objects.
24 /// It is meant to be implemented by the audio stream, allowing arbitrary code
25 /// to be run after a buffer offset and length is set.
26 pub trait BufferSet {
27     /// Called when the client sets a buffer offset and length.
28     ///
29     /// `offset` is the offset within shared memory of the buffer and `frames`
30     /// indicates the number of audio frames that can be read from or written to
31     /// the buffer.
callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>32     fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>;
33 
34     /// Called when the client ignores a request from the server.
ignore(&mut self) -> GenericResult<()>35     fn ignore(&mut self) -> GenericResult<()>;
36 }
37 
38 #[sorted]
39 #[derive(Error, Debug)]
40 pub enum Error {
41     #[error("Provided number of frames {0} exceeds requested number of frames {1}")]
42     TooManyFrames(usize, usize),
43 }
44 
45 /// `ServerRequest` represents an active request from the server for the client
46 /// to provide a buffer in shared memory to playback from or capture to.
47 pub struct ServerRequest<'a> {
48     requested_frames: usize,
49     buffer_set: &'a mut dyn BufferSet,
50 }
51 
52 impl<'a> ServerRequest<'a> {
53     /// Create a new ServerRequest object
54     ///
55     /// Create a ServerRequest object representing a request from the server
56     /// for a buffer `requested_frames` in size.
57     ///
58     /// When the client responds to this request by calling
59     /// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames),
60     /// BufferSet::callback will be called on `buffer_set`.
61     ///
62     /// # Arguments
63     /// * `requested_frames` - The requested buffer size in frames.
64     /// * `buffer_set` - The object implementing the callback for when a buffer is provided.
new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self65     pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self {
66         Self {
67             requested_frames,
68             buffer_set,
69         }
70     }
71 
72     /// Get the number of frames of audio data requested by the server.
73     ///
74     /// The returned value should never be greater than the `buffer_size`
75     /// given in [`new_stream`](ShmStreamSource::new_stream).
requested_frames(&self) -> usize76     pub fn requested_frames(&self) -> usize {
77         self.requested_frames
78     }
79 
80     /// Sets the buffer offset and length for the requested buffer.
81     ///
82     /// Sets the buffer offset and length of the buffer that fulfills this
83     /// server request to `offset` and `length`, respectively. This means that
84     /// `length` bytes of audio samples may be read from/written to that
85     /// location in `client_shm` for a playback/capture stream, respectively.
86     /// This function may only be called once for a `ServerRequest`, at which
87     /// point the ServerRequest is dropped and no further calls are possible.
88     ///
89     /// # Arguments
90     ///
91     /// * `offset` - The value to use as the new buffer offset for the next buffer.
92     /// * `frames` - The length of the next buffer in frames.
93     ///
94     /// # Errors
95     ///
96     /// * If `frames` is greater than `requested_frames`.
set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()>97     pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> {
98         if frames > self.requested_frames {
99             return Err(Box::new(Error::TooManyFrames(
100                 frames,
101                 self.requested_frames,
102             )));
103         }
104 
105         self.buffer_set.callback(offset, frames)
106     }
107 
108     /// Ignore this request
109     ///
110     /// If the client does not intend to respond to this ServerRequest with a
111     /// buffer, they should call this function. The stream will be notified that
112     /// the request has been ignored and will handle it properly.
ignore_request(self) -> GenericResult<()>113     pub fn ignore_request(self) -> GenericResult<()> {
114         self.buffer_set.ignore()
115     }
116 }
117 
118 /// `ShmStream` allows a client to interact with an active CRAS stream.
119 pub trait ShmStream: Send {
120     /// Get the size of a frame of audio data for this stream.
frame_size(&self) -> usize121     fn frame_size(&self) -> usize;
122 
123     /// Get the number of channels of audio data for this stream.
num_channels(&self) -> usize124     fn num_channels(&self) -> usize;
125 
126     /// Get the frame rate of audio data for this stream.
frame_rate(&self) -> u32127     fn frame_rate(&self) -> u32;
128 
129     /// Waits until the next server message indicating action is required.
130     ///
131     /// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning
132     /// that we must set the buffer offset to the next location where playback
133     /// data can be found.
134     /// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning
135     /// that we must set the buffer offset to the next location where captured
136     /// data can be written to.
137     /// Will return early if `timeout` elapses before a message is received.
138     ///
139     /// # Arguments
140     ///
141     /// * `timeout` - The amount of time to wait until a message is received.
142     ///
143     /// # Return value
144     ///
145     /// Returns `Some(request)` where `request` is an object that implements the
146     /// [`ServerRequest`](ServerRequest) trait and which can be used to get the
147     /// number of bytes requested for playback streams or that have already been
148     /// written to shm for capture streams.
149     ///
150     /// If the timeout occurs before a message is received, returns `None`.
151     ///
152     /// # Errors
153     ///
154     /// * If an invalid message type is received for the stream.
wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>155     fn wait_for_next_action_with_timeout(
156         &mut self,
157         timeout: Duration,
158     ) -> GenericResult<Option<ServerRequest>>;
159 }
160 
/// Operations a shared memory area must offer in order to be handed to a
/// `ShmStreamSource`.
pub trait SharedMemory {
    /// Error produced when creating the shared memory fails.
    type Error: std::error::Error;

    /// Creates a new, unnamed shared memory file descriptor of `size` bytes.
    fn anon(size: u64) -> Result<Self, Self::Error>
    where
        Self: Sized;

    /// Returns the size of the shared memory in bytes.
    ///
    /// The value does not reflect changes made through other interfaces or
    /// by other users of the shared memory file descriptor.
    fn size(&self) -> u64;

    /// Returns the underlying raw file descriptor.
    #[cfg(unix)]
    fn as_raw_fd(&self) -> RawFd;
}
180 
181 /// `ShmStreamSource` creates streams for playback or capture of audio.
182 pub trait ShmStreamSource<E: std::error::Error>: Send {
183     /// Creates a new [`ShmStream`](ShmStream)
184     ///
185     /// Creates a new `ShmStream` object, which allows:
186     /// * Waiting until the server has communicated that data is ready or
187     ///   requested that we make more data available.
188     /// * Setting the location and length of buffers for reading/writing audio data.
189     ///
190     /// # Arguments
191     ///
192     /// * `direction` - The direction of the stream, either `Playback` or `Capture`.
193     /// * `num_channels` - The number of audio channels for the stream.
194     /// * `format` - The audio format to use for audio samples.
195     /// * `frame_rate` - The stream's frame rate in Hz.
196     /// * `buffer_size` - The maximum size of an audio buffer. This will be the
197     ///                   size used for transfers of audio data between client
198     ///                   and server.
199     /// * `effects` - Audio effects to use for the stream, such as echo-cancellation.
200     /// * `client_shm` - The shared memory area that will contain samples.
201     /// * `buffer_offsets` - The two initial values to use as buffer offsets
202     ///                      for streams. This way, the server will not write
203     ///                      audio data to an arbitrary offset in `client_shm`
204     ///                      if the client fails to update offsets in time.
205     ///
206     /// # Errors
207     ///
208     /// * If sending the connect stream message to the server fails.
209     #[allow(clippy::too_many_arguments)]
new_stream( &mut self, direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, effects: &[StreamEffect], client_shm: &dyn SharedMemory<Error = E>, buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>210     fn new_stream(
211         &mut self,
212         direction: StreamDirection,
213         num_channels: usize,
214         format: SampleFormat,
215         frame_rate: u32,
216         buffer_size: usize,
217         effects: &[StreamEffect],
218         client_shm: &dyn SharedMemory<Error = E>,
219         buffer_offsets: [u64; 2],
220     ) -> GenericResult<Box<dyn ShmStream>>;
221 
222     /// Get a list of file descriptors used by the implementation.
223     ///
224     /// Returns any open file descriptors needed by the implementation.
225     /// This list helps users of the ShmStreamSource enter Linux jails without
226     /// closing needed file descriptors.
227     #[cfg(unix)]
keep_fds(&self) -> Vec<RawFd>228     fn keep_fds(&self) -> Vec<RawFd> {
229         Vec::new()
230     }
231 }
232 
/// A `ShmStream` implementation that discards every buffer it is given and
/// merely paces requests in time.
pub struct NullShmStream {
    /// Number of audio channels.
    num_channels: usize,
    /// Frame rate of the stream.
    frame_rate: u32,
    /// Number of frames asked for in each request.
    buffer_size: usize,
    /// Size of one frame: sample size times channel count.
    frame_size: usize,
    /// Time between two consecutive requests.
    interval: Duration,
    /// Offset from `start_time` at which the next request becomes due.
    next_frame: Duration,
    /// Moment the stream was created.
    start_time: Instant,
}
243 
244 impl NullShmStream {
245     /// Attempt to create a new NullShmStream with the given number of channels,
246     /// format, frame_rate, and buffer_size.
new( buffer_size: usize, num_channels: usize, format: SampleFormat, frame_rate: u32, ) -> Self247     pub fn new(
248         buffer_size: usize,
249         num_channels: usize,
250         format: SampleFormat,
251         frame_rate: u32,
252     ) -> Self {
253         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
254         Self {
255             num_channels,
256             frame_rate,
257             buffer_size,
258             frame_size: format.sample_bytes() * num_channels,
259             interval,
260             next_frame: interval,
261             start_time: Instant::now(),
262         }
263     }
264 }
265 
266 impl BufferSet for NullShmStream {
callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>267     fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
268         Ok(())
269     }
270 
ignore(&mut self) -> GenericResult<()>271     fn ignore(&mut self) -> GenericResult<()> {
272         Ok(())
273     }
274 }
275 
276 impl ShmStream for NullShmStream {
frame_size(&self) -> usize277     fn frame_size(&self) -> usize {
278         self.frame_size
279     }
280 
num_channels(&self) -> usize281     fn num_channels(&self) -> usize {
282         self.num_channels
283     }
284 
frame_rate(&self) -> u32285     fn frame_rate(&self) -> u32 {
286         self.frame_rate
287     }
288 
wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>289     fn wait_for_next_action_with_timeout(
290         &mut self,
291         timeout: Duration,
292     ) -> GenericResult<Option<ServerRequest>> {
293         let elapsed = self.start_time.elapsed();
294         if elapsed < self.next_frame {
295             if timeout < self.next_frame - elapsed {
296                 std::thread::sleep(timeout);
297                 return Ok(None);
298             } else {
299                 std::thread::sleep(self.next_frame - elapsed);
300             }
301         }
302         self.next_frame += self.interval;
303         Ok(Some(ServerRequest::new(self.buffer_size, self)))
304     }
305 }
306 
/// Stateless factory that hands out `NullShmStream` objects.
#[derive(Default)]
pub struct NullShmStreamSource;
310 
311 impl NullShmStreamSource {
new() -> Self312     pub fn new() -> Self {
313         Self::default()
314     }
315 }
316 
317 impl<E: std::error::Error> ShmStreamSource<E> for NullShmStreamSource {
new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &dyn SharedMemory<Error = E>, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>318     fn new_stream(
319         &mut self,
320         _direction: StreamDirection,
321         num_channels: usize,
322         format: SampleFormat,
323         frame_rate: u32,
324         buffer_size: usize,
325         _effects: &[StreamEffect],
326         _client_shm: &dyn SharedMemory<Error = E>,
327         _buffer_offsets: [u64; 2],
328     ) -> GenericResult<Box<dyn ShmStream>> {
329         let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate);
330         Ok(Box::new(new_stream))
331     }
332 }
333 
/// A `ShmStream` whose requests are triggered by hand instead of by a server.
///
/// Clones share the same notifier, so one clone can trigger a request while
/// another is blocked waiting for it.
#[derive(Clone)]
pub struct MockShmStream {
    /// Number of audio channels.
    num_channels: usize,
    /// Frame rate of the stream.
    frame_rate: u32,
    /// Number of frames asked for in each request.
    request_size: usize,
    /// Size of one frame: sample size times channel count.
    frame_size: usize,
    /// Flag plus condition variable; the flag is `true` while a triggered
    /// request has not been answered yet.
    request_notifier: Arc<(Mutex<bool>, Condvar)>,
}
342 
343 impl MockShmStream {
344     /// Attempt to create a new MockShmStream with the given number of
345     /// channels, frame_rate, format, and buffer_size.
new( num_channels: usize, frame_rate: u32, format: SampleFormat, buffer_size: usize, ) -> Self346     pub fn new(
347         num_channels: usize,
348         frame_rate: u32,
349         format: SampleFormat,
350         buffer_size: usize,
351     ) -> Self {
352         #[allow(clippy::mutex_atomic)]
353         Self {
354             num_channels,
355             frame_rate,
356             request_size: buffer_size,
357             frame_size: format.sample_bytes() * num_channels,
358             request_notifier: Arc::new((Mutex::new(false), Condvar::new())),
359         }
360     }
361 
362     /// Call to request data from the stream, causing it to return from
363     /// `wait_for_next_action_with_timeout`. Will block until
364     /// `set_buffer_offset_and_frames` is called on the ServerRequest returned
365     /// from `wait_for_next_action_with_timeout`, or until `timeout` elapses.
366     /// Returns true if a response was successfully received.
trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool367     pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool {
368         let (lock, cvar) = &*self.request_notifier;
369         let mut requested = lock.lock().unwrap();
370         *requested = true;
371         cvar.notify_one();
372         let start_time = Instant::now();
373         while *requested {
374             requested = cvar.wait_timeout(requested, timeout).unwrap().0;
375             if start_time.elapsed() > timeout {
376                 // We failed to get a callback in time, mark this as false.
377                 *requested = false;
378                 return false;
379             }
380         }
381 
382         true
383     }
384 
notify_request(&mut self)385     fn notify_request(&mut self) {
386         let (lock, cvar) = &*self.request_notifier;
387         let mut requested = lock.lock().unwrap();
388         *requested = false;
389         cvar.notify_one();
390     }
391 }
392 
393 impl BufferSet for MockShmStream {
callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>394     fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
395         self.notify_request();
396         Ok(())
397     }
398 
ignore(&mut self) -> GenericResult<()>399     fn ignore(&mut self) -> GenericResult<()> {
400         self.notify_request();
401         Ok(())
402     }
403 }
404 
405 impl ShmStream for MockShmStream {
frame_size(&self) -> usize406     fn frame_size(&self) -> usize {
407         self.frame_size
408     }
409 
num_channels(&self) -> usize410     fn num_channels(&self) -> usize {
411         self.num_channels
412     }
413 
frame_rate(&self) -> u32414     fn frame_rate(&self) -> u32 {
415         self.frame_rate
416     }
417 
wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>418     fn wait_for_next_action_with_timeout(
419         &mut self,
420         timeout: Duration,
421     ) -> GenericResult<Option<ServerRequest>> {
422         {
423             let start_time = Instant::now();
424             let (lock, cvar) = &*self.request_notifier;
425             let mut requested = lock.lock().unwrap();
426             while !*requested {
427                 requested = cvar.wait_timeout(requested, timeout).unwrap().0;
428                 if start_time.elapsed() > timeout {
429                     return Ok(None);
430                 }
431             }
432         }
433 
434         Ok(Some(ServerRequest::new(self.request_size, self)))
435     }
436 }
437 
438 /// Source of `MockShmStream` objects.
439 #[derive(Clone, Default)]
440 pub struct MockShmStreamSource {
441     last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>,
442 }
443 
444 impl MockShmStreamSource {
new() -> Self445     pub fn new() -> Self {
446         Default::default()
447     }
448 
449     /// Get the last stream that has been created from this source. If no stream
450     /// has been created, block until one has.
get_last_stream(&self) -> MockShmStream451     pub fn get_last_stream(&self) -> MockShmStream {
452         let (last_stream, cvar) = &*self.last_stream;
453         let mut stream = last_stream.lock().unwrap();
454         loop {
455             match &*stream {
456                 None => stream = cvar.wait(stream).unwrap(),
457                 Some(ref s) => return s.clone(),
458             };
459         }
460     }
461 }
462 
463 impl<E: std::error::Error> ShmStreamSource<E> for MockShmStreamSource {
new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &dyn SharedMemory<Error = E>, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>464     fn new_stream(
465         &mut self,
466         _direction: StreamDirection,
467         num_channels: usize,
468         format: SampleFormat,
469         frame_rate: u32,
470         buffer_size: usize,
471         _effects: &[StreamEffect],
472         _client_shm: &dyn SharedMemory<Error = E>,
473         _buffer_offsets: [u64; 2],
474     ) -> GenericResult<Box<dyn ShmStream>> {
475         let (last_stream, cvar) = &*self.last_stream;
476         let mut stream = last_stream.lock().unwrap();
477 
478         let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size);
479         *stream = Some(new_stream.clone());
480         cvar.notify_one();
481         Ok(Box::new(new_stream))
482     }
483 }
484 
// Tests that run only for Unix, where `base::SharedMemory` is used.
#[cfg(all(test, unix))]
pub mod tests {
    use super::*;

    /// Placeholder shared memory: it only exists to satisfy the `new_stream`
    /// signature and reports a size of zero and fd 0.
    struct MockSharedMemory {}

    impl SharedMemory for MockSharedMemory {
        type Error = super::Error;

        fn anon(_: u64) -> Result<Self, Self::Error> {
            Ok(Self {})
        }

        fn size(&self) -> u64 {
            0
        }

        #[cfg(unix)]
        fn as_raw_fd(&self) -> RawFd {
            0
        }
    }

    /// A request triggered through the handle returned by `get_last_stream`
    /// must reach the thread waiting on the stream, and the answer must
    /// unblock the trigger.
    #[test]
    fn mock_trigger_callback() {
        let stream_source = MockShmStreamSource::new();
        let mut thread_stream_source = stream_source.clone();

        let buffer_size = 480;
        let num_channels = 2;
        let format = SampleFormat::S24LE;
        let shm = MockSharedMemory {};

        // Consumer side: create the stream, wait for a request and answer it
        // with every requested frame.
        let handle = std::thread::spawn(move || {
            let mut stream = thread_stream_source
                .new_stream(
                    StreamDirection::Playback,
                    num_channels,
                    format,
                    44100,
                    buffer_size,
                    &[],
                    &shm,
                    [400, 8000],
                )
                .expect("Failed to create stream");

            let pending = stream
                .wait_for_next_action_with_timeout(Duration::from_secs(5))
                .expect("Failed to wait for next action");
            if let Some(request) = pending {
                let frames = request.requested_frames();
                request
                    .set_buffer_offset_and_frames(872, frames)
                    .expect("Failed to set buffer offset and frames");
                frames
            } else {
                0
            }
        });

        // Producer side: trigger the request and wait for the answer.
        let mut stream = stream_source.get_last_stream();
        assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1)));

        let requested_frames = handle.join().expect("Failed to join thread");
        assert_eq!(requested_frames, buffer_size);
    }

    /// The null stream must pace requests: obtaining two of them takes longer
    /// than one interval but stays within the timeout.
    #[test]
    fn null_consumption_rate() {
        let frame_rate = 44100;
        let buffer_size = 480;
        let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);

        let shm = MockSharedMemory {};

        let start = Instant::now();

        let mut stream_source = NullShmStreamSource::new();
        let mut stream = stream_source
            .new_stream(
                StreamDirection::Playback,
                2,
                SampleFormat::S24LE,
                frame_rate,
                buffer_size,
                &[],
                &shm,
                [400, 8000],
            )
            .expect("Failed to create stream");

        let timeout = Duration::from_secs(5);
        let first = stream
            .wait_for_next_action_with_timeout(timeout)
            .expect("Failed to wait for first request")
            .expect("First request should not have timed out");
        first
            .set_buffer_offset_and_frames(276, 480)
            .expect("Failed to set buffer offset and length");

        // The second call should block until the first buffer is consumed.
        let _second = stream
            .wait_for_next_action_with_timeout(timeout)
            .expect("Failed to wait for second request");
        let elapsed = start.elapsed();

        assert!(
            elapsed > interval,
            "wait_for_next_action_with_timeout didn't block long enough: {:?}",
            elapsed
        );
        assert!(
            elapsed < timeout,
            "wait_for_next_action_with_timeout blocked for too long: {:?}",
            elapsed
        );
    }
}
605