1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::io::Read;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9
10 use base::error;
11 use base::warn;
12 use base::Event;
13 use base::EventToken;
14 use base::WaitContext;
15 use data_model::Le32;
16 use sync::Mutex;
17 use zerocopy::AsBytes;
18
19 use super::super::constants::*;
20 use super::super::layout::*;
21 use super::streams::*;
22 use super::Result;
23 use super::SoundError;
24 use super::*;
25 use crate::virtio::DescriptorChain;
26 use crate::virtio::Interrupt;
27 use crate::virtio::Queue;
28
29 pub struct Worker {
30 // Lock order: Must never hold more than one queue lock at the same time.
31 interrupt: Interrupt,
32 pub control_queue: Arc<Mutex<Queue>>,
33 pub event_queue: Option<Queue>,
34 vios_client: Arc<Mutex<VioSClient>>,
35 streams: Vec<StreamProxy>,
36 pub tx_queue: Arc<Mutex<Queue>>,
37 pub rx_queue: Arc<Mutex<Queue>>,
38 io_thread: Option<thread::JoinHandle<Result<()>>>,
39 io_kill: Event,
40 // saved_stream_state holds the previous state of streams. When the sound device is newly
41 // created, this will be empty. It will only contain state if the sound device is put to sleep
42 // OR if we restore a VM.
43 pub saved_stream_state: Vec<StreamSnapshot>,
44 }
45
46 impl Worker {
47 /// Creates a new virtio-snd worker.
try_new( vios_client: Arc<Mutex<VioSClient>>, interrupt: Interrupt, control_queue: Arc<Mutex<Queue>>, event_queue: Queue, tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, saved_stream_state: Vec<StreamSnapshot>, ) -> Result<Worker>48 pub fn try_new(
49 vios_client: Arc<Mutex<VioSClient>>,
50 interrupt: Interrupt,
51 control_queue: Arc<Mutex<Queue>>,
52 event_queue: Queue,
53 tx_queue: Arc<Mutex<Queue>>,
54 rx_queue: Arc<Mutex<Queue>>,
55 saved_stream_state: Vec<StreamSnapshot>,
56 ) -> Result<Worker> {
57 let num_streams = vios_client.lock().num_streams();
58 let mut streams: Vec<StreamProxy> = Vec::with_capacity(num_streams as usize);
59 {
60 for stream_id in 0..num_streams {
61 let capture = vios_client
62 .lock()
63 .stream_info(stream_id)
64 .map(|i| i.direction == VIRTIO_SND_D_INPUT)
65 .unwrap_or(false);
66 let io_queue = if capture { &rx_queue } else { &tx_queue };
67 streams.push(Stream::try_new(
68 stream_id,
69 vios_client.clone(),
70 interrupt.clone(),
71 control_queue.clone(),
72 io_queue.clone(),
73 capture,
74 saved_stream_state.get(stream_id as usize).cloned(),
75 )?);
76 }
77 }
78 let (self_kill_io, kill_io) = Event::new()
79 .and_then(|e| Ok((e.try_clone()?, e)))
80 .map_err(SoundError::CreateEvent)?;
81
82 let interrupt_clone = interrupt.clone();
83 let senders: Vec<Sender<Box<StreamMsg>>> =
84 streams.iter().map(|sp| sp.msg_sender().clone()).collect();
85 let tx_queue_thread = tx_queue.clone();
86 let rx_queue_thread = rx_queue.clone();
87 let io_thread = thread::Builder::new()
88 .name("v_snd_io".to_string())
89 .spawn(move || {
90 try_set_real_time_priority();
91
92 io_loop(
93 interrupt_clone,
94 tx_queue_thread,
95 rx_queue_thread,
96 senders,
97 kill_io,
98 )
99 })
100 .map_err(SoundError::CreateThread)?;
101 Ok(Worker {
102 interrupt,
103 control_queue,
104 event_queue: Some(event_queue),
105 vios_client,
106 streams,
107 tx_queue,
108 rx_queue,
109 io_thread: Some(io_thread),
110 io_kill: self_kill_io,
111 saved_stream_state: Vec::new(),
112 })
113 }
114
115 /// Emulates the virtio-snd device. It won't return until something is written to the kill_evt
116 /// event or an unrecoverable error occurs.
control_loop(&mut self, kill_evt: Event) -> Result<()>117 pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {
118 let event_notifier = self
119 .vios_client
120 .lock()
121 .get_event_notifier()
122 .map_err(SoundError::ClientEventNotifier)?;
123 #[derive(EventToken)]
124 enum Token {
125 ControlQAvailable,
126 EventQAvailable,
127 InterruptResample,
128 EventTriggered,
129 Kill,
130 }
131 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
132 (self.control_queue.lock().event(), Token::ControlQAvailable),
133 (
134 self.event_queue.as_ref().expect("queue missing").event(),
135 Token::EventQAvailable,
136 ),
137 (&event_notifier, Token::EventTriggered),
138 (&kill_evt, Token::Kill),
139 ])
140 .map_err(SoundError::WaitCtx)?;
141
142 if let Some(resample_evt) = self.interrupt.get_resample_evt() {
143 wait_ctx
144 .add(resample_evt, Token::InterruptResample)
145 .map_err(SoundError::WaitCtx)?;
146 }
147 let mut event_queue = self.event_queue.take().expect("event_queue missing");
148 'wait: loop {
149 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
150
151 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
152 match wait_evt.token {
153 Token::ControlQAvailable => {
154 self.control_queue
155 .lock()
156 .event()
157 .wait()
158 .map_err(SoundError::QueueEvt)?;
159 self.process_controlq_buffers()?;
160 }
161 Token::EventQAvailable => {
162 // Just read from the event object to make sure the producer of such events
163 // never blocks. The buffers will only be used when actual virtio-snd
164 // events are triggered.
165 event_queue.event().wait().map_err(SoundError::QueueEvt)?;
166 }
167 Token::EventTriggered => {
168 event_notifier.wait().map_err(SoundError::QueueEvt)?;
169 self.process_event_triggered(&mut event_queue)?;
170 }
171 Token::InterruptResample => {
172 self.interrupt.interrupt_resample();
173 }
174 Token::Kill => {
175 let _ = kill_evt.wait();
176 break 'wait;
177 }
178 }
179 }
180 }
181 self.saved_stream_state = self
182 .streams
183 .drain(..)
184 .map(|stream| stream.stop_thread())
185 .collect();
186 self.event_queue = Some(event_queue);
187 Ok(())
188 }
189
stop_io_thread(&mut self)190 fn stop_io_thread(&mut self) {
191 if let Err(e) = self.io_kill.signal() {
192 error!(
193 "virtio-snd: Failed to send Break msg to stream thread: {}",
194 e
195 );
196 }
197 if let Some(th) = self.io_thread.take() {
198 match th.join() {
199 Err(e) => {
200 error!("virtio-snd: Panic detected on stream thread: {:?}", e);
201 }
202 Ok(r) => {
203 if let Err(e) = r {
204 error!("virtio-snd: IO thread exited with and error: {}", e);
205 }
206 }
207 }
208 }
209 }
210
211 // Pops and handles all available ontrol queue buffers. Logs minor errors, but returns an
212 // Err if it encounters an unrecoverable error.
process_controlq_buffers(&mut self) -> Result<()>213 fn process_controlq_buffers(&mut self) -> Result<()> {
214 while let Some(mut avail_desc) = lock_pop_unlock(&self.control_queue) {
215 let reader = &mut avail_desc.reader;
216 let available_bytes = reader.available_bytes();
217 if available_bytes < std::mem::size_of::<virtio_snd_hdr>() {
218 error!(
219 "virtio-snd: Message received on control queue is too small: {}",
220 available_bytes
221 );
222 return reply_control_op_status(
223 VIRTIO_SND_S_BAD_MSG,
224 avail_desc,
225 &self.control_queue,
226 &self.interrupt,
227 );
228 }
229 let mut read_buf = vec![0u8; available_bytes];
230 reader
231 .read_exact(&mut read_buf)
232 .map_err(SoundError::QueueIO)?;
233 let mut code: Le32 = Default::default();
234 // need to copy because the buffer may not be properly aligned
235 code.as_bytes_mut()
236 .copy_from_slice(&read_buf[..std::mem::size_of::<Le32>()]);
237 let request_type = code.to_native();
238 match request_type {
239 VIRTIO_SND_R_JACK_INFO => {
240 let (code, info_vec) = {
241 match self.parse_info_query(&read_buf) {
242 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
243 Some((start_id, count)) => {
244 let end_id = start_id.saturating_add(count);
245 if end_id > self.vios_client.lock().num_jacks() {
246 error!(
247 "virtio-snd: Requested info on invalid jacks ids: {}..{}",
248 start_id,
249 end_id - 1
250 );
251 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
252 } else {
253 (
254 VIRTIO_SND_S_OK,
255 // Safe to unwrap because we just ensured all the ids are
256 // valid
257 (start_id..end_id)
258 .map(|id| {
259 self.vios_client.lock().jack_info(id).unwrap()
260 })
261 .collect(),
262 )
263 }
264 }
265 }
266 };
267 self.send_info_reply(avail_desc, code, info_vec)?;
268 }
269 VIRTIO_SND_R_JACK_REMAP => {
270 let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {
271 error!(
272 "virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",
273 read_buf.len()
274 );
275 VIRTIO_SND_S_BAD_MSG
276 } else {
277 let mut request: virtio_snd_jack_remap = Default::default();
278 request.as_bytes_mut().copy_from_slice(&read_buf);
279 let jack_id = request.hdr.jack_id.to_native();
280 let association = request.association.to_native();
281 let sequence = request.sequence.to_native();
282 if let Err(e) =
283 self.vios_client
284 .lock()
285 .remap_jack(jack_id, association, sequence)
286 {
287 error!("virtio-snd: Failed to remap jack: {}", e);
288 vios_error_to_status_code(e)
289 } else {
290 VIRTIO_SND_S_OK
291 }
292 };
293 let writer = &mut avail_desc.writer;
294 writer
295 .write_obj(virtio_snd_hdr {
296 code: Le32::from(code),
297 })
298 .map_err(SoundError::QueueIO)?;
299 let len = writer.bytes_written() as u32;
300 {
301 let mut queue_lock = self.control_queue.lock();
302 queue_lock.add_used(avail_desc, len);
303 queue_lock.trigger_interrupt(&self.interrupt);
304 }
305 }
306 VIRTIO_SND_R_CHMAP_INFO => {
307 let (code, info_vec) = {
308 match self.parse_info_query(&read_buf) {
309 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
310 Some((start_id, count)) => {
311 let end_id = start_id.saturating_add(count);
312 let num_chmaps = self.vios_client.lock().num_chmaps();
313 if end_id > num_chmaps {
314 error!(
315 "virtio-snd: Requested info on invalid chmaps ids: {}..{}",
316 start_id,
317 end_id - 1
318 );
319 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
320 } else {
321 (
322 VIRTIO_SND_S_OK,
323 // Safe to unwrap because we just ensured all the ids are
324 // valid
325 (start_id..end_id)
326 .map(|id| {
327 self.vios_client.lock().chmap_info(id).unwrap()
328 })
329 .collect(),
330 )
331 }
332 }
333 }
334 };
335 self.send_info_reply(avail_desc, code, info_vec)?;
336 }
337 VIRTIO_SND_R_PCM_INFO => {
338 let (code, info_vec) = {
339 match self.parse_info_query(&read_buf) {
340 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
341 Some((start_id, count)) => {
342 let end_id = start_id.saturating_add(count);
343 if end_id > self.vios_client.lock().num_streams() {
344 error!(
345 "virtio-snd: Requested info on invalid stream ids: {}..{}",
346 start_id,
347 end_id - 1
348 );
349 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
350 } else {
351 (
352 VIRTIO_SND_S_OK,
353 // Safe to unwrap because we just ensured all the ids are
354 // valid
355 (start_id..end_id)
356 .map(|id| {
357 self.vios_client.lock().stream_info(id).unwrap()
358 })
359 .collect(),
360 )
361 }
362 }
363 }
364 };
365 self.send_info_reply(avail_desc, code, info_vec)?;
366 }
367 VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,
368 VIRTIO_SND_R_PCM_PREPARE => {
369 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?
370 }
371 VIRTIO_SND_R_PCM_RELEASE => {
372 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?
373 }
374 VIRTIO_SND_R_PCM_START => {
375 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?
376 }
377 VIRTIO_SND_R_PCM_STOP => {
378 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?
379 }
380 _ => {
381 error!(
382 "virtio-snd: Unknown control queue mesage code: {}",
383 request_type
384 );
385 reply_control_op_status(
386 VIRTIO_SND_S_NOT_SUPP,
387 avail_desc,
388 &self.control_queue,
389 &self.interrupt,
390 )?;
391 }
392 }
393 }
394 Ok(())
395 }
396
process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()>397 fn process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()> {
398 while let Some(evt) = self.vios_client.lock().pop_event() {
399 if let Some(mut desc) = event_queue.pop() {
400 let writer = &mut desc.writer;
401 writer.write_obj(evt).map_err(SoundError::QueueIO)?;
402 let len = writer.bytes_written() as u32;
403 event_queue.add_used(desc, len);
404 event_queue.trigger_interrupt(&self.interrupt);
405 } else {
406 warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
407 }
408 }
409 Ok(())
410 }
411
parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)>412 fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {
413 if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {
414 error!(
415 "virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",
416 read_buf.len()
417 );
418 return None;
419 }
420 let mut query: virtio_snd_query_info = Default::default();
421 query.as_bytes_mut().copy_from_slice(read_buf);
422 let start_id = query.start_id.to_native();
423 let count = query.count.to_native();
424 Some((start_id, count))
425 }
426
427 // Returns Err if it encounters an unrecoverable error, Ok otherwise
process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()>428 fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {
429 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {
430 error!(
431 "virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
432 read_buf.len()
433 );
434 return reply_control_op_status(
435 VIRTIO_SND_S_BAD_MSG,
436 desc,
437 &self.control_queue,
438 &self.interrupt,
439 );
440 }
441 let mut params: virtio_snd_pcm_set_params = Default::default();
442 params.as_bytes_mut().copy_from_slice(read_buf);
443 let stream_id = params.hdr.stream_id.to_native();
444 if stream_id < self.vios_client.lock().num_streams() {
445 self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))
446 } else {
447 error!(
448 "virtio-snd: Driver requested operation on invalid stream: {}",
449 stream_id
450 );
451 reply_control_op_status(
452 VIRTIO_SND_S_BAD_MSG,
453 desc,
454 &self.control_queue,
455 &self.interrupt,
456 )
457 }
458 }
459
460 // Returns Err if it encounters an unrecoverable error, Ok otherwise
try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()>461 fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {
462 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {
463 error!(
464 "virtio-snd: The driver sent a buffer too small to contain a header: {}",
465 read_buf.len()
466 );
467 return reply_control_op_status(
468 VIRTIO_SND_S_BAD_MSG,
469 match msg {
470 StreamMsg::Prepare(d)
471 | StreamMsg::Start(d)
472 | StreamMsg::Stop(d)
473 | StreamMsg::Release(d) => d,
474 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
475 },
476 &self.control_queue,
477 &self.interrupt,
478 );
479 }
480 let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
481 pcm_hdr.as_bytes_mut().copy_from_slice(read_buf);
482 let stream_id = pcm_hdr.stream_id.to_native();
483 if stream_id < self.vios_client.lock().num_streams() {
484 self.streams[stream_id as usize].send(msg)
485 } else {
486 error!(
487 "virtio-snd: Driver requested operation on invalid stream: {}",
488 stream_id
489 );
490 reply_control_op_status(
491 VIRTIO_SND_S_BAD_MSG,
492 match msg {
493 StreamMsg::Prepare(d)
494 | StreamMsg::Start(d)
495 | StreamMsg::Stop(d)
496 | StreamMsg::Release(d) => d,
497 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
498 },
499 &self.control_queue,
500 &self.interrupt,
501 )
502 }
503 }
504
send_info_reply<T: AsBytes>( &mut self, mut desc: DescriptorChain, code: u32, info_vec: Vec<T>, ) -> Result<()>505 fn send_info_reply<T: AsBytes>(
506 &mut self,
507 mut desc: DescriptorChain,
508 code: u32,
509 info_vec: Vec<T>,
510 ) -> Result<()> {
511 let writer = &mut desc.writer;
512 writer
513 .write_obj(virtio_snd_hdr {
514 code: Le32::from(code),
515 })
516 .map_err(SoundError::QueueIO)?;
517 for info in info_vec {
518 writer.write_obj(info).map_err(SoundError::QueueIO)?;
519 }
520 let len = writer.bytes_written() as u32;
521 {
522 let mut queue_lock = self.control_queue.lock();
523 queue_lock.add_used(desc, len);
524 queue_lock.trigger_interrupt(&self.interrupt);
525 }
526 Ok(())
527 }
528 }
529
530 impl Drop for Worker {
drop(&mut self)531 fn drop(&mut self) {
532 self.stop_io_thread();
533 }
534 }
535
io_loop( interrupt: Interrupt, tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, senders: Vec<Sender<Box<StreamMsg>>>, kill_evt: Event, ) -> Result<()>536 fn io_loop(
537 interrupt: Interrupt,
538 tx_queue: Arc<Mutex<Queue>>,
539 rx_queue: Arc<Mutex<Queue>>,
540 senders: Vec<Sender<Box<StreamMsg>>>,
541 kill_evt: Event,
542 ) -> Result<()> {
543 #[derive(EventToken)]
544 enum Token {
545 TxQAvailable,
546 RxQAvailable,
547 Kill,
548 }
549 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
550 (tx_queue.lock().event(), Token::TxQAvailable),
551 (rx_queue.lock().event(), Token::RxQAvailable),
552 (&kill_evt, Token::Kill),
553 ])
554 .map_err(SoundError::WaitCtx)?;
555
556 'wait: loop {
557 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
558 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
559 let queue = match wait_evt.token {
560 Token::TxQAvailable => {
561 tx_queue
562 .lock()
563 .event()
564 .wait()
565 .map_err(SoundError::QueueEvt)?;
566 &tx_queue
567 }
568 Token::RxQAvailable => {
569 rx_queue
570 .lock()
571 .event()
572 .wait()
573 .map_err(SoundError::QueueEvt)?;
574 &rx_queue
575 }
576 Token::Kill => {
577 let _ = kill_evt.wait();
578 break 'wait;
579 }
580 };
581 while let Some(mut avail_desc) = lock_pop_unlock(queue) {
582 let reader = &mut avail_desc.reader;
583 let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;
584 let stream_id = xfer.stream_id.to_native();
585 if stream_id as usize >= senders.len() {
586 error!(
587 "virtio-snd: Driver sent buffer for invalid stream: {}",
588 stream_id
589 );
590 reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue, &interrupt)?;
591 } else {
592 StreamProxy::send_msg(
593 &senders[stream_id as usize],
594 StreamMsg::Buffer(avail_desc),
595 )?;
596 }
597 }
598 }
599 }
600 Ok(())
601 }
602
603 // If queue.lock().pop() is used directly in the condition of a 'while' loop the lock is held over
604 // the entire loop block. Encapsulating it in this fuction guarantees that the lock is dropped
605 // immediately after pop() is called, which allows the code to remain somewhat simpler.
lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain>606 fn lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain> {
607 queue.lock().pop()
608 }
609