1 // Copyright 2019 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #[cfg(unix)] 6 use std::os::unix::io::RawFd; 7 use std::sync::Arc; 8 use std::sync::Condvar; 9 use std::sync::Mutex; 10 use std::time::Duration; 11 use std::time::Instant; 12 13 use remain::sorted; 14 use thiserror::Error; 15 16 use crate::BoxError; 17 use crate::SampleFormat; 18 use crate::StreamDirection; 19 use crate::StreamEffect; 20 21 type GenericResult<T> = std::result::Result<T, BoxError>; 22 23 /// `BufferSet` is used as a callback mechanism for `ServerRequest` objects. 24 /// It is meant to be implemented by the audio stream, allowing arbitrary code 25 /// to be run after a buffer offset and length is set. 26 pub trait BufferSet { 27 /// Called when the client sets a buffer offset and length. 28 /// 29 /// `offset` is the offset within shared memory of the buffer and `frames` 30 /// indicates the number of audio frames that can be read from or written to 31 /// the buffer. callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>32 fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>; 33 34 /// Called when the client ignores a request from the server. ignore(&mut self) -> GenericResult<()>35 fn ignore(&mut self) -> GenericResult<()>; 36 } 37 38 #[sorted] 39 #[derive(Error, Debug)] 40 pub enum Error { 41 #[error("Provided number of frames {0} exceeds requested number of frames {1}")] 42 TooManyFrames(usize, usize), 43 } 44 45 /// `ServerRequest` represents an active request from the server for the client 46 /// to provide a buffer in shared memory to playback from or capture to. 47 pub struct ServerRequest<'a> { 48 requested_frames: usize, 49 buffer_set: &'a mut dyn BufferSet, 50 } 51 52 impl<'a> ServerRequest<'a> { 53 /// Create a new ServerRequest object 54 /// 55 /// Create a ServerRequest object representing a request from the server 56 /// for a buffer `requested_frames` in size. 57 /// 58 /// When the client responds to this request by calling 59 /// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames), 60 /// BufferSet::callback will be called on `buffer_set`. 61 /// 62 /// # Arguments 63 /// * `requested_frames` - The requested buffer size in frames. 64 /// * `buffer_set` - The object implementing the callback for when a buffer is provided. new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self65 pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self { 66 Self { 67 requested_frames, 68 buffer_set, 69 } 70 } 71 72 /// Get the number of frames of audio data requested by the server. 73 /// 74 /// The returned value should never be greater than the `buffer_size` 75 /// given in [`new_stream`](ShmStreamSource::new_stream). requested_frames(&self) -> usize76 pub fn requested_frames(&self) -> usize { 77 self.requested_frames 78 } 79 80 /// Sets the buffer offset and length for the requested buffer. 81 /// 82 /// Sets the buffer offset and length of the buffer that fulfills this 83 /// server request to `offset` and `length`, respectively. This means that 84 /// `length` bytes of audio samples may be read from/written to that 85 /// location in `client_shm` for a playback/capture stream, respectively. 86 /// This function may only be called once for a `ServerRequest`, at which 87 /// point the ServerRequest is dropped and no further calls are possible. 88 /// 89 /// # Arguments 90 /// 91 /// * `offset` - The value to use as the new buffer offset for the next buffer. 92 /// * `frames` - The length of the next buffer in frames. 93 /// 94 /// # Errors 95 /// 96 /// * If `frames` is greater than `requested_frames`. set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()>97 pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> { 98 if frames > self.requested_frames { 99 return Err(Box::new(Error::TooManyFrames( 100 frames, 101 self.requested_frames, 102 ))); 103 } 104 105 self.buffer_set.callback(offset, frames) 106 } 107 108 /// Ignore this request 109 /// 110 /// If the client does not intend to respond to this ServerRequest with a 111 /// buffer, they should call this function. The stream will be notified that 112 /// the request has been ignored and will handle it properly. ignore_request(self) -> GenericResult<()>113 pub fn ignore_request(self) -> GenericResult<()> { 114 self.buffer_set.ignore() 115 } 116 } 117 118 /// `ShmStream` allows a client to interact with an active CRAS stream. 119 pub trait ShmStream: Send { 120 /// Get the size of a frame of audio data for this stream. frame_size(&self) -> usize121 fn frame_size(&self) -> usize; 122 123 /// Get the number of channels of audio data for this stream. num_channels(&self) -> usize124 fn num_channels(&self) -> usize; 125 126 /// Get the frame rate of audio data for this stream. frame_rate(&self) -> u32127 fn frame_rate(&self) -> u32; 128 129 /// Waits until the next server message indicating action is required. 130 /// 131 /// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning 132 /// that we must set the buffer offset to the next location where playback 133 /// data can be found. 134 /// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning 135 /// that we must set the buffer offset to the next location where captured 136 /// data can be written to. 137 /// Will return early if `timeout` elapses before a message is received. 138 /// 139 /// # Arguments 140 /// 141 /// * `timeout` - The amount of time to wait until a message is received. 142 /// 143 /// # Return value 144 /// 145 /// Returns `Some(request)` where `request` is an object that implements the 146 /// [`ServerRequest`](ServerRequest) trait and which can be used to get the 147 /// number of bytes requested for playback streams or that have already been 148 /// written to shm for capture streams. 149 /// 150 /// If the timeout occurs before a message is received, returns `None`. 151 /// 152 /// # Errors 153 /// 154 /// * If an invalid message type is received for the stream. wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>155 fn wait_for_next_action_with_timeout( 156 &mut self, 157 timeout: Duration, 158 ) -> GenericResult<Option<ServerRequest>>; 159 } 160 161 /// `SharedMemory` specifies features of shared memory areas passed on to `ShmStreamSource`. 162 pub trait SharedMemory { 163 type Error: std::error::Error; 164 165 /// Creates a new shared memory file descriptor without specifying a name. anon(size: u64) -> Result<Self, Self::Error> where Self: Sized166 fn anon(size: u64) -> Result<Self, Self::Error> 167 where 168 Self: Sized; 169 170 /// Gets the size in bytes of the shared memory. 171 /// 172 /// The size returned here does not reflect changes by other interfaces or users of the shared 173 /// memory file descriptor.. size(&self) -> u64174 fn size(&self) -> u64; 175 176 /// Returns the underlying raw fd. 177 #[cfg(unix)] as_raw_fd(&self) -> RawFd178 fn as_raw_fd(&self) -> RawFd; 179 } 180 181 /// `ShmStreamSource` creates streams for playback or capture of audio. 182 pub trait ShmStreamSource<E: std::error::Error>: Send { 183 /// Creates a new [`ShmStream`](ShmStream) 184 /// 185 /// Creates a new `ShmStream` object, which allows: 186 /// * Waiting until the server has communicated that data is ready or 187 /// requested that we make more data available. 188 /// * Setting the location and length of buffers for reading/writing audio data. 189 /// 190 /// # Arguments 191 /// 192 /// * `direction` - The direction of the stream, either `Playback` or `Capture`. 193 /// * `num_channels` - The number of audio channels for the stream. 194 /// * `format` - The audio format to use for audio samples. 195 /// * `frame_rate` - The stream's frame rate in Hz. 196 /// * `buffer_size` - The maximum size of an audio buffer. This will be the 197 /// size used for transfers of audio data between client 198 /// and server. 199 /// * `effects` - Audio effects to use for the stream, such as echo-cancellation. 200 /// * `client_shm` - The shared memory area that will contain samples. 201 /// * `buffer_offsets` - The two initial values to use as buffer offsets 202 /// for streams. This way, the server will not write 203 /// audio data to an arbitrary offset in `client_shm` 204 /// if the client fails to update offsets in time. 205 /// 206 /// # Errors 207 /// 208 /// * If sending the connect stream message to the server fails. 209 #[allow(clippy::too_many_arguments)] new_stream( &mut self, direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, effects: &[StreamEffect], client_shm: &dyn SharedMemory<Error = E>, buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>210 fn new_stream( 211 &mut self, 212 direction: StreamDirection, 213 num_channels: usize, 214 format: SampleFormat, 215 frame_rate: u32, 216 buffer_size: usize, 217 effects: &[StreamEffect], 218 client_shm: &dyn SharedMemory<Error = E>, 219 buffer_offsets: [u64; 2], 220 ) -> GenericResult<Box<dyn ShmStream>>; 221 222 /// Get a list of file descriptors used by the implementation. 223 /// 224 /// Returns any open file descriptors needed by the implementation. 225 /// This list helps users of the ShmStreamSource enter Linux jails without 226 /// closing needed file descriptors. 227 #[cfg(unix)] keep_fds(&self) -> Vec<RawFd>228 fn keep_fds(&self) -> Vec<RawFd> { 229 Vec::new() 230 } 231 } 232 233 /// Class that implements ShmStream trait but does nothing with the samples 234 pub struct NullShmStream { 235 num_channels: usize, 236 frame_rate: u32, 237 buffer_size: usize, 238 frame_size: usize, 239 interval: Duration, 240 next_frame: Duration, 241 start_time: Instant, 242 } 243 244 impl NullShmStream { 245 /// Attempt to create a new NullShmStream with the given number of channels, 246 /// format, frame_rate, and buffer_size. new( buffer_size: usize, num_channels: usize, format: SampleFormat, frame_rate: u32, ) -> Self247 pub fn new( 248 buffer_size: usize, 249 num_channels: usize, 250 format: SampleFormat, 251 frame_rate: u32, 252 ) -> Self { 253 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); 254 Self { 255 num_channels, 256 frame_rate, 257 buffer_size, 258 frame_size: format.sample_bytes() * num_channels, 259 interval, 260 next_frame: interval, 261 start_time: Instant::now(), 262 } 263 } 264 } 265 266 impl BufferSet for NullShmStream { callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>267 fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { 268 Ok(()) 269 } 270 ignore(&mut self) -> GenericResult<()>271 fn ignore(&mut self) -> GenericResult<()> { 272 Ok(()) 273 } 274 } 275 276 impl ShmStream for NullShmStream { frame_size(&self) -> usize277 fn frame_size(&self) -> usize { 278 self.frame_size 279 } 280 num_channels(&self) -> usize281 fn num_channels(&self) -> usize { 282 self.num_channels 283 } 284 frame_rate(&self) -> u32285 fn frame_rate(&self) -> u32 { 286 self.frame_rate 287 } 288 wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>289 fn wait_for_next_action_with_timeout( 290 &mut self, 291 timeout: Duration, 292 ) -> GenericResult<Option<ServerRequest>> { 293 let elapsed = self.start_time.elapsed(); 294 if elapsed < self.next_frame { 295 if timeout < self.next_frame - elapsed { 296 std::thread::sleep(timeout); 297 return Ok(None); 298 } else { 299 std::thread::sleep(self.next_frame - elapsed); 300 } 301 } 302 self.next_frame += self.interval; 303 Ok(Some(ServerRequest::new(self.buffer_size, self))) 304 } 305 } 306 307 /// Source of `NullShmStream` objects. 308 #[derive(Default)] 309 pub struct NullShmStreamSource; 310 311 impl NullShmStreamSource { new() -> Self312 pub fn new() -> Self { 313 Self::default() 314 } 315 } 316 317 impl<E: std::error::Error> ShmStreamSource<E> for NullShmStreamSource { new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &dyn SharedMemory<Error = E>, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>318 fn new_stream( 319 &mut self, 320 _direction: StreamDirection, 321 num_channels: usize, 322 format: SampleFormat, 323 frame_rate: u32, 324 buffer_size: usize, 325 _effects: &[StreamEffect], 326 _client_shm: &dyn SharedMemory<Error = E>, 327 _buffer_offsets: [u64; 2], 328 ) -> GenericResult<Box<dyn ShmStream>> { 329 let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate); 330 Ok(Box::new(new_stream)) 331 } 332 } 333 334 #[derive(Clone)] 335 pub struct MockShmStream { 336 num_channels: usize, 337 frame_rate: u32, 338 request_size: usize, 339 frame_size: usize, 340 request_notifier: Arc<(Mutex<bool>, Condvar)>, 341 } 342 343 impl MockShmStream { 344 /// Attempt to create a new MockShmStream with the given number of 345 /// channels, frame_rate, format, and buffer_size. new( num_channels: usize, frame_rate: u32, format: SampleFormat, buffer_size: usize, ) -> Self346 pub fn new( 347 num_channels: usize, 348 frame_rate: u32, 349 format: SampleFormat, 350 buffer_size: usize, 351 ) -> Self { 352 #[allow(clippy::mutex_atomic)] 353 Self { 354 num_channels, 355 frame_rate, 356 request_size: buffer_size, 357 frame_size: format.sample_bytes() * num_channels, 358 request_notifier: Arc::new((Mutex::new(false), Condvar::new())), 359 } 360 } 361 362 /// Call to request data from the stream, causing it to return from 363 /// `wait_for_next_action_with_timeout`. Will block until 364 /// `set_buffer_offset_and_frames` is called on the ServerRequest returned 365 /// from `wait_for_next_action_with_timeout`, or until `timeout` elapses. 366 /// Returns true if a response was successfully received. trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool367 pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool { 368 let (lock, cvar) = &*self.request_notifier; 369 let mut requested = lock.lock().unwrap(); 370 *requested = true; 371 cvar.notify_one(); 372 let start_time = Instant::now(); 373 while *requested { 374 requested = cvar.wait_timeout(requested, timeout).unwrap().0; 375 if start_time.elapsed() > timeout { 376 // We failed to get a callback in time, mark this as false. 377 *requested = false; 378 return false; 379 } 380 } 381 382 true 383 } 384 notify_request(&mut self)385 fn notify_request(&mut self) { 386 let (lock, cvar) = &*self.request_notifier; 387 let mut requested = lock.lock().unwrap(); 388 *requested = false; 389 cvar.notify_one(); 390 } 391 } 392 393 impl BufferSet for MockShmStream { callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>394 fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { 395 self.notify_request(); 396 Ok(()) 397 } 398 ignore(&mut self) -> GenericResult<()>399 fn ignore(&mut self) -> GenericResult<()> { 400 self.notify_request(); 401 Ok(()) 402 } 403 } 404 405 impl ShmStream for MockShmStream { frame_size(&self) -> usize406 fn frame_size(&self) -> usize { 407 self.frame_size 408 } 409 num_channels(&self) -> usize410 fn num_channels(&self) -> usize { 411 self.num_channels 412 } 413 frame_rate(&self) -> u32414 fn frame_rate(&self) -> u32 { 415 self.frame_rate 416 } 417 wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>418 fn wait_for_next_action_with_timeout( 419 &mut self, 420 timeout: Duration, 421 ) -> GenericResult<Option<ServerRequest>> { 422 { 423 let start_time = Instant::now(); 424 let (lock, cvar) = &*self.request_notifier; 425 let mut requested = lock.lock().unwrap(); 426 while !*requested { 427 requested = cvar.wait_timeout(requested, timeout).unwrap().0; 428 if start_time.elapsed() > timeout { 429 return Ok(None); 430 } 431 } 432 } 433 434 Ok(Some(ServerRequest::new(self.request_size, self))) 435 } 436 } 437 438 /// Source of `MockShmStream` objects. 439 #[derive(Clone, Default)] 440 pub struct MockShmStreamSource { 441 last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>, 442 } 443 444 impl MockShmStreamSource { new() -> Self445 pub fn new() -> Self { 446 Default::default() 447 } 448 449 /// Get the last stream that has been created from this source. If no stream 450 /// has been created, block until one has. get_last_stream(&self) -> MockShmStream451 pub fn get_last_stream(&self) -> MockShmStream { 452 let (last_stream, cvar) = &*self.last_stream; 453 let mut stream = last_stream.lock().unwrap(); 454 loop { 455 match &*stream { 456 None => stream = cvar.wait(stream).unwrap(), 457 Some(ref s) => return s.clone(), 458 }; 459 } 460 } 461 } 462 463 impl<E: std::error::Error> ShmStreamSource<E> for MockShmStreamSource { new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &dyn SharedMemory<Error = E>, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>464 fn new_stream( 465 &mut self, 466 _direction: StreamDirection, 467 num_channels: usize, 468 format: SampleFormat, 469 frame_rate: u32, 470 buffer_size: usize, 471 _effects: &[StreamEffect], 472 _client_shm: &dyn SharedMemory<Error = E>, 473 _buffer_offsets: [u64; 2], 474 ) -> GenericResult<Box<dyn ShmStream>> { 475 let (last_stream, cvar) = &*self.last_stream; 476 let mut stream = last_stream.lock().unwrap(); 477 478 let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size); 479 *stream = Some(new_stream.clone()); 480 cvar.notify_one(); 481 Ok(Box::new(new_stream)) 482 } 483 } 484 485 // Tests that run only for Unix, where `base::SharedMemory` is used. 486 #[cfg(all(test, unix))] 487 pub mod tests { 488 use super::*; 489 490 struct MockSharedMemory {} 491 492 impl SharedMemory for MockSharedMemory { 493 type Error = super::Error; 494 anon(_: u64) -> Result<Self, Self::Error>495 fn anon(_: u64) -> Result<Self, Self::Error> { 496 Ok(MockSharedMemory {}) 497 } 498 size(&self) -> u64499 fn size(&self) -> u64 { 500 0 501 } 502 503 #[cfg(unix)] as_raw_fd(&self) -> RawFd504 fn as_raw_fd(&self) -> RawFd { 505 0 506 } 507 } 508 509 #[test] mock_trigger_callback()510 fn mock_trigger_callback() { 511 let stream_source = MockShmStreamSource::new(); 512 let mut thread_stream_source = stream_source.clone(); 513 514 let buffer_size = 480; 515 let num_channels = 2; 516 let format = SampleFormat::S24LE; 517 let shm = MockSharedMemory {}; 518 519 let handle = std::thread::spawn(move || { 520 let mut stream = thread_stream_source 521 .new_stream( 522 StreamDirection::Playback, 523 num_channels, 524 format, 525 44100, 526 buffer_size, 527 &[], 528 &shm, 529 [400, 8000], 530 ) 531 .expect("Failed to create stream"); 532 533 let request = stream 534 .wait_for_next_action_with_timeout(Duration::from_secs(5)) 535 .expect("Failed to wait for next action"); 536 match request { 537 Some(r) => { 538 let requested = r.requested_frames(); 539 r.set_buffer_offset_and_frames(872, requested) 540 .expect("Failed to set buffer offset and frames"); 541 requested 542 } 543 None => 0, 544 } 545 }); 546 547 let mut stream = stream_source.get_last_stream(); 548 assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1))); 549 550 let requested_frames = handle.join().expect("Failed to join thread"); 551 assert_eq!(requested_frames, buffer_size); 552 } 553 554 #[test] null_consumption_rate()555 fn null_consumption_rate() { 556 let frame_rate = 44100; 557 let buffer_size = 480; 558 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); 559 560 let shm = MockSharedMemory {}; 561 562 let start = Instant::now(); 563 564 let mut stream_source = NullShmStreamSource::new(); 565 let mut stream = stream_source 566 .new_stream( 567 StreamDirection::Playback, 568 2, 569 SampleFormat::S24LE, 570 frame_rate, 571 buffer_size, 572 &[], 573 &shm, 574 [400, 8000], 575 ) 576 .expect("Failed to create stream"); 577 578 let timeout = Duration::from_secs(5); 579 let request = stream 580 .wait_for_next_action_with_timeout(timeout) 581 .expect("Failed to wait for first request") 582 .expect("First request should not have timed out"); 583 request 584 .set_buffer_offset_and_frames(276, 480) 585 .expect("Failed to set buffer offset and length"); 586 587 // The second call should block until the first buffer is consumed. 588 let _request = stream 589 .wait_for_next_action_with_timeout(timeout) 590 .expect("Failed to wait for second request"); 591 let elapsed = start.elapsed(); 592 assert!( 593 elapsed > interval, 594 "wait_for_next_action_with_timeout didn't block long enough: {:?}", 595 elapsed 596 ); 597 598 assert!( 599 elapsed < timeout, 600 "wait_for_next_action_with_timeout blocked for too long: {:?}", 601 elapsed 602 ); 603 } 604 } 605