• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io::Read;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9 
10 use base::error;
11 use base::warn;
12 use base::Event;
13 use base::EventToken;
14 use base::WaitContext;
15 use data_model::Le32;
16 use sync::Mutex;
17 use zerocopy::Immutable;
18 use zerocopy::IntoBytes;
19 
20 use super::super::constants::*;
21 use super::super::layout::*;
22 use super::streams::*;
23 use super::Result;
24 use super::SoundError;
25 use super::*;
26 use crate::virtio::DescriptorChain;
27 use crate::virtio::Queue;
28 
29 pub struct Worker {
30     // Lock order: Must never hold more than one queue lock at the same time.
31     pub control_queue: Arc<Mutex<Queue>>,
32     pub event_queue: Option<Queue>,
33     vios_client: Arc<Mutex<VioSClient>>,
34     streams: Vec<StreamProxy>,
35     pub tx_queue: Arc<Mutex<Queue>>,
36     pub rx_queue: Arc<Mutex<Queue>>,
37     io_thread: Option<thread::JoinHandle<Result<()>>>,
38     io_kill: Event,
39     // saved_stream_state holds the previous state of streams. When the sound device is newly
40     // created, this will be empty. It will only contain state if the sound device is put to sleep
41     // OR if we restore a VM.
42     pub saved_stream_state: Vec<StreamSnapshot>,
43 }
44 
45 impl Worker {
46     /// Creates a new virtio-snd worker.
try_new( vios_client: Arc<Mutex<VioSClient>>, control_queue: Arc<Mutex<Queue>>, event_queue: Queue, tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, saved_stream_state: Vec<StreamSnapshot>, ) -> Result<Worker>47     pub fn try_new(
48         vios_client: Arc<Mutex<VioSClient>>,
49         control_queue: Arc<Mutex<Queue>>,
50         event_queue: Queue,
51         tx_queue: Arc<Mutex<Queue>>,
52         rx_queue: Arc<Mutex<Queue>>,
53         saved_stream_state: Vec<StreamSnapshot>,
54     ) -> Result<Worker> {
55         let num_streams = vios_client.lock().num_streams();
56         let mut streams: Vec<StreamProxy> = Vec::with_capacity(num_streams as usize);
57         {
58             for stream_id in 0..num_streams {
59                 let capture = vios_client
60                     .lock()
61                     .stream_info(stream_id)
62                     .map(|i| i.direction == VIRTIO_SND_D_INPUT)
63                     .unwrap_or(false);
64                 let io_queue = if capture { &rx_queue } else { &tx_queue };
65                 streams.push(Stream::try_new(
66                     stream_id,
67                     vios_client.clone(),
68                     control_queue.clone(),
69                     io_queue.clone(),
70                     capture,
71                     saved_stream_state.get(stream_id as usize).cloned(),
72                 )?);
73             }
74         }
75         let (self_kill_io, kill_io) = Event::new()
76             .and_then(|e| Ok((e.try_clone()?, e)))
77             .map_err(SoundError::CreateEvent)?;
78 
79         let senders: Vec<Sender<Box<StreamMsg>>> =
80             streams.iter().map(|sp| sp.msg_sender().clone()).collect();
81         let tx_queue_thread = tx_queue.clone();
82         let rx_queue_thread = rx_queue.clone();
83         let io_thread = thread::Builder::new()
84             .name("v_snd_io".to_string())
85             .spawn(move || {
86                 try_set_real_time_priority();
87 
88                 io_loop(tx_queue_thread, rx_queue_thread, senders, kill_io)
89             })
90             .map_err(SoundError::CreateThread)?;
91         Ok(Worker {
92             control_queue,
93             event_queue: Some(event_queue),
94             vios_client,
95             streams,
96             tx_queue,
97             rx_queue,
98             io_thread: Some(io_thread),
99             io_kill: self_kill_io,
100             saved_stream_state: Vec::new(),
101         })
102     }
103 
104     /// Emulates the virtio-snd device. It won't return until something is written to the kill_evt
105     /// event or an unrecoverable error occurs.
control_loop(&mut self, kill_evt: Event) -> Result<()>106     pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {
107         let event_notifier = self
108             .vios_client
109             .lock()
110             .get_event_notifier()
111             .map_err(SoundError::ClientEventNotifier)?;
112         #[derive(EventToken)]
113         enum Token {
114             ControlQAvailable,
115             EventQAvailable,
116             EventTriggered,
117             Kill,
118         }
119         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
120             (self.control_queue.lock().event(), Token::ControlQAvailable),
121             (
122                 self.event_queue.as_ref().expect("queue missing").event(),
123                 Token::EventQAvailable,
124             ),
125             (&event_notifier, Token::EventTriggered),
126             (&kill_evt, Token::Kill),
127         ])
128         .map_err(SoundError::WaitCtx)?;
129 
130         let mut event_queue = self.event_queue.take().expect("event_queue missing");
131         'wait: loop {
132             let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
133 
134             for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
135                 match wait_evt.token {
136                     Token::ControlQAvailable => {
137                         self.control_queue
138                             .lock()
139                             .event()
140                             .wait()
141                             .map_err(SoundError::QueueEvt)?;
142                         self.process_controlq_buffers()?;
143                     }
144                     Token::EventQAvailable => {
145                         // Just read from the event object to make sure the producer of such events
146                         // never blocks. The buffers will only be used when actual virtio-snd
147                         // events are triggered.
148                         event_queue.event().wait().map_err(SoundError::QueueEvt)?;
149                     }
150                     Token::EventTriggered => {
151                         event_notifier.wait().map_err(SoundError::QueueEvt)?;
152                         self.process_event_triggered(&mut event_queue)?;
153                     }
154                     Token::Kill => {
155                         let _ = kill_evt.wait();
156                         break 'wait;
157                     }
158                 }
159             }
160         }
161         self.saved_stream_state = self
162             .streams
163             .drain(..)
164             .map(|stream| stream.stop_thread())
165             .collect();
166         self.event_queue = Some(event_queue);
167         Ok(())
168     }
169 
stop_io_thread(&mut self)170     fn stop_io_thread(&mut self) {
171         if let Err(e) = self.io_kill.signal() {
172             error!(
173                 "virtio-snd: Failed to send Break msg to stream thread: {}",
174                 e
175             );
176         }
177         if let Some(th) = self.io_thread.take() {
178             match th.join() {
179                 Err(e) => {
180                     error!("virtio-snd: Panic detected on stream thread: {:?}", e);
181                 }
182                 Ok(r) => {
183                     if let Err(e) = r {
184                         error!("virtio-snd: IO thread exited with and error: {}", e);
185                     }
186                 }
187             }
188         }
189     }
190 
191     // Pops and handles all available ontrol queue buffers. Logs minor errors, but returns an
192     // Err if it encounters an unrecoverable error.
process_controlq_buffers(&mut self) -> Result<()>193     fn process_controlq_buffers(&mut self) -> Result<()> {
194         while let Some(mut avail_desc) = lock_pop_unlock(&self.control_queue) {
195             let reader = &mut avail_desc.reader;
196             let available_bytes = reader.available_bytes();
197             let Ok(hdr) = reader.peek_obj::<virtio_snd_hdr>() else {
198                 error!(
199                     "virtio-snd: Message received on control queue is too small: {}",
200                     available_bytes
201                 );
202                 return reply_control_op_status(
203                     VIRTIO_SND_S_BAD_MSG,
204                     avail_desc,
205                     &self.control_queue,
206                 );
207             };
208             let mut read_buf = vec![0u8; available_bytes];
209             reader
210                 .read_exact(&mut read_buf)
211                 .map_err(SoundError::QueueIO)?;
212             let request_type = hdr.code.to_native();
213             match request_type {
214                 VIRTIO_SND_R_JACK_INFO => {
215                     let (code, info_vec) = {
216                         match self.parse_info_query(&read_buf) {
217                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
218                             Some((start_id, count)) => {
219                                 let end_id = start_id.saturating_add(count);
220                                 if end_id > self.vios_client.lock().num_jacks() {
221                                     error!(
222                                         "virtio-snd: Requested info on invalid jacks ids: {}..{}",
223                                         start_id,
224                                         end_id - 1
225                                     );
226                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
227                                 } else {
228                                     (
229                                         VIRTIO_SND_S_OK,
230                                         // Safe to unwrap because we just ensured all the ids are
231                                         // valid
232                                         (start_id..end_id)
233                                             .map(|id| {
234                                                 self.vios_client.lock().jack_info(id).unwrap()
235                                             })
236                                             .collect(),
237                                     )
238                                 }
239                             }
240                         }
241                     };
242                     self.send_info_reply(avail_desc, code, info_vec)?;
243                 }
244                 VIRTIO_SND_R_JACK_REMAP => {
245                     let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {
246                         error!(
247                         "virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",
248                         read_buf.len()
249                         );
250                         VIRTIO_SND_S_BAD_MSG
251                     } else {
252                         let mut request: virtio_snd_jack_remap = Default::default();
253                         request.as_mut_bytes().copy_from_slice(&read_buf);
254                         let jack_id = request.hdr.jack_id.to_native();
255                         let association = request.association.to_native();
256                         let sequence = request.sequence.to_native();
257                         if let Err(e) =
258                             self.vios_client
259                                 .lock()
260                                 .remap_jack(jack_id, association, sequence)
261                         {
262                             error!("virtio-snd: Failed to remap jack: {}", e);
263                             vios_error_to_status_code(e)
264                         } else {
265                             VIRTIO_SND_S_OK
266                         }
267                     };
268                     let writer = &mut avail_desc.writer;
269                     writer
270                         .write_obj(virtio_snd_hdr {
271                             code: Le32::from(code),
272                         })
273                         .map_err(SoundError::QueueIO)?;
274                     let len = writer.bytes_written() as u32;
275                     {
276                         let mut queue_lock = self.control_queue.lock();
277                         queue_lock.add_used(avail_desc, len);
278                         queue_lock.trigger_interrupt();
279                     }
280                 }
281                 VIRTIO_SND_R_CHMAP_INFO => {
282                     let (code, info_vec) = {
283                         match self.parse_info_query(&read_buf) {
284                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
285                             Some((start_id, count)) => {
286                                 let end_id = start_id.saturating_add(count);
287                                 let num_chmaps = self.vios_client.lock().num_chmaps();
288                                 if end_id > num_chmaps {
289                                     error!(
290                                         "virtio-snd: Requested info on invalid chmaps ids: {}..{}",
291                                         start_id,
292                                         end_id - 1
293                                     );
294                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
295                                 } else {
296                                     (
297                                         VIRTIO_SND_S_OK,
298                                         // Safe to unwrap because we just ensured all the ids are
299                                         // valid
300                                         (start_id..end_id)
301                                             .map(|id| {
302                                                 self.vios_client.lock().chmap_info(id).unwrap()
303                                             })
304                                             .collect(),
305                                     )
306                                 }
307                             }
308                         }
309                     };
310                     self.send_info_reply(avail_desc, code, info_vec)?;
311                 }
312                 VIRTIO_SND_R_PCM_INFO => {
313                     let (code, info_vec) = {
314                         match self.parse_info_query(&read_buf) {
315                             None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
316                             Some((start_id, count)) => {
317                                 let end_id = start_id.saturating_add(count);
318                                 if end_id > self.vios_client.lock().num_streams() {
319                                     error!(
320                                         "virtio-snd: Requested info on invalid stream ids: {}..{}",
321                                         start_id,
322                                         end_id - 1
323                                     );
324                                     (VIRTIO_SND_S_NOT_SUPP, Vec::new())
325                                 } else {
326                                     (
327                                         VIRTIO_SND_S_OK,
328                                         // Safe to unwrap because we just ensured all the ids are
329                                         // valid
330                                         (start_id..end_id)
331                                             .map(|id| {
332                                                 self.vios_client.lock().stream_info(id).unwrap()
333                                             })
334                                             .collect(),
335                                     )
336                                 }
337                             }
338                         }
339                     };
340                     self.send_info_reply(avail_desc, code, info_vec)?;
341                 }
342                 VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,
343                 VIRTIO_SND_R_PCM_PREPARE => {
344                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?
345                 }
346                 VIRTIO_SND_R_PCM_RELEASE => {
347                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?
348                 }
349                 VIRTIO_SND_R_PCM_START => {
350                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?
351                 }
352                 VIRTIO_SND_R_PCM_STOP => {
353                     self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?
354                 }
355                 _ => {
356                     error!(
357                         "virtio-snd: Unknown control queue mesage code: {}",
358                         request_type
359                     );
360                     reply_control_op_status(
361                         VIRTIO_SND_S_NOT_SUPP,
362                         avail_desc,
363                         &self.control_queue,
364                     )?;
365                 }
366             }
367         }
368         Ok(())
369     }
370 
process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()>371     fn process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()> {
372         while let Some(evt) = self.vios_client.lock().pop_event() {
373             if let Some(mut desc) = event_queue.pop() {
374                 let writer = &mut desc.writer;
375                 writer.write_obj(evt).map_err(SoundError::QueueIO)?;
376                 let len = writer.bytes_written() as u32;
377                 event_queue.add_used(desc, len);
378                 event_queue.trigger_interrupt();
379             } else {
380                 warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
381             }
382         }
383         Ok(())
384     }
385 
parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)>386     fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {
387         if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {
388             error!(
389                 "virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",
390                 read_buf.len()
391             );
392             return None;
393         }
394         let mut query: virtio_snd_query_info = Default::default();
395         query.as_mut_bytes().copy_from_slice(read_buf);
396         let start_id = query.start_id.to_native();
397         let count = query.count.to_native();
398         Some((start_id, count))
399     }
400 
401     // Returns Err if it encounters an unrecoverable error, Ok otherwise
process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()>402     fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {
403         if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {
404             error!(
405                 "virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
406                 read_buf.len()
407                 );
408             return reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue);
409         }
410         let mut params: virtio_snd_pcm_set_params = Default::default();
411         params.as_mut_bytes().copy_from_slice(read_buf);
412         let stream_id = params.hdr.stream_id.to_native();
413         if stream_id < self.vios_client.lock().num_streams() {
414             self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))
415         } else {
416             error!(
417                 "virtio-snd: Driver requested operation on invalid stream: {}",
418                 stream_id
419             );
420             reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue)
421         }
422     }
423 
424     // Returns Err if it encounters an unrecoverable error, Ok otherwise
try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()>425     fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {
426         if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {
427             error!(
428                 "virtio-snd: The driver sent a buffer too small to contain a header: {}",
429                 read_buf.len()
430             );
431             return reply_control_op_status(
432                 VIRTIO_SND_S_BAD_MSG,
433                 match msg {
434                     StreamMsg::Prepare(d)
435                     | StreamMsg::Start(d)
436                     | StreamMsg::Stop(d)
437                     | StreamMsg::Release(d) => d,
438                     _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
439                 },
440                 &self.control_queue,
441             );
442         }
443         let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
444         pcm_hdr.as_mut_bytes().copy_from_slice(read_buf);
445         let stream_id = pcm_hdr.stream_id.to_native();
446         if stream_id < self.vios_client.lock().num_streams() {
447             self.streams[stream_id as usize].send(msg)
448         } else {
449             error!(
450                 "virtio-snd: Driver requested operation on invalid stream: {}",
451                 stream_id
452             );
453             reply_control_op_status(
454                 VIRTIO_SND_S_BAD_MSG,
455                 match msg {
456                     StreamMsg::Prepare(d)
457                     | StreamMsg::Start(d)
458                     | StreamMsg::Stop(d)
459                     | StreamMsg::Release(d) => d,
460                     _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
461                 },
462                 &self.control_queue,
463             )
464         }
465     }
466 
send_info_reply<T: Immutable + IntoBytes>( &mut self, mut desc: DescriptorChain, code: u32, info_vec: Vec<T>, ) -> Result<()>467     fn send_info_reply<T: Immutable + IntoBytes>(
468         &mut self,
469         mut desc: DescriptorChain,
470         code: u32,
471         info_vec: Vec<T>,
472     ) -> Result<()> {
473         let writer = &mut desc.writer;
474         writer
475             .write_obj(virtio_snd_hdr {
476                 code: Le32::from(code),
477             })
478             .map_err(SoundError::QueueIO)?;
479         for info in info_vec {
480             writer.write_obj(info).map_err(SoundError::QueueIO)?;
481         }
482         let len = writer.bytes_written() as u32;
483         {
484             let mut queue_lock = self.control_queue.lock();
485             queue_lock.add_used(desc, len);
486             queue_lock.trigger_interrupt();
487         }
488         Ok(())
489     }
490 }
491 
492 impl Drop for Worker {
drop(&mut self)493     fn drop(&mut self) {
494         self.stop_io_thread();
495     }
496 }
497 
io_loop( tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, senders: Vec<Sender<Box<StreamMsg>>>, kill_evt: Event, ) -> Result<()>498 fn io_loop(
499     tx_queue: Arc<Mutex<Queue>>,
500     rx_queue: Arc<Mutex<Queue>>,
501     senders: Vec<Sender<Box<StreamMsg>>>,
502     kill_evt: Event,
503 ) -> Result<()> {
504     #[derive(EventToken)]
505     enum Token {
506         TxQAvailable,
507         RxQAvailable,
508         Kill,
509     }
510     let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
511         (tx_queue.lock().event(), Token::TxQAvailable),
512         (rx_queue.lock().event(), Token::RxQAvailable),
513         (&kill_evt, Token::Kill),
514     ])
515     .map_err(SoundError::WaitCtx)?;
516 
517     'wait: loop {
518         let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
519         for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
520             let queue = match wait_evt.token {
521                 Token::TxQAvailable => {
522                     tx_queue
523                         .lock()
524                         .event()
525                         .wait()
526                         .map_err(SoundError::QueueEvt)?;
527                     &tx_queue
528                 }
529                 Token::RxQAvailable => {
530                     rx_queue
531                         .lock()
532                         .event()
533                         .wait()
534                         .map_err(SoundError::QueueEvt)?;
535                     &rx_queue
536                 }
537                 Token::Kill => {
538                     let _ = kill_evt.wait();
539                     break 'wait;
540                 }
541             };
542             while let Some(mut avail_desc) = lock_pop_unlock(queue) {
543                 let reader = &mut avail_desc.reader;
544                 let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;
545                 let stream_id = xfer.stream_id.to_native();
546                 if stream_id as usize >= senders.len() {
547                     error!(
548                         "virtio-snd: Driver sent buffer for invalid stream: {}",
549                         stream_id
550                     );
551                     reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue)?;
552                 } else {
553                     StreamProxy::send_msg(
554                         &senders[stream_id as usize],
555                         StreamMsg::Buffer(avail_desc),
556                     )?;
557                 }
558             }
559         }
560     }
561     Ok(())
562 }
563 
564 // If queue.lock().pop() is used directly in the condition of a 'while' loop the lock is held over
565 // the entire loop block. Encapsulating it in this fuction guarantees that the lock is dropped
566 // immediately after pop() is called, which allows the code to remain somewhat simpler.
lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain>567 fn lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain> {
568     queue.lock().pop()
569 }
570