1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::cell::RefCell;
6 use std::collections::BTreeMap;
7 use std::collections::HashMap;
8 use std::fmt;
9 use std::fmt::Display;
10 use std::io;
11 use std::io::Read;
12 use std::io::Write;
13 use std::os::windows::io::RawHandle;
14 use std::rc::Rc;
15 use std::result;
16 use std::sync::Arc;
17 use std::thread;
18
19 use anyhow::anyhow;
20 use anyhow::Context;
21 use base::error;
22 use base::info;
23 use base::named_pipes;
24 use base::named_pipes::BlockingMode;
25 use base::named_pipes::FramingMode;
26 use base::named_pipes::OverlappedWrapper;
27 use base::named_pipes::PipeConnection;
28 use base::warn;
29 use base::AsRawDescriptor;
30 use base::Error as SysError;
31 use base::Event;
32 use base::EventExt;
33 use base::WorkerThread;
34 use cros_async::select3;
35 use cros_async::select6;
36 use cros_async::sync::RwLock;
37 use cros_async::AsyncError;
38 use cros_async::EventAsync;
39 use cros_async::Executor;
40 use cros_async::SelectResult;
41 use data_model::Le32;
42 use data_model::Le64;
43 use futures::channel::mpsc;
44 use futures::channel::oneshot;
45 use futures::pin_mut;
46 use futures::select;
47 use futures::select_biased;
48 use futures::stream::FuturesUnordered;
49 use futures::FutureExt;
50 use futures::SinkExt;
51 use futures::StreamExt;
52 use remain::sorted;
53 use serde::Deserialize;
54 use serde::Serialize;
55 use snapshot::AnySnapshot;
56 use thiserror::Error as ThisError;
57 use vm_memory::GuestMemory;
58 use zerocopy::FromBytes;
59 use zerocopy::FromZeros;
60 use zerocopy::IntoBytes;
61
62 use crate::virtio::async_utils;
63 use crate::virtio::copy_config;
64 use crate::virtio::create_stop_oneshot;
65 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_config;
66 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_event;
67 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_hdr;
68 use crate::virtio::vsock::sys::windows::protocol::vsock_op;
69 use crate::virtio::vsock::sys::windows::protocol::TYPE_STREAM_SOCKET;
70 use crate::virtio::DescriptorChain;
71 use crate::virtio::DeviceType;
72 use crate::virtio::Interrupt;
73 use crate::virtio::Queue;
74 use crate::virtio::StoppedWorker;
75 use crate::virtio::VirtioDevice;
76 use crate::virtio::Writer;
77 use crate::Suspendable;
78
/// Errors produced by the userspace vsock device and its worker thread.
#[sorted]
#[derive(ThisError, Debug)]
pub enum VsockError {
    #[error("Failed to await next descriptor chain from queue: {0}")]
    AwaitQueue(AsyncError),
    #[error("OverlappedWrapper error.")]
    BadOverlappedWrapper,
    #[error("Failed to clone descriptor: {0}")]
    CloneDescriptor(SysError),
    #[error("Failed to create EventAsync: {0}")]
    CreateEventAsync(AsyncError),
    #[error("Failed to create wait context: {0}")]
    CreateWaitContext(SysError),
    #[error("Failed to read queue: {0}")]
    ReadQueue(io::Error),
    #[error("Failed to reset event object: {0}")]
    ResetEventObject(SysError),
    #[error("Failed to run executor: {0}")]
    RunExecutor(AsyncError),
    #[error("Failed to write to pipe, port {0}: {1}")]
    WriteFailed(PortPair, io::Error),
    #[error("Failed to write queue: {0}")]
    WriteQueue(io::Error),
}
/// Shorthand result type for vsock operations.
pub type Result<T> = result::Result<T, VsockError>;
104
// Vsock has three virt IO queues: rx, tx, and event.
const QUEUE_SIZE: u16 = 256;
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];
// We overload port numbers so that if message is to be received from
// CONNECTION_EVENT_PORT_NUM (invalid port number), we recognize that a
// new connection was set up.
const CONNECTION_EVENT_PORT_NUM: u32 = u32::MAX;

/// Number of bytes in a kilobyte. Used to simplify and clarify buffer size definitions.
const KB: usize = 1024;

/// Size of the buffer we read into from the host side named pipe. Note that data flows from the
/// host pipe -> this buffer -> rx queue.
/// This should be large enough to facilitate fast transmission of host data, see b/232950349.
const TEMP_READ_BUF_SIZE_BYTES: usize = 4 * KB;

/// In the case where the host side named pipe does not have a specified buffer size, we'll default
/// to telling the guest that this is the amount of extra RX space available (e.g. buf_alloc).
/// This should be larger than the volume of data that the guest will generally send at one time to
/// minimize credit update packets (see MIN_FREE_BUFFER_PCT below).
const DEFAULT_BUF_ALLOC_BYTES: usize = 128 * KB;

/// Minimum free buffer threshold to notify the peer with a credit update
/// message. This is in case we are ingesting many messages without an
/// opportunity to send a message back to the peer with a buffer size update.
/// This value is a percentage of `buf_alloc`.
/// TODO(b/204246759): This value was chosen without much more thought than "it
/// works". It should probably be adjusted, along with DEFAULT_BUF_ALLOC_BYTES,
/// to a value that makes empirical sense for the packet sizes that we expect
/// to receive.
/// TODO(b/239848326): Replace float with integer, in order to remove risk
/// of losing precision. Ie. change to `10` and perform
/// `FOO * MIN_FREE_BUFFER_PCT / 100`
const MIN_FREE_BUFFER_PCT: f64 = 0.1;

// Number of packets to buffer in the tx processing channels.
const CHANNEL_SIZE: usize = 256;

