1 // Copyright 2019 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 use std::error; 5 use std::fmt; 6 use std::os::unix::io::RawFd; 7 use std::sync::Arc; 8 use std::time::{Duration, Instant}; 9 10 use sync::{Condvar, Mutex}; 11 use sys_util::SharedMemory; 12 13 use crate::{BoxError, SampleFormat, StreamDirection, StreamEffect}; 14 15 type GenericResult<T> = std::result::Result<T, BoxError>; 16 17 /// `BufferSet` is used as a callback mechanism for `ServerRequest` objects. 18 /// It is meant to be implemented by the audio stream, allowing arbitrary code 19 /// to be run after a buffer offset and length is set. 20 pub trait BufferSet { 21 /// Called when the client sets a buffer offset and length. 22 /// 23 /// `offset` is the offset within shared memory of the buffer and `frames` 24 /// indicates the number of audio frames that can be read from or written to 25 /// the buffer. callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>26 fn callback(&mut self, offset: usize, frames: usize) -> GenericResult<()>; 27 28 /// Called when the client ignores a request from the server. ignore(&mut self) -> GenericResult<()>29 fn ignore(&mut self) -> GenericResult<()>; 30 } 31 32 #[derive(Debug)] 33 pub enum Error { 34 TooManyFrames(usize, usize), 35 } 36 37 impl error::Error for Error {} 38 39 impl fmt::Display for Error { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result40 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 41 match self { 42 Error::TooManyFrames(provided, requested) => write!( 43 f, 44 "Provided number of frames {} exceeds requested number of frames {}", 45 provided, requested 46 ), 47 } 48 } 49 } 50 51 /// `ServerRequest` represents an active request from the server for the client 52 /// to provide a buffer in shared memory to playback from or capture to. 53 pub struct ServerRequest<'a> { 54 requested_frames: usize, 55 buffer_set: &'a mut dyn BufferSet, 56 } 57 58 impl<'a> ServerRequest<'a> { 59 /// Create a new ServerRequest object 60 /// 61 /// Create a ServerRequest object representing a request from the server 62 /// for a buffer `requested_frames` in size. 63 /// 64 /// When the client responds to this request by calling 65 /// [`set_buffer_offset_and_frames`](ServerRequest::set_buffer_offset_and_frames), 66 /// BufferSet::callback will be called on `buffer_set`. 67 /// 68 /// # Arguments 69 /// * `requested_frames` - The requested buffer size in frames. 70 /// * `buffer_set` - The object implementing the callback for when a buffer is provided. new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self71 pub fn new<D: BufferSet>(requested_frames: usize, buffer_set: &'a mut D) -> Self { 72 Self { 73 requested_frames, 74 buffer_set, 75 } 76 } 77 78 /// Get the number of frames of audio data requested by the server. 79 /// 80 /// The returned value should never be greater than the `buffer_size` 81 /// given in [`new_stream`](ShmStreamSource::new_stream). requested_frames(&self) -> usize82 pub fn requested_frames(&self) -> usize { 83 self.requested_frames 84 } 85 86 /// Sets the buffer offset and length for the requested buffer. 87 /// 88 /// Sets the buffer offset and length of the buffer that fulfills this 89 /// server request to `offset` and `length`, respectively. This means that 90 /// `length` bytes of audio samples may be read from/written to that 91 /// location in `client_shm` for a playback/capture stream, respectively. 92 /// This function may only be called once for a `ServerRequest`, at which 93 /// point the ServerRequest is dropped and no further calls are possible. 94 /// 95 /// # Arguments 96 /// 97 /// * `offset` - The value to use as the new buffer offset for the next buffer. 98 /// * `frames` - The length of the next buffer in frames. 99 /// 100 /// # Errors 101 /// 102 /// * If `frames` is greater than `requested_frames`. set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()>103 pub fn set_buffer_offset_and_frames(self, offset: usize, frames: usize) -> GenericResult<()> { 104 if frames > self.requested_frames { 105 return Err(Box::new(Error::TooManyFrames( 106 frames, 107 self.requested_frames, 108 ))); 109 } 110 111 self.buffer_set.callback(offset, frames) 112 } 113 114 /// Ignore this request 115 /// 116 /// If the client does not intend to respond to this ServerRequest with a 117 /// buffer, they should call this function. The stream will be notified that 118 /// the request has been ignored and will handle it properly. ignore_request(self) -> GenericResult<()>119 pub fn ignore_request(self) -> GenericResult<()> { 120 self.buffer_set.ignore() 121 } 122 } 123 124 /// `ShmStream` allows a client to interact with an active CRAS stream. 125 pub trait ShmStream: Send { 126 /// Get the size of a frame of audio data for this stream. frame_size(&self) -> usize127 fn frame_size(&self) -> usize; 128 129 /// Get the number of channels of audio data for this stream. num_channels(&self) -> usize130 fn num_channels(&self) -> usize; 131 132 /// Get the frame rate of audio data for this stream. frame_rate(&self) -> u32133 fn frame_rate(&self) -> u32; 134 135 /// Waits until the next server message indicating action is required. 136 /// 137 /// For playback streams, this will be `AUDIO_MESSAGE_REQUEST_DATA`, meaning 138 /// that we must set the buffer offset to the next location where playback 139 /// data can be found. 140 /// For capture streams, this will be `AUDIO_MESSAGE_DATA_READY`, meaning 141 /// that we must set the buffer offset to the next location where captured 142 /// data can be written to. 143 /// Will return early if `timeout` elapses before a message is received. 144 /// 145 /// # Arguments 146 /// 147 /// * `timeout` - The amount of time to wait until a message is received. 148 /// 149 /// # Return value 150 /// 151 /// Returns `Some(request)` where `request` is an object that implements the 152 /// [`ServerRequest`](ServerRequest) trait and which can be used to get the 153 /// number of bytes requested for playback streams or that have already been 154 /// written to shm for capture streams. 155 /// 156 /// If the timeout occurs before a message is received, returns `None`. 157 /// 158 /// # Errors 159 /// 160 /// * If an invalid message type is received for the stream. wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>161 fn wait_for_next_action_with_timeout( 162 &mut self, 163 timeout: Duration, 164 ) -> GenericResult<Option<ServerRequest>>; 165 } 166 167 /// `ShmStreamSource` creates streams for playback or capture of audio. 168 pub trait ShmStreamSource: Send { 169 /// Creates a new [`ShmStream`](ShmStream) 170 /// 171 /// Creates a new `ShmStream` object, which allows: 172 /// * Waiting until the server has communicated that data is ready or 173 /// requested that we make more data available. 174 /// * Setting the location and length of buffers for reading/writing audio data. 175 /// 176 /// # Arguments 177 /// 178 /// * `direction` - The direction of the stream, either `Playback` or `Capture`. 179 /// * `num_channels` - The number of audio channels for the stream. 180 /// * `format` - The audio format to use for audio samples. 181 /// * `frame_rate` - The stream's frame rate in Hz. 182 /// * `buffer_size` - The maximum size of an audio buffer. This will be the 183 /// size used for transfers of audio data between client 184 /// and server. 185 /// * `effects` - Audio effects to use for the stream, such as echo-cancellation. 186 /// * `client_shm` - The shared memory area that will contain samples. 187 /// * `buffer_offsets` - The two initial values to use as buffer offsets 188 /// for streams. This way, the server will not write 189 /// audio data to an arbitrary offset in `client_shm` 190 /// if the client fails to update offsets in time. 191 /// 192 /// # Errors 193 /// 194 /// * If sending the connect stream message to the server fails. 195 #[allow(clippy::too_many_arguments)] new_stream( &mut self, direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, effects: &[StreamEffect], client_shm: &SharedMemory, buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>196 fn new_stream( 197 &mut self, 198 direction: StreamDirection, 199 num_channels: usize, 200 format: SampleFormat, 201 frame_rate: u32, 202 buffer_size: usize, 203 effects: &[StreamEffect], 204 client_shm: &SharedMemory, 205 buffer_offsets: [u64; 2], 206 ) -> GenericResult<Box<dyn ShmStream>>; 207 208 /// Get a list of file descriptors used by the implementation. 209 /// 210 /// Returns any open file descriptors needed by the implementation. 211 /// This list helps users of the ShmStreamSource enter Linux jails without 212 /// closing needed file descriptors. keep_fds(&self) -> Vec<RawFd>213 fn keep_fds(&self) -> Vec<RawFd> { 214 Vec::new() 215 } 216 } 217 218 /// Class that implements ShmStream trait but does nothing with the samples 219 pub struct NullShmStream { 220 num_channels: usize, 221 frame_rate: u32, 222 buffer_size: usize, 223 frame_size: usize, 224 interval: Duration, 225 next_frame: Duration, 226 start_time: Instant, 227 } 228 229 impl NullShmStream { 230 /// Attempt to create a new NullShmStream with the given number of channels, 231 /// format, frame_rate, and buffer_size. new( buffer_size: usize, num_channels: usize, format: SampleFormat, frame_rate: u32, ) -> Self232 pub fn new( 233 buffer_size: usize, 234 num_channels: usize, 235 format: SampleFormat, 236 frame_rate: u32, 237 ) -> Self { 238 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); 239 Self { 240 num_channels, 241 frame_rate, 242 buffer_size, 243 frame_size: format.sample_bytes() * num_channels, 244 interval, 245 next_frame: interval, 246 start_time: Instant::now(), 247 } 248 } 249 } 250 251 impl BufferSet for NullShmStream { callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>252 fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { 253 Ok(()) 254 } 255 ignore(&mut self) -> GenericResult<()>256 fn ignore(&mut self) -> GenericResult<()> { 257 Ok(()) 258 } 259 } 260 261 impl ShmStream for NullShmStream { frame_size(&self) -> usize262 fn frame_size(&self) -> usize { 263 self.frame_size 264 } 265 num_channels(&self) -> usize266 fn num_channels(&self) -> usize { 267 self.num_channels 268 } 269 frame_rate(&self) -> u32270 fn frame_rate(&self) -> u32 { 271 self.frame_rate 272 } 273 wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>274 fn wait_for_next_action_with_timeout( 275 &mut self, 276 timeout: Duration, 277 ) -> GenericResult<Option<ServerRequest>> { 278 let elapsed = self.start_time.elapsed(); 279 if elapsed < self.next_frame { 280 if timeout < self.next_frame - elapsed { 281 std::thread::sleep(timeout); 282 return Ok(None); 283 } else { 284 std::thread::sleep(self.next_frame - elapsed); 285 } 286 } 287 self.next_frame += self.interval; 288 Ok(Some(ServerRequest::new(self.buffer_size, self))) 289 } 290 } 291 292 /// Source of `NullShmStream` objects. 293 #[derive(Default)] 294 pub struct NullShmStreamSource; 295 296 impl NullShmStreamSource { new() -> Self297 pub fn new() -> Self { 298 Self::default() 299 } 300 } 301 302 impl ShmStreamSource for NullShmStreamSource { new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &SharedMemory, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>303 fn new_stream( 304 &mut self, 305 _direction: StreamDirection, 306 num_channels: usize, 307 format: SampleFormat, 308 frame_rate: u32, 309 buffer_size: usize, 310 _effects: &[StreamEffect], 311 _client_shm: &SharedMemory, 312 _buffer_offsets: [u64; 2], 313 ) -> GenericResult<Box<dyn ShmStream>> { 314 let new_stream = NullShmStream::new(buffer_size, num_channels, format, frame_rate); 315 Ok(Box::new(new_stream)) 316 } 317 } 318 319 #[derive(Clone)] 320 pub struct MockShmStream { 321 num_channels: usize, 322 frame_rate: u32, 323 request_size: usize, 324 frame_size: usize, 325 request_notifier: Arc<(Mutex<bool>, Condvar)>, 326 } 327 328 impl MockShmStream { 329 /// Attempt to create a new MockShmStream with the given number of 330 /// channels, frame_rate, format, and buffer_size. new( num_channels: usize, frame_rate: u32, format: SampleFormat, buffer_size: usize, ) -> Self331 pub fn new( 332 num_channels: usize, 333 frame_rate: u32, 334 format: SampleFormat, 335 buffer_size: usize, 336 ) -> Self { 337 Self { 338 num_channels, 339 frame_rate, 340 request_size: buffer_size, 341 frame_size: format.sample_bytes() * num_channels, 342 request_notifier: Arc::new((Mutex::new(false), Condvar::new())), 343 } 344 } 345 346 /// Call to request data from the stream, causing it to return from 347 /// `wait_for_next_action_with_timeout`. Will block until 348 /// `set_buffer_offset_and_frames` is called on the ServerRequest returned 349 /// from `wait_for_next_action_with_timeout`, or until `timeout` elapses. 350 /// Returns true if a response was successfully received. trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool351 pub fn trigger_callback_with_timeout(&mut self, timeout: Duration) -> bool { 352 let &(ref lock, ref cvar) = &*self.request_notifier; 353 let mut requested = lock.lock(); 354 *requested = true; 355 cvar.notify_one(); 356 let start_time = Instant::now(); 357 while *requested { 358 requested = cvar.wait_timeout(requested, timeout).0; 359 if start_time.elapsed() > timeout { 360 // We failed to get a callback in time, mark this as false. 361 *requested = false; 362 return false; 363 } 364 } 365 366 true 367 } 368 notify_request(&mut self)369 fn notify_request(&mut self) { 370 let &(ref lock, ref cvar) = &*self.request_notifier; 371 let mut requested = lock.lock(); 372 *requested = false; 373 cvar.notify_one(); 374 } 375 } 376 377 impl BufferSet for MockShmStream { callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()>378 fn callback(&mut self, _offset: usize, _frames: usize) -> GenericResult<()> { 379 self.notify_request(); 380 Ok(()) 381 } 382 ignore(&mut self) -> GenericResult<()>383 fn ignore(&mut self) -> GenericResult<()> { 384 self.notify_request(); 385 Ok(()) 386 } 387 } 388 389 impl ShmStream for MockShmStream { frame_size(&self) -> usize390 fn frame_size(&self) -> usize { 391 self.frame_size 392 } 393 num_channels(&self) -> usize394 fn num_channels(&self) -> usize { 395 self.num_channels 396 } 397 frame_rate(&self) -> u32398 fn frame_rate(&self) -> u32 { 399 self.frame_rate 400 } 401 wait_for_next_action_with_timeout( &mut self, timeout: Duration, ) -> GenericResult<Option<ServerRequest>>402 fn wait_for_next_action_with_timeout( 403 &mut self, 404 timeout: Duration, 405 ) -> GenericResult<Option<ServerRequest>> { 406 { 407 let start_time = Instant::now(); 408 let &(ref lock, ref cvar) = &*self.request_notifier; 409 let mut requested = lock.lock(); 410 while !*requested { 411 requested = cvar.wait_timeout(requested, timeout).0; 412 if start_time.elapsed() > timeout { 413 return Ok(None); 414 } 415 } 416 } 417 418 Ok(Some(ServerRequest::new(self.request_size, self))) 419 } 420 } 421 422 /// Source of `MockShmStream` objects. 423 #[derive(Clone, Default)] 424 pub struct MockShmStreamSource { 425 last_stream: Arc<(Mutex<Option<MockShmStream>>, Condvar)>, 426 } 427 428 impl MockShmStreamSource { new() -> Self429 pub fn new() -> Self { 430 Default::default() 431 } 432 433 /// Get the last stream that has been created from this source. If no stream 434 /// has been created, block until one has. get_last_stream(&self) -> MockShmStream435 pub fn get_last_stream(&self) -> MockShmStream { 436 let &(ref last_stream, ref cvar) = &*self.last_stream; 437 let mut stream = last_stream.lock(); 438 loop { 439 match &*stream { 440 None => stream = cvar.wait(stream), 441 Some(ref s) => return s.clone(), 442 }; 443 } 444 } 445 } 446 447 impl ShmStreamSource for MockShmStreamSource { new_stream( &mut self, _direction: StreamDirection, num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, _effects: &[StreamEffect], _client_shm: &SharedMemory, _buffer_offsets: [u64; 2], ) -> GenericResult<Box<dyn ShmStream>>448 fn new_stream( 449 &mut self, 450 _direction: StreamDirection, 451 num_channels: usize, 452 format: SampleFormat, 453 frame_rate: u32, 454 buffer_size: usize, 455 _effects: &[StreamEffect], 456 _client_shm: &SharedMemory, 457 _buffer_offsets: [u64; 2], 458 ) -> GenericResult<Box<dyn ShmStream>> { 459 let &(ref last_stream, ref cvar) = &*self.last_stream; 460 let mut stream = last_stream.lock(); 461 462 let new_stream = MockShmStream::new(num_channels, frame_rate, format, buffer_size); 463 *stream = Some(new_stream.clone()); 464 cvar.notify_one(); 465 Ok(Box::new(new_stream)) 466 } 467 } 468 469 #[cfg(test)] 470 pub mod tests { 471 use super::*; 472 473 #[test] mock_trigger_callback()474 fn mock_trigger_callback() { 475 let stream_source = MockShmStreamSource::new(); 476 let mut thread_stream_source = stream_source.clone(); 477 478 let buffer_size = 480; 479 let num_channels = 2; 480 let format = SampleFormat::S24LE; 481 let shm = SharedMemory::anon().expect("Failed to create shm"); 482 483 let handle = std::thread::spawn(move || { 484 let mut stream = thread_stream_source 485 .new_stream( 486 StreamDirection::Playback, 487 num_channels, 488 format, 489 44100, 490 buffer_size, 491 &[], 492 &shm, 493 [400, 8000], 494 ) 495 .expect("Failed to create stream"); 496 497 let request = stream 498 .wait_for_next_action_with_timeout(Duration::from_secs(5)) 499 .expect("Failed to wait for next action"); 500 match request { 501 Some(r) => { 502 let requested = r.requested_frames(); 503 r.set_buffer_offset_and_frames(872, requested) 504 .expect("Failed to set buffer offset and frames"); 505 requested 506 } 507 None => 0, 508 } 509 }); 510 511 let mut stream = stream_source.get_last_stream(); 512 assert!(stream.trigger_callback_with_timeout(Duration::from_secs(1))); 513 514 let requested_frames = handle.join().expect("Failed to join thread"); 515 assert_eq!(requested_frames, buffer_size); 516 } 517 518 #[test] null_consumption_rate()519 fn null_consumption_rate() { 520 let frame_rate = 44100; 521 let buffer_size = 480; 522 let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64); 523 524 let shm = SharedMemory::anon().expect("Failed to create shm"); 525 526 let start = Instant::now(); 527 528 let mut stream_source = NullShmStreamSource::new(); 529 let mut stream = stream_source 530 .new_stream( 531 StreamDirection::Playback, 532 2, 533 SampleFormat::S24LE, 534 frame_rate, 535 buffer_size, 536 &[], 537 &shm, 538 [400, 8000], 539 ) 540 .expect("Failed to create stream"); 541 542 let timeout = Duration::from_secs(5); 543 let request = stream 544 .wait_for_next_action_with_timeout(timeout) 545 .expect("Failed to wait for first request") 546 .expect("First request should not have timed out"); 547 request 548 .set_buffer_offset_and_frames(276, 480) 549 .expect("Failed to set buffer offset and length"); 550 551 // The second call should block until the first buffer is consumed. 552 let _request = stream 553 .wait_for_next_action_with_timeout(timeout) 554 .expect("Failed to wait for second request"); 555 let elapsed = start.elapsed(); 556 assert!( 557 elapsed > interval, 558 "wait_for_next_action_with_timeout didn't block long enough: {:?}", 559 elapsed 560 ); 561 562 assert!( 563 elapsed < timeout, 564 "wait_for_next_action_with_timeout blocked for too long: {:?}", 565 elapsed 566 ); 567 } 568 } 569