• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use crate::virtio::snd::constants::*;
6 use crate::virtio::snd::layout::*;
7 
8 use base::{
9     error, AsRawDescriptor, Error as BaseError, Event, FromRawDescriptor, IntoRawDescriptor,
10     MemoryMapping, MemoryMappingBuilder, MmapError, PollToken, SafeDescriptor, ScmSocket,
11     UnixSeqpacket, WaitContext,
12 };
13 use data_model::{DataInit, VolatileMemory, VolatileMemoryError, VolatileSlice};
14 
15 use std::collections::{HashMap, VecDeque};
16 use std::fs::File;
17 use std::io::{Error as IOError, ErrorKind as IOErrorKind, IoSliceMut, Seek, SeekFrom};
18 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
19 use std::path::Path;
20 use std::sync::mpsc::{channel, Receiver, RecvError, Sender};
21 use std::sync::Arc;
22 use std::thread::JoinHandle;
23 
24 use sync::Mutex;
25 
26 use remain::sorted;
27 use thiserror::Error as ThisError;
28 
29 pub type Result<T> = std::result::Result<T, Error>;
30 
31 #[sorted]
32 #[derive(ThisError, Debug)]
33 pub enum Error {
34     #[error("Error memory mapping client_shm: {0}")]
35     BaseMmapError(BaseError),
36     #[error("Sender was dropped without sending buffer status, the recv thread may have exited")]
37     BufferStatusSenderLost(RecvError),
38     #[error("Command failed with status {0}")]
39     CommandFailed(u32),
40     #[error("Error duplicating file descriptor: {0}")]
41     DupError(BaseError),
42     #[error("Failed to create Recv event: {0}")]
43     EventCreateError(BaseError),
44     #[error("Failed to dup Recv event: {0}")]
45     EventDupError(BaseError),
46     #[error("Failed to signal event: {0}")]
47     EventWriteError(BaseError),
48     #[error("Failed to get size of tx shared memory: {0}")]
49     FileSizeError(IOError),
50     #[error("Error accessing guest's shared memory: {0}")]
51     GuestMmapError(MmapError),
52     #[error("No jack with id {0}")]
53     InvalidJackId(u32),
54     #[error("No stream with id {0}")]
55     InvalidStreamId(u32),
56     #[error("IO buffer operation failed: status = {0}")]
57     IOBufferError(u32),
58     #[error("No PCM streams available")]
59     NoStreamsAvailable,
60     #[error("Insuficient space for the new buffer in the queue's buffer area")]
61     OutOfSpace,
62     #[error("Platform not supported")]
63     PlatformNotSupported,
64     #[error("{0}")]
65     ProtocolError(ProtocolErrorKind),
66     #[error("Failed to connect to VioS server: {0:?}")]
67     ServerConnectionError(IOError),
68     #[error("Failed to communicate with VioS server: {0:?}")]
69     ServerError(BaseError),
70     #[error("Failed to communicate with VioS server: {0:?}")]
71     ServerIOError(IOError),
72     #[error("Error accessing VioS server's shared memory: {0}")]
73     ServerMmapError(MmapError),
74     #[error("Failed to duplicate UnixSeqpacket: {0}")]
75     UnixSeqpacketDupError(IOError),
76     #[error("Unsupported frame rate: {0}")]
77     UnsupportedFrameRate(u32),
78     #[error("Error accessing volatile memory: {0}")]
79     VolatileMemoryError(VolatileMemoryError),
80     #[error("Failed to create Recv thread's WaitContext: {0}")]
81     WaitContextCreateError(BaseError),
82     #[error("Error waiting for events")]
83     WaitError(BaseError),
84     #[error("Invalid operation for stream direction: {0}")]
85     WrongDirection(u8),
86 }
87 
88 #[derive(ThisError, Debug)]
89 pub enum ProtocolErrorKind {
90     #[error("The server sent a config of the wrong size: {0}")]
91     UnexpectedConfigSize(usize),
92     #[error("Received {1} file descriptors from the server, expected {0}")]
93     UnexpectedNumberOfFileDescriptors(usize, usize), // expected, received
94     #[error("Server's version ({0}) doesn't match client's")]
95     VersionMismatch(u32),
96     #[error("Received a msg with an unexpected size: expected {0}, received {1}")]
97     UnexpectedMessageSize(usize, usize), // expected, received
98 }
99 
100 /// The client for the VioS backend
101 ///
102 /// Uses a protocol equivalent to virtio-snd over a shared memory file and a unix socket for
103 /// notifications. It's thread safe, it can be encapsulated in an Arc smart pointer and shared
104 /// between threads.
105 pub struct VioSClient {
106     // These mutexes should almost never be held simultaneously. If at some point they have to the
107     // locking order should match the order in which they are declared here.
108     config: VioSConfig,
109     jacks: Vec<virtio_snd_jack_info>,
110     streams: Vec<virtio_snd_pcm_info>,
111     chmaps: Vec<virtio_snd_chmap_info>,
112     // The control socket is used from multiple threads to send and wait for a reply, which needs
113     // to happen atomically, hence the need for a mutex instead of just sharing clones of the
114     // socket.
115     control_socket: Mutex<UnixSeqpacket>,
116     event_socket: UnixSeqpacket,
117     // These are thread safe and don't require locking
118     tx: IoBufferQueue,
119     rx: IoBufferQueue,
120     // This is accessed by the recv_thread and whatever thread processes the events
121     events: Arc<Mutex<VecDeque<virtio_snd_event>>>,
122     event_notifier: Event,
123     // These are accessed by the recv_thread and the stream threads
124     tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
125     rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
126     recv_thread_state: Arc<Mutex<ThreadFlags>>,
127     recv_event: Event,
128     recv_thread: Mutex<Option<JoinHandle<Result<()>>>>,
129 }
130 
131 impl VioSClient {
132     /// Create a new client given the path to the audio server's socket.
try_new<P: AsRef<Path>>(server: P) -> Result<VioSClient>133     pub fn try_new<P: AsRef<Path>>(server: P) -> Result<VioSClient> {
134         let client_socket = UnixSeqpacket::connect(server).map_err(Error::ServerConnectionError)?;
135         let mut config: VioSConfig = Default::default();
136         let mut fds: Vec<RawFd> = Vec::new();
137         const NUM_FDS: usize = 5;
138         fds.resize(NUM_FDS, 0);
139         let (recv_size, fd_count) = client_socket
140             .recv_with_fds(IoSliceMut::new(config.as_mut_slice()), &mut fds)
141             .map_err(Error::ServerError)?;
142 
143         // Resize the vector to the actual number of file descriptors received and wrap them in
144         // SafeDescriptors to prevent leaks
145         fds.resize(fd_count, -1);
146         let mut safe_fds: Vec<SafeDescriptor> = fds
147             .into_iter()
148             .map(|fd| unsafe {
149                 // safe because the SafeDescriptor object completely assumes ownership of the fd.
150                 SafeDescriptor::from_raw_descriptor(fd)
151             })
152             .collect();
153 
154         if recv_size != std::mem::size_of::<VioSConfig>() {
155             return Err(Error::ProtocolError(
156                 ProtocolErrorKind::UnexpectedConfigSize(recv_size),
157             ));
158         }
159 
160         if config.version != VIOS_VERSION {
161             return Err(Error::ProtocolError(ProtocolErrorKind::VersionMismatch(
162                 config.version,
163             )));
164         }
165 
166         fn pop<T: FromRawFd>(
167             safe_fds: &mut Vec<SafeDescriptor>,
168             expected: usize,
169             received: usize,
170         ) -> Result<T> {
171             unsafe {
172                 // Safe because we transfer ownership from the SafeDescriptor to T
173                 Ok(T::from_raw_fd(
174                     safe_fds
175                         .pop()
176                         .ok_or(Error::ProtocolError(
177                             ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(
178                                 expected, received,
179                             ),
180                         ))?
181                         .into_raw_descriptor(),
182                 ))
183             }
184         }
185 
186         let rx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;
187         let tx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;
188         let rx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
189         let tx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
190         let event_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;
191 
192         if !safe_fds.is_empty() {
193             return Err(Error::ProtocolError(
194                 ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(NUM_FDS, fd_count),
195             ));
196         }
197 
198         let tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
199             Arc::new(Mutex::new(HashMap::new()));
200         let rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =
201             Arc::new(Mutex::new(HashMap::new()));
202         let recv_thread_state = Arc::new(Mutex::new(ThreadFlags {
203             running: true,
204             reporting_events: false,
205         }));
206         let recv_event = Event::new().map_err(Error::EventCreateError)?;
207 
208         let mut client = VioSClient {
209             config,
210             jacks: Vec::new(),
211             streams: Vec::new(),
212             chmaps: Vec::new(),
213             control_socket: Mutex::new(client_socket),
214             event_socket,
215             tx: IoBufferQueue::new(tx_socket, tx_shm_file)?,
216             rx: IoBufferQueue::new(rx_socket, rx_shm_file)?,
217             events: Arc::new(Mutex::new(VecDeque::new())),
218             event_notifier: Event::new().map_err(Error::EventCreateError)?,
219             tx_subscribers,
220             rx_subscribers,
221             recv_thread_state,
222             recv_event,
223             recv_thread: Mutex::new(None),
224         };
225         client.request_and_cache_info()?;
226         Ok(client)
227     }
228 
229     /// Get the number of jacks
num_jacks(&self) -> u32230     pub fn num_jacks(&self) -> u32 {
231         self.config.jacks
232     }
233 
234     /// Get the number of pcm streams
num_streams(&self) -> u32235     pub fn num_streams(&self) -> u32 {
236         self.config.streams
237     }
238 
239     /// Get the number of channel maps
num_chmaps(&self) -> u32240     pub fn num_chmaps(&self) -> u32 {
241         self.config.chmaps
242     }
243 
244     /// Get the configuration information on a jack
jack_info(&self, idx: u32) -> Option<virtio_snd_jack_info>245     pub fn jack_info(&self, idx: u32) -> Option<virtio_snd_jack_info> {
246         self.jacks.get(idx as usize).copied()
247     }
248 
249     /// Get the configuration information on a pcm stream
stream_info(&self, idx: u32) -> Option<virtio_snd_pcm_info>250     pub fn stream_info(&self, idx: u32) -> Option<virtio_snd_pcm_info> {
251         self.streams.get(idx as usize).cloned()
252     }
253 
254     /// Get the configuration information on a channel map
chmap_info(&self, idx: u32) -> Option<virtio_snd_chmap_info>255     pub fn chmap_info(&self, idx: u32) -> Option<virtio_snd_chmap_info> {
256         self.chmaps.get(idx as usize).copied()
257     }
258 
259     /// Starts the background thread that receives release messages from the server. If the thread
260     /// was already started this function does nothing.
261     /// This thread must be started prior to attempting any stream IO operation or the calling
262     /// thread would block.
start_bg_thread(&self) -> Result<()>263     pub fn start_bg_thread(&self) -> Result<()> {
264         if self.recv_thread.lock().is_some() {
265             return Ok(());
266         }
267         let recv_event = self.recv_event.try_clone().map_err(Error::EventDupError)?;
268         let tx_socket = self.tx.try_clone_socket()?;
269         let rx_socket = self.rx.try_clone_socket()?;
270         let event_socket = self
271             .event_socket
272             .try_clone()
273             .map_err(Error::UnixSeqpacketDupError)?;
274         let mut opt = self.recv_thread.lock();
275         // The lock on recv_thread was released above to avoid holding more than one lock at a time
276         // while duplicating the fds. So we have to check the condition again.
277         if opt.is_none() {
278             *opt = Some(spawn_recv_thread(
279                 self.tx_subscribers.clone(),
280                 self.rx_subscribers.clone(),
281                 self.event_notifier
282                     .try_clone()
283                     .map_err(Error::EventDupError)?,
284                 self.events.clone(),
285                 recv_event,
286                 self.recv_thread_state.clone(),
287                 tx_socket,
288                 rx_socket,
289                 event_socket,
290             ));
291         }
292         Ok(())
293     }
294 
295     /// Stops the background thread.
stop_bg_thread(&self) -> Result<()>296     pub fn stop_bg_thread(&self) -> Result<()> {
297         if self.recv_thread.lock().is_none() {
298             return Ok(());
299         }
300         self.recv_thread_state.lock().running = false;
301         self.recv_event
302             .write(1u64)
303             .map_err(Error::EventWriteError)?;
304         if let Some(handle) = self.recv_thread.lock().take() {
305             return match handle.join() {
306                 Ok(r) => r,
307                 Err(e) => {
308                     error!("Recv thread panicked: {:?}", e);
309                     Ok(())
310                 }
311             };
312         }
313         Ok(())
314     }
315 
316     /// Gets an Event object that will trigger every time an event is received from the server
get_event_notifier(&self) -> Result<Event>317     pub fn get_event_notifier(&self) -> Result<Event> {
318         // Let the background thread know that there is at least one consumer of events
319         self.recv_thread_state.lock().reporting_events = true;
320         self.event_notifier
321             .try_clone()
322             .map_err(Error::EventDupError)
323     }
324 
325     /// Retrieves one event. Callers should have received a notification through the event notifier
326     /// before calling this function.
pop_event(&self) -> Option<virtio_snd_event>327     pub fn pop_event(&self) -> Option<virtio_snd_event> {
328         self.events.lock().pop_front()
329     }
330 
331     /// Remap a jack. This should only be called if the jack announces support for the operation
332     /// through the features field in the corresponding virtio_snd_jack_info struct.
remap_jack(&self, jack_id: u32, association: u32, sequence: u32) -> Result<()>333     pub fn remap_jack(&self, jack_id: u32, association: u32, sequence: u32) -> Result<()> {
334         if jack_id >= self.config.jacks {
335             return Err(Error::InvalidJackId(jack_id));
336         }
337         let msg = virtio_snd_jack_remap {
338             hdr: virtio_snd_jack_hdr {
339                 hdr: virtio_snd_hdr {
340                     code: VIRTIO_SND_R_JACK_REMAP.into(),
341                 },
342                 jack_id: jack_id.into(),
343             },
344             association: association.into(),
345             sequence: sequence.into(),
346         };
347         let control_socket_lock = self.control_socket.lock();
348         send_cmd(&*control_socket_lock, msg)
349     }
350 
351     /// Configures a stream with the given parameters.
set_stream_parameters(&self, stream_id: u32, params: VioSStreamParams) -> Result<()>352     pub fn set_stream_parameters(&self, stream_id: u32, params: VioSStreamParams) -> Result<()> {
353         self.streams
354             .get(stream_id as usize)
355             .ok_or(Error::InvalidStreamId(stream_id))?;
356         let raw_params: virtio_snd_pcm_set_params = (stream_id, params).into();
357         let control_socket_lock = self.control_socket.lock();
358         send_cmd(&*control_socket_lock, raw_params)
359     }
360 
361     /// Configures a stream with the given parameters.
set_stream_parameters_raw(&self, raw_params: virtio_snd_pcm_set_params) -> Result<()>362     pub fn set_stream_parameters_raw(&self, raw_params: virtio_snd_pcm_set_params) -> Result<()> {
363         let stream_id = raw_params.hdr.stream_id.to_native();
364         self.streams
365             .get(stream_id as usize)
366             .ok_or(Error::InvalidStreamId(stream_id))?;
367         let control_socket_lock = self.control_socket.lock();
368         send_cmd(&*control_socket_lock, raw_params)
369     }
370 
371     /// Send the PREPARE_STREAM command to the server.
prepare_stream(&self, stream_id: u32) -> Result<()>372     pub fn prepare_stream(&self, stream_id: u32) -> Result<()> {
373         self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_PREPARE)
374     }
375 
376     /// Send the RELEASE_STREAM command to the server.
release_stream(&self, stream_id: u32) -> Result<()>377     pub fn release_stream(&self, stream_id: u32) -> Result<()> {
378         self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_RELEASE)
379     }
380 
381     /// Send the START_STREAM command to the server.
start_stream(&self, stream_id: u32) -> Result<()>382     pub fn start_stream(&self, stream_id: u32) -> Result<()> {
383         self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_START)
384     }
385 
386     /// Send the STOP_STREAM command to the server.
stop_stream(&self, stream_id: u32) -> Result<()>387     pub fn stop_stream(&self, stream_id: u32) -> Result<()> {
388         self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_STOP)
389     }
390 
391     /// Send audio frames to the server. Blocks the calling thread until the server acknowledges
392     /// the data.
inject_audio_data<R, Cb: FnOnce(VolatileSlice) -> R>( &self, stream_id: u32, size: usize, callback: Cb, ) -> Result<(u32, R)>393     pub fn inject_audio_data<R, Cb: FnOnce(VolatileSlice) -> R>(
394         &self,
395         stream_id: u32,
396         size: usize,
397         callback: Cb,
398     ) -> Result<(u32, R)> {
399         if self
400             .streams
401             .get(stream_id as usize)
402             .ok_or(Error::InvalidStreamId(stream_id))?
403             .direction
404             != VIRTIO_SND_D_OUTPUT
405         {
406             return Err(Error::WrongDirection(VIRTIO_SND_D_OUTPUT));
407         }
408         self.streams
409             .get(stream_id as usize)
410             .ok_or(Error::InvalidStreamId(stream_id))?;
411         let dst_offset = self.tx.allocate_buffer(size)?;
412         let buffer_slice = self.tx.buffer_at(dst_offset, size)?;
413         let ret = callback(buffer_slice);
414         // Register to receive the status before sending the buffer to the server
415         let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) = channel();
416         self.tx_subscribers.lock().insert(dst_offset, sender);
417         self.tx.send_buffer(stream_id, dst_offset, size)?;
418         let (_, latency) = await_status(receiver)?;
419         Ok((latency, ret))
420     }
421 
422     /// Request audio frames from the server. It blocks until the data is available.
request_audio_data<R, Cb: FnOnce(&VolatileSlice) -> R>( &self, stream_id: u32, size: usize, callback: Cb, ) -> Result<(u32, R)>423     pub fn request_audio_data<R, Cb: FnOnce(&VolatileSlice) -> R>(
424         &self,
425         stream_id: u32,
426         size: usize,
427         callback: Cb,
428     ) -> Result<(u32, R)> {
429         if self
430             .streams
431             .get(stream_id as usize)
432             .ok_or(Error::InvalidStreamId(stream_id))?
433             .direction
434             != VIRTIO_SND_D_INPUT
435         {
436             return Err(Error::WrongDirection(VIRTIO_SND_D_INPUT));
437         }
438         let src_offset = self.rx.allocate_buffer(size)?;
439         // Register to receive the status before sending the buffer to the server
440         let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) = channel();
441         self.rx_subscribers.lock().insert(src_offset, sender);
442         self.rx.send_buffer(stream_id, src_offset, size)?;
443         // Make sure no mutexes are held while awaiting for the buffer to be written to
444         let (recv_size, latency) = await_status(receiver)?;
445         let buffer_slice = self.rx.buffer_at(src_offset, recv_size)?;
446         Ok((latency, callback(&buffer_slice)))
447     }
448 
449     /// Get a list of file descriptors used by the implementation.
keep_fds(&self) -> Vec<RawFd>450     pub fn keep_fds(&self) -> Vec<RawFd> {
451         let control_fd = self.control_socket.lock().as_raw_fd();
452         let event_fd = self.event_socket.as_raw_fd();
453         let recv_event = self.recv_event.as_raw_descriptor();
454         let event_notifier = self.event_notifier.as_raw_descriptor();
455         let mut ret = vec![control_fd, event_fd, recv_event, event_notifier];
456         ret.append(&mut self.tx.keep_fds());
457         ret.append(&mut self.rx.keep_fds());
458         ret
459     }
460 
common_stream_op(&self, stream_id: u32, op: u32) -> Result<()>461     fn common_stream_op(&self, stream_id: u32, op: u32) -> Result<()> {
462         self.streams
463             .get(stream_id as usize)
464             .ok_or(Error::InvalidStreamId(stream_id))?;
465         let msg = virtio_snd_pcm_hdr {
466             hdr: virtio_snd_hdr { code: op.into() },
467             stream_id: stream_id.into(),
468         };
469         let control_socket_lock = self.control_socket.lock();
470         send_cmd(&*control_socket_lock, msg)
471     }
472 
request_and_cache_info(&mut self) -> Result<()>473     fn request_and_cache_info(&mut self) -> Result<()> {
474         self.request_and_cache_jacks_info()?;
475         self.request_and_cache_streams_info()?;
476         self.request_and_cache_chmaps_info()?;
477         Ok(())
478     }
479 
request_info<T: DataInit + Default + Copy + Clone>( &self, req_code: u32, count: usize, ) -> Result<Vec<T>>480     fn request_info<T: DataInit + Default + Copy + Clone>(
481         &self,
482         req_code: u32,
483         count: usize,
484     ) -> Result<Vec<T>> {
485         let info_size = std::mem::size_of::<T>();
486         let status_size = std::mem::size_of::<virtio_snd_hdr>();
487         let req = virtio_snd_query_info {
488             hdr: virtio_snd_hdr {
489                 code: req_code.into(),
490             },
491             start_id: 0u32.into(),
492             count: (count as u32).into(),
493             size: (std::mem::size_of::<virtio_snd_query_info>() as u32).into(),
494         };
495         let control_socket_lock = self.control_socket.lock();
496         seq_socket_send(&*control_socket_lock, req)?;
497         let reply = control_socket_lock
498             .recv_as_vec()
499             .map_err(Error::ServerIOError)?;
500         let mut status: virtio_snd_hdr = Default::default();
501         status
502             .as_mut_slice()
503             .copy_from_slice(&reply[0..status_size]);
504         if status.code.to_native() != VIRTIO_SND_S_OK {
505             return Err(Error::CommandFailed(status.code.to_native()));
506         }
507         if reply.len() != status_size + count * info_size {
508             return Err(Error::ProtocolError(
509                 ProtocolErrorKind::UnexpectedMessageSize(count * info_size, reply.len()),
510             ));
511         }
512         Ok(reply[status_size..]
513             .chunks(info_size)
514             .map(|info_buffer| {
515                 let mut info: T = Default::default();
516                 // Need to use copy_from_slice instead of T::from_slice because the info_buffer may
517                 // not be aligned correctly
518                 info.as_mut_slice().copy_from_slice(info_buffer);
519                 info
520             })
521             .collect())
522     }
523 
request_and_cache_jacks_info(&mut self) -> Result<()>524     fn request_and_cache_jacks_info(&mut self) -> Result<()> {
525         let num_jacks = self.config.jacks as usize;
526         if num_jacks == 0 {
527             return Ok(());
528         }
529         self.jacks = self.request_info(VIRTIO_SND_R_JACK_INFO, num_jacks)?;
530         Ok(())
531     }
532 
request_and_cache_streams_info(&mut self) -> Result<()>533     fn request_and_cache_streams_info(&mut self) -> Result<()> {
534         let num_streams = self.config.streams as usize;
535         if num_streams == 0 {
536             return Ok(());
537         }
538         self.streams = self.request_info(VIRTIO_SND_R_PCM_INFO, num_streams)?;
539         Ok(())
540     }
541 
request_and_cache_chmaps_info(&mut self) -> Result<()>542     fn request_and_cache_chmaps_info(&mut self) -> Result<()> {
543         let num_chmaps = self.config.chmaps as usize;
544         if num_chmaps == 0 {
545             return Ok(());
546         }
547         self.chmaps = self.request_info(VIRTIO_SND_R_CHMAP_INFO, num_chmaps)?;
548         Ok(())
549     }
550 }
551 
552 impl Drop for VioSClient {
drop(&mut self)553     fn drop(&mut self) {
554         if let Err(e) = self.stop_bg_thread() {
555             error!("Error stopping Recv thread: {}", e);
556         }
557     }
558 }
559 
560 #[derive(Clone, Copy)]
561 struct ThreadFlags {
562     running: bool,
563     reporting_events: bool,
564 }
565 
566 #[derive(PollToken)]
567 enum Token {
568     Notification,
569     TxBufferMsg,
570     RxBufferMsg,
571     EventMsg,
572 }
573 
recv_buffer_status_msg( socket: &UnixSeqpacket, subscribers: &Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>, ) -> Result<()>574 fn recv_buffer_status_msg(
575     socket: &UnixSeqpacket,
576     subscribers: &Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
577 ) -> Result<()> {
578     let mut msg: IoStatusMsg = Default::default();
579     let size = socket
580         .recv(msg.as_mut_slice())
581         .map_err(Error::ServerIOError)?;
582     if size != std::mem::size_of::<IoStatusMsg>() {
583         return Err(Error::ProtocolError(
584             ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<IoStatusMsg>(), size),
585         ));
586     }
587     let mut status = msg.status.status.into();
588     if status == u32::MAX {
589         // Anyone waiting for this would continue to wait for as long as status is
590         // u32::MAX
591         status -= 1;
592     }
593     let latency = msg.status.latency_bytes.into();
594     let offset = msg.buffer_offset as usize;
595     let consumed_len = msg.consumed_len as usize;
596     let promise_opt = subscribers.lock().remove(&offset);
597     match promise_opt {
598         None => error!(
599             "Received an unexpected buffer status message: {}. This is a BUG!!",
600             offset
601         ),
602         Some(sender) => {
603             if let Err(e) = sender.send(BufferReleaseMsg {
604                 status,
605                 latency,
606                 consumed_len,
607             }) {
608                 error!("Failed to notify waiting thread: {:?}", e);
609             }
610         }
611     }
612     Ok(())
613 }
614 
recv_event(socket: &UnixSeqpacket) -> Result<virtio_snd_event>615 fn recv_event(socket: &UnixSeqpacket) -> Result<virtio_snd_event> {
616     let mut msg: virtio_snd_event = Default::default();
617     let size = socket
618         .recv(msg.as_mut_slice())
619         .map_err(Error::ServerIOError)?;
620     if size != std::mem::size_of::<virtio_snd_event>() {
621         return Err(Error::ProtocolError(
622             ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<virtio_snd_event>(), size),
623         ));
624     }
625     Ok(msg)
626 }
627 
spawn_recv_thread( tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>, rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>, event_notifier: Event, event_queue: Arc<Mutex<VecDeque<virtio_snd_event>>>, event: Event, state: Arc<Mutex<ThreadFlags>>, tx_socket: UnixSeqpacket, rx_socket: UnixSeqpacket, event_socket: UnixSeqpacket, ) -> JoinHandle<Result<()>>628 fn spawn_recv_thread(
629     tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
630     rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,
631     event_notifier: Event,
632     event_queue: Arc<Mutex<VecDeque<virtio_snd_event>>>,
633     event: Event,
634     state: Arc<Mutex<ThreadFlags>>,
635     tx_socket: UnixSeqpacket,
636     rx_socket: UnixSeqpacket,
637     event_socket: UnixSeqpacket,
638 ) -> JoinHandle<Result<()>> {
639     std::thread::spawn(move || {
640         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
641             (&tx_socket, Token::TxBufferMsg),
642             (&rx_socket, Token::RxBufferMsg),
643             (&event_socket, Token::EventMsg),
644             (&event, Token::Notification),
645         ])
646         .map_err(Error::WaitContextCreateError)?;
647         loop {
648             let state_cpy = *state.lock();
649             if !state_cpy.running {
650                 break;
651             }
652             let events = wait_ctx.wait().map_err(Error::WaitError)?;
653             for evt in events {
654                 match evt.token {
655                     Token::TxBufferMsg => recv_buffer_status_msg(&tx_socket, &tx_subscribers)?,
656                     Token::RxBufferMsg => recv_buffer_status_msg(&rx_socket, &rx_subscribers)?,
657                     Token::EventMsg => {
658                         let evt = recv_event(&event_socket)?;
659                         let state_cpy = *state.lock();
660                         if state_cpy.reporting_events {
661                             event_queue.lock().push_back(evt);
662                             event_notifier.write(1).map_err(Error::EventWriteError)?;
663                         } // else just drop the events
664                     }
665                     Token::Notification => {
666                         // Just consume the notification and check for termination on the next
667                         // iteration
668                         if let Err(e) = event.read() {
669                             error!("Failed to consume notification from recv thread: {:?}", e);
670                         }
671                     }
672                 }
673             }
674         }
675         Ok(())
676     })
677 }
678 
await_status(promise: Receiver<BufferReleaseMsg>) -> Result<(usize, u32)>679 fn await_status(promise: Receiver<BufferReleaseMsg>) -> Result<(usize, u32)> {
680     let BufferReleaseMsg {
681         status,
682         latency,
683         consumed_len,
684     } = promise.recv().map_err(Error::BufferStatusSenderLost)?;
685     if status == VIRTIO_SND_S_OK {
686         Ok((consumed_len, latency))
687     } else {
688         Err(Error::IOBufferError(status))
689     }
690 }
691 
692 struct IoBufferQueue {
693     socket: UnixSeqpacket,
694     file: File,
695     mmap: MemoryMapping,
696     size: usize,
697     next: Mutex<usize>,
698 }
699 
700 impl IoBufferQueue {
new(socket: UnixSeqpacket, mut file: File) -> Result<IoBufferQueue>701     fn new(socket: UnixSeqpacket, mut file: File) -> Result<IoBufferQueue> {
702         let size = file.seek(SeekFrom::End(0)).map_err(Error::FileSizeError)? as usize;
703 
704         let mmap = MemoryMappingBuilder::new(size)
705             .from_file(&file)
706             .build()
707             .map_err(Error::ServerMmapError)?;
708 
709         Ok(IoBufferQueue {
710             socket,
711             file,
712             mmap,
713             size,
714             next: Mutex::new(0),
715         })
716     }
717 
allocate_buffer(&self, size: usize) -> Result<usize>718     fn allocate_buffer(&self, size: usize) -> Result<usize> {
719         if size > self.size {
720             return Err(Error::OutOfSpace);
721         }
722         let mut next_lock = self.next.lock();
723         let offset = if size > self.size - *next_lock {
724             // Can't fit the new buffer at the end of the area, so put it at the beginning
725             0
726         } else {
727             *next_lock
728         };
729         *next_lock = offset + size;
730         Ok(offset)
731     }
732 
buffer_at(&self, offset: usize, len: usize) -> Result<VolatileSlice>733     fn buffer_at(&self, offset: usize, len: usize) -> Result<VolatileSlice> {
734         self.mmap
735             .get_slice(offset, len)
736             .map_err(Error::VolatileMemoryError)
737     }
738 
try_clone_socket(&self) -> Result<UnixSeqpacket>739     fn try_clone_socket(&self) -> Result<UnixSeqpacket> {
740         self.socket
741             .try_clone()
742             .map_err(Error::UnixSeqpacketDupError)
743     }
744 
send_buffer(&self, stream_id: u32, offset: usize, size: usize) -> Result<()>745     fn send_buffer(&self, stream_id: u32, offset: usize, size: usize) -> Result<()> {
746         let msg = IoTransferMsg::new(stream_id, offset, size);
747         seq_socket_send(&self.socket, msg)
748     }
749 
keep_fds(&self) -> Vec<RawFd>750     fn keep_fds(&self) -> Vec<RawFd> {
751         vec![self.file.as_raw_fd(), self.socket.as_raw_fd()]
752     }
753 }
754 
755 /// Groups the parameters used to configure a stream prior to using it.
756 pub struct VioSStreamParams {
757     pub buffer_bytes: u32,
758     pub period_bytes: u32,
759     pub features: u32,
760     pub channels: u8,
761     pub format: u8,
762     pub rate: u8,
763 }
764 
765 impl From<(u32, VioSStreamParams)> for virtio_snd_pcm_set_params {
from(val: (u32, VioSStreamParams)) -> Self766     fn from(val: (u32, VioSStreamParams)) -> Self {
767         virtio_snd_pcm_set_params {
768             hdr: virtio_snd_pcm_hdr {
769                 hdr: virtio_snd_hdr {
770                     code: VIRTIO_SND_R_PCM_SET_PARAMS.into(),
771                 },
772                 stream_id: val.0.into(),
773             },
774             buffer_bytes: val.1.buffer_bytes.into(),
775             period_bytes: val.1.period_bytes.into(),
776             features: val.1.features.into(),
777             channels: val.1.channels,
778             format: val.1.format,
779             rate: val.1.rate,
780             padding: 0u8,
781         }
782     }
783 }
784 
send_cmd<T: DataInit>(control_socket: &UnixSeqpacket, data: T) -> Result<()>785 fn send_cmd<T: DataInit>(control_socket: &UnixSeqpacket, data: T) -> Result<()> {
786     seq_socket_send(control_socket, data)?;
787     recv_cmd_status(control_socket)
788 }
789 
recv_cmd_status(control_socket: &UnixSeqpacket) -> Result<()>790 fn recv_cmd_status(control_socket: &UnixSeqpacket) -> Result<()> {
791     let mut status: virtio_snd_hdr = Default::default();
792     control_socket
793         .recv(status.as_mut_slice())
794         .map_err(Error::ServerIOError)?;
795     if status.code.to_native() == VIRTIO_SND_S_OK {
796         Ok(())
797     } else {
798         Err(Error::CommandFailed(status.code.to_native()))
799     }
800 }
801 
seq_socket_send<T: DataInit>(socket: &UnixSeqpacket, data: T) -> Result<()>802 fn seq_socket_send<T: DataInit>(socket: &UnixSeqpacket, data: T) -> Result<()> {
803     loop {
804         let send_res = socket.send(data.as_slice());
805         if let Err(e) = send_res {
806             match e.kind() {
807                 // Retry if interrupted
808                 IOErrorKind::Interrupted => continue,
809                 _ => return Err(Error::ServerIOError(e)),
810             }
811         }
812         // Success
813         break;
814     }
815     Ok(())
816 }
817 
818 const VIOS_VERSION: u32 = 2;
819 
820 #[repr(C)]
821 #[derive(Copy, Clone, Default)]
822 struct VioSConfig {
823     version: u32,
824     jacks: u32,
825     streams: u32,
826     chmaps: u32,
827 }
828 // Safe because it only has data and has no implicit padding.
829 unsafe impl DataInit for VioSConfig {}
830 
831 struct BufferReleaseMsg {
832     status: u32,
833     latency: u32,
834     consumed_len: usize,
835 }
836 
837 #[repr(C)]
838 #[derive(Copy, Clone)]
839 struct IoTransferMsg {
840     io_xfer: virtio_snd_pcm_xfer,
841     buffer_offset: u32,
842     buffer_len: u32,
843 }
844 // Safe because it only has data and has no implicit padding.
845 unsafe impl DataInit for IoTransferMsg {}
846 
847 impl IoTransferMsg {
new(stream_id: u32, buffer_offset: usize, buffer_len: usize) -> IoTransferMsg848     fn new(stream_id: u32, buffer_offset: usize, buffer_len: usize) -> IoTransferMsg {
849         IoTransferMsg {
850             io_xfer: virtio_snd_pcm_xfer {
851                 stream_id: stream_id.into(),
852             },
853             buffer_offset: buffer_offset as u32,
854             buffer_len: buffer_len as u32,
855         }
856     }
857 }
858 
859 #[repr(C)]
860 #[derive(Copy, Clone, Default)]
861 struct IoStatusMsg {
862     status: virtio_snd_pcm_status,
863     buffer_offset: u32,
864     consumed_len: u32,
865 }
866 // Safe because it only has data and has no implicit padding.
867 unsafe impl DataInit for IoStatusMsg {}
868