• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #[cfg(unix)]
6 use std::os::unix::io::RawFd;
7 use std::sync::Arc;
8 use std::time::{Duration, Instant};
9 
10 use remain::sorted;
11 use sync::{Condvar, Mutex};
12 use thiserror::Error;
13 
14 use crate::{BoxError, SampleFormat, StreamDirection, StreamEffect};
15 
16 type GenericResult<T> = std::result::Result<T, BoxError>;
17 
18 /// `BufferSet` is used as a callback mechanism for `ServerRequest` objects.
19 /// It is meant to be implemented by the audio stream, allowing arbitrary code
20 /// to be run after a buffer offset and length is set.
21 pub trait BufferSet {
22     /// Called when the client sets a buffer offset and length.
23     ///
24     /// `offset` is the offset within shared memory of the buffer and `frames`
25     /// indicates the number of audio frames that can be read from or written to
26     /// the buffer.
callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>27     fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>;
28 
29     /// Called when the client ignores a request from the server.
ignore(&mut self) -> GenericResult<()>30     fn ignore(&mut self) -> GenericResult<()>;
31 }
32 
33 #[sorted]
34 #[derive(Error, Debug)]
35 pub enum Error {
36     #[error("Provided number of frames {0} exceeds requested number of frames {1}")]
37     TooManyFrames(usize, usize),
38 }
39 
40 /// `ServerRequest` represents an active request from the server for the client
41 /// to provide a buffer in shared memory to playback from or capture to.
42 pub struct ServerRequest<'a> {
43     requested_frames: usize,
44     buffer_set: &'a mut dyn BufferSet,
45 }
46 
47 impl<'a> ServerRequest<'a> {
48     /// Create a new ServerRequest object
49     ///
50     /// Create a ServerRequest object representing a request from the server
51     /// for a buffer `requested_frames` in size.
52     ///
53     /// When the client responds to this request by calling
54     /// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames),
55     /// BufferSet::callback will be called on `buffer_set`.
56     ///
57     /// # Arguments
58     /// * `requested_frames` - The requested buffer size in frames.
59     /// * `buffer_set` - The object implementing the callback for when a buffer is provided.
new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self60     pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self {
61         Self {
62             requested_frames,
63             buffer_set,
64         }
65     }
66 
67     /// Get the number of frames of audio data requested by the server.
68     ///
69     /// The returned value should never be greater than the `buffer_size`
70     /// given in [`new_stream`](ShmStreamSource::new_stream).
requested_frames(&self) -> usize71     pub fn requested_frames(&self) -> usize {
72         self.requested_frames
73     }
74 
75     /// Sets the buffer offset and length for the requested buffer.
76     ///
77     /// Sets the buffer offset and length of the buffer that fulfills this
78     /// server request to `offset` and `length`, respectively. This means that
79     /// `length` bytes of audio samples may be read from/written to that
80     /// location in `client_shm` for a playback/capture stream, respectively.
81     /// This function may only be called once for a `ServerRequest`, at which
82     /// point the ServerRequest is dropped and no further calls are possible.
83     ///
84     /// # Arguments
85     ///
86     /// * `offset` - The value to use as the new buffer offset for the next buffer.
87     /// * `frames` - The length of the next buffer in frames.
88     ///
89     /// # Errors
90     ///
91     /// * If `frames` is greater than `requested_frames`.
set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()>92     pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> {
93         if frames > self.requested_frames {
94             return Err(Box::new(Error::TooManyFrames(
95                 frames,
96                 self.requested_frames,
97             )));
98         }
99 
100         self.buffer_set.callback(offset, frames)
101     }
102 
103     /// Ignore this request
104     ///
105     /// If the client does not intend to respond to this ServerRequest with a
106     /// buffer, they should call this function. The stream will be notified that
107     /// the request has been ignored and will handle it properly.
ignore_request(self) -> GenericResult<()>108     pub fn ignore_request(self) -> GenericResult<()> {
109         self.buffer_set.ignore()
110     }
111 }
112 
113 /// `ShmStream` allows a client to interact with an active CRAS stream.
114 pub trait ShmStream: Send {
115     /// Get the size of a frame of audio data for this stream.
frame_size(&self) -> usize116     fn frame_size(&self) -> usize;
117 
118     /// Get the number of channels of audio data for this stream.
num_channels(&self) -> usize119     fn num_channels(&self) -> usize;
120 
121     /// Get the frame rate of audio data for this stream.
frame_rate(&self) -> u32122     fn frame_rate(&self) -> u32;
123 
124     /// Waits until the next server message indicating action is required.
125     ///
126     /// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning
127     /// that we must set the buffer offset to the next location where playback
128     /// data can be found.
129     /// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning
130     /// that we must set the buffer offset to the next location where captured
131     /// data can be written to.
132     /// Will return early if `timeout` elapses before a message is received.
133     ///
134     /// # Arguments
135     ///
136     /// * `timeout` - The amount of time to wait until a message is received.
137     ///
138     /// # Return value
139     ///
140     /// Returns `Some(request)` where `request` is an object that implements the
141     /// [`ServerRequest`](ServerRequest) trait and which can be used to get the
142     /// number of bytes requested for playback streams or that have already been
143     /// written to shm for capture streams.
144     ///
145     /// If the timeout occurs before a message is received, returns `None`.
146     ///
147     /// # Errors
148     ///
149     /// * If an invalid message type is received for the stream.
wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>150     fn wait_for_next_action_with_timeout(
151         &mut self,
152         timeout: Duration,
153     ) -> GenericResult<Option<ServerRequest>>;
154 }
155 
156 /// `SharedMemory` specifies features of shared memory areas passed on to `ShmStreamSource`.
157 pub trait SharedMemory {
158     type Error: std::error::Error;
159 
160     /// Creates a new shared memory file descriptor without specifying a name.
anon(size: u64) -> Result<Self, Self::Error> where Self: Sized161     fn anon(size: u64) -> Result<Self, Self::Error>
162     where
163         Self: Sized;
164 
165     /// Gets the size in bytes of the shared memory.
166     ///
167     /// The size returned here does not reflect changes by other interfaces or users of the shared
168     /// memory file descriptor..
size(&self) -> u64169     fn size(&self) -> u64;
170 
171     /// Returns the underlying raw fd.
172     #[cfg(unix)]
as_raw_fd(&self) -> RawFd173     fn as_raw_fd(&self) -> RawFd;
174 }
175 
176 /// `ShmStreamSource` creates streams for playback or capture of audio.
177 pub trait ShmStreamSource<E: std::error::Error>: Send {
178     /// Creates a new [`ShmStream`](ShmStream)
179     ///
180     /// Creates a new `ShmStream` object, which allows:
181     /// * Waiting until the server has communicated that data is ready or
182     ///   requested that we make more data available.
183     /// * Setting the location and length of buffers for reading/writing audio data.
184     ///
185     /// # Arguments
186     ///
187     /// * `direction` - The direction of the stream, either `Playback` or `Capture`.
188     /// * `num_channels` - The number of audio channels for the stream.
189     /// * `format` - The audio format to use for audio samples.
190     /// * `frame_rate` - The stream's frame rate in Hz.
191     /// * `buffer_size` - The maximum size of an audio buffer. This will be the
192     ///                   size used for transfers of audio data between client
193     ///                   and server.
194     /// * `effects` - Audio effects to use for the stream, such as echo-cancellation.
195     /// * `client_shm` - The shared memory area that will contain samples.
196     /// * `buffer_offsets` - The two initial values to use as buffer offsets
197     ///                      for streams. This way, the server will not write
198     ///                      audio data to an arbitrary offset in `client_shm`
199     ///                      if the client fails to update offsets in time.
200     ///
201     /// # Errors
202     ///
203     /// * If sending the connect stream message to the server fails.
204     #[allow(clippy::too_many_arguments)]
new_stream( &mut self, direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, effects: &[StreamEffect], client_shm: &dyn SharedMemory<Error = E>, buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>205     fn new_stream(
206         &mut self,
207         direction: StreamDirection,
208         num_channels: usize,
209         format: SampleFormat,
210         frame_rate: u32,
211         buffer_size: usize,
212         effects: &[StreamEffect],
213         client_shm: &dyn SharedMemory<Error = E>,
214         buffer_offsets: [u64; 2],
215     ) -> GenericResult<Box<dyn ShmStream>>;
216 
217     /// Get a list of file descriptors used by the implementation.
218     ///
219     /// Returns any open file descriptors needed by the implementation.
220     /// This list helps users of the ShmStreamSource enter Linux jails without
221     /// closing needed file descriptors.
222     #[cfg(unix)]
keep_fds(&self) -> Vec<RawFd>223     fn keep_fds(&self) -> Vec<RawFd> {
224         Vec::new()
225     }
226 }
227 
228 /// Class that implements ShmStream trait but does nothing with the samples
229 pub struct NullShmStream {
230     num_channels: usize,
231     frame_rate: u32,
232     buffer_size: usize,
233     frame_size: usize,
234     interval: Duration,
235     next_frame: Duration,
236     start_time: Instant,
237 }
238 
239 impl NullShmStream {
240     /// Attempt to create a new NullShmStream with the given number of channels,
241     /// format, frame_rate, and buffer_size.
new( buffer_size: usize, num_channels: usize, format: SampleFormat, frame_rate: u32, ) -> Self242     pub fn new(
243         buffer_size: usize,
244         num_channels: usize,
245         format: SampleFormat,
246         frame_rate: u32,
247     ) -> Self {
248         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
249         Self {
250             num_channels,
251             frame_rate,
252             buffer_size,
253             frame_size: format.sample_bytes() * num_channels,
254             interval,
255             next_frame: interval,
256             start_time: Instant::now(),
257         }
258     }
259 }
260 
261 impl BufferSet for NullShmStream {
callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>262     fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
263         Ok(())
264     }
265 
ignore(&mut self) -> GenericResult<()>266     fn ignore(&mut self) -> GenericResult<()> {
267         Ok(())
268     }
269 }
270 
271 impl ShmStream for NullShmStream {
frame_size(&self) -> usize272     fn frame_size(&self) -> usize {
273         self.frame_size
274     }
275 
num_channels(&self) -> usize276     fn num_channels(&self) -> usize {
277         self.num_channels
278     }
279 
frame_rate(&self) -> u32280     fn frame_rate(&self) -> u32 {
281         self.frame_rate
282     }
283 
wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>284     fn wait_for_next_action_with_timeout(
285         &mut self,
286         timeout: Duration,
287     ) -> GenericResult<Option<ServerRequest>> {
288         let elapsed = self.start_time.elapsed();
289         if elapsed < self.next_frame {
290             if timeout < self.next_frame - elapsed {
291                 std::thread::sleep(timeout);
292                 return Ok(None);
293             } else {
294                 std::thread::sleep(self.next_frame - elapsed);
295             }
296         }
297         self.next_frame += self.interval;
298         Ok(Some(ServerRequest::new(self.buffer_size, self)))
299     }
300 }
301 
302 /// Source of `NullShmStream` objects.
303 #[derive(Default)]
304 pub struct NullShmStreamSource;
305 
306 impl NullShmStreamSource {
new() -> Self307     pub fn new() -> Self {
308         Self::default()
309     }
310 }
311 
312 impl<E: std::error::Error> ShmStreamSource<E> for NullShmStreamSource {
new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &dyn SharedMemory<Error = E>, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>313     fn new_stream(
314         &mut self,
315         _direction: StreamDirection,
316         num_channels: usize,
317         format: SampleFormat,
318         frame_rate: u32,
319         buffer_size: usize,
320         _effects: &[StreamEffect],
321         _client_shm: &dyn SharedMemory<Error = E>,
322         _buffer_offsets: [u64; 2],
323     ) -> GenericResult<Box<dyn ShmStream>> {
324         let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate);
325         Ok(Box::new(new_stream))
326     }
327 }
328 
329 #[derive(Clone)]
330 pub struct MockShmStream {
331     num_channels: usize,
332     frame_rate: u32,
333     request_size: usize,
334     frame_size: usize,
335     request_notifier: Arc<(Mutex<bool>, Condvar)>,
336 }
337 
338 impl MockShmStream {
339     /// Attempt to create a new MockShmStream with the given number of
340     /// channels, frame_rate, format, and buffer_size.
new( num_channels: usize, frame_rate: u32, format: SampleFormat, buffer_size: usize, ) -> Self341     pub fn new(
342         num_channels: usize,
343         frame_rate: u32,
344         format: SampleFormat,
345         buffer_size: usize,
346     ) -> Self {
347         Self {
348             num_channels,
349             frame_rate,
350             request_size: buffer_size,
351             frame_size: format.sample_bytes() * num_channels,
352             request_notifier: Arc::new((Mutex::new(false), Condvar::new())),
353         }
354     }
355 
356     /// Call to request data from the stream, causing it to return from
357     /// `wait_for_next_action_with_timeout`. Will block until
358     /// `set_buffer_offset_and_frames` is called on the ServerRequest returned
359     /// from `wait_for_next_action_with_timeout`, or until `timeout` elapses.
360     /// Returns true if a response was successfully received.
trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool361     pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool {
362         let &(ref lock, ref cvar) = &*self.request_notifier;
363         let mut requested = lock.lock();
364         *requested = true;
365         cvar.notify_one();
366         let start_time = Instant::now();
367         while *requested {
368             requested = cvar.wait_timeout(requested, timeout).0;
369             if start_time.elapsed() > timeout {
370                 // We failed to get a callback in time, mark this as false.
371                 *requested = false;
372                 return false;
373             }
374         }
375 
376         true
377     }
378 
notify_request(&mut self)379     fn notify_request(&mut self) {
380         let &(ref lock, ref cvar) = &*self.request_notifier;
381         let mut requested = lock.lock();
382         *requested = false;
383         cvar.notify_one();
384     }
385 }
386 
387 impl BufferSet for MockShmStream {
callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>388     fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> {
389         self.notify_request();
390         Ok(())
391     }
392 
ignore(&mut self) -> GenericResult<()>393     fn ignore(&mut self) -> GenericResult<()> {
394         self.notify_request();
395         Ok(())
396     }
397 }
398 
399 impl ShmStream for MockShmStream {
frame_size(&self) -> usize400     fn frame_size(&self) -> usize {
401         self.frame_size
402     }
403 
num_channels(&self) -> usize404     fn num_channels(&self) -> usize {
405         self.num_channels
406     }
407 
frame_rate(&self) -> u32408     fn frame_rate(&self) -> u32 {
409         self.frame_rate
410     }
411 
wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>412     fn wait_for_next_action_with_timeout(
413         &mut self,
414         timeout: Duration,
415     ) -> GenericResult<Option<ServerRequest>> {
416         {
417             let start_time = Instant::now();
418             let &(ref lock, ref cvar) = &*self.request_notifier;
419             let mut requested = lock.lock();
420             while !*requested {
421                 requested = cvar.wait_timeout(requested, timeout).0;
422                 if start_time.elapsed() > timeout {
423                     return Ok(None);
424                 }
425             }
426         }
427 
428         Ok(Some(ServerRequest::new(self.request_size, self)))
429     }
430 }
431 
432 /// Source of `MockShmStream` objects.
433 #[derive(Clone, Default)]
434 pub struct MockShmStreamSource {
435     last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>,
436 }
437 
438 impl MockShmStreamSource {
new() -> Self439     pub fn new() -> Self {
440         Default::default()
441     }
442 
443     /// Get the last stream that has been created from this source. If no stream
444     /// has been created, block until one has.
get_last_stream(&self) -> MockShmStream445     pub fn get_last_stream(&self) -> MockShmStream {
446         let &(ref last_stream, ref cvar) = &*self.last_stream;
447         let mut stream = last_stream.lock();
448         loop {
449             match &*stream {
450                 None => stream = cvar.wait(stream),
451                 Some(ref s) => return s.clone(),
452             };
453         }
454     }
455 }
456 
457 impl<E: std::error::Error> ShmStreamSource<E> for MockShmStreamSource {
new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &dyn SharedMemory<Error = E>, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>458     fn new_stream(
459         &mut self,
460         _direction: StreamDirection,
461         num_channels: usize,
462         format: SampleFormat,
463         frame_rate: u32,
464         buffer_size: usize,
465         _effects: &[StreamEffect],
466         _client_shm: &dyn SharedMemory<Error = E>,
467         _buffer_offsets: [u64; 2],
468     ) -> GenericResult<Box<dyn ShmStream>> {
469         let &(ref last_stream, ref cvar) = &*self.last_stream;
470         let mut stream = last_stream.lock();
471 
472         let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size);
473         *stream = Some(new_stream.clone());
474         cvar.notify_one();
475         Ok(Box::new(new_stream))
476     }
477 }
478 
479 // Tests that run only for Unix, where `sys_util::SharedMemory` is used.
480 #[cfg(all(test, unix))]
481 pub mod tests {
482     use super::*;
483 
484     use std::os::unix::io::AsRawFd;
485     use sys_util::SharedMemory as SysSharedMemory;
486 
487     impl SharedMemory for SysSharedMemory {
488         type Error = sys_util::Error;
489 
anon(_: u64) -> Result<Self, Self::Error>490         fn anon(_: u64) -> Result<Self, Self::Error> {
491             SysSharedMemory::anon()
492         }
493 
size(&self) -> u64494         fn size(&self) -> u64 {
495             self.size()
496         }
497 
498         #[cfg(unix)]
as_raw_fd(&self) -> RawFd499         fn as_raw_fd(&self) -> RawFd {
500             AsRawFd::as_raw_fd(self)
501         }
502     }
503 
504     #[test]
mock_trigger_callback()505     fn mock_trigger_callback() {
506         let stream_source = MockShmStreamSource::new();
507         let mut thread_stream_source = stream_source.clone();
508 
509         let buffer_size = 480;
510         let num_channels = 2;
511         let format = SampleFormat::S24LE;
512         let shm = SysSharedMemory::anon().expect("Failed to create shm");
513 
514         let handle = std::thread::spawn(move || {
515             let mut stream = thread_stream_source
516                 .new_stream(
517                     StreamDirection::Playback,
518                     num_channels,
519                     format,
520                     44100,
521                     buffer_size,
522                     &[],
523                     &shm,
524                     [400, 8000],
525                 )
526                 .expect("Failed to create stream");
527 
528             let request = stream
529                 .wait_for_next_action_with_timeout(Duration::from_secs(5))
530                 .expect("Failed to wait for next action");
531             match request {
532                 Some(r) => {
533                     let requested = r.requested_frames();
534                     r.set_buffer_offset_and_frames(872, requested)
535                         .expect("Failed to set buffer offset and frames");
536                     requested
537                 }
538                 None => 0,
539             }
540         });
541 
542         let mut stream = stream_source.get_last_stream();
543         assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1)));
544 
545         let requested_frames = handle.join().expect("Failed to join thread");
546         assert_eq!(requested_frames, buffer_size);
547     }
548 
549     #[test]
null_consumption_rate()550     fn null_consumption_rate() {
551         let frame_rate = 44100;
552         let buffer_size = 480;
553         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
554 
555         let shm = SysSharedMemory::anon().expect("Failed to create shm");
556 
557         let start = Instant::now();
558 
559         let mut stream_source = NullShmStreamSource::new();
560         let mut stream = stream_source
561             .new_stream(
562                 StreamDirection::Playback,
563                 2,
564                 SampleFormat::S24LE,
565                 frame_rate,
566                 buffer_size,
567                 &[],
568                 &shm,
569                 [400, 8000],
570             )
571             .expect("Failed to create stream");
572 
573         let timeout = Duration::from_secs(5);
574         let request = stream
575             .wait_for_next_action_with_timeout(timeout)
576             .expect("Failed to wait for first request")
577             .expect("First request should not have timed out");
578         request
579             .set_buffer_offset_and_frames(276, 480)
580             .expect("Failed to set buffer offset and length");
581 
582         // The second call should block until the first buffer is consumed.
583         let _request = stream
584             .wait_for_next_action_with_timeout(timeout)
585             .expect("Failed to wait for second request");
586         let elapsed = start.elapsed();
587         assert!(
588             elapsed > interval,
589             "wait_for_next_action_with_timeout didn't block long enough: {:?}",
590             elapsed
591         );
592 
593         assert!(
594             elapsed < timeout,
595             "wait_for_next_action_with_timeout blocked for too long: {:?}",
596             elapsed
597         );
598     }
599 }
600