1 // Copyright 2021 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::collections::VecDeque;
6 use std::sync::mpsc::channel;
7 use std::sync::mpsc::Receiver;
8 use std::sync::mpsc::Sender;
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::Duration;
12 use std::time::Instant;
13
14 use base::error;
15 use base::set_rt_prio_limit;
16 use base::set_rt_round_robin;
17 use base::warn;
18 use data_model::Le32;
19 use sync::Mutex;
20 use vm_memory::GuestMemory;
21
22 use super::Error as VioSError;
23 use super::Result;
24 use super::SoundError;
25 use super::*;
26 use crate::virtio::snd::common::from_virtio_frame_rate;
27 use crate::virtio::snd::constants::*;
28 use crate::virtio::snd::layout::*;
29 use crate::virtio::DescriptorChain;
30 use crate::virtio::Interrupt;
31 use crate::virtio::Queue;
32 use crate::virtio::Reader;
33 use crate::virtio::Writer;
34
/// Messages that the worker can send to the stream (thread).
///
/// Every variant except `Break` carries the descriptor chain of the guest request that
/// triggered it, so that the stream thread can complete that request once it has been handled.
pub enum StreamMsg {
    /// Apply the given parameters to the stream; answered on the control queue.
    SetParams(DescriptorChain, virtio_snd_pcm_set_params),
    /// Prepare the stream; answered on the control queue.
    Prepare(DescriptorChain),
    /// Start the stream; answered on the control queue.
    Start(DescriptorChain),
    /// Stop the stream; answered on the control queue.
    Stop(DescriptorChain),
    /// Release the stream; answered on the control queue.
    Release(DescriptorChain),
    /// An IO buffer for the stream. It is queued by the stream thread and completed later,
    /// depending on the state the stream is in.
    Buffer(DescriptorChain),
    /// Ask the stream thread to leave its message loop.
    Break,
}
45
/// State of a stream as tracked by its thread.
///
/// The state is updated after each control message has been answered and decides what happens
/// to the IO buffers that are waiting in the stream's queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// Initial state of a freshly created stream.
    New,
    /// A `SetParams` message was handled.
    ParamsSet,
    /// A `Prepare` message was handled; buffers are kept queued until the stream starts.
    Prepared,
    /// A `Start` message was handled; queued buffers are exchanged with the sound server.
    Started,
    /// A `Stop` message was handled; queued buffers are handed straight back to the guest.
    Stopped,
    /// A `Release` message was handled; queued buffers are handed straight back to the guest.
    Released,
}
54
/// Per-stream worker state. An instance is created inside, and lives entirely on, the thread
/// spawned by `Stream::try_new`.
pub struct Stream {
    /// Identifier passed to the sound server client for every operation on this stream.
    stream_id: u32,
    /// Receiving end of the channel on which `StreamMsg`s arrive.
    receiver: Receiver<StreamMsg>,
    /// Client used to issue stream operations and to exchange audio data.
    vios_client: Arc<VioSClient>,
    /// Guest memory used to build descriptor readers/writers and to update the queues.
    guest_memory: GuestMemory,
    /// Queue on which replies to control messages are returned.
    control_queue: Arc<Mutex<Queue>>,
    /// Queue on which IO buffers are returned.
    io_queue: Arc<Mutex<Queue>>,
    /// Interrupt handle passed along when triggering queue interrupts.
    interrupt: Interrupt,
    /// When true, buffers are filled with data requested from the client; otherwise their
    /// contents are injected into the client.
    capture: bool,
    /// State recorded after the last control message was answered.
    current_state: StreamState,
    /// Duration of one period, derived from the last successfully applied parameters.
    period: Duration,
    /// Instant at which the last `Start` message was handled.
    start_time: Instant,
    /// Offset from `start_time` before which the next buffer is not returned to the guest.
    next_buffer: Duration,
    /// IO buffers received but not yet returned to the guest.
    buffer_queue: VecDeque<DescriptorChain>,
}
70
71 impl Stream {
72 /// Start a new stream thread and return its handler.
try_new( stream_id: u32, vios_client: Arc<VioSClient>, guest_memory: GuestMemory, interrupt: Interrupt, control_queue: Arc<Mutex<Queue>>, io_queue: Arc<Mutex<Queue>>, capture: bool, ) -> Result<StreamProxy>73 pub fn try_new(
74 stream_id: u32,
75 vios_client: Arc<VioSClient>,
76 guest_memory: GuestMemory,
77 interrupt: Interrupt,
78 control_queue: Arc<Mutex<Queue>>,
79 io_queue: Arc<Mutex<Queue>>,
80 capture: bool,
81 ) -> Result<StreamProxy> {
82 let (sender, receiver): (Sender<StreamMsg>, Receiver<StreamMsg>) = channel();
83 let thread = thread::Builder::new()
84 .name(format!("v_snd_stream:{stream_id}"))
85 .spawn(move || {
86 try_set_real_time_priority();
87
88 let mut stream = Stream {
89 stream_id,
90 receiver,
91 vios_client,
92 guest_memory,
93 control_queue,
94 io_queue,
95 interrupt,
96 capture,
97 current_state: StreamState::New,
98 period: Duration::from_millis(0),
99 start_time: Instant::now(),
100 next_buffer: Duration::from_millis(0),
101 buffer_queue: VecDeque::new(),
102 };
103
104 if let Err(e) = stream.stream_loop() {
105 error!("virtio-snd: Error in stream {}: {}", stream_id, e);
106 }
107 })
108 .map_err(SoundError::CreateThread)?;
109 Ok(StreamProxy {
110 sender,
111 thread: Some(thread),
112 })
113 }
114
stream_loop(&mut self) -> Result<()>115 fn stream_loop(&mut self) -> Result<()> {
116 loop {
117 if !self.recv_msg()? {
118 break;
119 }
120 self.maybe_process_queued_buffers()?;
121 }
122 Ok(())
123 }
124
recv_msg(&mut self) -> Result<bool>125 fn recv_msg(&mut self) -> Result<bool> {
126 let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;
127 let (code, desc, next_state) = match msg {
128 StreamMsg::SetParams(desc, params) => {
129 let code = match self.vios_client.set_stream_parameters_raw(params) {
130 Ok(()) => {
131 let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;
132 self.period = Duration::from_millis(
133 (params.period_bytes.to_native() as u64 * 1000u64)
134 / frame_rate
135 / params.channels as u64
136 / bytes_per_sample(params.format) as u64,
137 );
138 VIRTIO_SND_S_OK
139 }
140 Err(e) => {
141 error!(
142 "virtio-snd: Error setting parameters for stream {}: {}",
143 self.stream_id, e
144 );
145 vios_error_to_status_code(e)
146 }
147 };
148 (code, desc, StreamState::ParamsSet)
149 }
150 StreamMsg::Prepare(desc) => {
151 let code = match self.vios_client.prepare_stream(self.stream_id) {
152 Ok(()) => VIRTIO_SND_S_OK,
153 Err(e) => {
154 error!(
155 "virtio-snd: Failed to prepare stream {}: {}",
156 self.stream_id, e
157 );
158 vios_error_to_status_code(e)
159 }
160 };
161 (code, desc, StreamState::Prepared)
162 }
163 StreamMsg::Start(desc) => {
164 let code = match self.vios_client.start_stream(self.stream_id) {
165 Ok(()) => VIRTIO_SND_S_OK,
166 Err(e) => {
167 error!(
168 "virtio-snd: Failed to start stream {}: {}",
169 self.stream_id, e
170 );
171 vios_error_to_status_code(e)
172 }
173 };
174 self.start_time = Instant::now();
175 self.next_buffer = Duration::from_millis(0);
176 (code, desc, StreamState::Started)
177 }
178 StreamMsg::Stop(desc) => {
179 let code = match self.vios_client.stop_stream(self.stream_id) {
180 Ok(()) => VIRTIO_SND_S_OK,
181 Err(e) => {
182 error!(
183 "virtio-snd: Failed to stop stream {}: {}",
184 self.stream_id, e
185 );
186 vios_error_to_status_code(e)
187 }
188 };
189 (code, desc, StreamState::Stopped)
190 }
191 StreamMsg::Release(desc) => {
192 let code = match self.vios_client.release_stream(self.stream_id) {
193 Ok(()) => VIRTIO_SND_S_OK,
194 Err(e) => {
195 error!(
196 "virtio-snd: Failed to release stream {}: {}",
197 self.stream_id, e
198 );
199 vios_error_to_status_code(e)
200 }
201 };
202 (code, desc, StreamState::Released)
203 }
204 StreamMsg::Buffer(d) => {
205 // Buffers may arrive while in several states:
206 // - Prepared: Buffer should be queued and played when start cmd arrives
207 // - Started: Buffer should be processed immediately
208 // - Stopped: Buffer should be returned to the guest immediately
209 // Because we may need to wait to process the buffer, we always queue it and
210 // decide what to do with queued buffers after every message.
211 self.buffer_queue.push_back(d);
212 // return here to avoid replying on control queue below
213 return Ok(true);
214 }
215 StreamMsg::Break => {
216 return Ok(false);
217 }
218 };
219 reply_control_op_status(
220 code,
221 desc,
222 &self.guest_memory,
223 &self.control_queue,
224 &self.interrupt,
225 )?;
226 self.current_state = next_state;
227 Ok(true)
228 }
229
maybe_process_queued_buffers(&mut self) -> Result<()>230 fn maybe_process_queued_buffers(&mut self) -> Result<()> {
231 match self.current_state {
232 StreamState::Started => {
233 while let Some(desc) = self.buffer_queue.pop_front() {
234 let mut reader = Reader::new(self.guest_memory.clone(), desc.clone())
235 .map_err(SoundError::CreateReader)?;
236 // Ignore the first buffer, it was already read by the time this thread
237 // receives the descriptor
238 reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
239 let desc_index = desc.index;
240 let mut writer = Writer::new(self.guest_memory.clone(), desc)
241 .map_err(SoundError::CreateWriter)?;
242 let io_res = if self.capture {
243 let buffer_size =
244 writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();
245 self.vios_client
246 .request_audio_data(self.stream_id, buffer_size, |vslice| {
247 writer.write_from_volatile_slice(*vslice)
248 })
249 } else {
250 self.vios_client.inject_audio_data(
251 self.stream_id,
252 reader.available_bytes(),
253 |vslice| reader.read_to_volatile_slice(vslice),
254 )
255 };
256 let (code, latency) = match io_res {
257 Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),
258 Err(e) => {
259 error!(
260 "virtio-snd: Failed IO operation in stream {}: {}",
261 self.stream_id, e
262 );
263 (VIRTIO_SND_S_IO_ERR, 0)
264 }
265 };
266 if let Err(e) = writer.write_obj(virtio_snd_pcm_status {
267 status: Le32::from(code),
268 latency_bytes: Le32::from(latency),
269 }) {
270 error!(
271 "virtio-snd: Failed to write pcm status from stream {} thread: {}",
272 self.stream_id, e
273 );
274 }
275
276 self.next_buffer += self.period;
277 let elapsed = self.start_time.elapsed();
278 if elapsed < self.next_buffer {
279 // Completing an IO request can be considered an elapsed period
280 // notification by the driver, so we must wait the right amount of time to
281 // release the buffer if the sound server client returned too soon.
282 std::thread::sleep(self.next_buffer - elapsed);
283 }
284 {
285 let mut io_queue_lock = self.io_queue.lock();
286 io_queue_lock.add_used(
287 &self.guest_memory,
288 desc_index,
289 writer.bytes_written() as u32,
290 );
291 io_queue_lock.trigger_interrupt(&self.guest_memory, &self.interrupt);
292 }
293 }
294 }
295 StreamState::Stopped | StreamState::Released => {
296 // For some reason playback buffers can arrive after stop and release (maybe because
297 // buffer-ready notifications arrive over eventfds and those are processed in
298 // random order?). The spec requires the device to not confirm the release of a
299 // stream until all IO buffers have been released, but that's impossible to
300 // guarantee if a buffer arrives after release is requested. Luckily it seems to
301 // work fine if the buffer is released after the release command is completed.
302 while let Some(desc) = self.buffer_queue.pop_front() {
303 reply_pcm_buffer_status(
304 VIRTIO_SND_S_OK,
305 0,
306 desc,
307 &self.guest_memory,
308 &self.io_queue,
309 &self.interrupt,
310 )?;
311 }
312 }
313 StreamState::Prepared => {} // Do nothing, any buffers will be processed after start
314 _ => {
315 if self.buffer_queue.len() > 0 {
316 warn!("virtio-snd: Buffers received while in unexpected state");
317 }
318 }
319 }
320 Ok(())
321 }
322 }
323
324 impl Drop for Stream {
drop(&mut self)325 fn drop(&mut self) {
326 // Try to stop and release the stream in case it was playing, these operations will fail if the
327 // stream is already released, just ignore that failure
328 let _ = self.vios_client.stop_stream(self.stream_id);
329 let _ = self.vios_client.release_stream(self.stream_id);
330
331 // Also release any pending buffer
332 while let Some(desc) = self.buffer_queue.pop_front() {
333 if let Err(e) = reply_pcm_buffer_status(
334 VIRTIO_SND_S_IO_ERR,
335 0,
336 desc,
337 &self.guest_memory,
338 &self.io_queue,
339 &self.interrupt,
340 ) {
341 error!(
342 "virtio-snd: Failed to reply buffer on stream {}: {}",
343 self.stream_id, e
344 );
345 }
346 }
347 }
348 }
349
/// Basically a proxy to the thread handling a particular stream.
///
/// Dropping the proxy asks the thread to stop and waits for it to finish.
pub struct StreamProxy {
    /// Sending end of the channel read by the stream thread.
    sender: Sender<StreamMsg>,
    /// Handle of the stream thread; taken (left as `None`) once the thread has been joined.
    thread: Option<thread::JoinHandle<()>>,
}
355
356 impl StreamProxy {
357 /// Access the underlying sender to clone it or send messages
msg_sender(&self) -> &Sender<StreamMsg>358 pub fn msg_sender(&self) -> &Sender<StreamMsg> {
359 &self.sender
360 }
361
362 /// Send a message to the stream thread on the other side of this sender
send_msg(sender: &Sender<StreamMsg>, msg: StreamMsg) -> Result<()>363 pub fn send_msg(sender: &Sender<StreamMsg>, msg: StreamMsg) -> Result<()> {
364 sender.send(msg).map_err(SoundError::StreamThreadSend)
365 }
366
367 /// Convenience function to send a message to this stream's thread
send(&self, msg: StreamMsg) -> Result<()>368 pub fn send(&self, msg: StreamMsg) -> Result<()> {
369 Self::send_msg(&self.sender, msg)
370 }
371
stop_thread(&mut self)372 fn stop_thread(&mut self) {
373 if let Err(e) = self.send(StreamMsg::Break) {
374 error!(
375 "virtio-snd: Failed to send Break msg to stream thread: {}",
376 e
377 );
378 }
379 if let Some(th) = self.thread.take() {
380 if let Err(e) = th.join() {
381 error!("virtio-snd: Panic detected on stream thread: {:?}", e);
382 }
383 }
384 }
385 }
386
387 impl Drop for StreamProxy {
drop(&mut self)388 fn drop(&mut self) {
389 self.stop_thread();
390 }
391 }
392
393 /// Attempts to set the current thread's priority to a value hight enough to handle audio IO. This
394 /// may fail due to insuficient permissions.
try_set_real_time_priority()395 pub fn try_set_real_time_priority() {
396 const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
397 if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
398 .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
399 {
400 warn!("Failed to set audio stream thread to real time: {}", e);
401 }
402 }
403
404 /// Gets the appropriate virtio-snd error to return to the driver from a VioSError.
vios_error_to_status_code(e: VioSError) -> u32405 pub fn vios_error_to_status_code(e: VioSError) -> u32 {
406 match e {
407 VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,
408 _ => VIRTIO_SND_S_NOT_SUPP,
409 }
410 }
411
412 /// Encapsulates sending the virtio_snd_hdr struct back to the driver.
reply_control_op_status( code: u32, desc: DescriptorChain, guest_memory: &GuestMemory, queue: &Arc<Mutex<Queue>>, interrupt: &Interrupt, ) -> Result<()>413 pub fn reply_control_op_status(
414 code: u32,
415 desc: DescriptorChain,
416 guest_memory: &GuestMemory,
417 queue: &Arc<Mutex<Queue>>,
418 interrupt: &Interrupt,
419 ) -> Result<()> {
420 let desc_index = desc.index;
421 let mut writer = Writer::new(guest_memory.clone(), desc).map_err(SoundError::Descriptor)?;
422 writer
423 .write_obj(virtio_snd_hdr {
424 code: Le32::from(code),
425 })
426 .map_err(SoundError::QueueIO)?;
427 {
428 let mut queue_lock = queue.lock();
429 queue_lock.add_used(guest_memory, desc_index, writer.bytes_written() as u32);
430 queue_lock.trigger_interrupt(guest_memory, interrupt);
431 }
432 Ok(())
433 }
434
435 /// Encapsulates sending the virtio_snd_pcm_status struct back to the driver.
reply_pcm_buffer_status( status: u32, latency_bytes: u32, desc: DescriptorChain, guest_memory: &GuestMemory, queue: &Arc<Mutex<Queue>>, interrupt: &Interrupt, ) -> Result<()>436 pub fn reply_pcm_buffer_status(
437 status: u32,
438 latency_bytes: u32,
439 desc: DescriptorChain,
440 guest_memory: &GuestMemory,
441 queue: &Arc<Mutex<Queue>>,
442 interrupt: &Interrupt,
443 ) -> Result<()> {
444 let desc_index = desc.index;
445 let mut writer = Writer::new(guest_memory.clone(), desc).map_err(SoundError::Descriptor)?;
446 if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
447 writer
448 .consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
449 }
450 writer
451 .write_obj(virtio_snd_pcm_status {
452 status: Le32::from(status),
453 latency_bytes: Le32::from(latency_bytes),
454 })
455 .map_err(SoundError::QueueIO)?;
456 {
457 let mut queue_lock = queue.lock();
458 queue_lock.add_used(guest_memory, desc_index, writer.bytes_written() as u32);
459 queue_lock.trigger_interrupt(guest_memory, interrupt);
460 }
461 Ok(())
462 }
463
bytes_per_sample(format: u8) -> usize464 fn bytes_per_sample(format: u8) -> usize {
465 match format {
466 VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,
467 VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,
468 VIRTIO_SND_PCM_FMT_A_LAW => 1usize,
469 VIRTIO_SND_PCM_FMT_S8 => 1usize,
470 VIRTIO_SND_PCM_FMT_U8 => 1usize,
471 VIRTIO_SND_PCM_FMT_S16 => 2usize,
472 VIRTIO_SND_PCM_FMT_U16 => 2usize,
473 VIRTIO_SND_PCM_FMT_S32 => 4usize,
474 VIRTIO_SND_PCM_FMT_U32 => 4usize,
475 VIRTIO_SND_PCM_FMT_FLOAT => 4usize,
476 VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,
477 // VIRTIO_SND_PCM_FMT_DSD_U8
478 // VIRTIO_SND_PCM_FMT_DSD_U16
479 // VIRTIO_SND_PCM_FMT_DSD_U32
480 // VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME
481 // VIRTIO_SND_PCM_FMT_S18_3
482 // VIRTIO_SND_PCM_FMT_U18_3
483 // VIRTIO_SND_PCM_FMT_S20_3
484 // VIRTIO_SND_PCM_FMT_U20_3
485 // VIRTIO_SND_PCM_FMT_S24_3
486 // VIRTIO_SND_PCM_FMT_U24_3
487 // VIRTIO_SND_PCM_FMT_S20
488 // VIRTIO_SND_PCM_FMT_U20
489 // VIRTIO_SND_PCM_FMT_S24
490 // VIRTIO_SND_PCM_FMT_U24
491 _ => {
492 // Some of these formats are not consistently stored in a particular size (24bits is
493 // sometimes stored in a 32bit word) while others are of variable size.
494 // The size per sample estimated here is designed to greatly underestimate the time it
495 // takes to play a buffer and depend instead on timings provided by the sound server if
496 // it supports these formats.
497 warn!(
498 "Unknown sample size for format {}, depending on sound server timing instead.",
499 format
500 );
501 1000usize
502 }
503 }
504 }
505