1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::collections::VecDeque;
6 use std::sync::mpsc::channel;
7 use std::sync::mpsc::Receiver;
8 use std::sync::mpsc::Sender;
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::Duration;
12 use std::time::Instant;
13
14 use base::error;
15 use base::set_rt_prio_limit;
16 use base::set_rt_round_robin;
17 use base::warn;
18 use data_model::Le32;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22
23 use super::Error as VioSError;
24 use super::Result;
25 use super::SoundError;
26 use super::*;
27 use crate::virtio::snd::common::from_virtio_frame_rate;
28 use crate::virtio::snd::constants::*;
29 use crate::virtio::snd::layout::*;
30 use crate::virtio::DescriptorChain;
31 use crate::virtio::Interrupt;
32 use crate::virtio::Queue;
33
34 /// Messages that the worker can send to the stream (thread).
35 pub enum StreamMsg {
36 SetParams(DescriptorChain, virtio_snd_pcm_set_params),
37 Prepare(DescriptorChain),
38 Start(DescriptorChain),
39 Stop(DescriptorChain),
40 Release(DescriptorChain),
41 Buffer(DescriptorChain),
42 Break,
43 }
44
45 #[derive(Clone, Serialize, Deserialize)]
46 pub enum StreamState {
47 New,
48 ParamsSet,
49 Prepared,
50 Started,
51 Stopped,
52 Released,
53 }
54
55 pub struct Stream {
56 stream_id: u32,
57 receiver: Receiver<Box<StreamMsg>>,
58 vios_client: Arc<Mutex<VioSClient>>,
59 control_queue: Arc<Mutex<Queue>>,
60 io_queue: Arc<Mutex<Queue>>,
61 interrupt: Interrupt,
62 capture: bool,
63 current_state: StreamState,
64 period: Duration,
65 start_time: Instant,
66 next_buffer: Duration,
67 buffer_queue: VecDeque<DescriptorChain>,
68 }
69
70 #[derive(Clone, Serialize, Deserialize)]
71 pub struct StreamSnapshot {
72 pub current_state: StreamState,
73 pub period: Duration,
74 pub next_buffer: Duration,
75 }
76
77 impl Stream {
78 /// Start a new stream thread and return its handler.
try_new( stream_id: u32, vios_client: Arc<Mutex<VioSClient>>, interrupt: Interrupt, control_queue: Arc<Mutex<Queue>>, io_queue: Arc<Mutex<Queue>>, capture: bool, stream_state: Option<StreamSnapshot>, ) -> Result<StreamProxy>79 pub fn try_new(
80 stream_id: u32,
81 vios_client: Arc<Mutex<VioSClient>>,
82 interrupt: Interrupt,
83 control_queue: Arc<Mutex<Queue>>,
84 io_queue: Arc<Mutex<Queue>>,
85 capture: bool,
86 stream_state: Option<StreamSnapshot>,
87 ) -> Result<StreamProxy> {
88 let (sender, receiver): (Sender<Box<StreamMsg>>, Receiver<Box<StreamMsg>>) = channel();
89 let thread = thread::Builder::new()
90 .name(format!("v_snd_stream:{stream_id}"))
91 .spawn(move || {
92 try_set_real_time_priority();
93 let (current_state, period, next_buffer) =
94 if let Some(stream_state) = stream_state.clone() {
95 (
96 stream_state.current_state,
97 stream_state.period,
98 stream_state.next_buffer,
99 )
100 } else {
101 (
102 StreamState::New,
103 Duration::from_millis(0),
104 Duration::from_millis(0),
105 )
106 };
107
108 let mut stream = Stream {
109 stream_id,
110 receiver,
111 vios_client: vios_client.clone(),
112 control_queue,
113 io_queue,
114 interrupt,
115 capture,
116 current_state,
117 period,
118 start_time: Instant::now(),
119 next_buffer,
120 buffer_queue: VecDeque::new(),
121 };
122
123 if let Some(stream_state) = stream_state {
124 if let Err(e) = vios_client
125 .lock()
126 .restore_stream(stream_id, stream_state.current_state)
127 {
128 error!("failed to restore stream params: {}", e);
129 };
130 }
131 if let Err(e) = stream.stream_loop() {
132 error!("virtio-snd: Error in stream {}: {}", stream_id, e);
133 }
134 let state = stream.current_state.clone();
135 StreamSnapshot {
136 current_state: state,
137 period: stream.period,
138 next_buffer: stream.next_buffer,
139 }
140 })
141 .map_err(SoundError::CreateThread)?;
142 Ok(StreamProxy {
143 sender,
144 thread: Some(thread),
145 })
146 }
147
stream_loop(&mut self) -> Result<()>148 fn stream_loop(&mut self) -> Result<()> {
149 loop {
150 if !self.recv_msg()? {
151 break;
152 }
153 self.maybe_process_queued_buffers()?;
154 }
155 Ok(())
156 }
157
recv_msg(&mut self) -> Result<bool>158 fn recv_msg(&mut self) -> Result<bool> {
159 let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;
160 let (code, desc, next_state) = match *msg {
161 StreamMsg::SetParams(desc, params) => {
162 let code = match self.vios_client.lock().set_stream_parameters_raw(params) {
163 Ok(()) => {
164 let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;
165 self.period = Duration::from_nanos(
166 (params.period_bytes.to_native() as u64 * 1_000_000_000u64)
167 / frame_rate
168 / params.channels as u64
169 / bytes_per_sample(params.format) as u64,
170 );
171 VIRTIO_SND_S_OK
172 }
173 Err(e) => {
174 error!(
175 "virtio-snd: Error setting parameters for stream {}: {}",
176 self.stream_id, e
177 );
178 vios_error_to_status_code(e)
179 }
180 };
181 (code, desc, StreamState::ParamsSet)
182 }
183 StreamMsg::Prepare(desc) => {
184 let code = match self.vios_client.lock().prepare_stream(self.stream_id) {
185 Ok(()) => VIRTIO_SND_S_OK,
186 Err(e) => {
187 error!(
188 "virtio-snd: Failed to prepare stream {}: {}",
189 self.stream_id, e
190 );
191 vios_error_to_status_code(e)
192 }
193 };
194 (code, desc, StreamState::Prepared)
195 }
196 StreamMsg::Start(desc) => {
197 let code = match self.vios_client.lock().start_stream(self.stream_id) {
198 Ok(()) => VIRTIO_SND_S_OK,
199 Err(e) => {
200 error!(
201 "virtio-snd: Failed to start stream {}: {}",
202 self.stream_id, e
203 );
204 vios_error_to_status_code(e)
205 }
206 };
207 self.start_time = Instant::now();
208 self.next_buffer = Duration::from_millis(0);
209 (code, desc, StreamState::Started)
210 }
211 StreamMsg::Stop(desc) => {
212 let code = match self.vios_client.lock().stop_stream(self.stream_id) {
213 Ok(()) => VIRTIO_SND_S_OK,
214 Err(e) => {
215 error!(
216 "virtio-snd: Failed to stop stream {}: {}",
217 self.stream_id, e
218 );
219 vios_error_to_status_code(e)
220 }
221 };
222 (code, desc, StreamState::Stopped)
223 }
224 StreamMsg::Release(desc) => {
225 let code = match self.vios_client.lock().release_stream(self.stream_id) {
226 Ok(()) => VIRTIO_SND_S_OK,
227 Err(e) => {
228 error!(
229 "virtio-snd: Failed to release stream {}: {}",
230 self.stream_id, e
231 );
232 vios_error_to_status_code(e)
233 }
234 };
235 (code, desc, StreamState::Released)
236 }
237 StreamMsg::Buffer(d) => {
238 // Buffers may arrive while in several states:
239 // - Prepared: Buffer should be queued and played when start cmd arrives
240 // - Started: Buffer should be processed immediately
241 // - Stopped: Buffer should be returned to the guest immediately
242 // Because we may need to wait to process the buffer, we always queue it and
243 // decide what to do with queued buffers after every message.
244 self.buffer_queue.push_back(d);
245 // return here to avoid replying on control queue below
246 return Ok(true);
247 }
248 StreamMsg::Break => {
249 return Ok(false);
250 }
251 };
252 reply_control_op_status(code, desc, &self.control_queue, &self.interrupt)?;
253 self.current_state = next_state;
254 Ok(true)
255 }
256
maybe_process_queued_buffers(&mut self) -> Result<()>257 fn maybe_process_queued_buffers(&mut self) -> Result<()> {
258 match self.current_state {
259 StreamState::Started => {
260 while let Some(mut desc) = self.buffer_queue.pop_front() {
261 let reader = &mut desc.reader;
262 // Ignore the first buffer, it was already read by the time this thread
263 // receives the descriptor
264 reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
265 let writer = &mut desc.writer;
266 let io_res = if self.capture {
267 let buffer_size =
268 writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();
269 self.vios_client.lock().request_audio_data(
270 self.stream_id,
271 buffer_size,
272 |vslice| writer.write_from_volatile_slice(*vslice),
273 )
274 } else {
275 self.vios_client.lock().inject_audio_data(
276 self.stream_id,
277 reader.available_bytes(),
278 |vslice| reader.read_to_volatile_slice(vslice),
279 )
280 };
281 let (code, latency) = match io_res {
282 Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),
283 Err(e) => {
284 error!(
285 "virtio-snd: Failed IO operation in stream {}: {}",
286 self.stream_id, e
287 );
288 (VIRTIO_SND_S_IO_ERR, 0)
289 }
290 };
291 if let Err(e) = writer.write_obj(virtio_snd_pcm_status {
292 status: Le32::from(code),
293 latency_bytes: Le32::from(latency),
294 }) {
295 error!(
296 "virtio-snd: Failed to write pcm status from stream {} thread: {}",
297 self.stream_id, e
298 );
299 }
300
301 self.next_buffer += self.period;
302 let elapsed = self.start_time.elapsed();
303 if elapsed < self.next_buffer {
304 // Completing an IO request can be considered an elapsed period
305 // notification by the driver, so we must wait the right amount of time to
306 // release the buffer if the sound server client returned too soon.
307 std::thread::sleep(self.next_buffer - elapsed);
308 }
309 let len = writer.bytes_written() as u32;
310 {
311 let mut io_queue_lock = self.io_queue.lock();
312 io_queue_lock.add_used(desc, len);
313 io_queue_lock.trigger_interrupt(&self.interrupt);
314 }
315 }
316 }
317 StreamState::Stopped | StreamState::Released => {
318 // For some reason playback buffers can arrive after stop and release (maybe because
319 // buffer-ready notifications arrive over eventfds and those are processed in
320 // random order?). The spec requires the device to not confirm the release of a
321 // stream until all IO buffers have been released, but that's impossible to
322 // guarantee if a buffer arrives after release is requested. Luckily it seems to
323 // work fine if the buffer is released after the release command is completed.
324 while let Some(desc) = self.buffer_queue.pop_front() {
325 reply_pcm_buffer_status(
326 VIRTIO_SND_S_OK,
327 0,
328 desc,
329 &self.io_queue,
330 &self.interrupt,
331 )?;
332 }
333 }
334 StreamState::Prepared => {} // Do nothing, any buffers will be processed after start
335 _ => {
336 if !self.buffer_queue.is_empty() {
337 warn!("virtio-snd: Buffers received while in unexpected state");
338 }
339 }
340 }
341 Ok(())
342 }
343 }
344
345 impl Drop for Stream {
drop(&mut self)346 fn drop(&mut self) {
347 // Try to stop and release the stream in case it was playing, these operations will fail if
348 // the stream is already released, just ignore that failure
349 let _ = self.vios_client.lock().stop_stream(self.stream_id);
350 let _ = self.vios_client.lock().release_stream(self.stream_id);
351
352 // Also release any pending buffer
353 while let Some(desc) = self.buffer_queue.pop_front() {
354 if let Err(e) = reply_pcm_buffer_status(
355 VIRTIO_SND_S_IO_ERR,
356 0,
357 desc,
358 &self.io_queue,
359 &self.interrupt,
360 ) {
361 error!(
362 "virtio-snd: Failed to reply buffer on stream {}: {}",
363 self.stream_id, e
364 );
365 }
366 }
367 }
368 }
369
370 /// Basically a proxy to the thread handling a particular stream.
371 pub struct StreamProxy {
372 sender: Sender<Box<StreamMsg>>,
373 thread: Option<thread::JoinHandle<StreamSnapshot>>,
374 }
375
376 impl StreamProxy {
377 /// Access the underlying sender to clone it or send messages
msg_sender(&self) -> &Sender<Box<StreamMsg>>378 pub fn msg_sender(&self) -> &Sender<Box<StreamMsg>> {
379 &self.sender
380 }
381
382 /// Send a message to the stream thread on the other side of this sender
send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()>383 pub fn send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()> {
384 sender
385 .send(Box::new(msg))
386 .map_err(SoundError::StreamThreadSend)
387 }
388
389 /// Convenience function to send a message to this stream's thread
send(&self, msg: StreamMsg) -> Result<()>390 pub fn send(&self, msg: StreamMsg) -> Result<()> {
391 Self::send_msg(&self.sender, msg)
392 }
393
stop_thread(mut self) -> StreamSnapshot394 pub fn stop_thread(mut self) -> StreamSnapshot {
395 self.stop_thread_inner().unwrap()
396 }
397
stop_thread_inner(&mut self) -> Option<StreamSnapshot>398 fn stop_thread_inner(&mut self) -> Option<StreamSnapshot> {
399 if let Some(th) = self.thread.take() {
400 if let Err(e) = self.send(StreamMsg::Break) {
401 error!(
402 "virtio-snd: Failed to send Break msg to stream thread: {}",
403 e
404 );
405 }
406 match th.join() {
407 Ok(state) => Some(state),
408 Err(e) => panic!("virtio-snd: Panic detected on stream thread: {:?}", e),
409 }
410 } else {
411 None
412 }
413 }
414 }
415
416 impl Drop for StreamProxy {
drop(&mut self)417 fn drop(&mut self) {
418 let _ = self.stop_thread_inner();
419 }
420 }
421
422 /// Attempts to set the current thread's priority to a value hight enough to handle audio IO. This
423 /// may fail due to insuficient permissions.
try_set_real_time_priority()424 pub fn try_set_real_time_priority() {
425 const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
426 if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
427 .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
428 {
429 warn!("Failed to set audio stream thread to real time: {}", e);
430 }
431 }
432
433 /// Gets the appropriate virtio-snd error to return to the driver from a VioSError.
vios_error_to_status_code(e: VioSError) -> u32434 pub fn vios_error_to_status_code(e: VioSError) -> u32 {
435 match e {
436 VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,
437 _ => VIRTIO_SND_S_NOT_SUPP,
438 }
439 }
440
441 /// Encapsulates sending the virtio_snd_hdr struct back to the driver.
reply_control_op_status( code: u32, mut desc: DescriptorChain, queue: &Arc<Mutex<Queue>>, interrupt: &Interrupt, ) -> Result<()>442 pub fn reply_control_op_status(
443 code: u32,
444 mut desc: DescriptorChain,
445 queue: &Arc<Mutex<Queue>>,
446 interrupt: &Interrupt,
447 ) -> Result<()> {
448 let writer = &mut desc.writer;
449 writer
450 .write_obj(virtio_snd_hdr {
451 code: Le32::from(code),
452 })
453 .map_err(SoundError::QueueIO)?;
454 let len = writer.bytes_written() as u32;
455 {
456 let mut queue_lock = queue.lock();
457 queue_lock.add_used(desc, len);
458 queue_lock.trigger_interrupt(interrupt);
459 }
460 Ok(())
461 }
462
463 /// Encapsulates sending the virtio_snd_pcm_status struct back to the driver.
reply_pcm_buffer_status( status: u32, latency_bytes: u32, mut desc: DescriptorChain, queue: &Arc<Mutex<Queue>>, interrupt: &Interrupt, ) -> Result<()>464 pub fn reply_pcm_buffer_status(
465 status: u32,
466 latency_bytes: u32,
467 mut desc: DescriptorChain,
468 queue: &Arc<Mutex<Queue>>,
469 interrupt: &Interrupt,
470 ) -> Result<()> {
471 let writer = &mut desc.writer;
472 if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
473 writer
474 .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
475 }
476 writer
477 .write_obj(virtio_snd_pcm_status {
478 status: Le32::from(status),
479 latency_bytes: Le32::from(latency_bytes),
480 })
481 .map_err(SoundError::QueueIO)?;
482 let len = writer.bytes_written() as u32;
483 {
484 let mut queue_lock = queue.lock();
485 queue_lock.add_used(desc, len);
486 queue_lock.trigger_interrupt(interrupt);
487 }
488 Ok(())
489 }
490
bytes_per_sample(format: u8) -> usize491 fn bytes_per_sample(format: u8) -> usize {
492 match format {
493 VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,
494 VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,
495 VIRTIO_SND_PCM_FMT_A_LAW => 1usize,
496 VIRTIO_SND_PCM_FMT_S8 => 1usize,
497 VIRTIO_SND_PCM_FMT_U8 => 1usize,
498 VIRTIO_SND_PCM_FMT_S16 => 2usize,
499 VIRTIO_SND_PCM_FMT_U16 => 2usize,
500 VIRTIO_SND_PCM_FMT_S32 => 4usize,
501 VIRTIO_SND_PCM_FMT_U32 => 4usize,
502 VIRTIO_SND_PCM_FMT_FLOAT => 4usize,
503 VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,
504 // VIRTIO_SND_PCM_FMT_DSD_U8
505 // VIRTIO_SND_PCM_FMT_DSD_U16
506 // VIRTIO_SND_PCM_FMT_DSD_U32
507 // VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME
508 // VIRTIO_SND_PCM_FMT_S18_3
509 // VIRTIO_SND_PCM_FMT_U18_3
510 // VIRTIO_SND_PCM_FMT_S20_3
511 // VIRTIO_SND_PCM_FMT_U20_3
512 // VIRTIO_SND_PCM_FMT_S24_3
513 // VIRTIO_SND_PCM_FMT_U24_3
514 // VIRTIO_SND_PCM_FMT_S20
515 // VIRTIO_SND_PCM_FMT_U20
516 // VIRTIO_SND_PCM_FMT_S24
517 // VIRTIO_SND_PCM_FMT_U24
518 _ => {
519 // Some of these formats are not consistently stored in a particular size (24bits is
520 // sometimes stored in a 32bit word) while others are of variable size.
521 // The size per sample estimated here is designed to greatly underestimate the time it
522 // takes to play a buffer and depend instead on timings provided by the sound server if
523 // it supports these formats.
524 warn!(
525 "Unknown sample size for format {}, depending on sound server timing instead.",
526 format
527 );
528 1000usize
529 }
530 }
531 }
532