// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Display;
use std::io;
use std::io::Read;
use std::io::Write;
use std::os::windows::io::RawHandle;
use std::rc::Rc;
use std::result;
use std::sync::Arc;
use std::thread;

use anyhow::anyhow;
use anyhow::Context;
use base::error;
use base::info;
use base::named_pipes;
use base::named_pipes::BlockingMode;
use base::named_pipes::FramingMode;
use base::named_pipes::OverlappedWrapper;
use base::named_pipes::PipeConnection;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::EventExt;
use base::WorkerThread;
use cros_async::select3;
use cros_async::select6;
use cros_async::sync::RwLock;
use cros_async::AsyncError;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::SelectResult;
use data_model::Le32;
use data_model::Le64;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::pin_mut;
use futures::select;
use futures::select_biased;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use remain::sorted;
use serde::Deserialize;
use serde::Serialize;
use snapshot::AnySnapshot;
use thiserror::Error as ThisError;
use vm_memory::GuestMemory;
use zerocopy::FromBytes;
use zerocopy::FromZeros;
use zerocopy::IntoBytes;

use crate::virtio::async_utils;
use crate::virtio::copy_config;
use crate::virtio::create_stop_oneshot;
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_config;
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_event;
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_hdr;
use crate::virtio::vsock::sys::windows::protocol::vsock_op;
use crate::virtio::vsock::sys::windows::protocol::TYPE_STREAM_SOCKET;
use crate::virtio::DescriptorChain;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::StoppedWorker;
use crate::virtio::VirtioDevice;
use crate::virtio::Writer;
use crate::Suspendable;

#[sorted]
#[derive(ThisError, Debug)]
pub enum VsockError {
    #[error("Failed to await next descriptor chain from queue: {0}")]
    AwaitQueue(AsyncError),
    #[error("OverlappedWrapper error.")]
    BadOverlappedWrapper,
    #[error("Failed to clone descriptor: {0}")]
    CloneDescriptor(SysError),
    #[error("Failed to create EventAsync: {0}")]
    CreateEventAsync(AsyncError),
    #[error("Failed to create wait context: {0}")]
    CreateWaitContext(SysError),
    #[error("Failed to read queue: {0}")]
    ReadQueue(io::Error),
    #[error("Failed to reset event object: {0}")]
    ResetEventObject(SysError),
    #[error("Failed to run executor: {0}")]
    RunExecutor(AsyncError),
    #[error("Failed to write to pipe, port {0}: {1}")]
    WriteFailed(PortPair, io::Error),
    #[error("Failed to write queue: {0}")]
    WriteQueue(io::Error),
}
pub type Result<T> = result::Result<T, VsockError>;

// Vsock has three virtio queues: rx, tx, and event.
const QUEUE_SIZE: u16 = 256;
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];
// We overload port numbers so that if a message is received from
// CONNECTION_EVENT_PORT_NUM (an invalid port number), we recognize that a
// new connection was set up.
const CONNECTION_EVENT_PORT_NUM: u32 = u32::MAX;

/// Number of bytes in a kilobyte. Used to simplify and clarify buffer size definitions.
const KB: usize = 1024;

/// Size of the buffer we read into from the host side named pipe. Note that data flows from the
/// host pipe -> this buffer -> rx queue.
/// This should be large enough to facilitate fast transmission of host data, see b/232950349.
const TEMP_READ_BUF_SIZE_BYTES: usize = 4 * KB;

/// In the case where the host side named pipe does not have a specified buffer size, we'll default
/// to telling the guest that this is the amount of extra RX space available (e.g. buf_alloc).
/// This should be larger than the volume of data that the guest will generally send at one time to
/// minimize credit update packets (see MIN_FREE_BUFFER_PCT below).
const DEFAULT_BUF_ALLOC_BYTES: usize = 128 * KB;

/// Minimum free buffer threshold to notify the peer with a credit update
/// message. This is in case we are ingesting many messages without an
/// opportunity to send a message back to the peer with a buffer size update.
/// This value is a percentage of `buf_alloc`.
/// TODO(b/204246759): This value was chosen without much more thought than "it
/// works". It should probably be adjusted, along with DEFAULT_BUF_ALLOC_BYTES, to a
/// value that makes empirical sense for the packet sizes that we expect to
/// receive.
/// TODO(b/239848326): Replace float with integer, in order to remove risk
/// of losing precision. I.e. change to `10` and perform
/// `FOO * MIN_FREE_BUFFER_PCT / 100`.
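/// For example, with the default `buf_alloc` of 128 KiB, a credit update is
/// sent once the apparent free space drops below 128 * 1024 * 0.1 = 13107
/// bytes (see `check_free_buffer_threshold` below).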
const MIN_FREE_BUFFER_PCT: f64 = 0.1;

// Number of packets to buffer in the tx processing channels.
const CHANNEL_SIZE: usize = 256;

type VsockConnectionMap = RwLock<HashMap<PortPair, VsockConnection>>;

/// Virtio device exposing vsock (virtual socket) connectivity to the guest OS through virtio.
pub struct Vsock {
    guest_cid: u64,
    host_guid: Option<String>,
    features: u64,
    acked_features: u64,
    worker_thread: Option<WorkerThread<Option<(PausedQueues, VsockConnectionMap)>>>,
    /// Stores any active connections when the device sleeps. This allows us to sleep/wake
    /// without disrupting active connections, which is useful when taking a snapshot.
    sleeping_connections: Option<VsockConnectionMap>,
    /// If true, we should send a TRANSPORT_RESET event to the guest at the next opportunity.
    /// Used to inform the guest all connections are broken when we restore a snapshot.
    needs_transport_reset: bool,
}

/// Snapshotted state of Vsock. These fields are serialized in order to validate they haven't
/// changed when this device is restored.
#[derive(Serialize, Deserialize)]
struct VsockSnapshot {
    guest_cid: u64,
    features: u64,
    acked_features: u64,
}

impl Vsock {
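    /// Creates a new vsock device. A minimal construction sketch follows; the
    /// argument values are illustrative only, not taken from a real VM config:
    ///
    /// ```ignore
    /// let vsock = Vsock::new(
    ///     /* guest_cid= */ 3,
    ///     /* host_guid= */ Some("00000000-0000-0000-0000-000000000000".to_string()),
    ///     /* base_features= */ 0,
    /// )?;
    /// ```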
    pub fn new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock> {
        Ok(Vsock {
            guest_cid,
            host_guid,
            features: base_features,
            acked_features: 0,
            worker_thread: None,
            sleeping_connections: None,
            needs_transport_reset: false,
        })
    }

    fn get_config(&self) -> virtio_vsock_config {
        virtio_vsock_config {
            guest_cid: Le64::from(self.guest_cid),
        }
    }

    fn stop_worker(&mut self) -> StoppedWorker<(PausedQueues, VsockConnectionMap)> {
        if let Some(worker_thread) = self.worker_thread.take() {
            if let Some(queues_and_conns) = worker_thread.stop() {
                StoppedWorker::WithQueues(Box::new(queues_and_conns))
            } else {
                StoppedWorker::MissingQueues
            }
        } else {
            StoppedWorker::AlreadyStopped
        }
    }

    fn start_worker(
        &mut self,
        mem: GuestMemory,
        mut queues: VsockQueues,
        existing_connections: Option<VsockConnectionMap>,
    ) -> anyhow::Result<()> {
        let rx_queue = queues.rx;
        let tx_queue = queues.tx;
        let event_queue = queues.event;

        let host_guid = self.host_guid.clone();
        let guest_cid = self.guest_cid;
        let needs_transport_reset = self.needs_transport_reset;
        self.needs_transport_reset = false;
        self.worker_thread = Some(WorkerThread::start(
            "userspace_virtio_vsock",
            move |kill_evt| {
                let mut worker = Worker::new(
                    mem,
                    host_guid,
                    guest_cid,
                    existing_connections,
                    needs_transport_reset,
                );
                let result = worker.run(rx_queue, tx_queue, event_queue, kill_evt);

                match result {
                    Err(e) => {
                        error!("userspace vsock worker thread exited with error: {:?}", e);
                        None
                    }
                    Ok(paused_queues_and_connections_option) => {
                        paused_queues_and_connections_option
                    }
                }
            },
        ));

        Ok(())
    }
}

impl VirtioDevice for Vsock {
    fn keep_rds(&self) -> Vec<RawHandle> {
        Vec::new()
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        copy_config(data, 0, self.get_config().as_bytes(), offset);
    }

    fn device_type(&self) -> DeviceType {
        DeviceType::Vsock
    }

    fn queue_max_sizes(&self) -> &[u16] {
        QUEUE_SIZES
    }

    fn features(&self) -> u64 {
        self.features
    }

    fn ack_features(&mut self, value: u64) {
        self.acked_features |= value;
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        _interrupt: Interrupt,
        mut queues: BTreeMap<usize, Queue>,
    ) -> anyhow::Result<()> {
        if queues.len() != QUEUE_SIZES.len() {
            return Err(anyhow!(
                "Failed to activate vsock device. queues.len(): {} != {}",
                queues.len(),
                QUEUE_SIZES.len(),
            ));
        }

        let vsock_queues = VsockQueues {
            rx: queues.remove(&0).unwrap(),
            tx: queues.remove(&1).unwrap(),
            event: queues.remove(&2).unwrap(),
        };

        self.start_worker(mem, vsock_queues, None)
    }

    fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
        match self.stop_worker() {
            StoppedWorker::WithQueues(paused_queues_and_conns) => {
                let (queues, sleeping_connections) = *paused_queues_and_conns;
                self.sleeping_connections = Some(sleeping_connections);
                Ok(Some(queues.into()))
            }
            StoppedWorker::MissingQueues => {
                anyhow::bail!("vsock queue workers did not stop cleanly")
            }
            StoppedWorker::AlreadyStopped => {
                // The device isn't in the activated state.
                Ok(None)
            }
        }
    }

    fn virtio_wake(
        &mut self,
        queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
    ) -> anyhow::Result<()> {
        if let Some((mem, _interrupt, queues)) = queues_state {
            let connections = self.sleeping_connections.take();
            self.start_worker(
                mem,
                queues
                    .try_into()
                    .expect("Failed to convert queue BTreeMap to VsockQueues"),
                connections,
            )?;
        }
        Ok(())
    }

    fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
        AnySnapshot::to_any(VsockSnapshot {
            guest_cid: self.guest_cid,
            features: self.features,
            acked_features: self.acked_features,
        })
        .context("failed to serialize vsock snapshot")
    }

    fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
        let vsock_snapshot: VsockSnapshot =
            AnySnapshot::from_any(data).context("error deserializing vsock snapshot")?;
        anyhow::ensure!(
            self.guest_cid == vsock_snapshot.guest_cid,
            "expected guest_cid to match, but they did not. Live: {}, snapshot: {}",
            self.guest_cid,
            vsock_snapshot.guest_cid
        );
        anyhow::ensure!(
            self.features == vsock_snapshot.features,
            "vsock: expected features to match, but they did not. Live: {}, snapshot: {}",
            self.features,
            vsock_snapshot.features
        );
        self.acked_features = vsock_snapshot.acked_features;
        self.needs_transport_reset = true;

        Ok(())
    }
}

#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct PortPair {
    host: u32,
    guest: u32,
}

impl Display for PortPair {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "(host port: {}, guest port: {})", self.host, self.guest)
    }
}

impl PortPair {
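    /// Builds the connection key from a guest-to-host (tx) packet header. On
    /// the tx path the driver is the source, so, for example, a header with
    /// `src_port = 1234` and `dst_port = 5000` yields
    /// `PortPair { host: 5000, guest: 1234 }`.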
    fn from_tx_header(header: &virtio_vsock_hdr) -> PortPair {
        PortPair {
            host: header.dst_port.to_native(),
            guest: header.src_port.to_native(),
        }
    }
}

// Note: variables herein do not have to be atomic because this struct is guarded
// by a RwLock.
struct VsockConnection {
    // The guest port.
    guest_port: Le32,

    // The actual named (asynchronous) pipe connection.
    pipe: PipeConnection,
    // The overlapped struct contains an event object for the named pipe.
    // This lets us select() on the pipes by waiting on the events.
    // This is for reads only.
    overlapped: Box<OverlappedWrapper>,
    // Read buffer for the named pipes. These are needed because reads complete
    // asynchronously.
    buffer: Box<[u8; TEMP_READ_BUF_SIZE_BYTES]>,

    // Total free-running count of received bytes.
    recv_cnt: usize,

    // Total free-running count of received bytes that the peer has been informed of.
    prev_recv_cnt: usize,

    // Total auxiliary buffer space available to receive packets from the driver, not including
    // the virtqueue itself. For us, this is the tx buffer on the named pipe into which we drain
    // packets for the connection. Note that if the named pipe has a grow-on-demand tx buffer,
    // we use DEFAULT_BUF_ALLOC_BYTES instead.
    buf_alloc: usize,

    // Peer (driver) total free-running count of received bytes.
    peer_recv_cnt: usize,

    // Peer (driver) total rx buffer allocated.
    peer_buf_alloc: usize,

    // Total free-running count of transmitted bytes.
    tx_cnt: usize,

    // State tracking for the full-buffer condition. Currently just used for logging. If the
    // peer's buffer does not have space for a maximum-sized message (TEMP_READ_BUF_SIZE_BYTES),
    // this gets set to `true`. Once there's enough space in the buffer, this gets unset.
    is_buffer_full: bool,
}
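
// Flow-control arithmetic used throughout this module, in terms of the
// free-running counters above:
//   bytes in flight to the peer:         tx_cnt - peer_recv_cnt
//   free space in the peer's rx buffer:  peer_buf_alloc - (tx_cnt - peer_recv_cnt)
//   received bytes not yet reported:     recv_cnt - prev_recv_cnt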

struct Worker {
    mem: GuestMemory,
    host_guid: Option<String>,
    guest_cid: u64,
    // Map of port pairs to their VsockConnections.
    connections: VsockConnectionMap,
    connection_event: Event,
    device_event_queue_tx: mpsc::Sender<virtio_vsock_event>,
    device_event_queue_rx: Option<mpsc::Receiver<virtio_vsock_event>>,
    send_protocol_reset: bool,
}

impl Worker {
    fn new(
        mem: GuestMemory,
        host_guid: Option<String>,
        guest_cid: u64,
        existing_connections: Option<VsockConnectionMap>,
        send_protocol_reset: bool,
    ) -> Worker {
        // Buffer size here is arbitrary, but must be at least one since we need
        // to be able to write a reset event to the channel when the device
        // worker is brought up on a VM restore. Note that we send exactly one
        // message per VM session, so we should never see these messages backing
        // up.
        let (device_event_queue_tx, device_event_queue_rx) = mpsc::channel(4);

        Worker {
            mem,
            host_guid,
            guest_cid,
            connections: existing_connections.unwrap_or_default(),
            connection_event: Event::new().unwrap(),
            device_event_queue_tx,
            device_event_queue_rx: Some(device_event_queue_rx),
            send_protocol_reset,
        }
    }

    async fn process_rx_queue(
        &self,
        recv_queue: Arc<RwLock<Queue>>,
        mut rx_queue_evt: EventAsync,
        ex: &Executor,
        mut stop_rx: oneshot::Receiver<()>,
    ) -> Result<()> {
        'connections_changed: loop {
            // Run continuously until the exit event.

            // TODO(b/200810561): Optimize this FuturesUnordered code.
            // Set up the EventAsyncs to select on.
            let futures = FuturesUnordered::new();
            // This needs to be its own scope since it holds a RwLock on `self.connections`.
            {
                let connections = self.connections.read_lock().await;
                for (port, connection) in connections.iter() {
                    let h_evt = connection
                        .overlapped
                        .get_h_event_ref()
                        .ok_or_else(|| {
                            error!("Missing h_event in OverlappedWrapper.");
                            VsockError::BadOverlappedWrapper
                        })
                        .unwrap()
                        .try_clone()
                        .map_err(|e| {
                            error!("Could not clone h_event.");
                            VsockError::CloneDescriptor(e)
                        })?;
                    let evt_async = EventAsync::new(h_evt, ex).map_err(|e| {
                        error!("Could not create EventAsync.");
                        VsockError::CreateEventAsync(e)
                    })?;
                    futures.push(wait_event_and_return_port_pair(evt_async, *port));
                }
            }
            let connection_evt_clone = self.connection_event.try_clone().map_err(|e| {
                error!("Could not clone connection_event.");
                VsockError::CloneDescriptor(e)
            })?;
            let connection_evt_async = EventAsync::new(connection_evt_clone, ex).map_err(|e| {
                error!("Could not create EventAsync.");
                VsockError::CreateEventAsync(e)
            })?;
            futures.push(wait_event_and_return_port_pair(
                connection_evt_async,
                PortPair {
                    host: CONNECTION_EVENT_PORT_NUM,
                    guest: 0,
                },
            ));

            // Wait to service the sockets. Note that for fairness, it is critical that we service
            // all ready sockets in a single wakeup to avoid starvation. This is why ready_chunks
            // is used, as it returns all currently *ready* futures from the stream.
            //
            // The expect here only triggers if the FuturesUnordered stream is exhausted. This never
            // happens because it has at least one item, and we only request chunks once.
            let futures_len = futures.len();
            let mut ready_chunks = futures.ready_chunks(futures_len);
            let ports = select_biased! {
                ports = ready_chunks.next() => {
                    ports.expect("failed to wait on vsock sockets")
                }
                _ = stop_rx => {
                    break;
                }
            };

            for port in ports {
                if port.host == CONNECTION_EVENT_PORT_NUM {
                    // New connection event. Set up the futures again.
                    if let Err(e) = self.connection_event.reset() {
                        error!("vsock: port: {}: could not reset connection_event.", port);
                        return Err(VsockError::ResetEventObject(e));
                    }
                    continue 'connections_changed;
                }
                let mut connections = self.connections.lock().await;
                let connection = if let Some(conn) = connections.get_mut(&port) {
                    conn
                } else {
                    // We could have been scheduled to run the rx queue *before* the connection was
                    // closed. In that case, we do nothing. The code which closed the connection
                    // (e.g. in response to a message in the tx/rx queues) will handle notifying
                    // the guest/host as required.
                    continue 'connections_changed;
                };

                // Check if the peer has enough space in their buffer to
                // receive the maximum amount of data that we could possibly
                // read from the host pipe. If not, we should continue to
                // process other sockets as each socket has an independent
                // buffer.
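                // For example: peer_buf_alloc = 65536, tx_cnt = 70000, and
                // peer_recv_cnt = 20000 leaves 65536 - (70000 - 20000) = 15536
                // free bytes, which is >= TEMP_READ_BUF_SIZE_BYTES (4096), so
                // the read below may proceed.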
                let peer_free_buf_size =
                    connection.peer_buf_alloc - (connection.tx_cnt - connection.peer_recv_cnt);
                if peer_free_buf_size < TEMP_READ_BUF_SIZE_BYTES {
                    if !connection.is_buffer_full {
                        warn!(
                            "vsock: port {}: Peer has insufficient free buffer space ({} bytes available)",
                            port, peer_free_buf_size
                        );
                        connection.is_buffer_full = true;
                    }
                    continue;
                } else if connection.is_buffer_full {
                    connection.is_buffer_full = false;
                }

                let pipe_connection = &mut connection.pipe;
                let overlapped = &mut connection.overlapped;
                let guest_port = connection.guest_port;
                let buffer = &mut connection.buffer;

                match overlapped.get_h_event_ref() {
                    Some(h_event) => {
                        if let Err(e) = h_event.reset() {
                            error!(
                                "vsock: port: {}: Could not reset event in OverlappedWrapper.",
                                port
                            );
                            return Err(VsockError::ResetEventObject(e));
                        }
                    }
                    None => {
                        error!(
                            "vsock: port: {}: missing h_event in OverlappedWrapper.",
                            port
                        );
                        return Err(VsockError::BadOverlappedWrapper);
                    }
                }

                let data_size = match pipe_connection.get_overlapped_result(&mut *overlapped) {
                    Ok(size) => size as usize,
                    Err(e) => {
                        error!("vsock: port {}: Failed to read from pipe: {}", port, e);
                        // TODO(b/237278629): Close the connection if we fail to read.
                        continue 'connections_changed;
                    }
                };

                let response_header = virtio_vsock_hdr {
                    src_cid: 2.into(),              // Host CID
                    dst_cid: self.guest_cid.into(), // Guest CID
                    src_port: Le32::from(port.host),
                    dst_port: guest_port,
                    len: Le32::from(data_size as u32),
                    type_: TYPE_STREAM_SOCKET.into(),
                    op: vsock_op::VIRTIO_VSOCK_OP_RW.into(),
                    buf_alloc: Le32::from(connection.buf_alloc as u32),
                    fwd_cnt: Le32::from(connection.recv_cnt as u32),
                    ..Default::default()
                };

                connection.prev_recv_cnt = connection.recv_cnt;

                // We can write to the queue only once per packet, so we construct a new buffer
                // with the concatenated header and data.
                const HEADER_SIZE: usize = std::mem::size_of::<virtio_vsock_hdr>();
                let data_read = &buffer[..data_size];
                let mut header_and_data = vec![0u8; HEADER_SIZE + data_size];
                header_and_data[..HEADER_SIZE].copy_from_slice(response_header.as_bytes());
                header_and_data[HEADER_SIZE..].copy_from_slice(data_read);
                {
                    let mut recv_queue_lock = recv_queue.lock().await;
                    let write_fut = self
                        .write_bytes_to_queue(
                            &mut recv_queue_lock,
                            &mut rx_queue_evt,
                            &header_and_data[..],
                        )
                        .fuse();
                    pin_mut!(write_fut);
                    // If `stop_rx` fires but the virtqueue is full, this loop will break
                    // without draining `header_and_data`.
                    select_biased! {
                        write = write_fut => {},
                        _ = stop_rx => {
                            break;
                        }
                    }
                }

                connection.tx_cnt += data_size;

                // Start reading again so we receive the message and
                // event signal immediately.

                // SAFETY:
                // Unsafe because the read could happen at any time
                // after this function is called. We ensure safety
                // by allocating the buffer and overlapped struct
                // on the heap.
                unsafe {
                    match pipe_connection.read_overlapped(&mut buffer[..], &mut *overlapped) {
                        Ok(()) => {}
                        Err(e) => {
                            error!("vsock: port {}: Failed to read from pipe: {}", port, e);
                        }
                    }
                }
            }
        }
        Ok(())
    }

    async fn process_tx_queue(
        &self,
        mut queue: Queue,
        mut queue_evt: EventAsync,
        mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>,
        mut stop_rx: oneshot::Receiver<()>,
    ) -> Result<Queue> {
        loop {
            // Run continuously until the exit event.
            let mut avail_desc = match queue
                .next_async_interruptable(&mut queue_evt, &mut stop_rx)
                .await
            {
                Ok(Some(d)) => d,
                Ok(None) => break,
                Err(e) => {
                    error!("vsock: Failed to read descriptor: {}", e);
                    return Err(VsockError::AwaitQueue(e));
                }
            };

            let reader = &mut avail_desc.reader;
            while reader.available_bytes() >= std::mem::size_of::<virtio_vsock_hdr>() {
                let header = match reader.read_obj::<virtio_vsock_hdr>() {
                    Ok(hdr) => hdr,
                    Err(e) => {
                        error!("vsock: Error while reading header: {}", e);
                        break;
                    }
                };

                let len = header.len.to_native() as usize;
                if reader.available_bytes() < len {
                    error!("vsock: Error reading packet data");
                    break;
                }

                let mut data = vec![0_u8; len];
                if len > 0 {
                    if let Err(e) = reader.read_exact(&mut data) {
                        error!("vsock: failed to read data from tx packet: {:?}", e);
                    }
                }

                if let Err(e) = process_packets_queue.send((header, data)).await {
                    error!(
                        "Error while sending packet to queue, dropping packet: {:?}",
                        e
                    )
                };
            }

            queue.add_used(avail_desc, 0);
            queue.trigger_interrupt();
        }

        Ok(queue)
    }

    fn calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize {
        match pipe.get_info() {
            Ok(info) => {
                if info.outgoing_buffer_size > 0 {
                    info.outgoing_buffer_size as usize
                } else {
                    info!(
                        "vsock: port {}: using default extra rx buffer size \
                         (named pipe does not have an explicit buffer size)",
                        port
                    );

                    // A zero buffer size implies that the buffer grows as
                    // needed. We set our own cap here for flow control
                    // purposes.
                    DEFAULT_BUF_ALLOC_BYTES
                }
            }
            Err(e) => {
                error!(
                    "vsock: port {}: failed to get named pipe info, using default \
                     buf size. Error: {}",
                    port, e
                );
                DEFAULT_BUF_ALLOC_BYTES
            }
        }
    }

    /// Processes a connection request and returns whether to send a response (true), or a
    /// reset (false).
    async fn handle_vsock_connection_request(&self, header: virtio_vsock_hdr) -> bool {
        let port = PortPair::from_tx_header(&header);
        info!("vsock: port {}: Received connection request", port);

        if self.connections.read_lock().await.contains_key(&port) {
            // Connection exists, nothing for us to do.
            warn!(
                "vsock: port {}: accepting connection request on already connected port",
                port
            );
            return true;
        }

        if self.host_guid.is_none() {
            error!(
                "vsock: port {}: Cannot accept guest-initiated connections \
                 without host-guid, rejecting connection",
                port,
            );
            return false;
        }

        // We have a new connection to establish.
        let mut overlapped_wrapper =
            Box::new(OverlappedWrapper::new(/* include_event= */ true).unwrap());
        let pipe_result = named_pipes::create_client_pipe(
            get_pipe_name(
                self.host_guid.as_ref().unwrap(),
                header.dst_port.to_native(),
            )
            .as_str(),
            &FramingMode::Byte,
            &BlockingMode::Wait,
            true, /* overlapped */
        );

        match pipe_result {
            Ok(mut pipe_connection) => {
                let mut buffer = Box::new([0u8; TEMP_READ_BUF_SIZE_BYTES]);
                info!("vsock: port {}: created client pipe", port);

                // SAFETY:
                // Unsafe because the read could happen at any time
                // after this function is called. We ensure safety
                // by allocating the buffer and overlapped struct
                // on the heap.
                unsafe {
                    match pipe_connection.read_overlapped(&mut buffer[..], &mut overlapped_wrapper)
                    {
                        Ok(()) => {}
                        Err(e) => {
                            error!("vsock: port {}: Failed to read from pipe: {}", port, e);
                            return false;
                        }
                    }
                }
                info!("vsock: port {}: started read on client pipe", port);

                let buf_alloc = Self::calculate_buf_alloc_from_pipe(&pipe_connection, port);
                let connection = VsockConnection {
                    guest_port: header.src_port,
                    pipe: pipe_connection,
                    overlapped: overlapped_wrapper,
                    peer_buf_alloc: header.buf_alloc.to_native() as usize,
                    peer_recv_cnt: header.fwd_cnt.to_native() as usize,
                    buf_alloc,
                    buffer,
                    // The connection has just been made, so we haven't received
                    // anything yet.
                    recv_cnt: 0_usize,
                    prev_recv_cnt: 0_usize,
                    tx_cnt: 0_usize,
                    is_buffer_full: false,
                };
                self.connections.lock().await.insert(port, connection);
                self.connection_event.signal().unwrap_or_else(|_| {
                    panic!(
                        "Failed to signal new connection event for vsock port {}.",
                        port
                    )
                });
                info!("vsock: port {}: signaled connection ready", port);
                true
            }
            Err(e) => {
                info!(
                    "vsock: No waiting pipe connection on port {}, \
                     not connecting (err: {:?})",
                    port, e
                );
                false
            }
        }
    }

    async fn handle_vsock_guest_data(
        &self,
        header: virtio_vsock_hdr,
        data: &[u8],
        ex: &Executor,
    ) -> Result<()> {
        let port = PortPair::from_tx_header(&header);
        let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
        {
            let mut connections = self.connections.lock().await;
            if let Some(connection) = connections.get_mut(&port) {
                // Update peer buffer/recv counters.
                connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
                connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;

                let pipe = &mut connection.pipe;
                // We have to provide an OVERLAPPED struct to write to the pipe.
                //
                // SAFETY: safe because data & overlapped_wrapper live until the
                // overlapped operation completes (we wait on completion below).
                if let Err(e) = unsafe { pipe.write_overlapped(data, &mut overlapped_wrapper) } {
                    return Err(VsockError::WriteFailed(port, e));
                }
            } else {
                error!(
                    "vsock: Guest attempted to send data packet over unconnected \
                     port ({}), dropping packet",
                    port
                );
                return Ok(());
            }
        }
        if let Some(write_completed_event) = overlapped_wrapper.get_h_event_ref() {
            // Don't block the executor while the write completes. This time should
            // always be negligible, but will sometimes be non-zero in cases where
            // traffic is high on the NamedPipe, especially a duplex pipe.
            if let Ok(cloned_event) = write_completed_event.try_clone() {
                if let Ok(async_event) = EventAsync::new_without_reset(cloned_event, ex) {
                    let _ = async_event.next_val().await;
                } else {
                    error!(
                        "vsock: port {}: Failed to convert write event to async",
                        port
                    );
                }
            } else {
                error!(
                    "vsock: port {}: Failed to clone write completion event",
                    port
                );
            }
        } else {
            error!(
                "vsock: port: {}: Failed to get overlapped event for write",
                port
            );
        }

        let mut connections = self.connections.lock().await;
        if let Some(connection) = connections.get_mut(&port) {
            let pipe = &mut connection.pipe;
            match pipe.get_overlapped_result(&mut overlapped_wrapper) {
                Ok(len) => {
                    // We've received bytes from the guest, account for them in our
                    // received bytes counter.
                    connection.recv_cnt += len as usize;

                    if len != data.len() as u32 {
                        return Err(VsockError::WriteFailed(
                            port,
                            std::io::Error::new(
                                std::io::ErrorKind::Other,
                                format!(
                                    "port {} failed to write correct number of bytes: \
                                     (expected: {}, wrote: {})",
                                    port,
                                    data.len(),
                                    len
                                ),
                            ),
                        ));
                    }
                }
                Err(e) => {
                    return Err(VsockError::WriteFailed(port, e));
                }
            }
        } else {
            error!(
                "vsock: Guest attempted to send data packet over unconnected \
                 port ({}), dropping packet",
                port
            );
        }
        Ok(())
    }

    async fn process_tx_packets(
        &self,
        send_queue: &Arc<RwLock<Queue>>,
        rx_queue_evt: Event,
        mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
        ex: &Executor,
        mut stop_rx: oneshot::Receiver<()>,
    ) {
        let mut packet_queues = HashMap::new();
        let mut futures = FuturesUnordered::new();
        // Push a pending future that will never complete into FuturesUnordered.
        // This will keep us from spinning on spurious notifications when we
        // don't have any open connections.
        futures.push(std::future::pending::<PortPair>().left_future());

        let mut stop_future = FuturesUnordered::new();
        stop_future.push(stop_rx);
        loop {
            let (new_packet, connection, stop_rx_res) =
                select3(packet_recv_queue.next(), futures.next(), stop_future.next()).await;
            match connection {
                SelectResult::Finished(Some(port)) => {
                    packet_queues.remove(&port);
                }
                SelectResult::Finished(_) => {
                    // This is only triggered when FuturesUnordered completes
                    // all pending futures. Right now, this can never happen, as
                    // we have a pending future that we push that will never
                    // complete.
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            };
            match new_packet {
                SelectResult::Finished(Some(packet)) => {
                    let port = PortPair::from_tx_header(&packet.0);
                    let queue = packet_queues.entry(port).or_insert_with(|| {
                        let (send, recv) = mpsc::channel(CHANNEL_SIZE);
                        let event_async = EventAsync::new(
                            rx_queue_evt.try_clone().expect("Failed to clone event"),
                            ex,
                        )
                        .expect("Failed to set up the rx queue event");
                        futures.push(
                            self.process_tx_packets_for_port(
                                port,
                                recv,
                                send_queue,
                                event_async,
                                ex,
                            )
                            .right_future(),
                        );
                        send
                    });
                    // Try to send the packet. Do not block other ports if the queue is full.
                    if let Err(e) = queue.try_send(packet) {
                        error!(
                            "vsock: port {}: error sending packet to queue, dropping packet: {:?}",
                            port, e
                        )
                    }
                }
                SelectResult::Finished(_) => {
                    // Triggers when the channel is closed; no more packets coming.
                    packet_recv_queue.close();
                    return;
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            }
            match stop_rx_res {
                SelectResult::Finished(_) => {
                    break;
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            }
        }
    }

    async fn process_tx_packets_for_port(
        &self,
        port: PortPair,
        mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
        send_queue: &Arc<RwLock<Queue>>,
        mut rx_queue_evt: EventAsync,
        ex: &Executor,
    ) -> PortPair {
        while let Some((header, data)) = packet_recv_queue.next().await {
            if !self
                .handle_tx_packet(header, &data, send_queue, &mut rx_queue_evt, ex)
                .await
            {
                packet_recv_queue.close();
                if let Ok(Some(_)) = packet_recv_queue.try_next() {
                    warn!("vsock: closing port {} with unprocessed packets", port);
                } else {
                    info!("vsock: closing port {} cleanly", port)
                }
                break;
            }
        }
        port
    }

    async fn handle_tx_packet(
        &self,
        header: virtio_vsock_hdr,
        data: &[u8],
        send_queue: &Arc<RwLock<Queue>>,
        rx_queue_evt: &mut EventAsync,
        ex: &Executor,
    ) -> bool {
        let mut is_open = true;
        let port = PortPair::from_tx_header(&header);
        match header.op.to_native() {
            vsock_op::VIRTIO_VSOCK_OP_INVALID => {
                error!("vsock: Invalid Operation requested, dropping packet");
            }
            vsock_op::VIRTIO_VSOCK_OP_REQUEST => {
                let (resp_op, buf_alloc, fwd_cnt) =
                    if self.handle_vsock_connection_request(header).await {
                        let connections = self.connections.read_lock().await;

                        connections.get(&port).map_or_else(
                            || {
                                warn!("vsock: port: {} connection closed during connect", port);
                                is_open = false;
                                (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
                            },
                            |conn| {
                                (
                                    vsock_op::VIRTIO_VSOCK_OP_RESPONSE,
                                    conn.buf_alloc as u32,
                                    conn.recv_cnt as u32,
                                )
                            },
                        )
                    } else {
                        is_open = false;
                        (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
                    };

                let response_header = virtio_vsock_hdr {
                    src_cid: { header.dst_cid },
                    dst_cid: { header.src_cid },
                    src_port: { header.dst_port },
                    dst_port: { header.src_port },
                    len: 0.into(),
                    type_: TYPE_STREAM_SOCKET.into(),
                    op: resp_op.into(),
                    buf_alloc: Le32::from(buf_alloc),
                    fwd_cnt: Le32::from(fwd_cnt),
                    ..Default::default()
                };
                // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
                // bytes.
                self.write_bytes_to_queue(
                    &mut *send_queue.lock().await,
                    rx_queue_evt,
                    response_header.as_bytes(),
                )
                .await
                .expect("vsock: failed to write to queue");
                info!(
                    "vsock: port {}: replied {} to connection request",
                    port,
                    if resp_op == vsock_op::VIRTIO_VSOCK_OP_RESPONSE {
                        "success"
                    } else {
                        "reset"
                    },
                );
            }
            vsock_op::VIRTIO_VSOCK_OP_RESPONSE => {
                // TODO(b/237811512): Implement this for host-initiated connections.
            }
            vsock_op::VIRTIO_VSOCK_OP_RST => {
                // TODO(b/237811512): Implement this for host-initiated connections.
            }
            vsock_op::VIRTIO_VSOCK_OP_SHUTDOWN => {
                // While the header provides flags to specify tx/rx-specific shutdown,
                // we only support full shutdown.
                // TODO(b/237811512): Provide an optimal way to notify host of shutdowns
                // while still maintaining easy reconnections.
                let mut connections = self.connections.lock().await;
                if connections.remove(&port).is_some() {
                    let mut response = virtio_vsock_hdr {
                        src_cid: { header.dst_cid },
                        dst_cid: { header.src_cid },
                        src_port: { header.dst_port },
                        dst_port: { header.src_port },
                        len: 0.into(),
                        type_: TYPE_STREAM_SOCKET.into(),
                        op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
                        // There is no buffer on a closed connection.
                        buf_alloc: 0.into(),
                        // There is no fwd_cnt anymore on a closed connection.
                        fwd_cnt: 0.into(),
                        ..Default::default()
                    };
                    // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
                    // to bytes.
                    self.write_bytes_to_queue(
                        &mut *send_queue.lock().await,
                        rx_queue_evt,
                        response.as_mut_bytes(),
                    )
                    .await
                    .expect("vsock: failed to write to queue");
                    self.connection_event
                        .signal()
                        .expect("vsock: failed to write to event");
                    info!("vsock: port: {}: disconnected by the guest", port);
                } else {
                    error!("vsock: Attempted to close unopened port: {}", port);
                }
                is_open = false;
            }
            vsock_op::VIRTIO_VSOCK_OP_RW => {
                match self.handle_vsock_guest_data(header, data, ex).await {
                    Ok(()) => {
                        if self
                            .check_free_buffer_threshold(header)
                            .await
                            .unwrap_or(false)
                        {
                            // Send a credit update if we're below the minimum free
                            // buffer size. We skip this if the connection is closed,
                            // which could've happened if we were closed on the other
                            // end.
                            info!(
                                "vsock: port {}: Buffer below threshold; sending credit update.",
                                port
                            );
                            self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
                                .await;
                        }
                    }
                    Err(e) => {
                        error!("vsock: port {}: resetting connection: {}", port, e);
                        self.send_vsock_reset(send_queue, rx_queue_evt, header)
                            .await;
                        is_open = false;
                    }
                }
            }
            // An update from our peer with their buffer state, which they are sending
            // (probably) due to a credit request *we* made.
            vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE => {
                let port = PortPair::from_tx_header(&header);
                let mut connections = self.connections.lock().await;
                if let Some(connection) = connections.get_mut(&port) {
                    connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
                    connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
                } else {
                    error!("vsock: port {}: got credit update on unknown port", port);
                    is_open = false;
                }
            }
            // A request from our peer to get *our* buffer state. We reply to the RX queue.
            vsock_op::VIRTIO_VSOCK_OP_CREDIT_REQUEST => {
                info!(
                    "vsock: port {}: Got credit request from peer; sending credit update.",
                    port,
                );
                self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
                    .await;
            }
            _ => {
                error!(
                    "vsock: port {}: unknown operation requested, dropping packet",
                    port
                );
            }
        }
        is_open
    }

    // Checks whether the amount of free buffer space that our peer believes *we* have
    // available is below our threshold percentage. If the connection is closed, returns `None`.
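    //
    // For example: with buf_alloc = 131072 (128 KiB) the threshold is
    // 131072 * 0.1 = 13107 bytes. If recv_cnt - prev_recv_cnt = 120000 bytes
    // have been ingested since the last report, only 11072 bytes appear free
    // to the peer, so this returns Some(true) and a credit update is due.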
    async fn check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool> {
        let mut connections = self.connections.lock().await;
        let port = PortPair::from_tx_header(&header);
        connections.get_mut(&port).map(|connection| {
            let threshold: usize = (MIN_FREE_BUFFER_PCT * connection.buf_alloc as f64) as usize;
            connection.buf_alloc - (connection.recv_cnt - connection.prev_recv_cnt) < threshold
        })
    }

send_vsock_credit_update( &self, send_queue: &Arc<RwLock<Queue>>, rx_queue_evt: &mut EventAsync, header: virtio_vsock_hdr, )1242     async fn send_vsock_credit_update(
1243         &self,
1244         send_queue: &Arc<RwLock<Queue>>,
1245         rx_queue_evt: &mut EventAsync,
1246         header: virtio_vsock_hdr,
1247     ) {
1248         let mut connections = self.connections.lock().await;
1249         let port = PortPair::from_tx_header(&header);
1250 
1251         if let Some(connection) = connections.get_mut(&port) {
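            // Build the reply with source and destination swapped so the packet
            // routes back to the peer. The braced field accesses copy the values
            // out of the header, likely because it is a packed struct whose
            // fields cannot be borrowed directly.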
            let mut response = virtio_vsock_hdr {
                src_cid: { header.dst_cid },
                dst_cid: { header.src_cid },
                src_port: { header.dst_port },
                dst_port: { header.src_port },
                len: 0.into(),
                type_: TYPE_STREAM_SOCKET.into(),
                op: vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE.into(),
                buf_alloc: Le32::from(connection.buf_alloc as u32),
                fwd_cnt: Le32::from(connection.recv_cnt as u32),
                ..Default::default()
            };

            connection.prev_recv_cnt = connection.recv_cnt;

            // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
            // to bytes.
            self.write_bytes_to_queue(
                &mut *send_queue.lock().await,
                rx_queue_evt,
                response.as_mut_bytes(),
            )
            .await
            .unwrap_or_else(|_| panic!("vsock: port {}: failed to write to queue", port));
        } else {
            error!(
                "vsock: port {}: error sending credit update on unknown port",
                port
            );
        }
    }

    async fn send_vsock_reset(
        &self,
        send_queue: &Arc<RwLock<Queue>>,
        rx_queue_evt: &mut EventAsync,
        header: virtio_vsock_hdr,
    ) {
        let mut connections = self.connections.lock().await;
        let port = PortPair::from_tx_header(&header);
        if let Some(connection) = connections.remove(&port) {
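            // The connection is removed from the map before replying, so the RST
            // both tears down our local state and notifies the peer.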
            let mut response = virtio_vsock_hdr {
                src_cid: { header.dst_cid },
                dst_cid: { header.src_cid },
                src_port: { header.dst_port },
                dst_port: { header.src_port },
                len: 0.into(),
                type_: TYPE_STREAM_SOCKET.into(),
                op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
                buf_alloc: Le32::from(connection.buf_alloc as u32),
                fwd_cnt: Le32::from(connection.recv_cnt as u32),
                ..Default::default()
            };

            // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
            // to bytes.
            self.write_bytes_to_queue(
                &mut *send_queue.lock().await,
                rx_queue_evt,
                response.as_mut_bytes(),
            )
            .await
            .expect("failed to write to queue");
        } else {
            error!("vsock: port {}: error closing unknown port", port);
        }
    }

    async fn write_bytes_to_queue(
        &self,
        queue: &mut Queue,
        queue_evt: &mut EventAsync,
        bytes: &[u8],
    ) -> Result<()> {
        let avail_desc = match queue.next_async(queue_evt).await {
            Ok(d) => d,
            Err(e) => {
                error!("vsock: failed to read descriptor: {}", e);
                return Err(VsockError::AwaitQueue(e));
            }
        };
        self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
    }

    async fn write_bytes_to_queue_interruptable(
        &self,
        queue: &mut Queue,
        queue_evt: &mut EventAsync,
        bytes: &[u8],
        stop_rx: &mut oneshot::Receiver<()>,
    ) -> Result<()> {
        let avail_desc = match queue.next_async_interruptable(queue_evt, stop_rx).await {
            Ok(d) => match d {
                Some(desc) => desc,
                // `None` means a stop was requested, so there is nothing to write.
                None => return Ok(()),
            },
            Err(e) => {
                error!("vsock: failed to read descriptor: {}", e);
                return Err(VsockError::AwaitQueue(e));
            }
        };
        self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
    }

    fn write_bytes_to_queue_inner(
        &self,
        queue: &mut Queue,
        mut desc_chain: DescriptorChain,
        bytes: &[u8],
    ) -> Result<()> {
        let writer = &mut desc_chain.writer;
        let res = writer.write_all(bytes);

        if let Err(e) = res {
            error!(
                "vsock: failed to write {} bytes to queue, err: {:?}",
                bytes.len(),
                e
            );
            return Err(VsockError::WriteQueue(e));
        }

        let bytes_written = writer.bytes_written() as u32;
        if bytes_written > 0 {
            queue.add_used(desc_chain, bytes_written);
            queue.trigger_interrupt();
            Ok(())
        } else {
            error!("vsock: failed to write bytes to queue");
            Err(VsockError::WriteQueue(std::io::Error::new(
                std::io::ErrorKind::Other,
                "failed to write bytes to queue",
            )))
        }
    }

    async fn process_event_queue(
        &self,
        mut queue: Queue,
        mut queue_evt: EventAsync,
        mut stop_rx: oneshot::Receiver<()>,
        mut vsock_event_receiver: mpsc::Receiver<virtio_vsock_event>,
    ) -> Result<Queue> {
        loop {
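            // `select_biased!` polls its arms in declaration order, so a ready
            // vsock event is delivered before a pending stop request is
            // honored.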
            let vsock_event = select_biased! {
                vsock_event = vsock_event_receiver.next() => {
                    vsock_event
                }
                _ = stop_rx => {
                    break;
                }
            };
            let vsock_event = match vsock_event {
                Some(event) => event,
                None => break,
            };
            self.write_bytes_to_queue_interruptable(
                &mut queue,
                &mut queue_evt,
                vsock_event.as_bytes(),
                &mut stop_rx,
            )
            .await?;
        }
        Ok(queue)
    }

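    // Drives the device: wires up the rx/tx/packet/event futures on a local
    // executor and runs them until `kill_evt` fires, then signals each future
    // to stop and collects the queues (and connection map) for hand-back to
    // the caller.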
    fn run(
        mut self,
        rx_queue: Queue,
        tx_queue: Queue,
        event_queue: Queue,
        kill_evt: Event,
    ) -> Result<Option<(PausedQueues, VsockConnectionMap)>> {
        let rx_queue_evt = rx_queue
            .event()
            .try_clone()
            .map_err(VsockError::CloneDescriptor)?;

        // Note that this mutex won't ever be contended because the HandleExecutor is
        // single-threaded. We need the mutex for compile-time correctness, but it is not
        // actually providing mandatory locking, at least not at the moment. If we later
        // switch to a multi-threaded executor, this lock will become important.
        let rx_queue_arc = Arc::new(RwLock::new(rx_queue));

        // Run the executor and create the futures inside a scope, preventing an extra
        // reference to `rx_queue_arc` from outliving it.
        let res = {
            let ex = Executor::new().unwrap();

            let rx_evt_async = EventAsync::new(
                rx_queue_evt
                    .try_clone()
                    .map_err(VsockError::CloneDescriptor)?,
                &ex,
            )
            .expect("Failed to set up the rx queue event");
            let mut stop_queue_oneshots = Vec::new();

            let vsock_event_receiver = self
                .device_event_queue_rx
                .take()
                .expect("event queue rx must be present");

            let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
            let rx_handler =
                self.process_rx_queue(rx_queue_arc.clone(), rx_evt_async, &ex, stop_rx);
            let rx_handler = rx_handler.fuse();
            pin_mut!(rx_handler);

            let (send, recv) = mpsc::channel(CHANNEL_SIZE);

            let tx_evt_async = EventAsync::new(
                tx_queue
                    .event()
                    .try_clone()
                    .map_err(VsockError::CloneDescriptor)?,
                &ex,
            )
            .expect("Failed to set up the tx queue event");
            let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
            let tx_handler = self.process_tx_queue(tx_queue, tx_evt_async, send, stop_rx);
            let tx_handler = tx_handler.fuse();
            pin_mut!(tx_handler);

            let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
            let packet_handler =
                self.process_tx_packets(&rx_queue_arc, rx_queue_evt, recv, &ex, stop_rx);
            let packet_handler = packet_handler.fuse();
            pin_mut!(packet_handler);

            let event_evt_async = EventAsync::new(
                event_queue
                    .event()
                    .try_clone()
                    .map_err(VsockError::CloneDescriptor)?,
                &ex,
            )
            .expect("Failed to set up the event queue event");
            let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
            let event_handler = self.process_event_queue(
                event_queue,
                event_evt_async,
                stop_rx,
                vsock_event_receiver,
            );
            let event_handler = event_handler.fuse();
            pin_mut!(event_handler);

            let kill_evt = EventAsync::new(kill_evt, &ex).expect("Failed to set up the kill event");
            let kill_handler = kill_evt.next_val();
            pin_mut!(kill_handler);

            let mut device_event_queue_tx = self.device_event_queue_tx.clone();
            if self.send_protocol_reset {
                ex.run_until(async move {
                    device_event_queue_tx
                        .send(virtio_vsock_event {
                            id: virtio_sys::virtio_vsock::virtio_vsock_event_id_VIRTIO_VSOCK_EVENT_TRANSPORT_RESET
                                .into(),
                        })
                        .await
                })
                .expect("failed to send TRANSPORT_RESET to the device event queue");
            }
            ex.run_until(async {
                select! {
                    _ = kill_handler.fuse() => (),
                    _ = rx_handler => return Err(anyhow!("rx_handler stopped unexpectedly")),
                    _ = tx_handler => return Err(anyhow!("tx_handler stopped unexpectedly")),
                    _ = packet_handler => return Err(anyhow!("packet_handler stopped unexpectedly")),
                    _ = event_handler => return Err(anyhow!("event_handler stopped unexpectedly")),
                }
                // kill_evt has fired.

                for stop_tx in stop_queue_oneshots {
                    if stop_tx.send(()).is_err() {
                        return Err(anyhow!("failed to request stop for queue future"));
                    }
                }

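                // Wait for the workers to wind down. The tx and event handlers
                // return their queues; the rx queue is recovered from
                // `rx_queue_arc` after the executor finishes.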
                rx_handler.await.context("Failed to stop rx handler.")?;
                packet_handler.await;

                Ok((
                    tx_handler.await.context("Failed to stop tx handler.")?,
                    event_handler
                        .await
                        .context("Failed to stop event handler.")?,
                ))
            })
        };

        // At this point, either a request to stop this worker has been sent, or an error
        // has occurred in one of the futures, which stops this worker anyway.

        let queues_and_connections = match res {
            Ok(main_future_res) => match main_future_res {
                Ok((tx_queue, event_handler_queue)) => {
                    let rx_queue = match Arc::try_unwrap(rx_queue_arc) {
                        Ok(queue_rw_lock) => queue_rw_lock.into_inner(),
                        Err(_) => panic!("failed to recover queue from worker"),
                    };
                    let paused_queues = PausedQueues::new(rx_queue, tx_queue, event_handler_queue);
                    Some((paused_queues, self.connections))
                }
                Err(e) => {
                    error!("error in a vsock future: {}", e);
                    None
                }
            },
            Err(e) => {
                error!("error in the executor: {}", e);
                None
            }
        };

        Ok(queues_and_connections)
    }
}

/// The rx, tx, and event queues for the vsock device.
struct VsockQueues {
    rx: Queue,
    tx: Queue,
    event: Queue,
}

impl TryFrom<BTreeMap<usize, Queue>> for VsockQueues {
    type Error = anyhow::Error;
    fn try_from(mut queues: BTreeMap<usize, Queue>) -> result::Result<Self, Self::Error> {
        if queues.len() < 3 {
            anyhow::bail!(
                "{} queues were found, but an activated vsock device must have at least 3 \
                 active queues.",
                queues.len()
            );
        }

        Ok(VsockQueues {
            rx: queues.remove(&0).context("the rx queue is required.")?,
            tx: queues.remove(&1).context("the tx queue is required.")?,
            event: queues.remove(&2).context("the event queue is required.")?,
        })
    }
}
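// A hypothetical usage sketch (the caller and the `queues` map are assumptions,
// not code from this file):
//
//     let VsockQueues { rx, tx, event } = VsockQueues::try_from(queues)?;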

impl From<PausedQueues> for BTreeMap<usize, Queue> {
    fn from(queues: PausedQueues) -> Self {
        let mut ret = BTreeMap::new();
        ret.insert(0, queues.rx_queue);
        ret.insert(1, queues.tx_queue);
        ret.insert(2, queues.event_queue);
        ret
    }
}

struct PausedQueues {
    rx_queue: Queue,
    tx_queue: Queue,
    event_queue: Queue,
}

impl PausedQueues {
    fn new(rx_queue: Queue, tx_queue: Queue, event_queue: Queue) -> Self {
        PausedQueues {
            rx_queue,
            tx_queue,
            event_queue,
        }
    }
}

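// For example, get_pipe_name("guid", 1) returns `\\.\pipe\guid\vsock-1`.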
fn get_pipe_name(guid: &str, pipe: u32) -> String {
    format!("\\\\.\\pipe\\{}\\vsock-{}", guid, pipe)
}

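// The fired event is returned together with its `PortPair`, presumably so a
// caller awaiting many of these futures at once (e.g. in a `FuturesUnordered`
// set) can tell which connection became ready.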
async fn wait_event_and_return_port_pair(evt: EventAsync, pair: PortPair) -> PortPair {
    // This doesn't reset the event, since we have to call GetOverlappedResult
    // on the OVERLAPPED struct before resetting it.
    let _ = evt.get_io_source_ref().wait_for_handle().await;
    pair
}