• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::VecDeque;
6 use std::sync::mpsc::channel;
7 use std::sync::mpsc::Receiver;
8 use std::sync::mpsc::Sender;
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::Duration;
12 use std::time::Instant;
13 
14 use base::error;
15 use base::set_rt_prio_limit;
16 use base::set_rt_round_robin;
17 use base::warn;
18 use data_model::Le32;
19 use sync::Mutex;
20 use vm_memory::GuestMemory;
21 
22 use super::Error as VioSError;
23 use super::Result;
24 use super::SoundError;
25 use super::*;
26 use crate::virtio::snd::common::from_virtio_frame_rate;
27 use crate::virtio::snd::constants::*;
28 use crate::virtio::snd::layout::*;
29 use crate::virtio::DescriptorChain;
30 use crate::virtio::Interrupt;
31 use crate::virtio::Queue;
32 use crate::virtio::Reader;
33 use crate::virtio::Writer;
34 
35 /// Messages that the worker can send to the stream (thread).
36 pub enum StreamMsg {
37     SetParams(DescriptorChain, virtio_snd_pcm_set_params),
38     Prepare(DescriptorChain),
39     Start(DescriptorChain),
40     Stop(DescriptorChain),
41     Release(DescriptorChain),
42     Buffer(DescriptorChain),
43     Break,
44 }
45 
46 enum StreamState {
47     New,
48     ParamsSet,
49     Prepared,
50     Started,
51     Stopped,
52     Released,
53 }
54 
55 pub struct Stream {
56     stream_id: u32,
57     receiver: Receiver<StreamMsg>,
58     vios_client: Arc<VioSClient>,
59     guest_memory: GuestMemory,
60     control_queue: Arc<Mutex<Queue>>,
61     io_queue: Arc<Mutex<Queue>>,
62     interrupt: Interrupt,
63     capture: bool,
64     current_state: StreamState,
65     period: Duration,
66     start_time: Instant,
67     next_buffer: Duration,
68     buffer_queue: VecDeque<DescriptorChain>,
69 }
70 
71 impl Stream {
72     /// Start a new stream thread and return its handler.
try_new( stream_id: u32, vios_client: Arc<VioSClient>, guest_memory: GuestMemory, interrupt: Interrupt, control_queue: Arc<Mutex<Queue>>, io_queue: Arc<Mutex<Queue>>, capture: bool, ) -> Result<StreamProxy>73     pub fn try_new(
74         stream_id: u32,
75         vios_client: Arc<VioSClient>,
76         guest_memory: GuestMemory,
77         interrupt: Interrupt,
78         control_queue: Arc<Mutex<Queue>>,
79         io_queue: Arc<Mutex<Queue>>,
80         capture: bool,
81     ) -> Result<StreamProxy> {
82         let (sender, receiver): (Sender<StreamMsg>, Receiver<StreamMsg>) = channel();
83         let thread = thread::Builder::new()
84             .name(format!("v_snd_stream:{stream_id}"))
85             .spawn(move || {
86                 try_set_real_time_priority();
87 
88                 let mut stream = Stream {
89                     stream_id,
90                     receiver,
91                     vios_client,
92                     guest_memory,
93                     control_queue,
94                     io_queue,
95                     interrupt,
96                     capture,
97                     current_state: StreamState::New,
98                     period: Duration::from_millis(0),
99                     start_time: Instant::now(),
100                     next_buffer: Duration::from_millis(0),
101                     buffer_queue: VecDeque::new(),
102                 };
103 
104                 if let Err(e) = stream.stream_loop() {
105                     error!("virtio-snd: Error in stream {}: {}", stream_id, e);
106                 }
107             })
108             .map_err(SoundError::CreateThread)?;
109         Ok(StreamProxy {
110             sender,
111             thread: Some(thread),
112         })
113     }
114 
stream_loop(&mut self) -> Result<()>115     fn stream_loop(&mut self) -> Result<()> {
116         loop {
117             if !self.recv_msg()? {
118                 break;
119             }
120             self.maybe_process_queued_buffers()?;
121         }
122         Ok(())
123     }
124 
recv_msg(&mut self) -> Result<bool>125     fn recv_msg(&mut self) -> Result<bool> {
126         let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;
127         let (code, desc, next_state) = match msg {
128             StreamMsg::SetParams(desc, params) => {
129                 let code = match self.vios_client.set_stream_parameters_raw(params) {
130                     Ok(()) => {
131                         let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;
132                         self.period = Duration::from_millis(
133                             (params.period_bytes.to_native() as u64 * 1000u64)
134                                 / frame_rate
135                                 / params.channels as u64
136                                 / bytes_per_sample(params.format) as u64,
137                         );
138                         VIRTIO_SND_S_OK
139                     }
140                     Err(e) => {
141                         error!(
142                             "virtio-snd: Error setting parameters for stream {}: {}",
143                             self.stream_id, e
144                         );
145                         vios_error_to_status_code(e)
146                     }
147                 };
148                 (code, desc, StreamState::ParamsSet)
149             }
150             StreamMsg::Prepare(desc) => {
151                 let code = match self.vios_client.prepare_stream(self.stream_id) {
152                     Ok(()) => VIRTIO_SND_S_OK,
153                     Err(e) => {
154                         error!(
155                             "virtio-snd: Failed to prepare stream {}: {}",
156                             self.stream_id, e
157                         );
158                         vios_error_to_status_code(e)
159                     }
160                 };
161                 (code, desc, StreamState::Prepared)
162             }
163             StreamMsg::Start(desc) => {
164                 let code = match self.vios_client.start_stream(self.stream_id) {
165                     Ok(()) => VIRTIO_SND_S_OK,
166                     Err(e) => {
167                         error!(
168                             "virtio-snd: Failed to start stream {}: {}",
169                             self.stream_id, e
170                         );
171                         vios_error_to_status_code(e)
172                     }
173                 };
174                 self.start_time = Instant::now();
175                 self.next_buffer = Duration::from_millis(0);
176                 (code, desc, StreamState::Started)
177             }
178             StreamMsg::Stop(desc) => {
179                 let code = match self.vios_client.stop_stream(self.stream_id) {
180                     Ok(()) => VIRTIO_SND_S_OK,
181                     Err(e) => {
182                         error!(
183                             "virtio-snd: Failed to stop stream {}: {}",
184                             self.stream_id, e
185                         );
186                         vios_error_to_status_code(e)
187                     }
188                 };
189                 (code, desc, StreamState::Stopped)
190             }
191             StreamMsg::Release(desc) => {
192                 let code = match self.vios_client.release_stream(self.stream_id) {
193                     Ok(()) => VIRTIO_SND_S_OK,
194                     Err(e) => {
195                         error!(
196                             "virtio-snd: Failed to release stream {}: {}",
197                             self.stream_id, e
198                         );
199                         vios_error_to_status_code(e)
200                     }
201                 };
202                 (code, desc, StreamState::Released)
203             }
204             StreamMsg::Buffer(d) => {
205                 // Buffers may arrive while in several states:
206                 // - Prepared: Buffer should be queued and played when start cmd arrives
207                 // - Started: Buffer should be processed immediately
208                 // - Stopped: Buffer should be returned to the guest immediately
209                 // Because we may need to wait to process the buffer, we always queue it and
210                 // decide what to do with queued buffers after every message.
211                 self.buffer_queue.push_back(d);
212                 // return here to avoid replying on control queue below
213                 return Ok(true);
214             }
215             StreamMsg::Break => {
216                 return Ok(false);
217             }
218         };
219         reply_control_op_status(
220             code,
221             desc,
222             &self.guest_memory,
223             &self.control_queue,
224             &self.interrupt,
225         )?;
226         self.current_state = next_state;
227         Ok(true)
228     }
229 
maybe_process_queued_buffers(&mut self) -> Result<()>230     fn maybe_process_queued_buffers(&mut self) -> Result<()> {
231         match self.current_state {
232             StreamState::Started => {
233                 while let Some(desc) = self.buffer_queue.pop_front() {
234                     let mut reader = Reader::new(self.guest_memory.clone(), desc.clone())
235                         .map_err(SoundError::CreateReader)?;
236                     // Ignore the first buffer, it was already read by the time this thread
237                     // receives the descriptor
238                     reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
239                     let desc_index = desc.index;
240                     let mut writer = Writer::new(self.guest_memory.clone(), desc)
241                         .map_err(SoundError::CreateWriter)?;
242                     let io_res = if self.capture {
243                         let buffer_size =
244                             writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();
245                         self.vios_client
246                             .request_audio_data(self.stream_id, buffer_size, |vslice| {
247                                 writer.write_from_volatile_slice(*vslice)
248                             })
249                     } else {
250                         self.vios_client.inject_audio_data(
251                             self.stream_id,
252                             reader.available_bytes(),
253                             |vslice| reader.read_to_volatile_slice(vslice),
254                         )
255                     };
256                     let (code, latency) = match io_res {
257                         Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),
258                         Err(e) => {
259                             error!(
260                                 "virtio-snd: Failed IO operation in stream {}: {}",
261                                 self.stream_id, e
262                             );
263                             (VIRTIO_SND_S_IO_ERR, 0)
264                         }
265                     };
266                     if let Err(e) = writer.write_obj(virtio_snd_pcm_status {
267                         status: Le32::from(code),
268                         latency_bytes: Le32::from(latency),
269                     }) {
270                         error!(
271                             "virtio-snd: Failed to write pcm status from stream {} thread: {}",
272                             self.stream_id, e
273                         );
274                     }
275 
276                     self.next_buffer += self.period;
277                     let elapsed = self.start_time.elapsed();
278                     if elapsed < self.next_buffer {
279                         // Completing an IO request can be considered an elapsed period
280                         // notification by the driver, so we must wait the right amount of time to
281                         // release the buffer if the sound server client returned too soon.
282                         std::thread::sleep(self.next_buffer - elapsed);
283                     }
284                     {
285                         let mut io_queue_lock = self.io_queue.lock();
286                         io_queue_lock.add_used(
287                             &self.guest_memory,
288                             desc_index,
289                             writer.bytes_written() as u32,
290                         );
291                         io_queue_lock.trigger_interrupt(&self.guest_memory, &self.interrupt);
292                     }
293                 }
294             }
295             StreamState::Stopped | StreamState::Released => {
296                 // For some reason playback buffers can arrive after stop and release (maybe because
297                 // buffer-ready notifications arrive over eventfds and those are processed in
298                 // random order?). The spec requires the device to not confirm the release of a
299                 // stream until all IO buffers have been released, but that's impossible to
300                 // guarantee if a buffer arrives after release is requested. Luckily it seems to
301                 // work fine if the buffer is released after the release command is completed.
302                 while let Some(desc) = self.buffer_queue.pop_front() {
303                     reply_pcm_buffer_status(
304                         VIRTIO_SND_S_OK,
305                         0,
306                         desc,
307                         &self.guest_memory,
308                         &self.io_queue,
309                         &self.interrupt,
310                     )?;
311                 }
312             }
313             StreamState::Prepared => {} // Do nothing, any buffers will be processed after start
314             _ => {
315                 if self.buffer_queue.len() > 0 {
316                     warn!("virtio-snd: Buffers received while in unexpected state");
317                 }
318             }
319         }
320         Ok(())
321     }
322 }
323 
324 impl Drop for Stream {
drop(&mut self)325     fn drop(&mut self) {
326         // Try to stop and release the stream in case it was playing, these operations will fail if the
327         // stream is already released, just ignore that failure
328         let _ = self.vios_client.stop_stream(self.stream_id);
329         let _ = self.vios_client.release_stream(self.stream_id);
330 
331         // Also release any pending buffer
332         while let Some(desc) = self.buffer_queue.pop_front() {
333             if let Err(e) = reply_pcm_buffer_status(
334                 VIRTIO_SND_S_IO_ERR,
335                 0,
336                 desc,
337                 &self.guest_memory,
338                 &self.io_queue,
339                 &self.interrupt,
340             ) {
341                 error!(
342                     "virtio-snd: Failed to reply buffer on stream {}: {}",
343                     self.stream_id, e
344                 );
345             }
346         }
347     }
348 }
349 
350 /// Basically a proxy to the thread handling a particular stream.
351 pub struct StreamProxy {
352     sender: Sender<StreamMsg>,
353     thread: Option<thread::JoinHandle<()>>,
354 }
355 
356 impl StreamProxy {
357     /// Access the underlying sender to clone it or send messages
msg_sender(&self) -> &Sender<StreamMsg>358     pub fn msg_sender(&self) -> &Sender<StreamMsg> {
359         &self.sender
360     }
361 
362     /// Send a message to the stream thread on the other side of this sender
send_msg(sender: &Sender<StreamMsg>, msg: StreamMsg) -> Result<()>363     pub fn send_msg(sender: &Sender<StreamMsg>, msg: StreamMsg) -> Result<()> {
364         sender.send(msg).map_err(SoundError::StreamThreadSend)
365     }
366 
367     /// Convenience function to send a message to this stream's thread
send(&self, msg: StreamMsg) -> Result<()>368     pub fn send(&self, msg: StreamMsg) -> Result<()> {
369         Self::send_msg(&self.sender, msg)
370     }
371 
stop_thread(&mut self)372     fn stop_thread(&mut self) {
373         if let Err(e) = self.send(StreamMsg::Break) {
374             error!(
375                 "virtio-snd: Failed to send Break msg to stream thread: {}",
376                 e
377             );
378         }
379         if let Some(th) = self.thread.take() {
380             if let Err(e) = th.join() {
381                 error!("virtio-snd: Panic detected on stream thread: {:?}", e);
382             }
383         }
384     }
385 }
386 
387 impl Drop for StreamProxy {
drop(&mut self)388     fn drop(&mut self) {
389         self.stop_thread();
390     }
391 }
392 
393 /// Attempts to set the current thread's priority to a value hight enough to handle audio IO. This
394 /// may fail due to insuficient permissions.
try_set_real_time_priority()395 pub fn try_set_real_time_priority() {
396     const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
397     if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
398         .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
399     {
400         warn!("Failed to set audio stream thread to real time: {}", e);
401     }
402 }
403 
404 /// Gets the appropriate virtio-snd error to return to the driver from a VioSError.
vios_error_to_status_code(e: VioSError) -> u32405 pub fn vios_error_to_status_code(e: VioSError) -> u32 {
406     match e {
407         VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,
408         _ => VIRTIO_SND_S_NOT_SUPP,
409     }
410 }
411 
412 /// Encapsulates sending the virtio_snd_hdr struct back to the driver.
reply_control_op_status( code: u32, desc: DescriptorChain, guest_memory: &GuestMemory, queue: &Arc<Mutex<Queue>>, interrupt: &Interrupt, ) -> Result<()>413 pub fn reply_control_op_status(
414     code: u32,
415     desc: DescriptorChain,
416     guest_memory: &GuestMemory,
417     queue: &Arc<Mutex<Queue>>,
418     interrupt: &Interrupt,
419 ) -> Result<()> {
420     let desc_index = desc.index;
421     let mut writer = Writer::new(guest_memory.clone(), desc).map_err(SoundError::Descriptor)?;
422     writer
423         .write_obj(virtio_snd_hdr {
424             code: Le32::from(code),
425         })
426         .map_err(SoundError::QueueIO)?;
427     {
428         let mut queue_lock = queue.lock();
429         queue_lock.add_used(guest_memory, desc_index, writer.bytes_written() as u32);
430         queue_lock.trigger_interrupt(guest_memory, interrupt);
431     }
432     Ok(())
433 }
434 
435 /// Encapsulates sending the virtio_snd_pcm_status struct back to the driver.
reply_pcm_buffer_status( status: u32, latency_bytes: u32, desc: DescriptorChain, guest_memory: &GuestMemory, queue: &Arc<Mutex<Queue>>, interrupt: &Interrupt, ) -> Result<()>436 pub fn reply_pcm_buffer_status(
437     status: u32,
438     latency_bytes: u32,
439     desc: DescriptorChain,
440     guest_memory: &GuestMemory,
441     queue: &Arc<Mutex<Queue>>,
442     interrupt: &Interrupt,
443 ) -> Result<()> {
444     let desc_index = desc.index;
445     let mut writer = Writer::new(guest_memory.clone(), desc).map_err(SoundError::Descriptor)?;
446     if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
447         writer
448             .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
449     }
450     writer
451         .write_obj(virtio_snd_pcm_status {
452             status: Le32::from(status),
453             latency_bytes: Le32::from(latency_bytes),
454         })
455         .map_err(SoundError::QueueIO)?;
456     {
457         let mut queue_lock = queue.lock();
458         queue_lock.add_used(guest_memory, desc_index, writer.bytes_written() as u32);
459         queue_lock.trigger_interrupt(guest_memory, interrupt);
460     }
461     Ok(())
462 }
463 
bytes_per_sample(format: u8) -> usize464 fn bytes_per_sample(format: u8) -> usize {
465     match format {
466         VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,
467         VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,
468         VIRTIO_SND_PCM_FMT_A_LAW => 1usize,
469         VIRTIO_SND_PCM_FMT_S8 => 1usize,
470         VIRTIO_SND_PCM_FMT_U8 => 1usize,
471         VIRTIO_SND_PCM_FMT_S16 => 2usize,
472         VIRTIO_SND_PCM_FMT_U16 => 2usize,
473         VIRTIO_SND_PCM_FMT_S32 => 4usize,
474         VIRTIO_SND_PCM_FMT_U32 => 4usize,
475         VIRTIO_SND_PCM_FMT_FLOAT => 4usize,
476         VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,
477         // VIRTIO_SND_PCM_FMT_DSD_U8
478         // VIRTIO_SND_PCM_FMT_DSD_U16
479         // VIRTIO_SND_PCM_FMT_DSD_U32
480         // VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME
481         // VIRTIO_SND_PCM_FMT_S18_3
482         // VIRTIO_SND_PCM_FMT_U18_3
483         // VIRTIO_SND_PCM_FMT_S20_3
484         // VIRTIO_SND_PCM_FMT_U20_3
485         // VIRTIO_SND_PCM_FMT_S24_3
486         // VIRTIO_SND_PCM_FMT_U24_3
487         // VIRTIO_SND_PCM_FMT_S20
488         // VIRTIO_SND_PCM_FMT_U20
489         // VIRTIO_SND_PCM_FMT_S24
490         // VIRTIO_SND_PCM_FMT_U24
491         _ => {
492             // Some of these formats are not consistently stored in a particular size (24bits is
493             // sometimes stored in a 32bit word) while others are of variable size.
494             // The size per sample estimated here is designed to greatly underestimate the time it
495             // takes to play a buffer and depend instead on timings provided by the sound server if
496             // it supports these formats.
497             warn!(
498                 "Unknown sample size for format {}, depending on sound server timing instead.",
499                 format
500             );
501             1000usize
502         }
503     }
504 }
505