/// Map of (host, guest) port pairs to live connection state, behind an async
/// RwLock so the rx/tx processing tasks can share it.
type VsockConnectionMap = RwLock<HashMap<PortPair, VsockConnection>>;
144
/// Virtio vsock device, bridging guest vsock connections to host-side named pipes.
pub struct Vsock {
    /// Context id (CID) identifying the guest on the vsock transport.
    guest_cid: u64,
    /// GUID used to form host named pipe names; `None` means guest-initiated
    /// connections are rejected.
    host_guid: Option<String>,
    /// Virtio feature bits offered to the guest.
    features: u64,
    /// Feature bits the guest driver has acknowledged.
    acked_features: u64,
    /// Running worker handle; `None` while the device is inactive.
    worker_thread: Option<WorkerThread<Option<(PausedQueues, VsockConnectionMap)>>>,
    /// Stores any active connections when the device sleeps. This allows us to sleep/wake
    /// without disrupting active connections, which is useful when taking a snapshot.
    sleeping_connections: Option<VsockConnectionMap>,
    /// If true, we should send a TRANSPORT_RESET event to the guest at the next opportunity.
    /// Used to inform the guest all connections are broken when we restore a snapshot.
    needs_transport_reset: bool,
}
159
/// Snapshotted state of Vsock. These fields are serialized in order to validate they haven't
/// changed when this device is restored.
#[derive(Serialize, Deserialize)]
struct VsockSnapshot {
    // Immutable device identity; must match the live device on restore.
    guest_cid: u64,
    // Offered feature bits; must match the live device on restore.
    features: u64,
    // Mutable driver state; copied back into the live device on restore.
    acked_features: u64,
}
168
169 impl Vsock {
new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock>170 pub fn new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock> {
171 Ok(Vsock {
172 guest_cid,
173 host_guid,
174 features: base_features,
175 acked_features: 0,
176 worker_thread: None,
177 sleeping_connections: None,
178 needs_transport_reset: false,
179 })
180 }
181
get_config(&self) -> virtio_vsock_config182 fn get_config(&self) -> virtio_vsock_config {
183 virtio_vsock_config {
184 guest_cid: Le64::from(self.guest_cid),
185 }
186 }
187
stop_worker(&mut self) -> StoppedWorker<(PausedQueues, VsockConnectionMap)>188 fn stop_worker(&mut self) -> StoppedWorker<(PausedQueues, VsockConnectionMap)> {
189 if let Some(worker_thread) = self.worker_thread.take() {
190 if let Some(queues_and_conns) = worker_thread.stop() {
191 StoppedWorker::WithQueues(Box::new(queues_and_conns))
192 } else {
193 StoppedWorker::MissingQueues
194 }
195 } else {
196 StoppedWorker::AlreadyStopped
197 }
198 }
199
start_worker( &mut self, mem: GuestMemory, mut queues: VsockQueues, existing_connections: Option<VsockConnectionMap>, ) -> anyhow::Result<()>200 fn start_worker(
201 &mut self,
202 mem: GuestMemory,
203 mut queues: VsockQueues,
204 existing_connections: Option<VsockConnectionMap>,
205 ) -> anyhow::Result<()> {
206 let rx_queue = queues.rx;
207 let tx_queue = queues.tx;
208 let event_queue = queues.event;
209
210 let host_guid = self.host_guid.clone();
211 let guest_cid = self.guest_cid;
212 let needs_transport_reset = self.needs_transport_reset;
213 self.needs_transport_reset = false;
214 self.worker_thread = Some(WorkerThread::start(
215 "userspace_virtio_vsock",
216 move |kill_evt| {
217 let mut worker = Worker::new(
218 mem,
219 host_guid,
220 guest_cid,
221 existing_connections,
222 needs_transport_reset,
223 );
224 let result = worker.run(rx_queue, tx_queue, event_queue, kill_evt);
225
226 match result {
227 Err(e) => {
228 error!("userspace vsock worker thread exited with error: {:?}", e);
229 None
230 }
231 Ok(paused_queues_and_connections_option) => {
232 paused_queues_and_connections_option
233 }
234 }
235 },
236 ));
237
238 Ok(())
239 }
240 }
241
242 impl VirtioDevice for Vsock {
keep_rds(&self) -> Vec<RawHandle>243 fn keep_rds(&self) -> Vec<RawHandle> {
244 Vec::new()
245 }
246
read_config(&self, offset: u64, data: &mut [u8])247 fn read_config(&self, offset: u64, data: &mut [u8]) {
248 copy_config(data, 0, self.get_config().as_bytes(), offset);
249 }
250
device_type(&self) -> DeviceType251 fn device_type(&self) -> DeviceType {
252 DeviceType::Vsock
253 }
254
queue_max_sizes(&self) -> &[u16]255 fn queue_max_sizes(&self) -> &[u16] {
256 QUEUE_SIZES
257 }
258
features(&self) -> u64259 fn features(&self) -> u64 {
260 self.features
261 }
262
ack_features(&mut self, value: u64)263 fn ack_features(&mut self, value: u64) {
264 self.acked_features |= value;
265 }
266
activate( &mut self, mem: GuestMemory, _interrupt: Interrupt, mut queues: BTreeMap<usize, Queue>, ) -> anyhow::Result<()>267 fn activate(
268 &mut self,
269 mem: GuestMemory,
270 _interrupt: Interrupt,
271 mut queues: BTreeMap<usize, Queue>,
272 ) -> anyhow::Result<()> {
273 if queues.len() != QUEUE_SIZES.len() {
274 return Err(anyhow!(
275 "Failed to activate vsock device. queues.len(): {} != {}",
276 queues.len(),
277 QUEUE_SIZES.len(),
278 ));
279 }
280
281 let vsock_queues = VsockQueues {
282 rx: queues.remove(&0).unwrap(),
283 tx: queues.remove(&1).unwrap(),
284 event: queues.remove(&2).unwrap(),
285 };
286
287 self.start_worker(mem, vsock_queues, None)
288 }
289
virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>>290 fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
291 match self.stop_worker() {
292 StoppedWorker::WithQueues(paused_queues_and_conns) => {
293 let (queues, sleeping_connections) = *paused_queues_and_conns;
294 self.sleeping_connections = Some(sleeping_connections);
295 Ok(Some(queues.into()))
296 }
297 StoppedWorker::MissingQueues => {
298 anyhow::bail!("vsock queue workers did not stop cleanly")
299 }
300 StoppedWorker::AlreadyStopped => {
301 // The device isn't in the activated state.
302 Ok(None)
303 }
304 }
305 }
306
virtio_wake( &mut self, queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>, ) -> anyhow::Result<()>307 fn virtio_wake(
308 &mut self,
309 queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
310 ) -> anyhow::Result<()> {
311 if let Some((mem, _interrupt, queues)) = queues_state {
312 let connections = self.sleeping_connections.take();
313 self.start_worker(
314 mem,
315 queues
316 .try_into()
317 .expect("Failed to convert queue BTreeMap to VsockQueues"),
318 connections,
319 )?;
320 }
321 Ok(())
322 }
323
virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot>324 fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
325 AnySnapshot::to_any(VsockSnapshot {
326 guest_cid: self.guest_cid,
327 features: self.features,
328 acked_features: self.acked_features,
329 })
330 .context("failed to serialize vsock snapshot")
331 }
332
virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()>333 fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
334 let vsock_snapshot: VsockSnapshot =
335 AnySnapshot::from_any(data).context("error deserializing vsock snapshot")?;
336 anyhow::ensure!(
337 self.guest_cid == vsock_snapshot.guest_cid,
338 "expected guest_cid to match, but they did not. Live: {}, snapshot: {}",
339 self.guest_cid,
340 vsock_snapshot.guest_cid
341 );
342 anyhow::ensure!(
343 self.features == vsock_snapshot.features,
344 "vsock: expected features to match, but they did not. Live: {}, snapshot: {}",
345 self.features,
346 vsock_snapshot.features
347 );
348 self.acked_features = vsock_snapshot.acked_features;
349 self.needs_transport_reset = true;
350
351 Ok(())
352 }
353 }
354
/// A (host port, guest port) pair; the key identifying a single vsock connection.
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct PortPair {
    // Port number on the host side of the connection.
    host: u32,
    // Port number on the guest side of the connection.
    guest: u32,
}
360
361 impl Display for PortPair {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result362 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
363 write!(f, "(host port: {}, guest port: {})", self.host, self.guest)
364 }
365 }
366
367 impl PortPair {
from_tx_header(header: &virtio_vsock_hdr) -> PortPair368 fn from_tx_header(header: &virtio_vsock_hdr) -> PortPair {
369 PortPair {
370 host: header.dst_port.to_native(),
371 guest: header.src_port.to_native(),
372 }
373 }
374 }
375
// Note: variables herein do not have to be atomic because this struct is guarded
// by a RwLock.
struct VsockConnection {
    // The guest port.
    guest_port: Le32,

    // The actual named (asynchronous) pipe connection.
    pipe: PipeConnection,
    // The overlapped struct contains an event object for the named pipe.
    // This lets us select() on the pipes by waiting on the events.
    // This is for Reads only.
    overlapped: Box<OverlappedWrapper>,
    // Read buffer for the named pipes. These are needed because reads complete
    // asynchronously. Boxed so the buffer stays at a stable heap address while
    // an overlapped read is outstanding.
    buffer: Box<[u8; TEMP_READ_BUF_SIZE_BYTES]>,

    // Total free-running count of received bytes.
    recv_cnt: usize,

    // Total free-running count of received bytes that the peer has been informed of.
    prev_recv_cnt: usize,

    // Total auxiliary buffer space available to receive packets from the driver, not including
    // the virtqueue itself. For us, this is tx buffer on the named pipe into which we drain
    // packets for the connection. Note that if the named pipe has a grow on demand TX buffer,
    // we use DEFAULT_BUF_ALLOC_BYTES instead.
    buf_alloc: usize,

    // Peer (driver) total free-running count of received bytes.
    peer_recv_cnt: usize,

    // Peer (driver) total rx buffer allocated.
    peer_buf_alloc: usize,

