// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::virtio::snd::constants::*;
use crate::virtio::snd::layout::*;

use base::{
    error, AsRawDescriptor, Error as BaseError, Event, FromRawDescriptor, IntoRawDescriptor,
    MemoryMapping, MemoryMappingBuilder, MmapError, PollToken, SafeDescriptor, ScmSocket,
    UnixSeqpacket, WaitContext,
};
use data_model::{DataInit, VolatileMemory, VolatileMemoryError, VolatileSlice};

use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::{Error as IOError, ErrorKind as IOErrorKind, IoSliceMut, Seek, SeekFrom};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::path::Path;
use std::sync::mpsc::{channel, Receiver, RecvError, Sender};
use std::sync::Arc;
use std::thread::JoinHandle;

use sync::Mutex;

use remain::sorted;
use thiserror::Error as ThisError;

pub type Result<T> = std::result::Result<T, Error>;

#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
    #[error("Error memory mapping client_shm: {0}")]
    BaseMmapError(BaseError),
    #[error("Sender was dropped without sending buffer status, the recv thread may have exited")]
    BufferStatusSenderLost(RecvError),
    #[error("Command failed with status {0}")]
    CommandFailed(u32),
    #[error("Error duplicating file descriptor: {0}")]
    DupError(BaseError),
    #[error("Failed to create Recv event: {0}")]
    EventCreateError(BaseError),
    #[error("Failed to dup Recv event: {0}")]
    EventDupError(BaseError),
    #[error("Failed to signal event: {0}")]
    EventWriteError(BaseError),
    #[error("Failed to get size of tx shared memory: {0}")]
    FileSizeError(IOError),
    #[error("Error accessing guest's shared memory: {0}")]
    GuestMmapError(MmapError),
    #[error("No jack with id {0}")]
    InvalidJackId(u32),
    #[error("No stream with id {0}")]
    InvalidStreamId(u32),
    #[error("IO buffer operation failed: status = {0}")]
    IOBufferError(u32),
    #[error("No PCM streams available")]
    NoStreamsAvailable,
60 #[error("Insuficient space for the new buffer in the queue's buffer area")]
61 OutOfSpace,
62 #[error("Platform not supported")]
63 PlatformNotSupported,
64 #[error("{0}")]
65 ProtocolError(ProtocolErrorKind),
66 #[error("Failed to connect to VioS server: {0:?}")]
67 ServerConnectionError(IOError),
68 #[error("Failed to communicate with VioS server: {0:?}")]
69 ServerError(BaseError),
70 #[error("Failed to communicate with VioS server: {0:?}")]
71 ServerIOError(IOError),
72 #[error("Error accessing VioS server's shared memory: {0}")]
73 ServerMmapError(MmapError),
74 #[error("Failed to duplicate UnixSeqpacket: {0}")]
75 UnixSeqpacketDupError(IOError),
76 #[error("Unsupported frame rate: {0}")]
77 UnsupportedFrameRate(u32),
78 #[error("Error accessing volatile memory: {0}")]
79 VolatileMemoryError(VolatileMemoryError),
80 #[error("Failed to create Recv thread's WaitContext: {0}")]
81 WaitContextCreateError(BaseError),
82 #[error("Error waiting for events")]
83 WaitError(BaseError),
84 #[error("Invalid operation for stream direction: {0}")]
85 WrongDirection(u8),
86 }
87
88 #[derive(ThisError, Debug)]
89 pub enum ProtocolErrorKind {
90 #[error("The server sent a config of the wrong size: {0}")]
91 UnexpectedConfigSize(usize),
92 #[error("Received {1} file descriptors from the server, expected {0}")]
93 UnexpectedNumberOfFileDescriptors(usize, usize), // expected, received
94 #[error("Server's version ({0}) doesn't match client's")]
95 VersionMismatch(u32),
96 #[error("Received a msg with an unexpected size: expected {0}, received {1}")]
97 UnexpectedMessageSize(usize, usize), // expected, received
98 }
99
/// The client for the VioS backend
///
/// Uses a protocol equivalent to virtio-snd over a shared memory file and a unix socket for
/// notifications. It's thread safe and can be encapsulated in an Arc smart pointer and shared
/// between threads.
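///
/// A minimal lifecycle sketch (hypothetical socket path and stream id 0, error handling
/// elided):
///
/// ```ignore
/// let client = VioSClient::try_new("/run/vios.sock")?;
/// client.start_bg_thread()?;
/// // set_stream_parameters() is typically called before prepare_stream().
/// client.prepare_stream(0)?;
/// client.start_stream(0)?;
/// // ... stream IO via inject_audio_data() / request_audio_data() ...
/// client.stop_stream(0)?;
/// client.release_stream(0)?;
/// ```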
pub struct VioSClient {
    // These mutexes should almost never be held simultaneously. If at some point they have to
    // be, the locking order should match the order in which they are declared here.
    config: VioSConfig,
    jacks: Vec<virtio_snd_jack_info>,
    streams: Vec<virtio_snd_pcm_info>,
    chmaps: Vec<virtio_snd_chmap_info>,
    // The control socket is used from multiple threads to send and wait for a reply, which needs
    // to happen atomically, hence the need for a mutex instead of just sharing clones of the
    // socket.
    control_socket: Mutex<UnixSeqpacket>,
    event_socket: UnixSeqpacket,
    // These are thread safe and don't require locking
    tx: IoBufferQueue,
    rx: IoBufferQueue,
    // This is accessed by the recv_thread and whatever thread processes the events
    events: Arc<Mutex<VecDeque<virtio_snd_event>>>,
    event_notifier: Event,
    // These are accessed by the recv_thread and the stream threads
    tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
    rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
    recv_thread_state: Arc<Mutex<ThreadFlags>>,
    recv_event: Event,
    recv_thread: Mutex<Option<JoinHandle<Result<()>>>>,
}

impl VioSClient {
    /// Create a new client given the path to the audio server's socket.
    pub fn try_new<P: AsRef<Path>>(server: P) -> Result<VioSClient> {
        let client_socket =
            UnixSeqpacket::connect(server).map_err(Error::ServerConnectionError)?;
        let mut config: VioSConfig = Default::default();
        let mut fds: Vec<RawFd> = Vec::new();
        const NUM_FDS: usize = 5;
        fds.resize(NUM_FDS, 0);
        let (recv_size, fd_count) = client_socket
            .recv_with_fds(IoSliceMut::new(config.as_mut_slice()), &mut fds)
            .map_err(Error::ServerError)?;

        // Resize the vector to the actual number of file descriptors received and wrap them in
        // SafeDescriptors to prevent leaks
        fds.resize(fd_count, -1);
        let mut safe_fds: Vec<SafeDescriptor> = fds
            .into_iter()
            .map(|fd| unsafe {
                // safe because the SafeDescriptor object completely assumes ownership of the fd.
                SafeDescriptor::from_raw_descriptor(fd)
            })
            .collect();

        if recv_size != std::mem::size_of::<VioSConfig>() {
            return Err(Error::ProtocolError(
                ProtocolErrorKind::UnexpectedConfigSize(recv_size),
            ));
        }

        if config.version != VIOS_VERSION {
            return Err(Error::ProtocolError(ProtocolErrorKind::VersionMismatch(
                config.version,
            )));
        }

        fn pop<T: FromRawFd>(
            safe_fds: &mut Vec<SafeDescriptor>,
            expected: usize,
            received: usize,
        ) -> Result<T> {
            unsafe {
                // Safe because we transfer ownership from the SafeDescriptor to T
                Ok(T::from_raw_fd(
                    safe_fds
                        .pop()
                        .ok_or(Error::ProtocolError(
                            ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(
                                expected, received,
                            ),
                        ))?
                        .into_raw_descriptor(),
                ))
            }
        }

        let rx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;
        let tx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;
        let rx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
        let tx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
        let event_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;

        if !safe_fds.is_empty() {
            return Err(Error::ProtocolError(
                ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(NUM_FDS, fd_count),
            ));
        }

        let tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let recv_thread_state = Arc::new(Mutex::new(ThreadFlags {
            running: true,
            reporting_events: false,
        }));
        let recv_event = Event::new().map_err(Error::EventCreateError)?;

        let mut client = VioSClient {
            config,
            jacks: Vec::new(),
            streams: Vec::new(),
            chmaps: Vec::new(),
            control_socket: Mutex::new(client_socket),
            event_socket,
            tx: IoBufferQueue::new(tx_socket, tx_shm_file)?,
            rx: IoBufferQueue::new(rx_socket, rx_shm_file)?,
            events: Arc::new(Mutex::new(VecDeque::new())),
            event_notifier: Event::new().map_err(Error::EventCreateError)?,
            tx_subscribers,
            rx_subscribers,
            recv_thread_state,
            recv_event,
            recv_thread: Mutex::new(None),
        };
        client.request_and_cache_info()?;
        Ok(client)
    }

    /// Get the number of jacks
    pub fn num_jacks(&self) -> u32 {
        self.config.jacks
    }

    /// Get the number of pcm streams
    pub fn num_streams(&self) -> u32 {
        self.config.streams
    }

    /// Get the number of channel maps
    pub fn num_chmaps(&self) -> u32 {
        self.config.chmaps
    }

    /// Get the configuration information on a jack
    pub fn jack_info(&self, idx: u32) -> Option<virtio_snd_jack_info> {
        self.jacks.get(idx as usize).copied()
    }

    /// Get the configuration information on a pcm stream
    pub fn stream_info(&self, idx: u32) -> Option<virtio_snd_pcm_info> {
        self.streams.get(idx as usize).cloned()
    }

    /// Get the configuration information on a channel map
    pub fn chmap_info(&self, idx: u32) -> Option<virtio_snd_chmap_info> {
        self.chmaps.get(idx as usize).copied()
    }

    /// Starts the background thread that receives release messages from the server. If the
    /// thread was already started, this function does nothing.
    /// This thread must be started prior to attempting any stream IO operation or the calling
    /// thread will block.
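    ///
    /// A sketch of the expected ordering (hypothetical stream id and period size; a sketch,
    /// not the only valid sequence):
    ///
    /// ```ignore
    /// client.start_bg_thread()?;
    /// // Only after the background thread is running:
    /// client.inject_audio_data(0, period_bytes, |slice| { /* fill slice */ })?;
    /// ```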
    pub fn start_bg_thread(&self) -> Result<()> {
        if self.recv_thread.lock().is_some() {
            return Ok(());
        }
        let recv_event = self.recv_event.try_clone().map_err(Error::EventDupError)?;
        let tx_socket = self.tx.try_clone_socket()?;
        let rx_socket = self.rx.try_clone_socket()?;
        let event_socket = self
            .event_socket
            .try_clone()
            .map_err(Error::UnixSeqpacketDupError)?;
        let mut opt = self.recv_thread.lock();
        // The lock on recv_thread was released above to avoid holding more than one lock at a
        // time while duplicating the fds. So we have to check the condition again.
        if opt.is_none() {
            *opt = Some(spawn_recv_thread(
                self.tx_subscribers.clone(),
                self.rx_subscribers.clone(),
                self.event_notifier
                    .try_clone()
                    .map_err(Error::EventDupError)?,
                self.events.clone(),
                recv_event,
                self.recv_thread_state.clone(),
                tx_socket,
                rx_socket,
                event_socket,
            ));
        }
        Ok(())
    }

    /// Stops the background thread.
    pub fn stop_bg_thread(&self) -> Result<()> {
        if self.recv_thread.lock().is_none() {
            return Ok(());
        }
        self.recv_thread_state.lock().running = false;
        self.recv_event
            .write(1u64)
            .map_err(Error::EventWriteError)?;
        if let Some(handle) = self.recv_thread.lock().take() {
            return match handle.join() {
                Ok(r) => r,
                Err(e) => {
                    error!("Recv thread panicked: {:?}", e);
                    Ok(())
                }
            };
        }
        Ok(())
    }

    /// Gets an Event object that will trigger every time an event is received from the server
    pub fn get_event_notifier(&self) -> Result<Event> {
        // Let the background thread know that there is at least one consumer of events
        self.recv_thread_state.lock().reporting_events = true;
        self.event_notifier
            .try_clone()
            .map_err(Error::EventDupError)
    }

    /// Retrieves one event. Callers should have received a notification through the event
    /// notifier before calling this function.
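    ///
    /// A minimal sketch of consuming events (a sketch; assumes the background thread is
    /// running and a notifier was obtained from get_event_notifier()):
    ///
    /// ```ignore
    /// notifier.read()?; // blocks until the background thread signals an event
    /// while let Some(evt) = client.pop_event() {
    ///     // handle evt, e.g. inspect evt.hdr.code
    /// }
    /// ```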
    pub fn pop_event(&self) -> Option<virtio_snd_event> {
        self.events.lock().pop_front()
    }

    /// Remap a jack. This should only be called if the jack announces support for the operation
    /// through the features field in the corresponding virtio_snd_jack_info struct.
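    ///
    /// A guarded-call sketch (assumes the remap feature bit is exposed as
    /// VIRTIO_SND_JACK_F_REMAP, as named in the virtio-snd spec):
    ///
    /// ```ignore
    /// if let Some(info) = client.jack_info(jack_id) {
    ///     if info.features.to_native() & (1 << VIRTIO_SND_JACK_F_REMAP) != 0 {
    ///         client.remap_jack(jack_id, association, sequence)?;
    ///     }
    /// }
    /// ```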
    pub fn remap_jack(&self, jack_id: u32, association: u32, sequence: u32) -> Result<()> {
        if jack_id >= self.config.jacks {
            return Err(Error::InvalidJackId(jack_id));
        }
        let msg = virtio_snd_jack_remap {
            hdr: virtio_snd_jack_hdr {
                hdr: virtio_snd_hdr {
                    code: VIRTIO_SND_R_JACK_REMAP.into(),
                },
                jack_id: jack_id.into(),
            },
            association: association.into(),
            sequence: sequence.into(),
        };
        let control_socket_lock = self.control_socket.lock();
        send_cmd(&*control_socket_lock, msg)
    }

    /// Configures a stream with the given parameters.
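    ///
    /// A minimal sketch (a sketch only; the format and rate values are assumed to come from
    /// the virtio-snd constants, e.g. VIRTIO_SND_PCM_FMT_S16 and VIRTIO_SND_PCM_RATE_48000,
    /// and the sizes are just examples):
    ///
    /// ```ignore
    /// let params = VioSStreamParams {
    ///     buffer_bytes: 8192,
    ///     period_bytes: 2048,
    ///     features: 0,
    ///     channels: 2,
    ///     format: VIRTIO_SND_PCM_FMT_S16 as u8,
    ///     rate: VIRTIO_SND_PCM_RATE_48000 as u8,
    /// };
    /// client.set_stream_parameters(stream_id, params)?;
    /// ```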
    pub fn set_stream_parameters(&self, stream_id: u32, params: VioSStreamParams) -> Result<()> {
        self.streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?;
        let raw_params: virtio_snd_pcm_set_params = (stream_id, params).into();
        let control_socket_lock = self.control_socket.lock();
        send_cmd(&*control_socket_lock, raw_params)
    }

    /// Configures a stream with the given parameters.
    pub fn set_stream_parameters_raw(&self, raw_params: virtio_snd_pcm_set_params) -> Result<()> {
        let stream_id = raw_params.hdr.stream_id.to_native();
        self.streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?;
        let control_socket_lock = self.control_socket.lock();
        send_cmd(&*control_socket_lock, raw_params)
    }

    /// Send the PREPARE_STREAM command to the server.
    pub fn prepare_stream(&self, stream_id: u32) -> Result<()> {
        self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_PREPARE)
    }

    /// Send the RELEASE_STREAM command to the server.
    pub fn release_stream(&self, stream_id: u32) -> Result<()> {
        self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_RELEASE)
    }

    /// Send the START_STREAM command to the server.
    pub fn start_stream(&self, stream_id: u32) -> Result<()> {
        self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_START)
    }

    /// Send the STOP_STREAM command to the server.
    pub fn stop_stream(&self, stream_id: u32) -> Result<()> {
        self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_STOP)
    }

    /// Send audio frames to the server. Blocks the calling thread until the server acknowledges
    /// the data.
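    ///
    /// A minimal sketch writing one period from a local buffer (a sketch; assumes
    /// VolatileSlice::copy_from is available in data_model):
    ///
    /// ```ignore
    /// let frames: Vec<u8> = vec![0u8; period_bytes]; // e.g. silence
    /// let (latency, ()) = client.inject_audio_data(stream_id, frames.len(), |slice| {
    ///     slice.copy_from(&frames[..]);
    /// })?;
    /// ```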
    pub fn inject_audio_data<R, Cb: FnOnce(VolatileSlice) -> R>(
        &self,
        stream_id: u32,
        size: usize,
        callback: Cb,
    ) -> Result<(u32, R)> {
        if self
            .streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?
            .direction
            != VIRTIO_SND_D_OUTPUT
        {
            return Err(Error::WrongDirection(VIRTIO_SND_D_OUTPUT));
        }
        self.streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?;
        let dst_offset = self.tx.allocate_buffer(size)?;
        let buffer_slice = self.tx.buffer_at(dst_offset, size)?;
        let ret = callback(buffer_slice);
        // Register to receive the status before sending the buffer to the server
        let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) = channel();
        self.tx_subscribers.lock().insert(dst_offset, sender);
        self.tx.send_buffer(stream_id, dst_offset, size)?;
        let (_, latency) = await_status(receiver)?;
        Ok((latency, ret))
    }

    /// Request audio frames from the server. It blocks until the data is available.
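    ///
    /// A minimal sketch reading one period into a local buffer (a sketch; assumes
    /// VolatileSlice::copy_to is available in data_model):
    ///
    /// ```ignore
    /// let mut buf = vec![0u8; period_bytes];
    /// let (latency, ()) = client.request_audio_data(stream_id, buf.len(), |slice| {
    ///     slice.copy_to(&mut buf[..]);
    /// })?;
    /// ```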
    pub fn request_audio_data<R, Cb: FnOnce(&VolatileSlice) -> R>(
        &self,
        stream_id: u32,
        size: usize,
        callback: Cb,
    ) -> Result<(u32, R)> {
        if self
            .streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?
            .direction
            != VIRTIO_SND_D_INPUT
        {
            return Err(Error::WrongDirection(VIRTIO_SND_D_INPUT));
        }
        let src_offset = self.rx.allocate_buffer(size)?;
        // Register to receive the status before sending the buffer to the server
        let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) = channel();
        self.rx_subscribers.lock().insert(src_offset, sender);
        self.rx.send_buffer(stream_id, src_offset, size)?;
        // Make sure no mutexes are held while waiting for the buffer to be written to
        let (recv_size, latency) = await_status(receiver)?;
        let buffer_slice = self.rx.buffer_at(src_offset, recv_size)?;
        Ok((latency, callback(&buffer_slice)))
    }

    /// Get a list of file descriptors used by the implementation.
    pub fn keep_fds(&self) -> Vec<RawFd> {
        let control_fd = self.control_socket.lock().as_raw_fd();
        let event_fd = self.event_socket.as_raw_fd();
        let recv_event = self.recv_event.as_raw_descriptor();
        let event_notifier = self.event_notifier.as_raw_descriptor();
        let mut ret = vec![control_fd, event_fd, recv_event, event_notifier];
        ret.append(&mut self.tx.keep_fds());
        ret.append(&mut self.rx.keep_fds());
        ret
    }

    fn common_stream_op(&self, stream_id: u32, op: u32) -> Result<()> {
        self.streams
            .get(stream_id as usize)
            .ok_or(Error::InvalidStreamId(stream_id))?;
        let msg = virtio_snd_pcm_hdr {
            hdr: virtio_snd_hdr { code: op.into() },
            stream_id: stream_id.into(),
        };
        let control_socket_lock = self.control_socket.lock();
        send_cmd(&*control_socket_lock, msg)
    }

    fn request_and_cache_info(&mut self) -> Result<()> {
        self.request_and_cache_jacks_info()?;
        self.request_and_cache_streams_info()?;
        self.request_and_cache_chmaps_info()?;
        Ok(())
    }

    fn request_info<T: DataInit + Default + Copy + Clone>(
        &self,
        req_code: u32,
        count: usize,
    ) -> Result<Vec<T>> {
        let info_size = std::mem::size_of::<T>();
        let status_size = std::mem::size_of::<virtio_snd_hdr>();
        let req = virtio_snd_query_info {
            hdr: virtio_snd_hdr {
                code: req_code.into(),
            },
            start_id: 0u32.into(),
            count: (count as u32).into(),
            size: (std::mem::size_of::<virtio_snd_query_info>() as u32).into(),
        };
        let control_socket_lock = self.control_socket.lock();
        seq_socket_send(&*control_socket_lock, req)?;
        let reply = control_socket_lock
            .recv_as_vec()
            .map_err(Error::ServerIOError)?;
        let mut status: virtio_snd_hdr = Default::default();
        status
            .as_mut_slice()
            .copy_from_slice(&reply[0..status_size]);
        if status.code.to_native() != VIRTIO_SND_S_OK {
            return Err(Error::CommandFailed(status.code.to_native()));
        }
        if reply.len() != status_size + count * info_size {
            return Err(Error::ProtocolError(
                ProtocolErrorKind::UnexpectedMessageSize(count * info_size, reply.len()),
            ));
        }
        Ok(reply[status_size..]
            .chunks(info_size)
            .map(|info_buffer| {
                let mut info: T = Default::default();
                // Need to use copy_from_slice instead of T::from_slice because the info_buffer
                // may not be aligned correctly
                info.as_mut_slice().copy_from_slice(info_buffer);
                info
            })
            .collect())
    }

    fn request_and_cache_jacks_info(&mut self) -> Result<()> {
        let num_jacks = self.config.jacks as usize;
        if num_jacks == 0 {
            return Ok(());
        }
        self.jacks = self.request_info(VIRTIO_SND_R_JACK_INFO, num_jacks)?;
        Ok(())
    }

    fn request_and_cache_streams_info(&mut self) -> Result<()> {
        let num_streams = self.config.streams as usize;
        if num_streams == 0 {
            return Ok(());
        }
        self.streams = self.request_info(VIRTIO_SND_R_PCM_INFO, num_streams)?;
        Ok(())
    }

    fn request_and_cache_chmaps_info(&mut self) -> Result<()> {
        let num_chmaps = self.config.chmaps as usize;
        if num_chmaps == 0 {
            return Ok(());
        }
        self.chmaps = self.request_info(VIRTIO_SND_R_CHMAP_INFO, num_chmaps)?;
        Ok(())
    }
}

impl Drop for VioSClient {
    fn drop(&mut self) {
        if let Err(e) = self.stop_bg_thread() {
            error!("Error stopping Recv thread: {}", e);
        }
    }
}

#[derive(Clone, Copy)]
struct ThreadFlags {
    running: bool,
    reporting_events: bool,
}

#[derive(PollToken)]
enum Token {
    Notification,
    TxBufferMsg,
    RxBufferMsg,
    EventMsg,
}

fn recv_buffer_status_msg(
    socket: &UnixSeqpacket,
    subscribers: &Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
) -> Result<()> {
    let mut msg: IoStatusMsg = Default::default();
    let size = socket
        .recv(msg.as_mut_slice())
        .map_err(Error::ServerIOError)?;
    if size != std::mem::size_of::<IoStatusMsg>() {
        return Err(Error::ProtocolError(
            ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<IoStatusMsg>(), size),
        ));
    }
    let mut status = msg.status.status.into();
    if status == u32::MAX {
        // Anyone waiting for this would continue to wait for as long as status is
        // u32::MAX
        status -= 1;
    }
    let latency = msg.status.latency_bytes.into();
    let offset = msg.buffer_offset as usize;
    let consumed_len = msg.consumed_len as usize;
    let promise_opt = subscribers.lock().remove(&offset);
    match promise_opt {
        None => error!(
            "Received an unexpected buffer status message: {}. This is a BUG!!",
            offset
        ),
        Some(sender) => {
            if let Err(e) = sender.send(BufferReleaseMsg {
                status,
                latency,
                consumed_len,
            }) {
                error!("Failed to notify waiting thread: {:?}", e);
            }
        }
    }
    Ok(())
}

fn recv_event(socket: &UnixSeqpacket) -> Result<virtio_snd_event> {
    let mut msg: virtio_snd_event = Default::default();
    let size = socket
        .recv(msg.as_mut_slice())
        .map_err(Error::ServerIOError)?;
    if size != std::mem::size_of::<virtio_snd_event>() {
        return Err(Error::ProtocolError(
            ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<virtio_snd_event>(), size),
        ));
    }
    Ok(msg)
}

fn spawn_recv_thread(
    tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
    rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
    event_notifier: Event,
    event_queue: Arc<Mutex<VecDeque<virtio_snd_event>>>,
    event: Event,
    state: Arc<Mutex<ThreadFlags>>,
    tx_socket: UnixSeqpacket,
    rx_socket: UnixSeqpacket,
    event_socket: UnixSeqpacket,
) -> JoinHandle<Result<()>> {
    std::thread::spawn(move || {
        let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
            (&tx_socket, Token::TxBufferMsg),
            (&rx_socket, Token::RxBufferMsg),
            (&event_socket, Token::EventMsg),
            (&event, Token::Notification),
        ])
        .map_err(Error::WaitContextCreateError)?;
        loop {
            let state_cpy = *state.lock();
            if !state_cpy.running {
                break;
            }
            let events = wait_ctx.wait().map_err(Error::WaitError)?;
            for evt in events {
                match evt.token {
                    Token::TxBufferMsg => recv_buffer_status_msg(&tx_socket, &tx_subscribers)?,
                    Token::RxBufferMsg => recv_buffer_status_msg(&rx_socket, &rx_subscribers)?,
                    Token::EventMsg => {
                        let evt = recv_event(&event_socket)?;
                        let state_cpy = *state.lock();
                        if state_cpy.reporting_events {
                            event_queue.lock().push_back(evt);
                            event_notifier.write(1).map_err(Error::EventWriteError)?;
                        } // else just drop the events
                    }
                    Token::Notification => {
                        // Just consume the notification and check for termination on the next
                        // iteration
                        if let Err(e) = event.read() {
                            error!("Failed to consume notification from recv thread: {:?}", e);
                        }
                    }
                }
            }
        }
        Ok(())
    })
}

fn await_status(promise: Receiver<BufferReleaseMsg>) -> Result<(usize, u32)> {
    let BufferReleaseMsg {
        status,
        latency,
        consumed_len,
    } = promise.recv().map_err(Error::BufferStatusSenderLost)?;
    if status == VIRTIO_SND_S_OK {
        Ok((consumed_len, latency))
    } else {
        Err(Error::IOBufferError(status))
    }
}

struct IoBufferQueue {
    socket: UnixSeqpacket,
    file: File,
    mmap: MemoryMapping,
    size: usize,
    next: Mutex<usize>,
}

impl IoBufferQueue {
    fn new(socket: UnixSeqpacket, mut file: File) -> Result<IoBufferQueue> {
        let size = file.seek(SeekFrom::End(0)).map_err(Error::FileSizeError)? as usize;

        let mmap = MemoryMappingBuilder::new(size)
            .from_file(&file)
            .build()
            .map_err(Error::ServerMmapError)?;

        Ok(IoBufferQueue {
            socket,
            file,
            mmap,
            size,
            next: Mutex::new(0),
        })
    }

    fn allocate_buffer(&self, size: usize) -> Result<usize> {
        if size > self.size {
            return Err(Error::OutOfSpace);
        }
        let mut next_lock = self.next.lock();
        let offset = if size > self.size - *next_lock {
            // Can't fit the new buffer at the end of the area, so put it at the beginning
            0
        } else {
            *next_lock
        };
        *next_lock = offset + size;
        Ok(offset)
    }

    fn buffer_at(&self, offset: usize, len: usize) -> Result<VolatileSlice> {
        self.mmap
            .get_slice(offset, len)
            .map_err(Error::VolatileMemoryError)
    }

    fn try_clone_socket(&self) -> Result<UnixSeqpacket> {
        self.socket
            .try_clone()
            .map_err(Error::UnixSeqpacketDupError)
    }

    fn send_buffer(&self, stream_id: u32, offset: usize, size: usize) -> Result<()> {
        let msg = IoTransferMsg::new(stream_id, offset, size);
        seq_socket_send(&self.socket, msg)
    }

    fn keep_fds(&self) -> Vec<RawFd> {
        vec![self.file.as_raw_fd(), self.socket.as_raw_fd()]
    }
}

/// Groups the parameters used to configure a stream prior to using it.
pub struct VioSStreamParams {
    pub buffer_bytes: u32,
    pub period_bytes: u32,
    pub features: u32,
    pub channels: u8,
    pub format: u8,
    pub rate: u8,
}

impl From<(u32, VioSStreamParams)> for virtio_snd_pcm_set_params {
    fn from(val: (u32, VioSStreamParams)) -> Self {
        virtio_snd_pcm_set_params {
            hdr: virtio_snd_pcm_hdr {
                hdr: virtio_snd_hdr {
                    code: VIRTIO_SND_R_PCM_SET_PARAMS.into(),
                },
                stream_id: val.0.into(),
            },
            buffer_bytes: val.1.buffer_bytes.into(),
            period_bytes: val.1.period_bytes.into(),
            features: val.1.features.into(),
            channels: val.1.channels,
            format: val.1.format,
            rate: val.1.rate,
            padding: 0u8,
        }
    }
}

fn send_cmd<T: DataInit>(control_socket: &UnixSeqpacket, data: T) -> Result<()> {
    seq_socket_send(control_socket, data)?;
    recv_cmd_status(control_socket)
}

fn recv_cmd_status(control_socket: &UnixSeqpacket) -> Result<()> {
    let mut status: virtio_snd_hdr = Default::default();
    control_socket
        .recv(status.as_mut_slice())
        .map_err(Error::ServerIOError)?;
    if status.code.to_native() == VIRTIO_SND_S_OK {
        Ok(())
    } else {
        Err(Error::CommandFailed(status.code.to_native()))
    }
}

fn seq_socket_send<T: DataInit>(socket: &UnixSeqpacket, data: T) -> Result<()> {
    loop {
        let send_res = socket.send(data.as_slice());
        if let Err(e) = send_res {
            match e.kind() {
                // Retry if interrupted
                IOErrorKind::Interrupted => continue,
                _ => return Err(Error::ServerIOError(e)),
            }
        }
        // Success
        break;
    }
    Ok(())
}

const VIOS_VERSION: u32 = 2;

#[repr(C)]
#[derive(Copy, Clone, Default)]
struct VioSConfig {
    version: u32,
    jacks: u32,
    streams: u32,
    chmaps: u32,
}
// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for VioSConfig {}

struct BufferReleaseMsg {
    status: u32,
    latency: u32,
    consumed_len: usize,
}

#[repr(C)]
#[derive(Copy, Clone)]
struct IoTransferMsg {
    io_xfer: virtio_snd_pcm_xfer,
    buffer_offset: u32,
    buffer_len: u32,
}
// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for IoTransferMsg {}

impl IoTransferMsg {
    fn new(stream_id: u32, buffer_offset: usize, buffer_len: usize) -> IoTransferMsg {
        IoTransferMsg {
            io_xfer: virtio_snd_pcm_xfer {
                stream_id: stream_id.into(),
            },
            buffer_offset: buffer_offset as u32,
            buffer_len: buffer_len as u32,
        }
    }
}

#[repr(C)]
#[derive(Copy, Clone, Default)]
struct IoStatusMsg {
    status: virtio_snd_pcm_status,
    buffer_offset: u32,
    consumed_len: u32,
}
// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for IoStatusMsg {}