• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io::Read;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9 
10 use base::error;
11 use base::warn;
12 use base::Event;
13 use base::EventToken;
14 use base::WaitContext;
15 use data_model::Le32;
16 use sync::Mutex;
17 use zerocopy::AsBytes;
18 
19 use super::super::constants::*;
20 use super::super::layout::*;
21 use super::streams::*;
22 use super::Result;
23 use super::SoundError;
24 use super::*;
25 use crate::virtio::DescriptorChain;
26 use crate::virtio::Interrupt;
27 use crate::virtio::Queue;
28 
29 pub struct Worker {
30     // Lock order: Must never hold more than one queue lock at the same time.
31     interrupt: Interrupt,
32     pub control_queue: Arc<Mutex<Queue>>,
33     pub event_queue: Option<Queue>,
34     vios_client: Arc<Mutex<VioSClient>>,
35     streams: Vec<StreamProxy>,
36     pub tx_queue: Arc<Mutex<Queue>>,
37     pub rx_queue: Arc<Mutex<Queue>>,
38     io_thread: Option<thread::JoinHandle<Result<()>>>,
39     io_kill: Event,
40     // saved_stream_state holds the previous state of streams. When the sound device is newly
41     // created, this will be empty. It will only contain state if the sound device is put to sleep
42     // OR if we restore a VM.
43     pub saved_stream_state: Vec<StreamSnapshot>,
44 }
45 
46 impl Worker {
47     /// Creates a new virtio-snd worker.
try_new( vios_client: Arc<Mutex<VioSClient>>, interrupt: Interrupt, control_queue: Arc<Mutex<Queue>>, event_queue: Queue, tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, saved_stream_state: Vec<StreamSnapshot>, ) -> Result<Worker>48     pub fn try_new(
49         vios_client: Arc<Mutex<VioSClient>>,
50         interrupt: Interrupt,
51         control_queue: Arc<Mutex<Queue>>,
52         event_queue: Queue,
53         tx_queue: Arc<Mutex<Queue>>,
54         rx_queue: Arc<Mutex<Queue>>,
55         saved_stream_state: Vec<StreamSnapshot>,
56     ) -> Result<Worker> {
57         let num_streams = vios_client.lock().num_streams();
58         let mut streams: Vec<StreamProxy> = Vec::with_capacity(num_streams as usize);
59         {
60             for stream_id in 0..num_streams {
61                 let capture = vios_client
62                     .lock()
63                     .stream_info(stream_id)
64                     .map(|i| i.direction == VIRTIO_SND_D_INPUT)
65                     .unwrap_or(false);
66                 let io_queue = if capture { &rx_queue } else { &tx_queue };
67                 streams.push(Stream::try_new(
68                     stream_id,
69                     vios_client.clone(),
70                     interrupt.clone(),
71                     control_queue.clone(),
72                     io_queue.clone(),
73                     capture,
74                     saved_stream_state.get(stream_id as usize).cloned(),
75                 )?);
76             }
77         }
78         let (self_kill_io, kill_io) = Event::new()
79             .and_then(|e| Ok((e.try_clone()?, e)))
80             .map_err(SoundError::CreateEvent)?;
81 
82         let interrupt_clone = interrupt.clone();
83         let senders: Vec<Sender<Box<StreamMsg>>> =
84             streams.iter().map(|sp| sp.msg_sender().clone()).collect();
85         let tx_queue_thread = tx_queue.clone();
86         let rx_queue_thread = rx_queue.clone();
87         let io_thread = thread::Builder::new()
88             .name("v_snd_io".to_string())
89             .spawn(move || {
90                 try_set_real_time_priority();
91 
92                 io_loop(
93                     interrupt_clone,
94                     tx_queue_thread,
95                     rx_queue_thread,
96                     senders,
97                     kill_io,
98                 )
99             })
100             .map_err(SoundError::CreateThread)?;
101         Ok(Worker {
102             interrupt,
103             control_queue,
104             event_queue: Some(event_queue),
105             vios_client,
106             streams,
107             tx_queue,
108             rx_queue,
109             io_thread: Some(io_thread),
110             io_kill: self_kill_io,
111             saved_stream_state: Vec::new(),
112         })
113     }
114 
115     /// Emulates the virtio-snd device. It won't return until something is written to the kill_evt
116     /// event or an unrecoverable error occurs.
control_loop(&mut self, kill_evt: Event) -> Result<()>117     pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {
118         let event_notifier = self
119             .vios_client
120             .lock()
121             .get_event_notifier()
122             .map_err(SoundError::ClientEventNotifier)?;
123         #[derive(EventToken)]
124         enum Token {
125             ControlQAvailable,
126             EventQAvailable,
127             InterruptResample,
128             EventTriggered,
129             Kill,
130         }
131         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
132             (self.control_queue.lock().event(), Token::ControlQAvailable),
133             (
134                 self.event_queue.as_ref().expect("queue missing").event(),
135                 Token::EventQAvailable,
136             ),
137             (&event_notifier, Token::EventTriggered),
138             (&kill_evt, Token::Kill),
139         ])
140         .map_err(SoundError::WaitCtx)?;
141 
142         if let Some(resample_evt) = self.interrupt.get_resample_evt() {
143             wait_ctx
144                 .add(resample_evt, Token::InterruptResample)
145                 .map_err(SoundError::WaitCtx)?;
146         }
147         let mut event_queue = self.event_queue.take().expect("event_queue missing");
148         'wait: loop {
149             let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
150 
151             for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
152                 match wait_evt.token {
153                     Token::ControlQAvailable => {
154                         self.control_queue
155                             .lock()
156                             .event()
157                             .wait()
158                             .map_err(SoundError::QueueEvt)?;
159                         self.process_controlq_buffers()?;
160                     }
161                     Token::EventQAvailable => {
162                         // Just read from the event object to make sure the producer of such events
163                         // never blocks. The buffers will only be used when actual virtio-snd
164                         // events are triggered.
165                         event_queue.event().wait().map_err(SoundError::QueueEvt)?;
166                     }
167                     Token::EventTriggered => {
168                         event_notifier.wait().map_err(SoundError::QueueEvt)?;
169                         self.process_event_triggered(&mut event_queue)?;
170                     }
171                     Token::InterruptResample => {
172                         self.interrupt.interrupt_resample();
173                     }
174                     Token::Kill => {
175                         let _ = kill_evt.wait();
176                         break 'wait;
177                     }
178                 }
179             }
180         }
181         self.saved_stream_state = self
182             .streams
183             .drain(..)
184             .map(|stream| stream.stop_thread())
185             .collect();
186         self.event_queue = Some(event_queue);
187         Ok(())
188     }
189 
stop_io_thread(&mut self)190     fn stop_io_thread(&mut self) {
191         if let Err(e) = self.io_kill.signal() {
192             error!(
193                 "virtio-snd: Failed to send Break msg to stream thread: {}",
194                 e
195             );
196         }
197         if let Some(th) = self.io_thread.take() {
198             match th.join() {
199                 Err(e) => {
200                     error!("virtio-snd: Panic detected on stream thread: {:?}", e);
201                 }
202                 Ok(r) => {
203                     if let Err(e) = r {
204                         error!("virtio-snd: IO thread exited with and error: {}", e);
205                     }
206                 }
207             }
208         }
209     }
210 
211     // Pops and handles all available ontrol queue buffers. Logs minor errors, but returns an
212     // Err if it encounters an unrecoverable error.
process_controlq_buffers(&mut self) -> Result<()>213     fn process_controlq_buffers(&mut self) -> Result<()> {
214         while let Some(mut avail_desc) = lock_pop_unlock(&self.control_queue) {
215             let reader = &mut avail_desc.reader;
216             let available_bytes = reader.available_bytes();
217             if available_bytes < std::mem::size_of::<virtio_snd_hdr>() {
218                 error!(
219                     "virtio-snd: Message received on control queue is too small: {}",
220                     available_bytes
221                 );
222                 return reply_control_op_status(
223                     VIRTIO_SND_S_BAD_MSG,
224                     avail_desc,
225                     &self.control_queue,
226                     &self.interrupt,
227                 );
228             }
229             let mut read_buf = vec![0u8; available_bytes];
230             reader
231                 .read_exact(&mut read_buf)
232                 .map_err(SoundError::QueueIO)?;
233             let mut code: Le32 = Default::default();
234             // need to copy because the buffer may not be properly aligned
235             code.as_bytes_mut()
236                 .copy_from_slice(&read_buf[..std::mem::size_of::<Le32>()]);
237             let request_type = code.to_native();
238             match request_type {
239                 VIRTIO_SND_R_JACK_INFO => {
240                     let (code, info_vec) = {
241                         match self.parse_info_query(&read_buf) {
242                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
243                             Some((start_id, count)) => {
244                                 let end_id = start_id.saturating_add(count);
245                                 if end_id > self.vios_client.lock().num_jacks() {
246                                     error!(
247                                         "virtio-snd: Requested info on invalid jacks ids: {}..{}",
248                                         start_id,
249                                         end_id - 1
250                                     );
251                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
252                                 } else {
253                                     (
254                                         VIRTIO_SND_S_OK,
255                                         // Safe to unwrap because we just ensured all the ids are
256                                         // valid
257                                         (start_id..end_id)
258                                             .map(|id| {
259                                                 self.vios_client.lock().jack_info(id).unwrap()
260                                             })
261                                             .collect(),
262                                     )
263                                 }
264                             }
265                         }
266                     };
267                     self.send_info_reply(avail_desc, code, info_vec)?;
268                 }
269                 VIRTIO_SND_R_JACK_REMAP => {
270                     let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {
271                         error!(
272                         "virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",
273                         read_buf.len()
274                         );
275                         VIRTIO_SND_S_BAD_MSG
276                     } else {
277                         let mut request: virtio_snd_jack_remap = Default::default();
278                         request.as_bytes_mut().copy_from_slice(&read_buf);
279                         let jack_id = request.hdr.jack_id.to_native();
280                         let association = request.association.to_native();
281                         let sequence = request.sequence.to_native();
282                         if let Err(e) =
283                             self.vios_client
284                                 .lock()
285                                 .remap_jack(jack_id, association, sequence)
286                         {
287                             error!("virtio-snd: Failed to remap jack: {}", e);
288                             vios_error_to_status_code(e)
289                         } else {
290                             VIRTIO_SND_S_OK
291                         }
292                     };
293                     let writer = &mut avail_desc.writer;
294                     writer
295                         .write_obj(virtio_snd_hdr {
296                             code: Le32::from(code),
297                         })
298                         .map_err(SoundError::QueueIO)?;
299                     let len = writer.bytes_written() as u32;
300                     {
301                         let mut queue_lock = self.control_queue.lock();
302                         queue_lock.add_used(avail_desc, len);
303                         queue_lock.trigger_interrupt(&self.interrupt);
304                     }
305                 }
306                 VIRTIO_SND_R_CHMAP_INFO => {
307                     let (code, info_vec) = {
308                         match self.parse_info_query(&read_buf) {
309                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
310                             Some((start_id, count)) => {
311                                 let end_id = start_id.saturating_add(count);
312                                 let num_chmaps = self.vios_client.lock().num_chmaps();
313                                 if end_id > num_chmaps {
314                                     error!(
315                                         "virtio-snd: Requested info on invalid chmaps ids: {}..{}",
316                                         start_id,
317                                         end_id - 1
318                                     );
319                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
320                                 } else {
321                                     (
322                                         VIRTIO_SND_S_OK,
323                                         // Safe to unwrap because we just ensured all the ids are
324                                         // valid
325                                         (start_id..end_id)
326                                             .map(|id| {
327                                                 self.vios_client.lock().chmap_info(id).unwrap()
328                                             })
329                                             .collect(),
330                                     )
331                                 }
332                             }
333                         }
334                     };
335                     self.send_info_reply(avail_desc, code, info_vec)?;
336                 }
337                 VIRTIO_SND_R_PCM_INFO => {
338                     let (code, info_vec) = {
339                         match self.parse_info_query(&read_buf) {
340                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
341                             Some((start_id, count)) => {
342                                 let end_id = start_id.saturating_add(count);
343                                 if end_id > self.vios_client.lock().num_streams() {
344                                     error!(
345                                         "virtio-snd: Requested info on invalid stream ids: {}..{}",
346                                         start_id,
347                                         end_id - 1
348                                     );
349                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
350                                 } else {
351                                     (
352                                         VIRTIO_SND_S_OK,
353                                         // Safe to unwrap because we just ensured all the ids are
354                                         // valid
355                                         (start_id..end_id)
356                                             .map(|id| {
357                                                 self.vios_client.lock().stream_info(id).unwrap()
358                                             })
359                                             .collect(),
360                                     )
361                                 }
362                             }
363                         }
364                     };
365                     self.send_info_reply(avail_desc, code, info_vec)?;
366                 }
367                 VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,
368                 VIRTIO_SND_R_PCM_PREPARE => {
369                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?
370                 }
371                 VIRTIO_SND_R_PCM_RELEASE => {
372                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?
373                 }
374                 VIRTIO_SND_R_PCM_START => {
375                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?
376                 }
377                 VIRTIO_SND_R_PCM_STOP => {
378                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?
379                 }
380                 _ => {
381                     error!(
382                         "virtio-snd: Unknown control queue mesage code: {}",
383                         request_type
384                     );
385                     reply_control_op_status(
386                         VIRTIO_SND_S_NOT_SUPP,
387                         avail_desc,
388                         &self.control_queue,
389                         &self.interrupt,
390                     )?;
391                 }
392             }
393         }
394         Ok(())
395     }
396 
process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()>397     fn process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()> {
398         while let Some(evt) = self.vios_client.lock().pop_event() {
399             if let Some(mut desc) = event_queue.pop() {
400                 let writer = &mut desc.writer;
401                 writer.write_obj(evt).map_err(SoundError::QueueIO)?;
402                 let len = writer.bytes_written() as u32;
403                 event_queue.add_used(desc, len);
404                 event_queue.trigger_interrupt(&self.interrupt);
405             } else {
406                 warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
407             }
408         }
409         Ok(())
410     }
411 
parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)>412     fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {
413         if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {
414             error!(
415                 "virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",
416                 read_buf.len()
417             );
418             return None;
419         }
420         let mut query: virtio_snd_query_info = Default::default();
421         query.as_bytes_mut().copy_from_slice(read_buf);
422         let start_id = query.start_id.to_native();
423         let count = query.count.to_native();
424         Some((start_id, count))
425     }
426 
427     // Returns Err if it encounters an unrecoverable error, Ok otherwise
process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()>428     fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {
429         if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {
430             error!(
431                 "virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
432                 read_buf.len()
433                 );
434             return reply_control_op_status(
435                 VIRTIO_SND_S_BAD_MSG,
436                 desc,
437                 &self.control_queue,
438                 &self.interrupt,
439             );
440         }
441         let mut params: virtio_snd_pcm_set_params = Default::default();
442         params.as_bytes_mut().copy_from_slice(read_buf);
443         let stream_id = params.hdr.stream_id.to_native();
444         if stream_id < self.vios_client.lock().num_streams() {
445             self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))
446         } else {
447             error!(
448                 "virtio-snd: Driver requested operation on invalid stream: {}",
449                 stream_id
450             );
451             reply_control_op_status(
452                 VIRTIO_SND_S_BAD_MSG,
453                 desc,
454                 &self.control_queue,
455                 &self.interrupt,
456             )
457         }
458     }
459 
460     // Returns Err if it encounters an unrecoverable error, Ok otherwise
try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()>461     fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {
462         if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {
463             error!(
464                 "virtio-snd: The driver sent a buffer too small to contain a header: {}",
465                 read_buf.len()
466             );
467             return reply_control_op_status(
468                 VIRTIO_SND_S_BAD_MSG,
469                 match msg {
470                     StreamMsg::Prepare(d)
471                     | StreamMsg::Start(d)
472                     | StreamMsg::Stop(d)
473                     | StreamMsg::Release(d) => d,
474                     _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
475                 },
476                 &self.control_queue,
477                 &self.interrupt,
478             );
479         }
480         let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
481         pcm_hdr.as_bytes_mut().copy_from_slice(read_buf);
482         let stream_id = pcm_hdr.stream_id.to_native();
483         if stream_id < self.vios_client.lock().num_streams() {
484             self.streams[stream_id as usize].send(msg)
485         } else {
486             error!(
487                 "virtio-snd: Driver requested operation on invalid stream: {}",
488                 stream_id
489             );
490             reply_control_op_status(
491                 VIRTIO_SND_S_BAD_MSG,
492                 match msg {
493                     StreamMsg::Prepare(d)
494                     | StreamMsg::Start(d)
495                     | StreamMsg::Stop(d)
496                     | StreamMsg::Release(d) => d,
497                     _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
498                 },
499                 &self.control_queue,
500                 &self.interrupt,
501             )
502         }
503     }
504 
send_info_reply<T: AsBytes>( &mut self, mut desc: DescriptorChain, code: u32, info_vec: Vec<T>, ) -> Result<()>505     fn send_info_reply<T: AsBytes>(
506         &mut self,
507         mut desc: DescriptorChain,
508         code: u32,
509         info_vec: Vec<T>,
510     ) -> Result<()> {
511         let writer = &mut desc.writer;
512         writer
513             .write_obj(virtio_snd_hdr {
514                 code: Le32::from(code),
515             })
516             .map_err(SoundError::QueueIO)?;
517         for info in info_vec {
518             writer.write_obj(info).map_err(SoundError::QueueIO)?;
519         }
520         let len = writer.bytes_written() as u32;
521         {
522             let mut queue_lock = self.control_queue.lock();
523             queue_lock.add_used(desc, len);
524             queue_lock.trigger_interrupt(&self.interrupt);
525         }
526         Ok(())
527     }
528 }
529 
530 impl Drop for Worker {
drop(&mut self)531     fn drop(&mut self) {
532         self.stop_io_thread();
533     }
534 }
535 
io_loop( interrupt: Interrupt, tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, senders: Vec<Sender<Box<StreamMsg>>>, kill_evt: Event, ) -> Result<()>536 fn io_loop(
537     interrupt: Interrupt,
538     tx_queue: Arc<Mutex<Queue>>,
539     rx_queue: Arc<Mutex<Queue>>,
540     senders: Vec<Sender<Box<StreamMsg>>>,
541     kill_evt: Event,
542 ) -> Result<()> {
543     #[derive(EventToken)]
544     enum Token {
545         TxQAvailable,
546         RxQAvailable,
547         Kill,
548     }
549     let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
550         (tx_queue.lock().event(), Token::TxQAvailable),
551         (rx_queue.lock().event(), Token::RxQAvailable),
552         (&kill_evt, Token::Kill),
553     ])
554     .map_err(SoundError::WaitCtx)?;
555 
556     'wait: loop {
557         let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
558         for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
559             let queue = match wait_evt.token {
560                 Token::TxQAvailable => {
561                     tx_queue
562                         .lock()
563                         .event()
564                         .wait()
565                         .map_err(SoundError::QueueEvt)?;
566                     &tx_queue
567                 }
568                 Token::RxQAvailable => {
569                     rx_queue
570                         .lock()
571                         .event()
572                         .wait()
573                         .map_err(SoundError::QueueEvt)?;
574                     &rx_queue
575                 }
576                 Token::Kill => {
577                     let _ = kill_evt.wait();
578                     break 'wait;
579                 }
580             };
581             while let Some(mut avail_desc) = lock_pop_unlock(queue) {
582                 let reader = &mut avail_desc.reader;
583                 let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;
584                 let stream_id = xfer.stream_id.to_native();
585                 if stream_id as usize >= senders.len() {
586                     error!(
587                         "virtio-snd: Driver sent buffer for invalid stream: {}",
588                         stream_id
589                     );
590                     reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue, &interrupt)?;
591                 } else {
592                     StreamProxy::send_msg(
593                         &senders[stream_id as usize],
594                         StreamMsg::Buffer(avail_desc),
595                     )?;
596                 }
597             }
598         }
599     }
600     Ok(())
601 }
602 
603 // If queue.lock().pop() is used directly in the condition of a 'while' loop the lock is held over
604 // the entire loop block. Encapsulating it in this fuction guarantees that the lock is dropped
605 // immediately after pop() is called, which allows the code to remain somewhat simpler.
lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain>606 fn lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain> {
607     queue.lock().pop()
608 }
609