    // Total free-running count of transmitted bytes.
    tx_cnt: usize,

    // State tracking for full buffer condition. Currently just used for logging. If the peer's
    // buffer does not have space for a maximum-sized message (TEMP_READ_BUF_SIZE_BYTES), this
    // gets set to `true`. Once there's enough space in the buffer, this gets unset.
    is_buffer_full: bool,
}
418
/// State shared by the vsock worker thread's queue-processing tasks.
struct Worker {
    mem: GuestMemory,
    // GUID component of host named pipe names; `None` means guest-initiated
    // connections are rejected (see handle_vsock_connection_request).
    host_guid: Option<String>,
    guest_cid: u64,
    // Map of (host, guest) port pairs to their VsockConnection.
    connections: VsockConnectionMap,
    // Signaled after a new connection is inserted into `connections`, so the
    // rx task can rebuild its set of awaited pipe events.
    connection_event: Event,
    // Sender for device events (e.g. transport reset) destined for the guest's
    // event virtqueue.
    device_event_queue_tx: mpsc::Sender<virtio_vsock_event>,
    // Matching receiver; Option so it can be taken by the task that drains it
    // — presumably the event-queue task elsewhere in this file; verify there.
    device_event_queue_rx: Option<mpsc::Receiver<virtio_vsock_event>>,
    // If true, inform the guest of a transport reset (set when the worker is
    // started after a snapshot restore).
    send_protocol_reset: bool,
}
430
431 impl Worker {
new( mem: GuestMemory, host_guid: Option<String>, guest_cid: u64, existing_connections: Option<VsockConnectionMap>, send_protocol_reset: bool, ) -> Worker432 fn new(
433 mem: GuestMemory,
434 host_guid: Option<String>,
435 guest_cid: u64,
436 existing_connections: Option<VsockConnectionMap>,
437 send_protocol_reset: bool,
438 ) -> Worker {
439 // Buffer size here is arbitrary, but must be at least one since we need
440 // to be able to write a reset event to the channel when the device
441 // worker is brought up on a VM restore. Note that we send exactly one
442 // message per VM session, so we should never see these messages backing
443 // up.
444 let (device_event_queue_tx, device_event_queue_rx) = mpsc::channel(4);
445
446 Worker {
447 mem,
448 host_guid,
449 guest_cid,
450 connections: existing_connections.unwrap_or_default(),
451 connection_event: Event::new().unwrap(),
452 device_event_queue_tx,
453 device_event_queue_rx: Some(device_event_queue_rx),
454 send_protocol_reset,
455 }
456 }
457
    /// Services the rx path: waits on every connection's overlapped-read event,
    /// and forwards data that arrived on a host named pipe to the guest as
    /// VIRTIO_VSOCK_OP_RW packets on `recv_queue`.
    ///
    /// Runs until `stop_rx` fires. The labeled outer loop rebuilds the awaited
    /// event set whenever the connection map changes; a change is signaled via
    /// `self.connection_event`, which is surfaced through the reserved
    /// `CONNECTION_EVENT_PORT_NUM` pseudo-port.
    async fn process_rx_queue(
        &self,
        recv_queue: Arc<RwLock<Queue>>,
        mut rx_queue_evt: EventAsync,
        ex: &Executor,
        mut stop_rx: oneshot::Receiver<()>,
    ) -> Result<()> {
        'connections_changed: loop {
            // Run continuously until exit evt

            // TODO(b/200810561): Optimize this FuturesUnordered code.
            // Set up the EventAsyncs to select on
            let futures = FuturesUnordered::new();
            // This needs to be its own scope since it holds a RwLock on `self.connections`.
            {
                let connections = self.connections.read_lock().await;
                for (port, connection) in connections.iter() {
                    // Clone each connection's overlapped-read event so we can
                    // await it without holding the connections lock.
                    let h_evt = connection
                        .overlapped
                        .get_h_event_ref()
                        .ok_or_else(|| {
                            error!("Missing h_event in OverlappedWrapper.");
                            VsockError::BadOverlappedWrapper
                        })
                        .unwrap()
                        .try_clone()
                        .map_err(|e| {
                            error!("Could not clone h_event.");
                            VsockError::CloneDescriptor(e)
                        })?;
                    let evt_async = EventAsync::new(h_evt, ex).map_err(|e| {
                        error!("Could not create EventAsync.");
                        VsockError::CreateEventAsync(e)
                    })?;
                    futures.push(wait_event_and_return_port_pair(evt_async, *port));
                }
            }
            // Also wake when a new connection is registered, so its pipe event
            // can be added to the awaited set (see 'connections_changed).
            let connection_evt_clone = self.connection_event.try_clone().map_err(|e| {
                error!("Could not clone connection_event.");
                VsockError::CloneDescriptor(e)
            })?;
            let connection_evt_async = EventAsync::new(connection_evt_clone, ex).map_err(|e| {
                error!("Could not create EventAsync.");
                VsockError::CreateEventAsync(e)
            })?;
            futures.push(wait_event_and_return_port_pair(
                connection_evt_async,
                PortPair {
                    host: CONNECTION_EVENT_PORT_NUM,
                    guest: 0,
                },
            ));

            // Wait to service the sockets. Note that for fairness, it is critical that we service
            // all ready sockets in a single wakeup to avoid starvation. This is why ready_chunks
            // is used, as it returns all currently *ready* futures from the stream.
            //
            // The expect here only triggers if the FuturesUnordered stream is exhausted. This never
            // happens because it has at least one item, and we only request chunks once.
            let futures_len = futures.len();
            let mut ready_chunks = futures.ready_chunks(futures_len);
            let ports = select_biased! {
                ports = ready_chunks.next() => {
                    ports.expect("failed to wait on vsock sockets")
                }
                _ = stop_rx => {
                    break;
                }
            };

            for port in ports {
                if port.host == CONNECTION_EVENT_PORT_NUM {
                    // New connection event. Setup futures again.
                    if let Err(e) = self.connection_event.reset() {
                        error!("vsock: port: {}: could not reset connection_event.", port);
                        return Err(VsockError::ResetEventObject(e));
                    }
                    continue 'connections_changed;
                }
                let mut connections = self.connections.lock().await;
                let connection = if let Some(conn) = connections.get_mut(&port) {
                    conn
                } else {
                    // We could have been scheduled to run the rx queue *before* the connection was
                    // closed. In that case, we do nothing. The code which closed the connection
                    // (e.g. in response to a message in the tx/rx queues) will handle notifying
                    // the guest/host as required.
                    continue 'connections_changed;
                };

                // Check if the peer has enough space in their buffer to
                // receive the maximum amount of data that we could possibly
                // read from the host pipe. If not, we should continue to
                // process other sockets as each socket has an independent
                // buffer.
                // NOTE(review): assumes tx_cnt - peer_recv_cnt <= peer_buf_alloc;
                // a peer misreporting its credit would underflow here (panic in
                // debug builds) — TODO confirm the credit invariant upstream.
                let peer_free_buf_size =
                    connection.peer_buf_alloc - (connection.tx_cnt - connection.peer_recv_cnt);
                if peer_free_buf_size < TEMP_READ_BUF_SIZE_BYTES {
                    if !connection.is_buffer_full {
                        warn!(
                            "vsock: port {}: Peer has insufficient free buffer space ({} bytes available)",
                            port, peer_free_buf_size
                        );
                        connection.is_buffer_full = true;
                    }
                    continue;
                } else if connection.is_buffer_full {
                    connection.is_buffer_full = false;
                }

                let pipe_connection = &mut connection.pipe;
                let overlapped = &mut connection.overlapped;
                let guest_port = connection.guest_port;
                let buffer = &mut connection.buffer;

                // Reset the event object before collecting the result; the
                // read_overlapped issued at the bottom of this loop will signal
                // it again when more data arrives.
                match overlapped.get_h_event_ref() {
                    Some(h_event) => {
                        if let Err(e) = h_event.reset() {
                            error!(
                                "vsock: port: {}: Could not reset event in OverlappedWrapper.",
                                port
                            );
                            return Err(VsockError::ResetEventObject(e));
                        }
                    }
                    None => {
                        error!(
                            "vsock: port: {}: missing h_event in OverlappedWrapper.",
                            port
                        );
                        return Err(VsockError::BadOverlappedWrapper);
                    }
                }

                // Collect how many bytes the completed overlapped read produced
                // into `connection.buffer`.
                let data_size = match pipe_connection.get_overlapped_result(&mut *overlapped) {
                    Ok(size) => size as usize,
                    Err(e) => {
                        error!("vsock: port {}: Failed to read from pipe {}", port, e);
                        // TODO(b/237278629): Close the connection if we fail to read.
                        continue 'connections_changed;
                    }
                };

                // Build the OP_RW header carrying this data to the guest,
                // including our current credit info (buf_alloc / fwd_cnt).
                let response_header = virtio_vsock_hdr {
                    src_cid: 2.into(),              // Host CID
                    dst_cid: self.guest_cid.into(), // Guest CID
                    src_port: Le32::from(port.host),
                    dst_port: guest_port,
                    len: Le32::from(data_size as u32),
                    type_: TYPE_STREAM_SOCKET.into(),
                    op: vsock_op::VIRTIO_VSOCK_OP_RW.into(),
                    buf_alloc: Le32::from(connection.buf_alloc as u32),
                    fwd_cnt: Le32::from(connection.recv_cnt as u32),
                    ..Default::default()
                };

                // This packet informs the peer of recv_cnt, so record it.
                connection.prev_recv_cnt = connection.recv_cnt;

                // We have to only write to the queue once, so we construct a new buffer
                // with the concatenated header and data.
                const HEADER_SIZE: usize = std::mem::size_of::<virtio_vsock_hdr>();
                let data_read = &buffer[..data_size];
                let mut header_and_data = vec![0u8; HEADER_SIZE + data_size];
                header_and_data[..HEADER_SIZE].copy_from_slice(response_header.as_bytes());
                header_and_data[HEADER_SIZE..].copy_from_slice(data_read);
                {
                    let mut recv_queue_lock = recv_queue.lock().await;
                    let write_fut = self
                        .write_bytes_to_queue(
                            &mut recv_queue_lock,
                            &mut rx_queue_evt,
                            &header_and_data[..],
                        )
                        .fuse();
                    pin_mut!(write_fut);
                    // If `stop_rx` is fired but the virt queue is full, this loop will break
                    // without draining the `header_and_data`.
                    select_biased! {
                        write = write_fut => {},
                        _ = stop_rx => {
                            break;
                        }
                    }
                }

                connection.tx_cnt += data_size;

                // Start reading again so we receive the message and
                // event signal immediately.

                // SAFETY:
                // Unsafe because the read could happen at any time
                // after this function is called. We ensure safety
                // by allocating the buffer and overlapped struct
                // on the heap.
                unsafe {
                    match pipe_connection.read_overlapped(&mut buffer[..], &mut *overlapped) {
                        Ok(()) => {}
                        Err(e) => {
                            error!("vsock: port {}: Failed to read from pipe {}", port, e);
                        }
                    }
                }
            }
        }
        Ok(())
    }
