1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::io::Read;
6 use std::sync::mpsc::Sender;
7 use std::sync::Arc;
8 use std::thread;
9
10 use base::error;
11 use base::warn;
12 use base::Event;
13 use base::EventToken;
14 use base::WaitContext;
15 use data_model::Le32;
16 use sync::Mutex;
17 use zerocopy::Immutable;
18 use zerocopy::IntoBytes;
19
20 use super::super::constants::*;
21 use super::super::layout::*;
22 use super::streams::*;
23 use super::Result;
24 use super::SoundError;
25 use super::*;
26 use crate::virtio::DescriptorChain;
27 use crate::virtio::Queue;
28
29 pub struct Worker {
30 // Lock order: Must never hold more than one queue lock at the same time.
31 pub control_queue: Arc<Mutex<Queue>>,
32 pub event_queue: Option<Queue>,
33 vios_client: Arc<Mutex<VioSClient>>,
34 streams: Vec<StreamProxy>,
35 pub tx_queue: Arc<Mutex<Queue>>,
36 pub rx_queue: Arc<Mutex<Queue>>,
37 io_thread: Option<thread::JoinHandle<Result<()>>>,
38 io_kill: Event,
39 // saved_stream_state holds the previous state of streams. When the sound device is newly
40 // created, this will be empty. It will only contain state if the sound device is put to sleep
41 // OR if we restore a VM.
42 pub saved_stream_state: Vec<StreamSnapshot>,
43 }
44
45 impl Worker {
46 /// Creates a new virtio-snd worker.
try_new( vios_client: Arc<Mutex<VioSClient>>, control_queue: Arc<Mutex<Queue>>, event_queue: Queue, tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, saved_stream_state: Vec<StreamSnapshot>, ) -> Result<Worker>47 pub fn try_new(
48 vios_client: Arc<Mutex<VioSClient>>,
49 control_queue: Arc<Mutex<Queue>>,
50 event_queue: Queue,
51 tx_queue: Arc<Mutex<Queue>>,
52 rx_queue: Arc<Mutex<Queue>>,
53 saved_stream_state: Vec<StreamSnapshot>,
54 ) -> Result<Worker> {
55 let num_streams = vios_client.lock().num_streams();
56 let mut streams: Vec<StreamProxy> = Vec::with_capacity(num_streams as usize);
57 {
58 for stream_id in 0..num_streams {
59 let capture = vios_client
60 .lock()
61 .stream_info(stream_id)
62 .map(|i| i.direction == VIRTIO_SND_D_INPUT)
63 .unwrap_or(false);
64 let io_queue = if capture { &rx_queue } else { &tx_queue };
65 streams.push(Stream::try_new(
66 stream_id,
67 vios_client.clone(),
68 control_queue.clone(),
69 io_queue.clone(),
70 capture,
71 saved_stream_state.get(stream_id as usize).cloned(),
72 )?);
73 }
74 }
75 let (self_kill_io, kill_io) = Event::new()
76 .and_then(|e| Ok((e.try_clone()?, e)))
77 .map_err(SoundError::CreateEvent)?;
78
79 let senders: Vec<Sender<Box<StreamMsg>>> =
80 streams.iter().map(|sp| sp.msg_sender().clone()).collect();
81 let tx_queue_thread = tx_queue.clone();
82 let rx_queue_thread = rx_queue.clone();
83 let io_thread = thread::Builder::new()
84 .name("v_snd_io".to_string())
85 .spawn(move || {
86 try_set_real_time_priority();
87
88 io_loop(tx_queue_thread, rx_queue_thread, senders, kill_io)
89 })
90 .map_err(SoundError::CreateThread)?;
91 Ok(Worker {
92 control_queue,
93 event_queue: Some(event_queue),
94 vios_client,
95 streams,
96 tx_queue,
97 rx_queue,
98 io_thread: Some(io_thread),
99 io_kill: self_kill_io,
100 saved_stream_state: Vec::new(),
101 })
102 }
103
104 /// Emulates the virtio-snd device. It won't return until something is written to the kill_evt
105 /// event or an unrecoverable error occurs.
control_loop(&mut self, kill_evt: Event) -> Result<()>106 pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {
107 let event_notifier = self
108 .vios_client
109 .lock()
110 .get_event_notifier()
111 .map_err(SoundError::ClientEventNotifier)?;
112 #[derive(EventToken)]
113 enum Token {
114 ControlQAvailable,
115 EventQAvailable,
116 EventTriggered,
117 Kill,
118 }
119 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
120 (self.control_queue.lock().event(), Token::ControlQAvailable),
121 (
122 self.event_queue.as_ref().expect("queue missing").event(),
123 Token::EventQAvailable,
124 ),
125 (&event_notifier, Token::EventTriggered),
126 (&kill_evt, Token::Kill),
127 ])
128 .map_err(SoundError::WaitCtx)?;
129
130 let mut event_queue = self.event_queue.take().expect("event_queue missing");
131 'wait: loop {
132 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
133
134 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
135 match wait_evt.token {
136 Token::ControlQAvailable => {
137 self.control_queue
138 .lock()
139 .event()
140 .wait()
141 .map_err(SoundError::QueueEvt)?;
142 self.process_controlq_buffers()?;
143 }
144 Token::EventQAvailable => {
145 // Just read from the event object to make sure the producer of such events
146 // never blocks. The buffers will only be used when actual virtio-snd
147 // events are triggered.
148 event_queue.event().wait().map_err(SoundError::QueueEvt)?;
149 }
150 Token::EventTriggered => {
151 event_notifier.wait().map_err(SoundError::QueueEvt)?;
152 self.process_event_triggered(&mut event_queue)?;
153 }
154 Token::Kill => {
155 let _ = kill_evt.wait();
156 break 'wait;
157 }
158 }
159 }
160 }
161 self.saved_stream_state = self
162 .streams
163 .drain(..)
164 .map(|stream| stream.stop_thread())
165 .collect();
166 self.event_queue = Some(event_queue);
167 Ok(())
168 }
169
stop_io_thread(&mut self)170 fn stop_io_thread(&mut self) {
171 if let Err(e) = self.io_kill.signal() {
172 error!(
173 "virtio-snd: Failed to send Break msg to stream thread: {}",
174 e
175 );
176 }
177 if let Some(th) = self.io_thread.take() {
178 match th.join() {
179 Err(e) => {
180 error!("virtio-snd: Panic detected on stream thread: {:?}", e);
181 }
182 Ok(r) => {
183 if let Err(e) = r {
184 error!("virtio-snd: IO thread exited with and error: {}", e);
185 }
186 }
187 }
188 }
189 }
190
191 // Pops and handles all available ontrol queue buffers. Logs minor errors, but returns an
192 // Err if it encounters an unrecoverable error.
process_controlq_buffers(&mut self) -> Result<()>193 fn process_controlq_buffers(&mut self) -> Result<()> {
194 while let Some(mut avail_desc) = lock_pop_unlock(&self.control_queue) {
195 let reader = &mut avail_desc.reader;
196 let available_bytes = reader.available_bytes();
197 let Ok(hdr) = reader.peek_obj::<virtio_snd_hdr>() else {
198 error!(
199 "virtio-snd: Message received on control queue is too small: {}",
200 available_bytes
201 );
202 return reply_control_op_status(
203 VIRTIO_SND_S_BAD_MSG,
204 avail_desc,
205 &self.control_queue,
206 );
207 };
208 let mut read_buf = vec![0u8; available_bytes];
209 reader
210 .read_exact(&mut read_buf)
211 .map_err(SoundError::QueueIO)?;
212 let request_type = hdr.code.to_native();
213 match request_type {
214 VIRTIO_SND_R_JACK_INFO => {
215 let (code, info_vec) = {
216 match self.parse_info_query(&read_buf) {
217 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
218 Some((start_id, count)) => {
219 let end_id = start_id.saturating_add(count);
220 if end_id > self.vios_client.lock().num_jacks() {
221 error!(
222 "virtio-snd: Requested info on invalid jacks ids: {}..{}",
223 start_id,
224 end_id - 1
225 );
226 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
227 } else {
228 (
229 VIRTIO_SND_S_OK,
230 // Safe to unwrap because we just ensured all the ids are
231 // valid
232 (start_id..end_id)
233 .map(|id| {
234 self.vios_client.lock().jack_info(id).unwrap()
235 })
236 .collect(),
237 )
238 }
239 }
240 }
241 };
242 self.send_info_reply(avail_desc, code, info_vec)?;
243 }
244 VIRTIO_SND_R_JACK_REMAP => {
245 let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {
246 error!(
247 "virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",
248 read_buf.len()
249 );
250 VIRTIO_SND_S_BAD_MSG
251 } else {
252 let mut request: virtio_snd_jack_remap = Default::default();
253 request.as_mut_bytes().copy_from_slice(&read_buf);
254 let jack_id = request.hdr.jack_id.to_native();
255 let association = request.association.to_native();
256 let sequence = request.sequence.to_native();
257 if let Err(e) =
258 self.vios_client
259 .lock()
260 .remap_jack(jack_id, association, sequence)
261 {
262 error!("virtio-snd: Failed to remap jack: {}", e);
263 vios_error_to_status_code(e)
264 } else {
265 VIRTIO_SND_S_OK
266 }
267 };
268 let writer = &mut avail_desc.writer;
269 writer
270 .write_obj(virtio_snd_hdr {
271 code: Le32::from(code),
272 })
273 .map_err(SoundError::QueueIO)?;
274 let len = writer.bytes_written() as u32;
275 {
276 let mut queue_lock = self.control_queue.lock();
277 queue_lock.add_used(avail_desc, len);
278 queue_lock.trigger_interrupt();
279 }
280 }
281 VIRTIO_SND_R_CHMAP_INFO => {
282 let (code, info_vec) = {
283 match self.parse_info_query(&read_buf) {
284 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
285 Some((start_id, count)) => {
286 let end_id = start_id.saturating_add(count);
287 let num_chmaps = self.vios_client.lock().num_chmaps();
288 if end_id > num_chmaps {
289 error!(
290 "virtio-snd: Requested info on invalid chmaps ids: {}..{}",
291 start_id,
292 end_id - 1
293 );
294 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
295 } else {
296 (
297 VIRTIO_SND_S_OK,
298 // Safe to unwrap because we just ensured all the ids are
299 // valid
300 (start_id..end_id)
301 .map(|id| {
302 self.vios_client.lock().chmap_info(id).unwrap()
303 })
304 .collect(),
305 )
306 }
307 }
308 }
309 };
310 self.send_info_reply(avail_desc, code, info_vec)?;
311 }
312 VIRTIO_SND_R_PCM_INFO => {
313 let (code, info_vec) = {
314 match self.parse_info_query(&read_buf) {
315 None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),
316 Some((start_id, count)) => {
317 let end_id = start_id.saturating_add(count);
318 if end_id > self.vios_client.lock().num_streams() {
319 error!(
320 "virtio-snd: Requested info on invalid stream ids: {}..{}",
321 start_id,
322 end_id - 1
323 );
324 (VIRTIO_SND_S_NOT_SUPP, Vec::new())
325 } else {
326 (
327 VIRTIO_SND_S_OK,
328 // Safe to unwrap because we just ensured all the ids are
329 // valid
330 (start_id..end_id)
331 .map(|id| {
332 self.vios_client.lock().stream_info(id).unwrap()
333 })
334 .collect(),
335 )
336 }
337 }
338 }
339 };
340 self.send_info_reply(avail_desc, code, info_vec)?;
341 }
342 VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,
343 VIRTIO_SND_R_PCM_PREPARE => {
344 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?
345 }
346 VIRTIO_SND_R_PCM_RELEASE => {
347 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?
348 }
349 VIRTIO_SND_R_PCM_START => {
350 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?
351 }
352 VIRTIO_SND_R_PCM_STOP => {
353 self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?
354 }
355 _ => {
356 error!(
357 "virtio-snd: Unknown control queue mesage code: {}",
358 request_type
359 );
360 reply_control_op_status(
361 VIRTIO_SND_S_NOT_SUPP,
362 avail_desc,
363 &self.control_queue,
364 )?;
365 }
366 }
367 }
368 Ok(())
369 }
370
process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()>371 fn process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()> {
372 while let Some(evt) = self.vios_client.lock().pop_event() {
373 if let Some(mut desc) = event_queue.pop() {
374 let writer = &mut desc.writer;
375 writer.write_obj(evt).map_err(SoundError::QueueIO)?;
376 let len = writer.bytes_written() as u32;
377 event_queue.add_used(desc, len);
378 event_queue.trigger_interrupt();
379 } else {
380 warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");
381 }
382 }
383 Ok(())
384 }
385
parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)>386 fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {
387 if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {
388 error!(
389 "virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",
390 read_buf.len()
391 );
392 return None;
393 }
394 let mut query: virtio_snd_query_info = Default::default();
395 query.as_mut_bytes().copy_from_slice(read_buf);
396 let start_id = query.start_id.to_native();
397 let count = query.count.to_native();
398 Some((start_id, count))
399 }
400
401 // Returns Err if it encounters an unrecoverable error, Ok otherwise
process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()>402 fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {
403 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {
404 error!(
405 "virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",
406 read_buf.len()
407 );
408 return reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue);
409 }
410 let mut params: virtio_snd_pcm_set_params = Default::default();
411 params.as_mut_bytes().copy_from_slice(read_buf);
412 let stream_id = params.hdr.stream_id.to_native();
413 if stream_id < self.vios_client.lock().num_streams() {
414 self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))
415 } else {
416 error!(
417 "virtio-snd: Driver requested operation on invalid stream: {}",
418 stream_id
419 );
420 reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue)
421 }
422 }
423
424 // Returns Err if it encounters an unrecoverable error, Ok otherwise
try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()>425 fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {
426 if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {
427 error!(
428 "virtio-snd: The driver sent a buffer too small to contain a header: {}",
429 read_buf.len()
430 );
431 return reply_control_op_status(
432 VIRTIO_SND_S_BAD_MSG,
433 match msg {
434 StreamMsg::Prepare(d)
435 | StreamMsg::Start(d)
436 | StreamMsg::Stop(d)
437 | StreamMsg::Release(d) => d,
438 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
439 },
440 &self.control_queue,
441 );
442 }
443 let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();
444 pcm_hdr.as_mut_bytes().copy_from_slice(read_buf);
445 let stream_id = pcm_hdr.stream_id.to_native();
446 if stream_id < self.vios_client.lock().num_streams() {
447 self.streams[stream_id as usize].send(msg)
448 } else {
449 error!(
450 "virtio-snd: Driver requested operation on invalid stream: {}",
451 stream_id
452 );
453 reply_control_op_status(
454 VIRTIO_SND_S_BAD_MSG,
455 match msg {
456 StreamMsg::Prepare(d)
457 | StreamMsg::Start(d)
458 | StreamMsg::Stop(d)
459 | StreamMsg::Release(d) => d,
460 _ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),
461 },
462 &self.control_queue,
463 )
464 }
465 }
466
send_info_reply<T: Immutable + IntoBytes>( &mut self, mut desc: DescriptorChain, code: u32, info_vec: Vec<T>, ) -> Result<()>467 fn send_info_reply<T: Immutable + IntoBytes>(
468 &mut self,
469 mut desc: DescriptorChain,
470 code: u32,
471 info_vec: Vec<T>,
472 ) -> Result<()> {
473 let writer = &mut desc.writer;
474 writer
475 .write_obj(virtio_snd_hdr {
476 code: Le32::from(code),
477 })
478 .map_err(SoundError::QueueIO)?;
479 for info in info_vec {
480 writer.write_obj(info).map_err(SoundError::QueueIO)?;
481 }
482 let len = writer.bytes_written() as u32;
483 {
484 let mut queue_lock = self.control_queue.lock();
485 queue_lock.add_used(desc, len);
486 queue_lock.trigger_interrupt();
487 }
488 Ok(())
489 }
490 }
491
492 impl Drop for Worker {
drop(&mut self)493 fn drop(&mut self) {
494 self.stop_io_thread();
495 }
496 }
497
io_loop( tx_queue: Arc<Mutex<Queue>>, rx_queue: Arc<Mutex<Queue>>, senders: Vec<Sender<Box<StreamMsg>>>, kill_evt: Event, ) -> Result<()>498 fn io_loop(
499 tx_queue: Arc<Mutex<Queue>>,
500 rx_queue: Arc<Mutex<Queue>>,
501 senders: Vec<Sender<Box<StreamMsg>>>,
502 kill_evt: Event,
503 ) -> Result<()> {
504 #[derive(EventToken)]
505 enum Token {
506 TxQAvailable,
507 RxQAvailable,
508 Kill,
509 }
510 let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
511 (tx_queue.lock().event(), Token::TxQAvailable),
512 (rx_queue.lock().event(), Token::RxQAvailable),
513 (&kill_evt, Token::Kill),
514 ])
515 .map_err(SoundError::WaitCtx)?;
516
517 'wait: loop {
518 let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;
519 for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
520 let queue = match wait_evt.token {
521 Token::TxQAvailable => {
522 tx_queue
523 .lock()
524 .event()
525 .wait()
526 .map_err(SoundError::QueueEvt)?;
527 &tx_queue
528 }
529 Token::RxQAvailable => {
530 rx_queue
531 .lock()
532 .event()
533 .wait()
534 .map_err(SoundError::QueueEvt)?;
535 &rx_queue
536 }
537 Token::Kill => {
538 let _ = kill_evt.wait();
539 break 'wait;
540 }
541 };
542 while let Some(mut avail_desc) = lock_pop_unlock(queue) {
543 let reader = &mut avail_desc.reader;
544 let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;
545 let stream_id = xfer.stream_id.to_native();
546 if stream_id as usize >= senders.len() {
547 error!(
548 "virtio-snd: Driver sent buffer for invalid stream: {}",
549 stream_id
550 );
551 reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue)?;
552 } else {
553 StreamProxy::send_msg(
554 &senders[stream_id as usize],
555 StreamMsg::Buffer(avail_desc),
556 )?;
557 }
558 }
559 }
560 }
561 Ok(())
562 }
563
564 // If queue.lock().pop() is used directly in the condition of a 'while' loop the lock is held over
565 // the entire loop block. Encapsulating it in this fuction guarantees that the lock is dropped
566 // immediately after pop() is called, which allows the code to remain somewhat simpler.
lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain>567 fn lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain> {
568 queue.lock().pop()
569 }
570