1 // Copyright 2019 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #[cfg(unix)] 6 use std::os::unix::io::RawFd; 7 use std::sync::Arc; 8 use std::time::{Duration, Instant}; 9 10 use remain::sorted; 11 use sync::{Condvar, Mutex}; 12 use thiserror::Error; 13 14 use crate::{BoxError, SampleFormat, StreamDirection, StreamEffect}; 15 16 type GenericResult<T> = std::result::Result<T, BoxError>; 17 18 /// `BufferSet` is used as a callback mechanism for `ServerRequest` objects. 19 /// It is meant to be implemented by the audio stream, allowing arbitrary code 20 /// to be run after a buffer offset and length is set. 21 pub trait BufferSet { 22 /// Called when the client sets a buffer offset and length. 23 /// 24 /// `offset` is the offset within shared memory of the buffer and `frames` 25 /// indicates the number of audio frames that can be read from or written to 26 /// the buffer. callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>27 fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>; 28 29 /// Called when the client ignores a request from the server. ignore(&mut self) -> GenericResult<()>30 fn ignore(&mut self) -> GenericResult<()>; 31 } 32 33 #[sorted] 34 #[derive(Error, Debug)] 35 pub enum Error { 36 #[error("Provided number of frames {0} exceeds requested number of frames {1}")] 37 TooManyFrames(usize, usize), 38 } 39 40 /// `ServerRequest` represents an active request from the server for the client 41 /// to provide a buffer in shared memory to playback from or capture to. 42 pub struct ServerRequest<'a> { 43 requested_frames: usize, 44 buffer_set: &'a mut dyn BufferSet, 45 } 46 47 impl<'a> ServerRequest<'a> { 48 /// Create a new ServerRequest object 49 /// 50 /// Create a ServerRequest object representing a request from the server 51 /// for a buffer `requested_frames` in size. 
52 /// 53 /// When the client responds to this request by calling 54 /// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames), 55 /// BufferSet::callback will be called on `buffer_set`. 56 /// 57 /// # Arguments 58 /// * `requested_frames` - The requested buffer size in frames. 59 /// * `buffer_set` - The object implementing the callback for when a buffer is provided. new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self60 pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self { 61 Self { 62 requested_frames, 63 buffer_set, 64 } 65 } 66 67 /// Get the number of frames of audio data requested by the server. 68 /// 69 /// The returned value should never be greater than the `buffer_size` 70 /// given in [`new_stream`](ShmStreamSource::new_stream). requested_frames(&self) -> usize71 pub fn requested_frames(&self) -> usize { 72 self.requested_frames 73 } 74 75 /// Sets the buffer offset and length for the requested buffer. 76 /// 77 /// Sets the buffer offset and length of the buffer that fulfills this 78 /// server request to `offset` and `length`, respectively. This means that 79 /// `length` bytes of audio samples may be read from/written to that 80 /// location in `client_shm` for a playback/capture stream, respectively. 81 /// This function may only be called once for a `ServerRequest`, at which 82 /// point the ServerRequest is dropped and no further calls are possible. 83 /// 84 /// # Arguments 85 /// 86 /// * `offset` - The value to use as the new buffer offset for the next buffer. 87 /// * `frames` - The length of the next buffer in frames. 88 /// 89 /// # Errors 90 /// 91 /// * If `frames` is greater than `requested_frames`. 
set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()>92 pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> { 93 if frames > self.requested_frames { 94 return Err(Box::new(Error::TooManyFrames( 95 frames, 96 self.requested_frames, 97 ))); 98 } 99 100 self.buffer_set.callback(offset, frames) 101 } 102 103 /// Ignore this request 104 /// 105 /// If the client does not intend to respond to this ServerRequest with a 106 /// buffer, they should call this function. The stream will be notified that 107 /// the request has been ignored and will handle it properly. ignore_request(self) -> GenericResult<()>108 pub fn ignore_request(self) -> GenericResult<()> { 109 self.buffer_set.ignore() 110 } 111 } 112 113 /// `ShmStream` allows a client to interact with an active CRAS stream. 114 pub trait ShmStream: Send { 115 /// Get the size of a frame of audio data for this stream. frame_size(&self) -> usize116 fn frame_size(&self) -> usize; 117 118 /// Get the number of channels of audio data for this stream. num_channels(&self) -> usize119 fn num_channels(&self) -> usize; 120 121 /// Get the frame rate of audio data for this stream. frame_rate(&self) -> u32122 fn frame_rate(&self) -> u32; 123 124 /// Waits until the next server message indicating action is required. 125 /// 126 /// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning 127 /// that we must set the buffer offset to the next location where playback 128 /// data can be found. 129 /// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning 130 /// that we must set the buffer offset to the next location where captured 131 /// data can be written to. 132 /// Will return early if `timeout` elapses before a message is received. 133 /// 134 /// # Arguments 135 /// 136 /// * `timeout` - The amount of time to wait until a message is received. 
137 /// 138 /// # Return value 139 /// 140 /// Returns `Some(request)` where `request` is an object that implements the 141 /// [`ServerRequest`](ServerRequest) trait and which can be used to get the 142 /// number of bytes requested for playback streams or that have already been 143 /// written to shm for capture streams. 144 /// 145 /// If the timeout occurs before a message is received, returns `None`. 146 /// 147 /// # Errors 148 /// 149 /// * If an invalid message type is received for the stream. wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>150 fn wait_for_next_action_with_timeout( 151 &mut self, 152 timeout: Duration, 153 ) -> GenericResult<Option<ServerRequest>>; 154 } 155 156 /// `SharedMemory` specifies features of shared memory areas passed on to `ShmStreamSource`. 157 pub trait SharedMemory { 158 type Error: std::error::Error; 159 160 /// Creates a new shared memory file descriptor without specifying a name. anon(size: u64) -> Result<Self, Self::Error> where Self: Sized161 fn anon(size: u64) -> Result<Self, Self::Error> 162 where 163 Self: Sized; 164 165 /// Gets the size in bytes of the shared memory. 166 /// 167 /// The size returned here does not reflect changes by other interfaces or users of the shared 168 /// memory file descriptor.. size(&self) -> u64169 fn size(&self) -> u64; 170 171 /// Returns the underlying raw fd. 172 #[cfg(unix)] as_raw_fd(&self) -> RawFd173 fn as_raw_fd(&self) -> RawFd; 174 } 175 176 /// `ShmStreamSource` creates streams for playback or capture of audio. 177 pub trait ShmStreamSource<E: std::error::Error>: Send { 178 /// Creates a new [`ShmStream`](ShmStream) 179 /// 180 /// Creates a new `ShmStream` object, which allows: 181 /// * Waiting until the server has communicated that data is ready or 182 /// requested that we make more data available. 183 /// * Setting the location and length of buffers for reading/writing audio data. 
184 /// 185 /// # Arguments 186 /// 187 /// * `direction` - The direction of the stream, either `Playback` or `Capture`. 188 /// * `num_channels` - The number of audio channels for the stream. 189 /// * `format` - The audio format to use for audio samples. 190 /// * `frame_rate` - The stream's frame rate in Hz. 191 /// * `buffer_size` - The maximum size of an audio buffer. This will be the 192 /// size used for transfers of audio data between client 193 /// and server. 194 /// * `effects` - Audio effects to use for the stream, such as echo-cancellation. 195 /// * `client_shm` - The shared memory area that will contain samples. 196 /// * `buffer_offsets` - The two initial values to use as buffer offsets 197 /// for streams. This way, the server will not write 198 /// audio data to an arbitrary offset in `client_shm` 199 /// if the client fails to update offsets in time. 200 /// 201 /// # Errors 202 /// 203 /// * If sending the connect stream message to the server fails. 204 #[allow(clippy::too_many_arguments)] new_stream( &mut self, direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, effects: &[StreamEffect], client_shm: &dyn SharedMemory<Error = E>, buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>205 fn new_stream( 206 &mut self, 207 direction: StreamDirection, 208 num_channels: usize, 209 format: SampleFormat, 210 frame_rate: u32, 211 buffer_size: usize, 212 effects: &[StreamEffect], 213 client_shm: &dyn SharedMemory<Error = E>, 214 buffer_offsets: [u64; 2], 215 ) -> GenericResult<Box<dyn ShmStream>>; 216 217 /// Get a list of file descriptors used by the implementation. 218 /// 219 /// Returns any open file descriptors needed by the implementation. 220 /// This list helps users of the ShmStreamSource enter Linux jails without 221 /// closing needed file descriptors. 
222 #[cfg(unix)] keep_fds(&self) -> Vec<RawFd>223 fn keep_fds(&self) -> Vec<RawFd> { 224 Vec::new() 225 } 226 } 227 228 /// Class that implements ShmStream trait but does nothing with the samples 229 pub struct NullShmStream { 230 num_channels: usize, 231 frame_rate: u32, 232 buffer_size: usize, 233 frame_size: usize, 234 interval: Duration, 235 next_frame: Duration, 236 start_time: Instant, 237 } 238 239 impl NullShmStream { 240 /// Attempt to create a new NullShmStream with the given number of channels, 241 /// format, frame_rate, and buffer_size. new( buffer_size: usize, num_channels: usize, format: SampleFormat, frame_rate: u32, ) -> Self242 pub fn new( 243 buffer_size: usize, 244 num_channels: usize, 245 format: SampleFormat, 246 frame_rate: u32, 247 ) -> Self { 248 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); 249 Self { 250 num_channels, 251 frame_rate, 252 buffer_size, 253 frame_size: format.sample_bytes() * num_channels, 254 interval, 255 next_frame: interval, 256 start_time: Instant::now(), 257 } 258 } 259 } 260 261 impl BufferSet for NullShmStream { callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>262 fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { 263 Ok(()) 264 } 265 ignore(&mut self) -> GenericResult<()>266 fn ignore(&mut self) -> GenericResult<()> { 267 Ok(()) 268 } 269 } 270 271 impl ShmStream for NullShmStream { frame_size(&self) -> usize272 fn frame_size(&self) -> usize { 273 self.frame_size 274 } 275 num_channels(&self) -> usize276 fn num_channels(&self) -> usize { 277 self.num_channels 278 } 279 frame_rate(&self) -> u32280 fn frame_rate(&self) -> u32 { 281 self.frame_rate 282 } 283 wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>284 fn wait_for_next_action_with_timeout( 285 &mut self, 286 timeout: Duration, 287 ) -> GenericResult<Option<ServerRequest>> { 288 let elapsed = self.start_time.elapsed(); 
289 if elapsed < self.next_frame { 290 if timeout < self.next_frame - elapsed { 291 std::thread::sleep(timeout); 292 return Ok(None); 293 } else { 294 std::thread::sleep(self.next_frame - elapsed); 295 } 296 } 297 self.next_frame += self.interval; 298 Ok(Some(ServerRequest::new(self.buffer_size, self))) 299 } 300 } 301 302 /// Source of `NullShmStream` objects. 303 #[derive(Default)] 304 pub struct NullShmStreamSource; 305 306 impl NullShmStreamSource { new() -> Self307 pub fn new() -> Self { 308 Self::default() 309 } 310 } 311 312 impl<E: std::error::Error> ShmStreamSource<E> for NullShmStreamSource { new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &dyn SharedMemory<Error = E>, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>313 fn new_stream( 314 &mut self, 315 _direction: StreamDirection, 316 num_channels: usize, 317 format: SampleFormat, 318 frame_rate: u32, 319 buffer_size: usize, 320 _effects: &[StreamEffect], 321 _client_shm: &dyn SharedMemory<Error = E>, 322 _buffer_offsets: [u64; 2], 323 ) -> GenericResult<Box<dyn ShmStream>> { 324 let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate); 325 Ok(Box::new(new_stream)) 326 } 327 } 328 329 #[derive(Clone)] 330 pub struct MockShmStream { 331 num_channels: usize, 332 frame_rate: u32, 333 request_size: usize, 334 frame_size: usize, 335 request_notifier: Arc<(Mutex<bool>, Condvar)>, 336 } 337 338 impl MockShmStream { 339 /// Attempt to create a new MockShmStream with the given number of 340 /// channels, frame_rate, format, and buffer_size. 
new( num_channels: usize, frame_rate: u32, format: SampleFormat, buffer_size: usize, ) -> Self341 pub fn new( 342 num_channels: usize, 343 frame_rate: u32, 344 format: SampleFormat, 345 buffer_size: usize, 346 ) -> Self { 347 Self { 348 num_channels, 349 frame_rate, 350 request_size: buffer_size, 351 frame_size: format.sample_bytes() * num_channels, 352 request_notifier: Arc::new((Mutex::new(false), Condvar::new())), 353 } 354 } 355 356 /// Call to request data from the stream, causing it to return from 357 /// `wait_for_next_action_with_timeout`. Will block until 358 /// `set_buffer_offset_and_frames` is called on the ServerRequest returned 359 /// from `wait_for_next_action_with_timeout`, or until `timeout` elapses. 360 /// Returns true if a response was successfully received. trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool361 pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool { 362 let &(ref lock, ref cvar) = &*self.request_notifier; 363 let mut requested = lock.lock(); 364 *requested = true; 365 cvar.notify_one(); 366 let start_time = Instant::now(); 367 while *requested { 368 requested = cvar.wait_timeout(requested, timeout).0; 369 if start_time.elapsed() > timeout { 370 // We failed to get a callback in time, mark this as false. 
371 *requested = false; 372 return false; 373 } 374 } 375 376 true 377 } 378 notify_request(&mut self)379 fn notify_request(&mut self) { 380 let &(ref lock, ref cvar) = &*self.request_notifier; 381 let mut requested = lock.lock(); 382 *requested = false; 383 cvar.notify_one(); 384 } 385 } 386 387 impl BufferSet for MockShmStream { callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>388 fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { 389 self.notify_request(); 390 Ok(()) 391 } 392 ignore(&mut self) -> GenericResult<()>393 fn ignore(&mut self) -> GenericResult<()> { 394 self.notify_request(); 395 Ok(()) 396 } 397 } 398 399 impl ShmStream for MockShmStream { frame_size(&self) -> usize400 fn frame_size(&self) -> usize { 401 self.frame_size 402 } 403 num_channels(&self) -> usize404 fn num_channels(&self) -> usize { 405 self.num_channels 406 } 407 frame_rate(&self) -> u32408 fn frame_rate(&self) -> u32 { 409 self.frame_rate 410 } 411 wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>412 fn wait_for_next_action_with_timeout( 413 &mut self, 414 timeout: Duration, 415 ) -> GenericResult<Option<ServerRequest>> { 416 { 417 let start_time = Instant::now(); 418 let &(ref lock, ref cvar) = &*self.request_notifier; 419 let mut requested = lock.lock(); 420 while !*requested { 421 requested = cvar.wait_timeout(requested, timeout).0; 422 if start_time.elapsed() > timeout { 423 return Ok(None); 424 } 425 } 426 } 427 428 Ok(Some(ServerRequest::new(self.request_size, self))) 429 } 430 } 431 432 /// Source of `MockShmStream` objects. 433 #[derive(Clone, Default)] 434 pub struct MockShmStreamSource { 435 last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>, 436 } 437 438 impl MockShmStreamSource { new() -> Self439 pub fn new() -> Self { 440 Default::default() 441 } 442 443 /// Get the last stream that has been created from this source. 
If no stream 444 /// has been created, block until one has. get_last_stream(&self) -> MockShmStream445 pub fn get_last_stream(&self) -> MockShmStream { 446 let &(ref last_stream, ref cvar) = &*self.last_stream; 447 let mut stream = last_stream.lock(); 448 loop { 449 match &*stream { 450 None => stream = cvar.wait(stream), 451 Some(ref s) => return s.clone(), 452 }; 453 } 454 } 455 } 456 457 impl<E: std::error::Error> ShmStreamSource<E> for MockShmStreamSource { new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &dyn SharedMemory<Error = E>, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>458 fn new_stream( 459 &mut self, 460 _direction: StreamDirection, 461 num_channels: usize, 462 format: SampleFormat, 463 frame_rate: u32, 464 buffer_size: usize, 465 _effects: &[StreamEffect], 466 _client_shm: &dyn SharedMemory<Error = E>, 467 _buffer_offsets: [u64; 2], 468 ) -> GenericResult<Box<dyn ShmStream>> { 469 let &(ref last_stream, ref cvar) = &*self.last_stream; 470 let mut stream = last_stream.lock(); 471 472 let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size); 473 *stream = Some(new_stream.clone()); 474 cvar.notify_one(); 475 Ok(Box::new(new_stream)) 476 } 477 } 478 479 // Tests that run only for Unix, where `sys_util::SharedMemory` is used. 
#[cfg(all(test, unix))]
pub mod tests {
    use super::*;

