1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use anyhow::anyhow;
6 use anyhow::Context;
7 use anyhow::Result;
8 use base::info;
9 use base::named_pipes::BlockingMode;
10 use base::named_pipes::FramingMode;
11 use base::named_pipes::PipeConnection;
12 use base::CloseNotifier;
13 use base::Event;
14 use base::RawDescriptor;
15 use base::ReadNotifier;
16 use base::Tube;
17 use cros_async::AsyncResult;
18 use cros_async::EventAsync;
19 use cros_async::Executor;
20 use futures::pin_mut;
21 use futures::select;
22 use futures::FutureExt;
23 use tube_transporter::TubeTransferDataList;
24 use tube_transporter::TubeTransporterReader;
25 use vmm_vhost::message::FrontendReq;
26 use vmm_vhost::message::VhostUserMsgHeader;
27 use vmm_vhost::BackendServer;
28 use vmm_vhost::Connection;
29
read_from_tube_transporter( raw_transport_tube: RawDescriptor, ) -> anyhow::Result<TubeTransferDataList>30 pub fn read_from_tube_transporter(
31 raw_transport_tube: RawDescriptor,
32 ) -> anyhow::Result<TubeTransferDataList> {
33 let tube_transporter = TubeTransporterReader::create_tube_transporter_reader(
34 // SAFETY:
35 // Safe because we know that raw_transport_tube is valid (passed by inheritance), and that
36 // the blocking & framing modes are accurate because we create them ourselves in the
37 // broker.
38 unsafe {
39 PipeConnection::from_raw_descriptor(
40 raw_transport_tube,
41 FramingMode::Message,
42 BlockingMode::Wait,
43 )
44 },
45 );
46
47 tube_transporter.read_tubes().map_err(anyhow::Error::msg)
48 }
49
50 /// Runs the generic handler over a given vhost-user device backend.
run_handler( handler: Box<dyn vmm_vhost::Backend>, vhost_user_tube: Tube, exit_event: Event, ex: &Executor, ) -> Result<()>51 pub async fn run_handler(
52 handler: Box<dyn vmm_vhost::Backend>,
53 vhost_user_tube: Tube,
54 exit_event: Event,
55 ex: &Executor,
56 ) -> Result<()> {
57 let read_notifier = vhost_user_tube
58 .get_read_notifier_event()
59 .try_clone()
60 .context("failed to clone event")?;
61 let close_notifier = vhost_user_tube
62 .get_close_notifier_event()
63 .try_clone()
64 .context("failed to clone event")?;
65
66 let read_event = EventAsync::new_without_reset(read_notifier, ex)
67 .context("failed to create an async event")?;
68 let close_event = EventAsync::new_without_reset(close_notifier, ex)
69 .context("failed to create an async event")?;
70 let exit_event = EventAsync::new(exit_event, ex).context("failed to create an async event")?;
71
72 let mut backend_server = BackendServer::new(Connection::from(vhost_user_tube), handler);
73
74 let read_event_fut = read_event.next_val().fuse();
75 let close_event_fut = close_event.next_val().fuse();
76 let exit_event_fut = exit_event.next_val().fuse();
77 pin_mut!(read_event_fut);
78 pin_mut!(close_event_fut);
79 pin_mut!(exit_event_fut);
80
81 let mut pending_header: Option<(VhostUserMsgHeader<FrontendReq>, Vec<std::fs::File>)> = None;
82 loop {
83 select! {
84 _read_res = read_event_fut => {
85 match pending_header.take() {
86 None => {
87 let (hdr, files) = backend_server
88 .recv_header()
89 .context("failed to handle a vhost-user request")?;
90 if backend_server.needs_wait_for_payload(&hdr) {
91 // Wait for the message body being notified.
92 pending_header = Some((hdr, files));
93 } else {
94 backend_server
95 .process_message(hdr, files)
96 .context("failed to handle a vhost-user request")?;
97 }
98 }
99 Some((hdr, files)) => {
100 backend_server
101 .process_message(hdr, files)
102 .context("failed to handle a vhost-user request")?;
103 }
104 }
105 read_event_fut.set(read_event.next_val().fuse());
106 }
107 // Tube closed event.
108 _close_res = close_event_fut => {
109 info!("exit run loop: got close event");
110 return Ok(())
111 }
112 // Broker exit event.
113 _exit_res = exit_event_fut => {
114 info!("exit run loop: got exit event");
115 return Ok(())
116 }
117 }
118 }
119 }
120