• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io::Read;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9 
10 use base::error;
11 use base::warn;
12 use base::Event;
13 use base::EventToken;
14 use base::WaitContext;
15 use data_model::Le32;
16 use sync::Mutex;
17 use vm_memory::GuestMemory;
18 use zerocopy::AsBytes;
19 
20 use super::super::constants::*;
21 use super::super::layout::*;
22 use super::streams::*;
23 use super::Result;
24 use super::SoundError;
25 use super::*;
26 use crate::virtio::DescriptorChain;
27 use crate::virtio::Interrupt;
28 use crate::virtio::Queue;
29 use crate::virtio::Reader;
30 use crate::virtio::SignalableInterrupt;
31 use crate::virtio::Writer;
32 
33 pub struct Worker {
34     // Lock order: Must never hold more than one queue lock at the same time.
35     interrupt: Interrupt,
36     control_queue: Arc<Mutex<Queue>>,
37     control_queue_evt: Event,
38     event_queue: Queue,
39     event_queue_evt: Event,
40     guest_memory: GuestMemory,
41     vios_client: Arc<VioSClient>,
42     streams: Vec<StreamProxy>,
43     io_thread: Option<thread::JoinHandle<Result<()>>>,
44     io_kill: Event,
45 }
46 
47 impl Worker {
48     /// Creates a new virtio-snd worker.
try_new( vios_client: Arc<VioSClient>, interrupt: Interrupt, guest_memory: GuestMemory, control_queue: Arc<Mutex<Queue>>, control_queue_evt: Event, event_queue: Queue, event_queue_evt: Event, tx_queue: Arc<Mutex<Queue>>, tx_queue_evt: Event, rx_queue: Arc<Mutex<Queue>>, rx_queue_evt: Event, ) -> Result<Worker>49     pub fn try_new(
50         vios_client: Arc<VioSClient>,
51         interrupt: Interrupt,
52         guest_memory: GuestMemory,
53         control_queue: Arc<Mutex<Queue>>,
54         control_queue_evt: Event,
55         event_queue: Queue,
56         event_queue_evt: Event,
57         tx_queue: Arc<Mutex<Queue>>,
58         tx_queue_evt: Event,
59         rx_queue: Arc<Mutex<Queue>>,
60         rx_queue_evt: Event,
61     ) -> Result<Worker> {
62         let mut streams: Vec<StreamProxy> = Vec::with_capacity(vios_client.num_streams() as usize);
63         {
64             for stream_id in 0..vios_client.num_streams() {
65                 let capture = vios_client
66                     .stream_info(stream_id)
67                     .map(|i| i.direction == VIRTIO_SND_D_INPUT)
68                     .unwrap_or(false);
69                 let io_queue = if capture { &rx_queue } else { &tx_queue };
70                 streams.push(Stream::try_new(
71                     stream_id,
72                     vios_client.clone(),
73                     guest_memory.clone(),
74                     interrupt.clone(),
75                     control_queue.clone(),
76                     io_queue.clone(),
77                     capture,
78                 )?);
79             }
80         }
81         let (self_kill_io, kill_io) = Event::new()
82             .and_then(|e| Ok((e.try_clone()?, e)))
83             .map_err(SoundError::CreateEvent)?;
84 
85         let interrupt_clone = interrupt.clone();
86         let guest_memory_clone = guest_memory.clone();
87         let senders: Vec<Sender<StreamMsg>> =
88             streams.iter().map(|sp| sp.msg_sender().clone()).collect();
89         let io_thread = thread::Builder::new()
90             .name("v_snd_io".to_string())
91             .spawn(move || {
92                 try_set_real_time_priority();
93 
94                 io_loop(
95                     interrupt_clone,
96                     guest_memory_clone,
97                     tx_queue,
98                     tx_queue_evt,
99                     rx_queue,
100                     rx_queue_evt,
101                     senders,
102                     kill_io,
103                 )
104             })
105             .map_err(SoundError::CreateThread)?;
106         Ok(Worker {
107             interrupt,
108             control_queue,
109             control_queue_evt,
110             event_queue,
111             event_queue_evt,
112             guest_memory,
113             vios_client,
114             streams,
115             io_thread: Some(io_thread),
116             io_kill: self_kill_io,
117         })
118     }
119 
120     /// Emulates the virtio-snd device. It won't return until something is written to the kill_evt
121     /// event or an unrecoverable error occurs.
control_loop(&mut self, kill_evt: Event) -> Result<()>122     pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {
123         let event_notifier = self
124             .vios_client
125             .get_event_notifier()
126             .map_err(SoundError::ClientEventNotifier)?;
127         #[derive(EventToken)]
128         enum Token {
129             ControlQAvailable,
130             EventQAvailable,
131             InterruptResample,
132             EventTriggered,
133             Kill,
134         }
135         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
136             (&self.control_queue_evt, Token::ControlQAvailable),
137             (&self.event_queue_evt, Token::EventQAvailable),
138             (&event_notifier, Token::EventTriggered),
139             (&kill_evt, Token::Kill),
140         ])
141         .map_err(SoundError::WaitCtx)?;
142 
143         if let Some(resample_evt) = self.interrupt.get_resample_evt() {
144             wait_ctx
145                 .add(resample_evt, Token::InterruptResample)
146                 .map_err(SoundError::WaitCtx)?;
147         }
148         'wait: loop {
149             let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
150 
151             for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
152                 match wait_evt.token {
153                     Token::ControlQAvailable => {
154                         self.control_queue_evt
155                             .wait()
156                             .map_err(SoundError::QueueEvt)?;
157                         self.process_controlq_buffers()?;
158                     }
159                     Token::EventQAvailable => {
160                         // Just read from the event object to make sure the producer of such events
161                         // never blocks. The buffers will only be used when actual virtio-snd
162                         // events are triggered.
163                         self.event_queue_evt.wait().map_err(SoundError::QueueEvt)?;
164                     }
165                     Token::EventTriggered => {
166                         event_notifier.wait().map_err(SoundError::QueueEvt)?;
167                         self.process_event_triggered()?;
168                     }
169                     Token::InterruptResample => {
170                         self.interrupt.interrupt_resample();
171                     }
172                     Token::Kill => {
173                         let _ = kill_evt.wait();
174                         break 'wait;
175                     }
176                 }
177             }
178         }
179         Ok(())
180     }
181 
stop_io_thread(&mut self)182     fn stop_io_thread(&mut self) {
183         if let Err(e) = self.io_kill.signal() {
184             error!(
185                 "virtio-snd: Failed to send Break msg to stream thread: {}",
186                 e
187             );
188         }
189         if let Some(th) = self.io_thread.take() {
190             match th.join() {
191                 Err(e) => {
192                     error!("virtio-snd: Panic detected on stream thread: {:?}", e);
193                 }
194                 Ok(r) => {
195                     if let Err(e) = r {
196                         error!("virtio-snd: IO thread exited with and error: {}", e);
197                     }
198                 }
199             }
200         }
201     }
202 
203     // Pops and handles all available ontrol queue buffers. Logs minor errors, but returns an
204     // Err if it encounters an unrecoverable error.
process_controlq_buffers(&mut self) -> Result<()>205     fn process_controlq_buffers(&mut self) -> Result<()> {
206         while let Some(avail_desc) = lock_pop_unlock(&self.control_queue, &self.guest_memory) {
207             let mut reader = Reader::new(self.guest_memory.clone(), avail_desc.clone())
208                 .map_err(SoundError::Descriptor)?;
209             let available_bytes = reader.available_bytes();
210             if available_bytes < std::mem::size_of::<virtio_snd_hdr>() {
211                 error!(
212                     "virtio-snd: Message received on control queue is too small: {}",
213                     available_bytes
214                 );
215                 return reply_control_op_status(
216                     VIRTIO_SND_S_BAD_MSG,
217                     avail_desc,
218                     &self.guest_memory,
219                     &self.control_queue,
220                     &self.interrupt,
221                 );
222             }
223             let mut read_buf = vec![0u8; available_bytes];
224             reader
225                 .read_exact(&mut read_buf)
226                 .map_err(SoundError::QueueIO)?;
227             let mut code: Le32 = Default::default();
228             // need to copy because the buffer may not be properly aligned
229             code.as_bytes_mut()
230                 .copy_from_slice(&read_buf[..std::mem::size_of::<Le32>()]);
231             let request_type = code.to_native();
232             match request_type {
233                 VIRTIO_SND_R_JACK_INFO => {
234                     let (code, info_vec) = {
235                         match self.parse_info_query(&read_buf) {
236                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
237                             Some((start_id, count)) => {
238                                 let end_id = start_id.saturating_add(count);
239                                 if end_id > self.vios_client.num_jacks() {
240                                     error!(
241                                         "virtio-snd: Requested info on invalid jacks ids: {}..{}",
242                                         start_id,
243                                         end_id - 1
244                                     );
245                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
246                                 } else {
247                                     (
248                                         VIRTIO_SND_S_OK,
249                                         // Safe to unwrap because we just ensured all the ids are valid
250                                         (start_id..end_id)
251                                             .map(|id| self.vios_client.jack_info(id).unwrap())
252                                             .collect(),
253                                     )
254                                 }
255                             }
256                         }
257                     };
258                     self.send_info_reply(avail_desc, code, info_vec)?;
259                 }
260                 VIRTIO_SND_R_JACK_REMAP => {
261                     let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {
262                         error!(
263                         "virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",
264                         read_buf.len()
265                         );
266                         VIRTIO_SND_S_BAD_MSG
267                     } else {
268                         let mut request: virtio_snd_jack_remap = Default::default();
269                         request.as_bytes_mut().copy_from_slice(&read_buf);
270                         let jack_id = request.hdr.jack_id.to_native();
271                         let association = request.association.to_native();
272                         let sequence = request.sequence.to_native();
273                         if let Err(e) = self.vios_client.remap_jack(jack_id, association, sequence)
274                         {
275                             error!("virtio-snd: Failed to remap jack: {}", e);
276                             vios_error_to_status_code(e)
277                         } else {
278                             VIRTIO_SND_S_OK
279                         }
280                     };
281                     let desc_index = avail_desc.index;
282                     let mut writer = Writer::new(self.guest_memory.clone(), avail_desc)
283                         .map_err(SoundError::Descriptor)?;
284                     writer
285                         .write_obj(virtio_snd_hdr {
286                             code: Le32::from(code),
287                         })
288                         .map_err(SoundError::QueueIO)?;
289                     {
290                         let mut queue_lock = self.control_queue.lock();
291                         queue_lock.add_used(
292                             &self.guest_memory,
293                             desc_index,
294                             writer.bytes_written() as u32,
295                         );
296                         queue_lock.trigger_interrupt(&self.guest_memory, &self.interrupt);
297                     }
298                 }
299                 VIRTIO_SND_R_CHMAP_INFO => {
300                     let (code, info_vec) = {
301                         match self.parse_info_query(&read_buf) {
302                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
303                             Some((start_id, count)) => {
304                                 let end_id = start_id.saturating_add(count);
305                                 if end_id > self.vios_client.num_chmaps() {
306                                     error!(
307                                         "virtio-snd: Requested info on invalid chmaps ids: {}..{}",
308                                         start_id,
309                                         end_id - 1
310                                     );
311                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
312                                 } else {
313                                     (
314                                         VIRTIO_SND_S_OK,
315                                         // Safe to unwrap because we just ensured all the ids are valid
316                                         (start_id..end_id)
317                                             .map(|id| self.vios_client.chmap_info(id).unwrap())
318                                             .collect(),
319                                     )
320                                 }
321                             }
322                         }
323                     };
324                     self.send_info_reply(avail_desc, code, info_vec)?;
325                 }
326                 VIRTIO_SND_R_PCM_INFO => {
327                     let (code, info_vec) = {
328                         match self.parse_info_query(&read_buf) {
329                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
330                             Some((start_id, count)) => {
331                                 let end_id = start_id.saturating_add(count);
332                                 if end_id > self.vios_client.num_streams() {
333                                     error!(
334                                         "virtio-snd: Requested info on invalid stream ids: {}..{}",
335                                         start_id,
336                                         end_id - 1
337                                     );
338                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
339                                 } else {
340                                     (
341                                         VIRTIO_SND_S_OK,
342                                         // Safe to unwrap because we just ensured all the ids are valid
343                                         (start_id..end_id)
344                                             .map(|id| self.vios_client.stream_info(id).unwrap())
345                                             .collect(),
346                                     )
347                                 }
348                             }
349                         }
350                     };
351                     self.send_info_reply(avail_desc, code, info_vec)?;
352                 }
353                 VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,
354                 VIRTIO_SND_R_PCM_PREPARE => {
355                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?
356                 }
357                 VIRTIO_SND_R_PCM_RELEASE => {
358                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?
359                 }
360                 VIRTIO_SND_R_PCM_START => {
361                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?
362                 }
363                 VIRTIO_SND_R_PCM_STOP => {
364                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?
365                 }
366                 _ => {
367                     error!(
368                         "virtio-snd: Unknown control queue mesage code: {}",
369                         request_type
370                     );
371                     reply_control_op_status(
372                         VIRTIO_SND_S_NOT_SUPP,
373                         avail_desc,
374                         &self.guest_memory,
375                         &self.control_queue,
376                         &self.interrupt,
377                     )?;
378                 }
379             }
380         }
381         Ok(())
382     }
383 
process_event_triggered(&mut self) -> Result<()>384     fn process_event_triggered(&mut self) -> Result<()> {
385         while let Some(evt) = self.vios_client.pop_event() {
386             if let Some(desc) = self.event_queue.pop(&self.guest_memory) {
387                 let desc_index = desc.index;
388                 let mut writer =
389                     Writer::new(self.guest_memory.clone(), desc).map_err(SoundError::Descriptor)?;
390                 writer.write_obj(evt).map_err(SoundError::QueueIO)?;
391                 self.event_queue.add_used(
392                     &self.guest_memory,
393                     desc_index,
394                     writer.bytes_written() as u32,
395                 );
396                 {
397                     self.event_queue
398                         .trigger_interrupt(&self.guest_memory, &self.interrupt);
399                 }
400             } else {
401                 warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
402             }
403         }
404         Ok(())
405     }
406 
parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)>407     fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {
408         if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {
409             error!(
410                 "virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",
411                 read_buf.len()
412             );
413             return None;
414         }
415         let mut query: virtio_snd_query_info = Default::default();
416         query.as_bytes_mut().copy_from_slice(read_buf);
417         let start_id = query.start_id.to_native();
418         let count = query.count.to_native();
419         Some((start_id, count))
420     }
421 
422     // Returns Err if it encounters an unrecoverable error, Ok otherwise
process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()>423     fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {
424         if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {
425             error!(
426                 "virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
427                 read_buf.len()
428                 );
429             return reply_control_op_status(
430                 VIRTIO_SND_S_BAD_MSG,
431                 desc,
432                 &self.guest_memory,
433                 &self.control_queue,
434                 &self.interrupt,
435             );
436         }
437         let mut params: virtio_snd_pcm_set_params = Default::default();
438         params.as_bytes_mut().copy_from_slice(read_buf);
439         let stream_id = params.hdr.stream_id.to_native();
440         if stream_id < self.vios_client.num_streams() {
441             self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))
442         } else {
443             error!(
444                 "virtio-snd: Driver requested operation on invalid stream: {}",
445                 stream_id
446             );
447             reply_control_op_status(
448                 VIRTIO_SND_S_BAD_MSG,
449                 desc,
450                 &self.guest_memory,
451                 &self.control_queue,
452                 &self.interrupt,
453             )
454         }
455     }
456 
457     // Returns Err if it encounters an unrecoverable error, Ok otherwise
try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()>458     fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {
459         if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {
460             error!(
461                 "virtio-snd: The driver sent a buffer too small to contain a header: {}",
462                 read_buf.len()
463             );
464             return reply_control_op_status(
465                 VIRTIO_SND_S_BAD_MSG,
466                 match msg {
467                     StreamMsg::Prepare(d)
468                     | StreamMsg::Start(d)
469                     | StreamMsg::Stop(d)
470                     | StreamMsg::Release(d) => d,
471                     _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
472                 },
473                 &self.guest_memory,
474                 &self.control_queue,
475                 &self.interrupt,
476             );
477         }
478         let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
479         pcm_hdr.as_bytes_mut().copy_from_slice(read_buf);
480         let stream_id = pcm_hdr.stream_id.to_native();
481         if stream_id < self.vios_client.num_streams() {
482             self.streams[stream_id as usize].send(msg)
483         } else {
484             error!(
485                 "virtio-snd: Driver requested operation on invalid stream: {}",
486                 stream_id
487             );
488             reply_control_op_status(
489                 VIRTIO_SND_S_BAD_MSG,
490                 match msg {
491                     StreamMsg::Prepare(d)
492                     | StreamMsg::Start(d)
493                     | StreamMsg::Stop(d)
494                     | StreamMsg::Release(d) => d,
495                     _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
496                 },
497                 &self.guest_memory,
498                 &self.control_queue,
499                 &self.interrupt,
500             )
501         }
502     }
503 
send_info_reply<T: AsBytes>( &mut self, desc: DescriptorChain, code: u32, info_vec: Vec<T>, ) -> Result<()>504     fn send_info_reply<T: AsBytes>(
505         &mut self,
506         desc: DescriptorChain,
507         code: u32,
508         info_vec: Vec<T>,
509     ) -> Result<()> {
510         let desc_index = desc.index;
511         let mut writer =
512             Writer::new(self.guest_memory.clone(), desc).map_err(SoundError::Descriptor)?;
513         writer
514             .write_obj(virtio_snd_hdr {
515                 code: Le32::from(code),
516             })
517             .map_err(SoundError::QueueIO)?;
518         for info in info_vec {
519             writer.write_obj(info).map_err(SoundError::QueueIO)?;
520         }
521         {
522             let mut queue_lock = self.control_queue.lock();
523             queue_lock.add_used(
524                 &self.guest_memory,
525                 desc_index,
526                 writer.bytes_written() as u32,
527             );
528             queue_lock.trigger_interrupt(&self.guest_memory, &self.interrupt);
529         }
530         Ok(())
531     }
532 }
533 
534 impl Drop for Worker {
drop(&mut self)535     fn drop(&mut self) {
536         self.stop_io_thread();
537     }
538 }
539 
io_loop( interrupt: Interrupt, guest_memory: GuestMemory, tx_queue: Arc<Mutex<Queue>>, tx_queue_evt: Event, rx_queue: Arc<Mutex<Queue>>, rx_queue_evt: Event, senders: Vec<Sender<StreamMsg>>, kill_evt: Event, ) -> Result<()>540 fn io_loop(
541     interrupt: Interrupt,
542     guest_memory: GuestMemory,
543     tx_queue: Arc<Mutex<Queue>>,
544     tx_queue_evt: Event,
545     rx_queue: Arc<Mutex<Queue>>,
546     rx_queue_evt: Event,
547     senders: Vec<Sender<StreamMsg>>,
548     kill_evt: Event,
549 ) -> Result<()> {
550     #[derive(EventToken)]
551     enum Token {
552         TxQAvailable,
553         RxQAvailable,
554         Kill,
555     }
556     let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
557         (&tx_queue_evt, Token::TxQAvailable),
558         (&rx_queue_evt, Token::RxQAvailable),
559         (&kill_evt, Token::Kill),
560     ])
561     .map_err(SoundError::WaitCtx)?;
562 
563     'wait: loop {
564         let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
565         for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
566             let queue = match wait_evt.token {
567                 Token::TxQAvailable => {
568                     tx_queue_evt.wait().map_err(SoundError::QueueEvt)?;
569                     &tx_queue
570                 }
571                 Token::RxQAvailable => {
572                     rx_queue_evt.wait().map_err(SoundError::QueueEvt)?;
573                     &rx_queue
574                 }
575                 Token::Kill => {
576                     let _ = kill_evt.wait();
577                     break 'wait;
578                 }
579             };
580             while let Some(avail_desc) = lock_pop_unlock(queue, &guest_memory) {
581                 let mut reader = Reader::new(guest_memory.clone(), avail_desc.clone())
582                     .map_err(SoundError::Descriptor)?;
583                 let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;
584                 let stream_id = xfer.stream_id.to_native();
585                 if stream_id as usize >= senders.len() {
586                     error!(
587                         "virtio-snd: Driver sent buffer for invalid stream: {}",
588                         stream_id
589                     );
590                     reply_pcm_buffer_status(
591                         VIRTIO_SND_S_IO_ERR,
592                         0,
593                         avail_desc,
594                         &guest_memory,
595                         queue,
596                         &interrupt,
597                     )?;
598                 } else {
599                     StreamProxy::send_msg(
600                         &senders[stream_id as usize],
601                         StreamMsg::Buffer(avail_desc),
602                     )?;
603                 }
604             }
605         }
606     }
607     Ok(())
608 }
609 
610 // If queue.lock().pop() is used directly in the condition of a 'while' loop the lock is held over
611 // the entire loop block. Encapsulating it in this fuction guarantees that the lock is dropped
612 // immediately after pop() is called, which allows the code to remain somewhat simpler.
lock_pop_unlock( queue: &Arc<Mutex<Queue>>, guest_memory: &GuestMemory, ) -> Option<DescriptorChain>613 fn lock_pop_unlock(
614     queue: &Arc<Mutex<Queue>>,
615     guest_memory: &GuestMemory,
616 ) -> Option<DescriptorChain> {
617     queue.lock().pop(guest_memory)
618 }
619