    use std::os::unix::io::AsRawFd;
    use sys_util::SharedMemory as SysSharedMemory;

    // Lets the tests pass a `sys_util::SharedMemory` wherever the crate-level
    // `SharedMemory` trait is expected.
    impl SharedMemory for SysSharedMemory {
        type Error = sys_util::Error;

        // The requested size is not forwarded to `SysSharedMemory::anon`.
        fn anon(_: u64) -> Result<Self, Self::Error> {
            SysSharedMemory::anon()
        }

        // NOTE(review): presumably resolves to the inherent
        // `SysSharedMemory::size` rather than recursing - confirm.
        fn size(&self) -> u64 {
            self.size()
        }

        #[cfg(unix)]
        fn as_raw_fd(&self) -> RawFd {
            AsRawFd::as_raw_fd(self)
        }
    }

    /// A request triggered on one clone of a mock stream is observed and
    /// answered by the thread that owns the stream.
    #[test]
    fn mock_trigger_callback() {
        let source = MockShmStreamSource::new();
        let mut worker_source = source.clone();

        let buffer_size = 480;
        let num_channels = 2;
        let format = SampleFormat::S24LE;
        let shm = SysSharedMemory::anon().expect("Failed to create shm");

        // The worker creates the stream, waits for one request and answers it
        // with the full number of requested frames.
        let worker = std::thread::spawn(move || {
            let mut stream = worker_source
                .new_stream(
                    StreamDirection::Playback,
                    num_channels,
                    format,
                    44100,
                    buffer_size,
                    &[],
                    &shm,
                    [400, 8000],
                )
                .expect("Failed to create stream");

            let pending = stream
                .wait_for_next_action_with_timeout(Duration::from_secs(5))
                .expect("Failed to wait for next action");
            match pending {
                None => 0,
                Some(req) => {
                    let frames = req.requested_frames();
                    req.set_buffer_offset_and_frames(872, frames)
                        .expect("Failed to set buffer offset and frames");
                    frames
                }
            }
        });

        // Blocks until the worker has created its stream.
        let mut mock_stream = source.get_last_stream();
        assert!(mock_stream.trigger_callback_with_timeout(Duration::from_secs(1)));

        let requested_frames = worker.join().expect("Failed to join thread");
        assert_eq!(requested_frames, buffer_size);
    }

