// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! This module implements the "Vhost User" Virtio device as specified here -
//! <https://stefanha.github.io/virtio/vhost-user-slave.html#x1-2830007>. The
//! device implements the Virtio-Vhost-User protocol. It acts as a proxy between
//! the Vhost-user Master (referred to as the `Vhost-user sibling` in this
//! module) running in a sibling VM's VMM and the Virtio-Vhost-User Slave
//! implementation (referred to as the `device backend` in this module) in the
//! device VM.

use std::fs::File;
use std::io::Write;
use std::os::unix::net::UnixListener;
use std::thread;

use base::{
    error, AsRawDescriptor, Event, EventType, FromRawDescriptor, IntoRawDescriptor, PollToken,
    RawDescriptor, SafeDescriptor, Tube, TubeError, WaitContext,
};
use data_model::{DataInit, Le32};
use libc::{recv, MSG_DONTWAIT, MSG_PEEK};
use resources::Alloc;
use uuid::Uuid;
use vm_control::{VmMemoryDestination, VmMemoryRequest, VmMemoryResponse, VmMemorySource};
use vm_memory::GuestMemory;
use vmm_vhost::{
    connection::socket::Endpoint as SocketEndpoint,
    connection::EndpointExt,
    message::{
        MasterReq, VhostUserMemory, VhostUserMemoryRegion, VhostUserMsgHeader,
        VhostUserMsgValidator, VhostUserU64,
    },
    Protocol, SlaveReqHelper,
};

use crate::virtio::descriptor_utils::Error as DescriptorUtilsError;
use crate::virtio::{
    copy_config, DescriptorChain, Interrupt, PciCapabilityType, Queue, Reader, SignalableInterrupt,
    VirtioDevice, VirtioPciCap, Writer, TYPE_VHOST_USER,
};
use crate::PciAddress;
use crate::{
    pci::{
        PciBarConfiguration, PciBarIndex, PciBarPrefetchable, PciBarRegionType, PciCapability,
        PciCapabilityID,
    },
    virtio::{VIRTIO_F_ACCESS_PLATFORM, VIRTIO_MSI_NO_VECTOR},
};

use remain::sorted;
use thiserror::Error as ThisError;
use vmm_vhost::Error as VhostError;

// Note: there are two sets of queues that will be mentioned here. The 1st set
// is for this Virtio PCI device itself. The 2nd set is for the actual device
// backends which are set up via Virtio Vhost User (a protocol whose messages
// are forwarded by this device) such as block, net, etc.
//
// The queue configuration for any device backends this proxy may support.
const MAX_VHOST_DEVICE_QUEUES: usize = 16;

// Proxy device i.e. this device's configuration.
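// The two proxy device queues are used as an rx queue (index 0, carrying Vhost-user
// sibling messages to the device backend) and a tx queue (index 1, carrying device
// backend replies back to the sibling).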
const NUM_PROXY_DEVICE_QUEUES: usize = 2;
const PROXY_DEVICE_QUEUE_SIZE: u16 = 256;
const PROXY_DEVICE_QUEUE_SIZES: &[u16] = &[PROXY_DEVICE_QUEUE_SIZE; NUM_PROXY_DEVICE_QUEUES];
const CONFIG_UUID_SIZE: usize = 16;
// Defined in the specification here -
// https://stefanha.github.io/virtio/vhost-user-slave.html#x1-2870004.
const VIRTIO_VHOST_USER_STATUS_SLAVE_UP: u8 = 0;

const BAR_INDEX: u8 = 2;

// Bar configuration.
// All offsets are from the start of bar `BAR_INDEX`.
const DOORBELL_OFFSET: u64 = 0;
// TODO(abhishekbh): Copied from lspci in qemu with VVU support.
const DOORBELL_SIZE: u64 = 0x2000;
const NOTIFICATIONS_OFFSET: u64 = DOORBELL_OFFSET + DOORBELL_SIZE;
const NOTIFICATIONS_SIZE: u64 = 0x1000;
const SHARED_MEMORY_OFFSET: u64 = NOTIFICATIONS_OFFSET + NOTIFICATIONS_SIZE;
// TODO(abhishekbh): Copied from qemu with VVU support. This should be the same
// as `VirtioVhostUser.device_bar_size`, but it's significantly lower than the
// memory allocated to a sibling VM. Figure out how these two are related.
const SHARED_MEMORY_SIZE: u64 = 0x1000;
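// Resulting bar layout: doorbell at [0x0, 0x2000), notifications at
// [0x2000, 0x3000), shared memory starting at 0x3000.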

// Notifications region related constants.
const NOTIFICATIONS_VRING_SELECT_OFFSET: u64 = 0;
const NOTIFICATIONS_MSIX_VECTOR_SELECT_OFFSET: u64 = 2;

// Capabilities related configuration.
//
// Values written to the doorbell are 32 bits wide, i.e. a write to offsets 0
// to 3 represents a Vring 0 related event and a write to offsets 4 to 7
// represents a Vring 1 related event.
const DOORBELL_OFFSET_MULTIPLIER: u32 = 4;
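// For example, a doorbell write at offset 8 (2 * DOORBELL_OFFSET_MULTIPLIER) signals an
// event for vring 2.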

// Vhost-user sibling message types that require extra processing by this proxy. All
// other messages are passed through to the device backend.
const SIBLING_ACTION_MESSAGE_TYPES: &[MasterReq] = &[
    MasterReq::SET_MEM_TABLE,
    MasterReq::SET_LOG_BASE,
    MasterReq::SET_LOG_FD,
    MasterReq::SET_VRING_KICK,
    MasterReq::SET_VRING_CALL,
    MasterReq::SET_VRING_ERR,
    MasterReq::SET_SLAVE_REQ_FD,
    MasterReq::SET_INFLIGHT_FD,
];
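// Each of the message types above carries a file descriptor (see `check_attached_files`),
// which cannot be forwarded through the virtqueue as-is; the proxy has to act on it first
// (e.g. map sibling memory or register kick/call events) and then forward the message.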

// TODO(abhishekbh): Migrate to anyhow::Error.
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
    /// Failed to accept connection on a socket.
    #[error("failed to accept connection on a socket: {0}")]
    AcceptConnection(std::io::Error),
    /// Bar not allocated.
    #[error("bar not allocated: {0}")]
    BarNotAllocated(PciBarIndex),
    /// Call event not set for a vring.
    #[error("call event not set for {0}-th vring")]
    CallEventNotSet(usize),
    /// Failed to create a listener.
    #[error("failed to create a listener: {0}")]
    CreateListener(std::io::Error),
    /// Failed to create a wait context object.
    #[error("failed to create a wait context object: {0}")]
    CreateWaitContext(base::Error),
    /// Failed to create a Writer object.
    #[error("failed to create a Writer")]
    CreateWriter,
    /// Failed to send ACK in response to a Vhost-user sibling message.
    #[error("failed to send ack: {0}")]
    FailedAck(VhostError),
    /// Failed to accept sibling connection.
    #[error("failed to accept sibling connection: {0}")]
    FailedToAcceptSiblingConnection(std::io::Error),
    /// Failed to read kick event for a vring.
    #[error("failed to read kick event for {1}-th vring: {0}")]
    FailedToReadKickEvt(base::Error, usize),
    /// Failed to receive doorbell data.
    #[error("failed to receive doorbell data: {0}")]
    FailedToReceiveDoorbellData(TubeError),
    /// Failed to write call event.
    #[error("failed to write call event for {1}-th vring: {0}")]
    FailedToWriteCallEvent(base::Error, usize),
    /// Invalid PCI bar index.
    #[error("invalid bar index: {0}")]
    InvalidBar(PciBarIndex),
    /// Invalid Vhost-user sibling message.
    #[error("invalid Vhost-user sibling message")]
    InvalidSiblingMessage,
    /// Kick data not set for a vring.
    #[error("kick data not set for {0}-th vring")]
    KickDataNotSet(usize),
    /// Failed to send a memory mapping request.
    #[error("memory mapping request failed")]
    MemoryMappingRequestFailure,
    /// MSI vector not set for a vring.
    #[error("MSI vector not set for {0}-th vring")]
    MsiVectorNotSet(usize),
    /// Failed to parse vring kick / call file descriptors.
    #[error("failed to parse vring kick / call file descriptors: {0}")]
    ParseVringFdRequest(VhostError),
    /// Failed to read the header of a Vhost-user sibling message.
    #[error("failed to read Vhost-user sibling message header: {0}")]
    ReadSiblingHeader(VhostError),
    /// Failed to read the payload of a Vhost-user sibling message.
    #[error("failed to read Vhost-user sibling message payload: {0}")]
    ReadSiblingPayload(VhostError),
    /// Rx buffer too small to accommodate data.
    #[error("rx buffer too small")]
    RxBufferTooSmall,
    /// There are no more available descriptors to receive into.
    #[error("no rx descriptors available")]
    RxDescriptorsExhausted,
    /// Sibling is disconnected.
    #[error("sibling disconnected")]
    SiblingDisconnected,
    /// Failed to receive a memory mapping response from the main process.
    #[error("receiving mapping request from tube failed: {0}")]
    TubeReceiveFailure(TubeError),
    /// Failed to send a memory mapping request to the main process.
    #[error("sending mapping request to tube failed: {0}")]
    TubeSendFailure(TubeError),
    /// Adding |SET_VRING_KICK| related epoll event failed.
    #[error("failed to add kick event to the epoll set: {0}")]
    WaitContextAddKickEvent(base::Error),
    /// Removing read event from the sibling VM socket events failed.
    #[error("failed to disable EPOLLIN on sibling VM socket fd: {0}")]
    WaitContextDisableSiblingVmSocket(base::Error),
    /// Adding read event to the sibling VM socket events failed.
    #[error("failed to enable EPOLLIN on sibling VM socket fd: {0}")]
    WaitContextEnableSiblingVmSocket(base::Error),
    /// Failed to wait for events.
    #[error("failed to wait for events: {0}")]
    WaitError(base::Error),
    /// Writing to a buffer in the guest failed.
    #[error("failed to write to guest buffer: {0}")]
    WriteBuffer(std::io::Error),
    /// Failed to create a Writer.
    #[error("failed to create a Writer: {0}")]
    WriterCreation(DescriptorUtilsError),
}

pub type Result<T> = std::result::Result<T, Error>;

// Device configuration as per section 5.7.4.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
struct VirtioVhostUserConfig {
    status: Le32,
    max_vhost_queues: Le32,
    uuid: [u8; CONFIG_UUID_SIZE],
}

// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for VirtioVhostUserConfig {}

impl Default for VirtioVhostUserConfig {
    fn default() -> Self {
        VirtioVhostUserConfig {
            status: Le32::from(0),
            max_vhost_queues: Le32::from(MAX_VHOST_DEVICE_QUEUES as u32),
            uuid: [0; CONFIG_UUID_SIZE],
        }
    }
}

impl VirtioVhostUserConfig {
    fn is_slave_up(&self) -> bool {
        self.check_status_bit(VIRTIO_VHOST_USER_STATUS_SLAVE_UP)
    }

    fn check_status_bit(&self, bit: u8) -> bool {
        let status = self.status.to_native();
        status & (1 << bit) > 0
    }
}

// Checks if the message requires any extra processing by this proxy.
fn is_action_request(hdr: &VhostUserMsgHeader<MasterReq>) -> bool {
    SIBLING_ACTION_MESSAGE_TYPES
        .iter()
        .any(|&h| h == hdr.get_code())
}

// Checks that |files| are attached to a Vhost-user sibling message only for the
// message types that expect them.
fn check_attached_files(
    hdr: &VhostUserMsgHeader<MasterReq>,
    files: &Option<Vec<File>>,
) -> Result<()> {
    match hdr.get_code() {
        MasterReq::SET_MEM_TABLE
        | MasterReq::SET_VRING_CALL
        | MasterReq::SET_VRING_KICK
        | MasterReq::SET_VRING_ERR
        | MasterReq::SET_LOG_BASE
        | MasterReq::SET_LOG_FD
        | MasterReq::SET_SLAVE_REQ_FD
        | MasterReq::SET_INFLIGHT_FD
        | MasterReq::ADD_MEM_REG => {
            // These messages are always associated with an fd.
            if files.is_some() {
                Ok(())
            } else {
                Err(Error::InvalidSiblingMessage)
            }
        }
        _ if files.is_some() => Err(Error::InvalidSiblingMessage),
        _ => Ok(()),
    }
}

// Checks if `hdr` is valid.
fn is_header_valid(hdr: &VhostUserMsgHeader<MasterReq>) -> bool {
    if hdr.is_reply() || hdr.get_version() != 0x1 {
        return false;
    }
    true
}

// Payload sent by the sibling in a |SET_VRING_KICK| message.
#[derive(Default)]
struct KickData {
    // Fd sent by the sibling. This is monitored, and when it is written to, an interrupt
    // is injected into the guest.
    kick_evt: Option<Event>,

    // The interrupt to be injected into the guest in response to an event on |kick_evt|.
    msi_vector: Option<u16>,
}

// Vring related data sent through |SET_VRING_KICK| and |SET_VRING_CALL|.
#[derive(Default)]
struct Vring {
    kick_data: KickData,
    call_evt: Option<Event>,
}

// Processes messages from the Vhost-user sibling and sends them to the device backend and
// vice-versa.
struct Worker {
    mem: GuestMemory,
    interrupt: Interrupt,
    rx_queue: Queue,
    tx_queue: Queue,

    // To communicate with the main process.
    main_process_tube: Tube,

    // The bar representing the doorbell, notification and shared memory regions.
    pci_bar: Alloc,

    // Offset at which to allocate the next shared memory region, corresponding
    // to the |SET_MEM_TABLE| sibling message.
    mem_offset: usize,

    // Vring related data sent through |SET_VRING_KICK| and |SET_VRING_CALL|
    // messages.
    vrings: [Vring; MAX_VHOST_DEVICE_QUEUES],

    // Helps with communication and parsing messages from the sibling.
    slave_req_helper: SlaveReqHelper<SocketEndpoint<MasterReq>>,
}

#[derive(PollToken, Debug, Clone)]
enum Token {
    // Data is available on the Vhost-user sibling socket.
    SiblingSocket,
    // The device backend has made a read buffer available.
    RxQueue,
    // The device backend has sent a buffer to the |Worker::tx_queue|.
    TxQueue,
    // The sibling has written to the kick event for the |index|-th vring.
    SiblingKick { index: usize },
    // crosvm has requested the device to shut down.
    Kill,
    // Message from the main thread.
    MainThread,
}

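// Data flow in the worker: bytes arriving on the sibling socket are parsed and copied into
// rx queue descriptors for the device backend; buffers placed on the tx queue by the device
// backend are written back to the sibling socket; sibling kick events are turned into MSI-X
// interrupts for the guest; doorbell writes relayed by the main thread are turned into
// writes on the sibling's call events.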
impl Worker {
    // The entry point into `Worker`.
    // - At this point the connection with the sibling is already established.
    // - Process messages from the device over Virtio, from the sibling over a unix domain socket,
    //   from the main thread in this device over a tube and from the main crosvm process over a
    //   tube.
    fn run(
        &mut self,
        rx_queue_evt: Event,
        tx_queue_evt: Event,
        main_thread_tube: Tube,
        kill_evt: Event,
    ) -> Result<()> {
        // TODO(abhishekbh): Should interrupt.signal_config_changed be called here?
        let mut wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
            (&self.slave_req_helper, Token::SiblingSocket),
            (&rx_queue_evt, Token::RxQueue),
            (&tx_queue_evt, Token::TxQueue),
            (&main_thread_tube, Token::MainThread),
            (&kill_evt, Token::Kill),
        ])
        .map_err(Error::CreateWaitContext)?;

        // Represents whether |slave_req_helper.endpoint| is being monitored for data
        // from the Vhost-user sibling.
        let mut sibling_socket_polling_enabled = true;
        'wait: loop {
            let events = wait_ctx.wait().map_err(Error::WaitError)?;
            for event in events.iter().filter(|e| e.is_readable) {
                match event.token {
                    Token::SiblingSocket => {
                        match self.process_rx(&mut wait_ctx) {
                            Ok(()) => {}
                            Err(Error::RxDescriptorsExhausted) => {
                                // If the driver has no Rx buffers left, then there's no
                                // point in monitoring the Vhost-user sibling for data. There
                                // would be no way to send it to the device backend.
                                wait_ctx
                                    .modify(
                                        &self.slave_req_helper,
                                        EventType::None,
                                        Token::SiblingSocket,
                                    )
                                    .map_err(Error::WaitContextDisableSiblingVmSocket)?;
                                sibling_socket_polling_enabled = false;
                            }
                            // TODO(b/216407443): Handle sibling disconnection. The sibling sends
                            // 0-length data; the proxy device forwards it to the guest so that the
                            // VVU backend can get notified that the connection is closed.
                            Err(e) => return Err(e),
                        }
                    }
                    Token::RxQueue => {
                        if let Err(e) = rx_queue_evt.read() {
                            error!("error reading rx queue Event: {}", e);
                            break 'wait;
                        }

                        // Rx buffers are available, now we should monitor the
                        // Vhost-user sibling connection for data.
                        if !sibling_socket_polling_enabled {
                            wait_ctx
                                .modify(
                                    &self.slave_req_helper,
                                    EventType::Read,
                                    Token::SiblingSocket,
                                )
                                .map_err(Error::WaitContextEnableSiblingVmSocket)?;
                            sibling_socket_polling_enabled = true;
                        }
                    }
                    Token::TxQueue => {
                        if let Err(e) = tx_queue_evt.read() {
                            error!("error reading tx queue event: {}", e);
                            break 'wait;
                        }
                        self.process_tx();
                    }
                    Token::SiblingKick { index } => {
                        if let Err(e) = self.process_sibling_kick(index) {
                            error!(
                                "error processing sibling kick for {}-th vring: {}",
                                index, e
                            );
                            break 'wait;
                        }
                    }
                    Token::MainThread => {
                        if let Err(e) = self.process_doorbell_message(&main_thread_tube) {
                            error!("error processing doorbell message: {}", e);
                            break 'wait;
                        }
                    }
                    Token::Kill => {
                        let _ = kill_evt.read();
                        break 'wait;
                    }
                }
            }
        }
        Ok(())
    }

    // Processes data from the Vhost-user sibling and forwards it to the driver via Rx buffers.
    fn process_rx(&mut self, wait_ctx: &mut WaitContext<Token>) -> Result<()> {
        // Keep looping until -
        // - No more Rx buffers are available on the Rx queue. OR
        // - No more data is available on the Vhost-user sibling socket (checked via a
        //   peek).
        //
        // If an Rx buffer is available and data is present on the Vhost
        // master socket then -
        // - Parse the Vhost-user sibling message. If it's not an action type message
        //   then copy the message as is to the Rx buffer and forward it to the
        //   device backend.
        //
        // Peek if any data is left on the Vhost-user sibling socket. If not, there is
        // nothing to forward to the device backend.
        while self.is_sibling_data_available()? {
            let desc = self
                .rx_queue
                .peek(&self.mem)
                .ok_or(Error::RxDescriptorsExhausted)?;
            // To successfully receive attached file descriptors, we need to
            // receive messages and corresponding attached file descriptors in
            // this way:
            // - receive the message header and optional attached files.
            // - receive the optional message body and payload according to the size field
            //   in the message header.
            // - forward it to the device backend.
            let (hdr, files) = self
                .slave_req_helper
                .as_mut()
                .recv_header()
                .map_err(Error::ReadSiblingHeader)?;
            check_attached_files(&hdr, &files)?;
            let buf = self.get_sibling_msg_data(&hdr)?;

            let index = desc.index;
            let bytes_written = {
                if is_action_request(&hdr) {
                    // TODO(abhishekbh): Implement action messages.
                    let res = match hdr.get_code() {
                        MasterReq::SET_MEM_TABLE => {
                            // Map the sibling memory in this process and forward the
                            // sibling memory info to the slave. The info is sent along
                            // to the slave only if the mapping succeeds, else a failed
                            // Ack is sent back to the master.
                            self.set_mem_table(&hdr, &buf, files)
                        }
                        MasterReq::SET_VRING_CALL => self.set_vring_call(&hdr, &buf, files),
                        MasterReq::SET_VRING_KICK => {
                            self.set_vring_kick(wait_ctx, &hdr, &buf, files)
                        }
                        _ => {
                            unimplemented!("unimplemented action message:{:?}", hdr.get_code());
                        }
                    };

                    // If the "action" in response to the action message
                    // failed then no bytes have been written to the virt
                    // queue. Else, the action is done. Now forward the
                    // message to the virt queue and return how many bytes
                    // were written.
                    match res {
                        Ok(()) => self.forward_msg_to_device(desc, &hdr, &buf),
                        Err(e) => Err(e),
                    }
                } else {
                    // If no special processing is required, forward this message as is
                    // to the device backend.
                    self.forward_msg_to_device(desc, &hdr, &buf)
                }
            };

            // If some bytes were written to the virt queue, now it's time
            // to add a used buffer and notify the guest. Else if there was
            // an error of any sort, we notify the sibling by sending an ACK
            // with failure.
            match bytes_written {
                Ok(bytes_written) => {
                    // The driver is able to deal with a descriptor with 0 bytes written.
                    self.rx_queue.pop_peeked(&self.mem);
                    self.rx_queue.add_used(&self.mem, index, bytes_written);
                    if !self.rx_queue.trigger_interrupt(&self.mem, &self.interrupt) {
                        // This interrupt should always be injected. We'd rather fail
                        // fast if there is an error.
                        panic!("failed to send interrupt");
                    }
                }
                Err(e) => {
                    error!("failed to forward message to the device: {}", e);
                    self.slave_req_helper
                        .send_ack_message(&hdr, false)
                        .map_err(Error::FailedAck)?;
                }
            }
        }

        Ok(())
    }

    // Checks whether data is available on the Vhost-user sibling socket.
    // Returns an error if the sibling has disconnected.
    fn is_sibling_data_available(&self) -> Result<bool> {
        // Peek if any data is left on the Vhost-user sibling socket. If not, there is
        // nothing to forward to the device backend.
        let mut peek_buf = [0; 1];
        let raw_fd = self.slave_req_helper.as_raw_descriptor();
        // Safe because `raw_fd` and `peek_buf` are owned by this struct.
        let peek_ret = unsafe {
            recv(
                raw_fd,
                peek_buf.as_mut_ptr() as *mut libc::c_void,
                peek_buf.len(),
                MSG_PEEK | MSG_DONTWAIT,
            )
        };

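        // recv() returns 0 when the peer has closed the connection, -1 with errno EAGAIN
        // when no data is queued (because of MSG_DONTWAIT), and a positive count when data
        // is available. MSG_PEEK leaves any data in the socket buffer for the subsequent
        // header read.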
        match peek_ret {
            0 => Err(Error::SiblingDisconnected),
            ret if ret < 0 => match base::Error::last() {
                // EAGAIN means that no data is available. Any other error means that the sibling
                // has disconnected.
                e if e.errno() == libc::EAGAIN => Ok(false),
                _ => Err(Error::SiblingDisconnected),
            },
            _ => Ok(true),
        }
    }

    // Returns any data attached to a Vhost-user sibling message.
    fn get_sibling_msg_data(&mut self, hdr: &VhostUserMsgHeader<MasterReq>) -> Result<Vec<u8>> {
        let buf = match hdr.get_size() {
            0 => vec![0u8; 0],
            len => {
                let rbuf = self
                    .slave_req_helper
                    .as_mut()
                    .recv_data(len as usize)
                    .map_err(Error::ReadSiblingPayload)?;
                if rbuf.len() != len as usize {
                    self.slave_req_helper
                        .send_ack_message(hdr, false)
                        .map_err(Error::FailedAck)?;
                    return Err(Error::InvalidSiblingMessage);
                }
                rbuf
            }
        };
        Ok(buf)
    }

    // Forwards |hdr, buf| to the device backend via |desc_chain| in the virtio
    // queue. Returns the number of bytes written to the virt queue.
    fn forward_msg_to_device(
        &mut self,
        desc_chain: DescriptorChain,
        hdr: &VhostUserMsgHeader<MasterReq>,
        buf: &[u8],
    ) -> Result<u32> {
        let bytes_written = match Writer::new(self.mem.clone(), desc_chain) {
            Ok(mut writer) => {
                if writer.available_bytes()
                    < buf.len() + std::mem::size_of::<VhostUserMsgHeader<MasterReq>>()
                {
                    error!("rx buffer too small to accommodate server data");
                    return Err(Error::RxBufferTooSmall);
                }
                // Write the header first, then any data. Do these separately to prevent
                // any reorders.
                let mut written = writer.write(hdr.as_slice()).map_err(Error::WriteBuffer)?;
                written += writer.write(buf).map_err(Error::WriteBuffer)?;
                written as u32
            }
            Err(e) => {
                error!("failed to create Writer: {}", e);
                return Err(Error::CreateWriter);
            }
        };
        Ok(bytes_written)
    }

    // Handles the `SET_MEM_TABLE` message from the sibling. Parses `payload` into
    // memory region information. For each memory region sent by the Vhost
    // Master, it mmaps a region of memory in the main process. At the end of
    // this function both this VMM and the sibling have two regions of
    // virtual memory pointing to the same physical page. These regions will be
    // accessed by the device VM and the sibling VM.
    fn set_mem_table(
        &mut self,
        hdr: &VhostUserMsgHeader<MasterReq>,
        payload: &[u8],
        files: Option<Vec<File>>,
    ) -> Result<()> {
        if !is_header_valid(hdr) {
            return Err(Error::InvalidSiblingMessage);
        }

        // `hdr` is followed by a `payload`. `payload` consists of metadata about the number of
        // memory regions and then the memory regions themselves. The memory region structs
        // consist of metadata about the actual device related memory passed from the sibling.
        // Ensure that the size of the payload is consistent with this structure.
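        // Expected layout of `payload`:
        //   [ VhostUserMemory header ][ num_regions * VhostUserMemoryRegion ]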
        let payload_size = payload.len();
        if payload_size < std::mem::size_of::<VhostUserMemory>() {
            error!("payload size {} is less than the minimum required", payload_size);
            return Err(Error::InvalidSiblingMessage);
        }
        let (msg_slice, regions_slice) = payload.split_at(std::mem::size_of::<VhostUserMemory>());
        let msg = VhostUserMemory::from_slice(msg_slice).ok_or(Error::InvalidSiblingMessage)?;
        if !msg.is_valid() {
            return Err(Error::InvalidSiblingMessage);
        }

        let memory_region_metadata_size = std::mem::size_of::<VhostUserMemory>();
        if payload_size
            != memory_region_metadata_size
                + msg.num_regions as usize * std::mem::size_of::<VhostUserMemoryRegion>()
        {
            return Err(Error::InvalidSiblingMessage);
        }

        let regions: Vec<&VhostUserMemoryRegion> = regions_slice
            .chunks(std::mem::size_of::<VhostUserMemoryRegion>())
            .map(VhostUserMemoryRegion::from_slice)
            .collect::<Option<_>>()
            .ok_or_else(|| {
                error!("failed to construct VhostUserMemoryRegion array");
                Error::InvalidSiblingMessage
            })?;

        if !regions.iter().all(|r| r.is_valid()) {
            return Err(Error::InvalidSiblingMessage);
        }

        let files = files.ok_or(Error::InvalidSiblingMessage)?;
        if files.len() != msg.num_regions as usize {
            return Err(Error::InvalidSiblingMessage);
        }

        self.create_sibling_guest_memory(&regions, files)?;
        Ok(())
    }

    // Mmaps sibling memory in this device's VMM's main process' address
    // space.
    pub fn create_sibling_guest_memory(
        &mut self,
        contexts: &[&VhostUserMemoryRegion],
        files: Vec<File>,
    ) -> Result<()> {
        if contexts.len() != files.len() {
            return Err(Error::InvalidSiblingMessage);
        }

        for (region, file) in contexts.iter().zip(files.into_iter()) {
            let request = VmMemoryRequest::RegisterMemory {
                source: VmMemorySource::Descriptor {
                    descriptor: SafeDescriptor::from(file),
                    offset: region.mmap_offset,
                    size: region.memory_size,
                },
                dest: VmMemoryDestination::ExistingAllocation {
                    allocation: self.pci_bar,
                    offset: self.mem_offset as u64,
                },
                read_only: false,
            };
            self.process_memory_mapping_request(&request)?;
            self.mem_offset += region.memory_size as usize;
        }
        Ok(())
    }

    // Sends a memory mapping request to the main process and waits for the response.
    // Returns an error if the mapping failed.
    fn process_memory_mapping_request(&mut self, request: &VmMemoryRequest) -> Result<()> {
        self.main_process_tube
            .send(request)
            .map_err(Error::TubeSendFailure)?;

        let response = self
            .main_process_tube
            .recv()
            .map_err(Error::TubeReceiveFailure)?;
        match response {
            VmMemoryResponse::RegisterMemory { .. } => Ok(()),
            VmMemoryResponse::Err(e) => {
                error!("memory mapping failed: {}", e);
                Err(Error::MemoryMappingRequestFailure)
            }
            _ => Err(Error::MemoryMappingRequestFailure),
        }
    }

    // Handles |SET_VRING_CALL|.
    fn set_vring_call(
        &mut self,
        hdr: &VhostUserMsgHeader<MasterReq>,
        payload: &[u8],
        files: Option<Vec<File>>,
    ) -> Result<()> {
        if !is_header_valid(hdr) {
            return Err(Error::InvalidSiblingMessage);
        }

        let payload_size = payload.len();
        if payload_size != std::mem::size_of::<VhostUserU64>() {
            error!("wrong payload size {}", payload_size);
            return Err(Error::InvalidSiblingMessage);
        }

        let (index, file) = self
            .slave_req_helper
            .handle_vring_fd_request(payload, files)
            .map_err(Error::ParseVringFdRequest)?;

        if index as usize >= MAX_VHOST_DEVICE_QUEUES {
            error!("illegal Vring index:{}", index);
            return Err(Error::InvalidSiblingMessage);
        }

        let file = file.ok_or_else(|| {
            error!("no file found for SET_VRING_CALL");
            Error::InvalidSiblingMessage
        })?;

        // Safe because we own the file.
        self.vrings[index as usize].call_evt =
            unsafe { Some(Event::from_raw_descriptor(file.into_raw_descriptor())) };

        Ok(())
    }

    // Handles |SET_VRING_KICK|. If successful it sets up an event handler for a
    // write to the sent kick fd.
    fn set_vring_kick(
        &mut self,
        wait_ctx: &mut WaitContext<Token>,
        hdr: &VhostUserMsgHeader<MasterReq>,
        payload: &[u8],
        files: Option<Vec<File>>,
    ) -> Result<()> {
        if !is_header_valid(hdr) {
            return Err(Error::InvalidSiblingMessage);
        }

        let payload_size = payload.len();
        if payload_size != std::mem::size_of::<VhostUserU64>() {
            error!("wrong payload size {}", payload_size);
            return Err(Error::InvalidSiblingMessage);
        }

        let (index, file) = self
            .slave_req_helper
            .handle_vring_fd_request(payload, files)
            .map_err(Error::ParseVringFdRequest)?;

        if index as usize >= MAX_VHOST_DEVICE_QUEUES {
            error!("illegal Vring index:{}", index);
            return Err(Error::InvalidSiblingMessage);
        }

        let file = file.ok_or_else(|| {
            error!("no file found for SET_VRING_KICK");
            Error::InvalidSiblingMessage
        })?;

        // Safe because we own the file.
        let kick_evt = unsafe { Event::from_raw_descriptor(file.into_raw_descriptor()) };
        let kick_data = &mut self.vrings[index as usize].kick_data;

        wait_ctx
            .add(
                &kick_evt,
                Token::SiblingKick {
                    index: index as usize,
                },
            )
            .map_err(Error::WaitContextAddKickEvent)?;
        kick_data.kick_evt = Some(kick_evt);

        Ok(())
    }

    // Processes data from the device backend (via the virtio Tx queue) and forwards it to
    // the Vhost-user sibling over its socket connection.
    fn process_tx(&mut self) {
        while let Some(desc_chain) = self.tx_queue.pop(&self.mem) {
            let index = desc_chain.index;
            match Reader::new(self.mem.clone(), desc_chain) {
                Ok(mut reader) => {
                    let expected_count = reader.available_bytes();
                    match reader.read_to(self.slave_req_helper.as_mut().as_mut(), expected_count) {
                        Ok(count) => {
                            // The |reader| guarantees that all the available data is read.
                            if count != expected_count {
                                error!("wrote only {} bytes of {}", count, expected_count);
                            }
                        }
                        Err(e) => error!("failed to write message to vhost-vmm: {}", e),
                    }
                }
                Err(e) => error!("failed to create Reader: {}", e),
            }
            self.tx_queue.add_used(&self.mem, index, 0);
            if !self.tx_queue.trigger_interrupt(&self.mem, &self.interrupt) {
                panic!("failed to inject tx queue interrupt");
            }
        }
    }

    // Processes a sibling kick for the |index|-th vring and injects the corresponding interrupt
    // into the guest.
    fn process_sibling_kick(&mut self, index: usize) -> Result<()> {
        // The sibling is indicating a used queue event on
        // vring number |index|. Acknowledge the event and
        // inject the related interrupt into the guest.
        let kick_data = &self.vrings[index as usize].kick_data;
        let kick_evt = kick_data
            .kick_evt
            .as_ref()
            .ok_or(Error::KickDataNotSet(index))?;
        kick_evt
            .read()
            .map_err(|e| Error::FailedToReadKickEvt(e, index))?;
        match kick_data.msi_vector {
            Some(msi_vector) => {
                self.interrupt.signal_used_queue(msi_vector);
                Ok(())
            }
            None => Err(Error::MsiVectorNotSet(index)),
        }
    }

    // Processes a message sent on `main_thread_tube` in response to a doorbell write. It writes
    // to the corresponding call event of the vring index sent over `main_thread_tube`.
    fn process_doorbell_message(&mut self, main_thread_tube: &Tube) -> Result<()> {
        // There's no way to indicate failure on a doorbell write operation, so propagate
        // any error and let the worker exit. We'd rather fail early than have inconsistent
        // state.
        let index: usize = main_thread_tube
            .recv()
            .map_err(Error::FailedToReceiveDoorbellData)?;
        let call_evt = self.vrings[index]
            .call_evt
            .as_ref()
            .ok_or(Error::CallEventNotSet(index))?;
        call_evt
            .write(1)
            .map_err(|e| Error::FailedToWriteCallEvent(e, index))?;
        Ok(())
    }
}

// Doorbell capability of the proxy device.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct VirtioPciDoorbellCap {
    cap: VirtioPciCap,
    doorbell_off_multiplier: Le32,
}
// It is safe to implement DataInit; `VirtioPciCap` implements DataInit and for
// Le32 any value is valid.
unsafe impl DataInit for VirtioPciDoorbellCap {}

impl PciCapability for VirtioPciDoorbellCap {
    fn bytes(&self) -> &[u8] {
        self.as_slice()
    }

    // TODO: What should this be.
    fn id(&self) -> PciCapabilityID {
        PciCapabilityID::VendorSpecific
    }

    // TODO: What should this be.
    fn writable_bits(&self) -> Vec<u32> {
        vec![0u32; 4]
    }
}

impl VirtioPciDoorbellCap {
    pub fn new(cap: VirtioPciCap, doorbell_off_multiplier: u32) -> Self {
        VirtioPciDoorbellCap {
            cap,
            doorbell_off_multiplier: Le32::from(doorbell_off_multiplier),
        }
    }
}

// Used to store parameters passed in the |activate| function.
struct ActivateParams {
    mem: GuestMemory,
    interrupt: Interrupt,
    queues: Vec<Queue>,
    queue_evts: Vec<Event>,
}

pub struct VirtioVhostUser {
    base_features: u64,

    // Represents the amount of memory to be mapped for a sibling VM. Each
    // Virtio Vhost User Slave implementation requires access to the entire sibling
    // memory.
    //
    // TODO(abhishekbh): Understand why shared memory region size and overall bar
    // size differ in the QEMU implementation.
    device_bar_size: u64,

    // Bound socket waiting to accept a socket connection from the Vhost-user
    // sibling.
    listener: Option<UnixListener>,

    // Device configuration.
    config: VirtioVhostUserConfig,

    kill_evt: Option<Event>,
    worker_thread: Option<thread::JoinHandle<Result<Worker>>>,

    // The bar representing the doorbell, notification and shared memory regions.
    pci_bar: Option<Alloc>,
    // The device backend queue index selected by the driver by writing to the
    // Notifications region at offset `NOTIFICATIONS_VRING_SELECT_OFFSET`
    // in the bar. This points into `notification_msix_vectors`.
    notification_select: Option<u16>,
    // Stores msix vectors corresponding to each device backend queue.
    notification_msix_vectors: [Option<u16>; MAX_VHOST_DEVICE_QUEUES],

    // Cache for params stored in |activate|.
    activate_params: Option<ActivateParams>,

    // Whether the Vhost-user sibling is connected.
    sibling_connected: bool,

    // To communicate with the main process.
    main_process_tube: Option<Tube>,

    // To communicate with the worker thread.
    worker_thread_tube: Option<Tube>,

    // PCI address that this device needs to be allocated if specified.
    pci_address: Option<PciAddress>,
}

impl VirtioVhostUser {
    pub fn new(
        base_features: u64,
        listener: UnixListener,
        main_process_tube: Tube,
        pci_address: Option<PciAddress>,
        uuid: Option<Uuid>,
        max_sibling_mem_size: u64,
    ) -> Result<VirtioVhostUser> {
        let device_bar_size = max_sibling_mem_size
            .checked_next_power_of_two()
            .expect("Sibling too large");

        Ok(VirtioVhostUser {
            base_features: base_features | 1 << VIRTIO_F_ACCESS_PLATFORM,
            device_bar_size,
            listener: Some(listener),
            config: VirtioVhostUserConfig {
                status: Le32::from(0),
                max_vhost_queues: Le32::from(MAX_VHOST_DEVICE_QUEUES as u32),
                uuid: *uuid.unwrap_or_default().as_bytes(),
            },
            kill_evt: None,
            worker_thread: None,
            pci_bar: None,
            notification_select: None,
            notification_msix_vectors: [None; MAX_VHOST_DEVICE_QUEUES],
            activate_params: None,
            sibling_connected: false,
            main_process_tube: Some(main_process_tube),
            worker_thread_tube: None,
            pci_address,
        })
    }

    fn check_bar_metadata(&self, bar_index: PciBarIndex) -> Result<()> {
        if bar_index != BAR_INDEX as usize {
            return Err(Error::InvalidBar(bar_index));
        }

        if self.pci_bar.is_none() {
            return Err(Error::BarNotAllocated(bar_index));
        }

        Ok(())
    }

    // Handles writes to the DOORBELL region of the BAR as per the VVU spec.
    fn write_bar_doorbell(&mut self, offset: u64) {
        // The |offset| represents the Vring number whose call event needs to be
        // written to.
        let vring = (offset / DOORBELL_OFFSET_MULTIPLIER as u64) as usize;
        match &self.worker_thread_tube {
            Some(worker_thread_tube) => {
                if let Err(e) = worker_thread_tube.send(&vring) {
                    error!("failed to send doorbell write request: {}", e);
                }
            }
            None => {
                error!("worker thread tube not allocated");
            }
        }
    }

    // Implement writing to the notifications bar as per the VVU spec.
    fn write_bar_notifications(&mut self, offset: u64, data: &[u8]) {
        if data.len() < std::mem::size_of::<u16>() {
            error!("data buffer is too small: {}", data.len());
            return;
        }

        // The driver will first write to |NOTIFICATIONS_VRING_SELECT_OFFSET| to
        // specify which index in `self.notification_msix_vectors` to write to.
        // Then it writes the msix vector value by writing to
        // `NOTIFICATIONS_MSIX_VECTOR_SELECT_OFFSET`.
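        // For example, to route device backend queue 3 to MSI-X vector 5, the driver
        // writes 3 at offset NOTIFICATIONS_VRING_SELECT_OFFSET and then 5 at offset
        // NOTIFICATIONS_MSIX_VECTOR_SELECT_OFFSET.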
        let mut dst = [0u8; 2];
        dst.copy_from_slice(&data[..2]);
        let val = u16::from_le_bytes(dst);
        match offset {
            NOTIFICATIONS_VRING_SELECT_OFFSET => {
                self.notification_select = Some(val);
            }
            NOTIFICATIONS_MSIX_VECTOR_SELECT_OFFSET => {
                if let Some(notification_select) = self.notification_select {
                    if notification_select as usize >= self.notification_msix_vectors.len() {
                        error!("invalid notification select: {}", notification_select);
                        return;
                    }
                    self.notification_msix_vectors[notification_select as usize] =
                        if val == VIRTIO_MSI_NO_VECTOR {
                            None
                        } else {
                            Some(val)
                        };
                } else {
                    error!("no notification select set");
                }
            }
            _ => {
                error!("invalid notification cfg offset: {}", offset);
            }
        }
    }

    // Implement reading from the notifications bar as per the VVU spec.
    fn read_bar_notifications(&mut self, offset: u64, data: &mut [u8]) {
        if data.len() < std::mem::size_of::<u16>() {
            error!("data buffer is too small: {}", data.len());
            return;
        }

        // The driver will first write to |NOTIFICATIONS_VRING_SELECT_OFFSET| to
        // specify which index in |self.notification_msix_vectors| to read from.
        // Then it reads the msix vector value by reading from
        // |NOTIFICATIONS_MSIX_VECTOR_SELECT_OFFSET|.
        // Return 0 if a vector value hasn't been set for the queue.
        let mut val = 0;
        if offset == NOTIFICATIONS_VRING_SELECT_OFFSET {
            val = self.notification_select.unwrap_or(0);
        } else if offset == NOTIFICATIONS_MSIX_VECTOR_SELECT_OFFSET {
            if let Some(notification_select) = self.notification_select {
                val = self.notification_msix_vectors[notification_select as usize].unwrap_or(0);
            } else {
                error!("no notification select set");
            }
        } else {
            error!("invalid notification cfg offset: {}", offset);
        }
        let d = u16::to_le_bytes(val);
        data[..2].copy_from_slice(&d);
    }

    // Initializes state and starts the worker thread which will process all messages to this
    // device and send out messages in response.
    fn start_worker(&mut self) {
        // This function should never be called if this device hasn't been
        // activated.
        if self.activate_params.is_none() {
            panic!("device not activated");
        }

        let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) {
            Ok(v) => v,
            Err(e) => {
                error!("failed creating kill Event pair: {}", e);
                return;
            }
        };
        self.kill_evt = Some(self_kill_evt);

        let listener = self.listener.take().expect("listener should be set");

        let main_process_tube = self.main_process_tube.take().expect("tube missing");

        // Safe because a PCI bar is guaranteed to be allocated at this point.
        let pci_bar = self.pci_bar.expect("PCI bar unallocated");

        // Initialize the Worker with the Msix vector values to be injected for
        // each Vhost device queue.
        let mut vrings: [Vring; MAX_VHOST_DEVICE_QUEUES] = Default::default();
        for (i, vring) in vrings.iter_mut().enumerate() {
            vring.kick_data = KickData {
                kick_evt: None,
                msi_vector: self.notification_msix_vectors[i],
            };
        }

        // Create a tube to communicate with the worker thread.
        let (worker_thread_tube, main_thread_tube) =
            Tube::pair().expect("failed to create tube pair");
        self.worker_thread_tube = Some(worker_thread_tube);

        // TODO(abhishekbh): Should interrupt.signal_config_changed be called?
        self.sibling_connected = true;
        let mut activate_params = self.activate_params.take().unwrap();
        // This thread will wait for the sibling to connect and then continuously parse messages
        // from the sibling as well as the device (over Virtio).
        let worker_result = thread::Builder::new()
            .name("virtio_vhost_user".to_string())
            .spawn(move || {
                let rx_queue = activate_params.queues.remove(0);
                let tx_queue = activate_params.queues.remove(0);

                // Block until the connection with the sibling is established. We do this in a
                // thread to avoid blocking the main thread.
                let (socket, _) = listener
                    .accept()
                    .map_err(Error::FailedToAcceptSiblingConnection)?;

                // Although this device is related to Virtio Vhost User, it uses
                // `slave_req_helper` to parse messages from a Vhost-user sibling.
                // Thus, we need `slave_req_helper` in `Protocol::Regular` mode and not
                // in `Protocol::Virtio` mode.
                let slave_req_helper: SlaveReqHelper<SocketEndpoint<MasterReq>> =
                    SlaveReqHelper::new(SocketEndpoint::from(socket), Protocol::Regular);

                let mut worker = Worker {
                    mem: activate_params.mem,
                    interrupt: activate_params.interrupt,
                    rx_queue,
                    tx_queue,
                    main_process_tube,
                    pci_bar,
                    mem_offset: SHARED_MEMORY_OFFSET as usize,
                    vrings,
                    slave_req_helper,
                };
                let rx_queue_evt = activate_params.queue_evts.remove(0);
                let tx_queue_evt = activate_params.queue_evts.remove(0);
                if let Err(e) = worker.run(rx_queue_evt, tx_queue_evt, main_thread_tube, kill_evt) {
                    // TODO(b/216407443): Handle sibling reconnect events.
                    error!("worker thread exited: {}", e);
                }
                Ok(worker)
            });

        match worker_result {
            Err(e) => {
                error!("failed to spawn virtio_vhost_user worker: {}", e);
            }
            Ok(join_handle) => {
                self.worker_thread = Some(join_handle);
            }
        }
    }
}

impl Drop for VirtioVhostUser {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.kill_evt.take() {
            match kill_evt.write(1) {
                Ok(()) => {
                    if let Some(worker_thread) = self.worker_thread.take() {
                        // Ignore the result because there is nothing we can do about it.
                        let _ = worker_thread.join();
                    }
                }
                Err(e) => error!("failed to write kill event: {}", e),
            }
        }
    }
}

impl VirtioDevice for VirtioVhostUser {
    fn features(&self) -> u64 {
        self.base_features
    }

    fn keep_rds(&self) -> Vec<RawDescriptor> {
        let mut rds = Vec::new();

        if let Some(rd) = &self.listener {
            rds.push(rd.as_raw_descriptor());
        }

        if let Some(rd) = &self.kill_evt {
            rds.push(rd.as_raw_descriptor());
        }

        if let Some(rd) = &self.main_process_tube {
            rds.push(rd.as_raw_descriptor());
        }

        // `self.worker_thread_tube` is set after a fork / keep_rds is called in multiprocess mode.
        // Hence, it's not required to be processed in this function.
        rds
    }

    fn device_type(&self) -> u32 {
        TYPE_VHOST_USER
    }

    fn queue_max_sizes(&self) -> &[u16] {
        PROXY_DEVICE_QUEUE_SIZES
    }

    fn num_interrupts(&self) -> usize {
        // The total number of interrupts includes both this device's own queue
        // interrupts and the interrupts for the VVU device backend queues.
        NUM_PROXY_DEVICE_QUEUES + MAX_VHOST_DEVICE_QUEUES
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        copy_config(
            data,
            0, /* dst_offset */
            self.config.as_slice(),
            offset,
        );
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        copy_config(
            self.config.as_mut_slice(),
            offset,
            data,
            0, /* src_offset */
        );

        // The driver has indicated that it's safe for the Vhost-user sibling to
        // initiate a connection and send data over.
        if self.config.is_slave_up() && !self.sibling_connected {
            self.start_worker();
        }
    }

    fn get_device_caps(&self) -> Vec<Box<dyn crate::pci::PciCapability>> {
        // Allocate capabilities as per sections 5.7.7.5, 5.7.7.6, 5.7.7.7 of
        // the link at the top of the file. The PCI bar is organized in the
        // following format |Doorbell|Notification|Shared Memory|.
        let mut doorbell_virtio_pci_cap = VirtioPciCap::new(
            PciCapabilityType::DoorbellConfig,
            BAR_INDEX,
            DOORBELL_OFFSET as u32,
            DOORBELL_SIZE as u32,
        );
        doorbell_virtio_pci_cap.set_cap_len(std::mem::size_of::<VirtioPciDoorbellCap>() as u8);
        let doorbell = Box::new(VirtioPciDoorbellCap::new(
            doorbell_virtio_pci_cap,
            DOORBELL_OFFSET_MULTIPLIER,
        ));

        let notification = Box::new(VirtioPciCap::new(
            PciCapabilityType::NotificationConfig,
            BAR_INDEX,
            NOTIFICATIONS_OFFSET as u32,
            NOTIFICATIONS_SIZE as u32,
        ));

        let shared_memory = Box::new(VirtioPciCap::new(
            PciCapabilityType::SharedMemoryConfig,
            BAR_INDEX,
            SHARED_MEMORY_OFFSET as u32,
            SHARED_MEMORY_SIZE as u32,
        ));

        vec![doorbell, notification, shared_memory]
    }

    fn get_device_bars(&mut self, address: PciAddress) -> Vec<PciBarConfiguration> {
        // A PCI bar corresponding to |Doorbell|Notification|Shared Memory| will
        // be allocated and its address (64 bit) will be stored in BAR 2 and BAR
        // 3. This is as per the VVU spec and qemu implementation.
        self.pci_bar = Some(Alloc::PciBar {
            bus: address.bus,
            dev: address.dev,
            func: address.func,
            bar: BAR_INDEX,
        });

        vec![PciBarConfiguration::new(
            BAR_INDEX as usize,
            self.device_bar_size,
            PciBarRegionType::Memory64BitRegion,
            // NotPrefetchable so as to exit on every read / write event in the
            // guest.
            PciBarPrefetchable::NotPrefetchable,
        )]
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        interrupt: Interrupt,
        queues: Vec<Queue>,
        queue_evts: Vec<Event>,
    ) {
        if queues.len() != NUM_PROXY_DEVICE_QUEUES || queue_evts.len() != NUM_PROXY_DEVICE_QUEUES {
            error!("bad queue length: {} {}", queues.len(), queue_evts.len());
            return;
        }

        // Cache these to be used later in the `start_worker` function.
        self.activate_params = Some(ActivateParams {
            mem,
            interrupt,
            queues,
            queue_evts,
        });
    }

    fn read_bar(&mut self, bar_index: PciBarIndex, offset: u64, data: &mut [u8]) {
        if let Err(e) = self.check_bar_metadata(bar_index) {
            error!("invalid bar metadata: {}", e);
            return;
        }

        if (NOTIFICATIONS_OFFSET..SHARED_MEMORY_OFFSET).contains(&offset) {
            self.read_bar_notifications(offset - NOTIFICATIONS_OFFSET, data);
        } else {
            error!("addr is outside known region for reads");
        }
    }

    fn write_bar(&mut self, bar_index: PciBarIndex, offset: u64, data: &[u8]) {
        if let Err(e) = self.check_bar_metadata(bar_index) {
            error!("invalid bar metadata: {}", e);
            return;
        }

        if (DOORBELL_OFFSET..NOTIFICATIONS_OFFSET).contains(&offset) {
            self.write_bar_doorbell(offset - DOORBELL_OFFSET);
        } else if (NOTIFICATIONS_OFFSET..SHARED_MEMORY_OFFSET).contains(&offset) {
            self.write_bar_notifications(offset - NOTIFICATIONS_OFFSET, data);
        } else {
            error!("addr is outside known region for writes");
        }
    }

    fn reset(&mut self) -> bool {
        if let Some(kill_evt) = self.kill_evt.take() {
            if let Err(e) = kill_evt.write(1) {
                error!("failed to notify the kill event: {}", e);
                return false;
            }
        }

        if let Some(worker_thread) = self.worker_thread.take() {
            if let Err(e) = worker_thread.join() {
                error!("failed to get back resources: {:?}", e);
                return false;
            }
        }

        // TODO(abhishekbh): Disconnect from sibling and reset
        // `sibling_connected`.
        false
    }

    fn pci_address(&self) -> Option<PciAddress> {
        self.pci_address
    }
}
1421