1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::io::Read;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9
10 use base::error;
11 use base::warn;
12 use base::Event;
13 use base::EventToken;
14 use base::WaitContext;
15 use data_model::Le32;
16 use sync::Mutex;
17 use vm_memory::GuestMemory;
18 use zerocopy::AsBytes;
19
20 use super::super::constants::*;
21 use super::super::layout::*;
22 use super::streams::*;
23 use super::Result;
24 use super::SoundError;
25 use super::*;
26 use crate::virtio::DescriptorChain;
27 use crate::virtio::Interrupt;
28 use crate::virtio::Queue;
29 use crate::virtio::Reader;
30 use crate::virtio::SignalableInterrupt;
31 use crate::virtio::Writer;
32
33 pub struct Worker {
34 // Lock order: Must never hold more than one queue lock at the same time.
35 interrupt: Interrupt,
36 control_queue: Arc<Mutex<Queue>>,
37 control_queue_evt: Event,
38 event_queue: Queue,
39 event_queue_evt: Event,
40 guest_memory: GuestMemory,
41 vios_client: Arc<VioSClient>,
42 streams: Vec<StreamProxy>,
43 io_thread: Option<thread::JoinHandle<Result<()>>>,
44 io_kill: Event,
45 }
46
47 impl Worker {
48 /// Creates a new virtio-snd worker.
try_new( vios_client: Arc<VioSClient>, interrupt: Interrupt, guest_memory: GuestMemory, control_queue: Arc<Mutex<Queue>>, control_queue_evt: Event, event_queue: Queue, event_queue_evt: Event, tx_queue: Arc<Mutex<Queue>>, tx_queue_evt: Event, rx_queue: Arc<Mutex<Queue>>, rx_queue_evt: Event, ) -> Result<Worker>49 pub fn try_new(
50 vios_client: Arc<VioSClient>,
51 interrupt: Interrupt,
52 guest_memory: GuestMemory,
53 control_queue: Arc<Mutex<Queue>>,
54 control_queue_evt: Event,
55 event_queue: Queue,
56 event_queue_evt: Event,
57 tx_queue: Arc<Mutex<Queue>>,
58 tx_queue_evt: Event,
59 rx_queue: Arc<Mutex<Queue>>,
60 rx_queue_evt: Event,
61 ) -> Result<Worker> {
62 let mut streams: Vec<StreamProxy> = Vec::with_capacity(vios_client.num_streams() as usize);
63 {
64 for stream_id in 0..vios_client.num_streams() {
65 let capture = vios_client
66 .stream_info(stream_id)
67 .map(|i| i.direction == VIRTIO_SND_D_INPUT)
68 .unwrap_or(false);
69 let io_queue = if capture { &rx_queue } else { &tx_queue };
70 streams.push(Stream::try_new(
71 stream_id,
72 vios_client.clone(),
73 guest_memory.clone(),
74 interrupt.clone(),
75 control_queue.clone(),
76 io_queue.clone(),
77 capture,
78 )?);
79 }
80 }
81 let (self_kill_io, kill_io) = Event::new()
82 .and_then(|e| Ok((e.try_clone()?, e)))
83 .map_err(SoundError::CreateEvent)?;
84
85 let interrupt_clone = interrupt.clone();
86 let guest_memory_clone = guest_memory.clone();
87 let senders: Vec<Sender<StreamMsg>> =
88 streams.iter().map(|sp| sp.msg_sender().clone()).collect();
89 let io_thread = thread::Builder::new()
90 .name("v_snd_io".to_string())
91 .spawn(move || {
92 try_set_real_time_priority();
93
94 io_loop(
95 interrupt_clone,
96 guest_memory_clone,
97 tx_queue,
98 tx_queue_evt,
99 rx_queue,
100 rx_queue_evt,
101 senders,
102 kill_io,
103 )
104 })
105 .map_err(SoundError::CreateThread)?;
106 Ok(Worker {
107 interrupt,
108 control_queue,
109 control_queue_evt,
110 event_queue,
111 event_queue_evt,
112 guest_memory,
113 vios_client,
114 streams,
115 io_thread: Some(io_thread),
116 io_kill: self_kill_io,
117 })
118 }
119
120 /// Emulates the virtio-snd device. It won't return until something is written to the kill_evt
121 /// event or an unrecoverable error occurs.
control_loop(&mut self, kill_evt: Event) -> Result<()>122 pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {
123 let event_notifier = self
124 .vios_client
125 .get_event_notifier()
126 .map_err(SoundError::ClientEventNotifier)?;
127 #[derive(EventToken)]
128 enum Token {
129 ControlQAvailable,
130 EventQAvailable,
131 InterruptResample,
132 EventTriggered,
133 Kill,
134 }
135 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
136 (&self.control_queue_evt, Token::ControlQAvailable),
137 (&self.event_queue_evt, Token::EventQAvailable),
138 (&event_notifier, Token::EventTriggered),
139 (&kill_evt, Token::Kill),
140 ])
141 .map_err(SoundError::WaitCtx)?;
142
143 if let Some(resample_evt) = self.interrupt.get_resample_evt() {
144 wait_ctx
145 .add(resample_evt, Token::InterruptResample)
146 .map_err(SoundError::WaitCtx)?;
147 }
148 'wait: loop {
149 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
150
151 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
152 match wait_evt.token {
153 Token::ControlQAvailable => {
154 self.control_queue_evt
155 .wait()
156 .map_err(SoundError::QueueEvt)?;
157 self.process_controlq_buffers()?;
158 }
159 Token::EventQAvailable => {
160 // Just read from the event object to make sure the producer of such events
161 // never blocks. The buffers will only be used when actual virtio-snd
162 // events are triggered.
163 self.event_queue_evt.wait().map_err(SoundError::QueueEvt)?;
164 }
165 Token::EventTriggered => {
166 event_notifier.wait().map_err(SoundError::QueueEvt)?;
167 self.process_event_triggered()?;
168 }
169 Token::InterruptResample => {
170 self.interrupt.interrupt_resample();
171 }
172 Token::Kill => {
173 let _ = kill_evt.wait();
174 break 'wait;
175 }
176 }
177 }
178 }
179 Ok(())
180 }
181
stop_io_thread(&mut self)182 fn stop_io_thread(&mut self) {
183 if let Err(e) = self.io_kill.signal() {
184 error!(
185 "virtio-snd: Failed to send Break msg to stream thread: {}",
186 e
187 );
188 }
189 if let Some(th) = self.io_thread.take() {
190 match th.join() {
191 Err(e) => {
192 error!("virtio-snd: Panic detected on stream thread: {:?}", e);
193 }
194 Ok(r) => {
195 if let Err(e) = r {
196 error!("virtio-snd: IO thread exited with and error: {}", e);
197 }
198 }
199 }
200 }
201 }
202
203 // Pops and handles all available ontrol queue buffers. Logs minor errors, but returns an
204 // Err if it encounters an unrecoverable error.
process_controlq_buffers(&mut self) -> Result<()>205 fn process_controlq_buffers(&mut self) -> Result<()> {
206 while let Some(avail_desc) = lock_pop_unlock(&self.control_queue, &self.guest_memory) {
207 let mut reader = Reader::new(self.guest_memory.clone(), avail_desc.clone())
208 .map_err(SoundError::Descriptor)?;
209 let available_bytes = reader.available_bytes();
210 if available_bytes < std::mem::size_of::<virtio_snd_hdr>() {
211 error!(
212 "virtio-snd: Message received on control queue is too small: {}",
213 available_bytes
214 );
215 return reply_control_op_status(
216 VIRTIO_SND_S_BAD_MSG,
217 avail_desc,
218 &self.guest_memory,
219 &self.control_queue,
220 &self.interrupt,
221 );
222 }
223 let mut read_buf = vec![0u8; available_bytes];
224 reader
225 .read_exact(&mut read_buf)
226 .map_err(SoundError::QueueIO)?;
227 let mut code: Le32 = Default::default();
228 // need to copy because the buffer may not be properly aligned
229 code.as_bytes_mut()
230 .copy_from_slice(&read_buf[..std::mem::size_of::<Le32>()]);
231 let request_type = code.to_native();
232 match request_type {
233 VIRTIO_SND_R_JACK_INFO => {
234 let (code, info_vec) = {
235 match self.parse_info_query(&read_buf) {
236 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
237 Some((start_id, count)) => {
238 let end_id = start_id.saturating_add(count);
239 if end_id > self.vios_client.num_jacks() {
240 error!(
241 "virtio-snd: Requested info on invalid jacks ids: {}..{}",
242 start_id,
243 end_id - 1
244 );
245 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
246 } else {
247 (
248 VIRTIO_SND_S_OK,
249 // Safe to unwrap because we just ensured all the ids are valid
250 (start_id..end_id)
251 .map(|id| self.vios_client.jack_info(id).unwrap())
252 .collect(),
253 )
254 }
255 }
256 }
257 };
258 self.send_info_reply(avail_desc, code, info_vec)?;
259 }
260 VIRTIO_SND_R_JACK_REMAP => {
261 let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {
262 error!(
263 "virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",
264 read_buf.len()
265 );
266 VIRTIO_SND_S_BAD_MSG
267 } else {
268 let mut request: virtio_snd_jack_remap = Default::default();
269 request.as_bytes_mut().copy_from_slice(&read_buf);
270 let jack_id = request.hdr.jack_id.to_native();
271 let association = request.association.to_native();
272 let sequence = request.sequence.to_native();
273 if let Err(e) = self.vios_client.remap_jack(jack_id, association, sequence)
274 {
275 error!("virtio-snd: Failed to remap jack: {}", e);
276 vios_error_to_status_code(e)
277 } else {
278 VIRTIO_SND_S_OK
279 }
280 };
281 let desc_index = avail_desc.index;
282 let mut writer = Writer::new(self.guest_memory.clone(), avail_desc)
283 .map_err(SoundError::Descriptor)?;
284 writer
285 .write_obj(virtio_snd_hdr {
286 code: Le32::from(code),
287 })
288 .map_err(SoundError::QueueIO)?;
289 {
290 let mut queue_lock = self.control_queue.lock();
291 queue_lock.add_used(
292 &self.guest_memory,
293 desc_index,
294 writer.bytes_written() as u32,
295 );
296 queue_lock.trigger_interrupt(&self.guest_memory, &self.interrupt);
297 }
298 }
299 VIRTIO_SND_R_CHMAP_INFO => {
300 let (code, info_vec) = {
301 match self.parse_info_query(&read_buf) {
302 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
303 Some((start_id, count)) => {
304 let end_id = start_id.saturating_add(count);
305 if end_id > self.vios_client.num_chmaps() {
306 error!(
307 "virtio-snd: Requested info on invalid chmaps ids: {}..{}",
308 start_id,
309 end_id - 1
310 );
311 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
312 } else {
313 (
314 VIRTIO_SND_S_OK,
315 // Safe to unwrap because we just ensured all the ids are valid
316 (start_id..end_id)
317 .map(|id| self.vios_client.chmap_info(id).unwrap())
318 .collect(),
319 )
320 }
321 }
322 }
323 };
324 self.send_info_reply(avail_desc, code, info_vec)?;
325 }
326 VIRTIO_SND_R_PCM_INFO => {
327 let (code, info_vec) = {
328 match self.parse_info_query(&read_buf) {
329 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
330 Some((start_id, count)) => {
331 let end_id = start_id.saturating_add(count);
332 if end_id > self.vios_client.num_streams() {
333 error!(
334 "virtio-snd: Requested info on invalid stream ids: {}..{}",
335 start_id,
336 end_id - 1
337 );
338 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
339 } else {
340 (
341 VIRTIO_SND_S_OK,
342 // Safe to unwrap because we just ensured all the ids are valid
343 (start_id..end_id)
344 .map(|id| self.vios_client.stream_info(id).unwrap())
345 .collect(),
346 )
347 }
348 }
349 }
350 };
351 self.send_info_reply(avail_desc, code, info_vec)?;
352 }
353 VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,
354 VIRTIO_SND_R_PCM_PREPARE => {
355 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?
356 }
357 VIRTIO_SND_R_PCM_RELEASE => {
358 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?
359 }
360 VIRTIO_SND_R_PCM_START => {
361 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?
362 }
363 VIRTIO_SND_R_PCM_STOP => {
364 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?
365 }
366 _ => {
367 error!(
368 "virtio-snd: Unknown control queue mesage code: {}",
369 request_type
370 );
371 reply_control_op_status(
372 VIRTIO_SND_S_NOT_SUPP,
373 avail_desc,
374 &self.guest_memory,
375 &self.control_queue,
376 &self.interrupt,
377 )?;
378 }
379 }
380 }
381 Ok(())
382 }
383
process_event_triggered(&mut self) -> Result<()>384 fn process_event_triggered(&mut self) -> Result<()> {
385 while let Some(evt) = self.vios_client.pop_event() {
386 if let Some(desc) = self.event_queue.pop(&self.guest_memory) {
387 let desc_index = desc.index;
388 let mut writer =
389 Writer::new(self.guest_memory.clone(), desc).map_err(SoundError::Descriptor)?;
390 writer.write_obj(evt).map_err(SoundError::QueueIO)?;
391 self.event_queue.add_used(
392 &self.guest_memory,
393 desc_index,
394 writer.bytes_written() as u32,
395 );
396 {
397 self.event_queue
398 .trigger_interrupt(&self.guest_memory, &self.interrupt);
399 }
400 } else {
401 warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
402 }
403 }
404 Ok(())
405 }
406
parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)>407 fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {
408 if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {
409 error!(
410 "virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",
411 read_buf.len()
412 );
413 return None;
414 }
415 let mut query: virtio_snd_query_info = Default::default();
416 query.as_bytes_mut().copy_from_slice(read_buf);
417 let start_id = query.start_id.to_native();
418 let count = query.count.to_native();
419 Some((start_id, count))
420 }
421
422 // Returns Err if it encounters an unrecoverable error, Ok otherwise
process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()>423 fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {
424 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {
425 error!(
426 "virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
427 read_buf.len()
428 );
429 return reply_control_op_status(
430 VIRTIO_SND_S_BAD_MSG,
431 desc,
432 &self.guest_memory,
433 &self.control_queue,
434 &self.interrupt,
435 );
436 }
437 let mut params: virtio_snd_pcm_set_params = Default::default();
438 params.as_bytes_mut().copy_from_slice(read_buf);
439 let stream_id = params.hdr.stream_id.to_native();
440 if stream_id < self.vios_client.num_streams() {
441 self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))
442 } else {
443 error!(
444 "virtio-snd: Driver requested operation on invalid stream: {}",
445 stream_id
446 );
447 reply_control_op_status(
448 VIRTIO_SND_S_BAD_MSG,
449 desc,
450 &self.guest_memory,
451 &self.control_queue,
452 &self.interrupt,
453 )
454 }
455 }
456
457 // Returns Err if it encounters an unrecoverable error, Ok otherwise
try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()>458 fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {
459 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {
460 error!(
461 "virtio-snd: The driver sent a buffer too small to contain a header: {}",
462 read_buf.len()
463 );
464 return reply_control_op_status(
465 VIRTIO_SND_S_BAD_MSG,
466 match msg {
467 StreamMsg::Prepare(d)
468 | StreamMsg::Start(d)
469 | StreamMsg::Stop(d)
470 | StreamMsg::Release(d) => d,
471 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
472 },
473 &self.guest_memory,
474 &self.control_queue,
475 &self.interrupt,
476 );
477 }
478 let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
479 pcm_hdr.as_bytes_mut().copy_from_slice(read_buf);
480 let stream_id = pcm_hdr.stream_id.to_native();
481 if stream_id < self.vios_client.num_streams() {
482 self.streams[stream_id as usize].send(msg)
483 } else {
484 error!(
485 "virtio-snd: Driver requested operation on invalid stream: {}",
486 stream_id
487 );
488 reply_control_op_status(
489 VIRTIO_SND_S_BAD_MSG,
490 match msg {
491 StreamMsg::Prepare(d)
492 | StreamMsg::Start(d)
493 | StreamMsg::Stop(d)
494 | StreamMsg::Release(d) => d,
495 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
496 },
497 &self.guest_memory,
498 &self.control_queue,
499 &self.interrupt,
500 )
501 }
502 }
503
send_info_reply<T: AsBytes>( &mut self, desc: DescriptorChain, code: u32, info_vec: Vec<T>, ) -> Result<()>504 fn send_info_reply<T: AsBytes>(
505 &mut self,
506 desc: DescriptorChain,
507 code: u32,
508 info_vec: Vec<T>,
509 ) -> Result<()> {
510 let desc_index = desc.index;
511 let mut writer =
512 Writer::new(self.guest_memory.clone(), desc).map_err(SoundError::Descriptor)?;
513 writer
514 .write_obj(virtio_snd_hdr {
515 code: Le32::from(code),
516 })
517 .map_err(SoundError::QueueIO)?;
518 for info in info_vec {
519 writer.write_obj(info).map_err(SoundError::QueueIO)?;
520 }
521 {
522 let mut queue_lock = self.control_queue.lock();
523 queue_lock.add_used(
524 &self.guest_memory,
525 desc_index,
526 writer.bytes_written() as u32,
527 );
528 queue_lock.trigger_interrupt(&self.guest_memory, &self.interrupt);
529 }
530 Ok(())
531 }
532 }
533
534 impl Drop for Worker {
drop(&mut self)535 fn drop(&mut self) {
536 self.stop_io_thread();
537 }
538 }
539
io_loop( interrupt: Interrupt, guest_memory: GuestMemory, tx_queue: Arc<Mutex<Queue>>, tx_queue_evt: Event, rx_queue: Arc<Mutex<Queue>>, rx_queue_evt: Event, senders: Vec<Sender<StreamMsg>>, kill_evt: Event, ) -> Result<()>540 fn io_loop(
541 interrupt: Interrupt,
542 guest_memory: GuestMemory,
543 tx_queue: Arc<Mutex<Queue>>,
544 tx_queue_evt: Event,
545 rx_queue: Arc<Mutex<Queue>>,
546 rx_queue_evt: Event,
547 senders: Vec<Sender<StreamMsg>>,
548 kill_evt: Event,
549 ) -> Result<()> {
550 #[derive(EventToken)]
551 enum Token {
552 TxQAvailable,
553 RxQAvailable,
554 Kill,
555 }
556 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
557 (&tx_queue_evt, Token::TxQAvailable),
558 (&rx_queue_evt, Token::RxQAvailable),
559 (&kill_evt, Token::Kill),
560 ])
561 .map_err(SoundError::WaitCtx)?;
562
563 'wait: loop {
564 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
565 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
566 let queue = match wait_evt.token {
567 Token::TxQAvailable => {
568 tx_queue_evt.wait().map_err(SoundError::QueueEvt)?;
569 &tx_queue
570 }
571 Token::RxQAvailable => {
572 rx_queue_evt.wait().map_err(SoundError::QueueEvt)?;
573 &rx_queue
574 }
575 Token::Kill => {
576 let _ = kill_evt.wait();
577 break 'wait;
578 }
579 };
580 while let Some(avail_desc) = lock_pop_unlock(queue, &guest_memory) {
581 let mut reader = Reader::new(guest_memory.clone(), avail_desc.clone())
582 .map_err(SoundError::Descriptor)?;
583 let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;
584 let stream_id = xfer.stream_id.to_native();
585 if stream_id as usize >= senders.len() {
586 error!(
587 "virtio-snd: Driver sent buffer for invalid stream: {}",
588 stream_id
589 );
590 reply_pcm_buffer_status(
591 VIRTIO_SND_S_IO_ERR,
592 0,
593 avail_desc,
594 &guest_memory,
595 queue,
596 &interrupt,
597 )?;
598 } else {
599 StreamProxy::send_msg(
600 &senders[stream_id as usize],
601 StreamMsg::Buffer(avail_desc),
602 )?;
603 }
604 }
605 }
606 }
607 Ok(())
608 }
609
610 // If queue.lock().pop() is used directly in the condition of a 'while' loop the lock is held over
611 // the entire loop block. Encapsulating it in this fuction guarantees that the lock is dropped
612 // immediately after pop() is called, which allows the code to remain somewhat simpler.
lock_pop_unlock( queue: &Arc<Mutex<Queue>>, guest_memory: &GuestMemory, ) -> Option<DescriptorChain>613 fn lock_pop_unlock(
614 queue: &Arc<Mutex<Queue>>,
615 guest_memory: &GuestMemory,
616 ) -> Option<DescriptorChain> {
617 queue.lock().pop(guest_memory)
618 }
619