665
process_tx_queue( &self, mut queue: Queue, mut queue_evt: EventAsync, mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>, mut stop_rx: oneshot::Receiver<()>, ) -> Result<Queue>666 async fn process_tx_queue(
667 &self,
668 mut queue: Queue,
669 mut queue_evt: EventAsync,
670 mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>,
671 mut stop_rx: oneshot::Receiver<()>,
672 ) -> Result<Queue> {
673 loop {
674 // Run continuously until exit evt
675 let mut avail_desc = match queue
676 .next_async_interruptable(&mut queue_evt, &mut stop_rx)
677 .await
678 {
679 Ok(Some(d)) => d,
680 Ok(None) => break,
681 Err(e) => {
682 error!("vsock: Failed to read descriptor {}", e);
683 return Err(VsockError::AwaitQueue(e));
684 }
685 };
686
687 let reader = &mut avail_desc.reader;
688 while reader.available_bytes() >= std::mem::size_of::<virtio_vsock_hdr>() {
689 let header = match reader.read_obj::<virtio_vsock_hdr>() {
690 Ok(hdr) => hdr,
691 Err(e) => {
692 error!("vsock: Error while reading header: {}", e);
693 break;
694 }
695 };
696
697 let len = header.len.to_native() as usize;
698 if reader.available_bytes() < len {
699 error!("vsock: Error reading packet data");
700 break;
701 }
702
703 let mut data = vec![0_u8; len];
704 if len > 0 {
705 if let Err(e) = reader.read_exact(&mut data) {
706 error!("vosck: failed to read data from tx packet: {:?}", e);
707 }
708 }
709
710 if let Err(e) = process_packets_queue.send((header, data)).await {
711 error!(
712 "Error while sending packet to queue, dropping packet: {:?}",
713 e
714 )
715 };
716 }
717
718 queue.add_used(avail_desc, 0);
719 queue.trigger_interrupt();
720 }
721
722 Ok(queue)
723 }
724
calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize725 fn calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize {
726 match pipe.get_info() {
727 Ok(info) => {
728 if info.outgoing_buffer_size > 0 {
729 info.outgoing_buffer_size as usize
730 } else {
731 info!(
732 "vsock: port {}: using default extra rx buffer size \
733 (named pipe does not have an explicit buffer size)",
734 port
735 );
736
737 // A zero buffer size implies that the buffer grows as
738 // needed. We set our own cap here for flow control
739 // purposes.
740 DEFAULT_BUF_ALLOC_BYTES
741 }
742 }
743 Err(e) => {
744 error!(
745 "vsock: port {}: failed to get named pipe info, using default \
746 buf size. Error: {}",
747 port, e
748 );
749 DEFAULT_BUF_ALLOC_BYTES
750 }
751 }
752 }
753
    /// Processes a connection request and returns whether to return a response (true), or reset
    /// (false).
    ///
    /// Accepting means: connect to the host named pipe derived from `host_guid`
    /// and the destination port, kick off the first overlapped read, register
    /// the connection, and signal `connection_event` so the rx task picks up
    /// the new pipe.
    async fn handle_vsock_connection_request(&self, header: virtio_vsock_hdr) -> bool {
        let port = PortPair::from_tx_header(&header);
        info!("vsock: port {}: Received connection request", port);

        if self.connections.read_lock().await.contains_key(&port) {
            // Connection exists, nothing for us to do.
            warn!(
                "vsock: port {}: accepting connection request on already connected port",
                port
            );
            return true;
        }

        // Without a host GUID there is no pipe name to connect to, so the
        // request must be rejected.
        if self.host_guid.is_none() {
            error!(
                "vsock: port {}: Cannot accept guest-initiated connections \
                without host-guid, rejecting connection",
                port,
            );
            return false;
        }

        // We have a new connection to establish.
        let mut overlapped_wrapper =
            Box::new(OverlappedWrapper::new(/* include_event= */ true).unwrap());
        let pipe_result = named_pipes::create_client_pipe(
            get_pipe_name(
                self.host_guid.as_ref().unwrap(),
                header.dst_port.to_native(),
            )
            .as_str(),
            &FramingMode::Byte,
            &BlockingMode::Wait,
            true, /* overlapped */
        );

        match pipe_result {
            Ok(mut pipe_connection) => {
                let mut buffer = Box::new([0u8; TEMP_READ_BUF_SIZE_BYTES]);
                info!("vsock: port {}: created client pipe", port);

                // Issue the initial overlapped read so incoming host data (and
                // its event signal) is observed by process_rx_queue.
                //
                // SAFETY:
                // Unsafe because the read could happen at any time
                // after this function is called. We ensure safety
                // by allocating the buffer and overlapped struct
                // on the heap.
                unsafe {
                    match pipe_connection.read_overlapped(&mut buffer[..], &mut overlapped_wrapper)
                    {
                        Ok(()) => {}
                        Err(e) => {
                            error!("vsock: port {}: Failed to read from pipe {}", port, e);
                            return false;
                        }
                    }
                }
                info!("vsock: port {}: started read on client pipe", port);

                let buf_alloc = Self::calculate_buf_alloc_from_pipe(&pipe_connection, port);
                let connection = VsockConnection {
                    guest_port: header.src_port,
                    pipe: pipe_connection,
                    overlapped: overlapped_wrapper,
                    // Seed credit accounting from the request header.
                    peer_buf_alloc: header.buf_alloc.to_native() as usize,
                    peer_recv_cnt: header.fwd_cnt.to_native() as usize,
                    buf_alloc,
                    buffer,
                    // The connection has just been made, so we haven't received
                    // anything yet.
                    recv_cnt: 0_usize,
                    prev_recv_cnt: 0_usize,
                    tx_cnt: 0_usize,
                    is_buffer_full: false,
                };
                self.connections.lock().await.insert(port, connection);
                // Wake the rx task so it rebuilds its awaited event set to
                // include this connection's pipe.
                self.connection_event.signal().unwrap_or_else(|_| {
                    panic!(
                        "Failed to signal new connection event for vsock port {}.",
                        port
                    )
                });
                info!("vsock: port {}: signaled connection ready", port);
                true
            }
            Err(e) => {
                // No host server pipe was waiting on this port; tell the guest
                // to reset.
                info!(
                    "vsock: No waiting pipe connection on port {}, \
                    not connecting (err: {:?})",
                    port, e
                );
                false
            }
        }
    }
