• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::VecDeque;
6 use std::sync::mpsc::channel;
7 use std::sync::mpsc::Receiver;
8 use std::sync::mpsc::Sender;
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::Duration;
12 use std::time::Instant;
13 
14 use base::error;
15 use base::set_rt_prio_limit;
16 use base::set_rt_round_robin;
17 use base::warn;
18 use data_model::Le32;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22 
23 use super::Error as VioSError;
24 use super::Result;
25 use super::SoundError;
26 use super::*;
27 use crate::virtio::snd::common::from_virtio_frame_rate;
28 use crate::virtio::snd::constants::*;
29 use crate::virtio::snd::layout::*;
30 use crate::virtio::DescriptorChain;
31 use crate::virtio::Interrupt;
32 use crate::virtio::Queue;
33 
34 /// Messages that the worker can send to the stream (thread).
35 pub enum StreamMsg {
36     SetParams(DescriptorChain, virtio_snd_pcm_set_params),
37     Prepare(DescriptorChain),
38     Start(DescriptorChain),
39     Stop(DescriptorChain),
40     Release(DescriptorChain),
41     Buffer(DescriptorChain),
42     Break,
43 }
44 
45 #[derive(Clone, Serialize, Deserialize)]
46 pub enum StreamState {
47     New,
48     ParamsSet,
49     Prepared,
50     Started,
51     Stopped,
52     Released,
53 }
54 
55 pub struct Stream {
56     stream_id: u32,
57     receiver: Receiver<Box<StreamMsg>>,
58     vios_client: Arc<Mutex<VioSClient>>,
59     control_queue: Arc<Mutex<Queue>>,
60     io_queue: Arc<Mutex<Queue>>,
61     interrupt: Interrupt,
62     capture: bool,
63     current_state: StreamState,
64     period: Duration,
65     start_time: Instant,
66     next_buffer: Duration,
67     buffer_queue: VecDeque<DescriptorChain>,
68 }
69 
70 #[derive(Clone, Serialize, Deserialize)]
71 pub struct StreamSnapshot {
72     pub current_state: StreamState,
73     pub period: Duration,
74     pub next_buffer: Duration,
75 }
76 
77 impl Stream {
78     /// Start a new stream thread and return its handler.
try_new( stream_id: u32, vios_client: Arc<Mutex<VioSClient>>, interrupt: Interrupt, control_queue: Arc<Mutex<Queue>>, io_queue: Arc<Mutex<Queue>>, capture: bool, stream_state: Option<StreamSnapshot>, ) -> Result<StreamProxy>79     pub fn try_new(
80         stream_id: u32,
81         vios_client: Arc<Mutex<VioSClient>>,
82         interrupt: Interrupt,
83         control_queue: Arc<Mutex<Queue>>,
84         io_queue: Arc<Mutex<Queue>>,
85         capture: bool,
86         stream_state: Option<StreamSnapshot>,
87     ) -> Result<StreamProxy> {
88         let (sender, receiver): (Sender<Box<StreamMsg>>, Receiver<Box<StreamMsg>>) = channel();
89         let thread = thread::Builder::new()
90             .name(format!("v_snd_stream:{stream_id}"))
91             .spawn(move || {
92                 try_set_real_time_priority();
93                 let (current_state, period, next_buffer) =
94                     if let Some(stream_state) = stream_state.clone() {
95                         (
96                             stream_state.current_state,
97                             stream_state.period,
98                             stream_state.next_buffer,
99                         )
100                     } else {
101                         (
102                             StreamState::New,
103                             Duration::from_millis(0),
104                             Duration::from_millis(0),
105                         )
106                     };
107 
108                 let mut stream = Stream {
109                     stream_id,
110                     receiver,
111                     vios_client: vios_client.clone(),
112                     control_queue,
113                     io_queue,
114                     interrupt,
115                     capture,
116                     current_state,
117                     period,
118                     start_time: Instant::now(),
119                     next_buffer,
120                     buffer_queue: VecDeque::new(),
121                 };
122 
123                 if let Some(stream_state) = stream_state {
124                     if let Err(e) = vios_client
125                         .lock()
126                         .restore_stream(stream_id, stream_state.current_state)
127                     {
128                         error!("failed to restore stream params: {}", e);
129                     };
130                 }
131                 if let Err(e) = stream.stream_loop() {
132                     error!("virtio-snd: Error in stream {}: {}", stream_id, e);
133                 }
134                 let state = stream.current_state.clone();
135                 StreamSnapshot {
136                     current_state: state,
137                     period: stream.period,
138                     next_buffer: stream.next_buffer,
139                 }
140             })
141             .map_err(SoundError::CreateThread)?;
142         Ok(StreamProxy {
143             sender,
144             thread: Some(thread),
145         })
146     }
147 
stream_loop(&mut self) -> Result<()>148     fn stream_loop(&mut self) -> Result<()> {
149         loop {
150             if !self.recv_msg()? {
151                 break;
152             }
153             self.maybe_process_queued_buffers()?;
154         }
155         Ok(())
156     }
157 
recv_msg(&mut self) -> Result<bool>158     fn recv_msg(&mut self) -> Result<bool> {
159         let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;
160         let (code, desc, next_state) = match *msg {
161             StreamMsg::SetParams(desc, params) => {
162                 let code = match self.vios_client.lock().set_stream_parameters_raw(params) {
163                     Ok(()) => {
164                         let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;
165                         self.period = Duration::from_nanos(
166                             (params.period_bytes.to_native() as u64 * 1_000_000_000u64)
167                                 / frame_rate
168                                 / params.channels as u64
169                                 / bytes_per_sample(params.format) as u64,
170                         );
171                         VIRTIO_SND_S_OK
172                     }
173                     Err(e) => {
174                         error!(
175                             "virtio-snd: Error setting parameters for stream {}: {}",
176                             self.stream_id, e
177                         );
178                         vios_error_to_status_code(e)
179                     }
180                 };
181                 (code, desc, StreamState::ParamsSet)
182             }
183             StreamMsg::Prepare(desc) => {
184                 let code = match self.vios_client.lock().prepare_stream(self.stream_id) {
185                     Ok(()) => VIRTIO_SND_S_OK,
186                     Err(e) => {
187                         error!(
188                             "virtio-snd: Failed to prepare stream {}: {}",
189                             self.stream_id, e
190                         );
191                         vios_error_to_status_code(e)
192                     }
193                 };
194                 (code, desc, StreamState::Prepared)
195             }
196             StreamMsg::Start(desc) => {
197                 let code = match self.vios_client.lock().start_stream(self.stream_id) {
198                     Ok(()) => VIRTIO_SND_S_OK,
199                     Err(e) => {
200                         error!(
201                             "virtio-snd: Failed to start stream {}: {}",
202                             self.stream_id, e
203                         );
204                         vios_error_to_status_code(e)
205                     }
206                 };
207                 self.start_time = Instant::now();
208                 self.next_buffer = Duration::from_millis(0);
209                 (code, desc, StreamState::Started)
210             }
211             StreamMsg::Stop(desc) => {
212                 let code = match self.vios_client.lock().stop_stream(self.stream_id) {
213                     Ok(()) => VIRTIO_SND_S_OK,
214                     Err(e) => {
215                         error!(
216                             "virtio-snd: Failed to stop stream {}: {}",
217                             self.stream_id, e
218                         );
219                         vios_error_to_status_code(e)
220                     }
221                 };
222                 (code, desc, StreamState::Stopped)
223             }
224             StreamMsg::Release(desc) => {
225                 let code = match self.vios_client.lock().release_stream(self.stream_id) {
226                     Ok(()) => VIRTIO_SND_S_OK,
227                     Err(e) => {
228                         error!(
229                             "virtio-snd: Failed to release stream {}: {}",
230                             self.stream_id, e
231                         );
232                         vios_error_to_status_code(e)
233                     }
234                 };
235                 (code, desc, StreamState::Released)
236             }
237             StreamMsg::Buffer(d) => {
238                 // Buffers may arrive while in several states:
239                 // - Prepared: Buffer should be queued and played when start cmd arrives
240                 // - Started: Buffer should be processed immediately
241                 // - Stopped: Buffer should be returned to the guest immediately
242                 // Because we may need to wait to process the buffer, we always queue it and
243                 // decide what to do with queued buffers after every message.
244                 self.buffer_queue.push_back(d);
245                 // return here to avoid replying on control queue below
246                 return Ok(true);
247             }
248             StreamMsg::Break => {
249                 return Ok(false);
250             }
251         };
252         reply_control_op_status(code, desc, &self.control_queue, &self.interrupt)?;
253         self.current_state = next_state;
254         Ok(true)
255     }
256 
maybe_process_queued_buffers(&mut self) -> Result<()>257     fn maybe_process_queued_buffers(&mut self) -> Result<()> {
258         match self.current_state {
259             StreamState::Started => {
260                 while let Some(mut desc) = self.buffer_queue.pop_front() {
261                     let reader = &mut desc.reader;
262                     // Ignore the first buffer, it was already read by the time this thread
263                     // receives the descriptor
264                     reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
265                     let writer = &mut desc.writer;
266                     let io_res = if self.capture {
267                         let buffer_size =
268                             writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();
269                         self.vios_client.lock().request_audio_data(
270                             self.stream_id,
271                             buffer_size,
272                             |vslice| writer.write_from_volatile_slice(*vslice),
273                         )
274                     } else {
275                         self.vios_client.lock().inject_audio_data(
276                             self.stream_id,
277                             reader.available_bytes(),
278                             |vslice| reader.read_to_volatile_slice(vslice),
279                         )
280                     };
281                     let (code, latency) = match io_res {
282                         Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),
283                         Err(e) => {
284                             error!(
285                                 "virtio-snd: Failed IO operation in stream {}: {}",
286                                 self.stream_id, e
287                             );
288                             (VIRTIO_SND_S_IO_ERR, 0)
289                         }
290                     };
291                     if let Err(e) = writer.write_obj(virtio_snd_pcm_status {
292                         status: Le32::from(code),
293                         latency_bytes: Le32::from(latency),
294                     }) {
295                         error!(
296                             "virtio-snd: Failed to write pcm status from stream {} thread: {}",
297                             self.stream_id, e
298                         );
299                     }
300 
301                     self.next_buffer += self.period;
302                     let elapsed = self.start_time.elapsed();
303                     if elapsed < self.next_buffer {
304                         // Completing an IO request can be considered an elapsed period
305                         // notification by the driver, so we must wait the right amount of time to
306                         // release the buffer if the sound server client returned too soon.
307                         std::thread::sleep(self.next_buffer - elapsed);
308                     }
309                     let len = writer.bytes_written() as u32;
310                     {
311                         let mut io_queue_lock = self.io_queue.lock();
312                         io_queue_lock.add_used(desc, len);
313                         io_queue_lock.trigger_interrupt(&self.interrupt);
314                     }
315                 }
316             }
317             StreamState::Stopped | StreamState::Released => {
318                 // For some reason playback buffers can arrive after stop and release (maybe because
319                 // buffer-ready notifications arrive over eventfds and those are processed in
320                 // random order?). The spec requires the device to not confirm the release of a
321                 // stream until all IO buffers have been released, but that's impossible to
322                 // guarantee if a buffer arrives after release is requested. Luckily it seems to
323                 // work fine if the buffer is released after the release command is completed.
324                 while let Some(desc) = self.buffer_queue.pop_front() {
325                     reply_pcm_buffer_status(
326                         VIRTIO_SND_S_OK,
327                         0,
328                         desc,
329                         &self.io_queue,
330                         &self.interrupt,
331                     )?;
332                 }
333             }
334             StreamState::Prepared => {} // Do nothing, any buffers will be processed after start
335             _ => {
336                 if !self.buffer_queue.is_empty() {
337                     warn!("virtio-snd: Buffers received while in unexpected state");
338                 }
339             }
340         }
341         Ok(())
342     }
343 }
344 
345 impl Drop for Stream {
drop(&mut self)346     fn drop(&mut self) {
347         // Try to stop and release the stream in case it was playing, these operations will fail if
348         // the stream is already released, just ignore that failure
349         let _ = self.vios_client.lock().stop_stream(self.stream_id);
350         let _ = self.vios_client.lock().release_stream(self.stream_id);
351 
352         // Also release any pending buffer
353         while let Some(desc) = self.buffer_queue.pop_front() {
354             if let Err(e) = reply_pcm_buffer_status(
355                 VIRTIO_SND_S_IO_ERR,
356                 0,
357                 desc,
358                 &self.io_queue,
359                 &self.interrupt,
360             ) {
361                 error!(
362                     "virtio-snd: Failed to reply buffer on stream {}: {}",
363                     self.stream_id, e
364                 );
365             }
366         }
367     }
368 }
369 
370 /// Basically a proxy to the thread handling a particular stream.
371 pub struct StreamProxy {
372     sender: Sender<Box<StreamMsg>>,
373     thread: Option<thread::JoinHandle<StreamSnapshot>>,
374 }
375 
376 impl StreamProxy {
377     /// Access the underlying sender to clone it or send messages
msg_sender(&self) -> &Sender<Box<StreamMsg>>378     pub fn msg_sender(&self) -> &Sender<Box<StreamMsg>> {
379         &self.sender
380     }
381 
382     /// Send a message to the stream thread on the other side of this sender
send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()>383     pub fn send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()> {
384         sender
385             .send(Box::new(msg))
386             .map_err(SoundError::StreamThreadSend)
387     }
388 
389     /// Convenience function to send a message to this stream's thread
send(&self, msg: StreamMsg) -> Result<()>390     pub fn send(&self, msg: StreamMsg) -> Result<()> {
391         Self::send_msg(&self.sender, msg)
392     }
393 
stop_thread(mut self) -> StreamSnapshot394     pub fn stop_thread(mut self) -> StreamSnapshot {
395         self.stop_thread_inner().unwrap()
396     }
397 
stop_thread_inner(&mut self) -> Option<StreamSnapshot>398     fn stop_thread_inner(&mut self) -> Option<StreamSnapshot> {
399         if let Some(th) = self.thread.take() {
400             if let Err(e) = self.send(StreamMsg::Break) {
401                 error!(
402                     "virtio-snd: Failed to send Break msg to stream thread: {}",
403                     e
404                 );
405             }
406             match th.join() {
407                 Ok(state) => Some(state),
408                 Err(e) => panic!("virtio-snd: Panic detected on stream thread: {:?}", e),
409             }
410         } else {
411             None
412         }
413     }
414 }
415 
416 impl Drop for StreamProxy {
drop(&mut self)417     fn drop(&mut self) {
418         let _ = self.stop_thread_inner();
419     }
420 }
421 
422 /// Attempts to set the current thread's priority to a value hight enough to handle audio IO. This
423 /// may fail due to insuficient permissions.
try_set_real_time_priority()424 pub fn try_set_real_time_priority() {
425     const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
426     if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
427         .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
428     {
429         warn!("Failed to set audio stream thread to real time: {}", e);
430     }
431 }
432 
433 /// Gets the appropriate virtio-snd error to return to the driver from a VioSError.
vios_error_to_status_code(e: VioSError) -> u32434 pub fn vios_error_to_status_code(e: VioSError) -> u32 {
435     match e {
436         VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,
437         _ => VIRTIO_SND_S_NOT_SUPP,
438     }
439 }
440 
441 /// Encapsulates sending the virtio_snd_hdr struct back to the driver.
reply_control_op_status( code: u32, mut desc: DescriptorChain, queue: &Arc<Mutex<Queue>>, interrupt: &Interrupt, ) -> Result<()>442 pub fn reply_control_op_status(
443     code: u32,
444     mut desc: DescriptorChain,
445     queue: &Arc<Mutex<Queue>>,
446     interrupt: &Interrupt,
447 ) -> Result<()> {
448     let writer = &mut desc.writer;
449     writer
450         .write_obj(virtio_snd_hdr {
451             code: Le32::from(code),
452         })
453         .map_err(SoundError::QueueIO)?;
454     let len = writer.bytes_written() as u32;
455     {
456         let mut queue_lock = queue.lock();
457         queue_lock.add_used(desc, len);
458         queue_lock.trigger_interrupt(interrupt);
459     }
460     Ok(())
461 }
462 
463 /// Encapsulates sending the virtio_snd_pcm_status struct back to the driver.
reply_pcm_buffer_status( status: u32, latency_bytes: u32, mut desc: DescriptorChain, queue: &Arc<Mutex<Queue>>, interrupt: &Interrupt, ) -> Result<()>464 pub fn reply_pcm_buffer_status(
465     status: u32,
466     latency_bytes: u32,
467     mut desc: DescriptorChain,
468     queue: &Arc<Mutex<Queue>>,
469     interrupt: &Interrupt,
470 ) -> Result<()> {
471     let writer = &mut desc.writer;
472     if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
473         writer
474             .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
475     }
476     writer
477         .write_obj(virtio_snd_pcm_status {
478             status: Le32::from(status),
479             latency_bytes: Le32::from(latency_bytes),
480         })
481         .map_err(SoundError::QueueIO)?;
482     let len = writer.bytes_written() as u32;
483     {
484         let mut queue_lock = queue.lock();
485         queue_lock.add_used(desc, len);
486         queue_lock.trigger_interrupt(interrupt);
487     }
488     Ok(())
489 }
490 
bytes_per_sample(format: u8) -> usize491 fn bytes_per_sample(format: u8) -> usize {
492     match format {
493         VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,
494         VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,
495         VIRTIO_SND_PCM_FMT_A_LAW => 1usize,
496         VIRTIO_SND_PCM_FMT_S8 => 1usize,
497         VIRTIO_SND_PCM_FMT_U8 => 1usize,
498         VIRTIO_SND_PCM_FMT_S16 => 2usize,
499         VIRTIO_SND_PCM_FMT_U16 => 2usize,
500         VIRTIO_SND_PCM_FMT_S32 => 4usize,
501         VIRTIO_SND_PCM_FMT_U32 => 4usize,
502         VIRTIO_SND_PCM_FMT_FLOAT => 4usize,
503         VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,
504         // VIRTIO_SND_PCM_FMT_DSD_U8
505         // VIRTIO_SND_PCM_FMT_DSD_U16
506         // VIRTIO_SND_PCM_FMT_DSD_U32
507         // VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME
508         // VIRTIO_SND_PCM_FMT_S18_3
509         // VIRTIO_SND_PCM_FMT_U18_3
510         // VIRTIO_SND_PCM_FMT_S20_3
511         // VIRTIO_SND_PCM_FMT_U20_3
512         // VIRTIO_SND_PCM_FMT_S24_3
513         // VIRTIO_SND_PCM_FMT_U24_3
514         // VIRTIO_SND_PCM_FMT_S20
515         // VIRTIO_SND_PCM_FMT_U20
516         // VIRTIO_SND_PCM_FMT_S24
517         // VIRTIO_SND_PCM_FMT_U24
518         _ => {
519             // Some of these formats are not consistently stored in a particular size (24bits is
520             // sometimes stored in a 32bit word) while others are of variable size.
521             // The size per sample estimated here is designed to greatly underestimate the time it
522             // takes to play a buffer and depend instead on timings provided by the sound server if
523             // it supports these formats.
524             warn!(
525                 "Unknown sample size for format {}, depending on sound server timing instead.",
526                 format
527             );
528             1000usize
529         }
530     }
531 }
532