// Copyright 2019 The Chromium OS Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. use std::error; use std::fmt; use std::os::unix::io::RawFd; use std::sync::Arc; use std::time::{Duration, Instant}; use sync::{Condvar, Mutex}; use sys_util::SharedMemory; use crate::{BoxError, SampleFormat, StreamDirection, StreamEffect}; type GenericResult = std::result::Result; /// `BufferSet` is used as a callback mechanism for `ServerRequest` objects. /// It is meant to be implemented by the audio stream, allowing arbitrary code /// to be run after a buffer offset and length is set. pub trait BufferSet { /// Called when the client sets a buffer offset and length. /// /// `offset` is the offset within shared memory of the buffer and `frames` /// indicates the number of audio frames that can be read from or written to /// the buffer. fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>; /// Called when the client ignores a request from the server. fn ignore(&mut self) -> GenericResult<()>; } #[derive(Debug)] pub enum Error { TooManyFrames(usize, usize), } impl error::Error for Error {} impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Error::TooManyFrames(provided, requested) => write!( f, "Provided number of frames {} exceeds requested number of frames {}", provided, requested ), } } } /// `ServerRequest` represents an active request from the server for the client /// to provide a buffer in shared memory to playback from or capture to. pub struct ServerRequest<'a> { requested_frames: usize, buffer_set: &'a mut dyn BufferSet, } impl<'a> ServerRequest<'a> { /// Create a new ServerRequest object /// /// Create a ServerRequest object representing a request from the server /// for a buffer `requested_frames` in size. /// /// When the client responds to this request by calling /// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames), /// BufferSet::callback will be called on `buffer_set`. /// /// # Arguments /// * `requested_frames` - The requested buffer size in frames. /// * `buffer_set` - The object implementing the callback for when a buffer is provided. pub fn new(requested_frames: usize, buffer_set: &'a mut D) -> Self { Self { requested_frames, buffer_set, } } /// Get the number of frames of audio data requested by the server. /// /// The returned value should never be greater than the `buffer_size` /// given in [`new_stream`](ShmStreamSource::new_stream). pub fn requested_frames(&self) -> usize { self.requested_frames } /// Sets the buffer offset and length for the requested buffer. /// /// Sets the buffer offset and length of the buffer that fulfills this /// server request to `offset` and `length`, respectively. This means that /// `length` bytes of audio samples may be read from/written to that /// location in `client_shm` for a playback/capture stream, respectively. /// This function may only be called once for a `ServerRequest`, at which /// point the ServerRequest is dropped and no further calls are possible. /// /// # Arguments /// /// * `offset` - The value to use as the new buffer offset for the next buffer. /// * `frames` - The length of the next buffer in frames. /// /// # Errors /// /// * If `frames` is greater than `requested_frames`. pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> { if frames > self.requested_frames { return Err(Box::new(Error::TooManyFrames( frames, self.requested_frames, ))); } self.buffer_set.callback(offset, frames) } /// Ignore this request /// /// If the client does not intend to respond to this ServerRequest with a /// buffer, they should call this function. The stream will be notified that /// the request has been ignored and will handle it properly. pub fn ignore_request(self) -> GenericResult<()> { self.buffer_set.ignore() } } /// `ShmStream` allows a client to interact with an active CRAS stream. pub trait ShmStream: Send { /// Get the size of a frame of audio data for this stream. fn frame_size(&self) -> usize; /// Get the number of channels of audio data for this stream. fn num_channels(&self) -> usize; /// Get the frame rate of audio data for this stream. fn frame_rate(&self) -> u32; /// Waits until the next server message indicating action is required. /// /// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning /// that we must set the buffer offset to the next location where playback /// data can be found. /// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning /// that we must set the buffer offset to the next location where captured /// data can be written to. /// Will return early if `timeout` elapses before a message is received. /// /// # Arguments /// /// * `timeout` - The amount of time to wait until a message is received. /// /// # Return value /// /// Returns `Some(request)` where `request` is an object that implements the /// [`ServerRequest`](ServerRequest) trait and which can be used to get the /// number of bytes requested for playback streams or that have already been /// written to shm for capture streams. /// /// If the timeout occurs before a message is received, returns `None`. /// /// # Errors /// /// * If an invalid message type is received for the stream. fn wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult>; } /// `ShmStreamSource` creates streams for playback or capture of audio. pub trait ShmStreamSource: Send { /// Creates a new [`ShmStream`](ShmStream) /// /// Creates a new `ShmStream` object, which allows: /// * Waiting until the server has communicated that data is ready or /// requested that we make more data available. /// * Setting the location and length of buffers for reading/writing audio data. /// /// # Arguments /// /// * `direction` - The direction of the stream, either `Playback` or `Capture`. /// * `num_channels` - The number of audio channels for the stream. /// * `format` - The audio format to use for audio samples. /// * `frame_rate` - The stream's frame rate in Hz. /// * `buffer_size` - The maximum size of an audio buffer. This will be the /// size used for transfers of audio data between client /// and server. /// * `effects` - Audio effects to use for the stream, such as echo-cancellation. /// * `client_shm` - The shared memory area that will contain samples. /// * `buffer_offsets` - The two initial values to use as buffer offsets /// for streams. This way, the server will not write /// audio data to an arbitrary offset in `client_shm` /// if the client fails to update offsets in time. /// /// # Errors /// /// * If sending the connect stream message to the server fails. #[allow(clippy::too_many_arguments)] fn new_stream( &mut self, direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, effects: &[StreamEffect], client_shm: &SharedMemory, buffer_offsets: [u64; 2], ) -> GenericResult>; /// Get a list of file descriptors used by the implementation. /// /// Returns any open file descriptors needed by the implementation. /// This list helps users of the ShmStreamSource enter Linux jails without /// closing needed file descriptors. fn keep_fds(&self) -> Vec { Vec::new() } } /// Class that implements ShmStream trait but does nothing with the samples pub struct NullShmStream { num_channels: usize, frame_rate: u32, buffer_size: usize, frame_size: usize, interval: Duration, next_frame: Duration, start_time: Instant, } impl NullShmStream { /// Attempt to create a new NullShmStream with the given number of channels, /// format, frame_rate, and buffer_size. pub fn new( buffer_size: usize, num_channels: usize, format: SampleFormat, frame_rate: u32, ) -> Self { let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); Self { num_channels, frame_rate, buffer_size, frame_size: format.sample_bytes() * num_channels, interval, next_frame: interval, start_time: Instant::now(), } } } impl BufferSet for NullShmStream { fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { Ok(()) } fn ignore(&mut self) -> GenericResult<()> { Ok(()) } } impl ShmStream for NullShmStream { fn frame_size(&self) -> usize { self.frame_size } fn num_channels(&self) -> usize { self.num_channels } fn frame_rate(&self) -> u32 { self.frame_rate } fn wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult> { let elapsed = self.start_time.elapsed(); if elapsed < self.next_frame { if timeout < self.next_frame - elapsed { std::thread::sleep(timeout); return Ok(None); } else { std::thread::sleep(self.next_frame - elapsed); } } self.next_frame += self.interval; Ok(Some(ServerRequest::new(self.buffer_size, self))) } } /// Source of `NullShmStream` objects. #[derive(Default)] pub struct NullShmStreamSource; impl NullShmStreamSource { pub fn new() -> Self { Self::default() } } impl ShmStreamSource for NullShmStreamSource { fn new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &SharedMemory, _buffer_offsets: [u64; 2], ) -> GenericResult> { let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate); Ok(Box::new(new_stream)) } } #[derive(Clone)] pub struct MockShmStream { num_channels: usize, frame_rate: u32, request_size: usize, frame_size: usize, request_notifier: Arc<(Mutex, Condvar)>, } impl MockShmStream { /// Attempt to create a new MockShmStream with the given number of /// channels, frame_rate, format, and buffer_size. pub fn new( num_channels: usize, frame_rate: u32, format: SampleFormat, buffer_size: usize, ) -> Self { Self { num_channels, frame_rate, request_size: buffer_size, frame_size: format.sample_bytes() * num_channels, request_notifier: Arc::new((Mutex::new(false), Condvar::new())), } } /// Call to request data from the stream, causing it to return from /// `wait_for_next_action_with_timeout`. Will block until /// `set_buffer_offset_and_frames` is called on the ServerRequest returned /// from `wait_for_next_action_with_timeout`, or until `timeout` elapses. /// Returns true if a response was successfully received. pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool { let &(ref lock, ref cvar) = &*self.request_notifier; let mut requested = lock.lock(); *requested = true; cvar.notify_one(); let start_time = Instant::now(); while *requested { requested = cvar.wait_timeout(requested, timeout).0; if start_time.elapsed() > timeout { // We failed to get a callback in time, mark this as false. *requested = false; return false; } } true } fn notify_request(&mut self) { let &(ref lock, ref cvar) = &*self.request_notifier; let mut requested = lock.lock(); *requested = false; cvar.notify_one(); } } impl BufferSet for MockShmStream { fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { self.notify_request(); Ok(()) } fn ignore(&mut self) -> GenericResult<()> { self.notify_request(); Ok(()) } } impl ShmStream for MockShmStream { fn frame_size(&self) -> usize { self.frame_size } fn num_channels(&self) -> usize { self.num_channels } fn frame_rate(&self) -> u32 { self.frame_rate } fn wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult> { { let start_time = Instant::now(); let &(ref lock, ref cvar) = &*self.request_notifier; let mut requested = lock.lock(); while !*requested { requested = cvar.wait_timeout(requested, timeout).0; if start_time.elapsed() > timeout { return Ok(None); } } } Ok(Some(ServerRequest::new(self.request_size, self))) } } /// Source of `MockShmStream` objects. #[derive(Clone, Default)] pub struct MockShmStreamSource { last_stream: Arc<(Mutex>, Condvar)>, } impl MockShmStreamSource { pub fn new() -> Self { Default::default() } /// Get the last stream that has been created from this source. If no stream /// has been created, block until one has. pub fn get_last_stream(&self) -> MockShmStream { let &(ref last_stream, ref cvar) = &*self.last_stream; let mut stream = last_stream.lock(); loop { match &*stream { None => stream = cvar.wait(stream), Some(ref s) => return s.clone(), }; } } } impl ShmStreamSource for MockShmStreamSource { fn new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &SharedMemory, _buffer_offsets: [u64; 2], ) -> GenericResult> { let &(ref last_stream, ref cvar) = &*self.last_stream; let mut stream = last_stream.lock(); let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size); *stream = Some(new_stream.clone()); cvar.notify_one(); Ok(Box::new(new_stream)) } } #[cfg(test)] pub mod tests { use super::*; #[test] fn mock_trigger_callback() { let stream_source = MockShmStreamSource::new(); let mut thread_stream_source = stream_source.clone(); let buffer_size = 480; let num_channels = 2; let format = SampleFormat::S24LE; let shm = SharedMemory::anon().expect("Failed to create shm"); let handle = std::thread::spawn(move || { let mut stream = thread_stream_source .new_stream( StreamDirection::Playback, num_channels, format, 44100, buffer_size, &[], &shm, [400, 8000], ) .expect("Failed to create stream"); let request = stream .wait_for_next_action_with_timeout(Duration::from_secs(5)) .expect("Failed to wait for next action"); match request { Some(r) => { let requested = r.requested_frames(); r.set_buffer_offset_and_frames(872, requested) .expect("Failed to set buffer offset and frames"); requested } None => 0, } }); let mut stream = stream_source.get_last_stream(); assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1))); let requested_frames = handle.join().expect("Failed to join thread"); assert_eq!(requested_frames, buffer_size); } #[test] fn null_consumption_rate() { let frame_rate = 44100; let buffer_size = 480; let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); let shm = SharedMemory::anon().expect("Failed to create shm"); let start = Instant::now(); let mut stream_source = NullShmStreamSource::new(); let mut stream = stream_source .new_stream( StreamDirection::Playback, 2, SampleFormat::S24LE, frame_rate, buffer_size, &[], &shm, [400, 8000], ) .expect("Failed to create stream"); let timeout = Duration::from_secs(5); let request = stream .wait_for_next_action_with_timeout(timeout) .expect("Failed to wait for first request") .expect("First request should not have timed out"); request .set_buffer_offset_and_frames(276, 480) .expect("Failed to set buffer offset and length"); // The second call should block until the first buffer is consumed. let _request = stream .wait_for_next_action_with_timeout(timeout) .expect("Failed to wait for second request"); let elapsed = start.elapsed(); assert!( elapsed > interval, "wait_for_next_action_with_timeout didn't block long enough: {:?}", elapsed ); assert!( elapsed < timeout, "wait_for_next_action_with_timeout blocked for too long: {:?}", elapsed ); } }