• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use anyhow::Context;
6 use anyhow::Result;
7 use base::info;
8 use base::named_pipes::BlockingMode;
9 use base::named_pipes::FramingMode;
10 use base::named_pipes::PipeConnection;
11 use base::CloseNotifier;
12 use base::Event;
13 use base::RawDescriptor;
14 use base::ReadNotifier;
15 use base::Tube;
16 use cros_async::EventAsync;
17 use cros_async::Executor;
18 use futures::pin_mut;
19 use futures::select;
20 use futures::FutureExt;
21 use tube_transporter::TubeTransferDataList;
22 use tube_transporter::TubeTransporterReader;
23 use vmm_vhost::message::MasterReq;
24 use vmm_vhost::message::VhostUserMsgHeader;
25 use vmm_vhost::SlaveReqHandler;
26 use vmm_vhost::VhostUserSlaveReqHandler;
27 
28 use crate::virtio::vhost::user::device::handler::CallEvent;
29 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
30 use crate::virtio::vhost::user::device::handler::VhostUserRegularOps;
31 
/// Local name for the device handler's [`CallEvent`] type.
// NOTE(review): presumably this is the object a device uses to signal the guest after
// servicing a queue — confirm against `CallEvent` in the handler module.
32 pub type Doorbell = CallEvent;
33 
read_from_tube_transporter( raw_transport_tube: RawDescriptor, ) -> anyhow::Result<TubeTransferDataList>34 pub fn read_from_tube_transporter(
35     raw_transport_tube: RawDescriptor,
36 ) -> anyhow::Result<TubeTransferDataList> {
37     // Safe because we know that raw_transport_tube is valid (passed by inheritance), and that
38     // the blocking & framing modes are accurate because we create them ourselves in the broker.
39     let tube_transporter = TubeTransporterReader::create_tube_transporter_reader(unsafe {
40         PipeConnection::from_raw_descriptor(
41             raw_transport_tube,
42             FramingMode::Message,
43             BlockingMode::Wait,
44         )
45     });
46 
47     tube_transporter.read_tubes().map_err(anyhow::Error::msg)
48 }
49 
run_handler( handler: Box<dyn VhostUserSlaveReqHandler>, vhost_user_tube: Tube, exit_event: Event, ex: &Executor, ) -> Result<()>50 pub async fn run_handler(
51     handler: Box<dyn VhostUserSlaveReqHandler>,
52     vhost_user_tube: Tube,
53     exit_event: Event,
54     ex: &Executor,
55 ) -> Result<()> {
56     let read_notifier = vhost_user_tube.get_read_notifier();
57     let close_notifier = vhost_user_tube.get_close_notifier();
58 
59     let read_event = EventAsync::clone_raw_without_reset(read_notifier, ex)
60         .context("failed to create an async event")?;
61     let close_event = EventAsync::clone_raw_without_reset(close_notifier, ex)
62         .context("failed to create an async event")?;
63     let exit_event = EventAsync::new(exit_event, ex).context("failed to create an async event")?;
64 
65     let mut req_handler = SlaveReqHandler::from_stream(vhost_user_tube, handler);
66 
67     let read_event_fut = read_event.next_val().fuse();
68     let close_event_fut = close_event.next_val().fuse();
69     let exit_event_fut = exit_event.next_val().fuse();
70     pin_mut!(read_event_fut);
71     pin_mut!(close_event_fut);
72     pin_mut!(exit_event_fut);
73 
74     let mut pending_header: Option<(VhostUserMsgHeader<MasterReq>, Option<Vec<std::fs::File>>)> =
75         None;
76     loop {
77         select! {
78             _read_res = read_event_fut => {
79                 match pending_header.take() {
80                     None => {
81                         let (hdr, files) = req_handler
82                             .recv_header()
83                             .context("failed to handle a vhost-user request")?;
84                         if req_handler.needs_wait_for_payload(&hdr) {
85                             // Wait for the message body being notified.
86                             pending_header = Some((hdr, files));
87                         } else {
88                             req_handler
89                                 .process_message(hdr, files)
90                                 .context("failed to handle a vhost-user request")?;
91                         }
92                     }
93                     Some((hdr, files)) => {
94                         req_handler
95                             .process_message(hdr, files)
96                             .context("failed to handle a vhost-user request")?;
97                     }
98                 }
99                 read_event_fut.set(read_event.next_val().fuse());
100             }
101             // Tube closed event.
102             _close_res = close_event_fut => {
103                 info!("exit run loop: got close event");
104                 return Ok(())
105             }
106             // Broker exit event.
107             _exit_res = exit_event_fut => {
108                 info!("exit run loop: got exit event");
109                 return Ok(())
110             }
111         }
112     }
113 }
114 
#[cfg(test)]
mod tests {
    use std::sync::Barrier;

    use super::*;
    use crate::virtio::vhost::user::device::handler::tests::*;
    use crate::virtio::vhost::user::device::handler::VhostUserRegularOps;
    use crate::virtio::vhost::user::device::handler::*;
    use crate::virtio::vhost::user::vmm::VhostUserHandler;

    /// Runs a VMM-side handler on a helper thread against a device-side request handler on
    /// the test thread, connected by a `Tube` pair.
    #[test]
    fn test_vhost_user_activate() {
        const QUEUES_NUM: usize = 2;

        let (dev_tube, main_tube) = Tube::pair().unwrap();

        // Two-party barrier: neither side finishes until both have reached the end of their
        // work, so the VMM handler stays alive while the device side is still handling requests.
        let device_done = Arc::new(Barrier::new(2));
        let vmm_done = Arc::clone(&device_done);

        // VMM side
        std::thread::spawn(move || {
            let protocol_feature_bits = VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();

            let mut vmm_handler = VhostUserHandler::new_from_connection(
                main_tube,
                QUEUES_NUM as u64,
                // Allowed features.
                protocol_feature_bits,
                // Initial features.
                protocol_feature_bits,
                // Allowed protocol features.
                VhostUserProtocolFeatures::CONFIG,
            )
            .unwrap();

            vmm_handler_send_requests(&mut vmm_handler, QUEUES_NUM);

            vmm_done.wait();
        });

        // Device side
        let device_handler = DeviceRequestHandler::new(
            Box::new(FakeBackend::new()),
            Box::new(VhostUserRegularOps),
        );
        let backend = std::sync::Mutex::new(device_handler);
        let mut req_handler = SlaveReqHandler::from_stream(dev_tube, backend);

        test_handle_requests(&mut req_handler, QUEUES_NUM);

        device_done.wait();
    }
}
166