// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Display;
use std::io;
use std::io::Read;
use std::io::Write;
use std::os::windows::io::RawHandle;
use std::rc::Rc;
use std::result;
use std::sync::Arc;
use std::thread;

use anyhow::anyhow;
use anyhow::Context;
use base::error;
use base::info;
use base::named_pipes;
use base::named_pipes::BlockingMode;
use base::named_pipes::FramingMode;
use base::named_pipes::OverlappedWrapper;
use base::named_pipes::PipeConnection;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::EventExt;
use base::WorkerThread;
use cros_async::select2;
use cros_async::select6;
use cros_async::sync::Mutex;
use cros_async::AsyncError;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::SelectResult;
use data_model::Le32;
use data_model::Le64;
use futures::channel::mpsc;
use futures::pin_mut;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use remain::sorted;
use thiserror::Error as ThisError;
use vm_memory::GuestMemory;
use zerocopy::AsBytes;
use zerocopy::FromBytes;

use crate::virtio::async_utils;
use crate::virtio::copy_config;
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_config;
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_event;
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_hdr;
use crate::virtio::vsock::sys::windows::protocol::vsock_op;
use crate::virtio::vsock::sys::windows::protocol::TYPE_STREAM_SOCKET;
use crate::virtio::DescriptorError;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::SignalableInterrupt;
use crate::virtio::VirtioDevice;
use crate::virtio::Writer;
use crate::Suspendable;

#[sorted]
#[derive(ThisError, Debug)]
pub enum VsockError {
    #[error("Failed to await next descriptor chain from queue: {0}")]
    AwaitQueue(AsyncError),
    #[error("OverlappedWrapper error.")]
    BadOverlappedWrapper,
    #[error("Failed to clone descriptor: {0}")]
    CloneDescriptor(SysError),
    #[error("Failed to create EventAsync: {0}")]
    CreateEventAsync(AsyncError),
    #[error("Failed to create queue reader: {0}")]
    CreateReader(DescriptorError),
    #[error("Failed to create wait context: {0}")]
    CreateWaitContext(SysError),
    #[error("Failed to create queue writer: {0}")]
    CreateWriter(DescriptorError),
    #[error("Failed to read queue: {0}")]
    ReadQueue(io::Error),
    #[error("Failed to reset event object: {0}")]
    ResetEventObject(SysError),
    #[error("Failed to run executor: {0}")]
    RunExecutor(AsyncError),
    #[error("Failed to write to pipe, port {0}: {1}")]
    WriteFailed(PortPair, io::Error),
    #[error("Failed to write queue: {0}")]
    WriteQueue(io::Error),
}
pub type Result<T> = result::Result<T, VsockError>;

// Vsock has three virtio queues: rx, tx, and event.
const QUEUE_SIZE: u16 = 256;
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];
// We overload port numbers so that if a message is received from
// CONNECTION_EVENT_PORT_NUM (an invalid port number), we recognize that a
// new connection was set up.
const CONNECTION_EVENT_PORT_NUM: u32 = u32::MAX;

/// Number of bytes in a kilobyte. Used to simplify and clarify buffer size definitions.
const KB: usize = 1024;

/// Size of the buffer we read into from the host side named pipe. Note that data flows from the
/// host pipe -> this buffer -> rx queue.
/// This should be large enough to facilitate fast transmission of host data, see b/232950349.
const TEMP_READ_BUF_SIZE_BYTES: usize = 4 * KB;

/// In the case where the host side named pipe does not have a specified buffer size, we'll default
/// to telling the guest that this is the amount of extra RX space available (e.g. buf_alloc).
/// This should be larger than the volume of data that the guest will generally send at one time to
/// minimize credit update packets (see MIN_FREE_BUFFER_PCT below).
const DEFAULT_BUF_ALLOC_BYTES: usize = 128 * KB;

/// Minimum free buffer threshold to notify the peer with a credit update
/// message. This is in case we are ingesting many messages without an
/// opportunity to send a message back to the peer with a buffer size update.
/// This value is a percentage of `buf_alloc`.
/// TODO(b/204246759): This value was chosen without much more thought than "it
/// works". It should probably be adjusted, along with DEFAULT_BUF_ALLOC_BYTES,
/// to a value that makes empirical sense for the packet sizes that we expect
/// to receive.
/// TODO(b/239848326): Replace float with integer, in order to remove the risk
/// of losing precision. I.e. change to `10` and perform
/// `FOO * MIN_FREE_BUFFER_PCT / 100`
const MIN_FREE_BUFFER_PCT: f64 = 0.1;
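// Illustrative example of the threshold: with the default `buf_alloc` of 128 KB,
// the threshold is ~13 KB, so a credit update is sent once fewer than ~13 KB of
// the advertised buffer space remain unreported to the peer (see
// `check_free_buffer_threshold` below).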

// Number of packets to buffer in the tx processing channels.
const CHANNEL_SIZE: usize = 256;

/// Virtio device exposing a vsock (virtual socket) interface to the guest OS,
/// backed by host named pipes.
pub struct Vsock {
    guest_cid: u64,
    host_guid: Option<String>,
    features: u64,
    worker_thread: Option<WorkerThread<()>>,
}

impl Vsock {
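    /// Creates the userspace vsock device.
    ///
    /// Illustrative only (the real call site lives in the device setup code):
    /// assuming a guest CID of 3 and a hypothetical host GUID string,
    /// construction looks roughly like
    /// `Vsock::new(3, Some(host_guid), base_features)`.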
    pub fn new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock> {
        Ok(Vsock {
            guest_cid,
            host_guid,
            features: base_features,
            worker_thread: None,
        })
    }

    fn get_config(&self) -> virtio_vsock_config {
        virtio_vsock_config {
            guest_cid: Le64::from(self.guest_cid),
        }
    }
}

impl VirtioDevice for Vsock {
    fn keep_rds(&self) -> Vec<RawHandle> {
        Vec::new()
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        copy_config(data, 0, self.get_config().as_bytes(), offset);
    }

    fn device_type(&self) -> DeviceType {
        DeviceType::Vsock
    }

    fn queue_max_sizes(&self) -> &[u16] {
        QUEUE_SIZES
    }

    fn features(&self) -> u64 {
        self.features
    }

    fn ack_features(&mut self, value: u64) {
        self.features &= value;
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        interrupt: Interrupt,
        mut queues: Vec<(Queue, Event)>,
    ) -> anyhow::Result<()> {
        if queues.len() != QUEUE_SIZES.len() {
            return Err(anyhow!(
                "Failed to activate vsock device. queues.len(): {} != {}",
                queues.len(),
                QUEUE_SIZES.len(),
            ));
        }

        let (rx_queue, rx_queue_evt) = queues.remove(0);
        let (tx_queue, tx_queue_evt) = queues.remove(0);
        let (event_queue, event_queue_evt) = queues.remove(0);

        let host_guid = self.host_guid.clone();
        let guest_cid = self.guest_cid;
        self.worker_thread = Some(WorkerThread::start(
            "userspace_virtio_vsock",
            move |kill_evt| {
                let mut worker = Worker::new(mem, interrupt, host_guid, guest_cid);
                let result = worker.run(
                    rx_queue,
                    tx_queue,
                    event_queue,
                    rx_queue_evt,
                    tx_queue_evt,
                    event_queue_evt,
                    kill_evt,
                );

                if let Err(e) = result {
                    error!("userspace vsock worker thread exited with error: {:?}", e);
                }
            },
        ));

        Ok(())
    }
}

impl Suspendable for Vsock {}

#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct PortPair {
    host: u32,
    guest: u32,
}

impl Display for PortPair {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "(host port: {}, guest port: {})", self.host, self.guest)
    }
}

impl PortPair {
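    /// Builds the connection key from a header received on the TX (guest -> host)
    /// queue: on that path the guest is the sender, so `src_port` is the guest
    /// port and `dst_port` is the host port.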
    fn from_tx_header(header: &virtio_vsock_hdr) -> PortPair {
        PortPair {
            host: header.dst_port.to_native(),
            guest: header.src_port.to_native(),
        }
    }
}

// Note: variables herein do not have to be atomic because this struct is guarded
// by a Mutex.
struct VsockConnection {
    // The guest port.
    guest_port: Le32,

    // The actual named (asynchronous) pipe connection.
    pipe: PipeConnection,
    // The overlapped struct contains an event object for the named pipe.
    // This lets us select() on the pipes by waiting on the events.
    // This is for Reads only.
    overlapped: Box<OverlappedWrapper>,
    // Read buffer for the named pipes. These are needed because reads complete
    // asynchronously.
    buffer: Box<[u8; TEMP_READ_BUF_SIZE_BYTES]>,

    // Total free-running count of received bytes.
    recv_cnt: usize,

    // Total free-running count of received bytes that the peer has been informed of.
    prev_recv_cnt: usize,

    // Total auxiliary buffer space available to receive packets from the driver, not including
    // the virtqueue itself. For us, this is the TX buffer on the named pipe into which we drain
    // packets for the connection. Note that if the named pipe has a grow-on-demand TX buffer, we
    // use DEFAULT_BUF_ALLOC_BYTES instead.
    buf_alloc: usize,

    // Peer (driver) total free-running count of received bytes.
    peer_recv_cnt: usize,

    // Peer (driver) total rx buffer allocated.
    peer_buf_alloc: usize,

    // Total free-running count of transmitted bytes.
    tx_cnt: usize,

    // State tracking for the full-buffer condition. Currently just used for logging. If the
    // peer's buffer does not have space for a maximum-sized message (TEMP_READ_BUF_SIZE_BYTES),
    // this gets set to `true`. Once there's enough space in the buffer, this gets unset.
    is_buffer_full: bool,
}

struct Worker {
    mem: GuestMemory,
    interrupt: Interrupt,
    host_guid: Option<String>,
    guest_cid: u64,
    // Map of port pairs to open VsockConnections.
    connections: Mutex<HashMap<PortPair, VsockConnection>>,
    connection_event: Event,
}

impl Worker {
    fn new(
        mem: GuestMemory,
        interrupt: Interrupt,
        host_guid: Option<String>,
        guest_cid: u64,
    ) -> Worker {
        Worker {
            mem,
            interrupt,
            host_guid,
            guest_cid,
            connections: Mutex::new(HashMap::new()),
            connection_event: Event::new().unwrap(),
        }
    }

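    /// Services the rx (device -> driver) virtqueue: waits on the overlapped read
    /// event of every open connection (plus `connection_event`), and for each
    /// ready pipe forwards the completed read to the guest as a
    /// VIRTIO_VSOCK_OP_RW packet before re-arming the overlapped read. Whenever
    /// `connection_event` fires, the set of watched connections is rebuilt.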
    async fn process_rx_queue(
        &self,
        recv_queue: Arc<Mutex<Queue>>,
        mut rx_queue_evt: EventAsync,
        ex: &Executor,
    ) -> Result<()> {
        'connections_changed: loop {
            // Run continuously until exit evt

            // TODO(b/200810561): Optimize this FuturesUnordered code.
            // Set up the EventAsyncs to select on
            let futures = FuturesUnordered::new();
            // This needs to be its own scope since it holds a Mutex on `self.connections`.
            {
                let connections = self.connections.read_lock().await;
                for (port, connection) in connections.iter() {
                    let h_evt = connection
                        .overlapped
                        .get_h_event_ref()
                        .ok_or_else(|| {
                            error!("Missing h_event in OverlappedWrapper.");
                            VsockError::BadOverlappedWrapper
                        })
                        .unwrap()
                        .try_clone()
                        .map_err(|e| {
                            error!("Could not clone h_event.");
                            VsockError::CloneDescriptor(e)
                        })?;
                    let evt_async = EventAsync::new(h_evt, ex).map_err(|e| {
                        error!("Could not create EventAsync.");
                        VsockError::CreateEventAsync(e)
                    })?;
                    futures.push(wait_event_and_return_port_pair(evt_async, *port));
                }
            }
            let connection_evt_clone = self.connection_event.try_clone().map_err(|e| {
                error!("Could not clone connection_event.");
                VsockError::CloneDescriptor(e)
            })?;
            let connection_evt_async = EventAsync::new(connection_evt_clone, ex).map_err(|e| {
                error!("Could not create EventAsync.");
                VsockError::CreateEventAsync(e)
            })?;
            futures.push(wait_event_and_return_port_pair(
                connection_evt_async,
                PortPair {
                    host: CONNECTION_EVENT_PORT_NUM,
                    guest: 0,
                },
            ));

            // Wait to service the sockets. Note that for fairness, it is critical that we service
            // all ready sockets in a single wakeup to avoid starvation. This is why ready_chunks
            // is used, as it returns all currently *ready* futures from the stream.
            //
            // The expect here only triggers if the FuturesUnordered stream is exhausted. This never
            // happens because it has at least one item, and we only request chunks once.
            let futures_len = futures.len();
            for port in futures
                .ready_chunks(futures_len)
                .next()
                .await
                .expect("failed to wait on vsock sockets")
            {
                if port.host == CONNECTION_EVENT_PORT_NUM {
                    // New connection event. Set up the futures again.
                    if let Err(e) = self.connection_event.reset() {
                        error!("vsock: port: {}: could not reset connection_event.", port);
                        return Err(VsockError::ResetEventObject(e));
                    }
                    continue 'connections_changed;
                }
                let mut connections = self.connections.lock().await;
                let connection = if let Some(conn) = connections.get_mut(&port) {
                    conn
                } else {
                    // We could have been scheduled to run the rx queue *before* the connection was
                    // closed. In that case, we do nothing. The code which closed the connection
                    // (e.g. in response to a message in the tx/rx queues) will handle notifying
                    // the guest/host as required.
                    continue 'connections_changed;
                };

                // Check if the peer has enough space in their buffer to
                // receive the maximum amount of data that we could possibly
                // read from the host pipe. If not, we should continue to
                // process other sockets as each socket has an independent
                // buffer.
                let peer_free_buf_size =
                    connection.peer_buf_alloc - (connection.tx_cnt - connection.peer_recv_cnt);
                if peer_free_buf_size < TEMP_READ_BUF_SIZE_BYTES {
                    if !connection.is_buffer_full {
                        warn!(
                            "vsock: port {}: Peer has insufficient free buffer space ({} bytes available)",
                            port, peer_free_buf_size
                        );
                        connection.is_buffer_full = true;
                    }
                    continue;
                } else if connection.is_buffer_full {
                    connection.is_buffer_full = false;
                }

                let pipe_connection = &mut connection.pipe;
                let overlapped = &mut connection.overlapped;
                let guest_port = connection.guest_port;
                let buffer = &mut connection.buffer;

                match overlapped.get_h_event_ref() {
                    Some(h_event) => {
                        if let Err(e) = h_event.reset() {
                            error!(
                                "vsock: port: {}: Could not reset event in OverlappedWrapper.",
                                port
                            );
                            return Err(VsockError::ResetEventObject(e));
                        }
                    }
                    None => {
                        error!(
                            "vsock: port: {}: missing h_event in OverlappedWrapper.",
                            port
                        );
                        return Err(VsockError::BadOverlappedWrapper);
                    }
                }

                let data_size = match pipe_connection.get_overlapped_result(&mut *overlapped) {
                    Ok(size) => size as usize,
                    Err(e) => {
                        error!("vsock: port {}: Failed to read from pipe {}", port, e);
                        // TODO(b/237278629): Close the connection if we fail to read.
                        continue 'connections_changed;
                    }
                };

                let response_header = virtio_vsock_hdr {
                    src_cid: 2.into(),              // Host CID
                    dst_cid: self.guest_cid.into(), // Guest CID
                    src_port: Le32::from(port.host),
                    dst_port: guest_port,
                    len: Le32::from(data_size as u32),
                    r#type: TYPE_STREAM_SOCKET.into(),
                    op: vsock_op::VIRTIO_VSOCK_OP_RW.into(),
                    buf_alloc: Le32::from(connection.buf_alloc as u32),
                    fwd_cnt: Le32::from(connection.recv_cnt as u32),
                    ..Default::default()
                };

                connection.prev_recv_cnt = connection.recv_cnt;

                // We get only one write to the queue per packet, so we construct a new buffer
                // with the concatenated header and data.
                const HEADER_SIZE: usize = std::mem::size_of::<virtio_vsock_hdr>();
                let data_read = &buffer[..data_size];
                let mut header_and_data = vec![0u8; HEADER_SIZE + data_size];
                header_and_data[..HEADER_SIZE].copy_from_slice(response_header.as_bytes());
                header_and_data[HEADER_SIZE..].copy_from_slice(data_read);
                self.write_bytes_to_queue(
                    &mut *recv_queue.lock().await,
                    &mut rx_queue_evt,
                    &header_and_data[..],
                )
                .await?;

                connection.tx_cnt += data_size;

                // Start reading again so we receive the message and
                // event signal immediately.

                // Unsafe because the read could happen at any time
                // after this function is called. We ensure safety
                // by allocating the buffer and overlapped struct
                // on the heap.
                unsafe {
                    match pipe_connection.read_overlapped(&mut buffer[..], &mut *overlapped) {
                        Ok(()) => {}
                        Err(e) => {
                            error!("vsock: port {}: Failed to read from pipe {}", port, e);
                        }
                    }
                }
            }
        }
    }

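    /// Services the tx (driver -> device) virtqueue: parses each descriptor chain
    /// into a `virtio_vsock_hdr` plus payload and forwards the pair to
    /// `process_tx_packets` over the mpsc channel before returning the descriptor
    /// to the guest.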
    async fn process_tx_queue(
        &self,
        mut queue: Queue,
        mut queue_evt: EventAsync,
        mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>,
    ) -> Result<()> {
        loop {
            // Run continuously until exit evt
            let (mut reader, index) = self.get_next(&mut queue, &mut queue_evt).await?;
            while reader.available_bytes() >= std::mem::size_of::<virtio_vsock_hdr>() {
                let header = match reader.read_obj::<virtio_vsock_hdr>() {
                    Ok(hdr) => hdr,
                    Err(e) => {
                        error!("vsock: Error while reading header: {}", e);
                        break;
                    }
                };

                let len = header.len.to_native() as usize;
                if reader.available_bytes() < len {
                    error!("vsock: Error reading packet data");
                    break;
                }

                let mut data = vec![0_u8; len];
                if len > 0 {
                    if let Err(e) = reader.read_exact(&mut data) {
                        error!("vsock: failed to read data from tx packet: {:?}", e);
                    }
                }

                if let Err(e) = process_packets_queue.send((header, data)).await {
                    error!(
                        "Error while sending packet to queue, dropping packet: {:?}",
                        e
                    )
                };
            }

            queue.add_used(&self.mem, index, 0);
            queue.trigger_interrupt(&self.mem, &self.interrupt);
        }
    }

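    /// Determines the `buf_alloc` value advertised to the guest for a connection:
    /// the named pipe's outgoing buffer size if it reports one, otherwise
    /// `DEFAULT_BUF_ALLOC_BYTES` (also used if the pipe info query fails).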
    fn calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize {
        match pipe.get_info(/* is_server_connection= */ false) {
            Ok(info) => {
                if info.outgoing_buffer_size > 0 {
                    info.outgoing_buffer_size as usize
                } else {
                    info!(
                        "vsock: port {}: using default extra rx buffer size \
                                            (named pipe does not have an explicit buffer size)",
                        port
                    );

                    // A zero buffer size implies that the buffer grows as
                    // needed. We set our own cap here for flow control
                    // purposes.
                    DEFAULT_BUF_ALLOC_BYTES
                }
            }
            Err(e) => {
                error!(
                    "vsock: port {}: failed to get named pipe info, using default \
                                        buf size. Error: {}",
                    port, e
                );
                DEFAULT_BUF_ALLOC_BYTES
            }
        }
    }

    /// Processes a connection request and returns whether to return a response (true), or reset
    /// (false).
    async fn handle_vsock_connection_request(&self, header: virtio_vsock_hdr) -> bool {
        let port = PortPair::from_tx_header(&header);
        info!("vsock: Received connection request for port {}", port);

        if self.connections.read_lock().await.contains_key(&port) {
            // Connection exists, nothing for us to do.
            warn!(
                "vsock: accepting connection request on already connected port {}",
                port
            );
            return true;
        }

        if self.host_guid.is_none() {
            error!(
                "vsock: Cannot accept guest-initiated connections \
                        without host-guid, rejecting connection"
            );
            return false;
        }

        // We have a new connection to establish.
        let mut overlapped_wrapper =
            Box::new(OverlappedWrapper::new(/* include_event= */ true).unwrap());
        let pipe_result = named_pipes::create_client_pipe(
            get_pipe_name(
                self.host_guid.as_ref().unwrap(),
                header.dst_port.to_native(),
            )
            .as_str(),
            &FramingMode::Byte,
            &BlockingMode::Wait,
            true, /* overlapped */
        );

        match pipe_result {
            Ok(mut pipe_connection) => {
                let mut buffer = Box::new([0u8; TEMP_READ_BUF_SIZE_BYTES]);

                // Unsafe because the read could happen at any time
                // after this function is called. We ensure safety
                // by allocating the buffer and overlapped struct
                // on the heap.
                unsafe {
                    match pipe_connection.read_overlapped(&mut buffer[..], &mut overlapped_wrapper)
                    {
                        Ok(()) => {}
                        Err(e) => {
                            error!("vsock: port {}: Failed to read from pipe {}", port, e);
                            return false;
                        }
                    }
                }

                let buf_alloc = Self::calculate_buf_alloc_from_pipe(&pipe_connection, port);
                let connection = VsockConnection {
                    guest_port: header.src_port,
                    pipe: pipe_connection,
                    overlapped: overlapped_wrapper,
                    peer_buf_alloc: header.buf_alloc.to_native() as usize,
                    peer_recv_cnt: header.fwd_cnt.to_native() as usize,
                    buf_alloc,
                    buffer,
                    // The connection has just been made, so we haven't received
                    // anything yet.
                    recv_cnt: 0_usize,
                    prev_recv_cnt: 0_usize,
                    tx_cnt: 0_usize,
                    is_buffer_full: false,
                };
                self.connections.lock().await.insert(port, connection);
                self.connection_event.signal().unwrap_or_else(|_| {
                    panic!(
                        "Failed to signal new connection event for vsock port {}.",
                        port
                    )
                });
                true
            }
            Err(e) => {
                info!(
                    "vsock: No waiting pipe connection on port {}, \
                                not connecting (err: {:?})",
                    port, e
                );
                false
            }
        }
    }

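    /// Writes a guest data (OP_RW) packet's payload to the connection's named pipe
    /// using an overlapped write, asynchronously waits for the write to complete,
    /// and updates the connection's credit counters. Packets for unknown ports are
    /// dropped; pipe write failures surface as `VsockError::WriteFailed`.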
    async fn handle_vsock_guest_data(
        &self,
        header: virtio_vsock_hdr,
        data: &[u8],
        ex: &Executor,
    ) -> Result<()> {
        let port = PortPair::from_tx_header(&header);
        let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
        {
            let mut connections = self.connections.lock().await;
            if let Some(connection) = connections.get_mut(&port) {
                // Update peer buffer/recv counters
                connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
                connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;

                let pipe = &mut connection.pipe;
                // We have to provide an OVERLAPPED struct to write to the pipe.
                //
                // SAFETY: safe because data & overlapped_wrapper live until the
                // overlapped operation completes (we wait on completion below).
                if let Err(e) = unsafe { pipe.write_overlapped(data, &mut overlapped_wrapper) } {
                    return Err(VsockError::WriteFailed(port, e));
                }
            } else {
                error!(
                    "vsock: Guest attempted to send data packet over unconnected \
                            port ({}), dropping packet",
                    port
                );
                return Ok(());
            }
        }
        if let Some(write_completed_event) = overlapped_wrapper.get_h_event_ref() {
            // Don't block the executor while the write completes. This time should
            // always be negligible, but will sometimes be non-zero in cases where
            // traffic is high on the NamedPipe, especially a duplex pipe.
            if let Ok(cloned_event) = write_completed_event.try_clone() {
                if let Ok(async_event) = EventAsync::new_without_reset(cloned_event, ex) {
                    let _ = async_event.next_val().await;
                } else {
                    error!(
                        "vsock: port {}: Failed to convert write event to async",
                        port
                    );
                }
            } else {
                error!(
                    "vsock: port {}: Failed to clone write completion event",
                    port
                );
            }
        } else {
            error!(
                "vsock: port: {}: Failed to get overlapped event for write",
                port
            );
        }

        let mut connections = self.connections.lock().await;
        if let Some(connection) = connections.get_mut(&port) {
            let pipe = &mut connection.pipe;
            match pipe.get_overlapped_result(&mut overlapped_wrapper) {
                Ok(len) => {
                    // We've received bytes from the guest, account for them in our
                    // received bytes counter.
                    connection.recv_cnt += len as usize;

                    if len != data.len() as u32 {
                        return Err(VsockError::WriteFailed(
                            port,
                            std::io::Error::new(
                                std::io::ErrorKind::Other,
                                format!(
                                    "failed to write correct number of bytes:
                                        (expected: {}, wrote: {})",
                                    data.len(),
                                    len
                                ),
                            ),
                        ));
                    }
                }
                Err(e) => {
                    return Err(VsockError::WriteFailed(port, e));
                }
            }
        } else {
            error!(
                "vsock: Guest attempted to send data packet over unconnected \
                        port ({}), dropping packet",
                port
            );
        }
        Ok(())
    }

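    /// Demultiplexes packets coming from `process_tx_queue` by `PortPair`: each
    /// new port gets its own bounded channel and a `process_tx_packets_for_port`
    /// future; when that future resolves (the connection closed), the per-port
    /// queue is dropped. A never-completing placeholder future keeps the
    /// `FuturesUnordered` from spinning while no connections are open.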
    async fn process_tx_packets(
        &self,
        send_queue: &Arc<Mutex<Queue>>,
        rx_queue_evt: Event,
        mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
        ex: &Executor,
    ) {
        let mut packet_queues = HashMap::new();
        let mut futures = FuturesUnordered::new();
        // Push a pending future that will never complete into FuturesUnordered.
        // This will keep us from spinning on spurious notifications when we
        // don't have any open connections.
        futures.push(std::future::pending::<PortPair>().left_future());
        loop {
            let (new_packet, connection) = select2(packet_recv_queue.next(), futures.next()).await;
            match connection {
                SelectResult::Finished(Some(port)) => {
                    packet_queues.remove(&port);
                }
                SelectResult::Finished(_) => {
                    // This is only triggered when FuturesUnordered completes
                    // all pending futures. Right now, this can never happen, as
                    // we have a pending future that we push that will never
                    // complete.
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            };
            match new_packet {
                SelectResult::Finished(Some(packet)) => {
                    let port = PortPair::from_tx_header(&packet.0);
                    let queue = packet_queues.entry(port).or_insert_with(|| {
                        let (send, recv) = mpsc::channel(CHANNEL_SIZE);
                        let event_async = EventAsync::new(
                            rx_queue_evt.try_clone().expect("Failed to clone event"),
                            ex,
                        )
                        .expect("Failed to set up the rx queue event");
                        futures.push(
                            self.process_tx_packets_for_port(
                                port,
                                recv,
                                send_queue,
                                event_async,
                                ex,
                            )
                            .right_future(),
                        );
                        send
                    });
                    // Try to send the packet. Do not block other ports if the queue is full.
                    if let Err(e) = queue.try_send(packet) {
                        error!("Error sending packet to queue, dropping packet: {:?}", e)
                    }
                }
                SelectResult::Finished(_) => {
                    // Triggers when the channel is closed; no more packets coming.
                    packet_recv_queue.close();
                    return;
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            }
        }
    }

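    /// Drains a single port's packet channel, handing each packet to
    /// `handle_tx_packet` until it reports the connection closed; returns the
    /// `PortPair` so the caller can remove its queue.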
    async fn process_tx_packets_for_port(
        &self,
        port: PortPair,
        mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
        send_queue: &Arc<Mutex<Queue>>,
        mut rx_queue_evt: EventAsync,
        ex: &Executor,
    ) -> PortPair {
        while let Some((header, data)) = packet_recv_queue.next().await {
            if !self
                .handle_tx_packet(header, &data, send_queue, &mut rx_queue_evt, ex)
                .await
            {
                packet_recv_queue.close();
                if let Ok(Some(_)) = packet_recv_queue.try_next() {
                    warn!("vsock: closing port {} with unprocessed packets", port);
                } else {
                    info!("vsock: closing port {} cleanly", port)
                }
                break;
            }
        }
        port
    }

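    /// Dispatches a single TX packet on `header.op`: connection requests get a
    /// RESPONSE (or RST), SHUTDOWN tears the connection down, RW forwards data to
    /// the host pipe (optionally followed by a credit update), and the CREDIT_*
    /// ops maintain flow-control state. Returns whether the connection remains
    /// open.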
    async fn handle_tx_packet(
        &self,
        header: virtio_vsock_hdr,
        data: &[u8],
        send_queue: &Arc<Mutex<Queue>>,
        rx_queue_evt: &mut EventAsync,
        ex: &Executor,
    ) -> bool {
        let mut is_open = true;
        match header.op.to_native() {
            vsock_op::VIRTIO_VSOCK_OP_INVALID => {
                error!("vsock: Invalid Operation requested, dropping packet");
            }
            vsock_op::VIRTIO_VSOCK_OP_REQUEST => {
                let (resp_op, buf_alloc, fwd_cnt) =
                    if self.handle_vsock_connection_request(header).await {
                        let connections = self.connections.read_lock().await;
                        let port = PortPair::from_tx_header(&header);

                        connections.get(&port).map_or_else(
                            || {
                                warn!("vsock: port: {} connection closed during connect", port);
                                is_open = false;
                                (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
                            },
                            |conn| {
                                (
                                    vsock_op::VIRTIO_VSOCK_OP_RESPONSE,
                                    conn.buf_alloc as u32,
                                    conn.recv_cnt as u32,
                                )
                            },
                        )
                    } else {
                        is_open = false;
                        (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
                    };

                let response_header = virtio_vsock_hdr {
                    src_cid: { header.dst_cid },
                    dst_cid: { header.src_cid },
                    src_port: { header.dst_port },
                    dst_port: { header.src_port },
                    len: 0.into(),
                    r#type: TYPE_STREAM_SOCKET.into(),
                    op: resp_op.into(),
                    buf_alloc: Le32::from(buf_alloc),
                    fwd_cnt: Le32::from(fwd_cnt),
                    ..Default::default()
                };
                // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
                // bytes.
                self.write_bytes_to_queue(
                    &mut *send_queue.lock().await,
                    rx_queue_evt,
                    response_header.as_bytes(),
                )
                .await
                .expect("vsock: failed to write to queue");
            }
            vsock_op::VIRTIO_VSOCK_OP_RESPONSE => {
                // TODO(b/237811512): Implement this for host-initiated connections
            }
            vsock_op::VIRTIO_VSOCK_OP_RST => {
                // TODO(b/237811512): Implement this for host-initiated connections
            }
            vsock_op::VIRTIO_VSOCK_OP_SHUTDOWN => {
                // While the header provides flags to specify tx/rx-specific shutdown,
                // we only support full shutdown.
                // TODO(b/237811512): Provide an optimal way to notify host of shutdowns
                // while still maintaining easy reconnections.
                let port = PortPair::from_tx_header(&header);
                let mut connections = self.connections.lock().await;
                if connections.remove(&port).is_some() {
                    let mut response = virtio_vsock_hdr {
                        src_cid: { header.dst_cid },
                        dst_cid: { header.src_cid },
                        src_port: { header.dst_port },
                        dst_port: { header.src_port },
                        len: 0.into(),
                        r#type: TYPE_STREAM_SOCKET.into(),
                        op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
                        // There is no buffer on a closed connection
                        buf_alloc: 0.into(),
                        // There is no fwd_cnt anymore on a closed connection
                        fwd_cnt: 0.into(),
                        ..Default::default()
                    };
                    // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to bytes
                    self.write_bytes_to_queue(
                        &mut *send_queue.lock().await,
                        rx_queue_evt,
                        response.as_bytes_mut(),
                    )
                    .await
                    .expect("vsock: failed to write to queue");
                    self.connection_event
                        .signal()
                        .expect("vsock: failed to write to event");
                } else {
                    error!("vsock: Attempted to close unopened port: {}", port);
                }
                is_open = false;
            }
            vsock_op::VIRTIO_VSOCK_OP_RW => {
                match self.handle_vsock_guest_data(header, data, ex).await {
                    Ok(()) => {
                        if self
                            .check_free_buffer_threshold(header)
                            .await
                            .unwrap_or(false)
                        {
                            // Send a credit update if we're below the minimum free
                            // buffer size. We skip this if the connection is closed,
                            // which could've happened if we were closed on the other
                            // end.
                            info!("vsock: Buffer below threshold; sending credit update.");
                            self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
                                .await;
                        }
                    }
                    Err(e) => {
                        error!("vsock: resetting connection: {}", e);
                        self.send_vsock_reset(send_queue, rx_queue_evt, header)
                            .await;
                        is_open = false;
                    }
                }
            }
            // An update from our peer with their buffer state, which they are sending
            // (probably) due to a credit request *we* made.
            vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE => {
                let port = PortPair::from_tx_header(&header);
                let mut connections = self.connections.lock().await;
                if let Some(connection) = connections.get_mut(&port) {
                    connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
                    connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
                } else {
                    error!("vsock: got credit update on unknown port {}", port);
                    is_open = false;
                }
            }
            // A request from our peer to get *our* buffer state. We reply to the RX queue.
            vsock_op::VIRTIO_VSOCK_OP_CREDIT_REQUEST => {
                info!(
                    "vsock: Got credit request from peer {}; sending credit update.",
                    PortPair::from_tx_header(&header)
                );
                self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
                    .await;
            }
            _ => {
                error!("vsock: Unknown operation requested, dropping packet");
            }
        }
        is_open
    }

    // Checks whether the amount of free buffer space our peer believes *we* have
    // available is below our threshold percentage. If the connection is closed,
    // returns `None`.
    async fn check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool> {
        let mut connections = self.connections.lock().await;
        let port = PortPair::from_tx_header(&header);
        connections.get_mut(&port).map(|connection| {
            let threshold: usize = (MIN_FREE_BUFFER_PCT * connection.buf_alloc as f64) as usize;
            connection.buf_alloc - (connection.recv_cnt - connection.prev_recv_cnt) < threshold
        })
    }

    async fn send_vsock_credit_update(
        &self,
        send_queue: &Arc<Mutex<Queue>>,
        rx_queue_evt: &mut EventAsync,
        header: virtio_vsock_hdr,
    ) {
        let mut connections = self.connections.lock().await;
        let port = PortPair::from_tx_header(&header);

        if let Some(connection) = connections.get_mut(&port) {
            let mut response = virtio_vsock_hdr {
                src_cid: { header.dst_cid },
                dst_cid: { header.src_cid },
                src_port: { header.dst_port },
                dst_port: { header.src_port },
                len: 0.into(),
                r#type: TYPE_STREAM_SOCKET.into(),
                op: vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE.into(),
                buf_alloc: Le32::from(connection.buf_alloc as u32),
                fwd_cnt: Le32::from(connection.recv_cnt as u32),
                ..Default::default()
            };

            connection.prev_recv_cnt = connection.recv_cnt;

            // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
            // to bytes
            self.write_bytes_to_queue(
                &mut *send_queue.lock().await,
                rx_queue_evt,
                response.as_bytes_mut(),
            )
            .await
            .expect("vsock: failed to write to queue");
        } else {
            error!(
                "vsock: error sending credit update on unknown port {}",
                port
            );
        }
    }

    async fn send_vsock_reset(
        &self,
        send_queue: &Arc<Mutex<Queue>>,
        rx_queue_evt: &mut EventAsync,
        header: virtio_vsock_hdr,
    ) {
        let mut connections = self.connections.lock().await;
        let port = PortPair::from_tx_header(&header);
        if let Some(connection) = connections.remove(&port) {
            let mut response = virtio_vsock_hdr {
                src_cid: { header.dst_cid },
                dst_cid: { header.src_cid },
                src_port: { header.dst_port },
                dst_port: { header.src_port },
                len: 0.into(),
                r#type: TYPE_STREAM_SOCKET.into(),
                op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
                buf_alloc: Le32::from(connection.buf_alloc as u32),
                fwd_cnt: Le32::from(connection.recv_cnt as u32),
                ..Default::default()
            };

            // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
            // to bytes
            self.write_bytes_to_queue(
                &mut *send_queue.lock().await,
                rx_queue_evt,
                response.as_bytes_mut(),
            )
            .await
            .expect("failed to write to queue");
        } else {
            error!("vsock: error closing unknown port {}", port);
        }
    }

    // Get a `Writer`, or wait if there are no descriptors currently available on
    // the queue.
    async fn get_next_writer(
        &self,
        queue: &mut Queue,
        queue_evt: &mut EventAsync,
    ) -> Result<(Writer, u16)> {
        let avail_desc = match queue.next_async(&self.mem, queue_evt).await {
            Ok(d) => d,
            Err(e) => {
                error!("vsock: Failed to read descriptor {}", e);
                return Err(VsockError::AwaitQueue(e));
            }
        };
        let index = avail_desc.index;
        Writer::new(self.mem.clone(), avail_desc)
            .map_err(|e| {
                error!("vsock: failed to create Writer: {}", e);
                VsockError::CreateWriter(e)
            })
            .map(|r| (r, index))
    }

    async fn write_bytes_to_queue(
        &self,
        queue: &mut Queue,
        queue_evt: &mut EventAsync,
        bytes: &[u8],
    ) -> Result<()> {
        let (mut writer, desc_index) = self.get_next_writer(queue, queue_evt).await?;
        let res = writer.write_all(bytes);

        if let Err(e) = res {
            error!(
                "vsock: failed to write {} bytes to queue, err: {:?}",
                bytes.len(),
                e
            );
            return Err(VsockError::WriteQueue(e));
        }

        let bytes_written = writer.bytes_written() as u32;
        if bytes_written > 0 {
            queue.add_used(&self.mem, desc_index, bytes_written);
            queue.trigger_interrupt(&self.mem, &self.interrupt);
            Ok(())
        } else {
            error!("vsock: Failed to write bytes to queue");
            Err(VsockError::WriteQueue(std::io::Error::new(
                std::io::ErrorKind::Other,
                "failed to write bytes to queue",
            )))
        }
    }

    async fn process_event_queue(&self, mut queue: Queue, mut queue_evt: EventAsync) -> Result<()> {
        loop {
            // Log but don't act on events. They are reserved exclusively for guest migration events
            // resulting in CID resets, which we don't support.
            let (mut reader, _index) = self.get_next(&mut queue, &mut queue_evt).await?;
            for event in reader.iter::<virtio_vsock_event>() {
                if event.is_ok() {
                    error!(
                        "Received event with id {:?}, this should not happen, and will not be handled",
                        event.unwrap().id.to_native()
                    );
                }
            }
        }
    }

    async fn get_next(
        &self,
        queue: &mut Queue,
        queue_evt: &mut EventAsync,
    ) -> Result<(Reader, u16)> {
        let avail_desc = match queue.next_async(&self.mem, queue_evt).await {
            Ok(d) => d,
            Err(e) => {
                error!("vsock: Failed to read descriptor {}", e);
                return Err(VsockError::AwaitQueue(e));
            }
        };
        let index = avail_desc.index;
        Reader::new(self.mem.clone(), avail_desc)
            .map_err(|e| {
                error!("vsock: failed to create Reader: {}", e);
                VsockError::CreateReader(e)
            })
            .map(|r| (r, index))
    }

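    /// Sets up the async executor and runs the rx, tx, packet, event-queue, irq
    /// resample, and kill-event handlers concurrently via `select6` until one of
    /// them completes (normally the kill event).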
    fn run(
        &mut self,
        rx_queue: Queue,
        tx_queue: Queue,
        event_queue: Queue,
        rx_queue_evt: Event,
        tx_queue_evt: Event,
        event_queue_evt: Event,
        kill_evt: Event,
    ) -> Result<()> {
        let ex = Executor::new().unwrap();

        // Note that this mutex won't ever be contended because the HandleExecutor is single
        // threaded. We need the mutex for compile time correctness, but technically it is not
        // actually providing mandatory locking, at least not at the moment. If we later use a
        // multi-threaded executor, then this lock will be important.
        let rx_queue_arc = Arc::new(Mutex::new(rx_queue));

        let rx_evt_async = EventAsync::new(
            rx_queue_evt
                .try_clone()
                .map_err(VsockError::CloneDescriptor)?,
            &ex,
        )
        .expect("Failed to set up the rx queue event");
        let rx_handler = self.process_rx_queue(rx_queue_arc.clone(), rx_evt_async, &ex);
        pin_mut!(rx_handler);

        let (send, recv) = mpsc::channel(CHANNEL_SIZE);

        let tx_evt_async =
            EventAsync::new(tx_queue_evt, &ex).expect("Failed to set up the tx queue event");
        let tx_handler = self.process_tx_queue(tx_queue, tx_evt_async, send);
        pin_mut!(tx_handler);

        let packet_handler = self.process_tx_packets(&rx_queue_arc, rx_queue_evt, recv, &ex);
        pin_mut!(packet_handler);

        let event_evt_async =
            EventAsync::new(event_queue_evt, &ex).expect("Failed to set up the event queue event");
        let event_handler = self.process_event_queue(event_queue, event_evt_async);
        pin_mut!(event_handler);

        // Process any requests to resample the irq value.
        let resample_handler = async_utils::handle_irq_resample(&ex, self.interrupt.clone());
        pin_mut!(resample_handler);

        let kill_evt = EventAsync::new(kill_evt, &ex).expect("Failed to set up the kill event");
        let kill_handler = kill_evt.next_val();
        pin_mut!(kill_handler);

        if let Err(e) = ex.run_until(select6(
            rx_handler,
            tx_handler,
            packet_handler,
            event_handler,
            resample_handler,
            kill_handler,
        )) {
            error!("Error happened when running executor: {}", e);
            return Err(VsockError::RunExecutor(e));
        }
        Ok(())
    }
}

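/// Builds the host named pipe path for a guest port. For example, a hypothetical
/// host GUID `<guid>` and guest port 5000 yield `\\.\pipe\<guid>\vsock-5000`.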
fn get_pipe_name(guid: &str, pipe: u32) -> String {
    format!("\\\\.\\pipe\\{}\\vsock-{}", guid, pipe)
}

async fn wait_event_and_return_port_pair(evt: EventAsync, pair: PortPair) -> PortPair {
    // This doesn't reset the event since we have to call GetOverlappedResult
    // on the OVERLAPPED struct first before resetting it.
    let _ = evt.get_io_source_ref().wait_for_handle().await;
    pair
}
1282