    /// The null stream paces its requests: the second request is not handed
    /// out before one buffer interval has passed.
    #[test]
    fn null_consumption_rate() {
        let frame_rate = 44100;
        let buffer_size = 480;
        let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);

        let shm = SysSharedMemory::anon().expect("Failed to create shm");

        let start = Instant::now();

        let mut source = NullShmStreamSource::new();
        let mut stream = source
            .new_stream(
                StreamDirection::Playback,
                2,
                SampleFormat::S24LE,
                frame_rate,
                buffer_size,
                &[],
                &shm,
                [400, 8000],
            )
            .expect("Failed to create stream");

        let timeout = Duration::from_secs(5);
        let first = stream
            .wait_for_next_action_with_timeout(timeout)
            .expect("Failed to wait for first request")
            .expect("First request should not have timed out");
        first
            .set_buffer_offset_and_frames(276, 480)
            .expect("Failed to set buffer offset and length");

        // The second call should block until the first buffer is consumed.
        let _second = stream
            .wait_for_next_action_with_timeout(timeout)
            .expect("Failed to wait for second request");
        let elapsed = start.elapsed();
        assert!(
            elapsed > interval,
            "wait_for_next_action_with_timeout didn't block long enough: {:?}",
            elapsed
        );

        assert!(
            elapsed < timeout,
            "wait_for_next_action_with_timeout blocked for too long: {:?}",
            elapsed
        );
    }
}