1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use anyhow::Context;
6 use anyhow::Result;
7 use base::info;
8 use base::named_pipes::BlockingMode;
9 use base::named_pipes::FramingMode;
10 use base::named_pipes::PipeConnection;
11 use base::CloseNotifier;
12 use base::Event;
13 use base::RawDescriptor;
14 use base::ReadNotifier;
15 use base::Tube;
16 use cros_async::EventAsync;
17 use cros_async::Executor;
18 use futures::pin_mut;
19 use futures::select;
20 use futures::FutureExt;
21 use tube_transporter::TubeTransferDataList;
22 use tube_transporter::TubeTransporterReader;
23 use vmm_vhost::message::MasterReq;
24 use vmm_vhost::message::VhostUserMsgHeader;
25 use vmm_vhost::SlaveReqHandler;
26 use vmm_vhost::VhostUserSlaveReqHandler;
27
28 use crate::virtio::vhost::user::device::handler::CallEvent;
29 use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
30 use crate::virtio::vhost::user::device::handler::VhostUserRegularOps;
31
32 pub type Doorbell = CallEvent;
33
read_from_tube_transporter( raw_transport_tube: RawDescriptor, ) -> anyhow::Result<TubeTransferDataList>34 pub fn read_from_tube_transporter(
35 raw_transport_tube: RawDescriptor,
36 ) -> anyhow::Result<TubeTransferDataList> {
37 // Safe because we know that raw_transport_tube is valid (passed by inheritance), and that
38 // the blocking & framing modes are accurate because we create them ourselves in the broker.
39 let tube_transporter = TubeTransporterReader::create_tube_transporter_reader(unsafe {
40 PipeConnection::from_raw_descriptor(
41 raw_transport_tube,
42 FramingMode::Message,
43 BlockingMode::Wait,
44 )
45 });
46
47 tube_transporter.read_tubes().map_err(anyhow::Error::msg)
48 }
49
run_handler( handler: Box<dyn VhostUserSlaveReqHandler>, vhost_user_tube: Tube, exit_event: Event, ex: &Executor, ) -> Result<()>50 pub async fn run_handler(
51 handler: Box<dyn VhostUserSlaveReqHandler>,
52 vhost_user_tube: Tube,
53 exit_event: Event,
54 ex: &Executor,
55 ) -> Result<()> {
56 let read_notifier = vhost_user_tube.get_read_notifier();
57 let close_notifier = vhost_user_tube.get_close_notifier();
58
59 let read_event = EventAsync::clone_raw_without_reset(read_notifier, ex)
60 .context("failed to create an async event")?;
61 let close_event = EventAsync::clone_raw_without_reset(close_notifier, ex)
62 .context("failed to create an async event")?;
63 let exit_event = EventAsync::new(exit_event, ex).context("failed to create an async event")?;
64
65 let mut req_handler = SlaveReqHandler::from_stream(vhost_user_tube, handler);
66
67 let read_event_fut = read_event.next_val().fuse();
68 let close_event_fut = close_event.next_val().fuse();
69 let exit_event_fut = exit_event.next_val().fuse();
70 pin_mut!(read_event_fut);
71 pin_mut!(close_event_fut);
72 pin_mut!(exit_event_fut);
73
74 let mut pending_header: Option<(VhostUserMsgHeader<MasterReq>, Option<Vec<std::fs::File>>)> =
75 None;
76 loop {
77 select! {
78 _read_res = read_event_fut => {
79 match pending_header.take() {
80 None => {
81 let (hdr, files) = req_handler
82 .recv_header()
83 .context("failed to handle a vhost-user request")?;
84 if req_handler.needs_wait_for_payload(&hdr) {
85 // Wait for the message body being notified.
86 pending_header = Some((hdr, files));
87 } else {
88 req_handler
89 .process_message(hdr, files)
90 .context("failed to handle a vhost-user request")?;
91 }
92 }
93 Some((hdr, files)) => {
94 req_handler
95 .process_message(hdr, files)
96 .context("failed to handle a vhost-user request")?;
97 }
98 }
99 read_event_fut.set(read_event.next_val().fuse());
100 }
101 // Tube closed event.
102 _close_res = close_event_fut => {
103 info!("exit run loop: got close event");
104 return Ok(())
105 }
106 // Broker exit event.
107 _exit_res = exit_event_fut => {
108 info!("exit run loop: got exit event");
109 return Ok(())
110 }
111 }
112 }
113 }
114
115 #[cfg(test)]
116 mod tests {
117 use std::sync::Barrier;
118
119 use super::*;
120 use crate::virtio::vhost::user::device::handler::tests::*;
121 use crate::virtio::vhost::user::device::handler::VhostUserRegularOps;
122 use crate::virtio::vhost::user::device::handler::*;
123 use crate::virtio::vhost::user::vmm::VhostUserHandler;
124 #[test]
test_vhost_user_activate()125 fn test_vhost_user_activate() {
126 const QUEUES_NUM: usize = 2;
127
128 let (dev_tube, main_tube) = Tube::pair().unwrap();
129
130 let vmm_bar = Arc::new(Barrier::new(2));
131 let dev_bar = vmm_bar.clone();
132
133 std::thread::spawn(move || {
134 // VMM side
135 let allow_features = VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
136 let init_features = VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
137 let allow_protocol_features = VhostUserProtocolFeatures::CONFIG;
138
139 let mut vmm_handler = VhostUserHandler::new_from_connection(
140 main_tube,
141 QUEUES_NUM as u64,
142 allow_features,
143 init_features,
144 allow_protocol_features,
145 )
146 .unwrap();
147
148 vmm_handler_send_requests(&mut vmm_handler, QUEUES_NUM);
149
150 vmm_bar.wait();
151 });
152
153 // Device side
154 let backend = std::sync::Mutex::new(DeviceRequestHandler::new(
155 Box::new(FakeBackend::new()),
156 Box::new(VhostUserRegularOps),
157 ));
158
159 let mut req_handler = SlaveReqHandler::from_stream(dev_tube, backend);
160
161 test_handle_requests(&mut req_handler, QUEUES_NUM);
162
163 dev_bar.wait();
164 }
165 }
166