850
    /// Writes a guest OP_RW payload to the host named pipe for the packet's
    /// port, waiting asynchronously for the overlapped write to complete.
    ///
    /// Also refreshes the peer credit counters from the header, and accounts
    /// the written bytes in `recv_cnt`. Dropped silently if the port has no
    /// connection. Returns `VsockError::WriteFailed` on pipe errors or a
    /// short write.
    async fn handle_vsock_guest_data(
        &self,
        header: virtio_vsock_hdr,
        data: &[u8],
        ex: &Executor,
    ) -> Result<()> {
        let port = PortPair::from_tx_header(&header);
        let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
        // Scope: hold the connections lock only long enough to start the write.
        {
            let mut connections = self.connections.lock().await;
            if let Some(connection) = connections.get_mut(&port) {
                // Update peer buffer/recv counters
                connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
                connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;

                let pipe = &mut connection.pipe;
                // We have to provide a OVERLAPPED struct to write to the pipe.
                //
                // SAFETY: safe because data & overlapped_wrapper live until the
                // overlapped operation completes (we wait on completion below).
                if let Err(e) = unsafe { pipe.write_overlapped(data, &mut overlapped_wrapper) } {
                    return Err(VsockError::WriteFailed(port, e));
                }
            } else {
                error!(
                    "vsock: Guest attempted to send data packet over unconnected \
                    port ({}), dropping packet",
                    port
                );
                return Ok(());
            }
        }
        // Await write completion without holding the connections lock.
        if let Some(write_completed_event) = overlapped_wrapper.get_h_event_ref() {
            // Don't block the executor while the write completes. This time should
            // always be negligible, but will sometimes be non-zero in cases where
            // traffic is high on the NamedPipe, especially a duplex pipe.
            if let Ok(cloned_event) = write_completed_event.try_clone() {
                if let Ok(async_event) = EventAsync::new_without_reset(cloned_event, ex) {
                    let _ = async_event.next_val().await;
                } else {
                    error!(
                        "vsock: port {}: Failed to convert write event to async",
                        port
                    );
                }
            } else {
                error!(
                    "vsock: port {}: Failed to clone write completion event",
                    port
                );
            }
        } else {
            error!(
                "vsock: port: {}: Failed to get overlapped event for write",
                port
            );
        }

        // Re-acquire the lock: the connection may have been removed while we
        // were awaiting, so look it up again.
        let mut connections = self.connections.lock().await;
        if let Some(connection) = connections.get_mut(&port) {
            let pipe = &mut connection.pipe;
            match pipe.get_overlapped_result(&mut overlapped_wrapper) {
                Ok(len) => {
                    // We've received bytes from the guest, account for them in our
                    // received bytes counter.
                    connection.recv_cnt += len as usize;

                    if len != data.len() as u32 {
                        return Err(VsockError::WriteFailed(
                            port,
                            std::io::Error::new(
                                std::io::ErrorKind::Other,
                                format!(
                                    "port {} failed to write correct number of bytes:
                                (expected: {}, wrote: {})",
                                    port,
                                    data.len(),
                                    len
                                ),
                            ),
                        ));
                    }
                }
                Err(e) => {
                    return Err(VsockError::WriteFailed(port, e));
                }
            }
        } else {
            error!(
                "vsock: Guest attempted to send data packet over unconnected \
                port ({}), dropping packet",
                port
            );
        }
        Ok(())
    }
947
    /// Dispatches guest TX packets from `packet_recv_queue` to per-port
    /// handler futures, lazily creating a handler (and its bounded channel)
    /// the first time a port pair is seen.
    ///
    /// Runs until the packet channel closes or `stop_rx` fires.
    async fn process_tx_packets(
        &self,
        send_queue: &Arc<RwLock<Queue>>,
        rx_queue_evt: Event,
        mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
        ex: &Executor,
        mut stop_rx: oneshot::Receiver<()>,
    ) {
        // Maps each open port pair to the sender half of its handler's channel.
        let mut packet_queues = HashMap::new();
        let mut futures = FuturesUnordered::new();
        // Push a pending future that will never complete into FuturesUnordered.
        // This will keep us from spinning on spurious notifications when we
        // don't have any open connections.
        futures.push(std::future::pending::<PortPair>().left_future());

        let mut stop_future = FuturesUnordered::new();
        stop_future.push(stop_rx);
        loop {
            let (new_packet, connection, stop_rx_res) =
                select3(packet_recv_queue.next(), futures.next(), stop_future.next()).await;
            match connection {
                SelectResult::Finished(Some(port)) => {
                    // A per-port handler completed; drop its send channel.
                    packet_queues.remove(&port);
                }
                SelectResult::Finished(_) => {
                    // This is only triggered when FuturesUnordered completes
                    // all pending futures. Right now, this can never happen, as
                    // we have a pending future that we push that will never
                    // complete.
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            };
            match new_packet {
                SelectResult::Finished(Some(packet)) => {
                    let port = PortPair::from_tx_header(&packet.0);
                    // First packet for this port pair spawns its handler.
                    let queue = packet_queues.entry(port).or_insert_with(|| {
                        let (send, recv) = mpsc::channel(CHANNEL_SIZE);
                        let event_async = EventAsync::new(
                            rx_queue_evt.try_clone().expect("Failed to clone event"),
                            ex,
                        )
                        .expect("Failed to set up the rx queue event");
                        futures.push(
                            self.process_tx_packets_for_port(
                                port,
                                recv,
                                send_queue,
                                event_async,
                                ex,
                            )
                            .right_future(),
                        );
                        send
                    });
                    // Try to send the packet. Do not block other ports if the queue is full.
                    if let Err(e) = queue.try_send(packet) {
                        error!(
                            "vsock: port {}: error sending packet to queue, dropping packet: {:?}",
                            port, e
                        )
                    }
                }
                SelectResult::Finished(_) => {
                    // Triggers when the channel is closed; no more packets coming.
                    packet_recv_queue.close();
                    return;
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            }
            match stop_rx_res {
                SelectResult::Finished(_) => {
                    break;
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            }
        }
    }
1031
process_tx_packets_for_port( &self, port: PortPair, mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>, send_queue: &Arc<RwLock<Queue>>, mut rx_queue_evt: EventAsync, ex: &Executor, ) -> PortPair1032 async fn process_tx_packets_for_port(
1033 &self,
1034 port: PortPair,
1035 mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
1036 send_queue: &Arc<RwLock<Queue>>,
1037 mut rx_queue_evt: EventAsync,
1038 ex: &Executor,
1039 ) -> PortPair {
1040 while let Some((header, data)) = packet_recv_queue.next().await {
1041 if !self
1042 .handle_tx_packet(header, &data, send_queue, &mut rx_queue_evt, ex)
1043 .await
1044 {
1045 packet_recv_queue.close();
1046 if let Ok(Some(_)) = packet_recv_queue.try_next() {
1047 warn!("vsock: closing port {} with unprocessed packets", port);
1048 } else {
1049 info!("vsock: closing port {} cleanly", port)
1050 }
1051 break;
1052 }
1053 }
1054 port
1055 }
1056
    /// Processes one guest TX packet for `header`'s port pair, dispatching on
    /// the virtio vsock operation code.
    ///
    /// Returns `true` if the connection remains open afterwards, `false` if
    /// the caller should tear down this port's packet handler.
    async fn handle_tx_packet(
        &self,
        header: virtio_vsock_hdr,
        data: &[u8],
        send_queue: &Arc<RwLock<Queue>>,
        rx_queue_evt: &mut EventAsync,
        ex: &Executor,
    ) -> bool {
        let mut is_open = true;
        let port = PortPair::from_tx_header(&header);
        match header.op.to_native() {
            vsock_op::VIRTIO_VSOCK_OP_INVALID => {
                error!("vsock: Invalid Operation requested, dropping packet");
            }
            // Guest-initiated connect: attempt the host-side connection and
            // reply with RESPONSE (accepted) or RST (refused/closed).
            vsock_op::VIRTIO_VSOCK_OP_REQUEST => {
                let (resp_op, buf_alloc, fwd_cnt) =
                    if self.handle_vsock_connection_request(header).await {
                        let connections = self.connections.read_lock().await;

                        // The connection may have vanished between accepting
                        // it and re-acquiring the lock; treat that as a reset.
                        connections.get(&port).map_or_else(
                            || {
                                warn!("vsock: port: {} connection closed during connect", port);
                                is_open = false;
                                (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
                            },
                            |conn| {
                                (
                                    vsock_op::VIRTIO_VSOCK_OP_RESPONSE,
                                    conn.buf_alloc as u32,
                                    conn.recv_cnt as u32,
                                )
                            },
                        )
                    } else {
                        is_open = false;
                        (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
                    };

                // Reply with src/dst swapped so the packet addresses the
                // guest's endpoint.
                let response_header = virtio_vsock_hdr {
                    src_cid: { header.dst_cid },
                    dst_cid: { header.src_cid },
                    src_port: { header.dst_port },
                    dst_port: { header.src_port },
                    len: 0.into(),
                    type_: TYPE_STREAM_SOCKET.into(),
                    op: resp_op.into(),
                    buf_alloc: Le32::from(buf_alloc),
                    fwd_cnt: Le32::from(fwd_cnt),
                    ..Default::default()
                };
                // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
                // bytes.
                self.write_bytes_to_queue(
                    &mut *send_queue.lock().await,
                    rx_queue_evt,
                    response_header.as_bytes(),
                )
                .await
                .expect("vsock: failed to write to queue");
                info!(
                    "vsock: port {}: replied {} to connection request",
                    port,
                    if resp_op == vsock_op::VIRTIO_VSOCK_OP_RESPONSE {
                        "success"
                    } else {
                        "reset"
                    },
                );
            }
            vsock_op::VIRTIO_VSOCK_OP_RESPONSE => {
                // TODO(b/237811512): Implement this for host-initiated connections
            }
            vsock_op::VIRTIO_VSOCK_OP_RST => {
                // TODO(b/237811512): Implement this for host-initiated connections
            }
            vsock_op::VIRTIO_VSOCK_OP_SHUTDOWN => {
                // While the header provides flags to specify tx/rx-specific shutdown,
                // we only support full shutdown.
                // TODO(b/237811512): Provide an optimal way to notify host of shutdowns
                // while still maintaining easy reconnections.
                let mut connections = self.connections.lock().await;
                if connections.remove(&port).is_some() {
                    // Acknowledge the shutdown with an RST carrying zeroed
                    // credit fields (the connection no longer exists).
                    let mut response = virtio_vsock_hdr {
                        src_cid: { header.dst_cid },
                        dst_cid: { header.src_cid },
                        src_port: { header.dst_port },
                        dst_port: { header.src_port },
                        len: 0.into(),
                        type_: TYPE_STREAM_SOCKET.into(),
                        op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
                        // There is no buffer on a closed connection
                        buf_alloc: 0.into(),
                        // There is no fwd_cnt anymore on a closed connection
                        fwd_cnt: 0.into(),
                        ..Default::default()
                    };
                    // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
                    // bytes
                    self.write_bytes_to_queue(
                        &mut *send_queue.lock().await,
                        rx_queue_evt,
                        response.as_mut_bytes(),
                    )
                    .await
                    .expect("vsock: failed to write to queue");
                    self.connection_event
                        .signal()
                        .expect("vsock: failed to write to event");
                    info!("vsock: port: {}: disconnected by the guest", port);
                } else {
                    error!("vsock: Attempted to close unopened port: {}", port);
                }
                is_open = false;
            }
            // Data payload from the guest, forwarded to the connection.
            vsock_op::VIRTIO_VSOCK_OP_RW => {
                match self.handle_vsock_guest_data(header, data, ex).await {
                    Ok(()) => {
                        if self
                            .check_free_buffer_threshold(header)
                            .await
                            .unwrap_or(false)
                        {
                            // Send a credit update if we're below the minimum free
                            // buffer size. We skip this if the connection is closed,
                            // which could've happened if we were closed on the other
                            // end.
                            info!(
                                "vsock: port {}: Buffer below threshold; sending credit update.",
                                port
                            );
                            self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
                                .await;
                        }
                    }
                    Err(e) => {
                        error!("vsock: port {}: resetting connection: {}", port, e);
                        self.send_vsock_reset(send_queue, rx_queue_evt, header)
                            .await;
                        is_open = false;
                    }
                }
            }
            // An update from our peer with their buffer state, which they are sending
            // (probably) due to a credit request *we* made.
            vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE => {
                let port = PortPair::from_tx_header(&header);
                let mut connections = self.connections.lock().await;
                if let Some(connection) = connections.get_mut(&port) {
                    connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
                    connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
                } else {
                    error!("vsock: port {}: got credit update on unknown port", port);
                    is_open = false;
                }
            }
            // A request from our peer to get *our* buffer state. We reply to the RX queue.
            vsock_op::VIRTIO_VSOCK_OP_CREDIT_REQUEST => {
                info!(
                    "vsock: port {}: Got credit request from peer; sending credit update.",
                    port,
                );
                self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
                    .await;
            }
            _ => {
                error!(
                    "vsock: port {}: unknown operation requested, dropping packet",
                    port
                );
            }
        }
        is_open
    }
1230
1231 // Checks if how much free buffer our peer thinks that *we* have available
1232 // is below our threshold percentage. If the connection is closed, returns `None`.
check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool>1233 async fn check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool> {
1234 let mut connections = self.connections.lock().await;
1235 let port = PortPair::from_tx_header(&header);
1236 connections.get_mut(&port).map(|connection| {
1237 let threshold: usize = (MIN_FREE_BUFFER_PCT * connection.buf_alloc as f64) as usize;
1238 connection.buf_alloc - (connection.recv_cnt - connection.prev_recv_cnt) < threshold
1239 })
1240 }
1241
    /// Replies on the RX queue with a VIRTIO_VSOCK_OP_CREDIT_UPDATE packet
    /// advertising our current buffer size (`buf_alloc`) and received-byte
    /// count (`fwd_cnt`) for `header`'s port pair.
    async fn send_vsock_credit_update(
        &self,
        send_queue: &Arc<RwLock<Queue>>,
        rx_queue_evt: &mut EventAsync,
        header: virtio_vsock_hdr,
    ) {
        let mut connections = self.connections.lock().await;
        let port = PortPair::from_tx_header(&header);

        if let Some(connection) = connections.get_mut(&port) {
            // Swap src/dst so the reply addresses the guest's endpoint.
            let mut response = virtio_vsock_hdr {
                src_cid: { header.dst_cid },
                dst_cid: { header.src_cid },
                src_port: { header.dst_port },
                dst_port: { header.src_port },
                len: 0.into(),
                type_: TYPE_STREAM_SOCKET.into(),
                op: vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE.into(),
                buf_alloc: Le32::from(connection.buf_alloc as u32),
                fwd_cnt: Le32::from(connection.recv_cnt as u32),
                ..Default::default()
            };

            // Record what we advertised so the free-buffer threshold check
            // measures only bytes received since this update.
            connection.prev_recv_cnt = connection.recv_cnt;

            // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
            // to bytes
            self.write_bytes_to_queue(
                &mut *send_queue.lock().await,
                rx_queue_evt,
                response.as_mut_bytes(),
            )
            .await
            .unwrap_or_else(|_| panic!("vsock: port {}: failed to write to queue", port));
        } else {
            error!(
                "vsock: port {}: error sending credit update on unknown port",
                port
            );
        }
    }
1283
send_vsock_reset( &self, send_queue: &Arc<RwLock<Queue>>, rx_queue_evt: &mut EventAsync, header: virtio_vsock_hdr, )1284 async fn send_vsock_reset(
1285 &self,
1286 send_queue: &Arc<RwLock<Queue>>,
1287 rx_queue_evt: &mut EventAsync,
1288 header: virtio_vsock_hdr,
1289 ) {
1290 let mut connections = self.connections.lock().await;
1291 let port = PortPair::from_tx_header(&header);
1292 if let Some(connection) = connections.remove(&port) {
1293 let mut response = virtio_vsock_hdr {
1294 src_cid: { header.dst_cid },
1295 dst_cid: { header.src_cid },
1296 src_port: { header.dst_port },
1297 dst_port: { header.src_port },
1298 len: 0.into(),
1299 type_: TYPE_STREAM_SOCKET.into(),
1300 op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
1301 buf_alloc: Le32::from(connection.buf_alloc as u32),
1302 fwd_cnt: Le32::from(connection.recv_cnt as u32),
1303 ..Default::default()
1304 };
1305
1306 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
1307 // to bytes
1308 self.write_bytes_to_queue(
1309 &mut *send_queue.lock().await,
1310 rx_queue_evt,
1311 response.as_mut_bytes(),
1312 )
1313 .await
1314 .expect("failed to write to queue");
1315 } else {
1316 error!("vsock: port {}: error closing unknown port", port);
1317 }
1318 }
1319
write_bytes_to_queue( &self, queue: &mut Queue, queue_evt: &mut EventAsync, bytes: &[u8], ) -> Result<()>1320 async fn write_bytes_to_queue(
1321 &self,
1322 queue: &mut Queue,
1323 queue_evt: &mut EventAsync,
1324 bytes: &[u8],
1325 ) -> Result<()> {
1326 let mut avail_desc = match queue.next_async(queue_evt).await {
1327 Ok(d) => d,
1328 Err(e) => {
1329 error!("vsock: failed to read descriptor {}", e);
1330 return Err(VsockError::AwaitQueue(e));
1331 }
1332 };
1333 self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
1334 }
1335
write_bytes_to_queue_interruptable( &self, queue: &mut Queue, queue_evt: &mut EventAsync, bytes: &[u8], mut stop_rx: &mut oneshot::Receiver<()>, ) -> Result<()>1336 async fn write_bytes_to_queue_interruptable(
1337 &self,
1338 queue: &mut Queue,
1339 queue_evt: &mut EventAsync,
1340 bytes: &[u8],
1341 mut stop_rx: &mut oneshot::Receiver<()>,
1342 ) -> Result<()> {
1343 let mut avail_desc = match queue.next_async_interruptable(queue_evt, stop_rx).await {
1344 Ok(d) => match d {
1345 Some(desc) => desc,
1346 None => return Ok(()),
1347 },
1348 Err(e) => {
1349 error!("vsock: failed to read descriptor {}", e);
1350 return Err(VsockError::AwaitQueue(e));
1351 }
1352 };
1353 self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
1354 }
1355
write_bytes_to_queue_inner( &self, queue: &mut Queue, mut desc_chain: DescriptorChain, bytes: &[u8], ) -> Result<()>1356 fn write_bytes_to_queue_inner(
1357 &self,
1358 queue: &mut Queue,
1359 mut desc_chain: DescriptorChain,
1360 bytes: &[u8],
1361 ) -> Result<()> {
1362 let writer = &mut desc_chain.writer;
1363 let res = writer.write_all(bytes);
1364
1365 if let Err(e) = res {
1366 error!(
1367 "vsock: failed to write {} bytes to queue, err: {:?}",
1368 bytes.len(),
1369 e
1370 );
1371 return Err(VsockError::WriteQueue(e));
1372 }
1373
1374 let bytes_written = writer.bytes_written() as u32;
1375 if bytes_written > 0 {
1376 queue.add_used(desc_chain, bytes_written);
1377 queue.trigger_interrupt();
1378 Ok(())
1379 } else {
1380 error!("vsock: failed to write bytes to queue");
1381 Err(VsockError::WriteQueue(std::io::Error::new(
1382 std::io::ErrorKind::Other,
1383 "failed to write bytes to queue",
1384 )))
1385 }
1386 }
1387
    /// Forwards device events from `vsock_event_receiver` into the guest's
    /// event virtqueue.
    ///
    /// Returns the queue (for pause/resume) when `stop_rx` fires or the event
    /// channel closes.
    async fn process_event_queue(
        &self,
        mut queue: Queue,
        mut queue_evt: EventAsync,
        mut stop_rx: oneshot::Receiver<()>,
        mut vsock_event_receiver: mpsc::Receiver<virtio_vsock_event>,
    ) -> Result<Queue> {
        loop {
            // select_biased polls in declaration order, so a ready event is
            // delivered before a concurrent stop request is honored.
            let vsock_event = select_biased! {
                vsock_event = vsock_event_receiver.next() => {
                    vsock_event
                }
                _ = stop_rx => {
                    break;
                }
            };
            // `None` means the sender side was dropped; no more events.
            let vsock_event = match vsock_event {
                Some(event) => event,
                None => break,
            };
            self.write_bytes_to_queue_interruptable(
                &mut queue,
                &mut queue_evt,
                vsock_event.as_bytes(),
                &mut stop_rx,
            )
            .await?;
        }
        Ok(queue)
    }
1418
run( mut self, rx_queue: Queue, tx_queue: Queue, event_queue: Queue, kill_evt: Event, ) -> Result<Option<(PausedQueues, VsockConnectionMap)>>1419 fn run(
1420 mut self,
1421 rx_queue: Queue,
1422 tx_queue: Queue,
1423 event_queue: Queue,
1424 kill_evt: Event,
1425 ) -> Result<Option<(PausedQueues, VsockConnectionMap)>> {
1426 let rx_queue_evt = rx_queue
1427 .event()
1428 .try_clone()
1429 .map_err(VsockError::CloneDescriptor)?;
1430
1431 // Note that this mutex won't ever be contended because the HandleExecutor is single
1432 // threaded. We need the mutex for compile time correctness, but technically it is not
1433 // actually providing mandatory locking, at least not at the moment. If we later use a
1434 // multi-threaded executor, then this lock will be important.
1435 let rx_queue_arc = Arc::new(RwLock::new(rx_queue));
1436
1437 // Run executor / create futures in a scope, preventing extra reference to `rx_queue_arc`.
1438 let res = {
1439 let ex = Executor::new().unwrap();
1440
1441 let rx_evt_async = EventAsync::new(
1442 rx_queue_evt
1443 .try_clone()
1444 .map_err(VsockError::CloneDescriptor)?,
1445 &ex,
1446 )
1447 .expect("Failed to set up the rx queue event");
1448 let mut stop_queue_oneshots = Vec::new();
1449
1450 let vsock_event_receiver = self
1451 .device_event_queue_rx
1452 .take()
1453 .expect("event queue rx must be present");
1454
1455 let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1456 let rx_handler =
1457 self.process_rx_queue(rx_queue_arc.clone(), rx_evt_async, &ex, stop_rx);
1458 let rx_handler = rx_handler.fuse();
1459 pin_mut!(rx_handler);
1460
1461 let (send, recv) = mpsc::channel(CHANNEL_SIZE);
1462
1463 let tx_evt_async = EventAsync::new(
1464 tx_queue
1465 .event()
1466 .try_clone()
1467 .map_err(VsockError::CloneDescriptor)?,
1468 &ex,
1469 )
1470 .expect("Failed to set up the tx queue event");
1471 let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1472 let tx_handler = self.process_tx_queue(tx_queue, tx_evt_async, send, stop_rx);
1473 let tx_handler = tx_handler.fuse();
1474 pin_mut!(tx_handler);
1475
1476 let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1477 let packet_handler =
1478 self.process_tx_packets(&rx_queue_arc, rx_queue_evt, recv, &ex, stop_rx);
1479 let packet_handler = packet_handler.fuse();
1480 pin_mut!(packet_handler);
1481
1482 let event_evt_async = EventAsync::new(
1483 event_queue
1484 .event()
1485 .try_clone()
1486 .map_err(VsockError::CloneDescriptor)?,
1487 &ex,
1488 )
1489 .expect("Failed to set up the event queue event");
1490 let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1491 let event_handler = self.process_event_queue(
1492 event_queue,
1493 event_evt_async,
1494 stop_rx,
1495 vsock_event_receiver,
1496 );
1497 let event_handler = event_handler.fuse();
1498 pin_mut!(event_handler);
1499
1500 let kill_evt = EventAsync::new(kill_evt, &ex).expect("Failed to set up the kill event");
1501 let kill_handler = kill_evt.next_val();
1502 pin_mut!(kill_handler);
1503
1504 let mut device_event_queue_tx = self.device_event_queue_tx.clone();
1505 if self.send_protocol_reset {
1506 ex.run_until(async move { device_event_queue_tx.send(
1507 virtio_vsock_event {
1508 id: virtio_sys::virtio_vsock::virtio_vsock_event_id_VIRTIO_VSOCK_EVENT_TRANSPORT_RESET
1509 .into(),
1510 }).await
1511 }).expect("failed to write to empty mpsc queue.");
1512 }
1513
1514 ex.run_until(async {
1515 select! {
1516 _ = kill_handler.fuse() => (),
1517 _ = rx_handler => return Err(anyhow!("rx_handler stopped unexpetedly")),
1518 _ = tx_handler => return Err(anyhow!("tx_handler stop unexpectedly.")),
1519 _ = packet_handler => return Err(anyhow!("packet_handler stop unexpectedly.")),
1520 _ = event_handler => return Err(anyhow!("event_handler stop unexpectedly.")),
1521 }
1522 // kill_evt has fired
1523
1524 for stop_tx in stop_queue_oneshots {
1525 if stop_tx.send(()).is_err() {
1526 return Err(anyhow!("failed to request stop for queue future"));
1527 }
1528 }
1529
1530 rx_handler.await.context("Failed to stop rx handler.")?;
1531 packet_handler.await;
1532
1533 Ok((
1534 tx_handler.await.context("Failed to stop tx handler.")?,
1535 event_handler
1536 .await
1537 .context("Failed to stop event handler.")?,
1538 ))
1539 })
1540 };
1541
1542 // At this point, a request to stop this worker has been sent or an error has happened in
1543 // one of the futures, which will stop this worker anyways.
1544
1545 let queues_and_connections = match res {
1546 Ok(main_future_res) => match main_future_res {
1547 Ok((tx_queue, event_handler_queue)) => {
1548 let rx_queue = match Arc::try_unwrap(rx_queue_arc) {
1549 Ok(queue_rw_lock) => queue_rw_lock.into_inner(),
1550 Err(_) => panic!("failed to recover queue from worker"),
1551 };
1552 let paused_queues = PausedQueues::new(rx_queue, tx_queue, event_handler_queue);
1553 Some((paused_queues, self.connections))
1554 }
1555 Err(e) => {
1556 error!("Error happened in a vsock future: {}", e);
1557 None
1558 }
1559 },
1560 Err(e) => {
1561 error!("error happened in executor: {}", e);
1562 None
1563 }
1564 };
1565
1566 Ok(queues_and_connections)
1567 }
1568 }
1569
/// Queues & events for the vsock device.
struct VsockQueues {
    /// Queue carrying device-to-driver traffic (host replies are written
    /// here).
    rx: Queue,
    /// Queue carrying driver-to-device (guest-originated) traffic.
    tx: Queue,
    /// Queue carrying device events (e.g. transport reset).
    event: Queue,
}
1576
1577 impl TryFrom<BTreeMap<usize, Queue>> for VsockQueues {
1578 type Error = anyhow::Error;
try_from(mut queues: BTreeMap<usize, Queue>) -> result::Result<Self, Self::Error>1579 fn try_from(mut queues: BTreeMap<usize, Queue>) -> result::Result<Self, Self::Error> {
1580 if queues.len() < 3 {
1581 anyhow::bail!(
1582 "{} queues were found, but an activated vsock must have at 3 active queues.",
1583 queues.len()
1584 );
1585 }
1586
1587 Ok(VsockQueues {
1588 rx: queues.remove(&0).context("the rx queue is required.")?,
1589 tx: queues.remove(&1).context("the tx queue is required.")?,
1590 event: queues.remove(&2).context("the event queue is required.")?,
1591 })
1592 }
1593 }
1594
1595 impl From<PausedQueues> for BTreeMap<usize, Queue> {
from(queues: PausedQueues) -> Self1596 fn from(queues: PausedQueues) -> Self {
1597 let mut ret = BTreeMap::new();
1598 ret.insert(0, queues.rx_queue);
1599 ret.insert(1, queues.tx_queue);
1600 ret.insert(2, queues.event_queue);
1601 ret
1602 }
1603 }
1604
/// The three virtio queues recovered from a stopped vsock worker, held so the
/// device can be re-activated later.
struct PausedQueues {
    rx_queue: Queue,
    tx_queue: Queue,
    event_queue: Queue,
}
1610
1611 impl PausedQueues {
new(rx_queue: Queue, tx_queue: Queue, event_queue: Queue) -> Self1612 fn new(rx_queue: Queue, tx_queue: Queue, event_queue: Queue) -> Self {
1613 PausedQueues {
1614 rx_queue,
1615 tx_queue,
1616 event_queue,
1617 }
1618 }
1619 }
1620
/// Builds the named pipe path for vsock port `pipe` under `guid`'s pipe
/// namespace, i.e. `\\.\pipe\<guid>\vsock-<pipe>`.
fn get_pipe_name(guid: &str, pipe: u32) -> String {
    // Raw string literal: identical bytes to the escaped form, easier to read.
    format!(r"\\.\pipe\{}\vsock-{}", guid, pipe)
}
1624
/// Waits for `evt` to signal, then yields `pair` so the caller can tell which
/// port pair's event completed.
async fn wait_event_and_return_port_pair(evt: EventAsync, pair: PortPair) -> PortPair {
    // This doesn't reset the event since we have to call GetOverlappedResult
    // on the OVERLAPPED struct first before resetting it.
    let _ = evt.get_io_source_ref().wait_for_handle().await;
    pair
}
1631